diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index b7fa10a..fddce06 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -5,7 +5,6 @@ import hashlib import logging import os import signal -import sys import uuid from base64 import b64encode from contextlib import contextmanager @@ -55,7 +54,7 @@ class TimedOutError(Exception): @contextmanager -def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft_timeout): +def soft_hard_timeout(logger, description, timeouts, on_soft_timeout): """Context manager that wraps a piece of code in a pair of timeouts, a "soft" timeout and a "hard" one. If the block does not complete before the soft timeout, the given on_soft_timeout() function is called in a new greenlet. @@ -70,6 +69,7 @@ def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft retrying, even as the original code continues to hold out in hope the call eventually succeeds. """ + soft_timeout, hard_timeout = timeouts # Finished is set after we're done to flag to the pending soft timeout callback # that it shouldn't run. finished = False @@ -214,7 +214,7 @@ class StreamsManager(object): fetch_time = monotonic() with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh): master_playlist = twitch.get_master_playlist(self.channel) - new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys()) + new_urls = twitch.get_media_playlist_uris(master_playlist, list(self.stream_workers.keys())) self.update_urls(fetch_time, new_urls) for quality, workers in self.stream_workers.items(): # warn and retry if the url is missing @@ -260,9 +260,11 @@ class StreamsManager(object): # wait min retry interval with jitter, unless we're stopping self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL)) self.logger.info("Stopping workers") - for workers in self.stream_workers.values(): + stream_workers = list(self.stream_workers.values()) + for workers in stream_workers: for worker in workers: worker.stop() + for workers in stream_workers: for worker in workers: worker.done.wait() @@ -510,7 +512,7 @@ class SegmentGetter(object): partial: Segment is incomplete. Hash is included. temp: Segment has not been downloaded yet. A random uuid is added. """ - arg = str(uuid.uuid4()) if type == "temp" else b64encode(hash.digest(), "-_").rstrip("=") + arg = str(uuid.uuid4()) if type == "temp" else b64encode(hash.digest(), b"-_").encode().rstrip("=") return "{}-{}-{}.ts".format(self.prefix, type, arg) def exists(self): @@ -519,10 +521,10 @@ class SegmentGetter(object): try: candidates = os.listdir(dirname) except OSError as e: - # on ENOENT (doesn't exist), return [] + # on ENOENT (doesn't exist), return false if e.errno != errno.ENOENT: raise - return [] + return False full_prefix = "{}-full".format(self.prefix) return any(candidate.startswith(full_prefix) for candidate in candidates) @@ -563,17 +565,14 @@ class SegmentGetter(object): for chunk in resp.iter_content(8192): f.write(chunk) hash.update(chunk) - except Exception: - # save original exception so we can re-raise it later even though we may be handling - # another exception in the interim - ex_type, ex, tb = sys.exc_info() + except Exception as e: if file_created: partial_path = self.make_path("partial", hash) self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path)) common.rename(temp_path, partial_path) segments_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc() segment_duration_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc(self.segment.duration) - raise ex_type, ex, tb + raise e else: request_duration = monotonic() - start_time segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"