downloader: Prepend any HLS "map" data to every segment

In some formats, most notably DASH, there is a "initialization data" that is required
in order to play the segment. The data is common to all segments so it is served as a seperate URL
under EXT-X-MAP. However, redundant copies of this data are benign and it's very small, so
we just put it in front of EVERY segment so that we can play every one independently (but
concatenating them still works).

We use a very simple cache to avoid downloading it again for every segment.
pull/355/head
Mike Lang 1 year ago committed by Mike Lang
parent e4252544be
commit 05349a62df

@ -253,7 +253,7 @@ class M3U8Parser(object):
elif line.startswith("#EXT-X-MAP"): elif line.startswith("#EXT-X-MAP"):
attr = self.parse_tag(line, self.parse_attributes) attr = self.parse_tag(line, self.parse_attributes)
byterange = self.parse_byterange(attr.get("BYTERANGE", "")) byterange = self.parse_byterange(attr.get("BYTERANGE", ""))
self.state["map"] = Map(attr.get("URI"), byterange) self.state["map"] = Map(self.uri(attr.get("URI")), byterange)
elif line.startswith("#EXT-X-I-FRAME-STREAM-INF"): elif line.startswith("#EXT-X-I-FRAME-STREAM-INF"):
attr = self.parse_tag(line, self.parse_attributes) attr = self.parse_tag(line, self.parse_attributes)
streaminf = self.state.pop("streaminf", attr) streaminf = self.state.pop("streaminf", attr)

@ -305,6 +305,11 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance, # This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong. # but will fall back to a new one if something goes wrong.
self.session = common.requests.InstrumentedSession() self.session = common.requests.InstrumentedSession()
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here.
# {uri: data}
self.map_cache = {}
def __repr__(self): def __repr__(self):
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality) return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@ -387,6 +392,7 @@ class StreamWorker(object):
self.quality, self.quality,
segment, segment,
date, date,
self.map_cache,
) )
gevent.spawn(self.getters[segment.uri].run) gevent.spawn(self.getters[segment.uri].run)
if date is not None: if date is not None:
@ -442,13 +448,14 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that. # or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60 GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date): def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir self.base_dir = base_dir
self.channel = channel self.channel = channel
self.quality = quality self.quality = quality
self.segment = segment self.segment = segment
self.date = date self.date = date
self.map_cache = map_cache
self.prefix = self.make_path_prefix() self.prefix = self.make_path_prefix()
self.retry = None # Event, set to begin retrying self.retry = None # Event, set to begin retrying
self.done = gevent.event.Event() # set when file exists or we give up self.done = gevent.event.Event() # set when file exists or we give up
@ -554,6 +561,15 @@ class SegmentGetter(object):
try: try:
self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path)) self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
start_time = monotonic() start_time = monotonic()
map_data = None
if self.segment.map and self.segment.map.uri:
# Usage of map cache is racy here as multiple SegmentGetters can fill it at once, but we don't care.
if self.segment.map.uri not in self.map_cache:
with soft_hard_timeout(self.logger, "getting map data", self.FETCH_HEADERS_TIMEOUTS, retry.set):
resp = self.session.get(self.segment.map.uri, metric_name='get_map_data')
resp.raise_for_status()
self.map_cache[self.segment.map.uri] = resp.content
map_data = self.map_cache[self.segment.map.uri]
with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment') resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment')
@ -566,6 +582,8 @@ class SegmentGetter(object):
common.ensure_directory(temp_path) common.ensure_directory(temp_path)
with open(temp_path, 'wb') as f: with open(temp_path, 'wb') as f:
file_created = True file_created = True
if map_data is not None:
common.writeall(f.write, map_data)
# We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway, # We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway,
# we may lose part of the last chunk even though we did receive it. # we may lose part of the last chunk even though we did receive it.
# This is a small enough amount of data that we don't really care. # This is a small enough amount of data that we don't really care.

Loading…
Cancel
Save