diff --git a/downloader/downloader/__main__.py b/downloader/downloader/__main__.py
index cbb0561..45d813e 100644
--- a/downloader/downloader/__main__.py
+++ b/downloader/downloader/__main__.py
@@ -8,5 +8,7 @@ import argh
 
 from downloader.main import main
 
-logging.basicConfig(level=logging.INFO)
+LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
+
+logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
 argh.dispatch_command(main)
diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 2328208..d927bc5 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -5,6 +5,8 @@ import hashlib
+import errno
 import logging
 import os
 import random
+import sys
 import uuid
 from base64 import b64encode
 from contextlib import contextmanager
@@ -23,7 +25,7 @@ class TimedOutError(Exception):
 
 
 @contextmanager
-def soft_hard_timeout(description, soft_timeout, hard_timeout, on_soft_timeout):
+def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout):
 	"""Context manager that wraps a piece of code in a pair of timeouts,
 	a "soft" timeout and a "hard" one. If the block does not complete before
 	the soft timeout, the given on_soft_timeout() function is called in a new greenlet.
@@ -31,20 +33,29 @@ def soft_hard_timeout(description, soft_timeout, hard_timeout, on_soft_timeout):
 
 	Description is a short string, used for logging and error messages.
 
+	Note that the timeouts are given together as a (soft, hard) tuple pair,
+	as it's generally easier to pass them around that way.
+
 	A typical use-case is for the soft timeout to trigger some other code to begin retrying,
 	even as the original code continues to hold out in hope the call eventually succeeds.
 	"""
+	# Finished is set after we're done, to flag to the pending soft timeout callback
+	# that it shouldn't run.
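+	# Note this is race-free under gevent's cooperative scheduling: nothing yields
+	# between the timeout block exiting and the flag being set.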
+	finished = False
 
 	def dispatch_soft_timeout():
+		if finished:
+			# We finished before the soft timeout was hit
+			return
 		logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description))
 		on_soft_timeout()
 
 	soft_worker = gevent.spawn_later(soft_timeout, dispatch_soft_timeout)
 	error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description))
 
 	with gevent.Timeout(hard_timeout, error):
 		yield
-	# We didn't hard time out, if we also didn't soft timeout then cancel pending greenlet
-	if not soft_worker.started:
-		soft_worker.kill()
+	finished = True
 
@@ -79,8 +90,8 @@ class StreamsManager(object):
 	"""
 
 	FETCH_MIN_INTERVAL = 5
-	FETCH_SOFT_TIMEOUT = 5
-	FETCH_HARD_TIMEOUT = 30
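+	# (soft, hard) timeout pair in seconds, as taken by soft_hard_timeout()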
""" - FETCH_SOFT_TIMEOUT = 5 - FETCH_HARD_TIMEOUT = 90 + FETCH_TIMEOUTS = 5, 90 FETCH_RETRY_INTERVAL = 0.5 FETCH_POLL_INTERVAL = 2 @@ -254,7 +265,7 @@ class StreamWorker(object): for getter in self.getters.values(): getter.done.wait() self.done.set() - self.manager[self.stream].remove(self) + self.manager.stream_workers[self.stream].remove(self) def trigger_new_worker(self): self.manager.trigger_new_worker(self) @@ -269,16 +280,16 @@ class StreamWorker(object): logging.debug("{} getting media playlist {}".format(self, self.url)) try: - with soft_hard_timeout(self.FETCH_SOFT_TIMEOUT, self.FETCH_HARD_TIMEOUT, self.trigger_new_worker): + with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): playlist = twitch.get_media_playlist(self.url) except Exception as e: logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True) self.trigger_new_worker() if first: - logging.debug("{} failed on first fetch, stopping".format(self)) + logging.warning("{} failed on first fetch, stopping".format(self)) self.stop() elif isinstance(e, requests.HTTPError) and e.response and e.response.status_code == 403: - logging.debug("{} failed with 403 Forbidden, stopping".format(self)) + logging.warning("{} failed with 403 Forbidden, stopping".format(self)) self.stop() self.wait(self.FETCH_RETRY_INTERVAL) continue @@ -290,7 +301,7 @@ class StreamWorker(object): date = None # tracks date in case some segment doesn't include it for segment in playlist.segments: if segment.date: - date = dateutil.parser.parse(date) + date = dateutil.parser.parse(segment.date) if segment.uri not in self.getters: if date is None: raise ValueError("Cannot determine date of segment") @@ -323,12 +334,14 @@ class SegmentGetter(object): """ UNEXPECTED_FAILURE_RETRY = 0.5 FETCH_RETRY = 2 - FETCH_HEADERS_SOFT_TIMEOUT = 5 - FETCH_HEADERS_HARD_TIMEOUT = 30 - FETCH_FULL_SOFT_TIMEOUT = 15 - FETCH_FULL_HARD_TIMEOUT = 240 + # Headers timeout is timeout before getting the start of a response, + # full timeout is for the entire download and stream to disk. 
+			new_time, new_url = self.latest_urls[stream]
+			if new_url != old_url:
 				return
+			logging.info("Triggering master playlist refresh as we need a new url")
 			self.trigger_refresh()
 			self.latest_urls_changed.wait()
@@ -161,9 +175,9 @@
 			# Fetch playlist. On soft timeout, retry.
 			logging.info("Fetching master playlist")
 			fetch_time = monotonic()
-			with soft_hard_timeout(self.FETCH_SOFT_TIMEOUT, self.FETCH_HARD_TIMEOUT, self.trigger_refresh):
+			with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
 				master_playlist = twitch.get_master_playlist(self.channel)
-				new_urls = twitch.get_media_playlist_uris(master_playlist, self.qualities)
+				new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys())
 			self.update_urls(fetch_time, new_urls)
 			for stream, workers in self.stream_workers.items():
 				# warn and retry if the url is missing
@@ -186,6 +200,7 @@
 			self.trigger_refresh()
 
 	def run(self):
+		self.trigger_refresh() # on first round, always go immediately
 		while not self.stopping.is_set():
 			times_to_max_age = [
 				self.MAX_WORKER_AGE - workers[-1].age()
@@ -193,7 +208,9 @@
 			]
 			time_to_next_max_age = min(times_to_max_age) if times_to_max_age else None
 			# wait until refresh triggered or next max age reached
+			logging.info("Next master playlist refresh in {} sec".format(time_to_next_max_age))
 			self.refresh_needed.wait(time_to_next_max_age)
+			self.refresh_needed.clear()
 			gevent.spawn(self.fetch_latest)
 			# wait min retry interval with jitter, unless we're stopping
 			self.stopping.wait(jitter(self.FETCH_MIN_INTERVAL))
@@ -222,8 +239,7 @@
 	the url has expired.
 	"""
 
-	FETCH_SOFT_TIMEOUT = 5
-	FETCH_HARD_TIMEOUT = 90
+	FETCH_TIMEOUTS = 5, 90
 	FETCH_RETRY_INTERVAL = 0.5
 	FETCH_POLL_INTERVAL = 2
 
@@ -254,7 +270,7 @@
 		for getter in self.getters.values():
 			getter.done.wait()
 		self.done.set()
-		self.manager[self.stream].remove(self)
+		self.manager.stream_workers[self.stream].remove(self)
 
 	def trigger_new_worker(self):
 		self.manager.trigger_new_worker(self)
@@ -269,16 +285,16 @@
 
 			logging.debug("{} getting media playlist {}".format(self, self.url))
 			try:
-				with soft_hard_timeout(self.FETCH_SOFT_TIMEOUT, self.FETCH_HARD_TIMEOUT, self.trigger_new_worker):
+				with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
 					playlist = twitch.get_media_playlist(self.url)
 			except Exception as e:
 				logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True)
 				self.trigger_new_worker()
 				if first:
-					logging.debug("{} failed on first fetch, stopping".format(self))
+					logging.warning("{} failed on first fetch, stopping".format(self))
 					self.stop()
 				elif isinstance(e, requests.HTTPError) and e.response and e.response.status_code == 403:
-					logging.debug("{} failed with 403 Forbidden, stopping".format(self))
+					logging.warning("{} failed with 403 Forbidden, stopping".format(self))
 					self.stop()
 				self.wait(self.FETCH_RETRY_INTERVAL)
 				continue
@@ -290,7 +306,7 @@
 			date = None # tracks date in case some segment doesn't include it
 			for segment in playlist.segments:
 				if segment.date:
-					date = dateutil.parser.parse(date)
+					date = dateutil.parser.parse(segment.date)
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
@@ -323,12 +339,14 @@ class SegmentGetter(object):
 	"""
 	UNEXPECTED_FAILURE_RETRY = 0.5
 	FETCH_RETRY = 2
-	FETCH_HEADERS_SOFT_TIMEOUT = 5
-	FETCH_HEADERS_HARD_TIMEOUT = 30
-	FETCH_FULL_SOFT_TIMEOUT = 15
-	FETCH_FULL_HARD_TIMEOUT = 240
+	# Headers timeout is the timeout before getting the start of a response,
+	# full timeout is for the entire download and stream to disk.
+	FETCH_HEADERS_TIMEOUTS = 5, 30
+	FETCH_FULL_TIMEOUTS = 15, 240
 
 	def __init__(self, base_dir, stream, segment, date):
+		self.base_dir = base_dir
+		self.stream = stream
 		self.segment = segment
 		self.date = date
 		self.prefix = self.make_path_prefix()
@@ -425,8 +443,9 @@
 		hash = hashlib.sha256()
 		file_created = False
 		try:
-			with soft_hard_timeout(self.FETCH_FULL_SOFT_TIMEOUT, self.FETCH_FULL_HARD_TIMEOUT, self.retry.set):
-				with soft_hard_timeout(self.FETCH_HEADERS_SOFT_TIMEOUT, self.FETCH_HEADERS_HARD_TIMEOUT, self.retry.set):
+			logging.debug("Getting segment {}".format(self.segment))
+			with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, self.retry.set):
+				with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, self.retry.set):
 					resp = requests.get(self.segment.uri, stream=True)
 					if resp.status_code == 403:
 						logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
@@ -442,11 +461,26 @@
 						f.write(chunk)
 						hash.update(chunk)
 		except Exception:
+			# save original exception so we can re-raise it later even though we may be handling
+			# another exception in the interim
+			ex_type, ex, tb = sys.exc_info()
 			if file_created:
-				os.rename(temp_path, self.make_path("partial", hash))
-			raise
+				self.rename(temp_path, self.make_path("partial", hash))
+			raise ex_type, ex, tb
 		else:
-			os.rename(temp_path, self.make_path("full", hash))
+			self.rename(temp_path, self.make_path("full", hash))
+
+	def rename(self, old, new):
+		"""Atomic rename that also succeeds if the target already exists. Since we name
+		everything by hash, if the target path exists then the file is already there,
+		and we just delete the source copy.
+		"""
+		try:
+			os.rename(old, new)
+		except OSError as e:
+			if e.errno != errno.EEXIST:
+				raise
+			os.remove(old)
 
 
 def main(channel, base_dir=".", qualities=""):