diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet index 16d3f05..36dcdb5 100644 --- a/docker-compose.jsonnet +++ b/docker-compose.jsonnet @@ -25,7 +25,9 @@ }, // Twitch channels to capture. The first one will be used as the default channel in the editor. - channels:: ["desertbus"], + // Channels suffixed with a '!' are considered "important" and will be retried more aggressively + // and warned about if they're not currently streaming. + channels:: ["desertbus!", "db_chief", "db_high", "db_audio", "db_bus"], // Stream qualities to capture qualities:: ["source", "480p"], @@ -133,6 +135,9 @@ for key in std.objectFields($.db_args) ]), + // Cleaned up version of $.channels without importance markers + clean_channels:: [std.split(c, '!')[0] for c in $.channels], + // docker-compose version version: "3", @@ -174,7 +179,7 @@ [if $.enabled.backfiller then "backfiller"]: { image: "quay.io/ekimekim/wubloader-backfiller:%s" % $.image_tag, // Args for the backfiller: set channel and qualities - command: $.channels + + command: $.clean_channels + [ "--base-dir", "/mnt", "--qualities", std.join(",", $.qualities), @@ -229,7 +234,7 @@ if location != $.default_location ]), $.db_connect, - $.channels[0], // use first element as default channel + $.clean_channels[0], // use first element as default channel $.bustime_start, ] + if $.authentication then [] else ["--no-authentication"], // Mount the segments directory at /mnt @@ -267,7 +272,7 @@ [if $.enabled.segment_coverage then "segment_coverage"]: { image: "quay.io/ekimekim/wubloader-segment_coverage:%s" % $.image_tag, // Args for the segment_coverage - command: $.channels + + command: $.clean_channels + [ "--base-dir", "/mnt", "--qualities", std.join(",", $.qualities), diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 1f82aa6..6a94e7a 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -109,11 +109,14 @@ class StreamsManager(object): * A worker is older than MAX_WORKER_AGE """ - FETCH_MIN_INTERVAL = 5 + # Important streams are retried more aggressively when down + IMPORTANT_FETCH_MIN_INTERVAL = 5 + FETCH_MIN_INTERVAL = 20 + FETCH_TIMEOUTS = 5, 30 MAX_WORKER_AGE = 20*60*60 # 20 hours, twitch's media playlist links expire after 24 hours - def __init__(self, channel, base_dir, qualities): + def __init__(self, channel, base_dir, qualities, important=False): self.channel = channel self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.base_dir = base_dir @@ -122,6 +125,9 @@ class StreamsManager(object): self.latest_urls_changed = gevent.event.Event() # set when latest_urls changes self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now self.stopping = gevent.event.Event() # set to tell main loop to stop + self.important = important + if self.important: + self.FETCH_MIN_INTERVAL = self.IMPORTANT_FETCH_MIN_INTERVAL def mark_working(self, worker): """Notify the manager that the given worker is up and running, @@ -219,7 +225,9 @@ class StreamsManager(object): self.start_worker(quality) except Exception as e: if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404: - self.logger.info("Stream is not up. Retrying.") + # Log about important streams being down at info, but others at debug. + level = logging.INFO if self.important else logging.DEBUG + self.logger.log(level, "Stream is not up. Retrying.") self.trigger_refresh() else: self.logger.exception("Failed to fetch master playlist") @@ -563,12 +571,15 @@ class SegmentGetter(object): stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe -@argh.arg('channels', nargs="+", help="Twitch channels to watch") +@argh.arg('channels', nargs="+", help= + "Twitch channels to watch. Add a '!' suffix to indicate they're expected to be always up. " + "This affects retry interval, error reporting and monitoring." +) def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0): qualities = qualities.split(",") if qualities else [] managers = [ - StreamsManager(channel, base_dir, qualities) + StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!')) for channel in channels ]