downloader: Fix bugs and missing bits in initial implementation

* Set a reasonable log format
* Make soft timeouts not always fire
* Change soft_hard_timeout signature slightly for ease-of-use
* Make renames not fail if file already exists
* Misc typos
Mike Lang committed 6 years ago (commit 6e0dcd5e22, parent f193bd0f54, branch pull/1/head)

@@ -8,5 +8,7 @@ import argh
 from downloader.main import main

-logging.basicConfig(level=logging.INFO)
+LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
+
+logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

 argh.dispatch_command(main)
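With this format, a warning from the root logger renders along these lines (all values illustrative):

    [2019-01-01 12:00:00,000]  WARNING root(main:52): Hit soft timeout 5s while fetching master playlist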

@@ -5,6 +5,7 @@ import hashlib
 import logging
 import os
 import random
+import sys
 import uuid
 from base64 import b64encode
 from contextlib import contextmanager
@@ -23,7 +24,7 @@ class TimedOutError(Exception):
 @contextmanager
-def soft_hard_timeout(description, soft_timeout, hard_timeout, on_soft_timeout):
+def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout):
     """Context manager that wraps a piece of code in a pair of timeouts,
     a "soft" timeout and a "hard" one. If the block does not complete before
     the soft timeout, the given on_soft_timeout() function is called in a new greenlet.
@@ -31,20 +32,27 @@ def soft_hard_timeout(description, soft_timeout, hard_timeout, on_soft_timeout):
     Description is a short string, used for logging and error messages.
+    Note that the timeouts are given as a tuple pair for ease of use,
+    as it's generally easier to pass them around as a pair.
     A typical use-case is for the soft timeout to trigger some other code to begin
     retrying, even as the original code continues to hold out in hope the call eventually
     succeeds.
     """
+    # Finished is set after we're done to flag to the pending soft timeout callback
+    # that it shouldn't run.
+    finished = False
     def dispatch_soft_timeout():
+        if finished:
+            # We finished before soft timeout was hit
+            return
         logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description))
         on_soft_timeout()
     soft_worker = gevent.spawn_later(soft_timeout, dispatch_soft_timeout)
     error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description))
     with gevent.Timeout(hard_timeout, error):
         yield
-    # We didn't hard time out, if we also didn't soft timeout then cancel pending greenlet
-    if not soft_worker.started:
-        soft_worker.kill()
+    finished = True

 def jitter(interval):
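For context, a minimal usage sketch of the new soft_hard_timeout signature; the two timeouts now travel as one pair, and the callback kicks off a fallback while the original call keeps going (fetch_thing and start_retrying are hypothetical names):

    FETCH_TIMEOUTS = 5, 30  # (soft, hard), passed around as a single pair

    with soft_hard_timeout("fetching thing", FETCH_TIMEOUTS, start_retrying):
        # start_retrying() is spawned in a new greenlet if this takes over 5s;
        # TimedOutError is raised if it takes over 30s.
        result = fetch_thing()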
@@ -79,8 +87,7 @@ class StreamsManager(object):
     """

     FETCH_MIN_INTERVAL = 5
-    FETCH_SOFT_TIMEOUT = 5
-    FETCH_HARD_TIMEOUT = 30
+    FETCH_TIMEOUTS = 5, 30
     MAX_WORKER_AGE = 20*60*60 # 20 hours, twitch's media playlist links expire after 24 hours

     def __init__(self, channel, base_dir, qualities):
@@ -116,7 +123,7 @@ class StreamsManager(object):
         if worker is not workers[-1]:
             logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
             return
-        logging.info("Starting new worker for {} as old one is failing", worker.stream)
+        logging.info("Starting new worker for {} as old one is failing".format(worker.stream))
         self.wait_for_new_url(worker.stream, worker.url)
         self.start_worker(worker.stream)
         self.trigger_refresh()
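This fixes a logging misuse: stdlib logging lazily applies %-style formatting to extra arguments, so passing worker.stream alongside a {}-style template leaves the braces uninterpolated and trips an internal formatting error instead of logging the message. Compare:

    logging.info("Starting new worker for %s", worker.stream)         # stdlib lazy %-style
    logging.info("Starting new worker for {}".format(worker.stream))  # eager str.format, the fix used here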
@@ -138,8 +145,10 @@ class StreamsManager(object):
     def wait_for_new_url(self, stream, old_url):
         """Trigger urls to be re-fetched, and block until a different one is received."""
         while True:
-            if self.latest_urls[stream] != old_url:
+            new_time, new_url = self.latest_urls[stream]
+            if new_url != old_url:
                 return
             logging.info("Triggering master playlist refresh as we need a new url")
             self.trigger_refresh()
             self.latest_urls_changed.wait()
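Context for this fix: latest_urls evidently maps each stream to a (fetch_time, url) pair (see the update_urls call in the next hunk), so the old code compared the whole pair against a bare url and could never match, forcing a refetch every iteration. Unpacking first compares url against url.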
@@ -161,9 +170,9 @@ class StreamsManager(object):
         # Fetch playlist. On soft timeout, retry.
         logging.info("Fetching master playlist")
         fetch_time = monotonic()
-        with soft_hard_timeout(self.FETCH_SOFT_TIMEOUT, self.FETCH_HARD_TIMEOUT, self.trigger_refresh):
+        with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
             master_playlist = twitch.get_master_playlist(self.channel)
-        new_urls = twitch.get_media_playlist_uris(master_playlist, self.qualities)
+        new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys())
         self.update_urls(fetch_time, new_urls)
         for stream, workers in self.stream_workers.items():
             # warn and retry if the url is missing
@@ -186,6 +195,7 @@ class StreamsManager(object):
         self.trigger_refresh()

     def run(self):
+        self.trigger_refresh() # on first round, always go immediately
         while not self.stopping.is_set():
             times_to_max_age = [
                 self.MAX_WORKER_AGE - workers[-1].age()
@@ -193,7 +203,9 @@ class StreamsManager(object):
             ]
             time_to_next_max_age = min(times_to_max_age) if times_to_max_age else None
             # wait until refresh triggered or next max age reached
+            logging.info("Next master playlist refresh in {} sec".format(time_to_next_max_age))
             self.refresh_needed.wait(time_to_next_max_age)
             self.refresh_needed.clear()
+            gevent.spawn(self.fetch_latest)
             # wait min retry interval with jitter, unless we're stopping
             self.stopping.wait(jitter(self.FETCH_MIN_INTERVAL))
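The loop above is gevent's usual Event handshake; condensed to just that pattern (names abbreviated, fetch_latest hypothetical here):

    import gevent
    import gevent.event

    refresh_needed = gevent.event.Event()

    def trigger_refresh():
        refresh_needed.set()            # wake the loop early

    # in the loop body:
    refresh_needed.wait(timeout)        # returns when set() is called, or after timeout
    refresh_needed.clear()              # re-arm before doing the work
    gevent.spawn(fetch_latest)          # fetch in a new greenlet so the loop isn't blocked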
@@ -222,8 +234,7 @@ class StreamWorker(object):
     the url has expired.
     """

-    FETCH_SOFT_TIMEOUT = 5
-    FETCH_HARD_TIMEOUT = 90
+    FETCH_TIMEOUTS = 5, 90
     FETCH_RETRY_INTERVAL = 0.5
     FETCH_POLL_INTERVAL = 2
@@ -254,7 +265,7 @@ class StreamWorker(object):
         for getter in self.getters.values():
             getter.done.wait()
         self.done.set()
-        self.manager[self.stream].remove(self)
+        self.manager.stream_workers[self.stream].remove(self)

     def trigger_new_worker(self):
         self.manager.trigger_new_worker(self)
@ -269,16 +280,16 @@ class StreamWorker(object):
logging.debug("{} getting media playlist {}".format(self, self.url))
try:
with soft_hard_timeout(self.FETCH_SOFT_TIMEOUT, self.FETCH_HARD_TIMEOUT, self.trigger_new_worker):
with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
playlist = twitch.get_media_playlist(self.url)
except Exception as e:
logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True)
self.trigger_new_worker()
if first:
logging.debug("{} failed on first fetch, stopping".format(self))
logging.warning("{} failed on first fetch, stopping".format(self))
self.stop()
elif isinstance(e, requests.HTTPError) and e.response and e.response.status_code == 403:
logging.debug("{} failed with 403 Forbidden, stopping".format(self))
logging.warning("{} failed with 403 Forbidden, stopping".format(self))
self.stop()
self.wait(self.FETCH_RETRY_INTERVAL)
continue
@@ -290,7 +301,7 @@ class StreamWorker(object):
             date = None # tracks date in case some segment doesn't include it
             for segment in playlist.segments:
                 if segment.date:
-                    date = dateutil.parser.parse(date)
+                    date = dateutil.parser.parse(segment.date)
                 if segment.uri not in self.getters:
                     if date is None:
                         raise ValueError("Cannot determine date of segment")
@@ -323,12 +334,14 @@ class SegmentGetter(object):
     """

     UNEXPECTED_FAILURE_RETRY = 0.5
     FETCH_RETRY = 2
-    FETCH_HEADERS_SOFT_TIMEOUT = 5
-    FETCH_HEADERS_HARD_TIMEOUT = 30
-    FETCH_FULL_SOFT_TIMEOUT = 15
-    FETCH_FULL_HARD_TIMEOUT = 240
+    # Headers timeout is timeout before getting the start of a response,
+    # full timeout is for the entire download and stream to disk.
+    FETCH_HEADERS_TIMEOUTS = 5, 30
+    FETCH_FULL_TIMEOUTS = 15, 240

     def __init__(self, base_dir, stream, segment, date):
         self.base_dir = base_dir
         self.stream = stream
         self.segment = segment
         self.date = date
         self.prefix = self.make_path_prefix()
@@ -425,8 +438,9 @@ class SegmentGetter(object):
         hash = hashlib.sha256()
         file_created = False
         try:
-            with soft_hard_timeout(self.FETCH_FULL_SOFT_TIMEOUT, self.FETCH_FULL_HARD_TIMEOUT, self.retry.set):
-                with soft_hard_timeout(self.FETCH_HEADERS_SOFT_TIMEOUT, self.FETCH_HEADERS_HARD_TIMEOUT, self.retry.set):
+            logging.debug("Getting segment {}".format(self.segment))
+            with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, self.retry.set):
+                with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, self.retry.set):
                     resp = requests.get(self.segment.uri, stream=True)
                     if resp.status_code == 403:
                         logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
@@ -442,11 +456,26 @@ class SegmentGetter(object):
                     f.write(chunk)
                     hash.update(chunk)
         except Exception:
+            # save original exception so we can re-raise it later even though we may be handling
+            # another exception in the interim
+            ex_type, ex, tb = sys.exc_info()
             if file_created:
-                os.rename(temp_path, self.make_path("partial", hash))
-            raise
+                self.rename(temp_path, self.make_path("partial", hash))
+            raise ex_type, ex, tb
         else:
-            os.rename(temp_path, self.make_path("full", hash))
+            self.rename(temp_path, self.make_path("full", hash))
+
+    def rename(self, old, new):
+        """Atomic rename that succeeds if the target already exists, since we're naming everything
+        by hash anyway, so if the filepath already exists the file itself is already there.
+        In this case, we delete the source file.
+        """
+        try:
+            os.rename(old, new)
+        except OSError as e:
+            if e.errno != errno.EEXIST:
+                raise
+            os.remove(old)

 def main(channel, base_dir=".", qualities=""):
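A note on the rename helper above: on POSIX, os.rename() already replaces an existing target atomically, so the EEXIST branch mainly covers platforms such as Windows where renaming onto an existing file raises OSError (it also relies on errno being imported elsewhere in the module). The intended behaviour, sketched with hypothetical paths and a hypothetical SegmentGetter instance:

    getter.rename("abc123.ts.tmp", "full/abc123.ts")
    # target absent:  normal atomic rename
    # target present: contents are identical by construction (hash-named),
    #                 so the source is simply deleted
    # other OSError:  re-raised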
