downloader: Fix and improve the stop mechanism, stop on SIGTERM

Allows for graceful shutdown
pull/15/head
Mike Lang 6 years ago committed by Mike Lang
parent 7b10429846
commit 1dce14bf77

@ -5,6 +5,7 @@ import hashlib
import logging import logging
import os import os
import random import random
import signal
import sys import sys
import uuid import uuid
from base64 import b64encode from base64 import b64encode
@ -161,6 +162,11 @@ class StreamsManager(object):
def start_worker(self, stream): def start_worker(self, stream):
"""Start a new worker for given stream""" """Start a new worker for given stream"""
# it's possible for fetch_latest to call us after we've started stopping,
# in that case do nothing.
if self.stopping.is_set():
logging.info("Ignoring worker start as we are stopping")
return
url_time, url = self.latest_urls[stream] url_time, url = self.latest_urls[stream]
worker = StreamWorker(self, stream, url, url_time) worker = StreamWorker(self, stream, url, url_time)
self.stream_workers[stream].append(worker) self.stream_workers[stream].append(worker)
@ -208,11 +214,12 @@ class StreamsManager(object):
self.MAX_WORKER_AGE - workers[-1].age() self.MAX_WORKER_AGE - workers[-1].age()
for workers in self.stream_workers.values() if workers for workers in self.stream_workers.values() if workers
] or [0])) ] or [0]))
# wait until refresh triggered or next max age reached
logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
self.refresh_needed.wait(time_to_next_max_age) # wait until refresh triggered, next max age reached, or we're stopping (whichever happens first)
self.refresh_needed.clear() gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1)
gevent.spawn(self.fetch_latest) if not self.stopping.is_set():
self.refresh_needed.clear()
gevent.spawn(self.fetch_latest)
# wait min retry interval with jitter, unless we're stopping # wait min retry interval with jitter, unless we're stopping
self.stopping.wait(jitter(self.FETCH_MIN_INTERVAL)) self.stopping.wait(jitter(self.FETCH_MIN_INTERVAL))
logging.info("Stopping workers") logging.info("Stopping workers")
@ -507,5 +514,7 @@ class SegmentGetter(object):
def main(channel, base_dir=".", qualities=""): def main(channel, base_dir=".", qualities=""):
qualities = qualities.split(",") if qualities else [] qualities = qualities.split(",") if qualities else []
manager = StreamsManager(channel, base_dir, qualities) manager = StreamsManager(channel, base_dir, qualities)
gevent.signal(signal.SIGTERM, manager.stop) # shut down on sigterm
logging.info("Starting up") logging.info("Starting up")
manager.run() manager.run()
logging.info("Gracefully stopped")

Loading…
Cancel
Save