diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet index 51cd584..f1b0ed2 100644 --- a/docker-compose.jsonnet +++ b/docker-compose.jsonnet @@ -320,7 +320,8 @@ ] + if $.downloader_creds_file != null then ["--twitch-auth-file", "/token"] else [], // Mount the segments directory at /mnt volumes: ["%s:/mnt" % $.segments_path] - + if $.downloader_creds_file != null then ["%s:/token" % $.downloader_creds_file] else [], + + if $.downloader_creds_file != null then ["%s:/token" % $.downloader_creds_file] else [] + + if $.local_path != null then ["%s:/local" % $.local_path] else [], // If the application crashes, restart it. restart: "on-failure", // Expose on the configured host port by mapping that port to the default @@ -534,7 +535,8 @@ if $.nginx_serve_segments then "%s:/mnt" % $.segments_path, if $.ssl_certificate_path != null then "%s:/certs.pem" % $.ssl_certificate_path, if $.thrimbletrimmer_web_dev_path != null then "%s:/etc/nginx/html/thrimbletrimmer" % $.thrimbletrimmer_web_dev_path, - ]), + ]) + + if $.local_path != null then ["%s:/local" % $.local_path] else [], }, [if $.enabled.postgres then "postgres"]: { diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 3b9ae49..871e628 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -21,7 +21,7 @@ import common import common.dateutil import common.requests -from .providers import URLProvider, TwitchProvider, YoutubeProvider +from .providers import URLProvider, TwitchProvider, YoutubeProvider, LocalProvider segments_downloaded = prom.Counter( @@ -384,16 +384,25 @@ class StreamWorker(object): if segment.uri not in self.getters: if date is None: raise ValueError("Cannot determine date of segment") - self.getters[segment.uri] = SegmentGetter( - self.logger, - self.session, - self.manager.base_dir, - self.manager.channel, - self.quality, - segment, - date, - self.map_cache, - ) + if isinstance(self.manager.provider, LocalProvider): + self.getters[segment.uri] = SegmentLinker( + self.logger, + self.manager.base_dir, + self.manager.channel, + segment, + date, + ) + else: + self.getters[segment.uri] = SegmentGetter( + self.logger, + self.session, + self.manager.base_dir, + self.manager.channel, + self.quality, + segment, + date, + self.map_cache, + ) gevent.spawn(self.getters[segment.uri].run) if date is not None: date += datetime.timedelta(seconds=segment.duration) @@ -649,6 +658,9 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor elif type == "youtube": provider = YoutubeProvider(url) channel_qualities = ["source"] + elif type == "local": + provider = LocalProvider(url) + channel_qualities = ["source"] else: raise ValueError(f"Unknown type {type!r}") manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important) @@ -687,3 +699,50 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor worker.get() # re-raise error if failed logging.info("Gracefully stopped") + +class SegmentLinker(SegmentGetter): + def __init__(self, parent_logger, base_dir, channel, segment, date): + self.logger = parent_logger.getChild("SegmentLinker@{:x}".format(id(self))) + self.base_dir = base_dir + self.channel = channel + self.segment = segment + self.date = date + self.prefix = self.make_path_prefix() + self.retry = None + self.done = gevent.event.Event() + # Our parent's connection pool, but we'll replace it if there's any issues + + def make_path_prefix(self): + return os.path.join( + self.base_dir, + 'local', + 'source', + self.date.strftime("%Y-%m-%dT%H"), + "{date}-{duration}".format( + date=self.date.strftime("%M:%S.%f"), + duration=self.segment.duration, + ), + ) + + def make_path(self, hash=None): + """Generate filepath for the segment. + """ + arg = b64encode(hash.digest(), b"-_").decode().rstrip("=") if hash else str(uuid.uuid4()) + return "{}-full-{}.ts".format(self.prefix, arg) + + def _get_segment(self): + hash = hashlib.sha256() + with open(self.segment.uri, 'rb') as f: + while chunk := f.read(8192): + hash.update(chunk) + + dest_path = self.make_path(hash) + common.ensure_directory(dest_path) + self.logger.debug("Linking segment: {} to {}".format(dest_path, self.segment.uri)) + os.symlink(self.segment.uri, dest_path) + + segments_downloaded.labels(type="full", channel=self.channel, quality="source").inc() + segment_duration_downloaded.labels(type="full", channel=self.channel, quality="source").inc(self.segment.duration) + stat = latest_segment.labels(channel=self.channel, quality="source") + timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds() + stat.set(max(stat._value.get(), timestamp)) diff --git a/downloader/downloader/providers.py b/downloader/downloader/providers.py index c3c2a02..8d15022 100644 --- a/downloader/downloader/providers.py +++ b/downloader/downloader/providers.py @@ -3,6 +3,7 @@ import json import logging import random import subprocess +import os from common.requests import InstrumentedSession @@ -195,3 +196,26 @@ class TwitchProvider(Provider): variants["source"] = source return {name: variant.uri for name, variant in variants.items()} + +class LocalProvider(Provider): + """Provider that uses a local m3u8 recording. + """ + def __init__(self, directory): + self.directory = directory + + def get_media_playlist(self, uri, session=None): + with open(uri) as f: + playlist = hls_playlist.load(f.read().strip(), uri) + return playlist + + def get_media_playlist_uris(self, qualities, session=None): + if qualities != ["source"]: + raise ValueError("Cannot provide non-source qualities") + + files = os.listdir(self.directory) + + for file in reversed(files): + if file.endswith(".m3u8"): + return {"source": os.path.join(self.directory, file)} + + raise ValueError("Can't find playlist (m3u8) file")