From c0f94059aa1083aebafd9b70c84d65dd3e9ad971 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 10 Jan 2019 11:15:40 -0800 Subject: [PATCH] downloader: Stop retrying in SegmentGetter after a long timeout In resource contention scenarios, all calls can start failing due to not being able to read the response in a timely manner. This means SegmentGetters never stop retrying, leading to further contention and a feedback loop. We attempt to put at least some cap on this scenario by giving up if an amount of time has elapsed to the point that we know our URL couldn't be valid anymore. Since we don't actually know how long segment URLs are valid, we are very conservative about this time, for now setting it to 20min. --- downloader/downloader/main.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index b1f9b33..7fe473c 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -389,6 +389,11 @@ class SegmentGetter(object): # full timeout is for the entire download and stream to disk. FETCH_HEADERS_TIMEOUTS = 5, 60 FETCH_FULL_TIMEOUTS = 15, 240 + # Overall timeout on the Getter before giving up, to prevent always-failing Getters + # from growing without bound and causing resource exhaustion issues. + # The longest we've observed in the wild before a segment goes un-fetchable is 7min + # or so, to be paranoid we set it to considerably longer than that. + GIVE_UP_TIMEOUT = 20 * 60 def __init__(self, parent_logger, session, base_dir, channel, stream, segment, date): self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) @@ -417,7 +422,8 @@ class SegmentGetter(object): self.done.set() def _run(self): - self.logger.debug("Getter started") + start = monotonic() + self.logger.debug("Getter started at {}".format(start)) while not self.exists(): self.retry = gevent.event.Event() worker = gevent.spawn(self.get_segment) @@ -426,6 +432,12 @@ class SegmentGetter(object): # If worker has returned, and return value is true, we're done if worker.ready() and worker.value: break + # If a large amount of time has elapsed since starting, our URL is stale + # anyway so we might as well give up to avoid cpu and disk usage. + elapsed = monotonic() - start + if elapsed > self.GIVE_UP_TIMEOUT: + self.logger.warning("Getter has been running for {}s, giving up as our URL has expired".format(elapsed)) + break # Create a new session, so we don't reuse a connection from the old session # which had an error / some other issue. This is mostly just out of paranoia. self.session = requests.Session()