Review notes for the patch to downloader/downloader/main.py. Text before the
first "diff --git" line is commentary and is skipped by git apply / patch.

Changes relative to the patch as submitted, all in StreamsManager.run():
  * assign to time_to_next_max_age, the name the following lines read (the
    submitted hunk assigned to times_to_max_age and removed the only
    assignment of time_to_next_max_age);
  * close the max(...) call, which was missing its final parenthesis;
  * use "or [0]" instead of "+ [0]", so that 0 is only the default when no
    workers exist instead of capping every computed wait at 0.
Hunk line counts are unchanged. The "index" line is kept from the submitted
patch and therefore no longer names the exact resulting blob. Indentation was
reconstructed from the Python nesting; confirm it against the target file.

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index d927bc5..77f8671 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -48,11 +48,13 @@ def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout
 			return
 		logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description))
 		on_soft_timeout()
-	soft_worker = gevent.spawn_later(soft_timeout, dispatch_soft_timeout)
+	gevent.spawn_later(soft_timeout, dispatch_soft_timeout)
 	error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description))
-	with gevent.Timeout(hard_timeout, error):
-		yield
-	finished = True
+	try:
+		with gevent.Timeout(hard_timeout, error):
+			yield
+	finally:
+		finished = True
 
 
 def jitter(interval):
@@ -123,7 +125,7 @@ class StreamsManager(object):
 		if worker is not workers[-1]:
 			logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
 			return
-		logging.info("Starting new worker for {} as old one is failing".format(worker.stream))
+		logging.info("Starting new worker for {} by request of old worker".format(worker.stream))
 		self.wait_for_new_url(worker.stream, worker.url)
 		self.start_worker(worker.stream)
 		self.trigger_refresh()
@@ -189,21 +191,25 @@ class StreamsManager(object):
 					logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
 					self.start_worker(stream)
 		except Exception as e:
-			logging.exception("Failed to fetch master playlist")
-			# don't retry on hard timeout as we already retried on soft timeout
-			if not isinstance(e, TimedOutError):
+			if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
+				logging.info("Stream is not up. Retrying.")
 				self.trigger_refresh()
+			else:
+				logging.exception("Failed to fetch master playlist")
+				# don't retry on hard timeout as we already retried on soft timeout
+				if not isinstance(e, TimedOutError):
+					self.trigger_refresh()
 
 	def run(self):
 		self.trigger_refresh() # on first round, always go immediately
 		while not self.stopping.is_set():
-			times_to_max_age = [
+			# clamp time to max age to non-negative, and default to 0 if no workers exist
+			time_to_next_max_age = max(0, min([
 				self.MAX_WORKER_AGE - workers[-1].age()
 				for workers in self.stream_workers.values() if workers
-			]
-			time_to_next_max_age = min(times_to_max_age) if times_to_max_age else None
+			] or [0]))
 			# wait until refresh triggered or next max age reached
-			logging.info("Next master playlist refresh in {} sec".format(time_to_next_max_age))
+			logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
 			self.refresh_needed.wait(time_to_next_max_age)
 			self.refresh_needed.clear()
 			gevent.spawn(self.fetch_latest)
@@ -235,7 +241,7 @@ class StreamWorker(object):
 	"""
 
 	FETCH_TIMEOUTS = 5, 90
-	FETCH_RETRY_INTERVAL = 0.5
+	FETCH_RETRY_INTERVAL = 1
 	FETCH_POLL_INTERVAL = 2
 
 	def __init__(self, manager, stream, url, url_time):
@@ -247,6 +253,10 @@ class StreamWorker(object):
 		self.getters = {} # map from url to SegmentGetter
 		self.done = gevent.event.Event() # set when stopped and all getters are done
 
+	def __repr__(self):
+		return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.stream)
+	__str__ = __repr__
+
 	def age(self):
 		"""Return age of our url"""
 		return monotonic() - self.url_time
@@ -256,11 +266,14 @@ class StreamWorker(object):
 		self.stopping.set()
 
 	def run(self):
+		logging.info("Worker {} starting".format(self))
 		try:
 			self._run()
 		except Exception:
 			logging.exception("Worker {} failed".format(self))
 			self.trigger_new_worker()
+		else:
+			logging.info("Worker {} stopped".format(self))
 		finally:
 			for getter in self.getters.values():
 				getter.done.wait()
@@ -288,7 +301,7 @@ class StreamWorker(object):
 				if first:
 					logging.warning("{} failed on first fetch, stopping".format(self))
 					self.stop()
-				elif isinstance(e, requests.HTTPError) and e.response and e.response.status_code == 403:
+				elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403:
 					logging.warning("{} failed with 403 Forbidden, stopping".format(self))
 					self.stop()
 				self.wait(self.FETCH_RETRY_INTERVAL)
@@ -318,6 +331,16 @@ class StreamWorker(object):
 				):
 					del self.getters[url]
 
+			# Stop if end-of-stream
+			if playlist.is_endlist:
+				logging.info("{} stopping due to end-of-playlist".format(self))
+				# Trigger a new worker for when the stream comes back up.
+				# In the short term this will cause some thrashing until the master playlist
+				# starts returning 404, but it's the best way to avoid missing anything
+				# if the stream is only very briefly down.
+				self.trigger_new_worker()
+				self.stop()
+
 			# Wait until next poll
 			self.wait(self.FETCH_POLL_INTERVAL)
 