diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 0a068a7..8783eb0 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -90,6 +90,7 @@ class StreamsManager(object): def __init__(self, channel, base_dir, qualities): self.channel = channel + self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.base_dir = base_dir self.stream_workers = {name: [] for name in qualities + ["source"]} # {stream name: [workers]} self.latest_urls = {} # {stream name: (fetch time, url)} @@ -102,7 +103,7 @@ class StreamsManager(object): and any older workers are safe to stop.""" workers = self.stream_workers[worker.stream] if worker not in workers: - logging.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers)) + self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers)) return # stop everything older than given worker for old in workers[:workers.index(worker)]: @@ -116,12 +117,12 @@ class StreamsManager(object): """ workers = self.stream_workers[worker.stream] if worker not in workers: - logging.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers)) + self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers)) return if worker is not workers[-1]: - logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream)) + self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream)) return - logging.info("Starting new worker for {} by request of old worker".format(worker.stream)) + self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream)) self.wait_for_new_url(worker.stream, worker.url) self.start_worker(worker.stream) self.trigger_refresh() @@ -146,13 +147,13 @@ class StreamsManager(object): new_time, 
new_url = self.latest_urls[stream] if new_url != old_url: return - logging.info("Triggering master playlist refresh as we need a new url") + self.logger.info("Triggering master playlist refresh as we need a new url") self.trigger_refresh() self.latest_urls_changed.wait() def stop(self): """Shut down all workers and stop capturing stream.""" - logging.info("Stopping streams manager") + self.logger.info("Stopping") self.stopping.set() def start_worker(self, stream): @@ -160,7 +161,7 @@ class StreamsManager(object): # it's possible for fetch_latest to call us after we've started stopping, # in that case do nothing. if self.stopping.is_set(): - logging.info("Ignoring worker start as we are stopping") + self.logger.info("Ignoring worker start as we are stopping") return url_time, url = self.latest_urls[stream] worker = StreamWorker(self, stream, url, url_time) @@ -171,7 +172,7 @@ class StreamsManager(object): """Re-fetch master playlist and start new workers if needed""" try: # Fetch playlist. On soft timeout, retry. - logging.info("Fetching master playlist") + self.logger.info("Fetching master playlist") fetch_time = monotonic() with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh): master_playlist = twitch.get_master_playlist(self.channel) @@ -180,23 +181,23 @@ class StreamsManager(object): for stream, workers in self.stream_workers.items(): # warn and retry if the url is missing if stream not in new_urls: - logging.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream)) + self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream)) self.trigger_refresh() # is it newly found? 
if not workers and stream in self.latest_urls: - logging.info("Starting new worker for {} as none exist".format(stream)) + self.logger.info("Starting new worker for {} as none exist".format(stream)) self.start_worker(stream) latest_worker = workers[-1] # is the old worker too old? if latest_worker.age() > self.MAX_WORKER_AGE: - logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.)) + self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.)) self.start_worker(stream) except Exception as e: if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404: - logging.info("Stream is not up. Retrying.") + self.logger.info("Stream is not up. Retrying.") self.trigger_refresh() else: - logging.exception("Failed to fetch master playlist") + self.logger.exception("Failed to fetch master playlist") # don't retry on hard timeout as we already retried on soft timeout if not isinstance(e, TimedOutError): self.trigger_refresh() @@ -209,7 +210,7 @@ class StreamsManager(object): self.MAX_WORKER_AGE - workers[-1].age() for workers in self.stream_workers.values() if workers ] or [0])) - logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) + self.logger.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) # wait until refresh triggered, next max age reached, or we're stopping (whichever happens first) gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1) if not self.stopping.is_set(): @@ -217,7 +218,7 @@ class StreamsManager(object): gevent.spawn(self.fetch_latest) # wait min retry interval with jitter, unless we're stopping self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL)) - logging.info("Stopping workers") + self.logger.info("Stopping workers") for workers in self.stream_workers.values(): for worker in workers: 
worker.stop() @@ -248,6 +249,7 @@ class StreamWorker(object): def __init__(self, manager, stream, url, url_time): self.manager = manager + self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(stream, id(self))) self.stream = stream self.url = url self.url_time = url_time @@ -268,14 +270,14 @@ class StreamWorker(object): self.stopping.set() def run(self): - logging.info("Worker {} starting".format(self)) + self.logger.info("Worker starting") try: self._run() except Exception: - logging.exception("Worker {} failed".format(self)) + self.logger.exception("Worker failed") self.trigger_new_worker() else: - logging.info("Worker {} stopped".format(self)) + self.logger.info("Worker stopped") finally: for getter in self.getters.values(): getter.done.wait() @@ -293,18 +295,18 @@ class StreamWorker(object): first = True while not self.stopping.is_set(): - logging.debug("{} getting media playlist {}".format(self, self.url)) + self.logger.debug("Getting media playlist {}".format(self.url)) try: with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): playlist = twitch.get_media_playlist(self.url) except Exception as e: - logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True) + self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.trigger_new_worker() if first: - logging.warning("{} failed on first fetch, stopping".format(self)) + self.logger.warning("Failed on first fetch, stopping") self.stop() elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403: - logging.warning("{} failed with 403 Forbidden, stopping".format(self)) + self.logger.warning("Failed with 403 Forbidden, stopping") self.stop() self.wait(self.FETCH_RETRY_INTERVAL) continue @@ -320,7 +322,7 @@ class StreamWorker(object): if segment.uri not in self.getters: if date is None: raise ValueError("Cannot determine date of segment") - 
self.getters[segment.uri] = SegmentGetter(self.manager.base_dir, self.manager.channel, self.stream, segment, date) + self.getters[segment.uri] = SegmentGetter(self.logger, self.manager.base_dir, self.manager.channel, self.stream, segment, date) gevent.spawn(self.getters[segment.uri].run) if date is not None: date += datetime.timedelta(seconds=segment.duration) @@ -335,7 +337,7 @@ class StreamWorker(object): # Stop if end-of-stream if playlist.is_endlist: - logging.info("{} stopping due to end-of-playlist".format(self)) + self.logger.info("Stopping due to end-of-playlist") # Trigger a new worker for when the stream comes back up. # In the short term this will cause some thrashing until the master playlist # starts returning 404, but it's the best way to avoid missing anything @@ -364,7 +366,8 @@ class SegmentGetter(object): FETCH_HEADERS_TIMEOUTS = 5, 30 FETCH_FULL_TIMEOUTS = 15, 240 - def __init__(self, base_dir, channel, stream, segment, date): + def __init__(self, parent_logger, base_dir, channel, stream, segment, date): + self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.base_dir = base_dir self.channel = channel self.stream = stream @@ -380,7 +383,7 @@ class SegmentGetter(object): try: self._run() except Exception: - logging.exception("Failure in SegmentGetter {}".format(self.segment)) + self.logger.exception("Unexpected exception while getting segment {}, retrying".format(self.segment)) gevent.sleep(common.jitter(self.UNEXPECTED_FAILURE_RETRY)) else: break @@ -388,6 +391,7 @@ class SegmentGetter(object): self.done.set() def _run(self): + self.logger.debug("Getter started") while not self.exists(): self.retry = gevent.event.Event() worker = gevent.spawn(self.get_segment) @@ -398,6 +402,7 @@ class SegmentGetter(object): break # if retry not set, wait for FETCH_RETRY first self.retry.wait(common.jitter(self.FETCH_RETRY)) + self.logger.debug("Getter is done") def make_path_prefix(self): """Generate leading part of filepath which 
doesn't change with the hash.""" @@ -437,8 +442,6 @@ class SegmentGetter(object): def get_segment(self): - # save current value of self.retry so we can't set any later instance - # after a retry for this round has already occurred. try: self._get_segment() except Exception: @@ -448,13 +451,16 @@ class SegmentGetter(object): return True def _get_segment(self): + # save current value of self.retry so we can't set any later instance + # after a retry for this round has already occurred. + retry = self.retry temp_path = self.make_path("temp") hash = hashlib.sha256() file_created = False try: - logging.debug("Getting segment {}".format(self.segment)) - with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, self.retry.set): - with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, self.retry.set): + self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path)) + with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): + with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): resp = requests.get(self.segment.uri, stream=True) if resp.status_code == 403: logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment)) @@ -474,12 +480,14 @@ class SegmentGetter(object): # another exception in the interim ex_type, ex, tb = sys.exc_info() if file_created: - common.rename(temp_path, self.make_path("partial", hash)) + partial_path = self.make_path("partial", hash) + self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path)) + common.rename(temp_path, partial_path) raise ex_type, ex, tb else: - common.rename(temp_path, self.make_path("full", hash)) - - + full_path = self.make_path("full", hash) + self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) + common.rename(temp_path, full_path) def main(channel, base_dir=".", qualities=""): diff --git 
a/downloader/downloader/twitch.py b/downloader/downloader/twitch.py index 92358bb..ba37d6c 100644 --- a/downloader/downloader/twitch.py +++ b/downloader/downloader/twitch.py @@ -7,6 +7,9 @@ import requests import hls_playlist +logger = logging.getLogger(__name__) + + def get_master_playlist(channel): """Get the master playlist for given channel from twitch""" resp = requests.get( @@ -62,10 +65,10 @@ def get_media_playlist_uris(master_playlist, target_qualities): def variant_name(variant): names = set(media.name for media in variant.media if media.type == "VIDEO" and media.name) if not names: - logging.warning("Variant {} has no named video renditions, can't determine name".format(variant)) + logger.warning("Variant {} has no named video renditions, can't determine name".format(variant)) return None if len(names) > 1: - logging.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant)) + logger.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant)) return list(names)[0] if not master_playlist.playlists: @@ -73,7 +76,7 @@ def get_media_playlist_uris(master_playlist, target_qualities): for variant in master_playlist.playlists: if any(media.uri for media in variant.media): - logging.warning("Variant has a rendition with its own URI: {}".format(variant)) + logger.warning("Variant has a rendition with its own URI: {}".format(variant)) by_name = {variant_name(variant): variant for variant in master_playlist.playlists}