downloader: Sloppily restructure main to have dynamic list of channels taken from what game they're playing

condor-scripts
Mike Lang 6 years ago
parent 32138bbd43
commit a59fe25cfa

@ -6,6 +6,7 @@ import logging
import os
import signal
import sys
import time
import uuid
from base64 import b64encode
from contextlib import contextmanager
@ -593,17 +594,18 @@ class SegmentGetter(object):
"Twitch channels to watch. Add a '!' suffix to indicate they're expected to be always up. "
"This affects retry interval, error reporting and monitoring."
)
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0):
@argh.arg('--follow-game', action='append', default=[], type=str,
help="Follow given game name and download all channels which stream that game",
)
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, follow_game=None):
qualities = qualities.split(",") if qualities else []
managers = [
StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!'))
for channel in channels
]
managers = {}
workers = {}
stopping = gevent.event.Event()
def stop():
for manager in managers:
manager.stop()
stopping.set()
gevent.signal_handler(signal.SIGTERM, stop) # shut down on sigterm
@ -613,24 +615,47 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
logging.info("Starting up")
workers = [gevent.spawn(manager.run) for manager in managers]
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
# Wait for any to die
gevent.wait(workers, count=1)
# If one has stopped, either:
# 1. stop() was called and all are stopping
# 2. one errored and we should stop all remaining and report the error
# Our behaviour in both cases is the same:
# 1. Tell all managers to gracefully stop
stop()
# 2. Wait (with timeout) until they've stopped
gevent.wait(workers)
# 3. Check if any of them failed. If they did, report it. If mulitple failed, we report
# one arbitrarily.
for worker in workers:
worker.get() # re-raise error if failed
prev_check = 0
CHECK_INTERVAL = 60
while workers or not stopping.is_set():
if stopping.is_set():
for channel in managers.keys():
logging.info("Stopping channel {} for shutdown".format(channel))
managers.pop(channel).stop()
elif time.time() - prev_check > CHECK_INTERVAL:
logging.debug("Checking channels for games: {}".format(follow_game))
prev_check = time.time()
try:
game_channels = list(gevent.pool.Group().imap(lambda g: list(twitch.get_channels_for_game(g)), follow_game))
except Exception:
logging.warning("Failed to fetch channels for games", exc_info=True)
else:
new_channels = set(channels).union(*game_channels)
old_channels = set(managers.keys())
for channel in new_channels - old_channels:
logging.info("Adding channel {}".format(channel))
managers[channel] = StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!'))
workers[channel] = gevent.spawn(managers[channel].run)
for channel in old_channels - new_channels:
logging.info("Stopping channel {}".format(channel))
managers.pop(channel).stop()
for channel, worker in workers.items():
if not worker.ready():
continue
del workers[channel]
if worker.successful():
logging.info("Channel {} gracefully stopped".format(channel))
continue
try:
worker.get() # re-raise exception
except Exception:
logging.exception("Channel {} failed".format(channel))
# Wait for next check, or for any worker to finish
timeout = max(0, prev_check + CHECK_INTERVAL - time.time())
gevent.wait([stopping] + workers.values(), count=1, timeout=timeout)
logging.info("Gracefully stopped")

@ -38,6 +38,29 @@ def get_access_token(channel, session):
return data['signature'], data['value']
def get_channels_for_game(game, session=None):
if session is None:
session = InstrumentedSession()
path = "https://api.twitch.tv/kraken/streams"
while True:
resp = session.get(
path,
params={"limit": 100, "game": game},
headers={
'Accept': 'application/vnd.twitchtv.v3+json',
'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j',
},
)
resp.raise_for_status()
data = resp.json()
streams = data["streams"]
if not streams:
return
for stream in streams:
yield stream["channel"]["name"]
path = data["_links"]["next"]
def get_master_playlist(channel, session=None):
"""Get the master playlist for given channel from twitch"""
if session is None:

Loading…
Cancel
Save