From bc08d97e56815c2471598109146b94b0296a3c27 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 8 Nov 2023 22:47:57 +1100 Subject: [PATCH] downloader: Add framework to allow alternate "providers" besides twitch This abstracts out the process of obtaining media playlists so that we can support non-twitch streaming services. --- docker-compose.jsonnet | 2 +- downloader/downloader/main.py | 54 ++++--- downloader/downloader/twitch.py | 249 +++++++++++++++++--------------- 3 files changed, 168 insertions(+), 137 deletions(-) diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet index 7ba06ae..3cf2fc5 100644 --- a/docker-compose.jsonnet +++ b/docker-compose.jsonnet @@ -300,7 +300,7 @@ "--base-dir", "/mnt", "--qualities", std.join(",", $.qualities), "--backdoor-port", std.toString($.backdoor_port), - ] + if $.downloader_creds_file != null then ["--auth-file", "/token"] else [], + ] + if $.downloader_creds_file != null then ["--twitch-auth-file", "/token"] else [], // Mount the segments directory at /mnt volumes: ["%s:/mnt" % $.segments_path] + if $.downloader_creds_file != null then ["%s:/token" % $.downloader_creds_file] else [], diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index f1fa38e..ff6811e 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -21,7 +21,7 @@ import common import common.dateutil import common.requests -from . import twitch +from .twitch import URLProvider, TwitchProvider, YoutubeProvider segments_downloaded = prom.Counter( @@ -121,9 +121,9 @@ class StreamsManager(object): FETCH_MIN_INTERVAL = 20 FETCH_TIMEOUTS = 5, 30 - MAX_WORKER_AGE = 20*60*60 # 20 hours, twitch's media playlist links expire after 24 hours - def __init__(self, channel, base_dir, qualities, important=False, auth_token=None): + def __init__(self, provider, channel, base_dir, qualities, important=False): + self.provider = provider self.channel = channel self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.base_dir = base_dir @@ -133,7 +133,6 @@ class StreamsManager(object): self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now self.stopping = gevent.event.Event() # set to tell main loop to stop self.important = important - self.auth_token = auth_token self.master_playlist_log_level = logging.INFO if important else logging.DEBUG if self.important: self.FETCH_MIN_INTERVAL = self.IMPORTANT_FETCH_MIN_INTERVAL @@ -215,8 +214,7 @@ class StreamsManager(object): self.logger.log(self.master_playlist_log_level, "Fetching master playlist") fetch_time = monotonic() with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh): - master_playlist = twitch.get_master_playlist(self.channel, auth_token=self.auth_token) - new_urls = twitch.get_media_playlist_uris(master_playlist, list(self.stream_workers.keys())) + new_urls = self.provider.get_media_playlist_uris(list(self.stream_workers.keys())) self.update_urls(fetch_time, new_urls) for quality, workers in self.stream_workers.items(): # warn and retry if the url is missing @@ -231,7 +229,7 @@ class StreamsManager(object): continue latest_worker = workers[-1] # is the old worker too old? - if latest_worker.age() > self.MAX_WORKER_AGE: + if latest_worker.age() > self.provider.MAX_WORKER_AGE: self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(quality, latest_worker.age() / 3600.)) self.start_worker(quality) except Exception as e: @@ -250,7 +248,7 @@ class StreamsManager(object): while not self.stopping.is_set(): # clamp time to max age to non-negative, and default to 0 if no workers exist time_to_next_max_age = max(0, min([ - self.MAX_WORKER_AGE - workers[-1].age() + self.provider.MAX_WORKER_AGE - workers[-1].age() for workers in self.stream_workers.values() if workers ] or [0])) self.logger.log(self.master_playlist_log_level, "Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) @@ -349,7 +347,7 @@ class StreamWorker(object): self.logger.debug("Getting media playlist {}".format(self.url)) try: with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): - playlist = twitch.get_media_playlist(self.url, session=self.session) + playlist = self.manager.provider.get_media_playlist(self.url, session=self.session) except Exception as e: self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.trigger_new_worker() @@ -597,22 +595,38 @@ class SegmentGetter(object): stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe -@argh.arg('channels', nargs="+", help= +def parse_channel(channel): + if ":" in channel: + channel, type, url = channel.split(":", 2) + else: + type = "twitch" + url = None + important = channel.endswith("!") + channel = channel.rstrip("!") + return channel, important, type, url + + +@argh.arg('channels', nargs="+", type=parse_channel, help= "Twitch channels to watch. Add a '!' suffix to indicate they're expected to be always up. " - "This affects retry interval, error reporting and monitoring." + "This affects retry interval, error reporting and monitoring. " + "Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL" ) -def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, auth_file=None): +def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None): qualities = qualities.split(",") if qualities else [] - auth_token = None - if auth_file is not None: - with open(auth_file) as f: - auth_token = f.read().strip() + twitch_auth_token = None + if twitch_auth_file is not None: + with open(twitch_auth_file) as f: + twitch_auth_token = f.read().strip() - managers = [ - StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!'), auth_token=auth_token) - for channel in channels - ] + managers = [] + for channel, important, type, url in channels: + if type == "twitch": + provider = TwitchProvider(channel, auth_token=twitch_auth_token) + else: + raise ValueError(f"Unknown type {type!r}") + manager = StreamsManager(provider, channel, base_dir, qualities, important=important) + managers.append(manager) def stop(): for manager in managers: diff --git a/downloader/downloader/twitch.py b/downloader/downloader/twitch.py index 468148a..cc2b44a 100644 --- a/downloader/downloader/twitch.py +++ b/downloader/downloader/twitch.py @@ -7,121 +7,138 @@ from common.requests import InstrumentedSession from . import hls_playlist -logger = logging.getLogger(__name__) - - -def get_access_token(channel, session, auth_token): - request = { - "operationName": "PlaybackAccessToken", - "extensions": { - "persistedQuery": { - "version": 1, - "sha256Hash": "0828119ded1c13477966434e15800ff57ddacf13ba1911c129dc2200705b0712" +class Provider: + """Base class with defaults, to be overriden for specific providers""" + + # How long (in seconds) we should keep using a media playlist URI before getting a new one. + # This matters because some providers set an expiry on the URI they give you. + # However the default is an arbitrarily long period (ie. never). + MAX_WORKER_AGE = 30 * 24 * 60 * 60 # 30 days + + def get_media_playlist_uris(self, qualities, session=None): + """Fetches master playlist and returns {quality: media playlist URI} for each + requested quality.""" + raise NotImplementedError + + def get_media_playlist(self, uri, session=None): + """Fetches the given media playlist. In most cases this is just a simple fetch + and doesn't need to be overriden.""" + if session is None: + session = InstrumentedSession() + resp = session.get(uri, metric_name='get_media_playlist') + resp.raise_for_status() + return hls_playlist.load(resp.text, base_uri=resp.url) + + +class TwitchProvider(Provider): + """Provider that takes a twitch channel.""" + # Twitch links expire after 24h, so roll workers at 20h + MAX_WORKER_AGE = 20 * 60 * 60 + + def __init__(self, channel, auth_token=None): + self.channel = channel + self.auth_token = auth_token + + def get_access_token(self, session): + request = { + "operationName": "PlaybackAccessToken", + "extensions": { + "persistedQuery": { + "version": 1, + "sha256Hash": "0828119ded1c13477966434e15800ff57ddacf13ba1911c129dc2200705b0712" + } + }, + "variables": { + "isLive": True, + "login": self.channel, + "isVod": False, + "vodID": "", + "playerType": "site" } - }, - "variables": { - "isLive": True, - "login": channel, - "isVod": False, - "vodID": "", - "playerType": "site" } - } - headers = {'Client-ID': 'kimne78kx3ncx6brgo4mv6wki5h1ko'} - if auth_token is not None: - headers["Authorization"] = "OAuth {}".format(auth_token) - resp = session.post( - "https://gql.twitch.tv/gql", - json=request, - headers=headers, - metric_name='get_access_token', - ) - resp.raise_for_status() - data = resp.json()["data"]["streamPlaybackAccessToken"] - return data['signature'], data['value'] - - -def get_master_playlist(channel, session=None, auth_token=None): - """Get the master playlist for given channel from twitch""" - if session is None: - session = InstrumentedSession() - sig, token = get_access_token(channel, session, auth_token) - resp = session.get( - "https://usher.ttvnw.net/api/channel/hls/{}.m3u8".format(channel), - headers={ - "referer": "https://player.twitch.tv", - "origin": "https://player.twitch.tv", - }, - params={ - # Taken from streamlink. Unsure what's needed and what changing things can do. - "player": "twitchweb", - "p": random.randrange(1000000), - "type": "any", - "allow_source": "true", - "allow_audio_only": "true", - "allow_spectre": "false", - "fast_bread": "true", - "sig": sig, - "token": token, - }, - metric_name='get_master_playlist', - ) - resp.raise_for_status() # getting master playlist - playlist = hls_playlist.load(resp.text, base_uri=resp.url) - return playlist - - -def get_media_playlist_uris(master_playlist, target_qualities): - """From a master playlist, extract URIs of media playlists of interest. - Returns {stream name: uri}. - Note this is not a general method for all HLS streams, and makes twitch-specific assumptions, - though we try to check and emit warnings if these assumptions are broken. - """ - # Twitch master playlists are observed to have the following form: - # The first listed variant is the source playlist and has "(source)" in the name. - # Other variants are listed in order of quality from highest to lowest, followed by audio_only. - # These transcoded variants are named "Hp[R]" where H is the vertical resolution and - # optionally R is the frame rate. R is elided if == 30. Examples: 720p60, 720p, 480p, 360p, 160p - # These variants are observed to only ever have one rendition, type video, which contains the name - # but no URI. The URI in the main variant entry is the one to use. This is true even of the - # "audio_only" stream. - # Streams without transcoding options only show source and audio_only. - # We return the source stream in addition to any in target_qualities that is found. - - def variant_name(variant): - names = set(media.name for media in variant.media if media.type == "VIDEO" and media.name) - if not names: - logger.warning("Variant {} has no named video renditions, can't determine name".format(variant)) - return None - if len(names) > 1: - logger.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant)) - return list(names)[0] - - if not master_playlist.playlists: - raise ValueError("Master playlist has no variants") - - for variant in master_playlist.playlists: - if any(media.uri for media in variant.media): - logger.warning("Variant has a rendition with its own URI: {}".format(variant)) - - by_name = {variant_name(variant): variant for variant in master_playlist.playlists} - - source_candidates = [name for name in by_name.keys() if "(source)" in name] - if len(source_candidates) != 1: - raise ValueError("Can't find source stream, not exactly one candidate. Candidates: {}, playlist: {}".format( - source_candidates, master_playlist, - )) - source = by_name[source_candidates[0]] - - variants = {name: variant for name, variant in by_name.items() if name in target_qualities} - variants["source"] = source - - return {name: variant.uri for name, variant in variants.items()} - - -def get_media_playlist(uri, session=None): - if session is None: - session = InstrumentedSession() - resp = session.get(uri, metric_name='get_media_playlist') - resp.raise_for_status() - return hls_playlist.load(resp.text, base_uri=resp.url) + headers = {'Client-ID': 'kimne78kx3ncx6brgo4mv6wki5h1ko'} + if self.auth_token is not None: + headers["Authorization"] = "OAuth {}".format(self.auth_token) + resp = session.post( + "https://gql.twitch.tv/gql", + json=request, + headers=headers, + metric_name='twitch_get_access_token', + ) + resp.raise_for_status() + data = resp.json()["data"]["streamPlaybackAccessToken"] + return data['signature'], data['value'] + + def get_master_playlist(self, session): + sig, token = self.get_access_token(session) + resp = session.get( + "https://usher.ttvnw.net/api/channel/hls/{}.m3u8".format(self.channel), + headers={ + "referer": "https://player.twitch.tv", + "origin": "https://player.twitch.tv", + }, + params={ + # Taken from streamlink. Unsure what's needed and what changing things can do. + "player": "twitchweb", + "p": random.randrange(1000000), + "type": "any", + "allow_source": "true", + "allow_audio_only": "true", + "allow_spectre": "false", + "fast_bread": "true", + "sig": sig, + "token": token, + }, + metric_name='twitch_get_master_playlist', + ) + resp.raise_for_status() # getting master playlist + playlist = hls_playlist.load(resp.text, base_uri=resp.url) + return playlist + + def get_media_playlist_uris(self, target_qualities, session=None): + # Twitch master playlists are observed to have the following form: + # The first listed variant is the source playlist and has "(source)" in the name. + # Other variants are listed in order of quality from highest to lowest, followed by audio_only. + # These transcoded variants are named "Hp[R]" where H is the vertical resolution and + # optionally R is the frame rate. R is elided if == 30. Examples: 720p60, 720p, 480p, 360p, 160p + # These variants are observed to only ever have one rendition, type video, which contains the name + # but no URI. The URI in the main variant entry is the one to use. This is true even of the + # "audio_only" stream. + # Streams without transcoding options only show source and audio_only. + # We return the source stream in addition to any in target_qualities that is found. + + logger = logging.getLogger("twitch") + if session is None: + session = InstrumentedSession() + + master_playlist = self.get_master_playlist(session) + + def variant_name(variant): + names = set(media.name for media in variant.media if media.type == "VIDEO" and media.name) + if not names: + logger.warning("Variant {} has no named video renditions, can't determine name".format(variant)) + return None + if len(names) > 1: + logger.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant)) + return list(names)[0] + + if not master_playlist.playlists: + raise ValueError("Master playlist has no variants") + + for variant in master_playlist.playlists: + if any(media.uri for media in variant.media): + logger.warning("Variant has a rendition with its own URI: {}".format(variant)) + + by_name = {variant_name(variant): variant for variant in master_playlist.playlists} + + source_candidates = [name for name in by_name.keys() if "(source)" in name] + if len(source_candidates) != 1: + raise ValueError("Can't find source stream, not exactly one candidate. Candidates: {}, playlist: {}".format( + source_candidates, master_playlist, + )) + source = by_name[source_candidates[0]] + + variants = {name: variant for name, variant in by_name.items() if name in target_qualities} + variants["source"] = source + + return {name: variant.uri for name, variant in variants.items()}