diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index df24588..981053d 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -91,7 +91,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
 	try:
 		logging.debug('Fetching segment {} from {}'.format(path, node))
 		uri = '{}/segments/{}/{}/{}/{}'.format(node, channel, quality, hour, missing_segment)
-		resp = requests.get(uri, channel=True, timeout=timeout)
+		resp = requests.get(uri, stream=True, timeout=timeout)
 
 		resp.raise_for_status()
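Reviewer note on the backfiller hunk above: the `-` line is the bug being fixed. It looks like an earlier bulk rename of "stream" also rewrote the `stream=True` keyword argument of requests.get(), which requests rejects with a TypeError since request() has no such parameter. `stream=True` is what defers the body download so a segment can be written out in chunks. A minimal sketch of that pattern, independent of this codebase (the function name, chunk size, and paths are illustrative):

    import requests

    def fetch_to_file(url, path, timeout=5):
        # stream=True defers the body download; without it requests
        # buffers the entire response in memory before returning
        resp = requests.get(url, stream=True, timeout=timeout)
        resp.raise_for_status()
        with open(path, 'wb') as f:
            for chunk in resp.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)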
diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 4b631cc..05d5cd7 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -77,12 +77,12 @@ def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft
 
 class StreamsManager(object):
-	"""Keeps track of what streams are being downloaded and the workers doing so.
+	"""Keeps track of what qualities are being downloaded and the workers doing so.
 
 	Re-fetches master playlist when needed and starts new stream workers.
 	This is all to ensure that broken or bad media playlist urls are refreshed
 	in a timely manner.
 
-	The stream_workers map lists workers for each stream. Generally there should only be
+	The stream_workers map lists workers for each quality. Generally there should only be
 	one, but during switchover there may be 2 - one old one continuing to (try to) operate while
 	the second one confirms it's working. While trying to get a url working, it won't retry, it'll
 	just ask the manager immediately to create yet another new worker then quit.
@@ -94,11 +94,11 @@ class StreamsManager(object):
 
 	Creation of a new stream worker may be triggered by:
 	* An existing worker failing to refresh its playlist
-	* The master playlist finding a stream url it doesn't have already
+	* The master playlist finding a quality url it doesn't have already
 	* A worker is older than MAX_WORKER_AGE
 
 	The master playlist is only re-fetched as needed. Reasons to re-fetch:
-	* No url is known for at least one stream
+	* No url is known for at least one quality
 	* Someone has requested it (eg. because the previous attempt failed, or a worker needs a new url)
 	* A worker is older than MAX_WORKER_AGE
 	"""
@@ -111,8 +111,8 @@ class StreamsManager(object):
 		self.channel = channel
 		self.logger = logging.getLogger("StreamsManager({})".format(channel))
 		self.base_dir = base_dir
-		self.stream_workers = {name: [] for name in qualities} # {stream name: [workers]}
-		self.latest_urls = {} # {stream name: (fetch time, url)}
+		self.stream_workers = {name: [] for name in qualities} # {quality name: [workers]}
+		self.latest_urls = {} # {quality name: (fetch time, url)}
 		self.latest_urls_changed = gevent.event.Event() # set when latest_urls changes
 		self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now
 		self.stopping = gevent.event.Event() # set to tell main loop to stop
@@ -120,7 +120,7 @@
 	def mark_working(self, worker):
 		"""Notify the manager that the given worker is up and running,
 		and any older workers are safe to stop."""
-		workers = self.stream_workers[worker.stream]
+		workers = self.stream_workers[worker.quality]
 		if worker not in workers:
 			self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
 			return
@@ -129,21 +129,21 @@
 			old.stop()
 
 	def trigger_new_worker(self, worker):
-		"""Called when a worker decides a new worker for a stream is needed, eg. if it seems to be
+		"""Called when a worker decides a new worker for a quality is needed, eg. if it seems to be
 		failing. Causes a new worker with a fresh url to be created.
 		If worker's url is the same as the latest url, blocks until a new url has been fetched.
 		This only has effect if the worker is the current latest worker.
 		"""
-		workers = self.stream_workers[worker.stream]
+		workers = self.stream_workers[worker.quality]
 		if worker not in workers:
 			self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
 			return
 		if worker is not workers[-1]:
-			self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
+			self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.quality))
 			return
-		self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream))
-		self.wait_for_new_url(worker.stream, worker.url)
-		self.start_worker(worker.stream)
+		self.logger.info("Starting new worker for {} by request of old worker".format(worker.quality))
+		self.wait_for_new_url(worker.quality, worker.url)
+		self.start_worker(worker.quality)
 		self.trigger_refresh()
 
 	def trigger_refresh(self):
@@ -160,10 +160,10 @@
 		self.latest_urls_changed.set()
 		self.latest_urls_changed = gevent.event.Event()
 
-	def wait_for_new_url(self, stream, old_url):
+	def wait_for_new_url(self, quality, old_url):
 		"""Trigger urls to be re-fetched, and block until a different one is received."""
 		while True:
-			new_time, new_url = self.latest_urls[stream]
+			new_time, new_url = self.latest_urls[quality]
 			if new_url != old_url:
 				return
 			self.logger.info("Triggering master playlist refresh as we need a new url")
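A note for reviewers on the two hunks above: update_urls() (whose tail is the context at the top of the @@ -160 hunk) and wait_for_new_url() together form a small broadcast mechanism. The manager sets the current latest_urls_changed event to wake every waiter, then immediately swaps in a fresh Event for the next round; a waiter presumably grabs a reference to the current event before blocking, so a wakeup cannot be missed. A stripped-down sketch of that pattern, assuming only gevent (the class and method names are illustrative, not part of this codebase):

    import gevent.event

    class Broadcaster(object):
        """Each broadcast() wakes everyone currently waiting, then re-arms."""
        def __init__(self):
            self.event = gevent.event.Event()

        def broadcast(self):
            # swap in a fresh event first, then wake all greenlets
            # waiting on the old one
            old, self.event = self.event, gevent.event.Event()
            old.set()

        def wait(self):
            # take a reference first: if a broadcast swaps in a new Event
            # after this point, set() still lands on the object we hold
            event = self.event
            event.wait()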
@@ -175,16 +175,16 @@
 		self.logger.info("Stopping")
 		self.stopping.set()
 
-	def start_worker(self, stream):
-		"""Start a new worker for given stream"""
+	def start_worker(self, quality):
+		"""Start a new worker for given quality"""
 		# it's possible for fetch_latest to call us after we've started stopping,
 		# in that case do nothing.
 		if self.stopping.is_set():
 			self.logger.info("Ignoring worker start as we are stopping")
 			return
-		url_time, url = self.latest_urls[stream]
-		worker = StreamWorker(self, stream, url, url_time)
-		self.stream_workers[stream].append(worker)
+		url_time, url = self.latest_urls[quality]
+		worker = StreamWorker(self, quality, url, url_time)
+		self.stream_workers[quality].append(worker)
 		gevent.spawn(worker.run)
 
 	def fetch_latest(self):
@@ -197,20 +197,20 @@
 			master_playlist = twitch.get_master_playlist(self.channel)
 			new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys())
 			self.update_urls(fetch_time, new_urls)
-			for stream, workers in self.stream_workers.items():
+			for quality, workers in self.stream_workers.items():
 				# warn and retry if the url is missing
-				if stream not in new_urls:
-					self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
+				if quality not in new_urls:
+					self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(quality))
 					self.trigger_refresh()
 				# is it newly found?
-				if not workers and stream in self.latest_urls:
-					self.logger.info("Starting new worker for {} as none exist".format(stream))
-					self.start_worker(stream)
+				if not workers and quality in self.latest_urls:
+					self.logger.info("Starting new worker for {} as none exist".format(quality))
+					self.start_worker(quality)
 				latest_worker = workers[-1]
 				# is the old worker too old?
 				if latest_worker.age() > self.MAX_WORKER_AGE:
-					self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
-					self.start_worker(stream)
+					self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(quality, latest_worker.age() / 3600.))
+					self.start_worker(quality)
 		except Exception as e:
 			if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
 				self.logger.info("Stream is not up. Retrying.")
@@ -266,10 +266,10 @@
 	FETCH_RETRY_INTERVAL = 1
 	FETCH_POLL_INTERVAL = 2
 
-	def __init__(self, manager, stream, url, url_time):
+	def __init__(self, manager, quality, url, url_time):
 		self.manager = manager
-		self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(stream, id(self)))
-		self.stream = stream
+		self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
+		self.quality = quality
 		self.url = url
 		self.url_time = url_time
 		self.stopping = gevent.event.Event() # set to stop main loop
@@ -359,7 +359,7 @@
 				self.session,
 				self.manager.base_dir,
 				self.manager.channel,
-				self.stream,
+				self.quality,
 				segment,
 				date,
 			)
@@ -411,11 +411,11 @@
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, stream, segment, date):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
-		self.stream = stream
+		self.quality = quality
 		self.segment = segment
 		self.date = date
 		self.prefix = self.make_path_prefix()
@@ -466,7 +466,7 @@
 		return os.path.join(
 			self.base_dir,
 			self.channel,
-			self.stream,
+			self.quality,
 			self.date.strftime("%Y-%m-%dT%H"),
 			"{date}-{duration}".format(
 				date=self.date.strftime("%M:%S.%f"),
@@ -541,16 +541,16 @@
 			partial_path = self.make_path("partial", hash)
 			self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
 			common.rename(temp_path, partial_path)
-			segments_downloaded.labels(partial="True", channel=self.channel, quality=self.stream).inc()
+			segments_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc()
 			raise ex_type, ex, tb
 		else:
 			full_path = self.make_path("full", hash)
 			self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
 			common.rename(temp_path, full_path)
-			segments_downloaded.labels(partial="False", channel=self.channel, quality=self.stream).inc()
+			segments_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc()
 			# Prom doesn't provide a way to compare value to gauge's existing value,
 			# we need to reach into internals
-			stat = latest_segment.labels(channel=self.channel, quality=self.stream)
+			stat = latest_segment.labels(channel=self.channel, quality=self.quality)
 			timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
 			stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
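One more reviewer note, on the last downloader hunk: latest_segment is only ever moved forward, and since prometheus_client offers no atomic "set to max of current and new", the code reads the private _value of the labelled child. As the in-code NOTE says, that read-then-set is safe under gevent only because no yield point falls between the two calls. A self-contained sketch of the same idea, assuming the prometheus_client package (the metric and helper names here are illustrative, not this codebase's definitions):

    import datetime

    from prometheus_client import Gauge

    latest_segment = Gauge(
        'latest_segment',
        'Unix timestamp of the newest segment fully downloaded',
        ['channel', 'quality'],
    )

    def advance_latest_segment(channel, quality, date):
        stat = latest_segment.labels(channel=channel, quality=quality)
        timestamp = (date - datetime.datetime(1970, 1, 1)).total_seconds()
        # _value is internal API; no gevent yield happens between the
        # get() and the set(), so the max() update can't be interleaved
        stat.set(max(stat._value.get(), timestamp))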
diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py
index 4814256..a05874e 100644
--- a/restreamer/restreamer/main.py
+++ b/restreamer/restreamer/main.py
@@ -160,7 +160,7 @@ def time_range_for_quality(channel, quality):
 def generate_master_playlist(channel):
 	"""Returns a HLS master playlist for the given channel.
 	Takes optional params:
-		start, end: The time to begin and end the channel at.
+		start, end: The time to begin and end the stream at.
 			See generate_media_playlist for details.
 	"""
 	start = common.dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
@@ -190,7 +190,7 @@
 def generate_media_playlist(channel, quality):
 	"""Returns a HLS media playlist for the given channel and quality.
 	Takes optional params:
-		start, end: The time to begin and end the channel at.
+		start, end: The time to begin and end the stream at.
 			Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS) and UTC.
 			If not given, effectively means "infinity", ie. no start means
 			any time ago, no end means any time in the future.
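Final note for anyone testing these endpoints after the rename: start and end must be ISO 8601 and UTC, per the docstrings above. A quick stdlib way to produce a compliant value (the example date is arbitrary; check the route definitions for the real URL paths):

    import datetime

    # format a UTC datetime as yyyy-mm-ddTHH:MM:SS, the form the
    # playlist endpoints accept for their optional start/end params
    start = datetime.datetime(2019, 1, 1, 12, 0, 0)
    print(start.strftime('%Y-%m-%dT%H:%M:%S'))  # 2019-01-01T12:00:00

So a request carrying ?start=2019-01-01T12:00:00 (query string illustrative) would clamp the generated playlist to segments from that time onward.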