Merge pull request #137 from ekimekim/mike/downloader/less-important-channels

downloader: Create concept of an "important" channel
pull/144/head
Mike Lang 5 years ago committed by GitHub
commit 9e6cd71026
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -25,7 +25,9 @@
}, },
// Twitch channels to capture. The first one will be used as the default channel in the editor. // Twitch channels to capture. The first one will be used as the default channel in the editor.
channels:: ["desertbus"], // Channels suffixed with a '!' are considered "important" and will be retried more aggressively
// and warned about if they're not currently streaming.
channels:: ["desertbus!", "db_chief", "db_high", "db_audio", "db_bus"],
// Stream qualities to capture // Stream qualities to capture
qualities:: ["source", "480p"], qualities:: ["source", "480p"],
@ -133,6 +135,9 @@
for key in std.objectFields($.db_args) for key in std.objectFields($.db_args)
]), ]),
// Cleaned up version of $.channels without importance markers
clean_channels:: [std.split(c, '!')[0] for c in $.channels],
// docker-compose version // docker-compose version
version: "3", version: "3",
@ -174,7 +179,7 @@
[if $.enabled.backfiller then "backfiller"]: { [if $.enabled.backfiller then "backfiller"]: {
image: "quay.io/ekimekim/wubloader-backfiller:%s" % $.image_tag, image: "quay.io/ekimekim/wubloader-backfiller:%s" % $.image_tag,
// Args for the backfiller: set channel and qualities // Args for the backfiller: set channel and qualities
command: $.channels + command: $.clean_channels +
[ [
"--base-dir", "/mnt", "--base-dir", "/mnt",
"--qualities", std.join(",", $.qualities), "--qualities", std.join(",", $.qualities),
@ -229,7 +234,7 @@
if location != $.default_location if location != $.default_location
]), ]),
$.db_connect, $.db_connect,
$.channels[0], // use first element as default channel $.clean_channels[0], // use first element as default channel
$.bustime_start, $.bustime_start,
] + if $.authentication then [] else ["--no-authentication"], ] + if $.authentication then [] else ["--no-authentication"],
// Mount the segments directory at /mnt // Mount the segments directory at /mnt
@ -267,7 +272,7 @@
[if $.enabled.segment_coverage then "segment_coverage"]: { [if $.enabled.segment_coverage then "segment_coverage"]: {
image: "quay.io/ekimekim/wubloader-segment_coverage:%s" % $.image_tag, image: "quay.io/ekimekim/wubloader-segment_coverage:%s" % $.image_tag,
// Args for the segment_coverage // Args for the segment_coverage
command: $.channels + command: $.clean_channels +
[ [
"--base-dir", "/mnt", "--base-dir", "/mnt",
"--qualities", std.join(",", $.qualities), "--qualities", std.join(",", $.qualities),

@ -109,11 +109,14 @@ class StreamsManager(object):
* A worker is older than MAX_WORKER_AGE * A worker is older than MAX_WORKER_AGE
""" """
FETCH_MIN_INTERVAL = 5 # Important streams are retried more aggressively when down
IMPORTANT_FETCH_MIN_INTERVAL = 5
FETCH_MIN_INTERVAL = 20
FETCH_TIMEOUTS = 5, 30 FETCH_TIMEOUTS = 5, 30
MAX_WORKER_AGE = 20*60*60 # 20 hours, twitch's media playlist links expire after 24 hours MAX_WORKER_AGE = 20*60*60 # 20 hours, twitch's media playlist links expire after 24 hours
def __init__(self, channel, base_dir, qualities): def __init__(self, channel, base_dir, qualities, important=False):
self.channel = channel self.channel = channel
self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.logger = logging.getLogger("StreamsManager({})".format(channel))
self.base_dir = base_dir self.base_dir = base_dir
@ -122,6 +125,9 @@ class StreamsManager(object):
self.latest_urls_changed = gevent.event.Event() # set when latest_urls changes self.latest_urls_changed = gevent.event.Event() # set when latest_urls changes
self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now
self.stopping = gevent.event.Event() # set to tell main loop to stop self.stopping = gevent.event.Event() # set to tell main loop to stop
self.important = important
if self.important:
self.FETCH_MIN_INTERVAL = self.IMPORTANT_FETCH_MIN_INTERVAL
def mark_working(self, worker): def mark_working(self, worker):
"""Notify the manager that the given worker is up and running, """Notify the manager that the given worker is up and running,
@ -219,7 +225,9 @@ class StreamsManager(object):
self.start_worker(quality) self.start_worker(quality)
except Exception as e: except Exception as e:
if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404: if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
self.logger.info("Stream is not up. Retrying.") # Log about important streams being down at info, but others at debug.
level = logging.INFO if self.important else logging.DEBUG
self.logger.log(level, "Stream is not up. Retrying.")
self.trigger_refresh() self.trigger_refresh()
else: else:
self.logger.exception("Failed to fetch master playlist") self.logger.exception("Failed to fetch master playlist")
@ -563,12 +571,15 @@ class SegmentGetter(object):
stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
@argh.arg('channels', nargs="+", help="Twitch channels to watch") @argh.arg('channels', nargs="+", help=
"Twitch channels to watch. Add a '!' suffix to indicate they're expected to be always up. "
"This affects retry interval, error reporting and monitoring."
)
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0): def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0):
qualities = qualities.split(",") if qualities else [] qualities = qualities.split(",") if qualities else []
managers = [ managers = [
StreamsManager(channel, base_dir, qualities) StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!'))
for channel in channels for channel in channels
] ]

Loading…
Cancel
Save