diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet index 6c168f5..659c704 100644 --- a/docker-compose.jsonnet +++ b/docker-compose.jsonnet @@ -32,7 +32,6 @@ // Channels suffixed with a '!' are considered "important" and will be retried more aggressively // and warned about if they're not currently streaming. channels:: [], - follow_games:: ["Crypt of the Necrodancer"], // Stream qualities to capture qualities:: ["source"], @@ -190,9 +189,7 @@ [if $.enabled.downloader then "downloader"]: { image: "quay.io/ekimekim/wubloader-downloader:%s" % $.image_tag, // Args for the downloader: set channel and qualities - command: $.channels + std.flattenArrays([ - ["--follow-game", game] for game in $.follow_games - ]) + [ + command: $.channels + [ "--base-dir", "/mnt", "--qualities", std.join(",", $.qualities), "--backdoor-port", std.toString($.backdoor_port), diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 0731fea..9d05ed2 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -15,6 +15,7 @@ import argh import gevent import gevent.backdoor import gevent.event +import mysql.connector import prometheus_client as prom import requests from monotonic import monotonic @@ -590,14 +591,15 @@ class SegmentGetter(object): stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe -@argh.arg('channels', nargs="+", help= +@argh.arg('channels', nargs="*", help= "Twitch channels to watch. Add a '!' suffix to indicate they're expected to be always up. " "This affects retry interval, error reporting and monitoring." ) -@argh.arg('--follow-game', action='append', default=[], type=str, - help="Follow given game name and download all channels which stream that game", +@argh.arg('--league-db', default="necrobot-read:necrobot-read@condor.host/season_9", help= + "Connection string for database to check for racers and commentators. " + "Should be of form USER:PASSWORD@HOST/DATABASE.", ) -def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, follow_game=None): +def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, league_db=None): qualities = qualities.split(",") if qualities else [] managers = {} @@ -618,6 +620,14 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() + if league_db: + user, rest = league_db.split(':', 1) + password, rest = rest.split('@', 1) + host, db = rest.split('/', 1) + conn = mysql.connector.connect(host=host, user=user, password=password, database=db) + else: + conn = None + prev_check = 0 CHECK_INTERVAL = 60 while workers or not stopping.is_set(): @@ -625,15 +635,32 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor for channel in managers.keys(): logging.info("Stopping channel {} for shutdown".format(channel)) managers.pop(channel).stop() - elif time.time() - prev_check > CHECK_INTERVAL: - logging.debug("Checking channels for games: {}".format(follow_game)) + elif conn and time.time() - prev_check > CHECK_INTERVAL: + logging.debug("Checking for new racers") prev_check = time.time() try: - game_channels = list(gevent.pool.Group().imap(lambda g: list(twitch.get_channels_for_game(g)), follow_game)) + cur = conn.cursor() + # We're looking for all racers and commentators with an unfinished match + # that starts before 5min from now + cur.execute(""" + SELECT + racer_1_name, + racer_2_name, + cawmentator_name + FROM match_info + WHERE scheduled + AND scheduled_time < NOW() + INTERVAL 5 MINUTE + AND NOT completed + """) + channels_from_db = set() + for channels in cur.fetchall(): + # filter out null commentators + channels = [channel for channel in channels if channel] + channels_from_db |= set(channels) except Exception: - logging.warning("Failed to fetch channels for games", exc_info=True) + logging.warning("Failed to fetch current races", exc_info=True) else: - new_channels = set(channels).union(*game_channels) + new_channels = set(channels).union(channels_from_db) old_channels = set(managers.keys()) for channel in new_channels - old_channels: logging.info("Adding channel {}".format(channel)) diff --git a/downloader/setup.py b/downloader/setup.py index 2928579..e527e23 100644 --- a/downloader/setup.py +++ b/downloader/setup.py @@ -9,6 +9,7 @@ setup( "python-dateutil", "gevent", "monotonic", + "mysql-connector-python", "prometheus-client==0.7.1", # locked version as we rely on internals "requests", "wubloader-common",