diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 99a28c6..aea19ba 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -5,6 +5,7 @@ import hashlib import logging import os import random +import signal import sys import uuid from base64 import b64encode @@ -161,6 +162,11 @@ class StreamsManager(object): def start_worker(self, stream): """Start a new worker for given stream""" + # it's possible for fetch_latest to call us after we've started stopping, + # in that case do nothing. + if self.stopping.is_set(): + logging.info("Ignoring worker start as we are stopping") + return url_time, url = self.latest_urls[stream] worker = StreamWorker(self, stream, url, url_time) self.stream_workers[stream].append(worker) @@ -208,11 +214,12 @@ class StreamsManager(object): self.MAX_WORKER_AGE - workers[-1].age() for workers in self.stream_workers.values() if workers ] or [0])) - # wait until refresh triggered or next max age reached logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) - self.refresh_needed.wait(time_to_next_max_age) - self.refresh_needed.clear() - gevent.spawn(self.fetch_latest) + # wait until refresh triggered, next max age reached, or we're stopping (whichever happens first) + gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1) + if not self.stopping.is_set(): + self.refresh_needed.clear() + gevent.spawn(self.fetch_latest) # wait min retry interval with jitter, unless we're stopping self.stopping.wait(jitter(self.FETCH_MIN_INTERVAL)) logging.info("Stopping workers") @@ -507,5 +514,7 @@ class SegmentGetter(object): def main(channel, base_dir=".", qualities=""): qualities = qualities.split(",") if qualities else [] manager = StreamsManager(channel, base_dir, qualities) + gevent.signal(signal.SIGTERM, manager.stop) # shut down on sigterm logging.info("Starting up") manager.run() + logging.info("Gracefully stopped")