From 41152fc1d3df951506cd63b5c987c37869362a09 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 13 May 2019 03:02:58 -0700 Subject: [PATCH] downloader: Support watching multiple channels This is useful eg. for watching db_admin or other testing channels in addition to the main channel. --- downloader/downloader/main.py | 38 +++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index d0810af..0879398 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -10,6 +10,7 @@ import uuid from base64 import b64encode from contextlib import contextmanager +import argh import dateutil.parser import gevent import gevent.backdoor @@ -543,15 +544,44 @@ class SegmentGetter(object): segments_downloaded.labels(partial="False", stream=self.channel, variant=self.stream).inc() -def main(channel, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0): +@argh.arg('channels', nargs="+", help="Twitch channels to watch") +def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0): qualities = qualities.split(",") if qualities else [] - manager = StreamsManager(channel, base_dir, qualities) - gevent.signal(signal.SIGTERM, manager.stop) # shut down on sigterm + + managers = [ + StreamsManager(channel, base_dir, qualities) + for channel in channels + ] + + def stop(): + for manager in managers: + manager.stop() + + gevent.signal(signal.SIGTERM, stop) # shut down on sigterm + common.PromLogCountsHandler.install() common.install_stacksampler() prom.start_http_server(metrics_port) + if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() + logging.info("Starting up") - manager.run() + + workers = [gevent.spawn(manager.run) for manager in managers] + # Wait for any to die + gevent.wait(workers, count=1) + # If one has stopped, either: + # 1. stop() was called and all are stopping + # 2. one errored and we should stop all remaining and report the error + # Our behaviour in both cases is the same: + # 1. Tell all managers to gracefully stop + stop() + # 2. Wait (with timeout) until they've stopped + gevent.wait(workers) + # 3. Check if any of them failed. If they did, report it. If mulitple failed, we report + # one arbitrarily. + for worker in workers: + worker.get() # re-raise error if failed + logging.info("Gracefully stopped")