downloader: Bug fixes and improvements

* Fix bug where soft timeout is not cancelled if an exception occurs
* Various logging tweaks
* Prevent master playlist wait time from going negative
* Stop gracefully if stream worker detects end of stream
* Don't treat master playlist 404 as an error; it just means the stream isn't up
pull/1/head
Mike Lang 6 years ago
parent 6e0dcd5e22
commit 6377db2aa2

@ -48,10 +48,12 @@ def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout
return return
logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description)) logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description))
on_soft_timeout() on_soft_timeout()
soft_worker = gevent.spawn_later(soft_timeout, dispatch_soft_timeout) gevent.spawn_later(soft_timeout, dispatch_soft_timeout)
error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description)) error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description))
try:
with gevent.Timeout(hard_timeout, error): with gevent.Timeout(hard_timeout, error):
yield yield
finally:
finished = True finished = True
@ -123,7 +125,7 @@ class StreamsManager(object):
if worker is not workers[-1]: if worker is not workers[-1]:
logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream)) logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
return return
logging.info("Starting new worker for {} as old one is failing".format(worker.stream)) logging.info("Starting new worker for {} by request of old worker".format(worker.stream))
self.wait_for_new_url(worker.stream, worker.url) self.wait_for_new_url(worker.stream, worker.url)
self.start_worker(worker.stream) self.start_worker(worker.stream)
self.trigger_refresh() self.trigger_refresh()
@ -189,6 +191,10 @@ class StreamsManager(object):
logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.)) logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
self.start_worker(stream) self.start_worker(stream)
except Exception as e: except Exception as e:
if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
logging.info("Stream is not up. Retrying.")
self.trigger_refresh()
else:
logging.exception("Failed to fetch master playlist") logging.exception("Failed to fetch master playlist")
# don't retry on hard timeout as we already retried on soft timeout # don't retry on hard timeout as we already retried on soft timeout
if not isinstance(e, TimedOutError): if not isinstance(e, TimedOutError):
@ -197,13 +203,13 @@ class StreamsManager(object):
def run(self): def run(self):
self.trigger_refresh() # on first round, always go immediately self.trigger_refresh() # on first round, always go immediately
while not self.stopping.is_set(): while not self.stopping.is_set():
times_to_max_age = [ # clamp time to max age to non-negative, and default to 0 if no workers exist
times_to_max_age = max(0, min([
self.MAX_WORKER_AGE - workers[-1].age() self.MAX_WORKER_AGE - workers[-1].age()
for workers in self.stream_workers.values() if workers for workers in self.stream_workers.values() if workers
] ] + [0])
time_to_next_max_age = min(times_to_max_age) if times_to_max_age else None
# wait until refresh triggered or next max age reached # wait until refresh triggered or next max age reached
logging.info("Next master playlist refresh in {} sec".format(time_to_next_max_age)) logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
self.refresh_needed.wait(time_to_next_max_age) self.refresh_needed.wait(time_to_next_max_age)
self.refresh_needed.clear() self.refresh_needed.clear()
gevent.spawn(self.fetch_latest) gevent.spawn(self.fetch_latest)
@ -235,7 +241,7 @@ class StreamWorker(object):
""" """
FETCH_TIMEOUTS = 5, 90 FETCH_TIMEOUTS = 5, 90
FETCH_RETRY_INTERVAL = 0.5 FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2 FETCH_POLL_INTERVAL = 2
def __init__(self, manager, stream, url, url_time): def __init__(self, manager, stream, url, url_time):
@ -247,6 +253,10 @@ class StreamWorker(object):
self.getters = {} # map from url to SegmentGetter self.getters = {} # map from url to SegmentGetter
self.done = gevent.event.Event() # set when stopped and all getters are done self.done = gevent.event.Event() # set when stopped and all getters are done
def __repr__(self):
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.stream)
__str__ = __repr__
def age(self): def age(self):
"""Return age of our url""" """Return age of our url"""
return monotonic() - self.url_time return monotonic() - self.url_time
@ -256,11 +266,14 @@ class StreamWorker(object):
self.stopping.set() self.stopping.set()
def run(self): def run(self):
logging.info("Worker {} starting".format(self))
try: try:
self._run() self._run()
except Exception: except Exception:
logging.exception("Worker {} failed".format(self)) logging.exception("Worker {} failed".format(self))
self.trigger_new_worker() self.trigger_new_worker()
else:
logging.info("Worker {} stopped".format(self))
finally: finally:
for getter in self.getters.values(): for getter in self.getters.values():
getter.done.wait() getter.done.wait()
@ -288,7 +301,7 @@ class StreamWorker(object):
if first: if first:
logging.warning("{} failed on first fetch, stopping".format(self)) logging.warning("{} failed on first fetch, stopping".format(self))
self.stop() self.stop()
elif isinstance(e, requests.HTTPError) and e.response and e.response.status_code == 403: elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403:
logging.warning("{} failed with 403 Forbidden, stopping".format(self)) logging.warning("{} failed with 403 Forbidden, stopping".format(self))
self.stop() self.stop()
self.wait(self.FETCH_RETRY_INTERVAL) self.wait(self.FETCH_RETRY_INTERVAL)
@ -318,6 +331,16 @@ class StreamWorker(object):
): ):
del self.getters[url] del self.getters[url]
# Stop if end-of-stream
if playlist.is_endlist:
logging.info("{} stopping due to end-of-playlist".format(self))
# Trigger a new worker for when the stream comes back up.
# In the short term this will cause some thrashing until the master playlist
# starts returning 404, but it's the best way to avoid missing anything
# if the stream is only very briefly down.
self.trigger_new_worker()
self.stop()
# Wait until next poll # Wait until next poll
self.wait(self.FETCH_POLL_INTERVAL) self.wait(self.FETCH_POLL_INTERVAL)

Loading…
Cancel
Save