From a59fe25cfa7e3967e7420b9a424184232b8763cc Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 13 May 2019 04:33:55 -0700 Subject: [PATCH] downloader: Sloppily restructure main to have dynamic list of channels taken from what game they're playing --- downloader/downloader/main.py | 71 ++++++++++++++++++++++----------- downloader/downloader/twitch.py | 23 +++++++++++ 2 files changed, 71 insertions(+), 23 deletions(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index e505e5b..0731fea 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -6,6 +6,7 @@ import logging import os import signal import sys +import time import uuid from base64 import b64encode from contextlib import contextmanager @@ -593,17 +594,18 @@ class SegmentGetter(object): "Twitch channels to watch. Add a '!' suffix to indicate they're expected to be always up. " "This affects retry interval, error reporting and monitoring." ) -def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0): +@argh.arg('--follow-game', action='append', default=[], type=str, + help="Follow given game name and download all channels which stream that game", +) +def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, follow_game=None): qualities = qualities.split(",") if qualities else [] - managers = [ - StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!')) - for channel in channels - ] + managers = {} + workers = {} + stopping = gevent.event.Event() def stop(): - for manager in managers: - manager.stop() + stopping.set() gevent.signal_handler(signal.SIGTERM, stop) # shut down on sigterm @@ -613,24 +615,47 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor logging.info("Starting up") - workers = [gevent.spawn(manager.run) for manager in managers] - if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() - # Wait for any to die - gevent.wait(workers, count=1) - # If one has stopped, either: - # 1. stop() was called and all are stopping - # 2. one errored and we should stop all remaining and report the error - # Our behaviour in both cases is the same: - # 1. Tell all managers to gracefully stop - stop() - # 2. Wait (with timeout) until they've stopped - gevent.wait(workers) - # 3. Check if any of them failed. If they did, report it. If mulitple failed, we report - # one arbitrarily. - for worker in workers: - worker.get() # re-raise error if failed + prev_check = 0 + CHECK_INTERVAL = 60 + while workers or not stopping.is_set(): + if stopping.is_set(): + for channel in managers.keys(): + logging.info("Stopping channel {} for shutdown".format(channel)) + managers.pop(channel).stop() + elif time.time() - prev_check > CHECK_INTERVAL: + logging.debug("Checking channels for games: {}".format(follow_game)) + prev_check = time.time() + try: + game_channels = list(gevent.pool.Group().imap(lambda g: list(twitch.get_channels_for_game(g)), follow_game)) + except Exception: + logging.warning("Failed to fetch channels for games", exc_info=True) + else: + new_channels = set(channels).union(*game_channels) + old_channels = set(managers.keys()) + for channel in new_channels - old_channels: + logging.info("Adding channel {}".format(channel)) + managers[channel] = StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!')) + workers[channel] = gevent.spawn(managers[channel].run) + for channel in old_channels - new_channels: + logging.info("Stopping channel {}".format(channel)) + managers.pop(channel).stop() + for channel, worker in workers.items(): + if not worker.ready(): + continue + del workers[channel] + if worker.successful(): + logging.info("Channel {} gracefully stopped".format(channel)) + continue + try: + worker.get() # re-raise exception + except Exception: + logging.exception("Channel {} failed".format(channel)) + + # Wait for next check, or for any worker to finish + timeout = max(0, prev_check + CHECK_INTERVAL - time.time()) + gevent.wait([stopping] + workers.values(), count=1, timeout=timeout) logging.info("Gracefully stopped") diff --git a/downloader/downloader/twitch.py b/downloader/downloader/twitch.py index e9032ef..cd0b89d 100644 --- a/downloader/downloader/twitch.py +++ b/downloader/downloader/twitch.py @@ -38,6 +38,29 @@ def get_access_token(channel, session): return data['signature'], data['value'] +def get_channels_for_game(game, session=None): + if session is None: + session = InstrumentedSession() + path = "https://api.twitch.tv/kraken/streams" + while True: + resp = session.get( + path, + params={"limit": 100, "game": game}, + headers={ + 'Accept': 'application/vnd.twitchtv.v3+json', + 'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j', + }, + ) + resp.raise_for_status() + data = resp.json() + streams = data["streams"] + if not streams: + return + for stream in streams: + yield stream["channel"]["name"] + path = data["_links"]["next"] + + def get_master_playlist(channel, session=None): """Get the master playlist for given channel from twitch""" if session is None: