|
|
@ -10,6 +10,7 @@ import uuid
|
|
|
|
from base64 import b64encode
|
|
|
|
from base64 import b64encode
|
|
|
|
from contextlib import contextmanager
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import argh
|
|
|
|
import dateutil.parser
|
|
|
|
import dateutil.parser
|
|
|
|
import gevent
|
|
|
|
import gevent
|
|
|
|
import gevent.backdoor
|
|
|
|
import gevent.backdoor
|
|
|
@ -543,15 +544,44 @@ class SegmentGetter(object):
|
|
|
|
segments_downloaded.labels(partial="False", stream=self.channel, variant=self.stream).inc()
|
|
|
|
segments_downloaded.labels(partial="False", stream=self.channel, variant=self.stream).inc()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(channel, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0):
|
|
|
|
@argh.arg('channels', nargs="+", help="Twitch channels to watch")
|
|
|
|
|
|
|
|
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0):
|
|
|
|
qualities = qualities.split(",") if qualities else []
|
|
|
|
qualities = qualities.split(",") if qualities else []
|
|
|
|
manager = StreamsManager(channel, base_dir, qualities)
|
|
|
|
|
|
|
|
gevent.signal(signal.SIGTERM, manager.stop) # shut down on sigterm
|
|
|
|
managers = [
|
|
|
|
|
|
|
|
StreamsManager(channel, base_dir, qualities)
|
|
|
|
|
|
|
|
for channel in channels
|
|
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def stop():
|
|
|
|
|
|
|
|
for manager in managers:
|
|
|
|
|
|
|
|
manager.stop()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gevent.signal(signal.SIGTERM, stop) # shut down on sigterm
|
|
|
|
|
|
|
|
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|
common.install_stacksampler()
|
|
|
|
common.install_stacksampler()
|
|
|
|
prom.start_http_server(metrics_port)
|
|
|
|
prom.start_http_server(metrics_port)
|
|
|
|
|
|
|
|
|
|
|
|
if backdoor_port:
|
|
|
|
if backdoor_port:
|
|
|
|
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
|
|
|
|
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
|
|
|
|
|
|
|
|
|
|
|
|
logging.info("Starting up")
|
|
|
|
logging.info("Starting up")
|
|
|
|
manager.run()
|
|
|
|
|
|
|
|
|
|
|
|
workers = [gevent.spawn(manager.run) for manager in managers]
|
|
|
|
|
|
|
|
# Wait for any to die
|
|
|
|
|
|
|
|
gevent.wait(workers, count=1)
|
|
|
|
|
|
|
|
# If one has stopped, either:
|
|
|
|
|
|
|
|
# 1. stop() was called and all are stopping
|
|
|
|
|
|
|
|
# 2. one errored and we should stop all remaining and report the error
|
|
|
|
|
|
|
|
# Our behaviour in both cases is the same:
|
|
|
|
|
|
|
|
# 1. Tell all managers to gracefully stop
|
|
|
|
|
|
|
|
stop()
|
|
|
|
|
|
|
|
# 2. Wait (with timeout) until they've stopped
|
|
|
|
|
|
|
|
gevent.wait(workers)
|
|
|
|
|
|
|
|
# 3. Check if any of them failed. If they did, report it. If mulitple failed, we report
|
|
|
|
|
|
|
|
# one arbitrarily.
|
|
|
|
|
|
|
|
for worker in workers:
|
|
|
|
|
|
|
|
worker.get() # re-raise error if failed
|
|
|
|
|
|
|
|
|
|
|
|
logging.info("Gracefully stopped")
|
|
|
|
logging.info("Gracefully stopped")
|
|
|
|