diff --git a/downloader/downloader/hls_playlist.py b/downloader/downloader/hls_playlist.py
index b05f9f8..96163f1 100644
--- a/downloader/downloader/hls_playlist.py
+++ b/downloader/downloader/hls_playlist.py
@@ -253,7 +253,7 @@ class M3U8Parser(object):
         elif line.startswith("#EXT-X-MAP"):
             attr = self.parse_tag(line, self.parse_attributes)
             byterange = self.parse_byterange(attr.get("BYTERANGE", ""))
-            self.state["map"] = Map(attr.get("URI"), byterange)
+            self.state["map"] = Map(self.uri(attr.get("URI")), byterange)
         elif line.startswith("#EXT-X-I-FRAME-STREAM-INF"):
             attr = self.parse_tag(line, self.parse_attributes)
             streaminf = self.state.pop("streaminf", attr)
diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index d7ea2a8..d434836 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -305,6 +305,11 @@ class StreamWorker(object):
 		# This worker's SegmentGetters will use its session by default for performance,
 		# but will fall back to a new one if something goes wrong.
 		self.session = common.requests.InstrumentedSession()
+		# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
+		# since it's generally the same but may occasionally change.
+		# We expect the map data to be very small so there is no eviction here.
+		# {uri: data}
+		self.map_cache = {}
 
 	def __repr__(self):
 		return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@@ -387,6 +392,7 @@ class StreamWorker(object):
 					self.quality,
 					segment,
 					date,
+					self.map_cache,
 				)
 				gevent.spawn(self.getters[segment.uri].run)
 			if date is not None:
@@ -442,13 +448,14 @@ class SegmentGetter(object):
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.quality = quality
 		self.segment = segment
 		self.date = date
+		self.map_cache = map_cache
 		self.prefix = self.make_path_prefix()
 		self.retry = None # Event, set to begin retrying
 		self.done = gevent.event.Event() # set when file exists or we give up
@@ -554,6 +561,15 @@ class SegmentGetter(object):
 		try:
 			self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
 			start_time = monotonic()
+			map_data = None
+			if self.segment.map and self.segment.map.uri:
+				# Usage of map cache is racy here as multiple SegmentGetters can fill it at once, but we don't care.
+				if self.segment.map.uri not in self.map_cache:
+					with soft_hard_timeout(self.logger, "getting map data", self.FETCH_HEADERS_TIMEOUTS, retry.set):
+						resp = self.session.get(self.segment.map.uri, metric_name='get_map_data')
+						resp.raise_for_status()
+						self.map_cache[self.segment.map.uri] = resp.content
+				map_data = self.map_cache[self.segment.map.uri]
 			with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
 				with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
 					resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment')
@@ -566,6 +582,8 @@ class SegmentGetter(object):
 					common.ensure_directory(temp_path)
 					with open(temp_path, 'wb') as f:
 						file_created = True
+						if map_data is not None:
+							common.writeall(f.write, map_data)
 						# We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway,
 						# we may lose part of the last chunk even though we did receive it.
 						# This is a small enough amount of data that we don't really care.
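
The pattern the diff introduces: fetch a segment's EXT-X-MAP init section at most once per URI, keep it in a shared dict with no eviction, and write it to the output file ahead of the segment body. Below is a minimal standalone sketch of that pattern, using plain requests and hypothetical names (fetch_map, download_segment, the example URIs); the real code instead goes through the project's InstrumentedSession, soft_hard_timeout and common.writeall helpers and runs inside SegmentGetter.

    import requests

    # Minimal sketch (not the project's actual code): cache EXT-X-MAP init
    # sections keyed by URI so each one is downloaded at most once.
    map_cache = {}

    def fetch_map(session, uri):
        # Check-then-fetch is racy if several downloads run concurrently, but
        # the worst case is fetching the same small blob twice, which is harmless.
        if uri not in map_cache:
            resp = session.get(uri, timeout=30)
            resp.raise_for_status()
            map_cache[uri] = resp.content
        return map_cache[uri]

    def download_segment(session, segment_uri, map_uri, out_path):
        # Write the init section (if any) first, then stream the segment body
        # after it in 8 KiB chunks, mirroring the ordering the diff introduces.
        with open(out_path, "wb") as f:
            if map_uri:
                f.write(fetch_map(session, map_uri))
            resp = session.get(segment_uri, stream=True, timeout=30)
            resp.raise_for_status()
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)

    if __name__ == "__main__":
        session = requests.Session()
        # Hypothetical URIs for illustration only.
        download_segment(
            session,
            "https://example.com/stream/segment_001.m4s",
            "https://example.com/stream/init.mp4",
            "segment_001.m4s",
        )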