downloader: Add framework to allow alternate "providers" besides twitch

This abstracts out the process of obtaining media playlists so that we can support non-twitch
streaming services.
pull/355/head
Mike Lang authored 1 year ago, committed by Mike Lang
parent 30c1877b36
commit bc08d97e56
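The extension point this commit introduces is the Provider base class added to twitch.py (last hunks below). As a rough sketch of how a non-twitch provider could plug into that interface (the class name and fixed-URL behaviour here are hypothetical and not part of this commit; the imported URLProvider and YoutubeProvider presumably fill this role, but their definitions are not shown in this diff):

# Hypothetical illustration only: a provider that serves one fixed media playlist URL
# for every requested quality. Assumes it lives in the downloader package next to
# twitch.py, matching the relative import style used by main.py.
from .twitch import Provider

class FixedURLProvider(Provider):
    """Sketch of a provider for a service whose media playlist URL never changes."""
    # No expiry on the URL, so the long default MAX_WORKER_AGE from Provider is kept.

    def __init__(self, url):
        self.url = url

    def get_media_playlist_uris(self, qualities, session=None):
        # StreamsManager passes the quality names it is tracking; hand back
        # the same playlist URL for each of them. Real providers would fetch
        # and parse a master playlist here, as TwitchProvider does.
        return {quality: self.url for quality in qualities}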

@@ -300,7 +300,7 @@
         "--base-dir", "/mnt",
         "--qualities", std.join(",", $.qualities),
         "--backdoor-port", std.toString($.backdoor_port),
-    ] + if $.downloader_creds_file != null then ["--auth-file", "/token"] else [],
+    ] + if $.downloader_creds_file != null then ["--twitch-auth-file", "/token"] else [],
     // Mount the segments directory at /mnt
     volumes: ["%s:/mnt" % $.segments_path]
         + if $.downloader_creds_file != null then ["%s:/token" % $.downloader_creds_file] else [],

@@ -21,7 +21,7 @@ import common
 import common.dateutil
 import common.requests

-from . import twitch
+from .twitch import URLProvider, TwitchProvider, YoutubeProvider

 segments_downloaded = prom.Counter(
@@ -121,9 +121,9 @@ class StreamsManager(object):
     FETCH_MIN_INTERVAL = 20
     FETCH_TIMEOUTS = 5, 30
-    MAX_WORKER_AGE = 20*60*60 # 20 hours, twitch's media playlist links expire after 24 hours

-    def __init__(self, channel, base_dir, qualities, important=False, auth_token=None):
+    def __init__(self, provider, channel, base_dir, qualities, important=False):
+        self.provider = provider
         self.channel = channel
         self.logger = logging.getLogger("StreamsManager({})".format(channel))
         self.base_dir = base_dir
@@ -133,7 +133,6 @@ class StreamsManager(object):
         self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now
         self.stopping = gevent.event.Event() # set to tell main loop to stop
         self.important = important
-        self.auth_token = auth_token
         self.master_playlist_log_level = logging.INFO if important else logging.DEBUG
         if self.important:
             self.FETCH_MIN_INTERVAL = self.IMPORTANT_FETCH_MIN_INTERVAL
@@ -215,8 +214,7 @@ class StreamsManager(object):
             self.logger.log(self.master_playlist_log_level, "Fetching master playlist")
             fetch_time = monotonic()
             with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
-                master_playlist = twitch.get_master_playlist(self.channel, auth_token=self.auth_token)
-                new_urls = twitch.get_media_playlist_uris(master_playlist, list(self.stream_workers.keys()))
+                new_urls = self.provider.get_media_playlist_uris(list(self.stream_workers.keys()))
             self.update_urls(fetch_time, new_urls)
             for quality, workers in self.stream_workers.items():
                 # warn and retry if the url is missing
@@ -231,7 +229,7 @@
                     continue
                 latest_worker = workers[-1]
                 # is the old worker too old?
-                if latest_worker.age() > self.MAX_WORKER_AGE:
+                if latest_worker.age() > self.provider.MAX_WORKER_AGE:
                     self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(quality, latest_worker.age() / 3600.))
                     self.start_worker(quality)
         except Exception as e:
@@ -250,7 +248,7 @@
         while not self.stopping.is_set():
             # clamp time to max age to non-negative, and default to 0 if no workers exist
             time_to_next_max_age = max(0, min([
-                self.MAX_WORKER_AGE - workers[-1].age()
+                self.provider.MAX_WORKER_AGE - workers[-1].age()
                 for workers in self.stream_workers.values() if workers
             ] or [0]))
             self.logger.log(self.master_playlist_log_level, "Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
@@ -349,7 +347,7 @@ class StreamWorker(object):
         self.logger.debug("Getting media playlist {}".format(self.url))
         try:
             with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
-                playlist = twitch.get_media_playlist(self.url, session=self.session)
+                playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
         except Exception as e:
             self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
             self.trigger_new_worker()
@@ -597,22 +595,38 @@ class SegmentGetter(object):
         stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe


-@argh.arg('channels', nargs="+", help=
+def parse_channel(channel):
+    if ":" in channel:
+        channel, type, url = channel.split(":", 2)
+    else:
+        type = "twitch"
+        url = None
+    important = channel.endswith("!")
+    channel = channel.rstrip("!")
+    return channel, important, type, url
+
+
+@argh.arg('channels', nargs="+", type=parse_channel, help=
     "Twitch channels to watch. Add a '!' suffix to indicate they're expected to be always up. "
     "This affects retry interval, error reporting and monitoring. "
+    "Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL"
 )
-def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, auth_file=None):
+def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None):
     qualities = qualities.split(",") if qualities else []
-    auth_token = None
-    if auth_file is not None:
-        with open(auth_file) as f:
-            auth_token = f.read().strip()
-    managers = [
-        StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!'), auth_token=auth_token)
-        for channel in channels
-    ]
+    twitch_auth_token = None
+    if twitch_auth_file is not None:
+        with open(twitch_auth_file) as f:
+            twitch_auth_token = f.read().strip()
+    managers = []
+    for channel, important, type, url in channels:
+        if type == "twitch":
+            provider = TwitchProvider(channel, auth_token=twitch_auth_token)
+        else:
+            raise ValueError(f"Unknown type {type!r}")
+        manager = StreamsManager(provider, channel, base_dir, qualities, important=important)
+        managers.append(manager)

     def stop():
         for manager in managers:
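To illustrate the new argument form (channel names and URL below are made up), parse_channel turns each positional argument into a (channel, important, type, url) tuple before main() sees it:

# Hypothetical inputs, with the values parse_channel would return:
parse_channel("examplechannel")       # -> ("examplechannel", False, "twitch", None)
parse_channel("examplechannel!")      # -> ("examplechannel", True, "twitch", None)
parse_channel("backup!:youtube:https://example.com/live.m3u8")
# -> ("backup", True, "youtube", "https://example.com/live.m3u8")
# The maxsplit of 2 keeps colons inside the URL intact. Note that as of this commit,
# main() only accepts type "twitch" and raises ValueError for anything else; other
# types are parsed but not yet handled.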

@@ -7,10 +7,39 @@ from common.requests import InstrumentedSession
 from . import hls_playlist


-logger = logging.getLogger(__name__)
+class Provider:
+    """Base class with defaults, to be overriden for specific providers"""
+
+    # How long (in seconds) we should keep using a media playlist URI before getting a new one.
+    # This matters because some providers set an expiry on the URI they give you.
+    # However the default is an arbitrarily long period (ie. never).
+    MAX_WORKER_AGE = 30 * 24 * 60 * 60 # 30 days
+
+    def get_media_playlist_uris(self, qualities, session=None):
+        """Fetches master playlist and returns {quality: media playlist URI} for each
+        requested quality."""
+        raise NotImplementedError
+
+    def get_media_playlist(self, uri, session=None):
+        """Fetches the given media playlist. In most cases this is just a simple fetch
+        and doesn't need to be overriden."""
+        if session is None:
+            session = InstrumentedSession()
+        resp = session.get(uri, metric_name='get_media_playlist')
+        resp.raise_for_status()
+        return hls_playlist.load(resp.text, base_uri=resp.url)
+
+
+class TwitchProvider(Provider):
+    """Provider that takes a twitch channel."""
+    # Twitch links expire after 24h, so roll workers at 20h
+    MAX_WORKER_AGE = 20 * 60 * 60

-def get_access_token(channel, session, auth_token):
+    def __init__(self, channel, auth_token=None):
+        self.channel = channel
+        self.auth_token = auth_token
+
+    def get_access_token(self, session):
         request = {
             "operationName": "PlaybackAccessToken",
             "extensions": {
@@ -21,33 +50,29 @@ def get_access_token(channel, session, auth_token):
             },
             "variables": {
                 "isLive": True,
-                "login": channel,
+                "login": self.channel,
                 "isVod": False,
                 "vodID": "",
                 "playerType": "site"
             }
         }
         headers = {'Client-ID': 'kimne78kx3ncx6brgo4mv6wki5h1ko'}
-        if auth_token is not None:
-            headers["Authorization"] = "OAuth {}".format(auth_token)
+        if self.auth_token is not None:
+            headers["Authorization"] = "OAuth {}".format(self.auth_token)
         resp = session.post(
             "https://gql.twitch.tv/gql",
             json=request,
             headers=headers,
-            metric_name='get_access_token',
+            metric_name='twitch_get_access_token',
         )
         resp.raise_for_status()
         data = resp.json()["data"]["streamPlaybackAccessToken"]
         return data['signature'], data['value']

-def get_master_playlist(channel, session=None, auth_token=None):
-    """Get the master playlist for given channel from twitch"""
-    if session is None:
-        session = InstrumentedSession()
-    sig, token = get_access_token(channel, session, auth_token)
+    def get_master_playlist(self, session):
+        sig, token = self.get_access_token(session)
         resp = session.get(
-            "https://usher.ttvnw.net/api/channel/hls/{}.m3u8".format(channel),
+            "https://usher.ttvnw.net/api/channel/hls/{}.m3u8".format(self.channel),
             headers={
                 "referer": "https://player.twitch.tv",
                 "origin": "https://player.twitch.tv",
@@ -64,19 +89,13 @@ def get_master_playlist(channel, session=None, auth_token=None):
                 "sig": sig,
                 "token": token,
             },
-            metric_name='get_master_playlist',
+            metric_name='twitch_get_master_playlist',
         )
         resp.raise_for_status() # getting master playlist
         playlist = hls_playlist.load(resp.text, base_uri=resp.url)
         return playlist

-def get_media_playlist_uris(master_playlist, target_qualities):
-    """From a master playlist, extract URIs of media playlists of interest.
-    Returns {stream name: uri}.
-    Note this is not a general method for all HLS streams, and makes twitch-specific assumptions,
-    though we try to check and emit warnings if these assumptions are broken.
-    """
+    def get_media_playlist_uris(self, target_qualities, session=None):
         # Twitch master playlists are observed to have the following form:
         # The first listed variant is the source playlist and has "(source)" in the name.
         # Other variants are listed in order of quality from highest to lowest, followed by audio_only.
@@ -88,6 +107,12 @@ def get_media_playlist_uris(master_playlist, target_qualities):
         # Streams without transcoding options only show source and audio_only.
         # We return the source stream in addition to any in target_qualities that is found.
+        logger = logging.getLogger("twitch")
+
+        if session is None:
+            session = InstrumentedSession()
+        master_playlist = self.get_master_playlist(session)
+
         def variant_name(variant):
             names = set(media.name for media in variant.media if media.type == "VIDEO" and media.name)
             if not names:
@@ -117,11 +142,3 @@ def get_media_playlist_uris(master_playlist, target_qualities):
             variants["source"] = source
         return {name: variant.uri for name, variant in variants.items()}
-
-
-def get_media_playlist(uri, session=None):
-    if session is None:
-        session = InstrumentedSession()
-    resp = session.get(uri, metric_name='get_media_playlist')
-    resp.raise_for_status()
-    return hls_playlist.load(resp.text, base_uri=resp.url)
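As a rough usage sketch of the resulting interface (the channel name is a placeholder; this mirrors how StreamsManager and StreamWorker call the provider, but driven by hand):

# Sketch only: exercise TwitchProvider the way the downloader does.
provider = TwitchProvider("examplechannel", auth_token=None)
# StreamsManager asks for media playlist URIs for the qualities it tracks:
uris = provider.get_media_playlist_uris(["source"])
# Each StreamWorker then refetches its media playlist via the same provider,
# rolling to a fresh URI before provider.MAX_WORKER_AGE (20h for twitch) elapses:
playlist = provider.get_media_playlist(uris["source"])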
