diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index b1f9b33..7fe473c 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -389,6 +389,11 @@ class SegmentGetter(object): # full timeout is for the entire download and stream to disk. FETCH_HEADERS_TIMEOUTS = 5, 60 FETCH_FULL_TIMEOUTS = 15, 240 + # Overall timeout on the Getter before giving up, to prevent always-failing Getters + # from growing without bound and causing resource exhaustion issues. + # The longest we've observed in the wild before a segment goes un-fetchable is 7min + # or so, to be paranoid we set it to considerably longer than that. + GIVE_UP_TIMEOUT = 20 * 60 def __init__(self, parent_logger, session, base_dir, channel, stream, segment, date): self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) @@ -417,7 +422,8 @@ class SegmentGetter(object): self.done.set() def _run(self): - self.logger.debug("Getter started") + start = monotonic() + self.logger.debug("Getter started at {}".format(start)) while not self.exists(): self.retry = gevent.event.Event() worker = gevent.spawn(self.get_segment) @@ -426,6 +432,12 @@ class SegmentGetter(object): # If worker has returned, and return value is true, we're done if worker.ready() and worker.value: break + # If a large amount of time has elapsed since starting, our URL is stale + # anyway so we might as well give up to avoid cpu and disk usage. + elapsed = monotonic() - start + if elapsed > self.GIVE_UP_TIMEOUT: + self.logger.warning("Getter has been running for {}s, giving up as our URL has expired".format(elapsed)) + break # Create a new session, so we don't reuse a connection from the old session # which had an error / some other issue. This is mostly just out of paranoia. self.session = requests.Session()