From 05349a62df627b69660bd46fb3ba1b5b17cd460a Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Wed, 8 Nov 2023 22:51:47 +1100
Subject: [PATCH] downloader: Prepend any HLS "map" data to every segment

In some formats, most notably DASH, there is "initialization data" that
is required in order to play each segment. The data is common to all
segments, so it is served as a separate URL under EXT-X-MAP.

However, redundant copies of this data are benign and it's very small,
so we just put it in front of EVERY segment, so that we can play each
one independently (but concatenating them still works).

We use a very simple cache to avoid re-downloading it for every segment.
---
 downloader/downloader/hls_playlist.py |  2 +-
 downloader/downloader/main.py         | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/downloader/downloader/hls_playlist.py b/downloader/downloader/hls_playlist.py
index b05f9f8..96163f1 100644
--- a/downloader/downloader/hls_playlist.py
+++ b/downloader/downloader/hls_playlist.py
@@ -253,7 +253,7 @@ class M3U8Parser(object):
 		elif line.startswith("#EXT-X-MAP"):
 			attr = self.parse_tag(line, self.parse_attributes)
 			byterange = self.parse_byterange(attr.get("BYTERANGE", ""))
-			self.state["map"] = Map(attr.get("URI"), byterange)
+			self.state["map"] = Map(self.uri(attr.get("URI")), byterange)
 		elif line.startswith("#EXT-X-I-FRAME-STREAM-INF"):
 			attr = self.parse_tag(line, self.parse_attributes)
 			streaminf = self.state.pop("streaminf", attr)
diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index d7ea2a8..d434836 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -305,6 +305,11 @@ class StreamWorker(object):
 		# This worker's SegmentGetters will use its session by default for performance,
 		# but will fall back to a new one if something goes wrong.
 		self.session = common.requests.InstrumentedSession()
+		# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
+		# since it's generally the same but may occasionally change.
+		# We expect the map data to be very small so there is no eviction here.
+		# {uri: data}
+		self.map_cache = {}
 
 	def __repr__(self):
 		return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@@ -387,6 +392,7 @@ class StreamWorker(object):
 				self.quality,
 				segment,
 				date,
+				self.map_cache,
 			)
 			gevent.spawn(self.getters[segment.uri].run)
 		if date is not None:
@@ -442,13 +448,14 @@ class SegmentGetter(object):
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.quality = quality
 		self.segment = segment
 		self.date = date
+		self.map_cache = map_cache
 		self.prefix = self.make_path_prefix()
 		self.retry = None # Event, set to begin retrying
 		self.done = gevent.event.Event() # set when file exists or we give up
@@ -554,6 +561,15 @@ class SegmentGetter(object):
 		try:
 			self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
 			start_time = monotonic()
+			map_data = None
+			if self.segment.map and self.segment.map.uri:
+				# Usage of map cache is racy here as multiple SegmentGetters can fill it at once, but we don't care.
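+				# Both racers fetch identical data for the same URI, so a duplicate
+				# write is harmless; the worst case is one redundant download of a
+				# very small amount of data.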
+				if self.segment.map.uri not in self.map_cache:
+					with soft_hard_timeout(self.logger, "getting map data", self.FETCH_HEADERS_TIMEOUTS, retry.set):
+						resp = self.session.get(self.segment.map.uri, metric_name='get_map_data')
+						resp.raise_for_status()
+						self.map_cache[self.segment.map.uri] = resp.content
+				map_data = self.map_cache[self.segment.map.uri]
 			with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
 				with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
 					resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment')
@@ -566,6 +582,8 @@
 				common.ensure_directory(temp_path)
 				with open(temp_path, 'wb') as f:
 					file_created = True
+					if map_data is not None:
+						common.writeall(f.write, map_data)
 					# We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway,
 					# we may lose part of the last chunk even though we did receive it.
 					# This is a small enough amount of data that we don't really care.
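
For reference, here is the cache-and-prepend pattern in isolation as a
minimal sketch. The names (get_map_data, download_segment) are
hypothetical, for illustration only; the patch itself inlines this logic
in SegmentGetter and wraps each request in soft_hard_timeout with retry
and metrics handling.

import requests

# {uri: bytes}. Map data is tiny, so no eviction is needed.
map_cache = {}

def get_map_data(session, uri):
	# Filling the cache is racy under concurrent callers, but both would
	# fetch identical bytes, so the worst case is one redundant download.
	if uri not in map_cache:
		resp = session.get(uri)
		resp.raise_for_status()
		map_cache[uri] = resp.content
	return map_cache[uri]

def download_segment(session, segment_uri, map_uri, path):
	# Prepending the map data to every segment file makes each file
	# independently playable, while plain concatenation still works.
	with open(path, 'wb') as f:
		if map_uri is not None:
			f.write(get_map_data(session, map_uri))
		resp = session.get(segment_uri, stream=True)
		resp.raise_for_status()
		# Read chunk-wise in 8KiB chunks, as in the patch.
		for chunk in resp.iter_content(8192):
			f.write(chunk)

A caller would pass segment.map.uri when the playlist had an EXT-X-MAP
tag, or None otherwise.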