From 0df8288013078eb9a861f4bf501f59f90f0738d6 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 11 Dec 2018 04:07:40 -0800 Subject: [PATCH 01/12] common: Implement code for parsing paths and picking the best sequence of segments This is needed by both the restreamer and the cutter, hence its inclusion in common. The algorithm is pretty simple - it takes the 'best' segment per start time by full first, then length of partial. All the other complexity is mainly just around detecting and reporting holes, and being inclusive of start/end points. --- common/common.py | 194 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/common/common.py b/common/common.py index dfce51e..18c40db 100644 --- a/common/common.py +++ b/common/common.py @@ -2,7 +2,14 @@ """A place for common utilities between wubloader components""" +import base64 import datetime +import errno +import itertools +import logging +import os +import sys +from collections import namedtuple import dateutil.parser import yaml @@ -73,3 +80,190 @@ def format_bustime(bustime, round="millisecond"): else: raise ValueError("Bad rounding value: {!r}".format(round)) return sign + ":".join(parts) + + +def unpadded_b64_decode(s): + """Decode base64-encoded string that has had its padding removed""" + # right-pad with '=' to multiple of 4 + s = s + '=' * (- len(s) % 4) + return base64.b64decode(s, "-_") + + +class SegmentInfo( + namedtuple('SegmentInfoBase', [ + 'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash' + ]) +): + """Info parsed from a segment path, including original path.""" + @property + def end(self): + return self.start + self.duration + + +def parse_segment_path(path): + """Parse segment path, returning a SegmentInfo. If path is only the trailing part, + eg. just a filename, it will leave unknown fields as None.""" + parts = path.split('/') + # left-pad parts with None up to 4 parts + parts = [None] * (4 - len(parts)) + parts + # pull info out of path parts + stream, variant, hour, filename = parts[-4:] + # split filename, which should be TIME-DURATION-TYPE-HASH.ts + try: + if not filename.endswith('.ts'): + raise ValueError("Does not end in .ts") + filename = filename[:-len('.ts')] # chop off .ts + parts = filename.split('-', 3) + if len(parts) != 4: + raise ValueError("Not enough dashes in filename") + time, duration, type, hash = parts + if type not in ('full', 'partial'): + raise ValueError("Unknown type {!r}".format(type)) + return SegmentInfo( + path = path, + stream = stream, + variant = variant, + start = dateutil.parser.parse("{}:{}".format(hour, time)), + duration = datetime.timedelta(seconds=float(duration)), + is_partial = type == "partial", + hash = unpadded_b64_decode(hash), + ) + except ValueError as e: + # wrap error but preserve original traceback + _, _, tb = sys.exc_info() + raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb + + +def get_best_segments(hours_path, start, end): + """Return a list of the best sequence of non-overlapping segments + we have for a given time range. Hours path should be the directory containing hour directories. + The first segment may start before the time range, and the last may end after it. + The returned list contains items that are either: + SegmentInfo: a segment + None: represents a discontinuity between the previous segment and the next one. + ie. as long as two segments appear next to each other, we guarentee there is no gap between + them, the second one starts right as the first one finishes. + Similarly, unless the first item is None, the first segment starts <= the start of the time + range, and unless the last item is None, the last segment ends >= the end of the time range. + Example: + Suppose you ask for a time range from 10 to 60. We have 10-second segments covering + the following times: + 5 to 15 + 15 to 25 + 30 to 40 + 40 to 50 + Then the output would look like: + segment from 5 to 15 + segment from 15 to 25 + None, as the previous segment ends 5sec before the next one begins + segment from 30 to 40 + segment from 40 to 50 + None, as the previous segment ends 10sec before the requested end time of 60. + Note that any is_partial=True segment will be followed by a None, since we can't guarentee + it joins on to the next segment fully intact. + """ + # Note: The exact equality checks in this function are not vulnerable to floating point error, + # but only because all input dates and durations are only precise to the millisecond, and + # python's datetime types represent these as integer microseconds internally. So the parsing + # to these types is exact, and all operations on them are exact, so all operations are exact. + + result = [] + + for hour in hour_paths_for_range(hours_path, start, end): + # best_segments_by_start will give us the best available segment for each unique start time + for segment in best_segments_by_start(hour): + + # special case: first segment + if not result: + # first segment is allowed to be before start as long as it includes it + if segment.start <= start < segment.end: + # segment covers start + result.append(segment) + elif start < segment.start < end: + # segment is after start (but before end), so there was no segment that covers start + # so we begin with a None + result.append(None) + result.append(segment) + else: + # segment is before start, and doesn't cover start, or starts after end. + # ignore and go to next. + continue + else: + # normal case: check against previous segment end time + prev_end = result[-1].end + if segment.start < prev_end: + # Overlap! This shouldn't happen, though it might be possible due to weirdness + # if the stream drops then starts again quickly. We simply ignore the overlapping + # segment and let the algorithm continue. + logging.warning("Overlapping segments: {} overlaps end of {}".format(segment, result[-1])) + continue + if result[-1].is_partial or prev_end < segment.start: + # there's a gap between prev end and this start, so add a None + result.append(None) + result.append(segment) + + # check if we've reached the end + if end <= segment.end: + break + + # this is a weird little construct that says "if we broke from the inner loop, + # then also break from the outer one. otherwise continue." + else: + continue + break + + # check if we need a trailing None because last segment is partial or doesn't reach end + if result and (result[-1].is_partial or result[-1].end < end): + result.append(None) + + return result + + +def hour_paths_for_range(hours_path, start, end): + """Generate a list of hour paths to check when looking for segments between start and end.""" + # truncate start and end to the hour + def truncate(dt): + return dt.replace(microsecond=0, second=0, minute=0) + current = truncate(start) + end = truncate(end) + # Begin in the hour prior to start, as there may be a segment that starts in that hour + # but contains the start time, eg. if the start time is 01:00:01 and there's a segment + # at 00:59:59 which goes for 3 seconds. + # Checking the entire hour when in most cases it won't be needed is wasteful, but it's also + # pretty quick and the complexity of only checking this case when needed just isn't worth it. + current -= datetime.timedelta(hours=1) + while current <= end: + yield os.path.join(hours_path, current.strftime("%Y-%m-%dT%H")) + current += datetime.timedelta(hours=1) + + +def best_segments_by_start(hour): + """Within a given hour path, yield the "best" segment per unique segment start time. + Best is defined as non-partial, or failing that the longest partial. + Note this means this function may perform os.stat()s in order to find the longest partial. + """ + try: + segment_paths = os.listdir(hour) + except OSError as e: + if e.errno != errno.ENOENT: + raise + # path does not exist, treat it as having no files + return + segment_paths.sort() + # note we only parse them as we need them, which is unlikely to save us much time overall + # but is easy enough to do, so we might as well. + parsed = (parse_segment_path(os.path.join(hour, name)) for name in segment_paths) + for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start): + segments = list(segments) + full_segments = [segment for segment in segments if not segment.is_partial] + if full_segments: + if len(full_segments) != 1: + logging.warning("Multiple versions of full segment at start_time {}: {}".format( + start_time, ", ".join(map(str, segments)) + )) + # we have to pick one, so might as well make it consistent by sorting by path + full_segments.sort(key=lambda segment: segment.path) + yield full_segments[0] + continue + # no full segments, fall back to measuring partials. + yield max(segments, key=lambda segment: os.stat(segment.path).st_size) From ee8f8f6571590abe43df4eaaa402f00dff3b661e Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 9 Dec 2018 02:52:02 -0800 Subject: [PATCH 02/12] restreamer: Initial skeleton --- restreamer/Dockerfile | 15 +++++++++++++++ restreamer/restreamer/__init__.py | 0 restreamer/restreamer/__main__.py | 0 restreamer/setup.py | 11 +++++++++++ 4 files changed, 26 insertions(+) create mode 100644 restreamer/Dockerfile create mode 100644 restreamer/restreamer/__init__.py create mode 100644 restreamer/restreamer/__main__.py create mode 100644 restreamer/setup.py diff --git a/restreamer/Dockerfile b/restreamer/Dockerfile new file mode 100644 index 0000000..84650bb --- /dev/null +++ b/restreamer/Dockerfile @@ -0,0 +1,15 @@ +FROM alpine:3.7 +RUN apk --update add py2-pip + +# Install common lib first as it changes less +COPY common /tmp/common +RUN pip install /tmp/common && rm -r /tmp/common + +# Install actual application +COPY restreamer /tmp/restreamer +RUN pip install /tmp/restreamer && rm -r /tmp/restreamer + +# Add config file last as it changes most +COPY config.yaml /etc/wubloader.yaml + +CMD python2 -m restreamer diff --git a/restreamer/restreamer/__init__.py b/restreamer/restreamer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/restreamer/restreamer/__main__.py b/restreamer/restreamer/__main__.py new file mode 100644 index 0000000..e69de29 diff --git a/restreamer/setup.py b/restreamer/setup.py new file mode 100644 index 0000000..47a450f --- /dev/null +++ b/restreamer/setup.py @@ -0,0 +1,11 @@ +from setuptools import setup, find_packages + +setup( + name = "wubloader-restreamer", + version = "0.0.0", + packages = find_packages(), + install_requires = [ + "requests", + "wubloader-common", + ], +) From bab2d15d6eabaf43c9b06388e87d17358a07680e Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 9 Dec 2018 17:27:08 -0800 Subject: [PATCH 03/12] Initial implementation of the restreamer Supports serving segments, listing segments for an hour, and generating playlists so it can stream. --- restreamer/restreamer/__main__.py | 14 +++++ restreamer/restreamer/generate_hls.py | 16 +++++ restreamer/restreamer/main.py | 84 +++++++++++++++++++++++++++ 3 files changed, 114 insertions(+) create mode 100644 restreamer/restreamer/generate_hls.py create mode 100644 restreamer/restreamer/main.py diff --git a/restreamer/restreamer/__main__.py b/restreamer/restreamer/__main__.py index e69de29..24935eb 100644 --- a/restreamer/restreamer/__main__.py +++ b/restreamer/restreamer/__main__.py @@ -0,0 +1,14 @@ + +import gevent.monkey +gevent.monkey.patch_all() + +import logging + +import argh + +from restreamer.main import main + +LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" + +logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +argh.dispatch_command(main) diff --git a/restreamer/restreamer/generate_hls.py b/restreamer/restreamer/generate_hls.py new file mode 100644 index 0000000..242b763 --- /dev/null +++ b/restreamer/restreamer/generate_hls.py @@ -0,0 +1,16 @@ + + +def generate_master(playlists): + """Generate master playlist. Playlists arg should be a map {name: url}. + Little validation or encoding is done - please try to keep the names valid + without escaping. + """ + lines = ["#EXTM3U"] + for name, url in playlists.items(): + lines += [ + # We name each variant with a VIDEO rendition with no url + '#EXT-X-MEDIA:TYPE=VIDEO,GROUP-ID="{name}",NAME="{name}",AUTOSELECT=YES,DEFAULT=YES'.format(name=name), + '#EXT-X-STREAM-INF:VIDEO="{name}"'.format(name=name), + url, + ] + return "\n".join(lines) + '\n' diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py new file mode 100644 index 0000000..596c846 --- /dev/null +++ b/restreamer/restreamer/main.py @@ -0,0 +1,84 @@ + +import errno +import json +import os + +from flask import Flask, url_for, request, abort +from gevent.pywsgi import WSGIServer + +import generate_hls + + +app = Flask('restreamer', static_url_path='/segments') + + +""" +The restreamer is a simple http api for listing available segments and generating +HLS playlists for them. + +The segments themselves are ideally to be served by some external webserver +under "/segments////" (ie. with BASE_DIR under "/segments"), +though this server will also serve them if requested. +""" + + +def listdir(path, error=True): + """List files in path, excluding hidden files. + Behaviour when path doesn't exist depends on error arg. + If error is True, raise 404. Otherwise, return []. + """ + try: + return [name for name in os.listdir(path) if not name.startswith('.')] + except OSError as e: + if e.errno != errno.ENOENT: + raise + if error: + abort(404) + return [] + + +@app.route('/files///') +def list_segments(stream, variant, hour): + """Returns a JSON list of segment files for a given stream, variant and hour. + Returns empty list on non-existant streams, etc. + """ + # Check no-one's being sneaky with path traversal or hidden folders + if any(arg.startswith('.') for arg in (stream, variant, hour)): + return "Parts may not start with period", 403 + path = os.path.join( + app.static_folder, + stream, + variant, + hour, + ) + return json.dumps(listdir(path, error=False)) + + +@app.route('/playlist/.m3u8') +def generate_master_playlist(stream): + """Returns a HLS master playlist for the given stream. + Takes optional params: + start, end: The time to begin and end the stream at. + See generate_media_playlist for details. + """ + # path traversal / hidden folders + if stream.startswith('.'): + return "Stream may not start with period", 403 + variants = listdir(os.path.join(app.static_folder, stream)) + playlists = { + variant: url_for('generate_media_playlist', stream=stream, variant=variant, **request.args) + for variant in variants + } + return generate_hls.generate_master(playlists) + + +@app.route('/playlist//.m3u8') +def generate_media_playlist(stream, variant): + # TODO + return "Not Implemented", 501 + + +def main(host='0.0.0.0', port=8000, base_dir='.'): + app.static_folder = base_dir + server = WSGIServer((host, port), app) + server.serve_forever() From 9e115f8a42f486692821f4e61a7d9dea024b3167 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 9 Dec 2018 17:33:24 -0800 Subject: [PATCH 04/12] restreamer: Also add ability to list known hours so we know where to start replicating from --- restreamer/restreamer/main.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 596c846..084d580 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -37,6 +37,22 @@ def listdir(path, error=True): return [] +@app.route('/files//') +def list_hours(stream, variant): + """Returns a JSON list of hours for the given stream and variant for which + there may be segments available. Returns empty list on non-existent streams, etc. + """ + # Check no-one's being sneaky with path traversal or hidden folders + if any(arg.startswith('.') for arg in (stream, variant)): + return "Parts may not start with period", 403 + path = os.path.join( + app.static_folder, + stream, + variant, + ) + return json.dumps(listdir(path, error=False)) + + @app.route('/files///') def list_segments(stream, variant, hour): """Returns a JSON list of segment files for a given stream, variant and hour. From a1fa60828d9d00873f6e91dd34dac596895cbd6e Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 24 Dec 2018 17:28:59 -0800 Subject: [PATCH 05/12] Basic media playlist generation, missing special cases --- restreamer/restreamer/generate_hls.py | 19 +++++++++++++++++++ restreamer/restreamer/main.py | 19 +++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/restreamer/restreamer/generate_hls.py b/restreamer/restreamer/generate_hls.py index 242b763..8e9d09e 100644 --- a/restreamer/restreamer/generate_hls.py +++ b/restreamer/restreamer/generate_hls.py @@ -1,3 +1,5 @@ +import os +import urllib def generate_master(playlists): @@ -14,3 +16,20 @@ def generate_master(playlists): url, ] return "\n".join(lines) + '\n' + + +def generate_media(segments, base_url): + """Generate a media playlist from a list of segments as returned by common.get_best_segments(). + Segments are specified as hour/name.ts relative to base_url. + """ + lines = [ + "#EXTM3U", + "#EXT-X-TARGETDURATION:6", + ] + for segment in segments: + # TODO handle missing bits, stream endings, other stuff + if segment is not None: + path = '/'.join(segment.path.split('/')[-2:]) + lines.append("#EXTINF:{:.3f},live".format(segment.duration.total_seconds())) + lines.append(urllib.quote(os.path.join(base_url, path))) + return "\n".join(lines) + '\n' diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 084d580..8e25aba 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -6,6 +6,10 @@ import os from flask import Flask, url_for, request, abort from gevent.pywsgi import WSGIServer +import dateutil.parser + +from common import get_best_segments + import generate_hls @@ -90,8 +94,19 @@ def generate_master_playlist(stream): @app.route('/playlist//.m3u8') def generate_media_playlist(stream, variant): - # TODO - return "Not Implemented", 501 + # path traversal / hidden folders + if stream.startswith('.'): + return "Stream may not start with period", 403 + if variant.startswith('.'): + return "Variant may not start with period", 403 + #TODO handle no start/end + #TODO error handling of args + # TODO lots of other stuff + start = dateutil.parser.parse(request.args['start']) + end = dateutil.parser.parse(request.args['end']) + hours_path = os.path.join(app.static_folder, stream, variant) + segments = get_best_segments(hours_path, start, end) + return generate_hls.generate_media(segments, os.path.join(app.static_url_path, stream, variant)) def main(host='0.0.0.0', port=8000, base_dir='.'): From 5942091d1af896bfadb81bc5f487a16509cfe1b7 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 24 Dec 2018 20:25:05 -0800 Subject: [PATCH 06/12] restreamer: Cleanup around argument processing --- restreamer/restreamer/main.py | 37 ++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 8e25aba..c5bc873 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -1,13 +1,13 @@ import errno +import functools import json import os +import dateutil.parser from flask import Flask, url_for, request, abort from gevent.pywsgi import WSGIServer -import dateutil.parser - from common import get_best_segments import generate_hls @@ -41,14 +41,27 @@ def listdir(path, error=True): return [] +def has_path_args(fn): + """Decorator to wrap routes which take args which are to be used as parts of a filepath. + Disallows hidden folders and path traversal, and converts unicode to bytes. + """ + @functools.wraps(fn) + def _has_path_args(**kwargs): + kwargs = {key: value.encode('utf-8') for key, value in kwargs.items()} + for key, value in kwargs.items(): + # Disallowing a leading . prevents both hidden files and path traversal ("..") + if value.startswith('.'): + return "Bad {}: May not start with a period".format(key), 403 + return fn(**kwargs) + return _has_path_args + + @app.route('/files//') +@has_path_args def list_hours(stream, variant): """Returns a JSON list of hours for the given stream and variant for which there may be segments available. Returns empty list on non-existent streams, etc. """ - # Check no-one's being sneaky with path traversal or hidden folders - if any(arg.startswith('.') for arg in (stream, variant)): - return "Parts may not start with period", 403 path = os.path.join( app.static_folder, stream, @@ -58,13 +71,11 @@ def list_hours(stream, variant): @app.route('/files///') +@has_path_args def list_segments(stream, variant, hour): """Returns a JSON list of segment files for a given stream, variant and hour. Returns empty list on non-existant streams, etc. """ - # Check no-one's being sneaky with path traversal or hidden folders - if any(arg.startswith('.') for arg in (stream, variant, hour)): - return "Parts may not start with period", 403 path = os.path.join( app.static_folder, stream, @@ -75,15 +86,13 @@ def list_segments(stream, variant, hour): @app.route('/playlist/.m3u8') +@has_path_args def generate_master_playlist(stream): """Returns a HLS master playlist for the given stream. Takes optional params: start, end: The time to begin and end the stream at. See generate_media_playlist for details. """ - # path traversal / hidden folders - if stream.startswith('.'): - return "Stream may not start with period", 403 variants = listdir(os.path.join(app.static_folder, stream)) playlists = { variant: url_for('generate_media_playlist', stream=stream, variant=variant, **request.args) @@ -93,12 +102,8 @@ def generate_master_playlist(stream): @app.route('/playlist//.m3u8') +@has_path_args def generate_media_playlist(stream, variant): - # path traversal / hidden folders - if stream.startswith('.'): - return "Stream may not start with period", 403 - if variant.startswith('.'): - return "Variant may not start with period", 403 #TODO handle no start/end #TODO error handling of args # TODO lots of other stuff From 3bbe1ed32d353c917f0db6cc9b91f473cf9de5d9 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 26 Dec 2018 20:07:07 -0800 Subject: [PATCH 07/12] Prefer longer duration on multiple segments --- common/common.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/common/common.py b/common/common.py index 83455ed..17f9b6b 100644 --- a/common/common.py +++ b/common/common.py @@ -240,8 +240,12 @@ def best_segments_by_start(hour): logging.warning("Multiple versions of full segment at start_time {}: {}".format( start_time, ", ".join(map(str, segments)) )) - # we have to pick one, so might as well make it consistent by sorting by path - full_segments.sort(key=lambda segment: segment.path) + # We've observed some cases where the same segment (with the same hash) will be reported + # with different durations (generally at stream end). Prefer the longer duration, + # as this will ensure that if hashes are different we get the most data, and if they + # are the same it should keep holes to a minimum. + # If same duration, we have to pick one, so pick highest-sorting hash just so we're consistent. + full_segments = [max(full_segments, key=lambda segment: (segment.duration, segment.hash))] yield full_segments[0] continue # no full segments, fall back to measuring partials. From 8f5a98a90690595fa35a168558818794913ebd06 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 26 Dec 2018 21:03:15 -0800 Subject: [PATCH 08/12] restreamer: Don't offer a variant on the master playlist if it's outside requested time range This prevents clients from picking a variant that they then can't play any content for. In general we expect the same content to be available on all variants being captured, but if the set of captured variants changes we still want to handle that gracefully. --- restreamer/restreamer/main.py | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index c5bc873..7f32ad5 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -1,4 +1,5 @@ +import datetime import errno import functools import json @@ -85,6 +86,17 @@ def list_segments(stream, variant, hour): return json.dumps(listdir(path, error=False)) +def time_range_for_variant(stream, variant): + """Returns earliest and latest times that the given variant has segments for + (up to hour resolution), or 404 if it doesn't exist / is empty.""" + hours = listdir(os.path.join(app.static_folder, stream, variant)) + if not hours: + abort(404) + first, last = min(hours), max(hours) + # note last hour parses to _start_ of that hour, so we add 1h to go to end of that hour + return dateutil.parser.parse(first), dateutil.parser.parse(last) + datetime.timedelta(hours=1) + + @app.route('/playlist/.m3u8') @has_path_args def generate_master_playlist(stream): @@ -93,11 +105,24 @@ def generate_master_playlist(stream): start, end: The time to begin and end the stream at. See generate_media_playlist for details. """ + start = dateutil.parser.parse(request.args['start']) if 'start' in request.args else None + end = dateutil.parser.parse(request.args['end']) if 'end' in request.args else None variants = listdir(os.path.join(app.static_folder, stream)) - playlists = { - variant: url_for('generate_media_playlist', stream=stream, variant=variant, **request.args) - for variant in variants - } + + playlists = {} + for variant in variants: + # If start or end are given, try to restrict offered variants to ones which exist for that + # time range. + if start is not None or end is not None: + first, last = time_range_for_variant(stream, variant) + if start is not None and last < start: + continue # last time for variant is before our start time, don't offer variant + if end is not None and end < first: + continue # our end time is before first time for variant, don't offer variant + playlists[variant] = url_for( + 'generate_media_playlist', stream=stream, variant=variant, **request.args + ) + return generate_hls.generate_master(playlists) From 6fa74608fbf50042467967ad9c561b154d8e669b Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 26 Dec 2018 21:07:43 -0800 Subject: [PATCH 09/12] common: Improve some docs to note types of things that are ambiguous --- common/common.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/common.py b/common/common.py index 17f9b6b..be6f71b 100644 --- a/common/common.py +++ b/common/common.py @@ -73,7 +73,8 @@ class SegmentInfo( 'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash' ]) ): - """Info parsed from a segment path, including original path.""" + """Info parsed from a segment path, including original path. + Note that start time is a datetime and duration is a timedelta, and hash is a decoded binary string.""" @property def end(self): return self.start + self.duration @@ -116,6 +117,7 @@ def parse_segment_path(path): def get_best_segments(hours_path, start, end): """Return a list of the best sequence of non-overlapping segments we have for a given time range. Hours path should be the directory containing hour directories. + Time args start and end should be given as datetime objects. The first segment may start before the time range, and the last may end after it. The returned list contains items that are either: SegmentInfo: a segment From e34f04cf57f1ae788e3547898a1ee82df1d74010 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 26 Dec 2018 21:29:42 -0800 Subject: [PATCH 10/12] restreamer: Harden generate_media_playlist to handle weird inputs and defaults --- restreamer/restreamer/main.py | 36 +++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 7f32ad5..41adfd0 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -129,13 +129,37 @@ def generate_master_playlist(stream): @app.route('/playlist//.m3u8') @has_path_args def generate_media_playlist(stream, variant): - #TODO handle no start/end - #TODO error handling of args - # TODO lots of other stuff - start = dateutil.parser.parse(request.args['start']) - end = dateutil.parser.parse(request.args['end']) + """Returns a HLS media playlist for the given stream and variant. + Takes optional params: + start, end: The time to begin and end the stream at. + Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS). + If not given, effectively means "infinity", ie. no start means + any time ago, no end means any time in the future. + """ + hours_path = os.path.join(app.static_folder, stream, variant) - segments = get_best_segments(hours_path, start, end) + if not os.path.isdir(hours_path): + abort(404) + + start = dateutil.parser.parse(request.args['start']) if 'start' in request.args else None + end = dateutil.parser.parse(request.args['end']) if 'end' in request.args else None + if start is None or end is None: + # If start or end are not given, use the earliest/latest time available + first, last = time_range_for_variant(stream, variant) + if start is None: + start = first + if end is None: + end = last + + # get_best_segments requires start be before end, special case that as no segments + # (not an error because someone might ask for a specific start, no end, but we ended up with + # end before start because that's the latest time we have) + if start < end: + segments = get_best_segments(hours_path, start, end) + else: + # Note the None to indicate there was a "hole" at both start and end + segments = [None] + return generate_hls.generate_media(segments, os.path.join(app.static_url_path, stream, variant)) From 201959888ab71904a95e61d6c4e3eb28ecef835c Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 26 Dec 2018 21:44:51 -0800 Subject: [PATCH 11/12] restreamer: More accurate target duration in playlist --- restreamer/restreamer/generate_hls.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/restreamer/restreamer/generate_hls.py b/restreamer/restreamer/generate_hls.py index 8e9d09e..8db0b14 100644 --- a/restreamer/restreamer/generate_hls.py +++ b/restreamer/restreamer/generate_hls.py @@ -1,5 +1,7 @@ +import datetime import os import urllib +from collections import Counter def generate_master(playlists): @@ -22,9 +24,19 @@ def generate_media(segments, base_url): """Generate a media playlist from a list of segments as returned by common.get_best_segments(). Segments are specified as hour/name.ts relative to base_url. """ + # We have to pick a "target duration". in most circumstances almost all segments + # will be of that duration, so we get the most common duration out of all the segments + # and use that. + # If we have no segments, default to 6 seconds. + non_none_segments = [segment for segment in segments if segment is not None] + if non_none_segments: + # Note most_common returns [(value, count)] so we unpack. + ((target_duration, _),) = Counter(segment.duration for segment in non_none_segments).most_common(1) + else: + target_duration = datetime.timedelta(seconds=6) lines = [ "#EXTM3U", - "#EXT-X-TARGETDURATION:6", + "#EXT-X-TARGETDURATION:{:.3f}".format(target_duration.total_seconds()), ] for segment in segments: # TODO handle missing bits, stream endings, other stuff From b4e627f382d6b6c8d73e1ea36134fef4457c7368 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 26 Dec 2018 22:06:28 -0800 Subject: [PATCH 12/12] restreamer: When generating playlists, include discontinuities, timestamps and endlist This fills out the incomplete playlist generation functionality to handle holes and communicate extra information. See comments for details. --- restreamer/restreamer/generate_hls.py | 36 +++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/restreamer/restreamer/generate_hls.py b/restreamer/restreamer/generate_hls.py index 8db0b14..835daa2 100644 --- a/restreamer/restreamer/generate_hls.py +++ b/restreamer/restreamer/generate_hls.py @@ -24,6 +24,7 @@ def generate_media(segments, base_url): """Generate a media playlist from a list of segments as returned by common.get_best_segments(). Segments are specified as hour/name.ts relative to base_url. """ + # We have to pick a "target duration". in most circumstances almost all segments # will be of that duration, so we get the most common duration out of all the segments # and use that. @@ -34,14 +35,45 @@ def generate_media(segments, base_url): ((target_duration, _),) = Counter(segment.duration for segment in non_none_segments).most_common(1) else: target_duration = datetime.timedelta(seconds=6) + lines = [ "#EXTM3U", "#EXT-X-TARGETDURATION:{:.3f}".format(target_duration.total_seconds()), ] + + # Note and remove any trailing None from the segment list - this indicates there is a hole + # at the end, which means we should mark the stream as incomplete but not include a discontinuity. + if segments and segments[-1] is None: + incomplete = True + segments = segments[:-1] + else: + incomplete = False + + # Remove any leading None from the segment list - this indicates there is a hole at the start, + # which isn't actually meaningful in any way to us. + # Note that in the case of segments = [None], we already removed it when we removed the trailing + # None, and segments is now []. This is fine. + if segments and segments[0] is None: + segments = segments[1:] + for segment in segments: - # TODO handle missing bits, stream endings, other stuff - if segment is not None: + if segment is None: + # Discontinuity. Adding this tag tells the client that we've missed something + # and it should start decoding fresh on the next segment. This is required when + # someone stops/starts a stream and a good idea if we're missing a segment in a + # continuous stream. + lines.append("#EXT-X-DISCONTINUITY") + else: + # Each segment has two prefixes: timestamp and duration. + # This tells the client exactly what time the segment represents, which is important + # for the editor since it needs to describe cut points in these times. path = '/'.join(segment.path.split('/')[-2:]) + lines.append("#EXT-X-PROGRAM-DATE-TIME:{}".format(segment.start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))) lines.append("#EXTINF:{:.3f},live".format(segment.duration.total_seconds())) lines.append(urllib.quote(os.path.join(base_url, path))) + + # If stream is complete, add an ENDLIST marker to show this. + if not incomplete: + lines.append("#EXT-X-ENDLIST") + return "\n".join(lines) + '\n'