downloader: Support watching multiple channels

This is useful eg. for watching db_admin or other testing channels in addition to the main channel.
pull/39/head
Mike Lang 6 years ago
parent f0d9aa82c2
commit 41152fc1d3

@ -10,6 +10,7 @@ import uuid
from base64 import b64encode from base64 import b64encode
from contextlib import contextmanager from contextlib import contextmanager
import argh
import dateutil.parser import dateutil.parser
import gevent import gevent
import gevent.backdoor import gevent.backdoor
@ -543,15 +544,44 @@ class SegmentGetter(object):
segments_downloaded.labels(partial="False", stream=self.channel, variant=self.stream).inc() segments_downloaded.labels(partial="False", stream=self.channel, variant=self.stream).inc()
def main(channel, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0): @argh.arg('channels', nargs="+", help="Twitch channels to watch")
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0):
qualities = qualities.split(",") if qualities else [] qualities = qualities.split(",") if qualities else []
manager = StreamsManager(channel, base_dir, qualities)
gevent.signal(signal.SIGTERM, manager.stop) # shut down on sigterm managers = [
StreamsManager(channel, base_dir, qualities)
for channel in channels
]
def stop():
for manager in managers:
manager.stop()
gevent.signal(signal.SIGTERM, stop) # shut down on sigterm
common.PromLogCountsHandler.install() common.PromLogCountsHandler.install()
common.install_stacksampler() common.install_stacksampler()
prom.start_http_server(metrics_port) prom.start_http_server(metrics_port)
if backdoor_port: if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
logging.info("Starting up") logging.info("Starting up")
manager.run()
workers = [gevent.spawn(manager.run) for manager in managers]
# Wait for any to die
gevent.wait(workers, count=1)
# If one has stopped, either:
# 1. stop() was called and all are stopping
# 2. one errored and we should stop all remaining and report the error
# Our behaviour in both cases is the same:
# 1. Tell all managers to gracefully stop
stop()
# 2. Wait (with timeout) until they've stopped
gevent.wait(workers)
# 3. Check if any of them failed. If they did, report it. If mulitple failed, we report
# one arbitrarily.
for worker in workers:
worker.get() # re-raise error if failed
logging.info("Gracefully stopped") logging.info("Gracefully stopped")

Loading…
Cancel
Save