downloader: Stop retrying in SegmentGetter after a long timeout

In resource contention scenarios, all calls can start failing due to
not being able to read the response in a timely manner.
This means SegmentGetters never stop retrying, leading to further contention
and a feedback loop.
We attempt to put at least some cap on this scenario by giving up
if an amount of time has elapsed to the point that we know our URL couldn't be valid anymore.
Since we don't actually know how long segment URLs are valid, we are very conservative about
this time, for now setting it to 20min.
pull/38/head
Mike Lang 6 years ago committed by Christopher Usher
parent 81aee0ee1e
commit c0f94059aa

@ -389,6 +389,11 @@ class SegmentGetter(object):
# full timeout is for the entire download and stream to disk. # full timeout is for the entire download and stream to disk.
FETCH_HEADERS_TIMEOUTS = 5, 60 FETCH_HEADERS_TIMEOUTS = 5, 60
FETCH_FULL_TIMEOUTS = 15, 240 FETCH_FULL_TIMEOUTS = 15, 240
# Overall timeout on the Getter before giving up, to prevent always-failing Getters
# from growing without bound and causing resource exhaustion issues.
# The longest we've observed in the wild before a segment goes un-fetchable is 7min
# or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, stream, segment, date): def __init__(self, parent_logger, session, base_dir, channel, stream, segment, date):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
@ -417,7 +422,8 @@ class SegmentGetter(object):
self.done.set() self.done.set()
def _run(self): def _run(self):
self.logger.debug("Getter started") start = monotonic()
self.logger.debug("Getter started at {}".format(start))
while not self.exists(): while not self.exists():
self.retry = gevent.event.Event() self.retry = gevent.event.Event()
worker = gevent.spawn(self.get_segment) worker = gevent.spawn(self.get_segment)
@ -426,6 +432,12 @@ class SegmentGetter(object):
# If worker has returned, and return value is true, we're done # If worker has returned, and return value is true, we're done
if worker.ready() and worker.value: if worker.ready() and worker.value:
break break
# If a large amount of time has elapsed since starting, our URL is stale
# anyway so we might as well give up to avoid cpu and disk usage.
elapsed = monotonic() - start
if elapsed > self.GIVE_UP_TIMEOUT:
self.logger.warning("Getter has been running for {}s, giving up as our URL has expired".format(elapsed))
break
# Create a new session, so we don't reuse a connection from the old session # Create a new session, so we don't reuse a connection from the old session
# which had an error / some other issue. This is mostly just out of paranoia. # which had an error / some other issue. This is mostly just out of paranoia.
self.session = requests.Session() self.session = requests.Session()

Loading…
Cancel
Save