downloader: Sloppily restructure main to have dynamic list of channels taken from what game they're playing

downloader/follow-game
Mike Lang 6 years ago
parent b18162135f
commit e351aff176

@ -6,6 +6,7 @@ import logging
import os import os
import signal import signal
import sys import sys
import time
import uuid import uuid
from base64 import b64encode from base64 import b64encode
from contextlib import contextmanager from contextlib import contextmanager
@ -544,18 +545,19 @@ class SegmentGetter(object):
segments_downloaded.labels(partial="False", stream=self.channel, variant=self.stream).inc() segments_downloaded.labels(partial="False", stream=self.channel, variant=self.stream).inc()
@argh.arg('channels', nargs="+", help="Twitch channels to watch") @argh.arg('channels', nargs="*", help="Twitch channels to watch")
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0): @argh.arg('--follow-game', action='append', default=[], type=str,
help="Follow given game name and download all channels which stream that game",
)
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, follow_game=None):
qualities = qualities.split(",") if qualities else [] qualities = qualities.split(",") if qualities else []
managers = [ managers = {}
StreamsManager(channel, base_dir, qualities) workers = {}
for channel in channels
]
stopping = gevent.event.Event()
def stop(): def stop():
for manager in managers: stopping.set()
manager.stop()
gevent.signal(signal.SIGTERM, stop) # shut down on sigterm gevent.signal(signal.SIGTERM, stop) # shut down on sigterm
@ -565,24 +567,47 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
logging.info("Starting up") logging.info("Starting up")
workers = [gevent.spawn(manager.run) for manager in managers]
if backdoor_port: if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
# Wait for any to die prev_check = 0
gevent.wait(workers, count=1) CHECK_INTERVAL = 60
# If one has stopped, either: while workers or not stopping.is_set():
# 1. stop() was called and all are stopping if stopping.is_set():
# 2. one errored and we should stop all remaining and report the error for channel in managers.keys():
# Our behaviour in both cases is the same: logging.info("Stopping channel {} for shutdown".format(channel))
# 1. Tell all managers to gracefully stop managers.pop(channel).stop()
stop() elif time.time() - prev_check > CHECK_INTERVAL:
# 2. Wait (with timeout) until they've stopped logging.debug("Checking channels for games: {}".format(follow_game))
gevent.wait(workers) prev_check = time.time()
# 3. Check if any of them failed. If they did, report it. If mulitple failed, we report try:
# one arbitrarily. game_channels = list(gevent.pool.Group().imap(lambda g: list(twitch.get_channels_for_game(g)), follow_game))
for worker in workers: except Exception:
worker.get() # re-raise error if failed logging.warning("Failed to fetch channels for games", exc_info=True)
else:
new_channels = set(channels).union(*game_channels)
old_channels = set(managers.keys())
for channel in new_channels - old_channels:
logging.info("Adding channel {}".format(channel))
managers[channel] = StreamsManager(channel, base_dir, qualities)
workers[channel] = gevent.spawn(managers[channel].run)
for channel in old_channels - new_channels:
logging.info("Stopping channel {}".format(channel))
managers.pop(channel).stop()
for channel, worker in workers.items():
if not worker.ready():
continue
del workers[channel]
if worker.successful():
logging.info("Channel {} gracefully stopped".format(channel))
continue
try:
worker.get() # re-raise exception
except Exception:
logging.exception("Channel {} failed".format(channel))
# Wait for next check, or for any worker to finish
timeout = max(0, prev_check + CHECK_INTERVAL - time.time())
gevent.wait([stopping] + workers.values(), count=1, timeout=timeout)
logging.info("Gracefully stopped") logging.info("Gracefully stopped")

@ -10,6 +10,27 @@ import hls_playlist
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_channels_for_game(game, session=requests):
path = "https://api.twitch.tv/kraken/streams"
while True:
resp = session.get(
path,
params={"limit": 100, "game": game},
headers={
'Accept': 'application/vnd.twitchtv.v3+json',
'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j',
},
)
resp.raise_for_status()
data = resp.json()
streams = data["streams"]
if not streams:
return
for stream in streams:
yield stream["channel"]["name"]
path = data["_links"]["next"]
def get_master_playlist(channel, session=requests): def get_master_playlist(channel, session=requests):
"""Get the master playlist for given channel from twitch""" """Get the master playlist for given channel from twitch"""
resp = session.get( resp = session.get(

Loading…
Cancel
Save