diff --git a/common/common.py b/common/common.py index 96a1cf1..be6f71b 100644 --- a/common/common.py +++ b/common/common.py @@ -2,7 +2,14 @@ """A place for common utilities between wubloader components""" +import base64 import datetime +import errno +import itertools +import logging +import os +import sys +from collections import namedtuple import dateutil.parser @@ -52,3 +59,196 @@ def format_bustime(bustime, round="millisecond"): else: raise ValueError("Bad rounding value: {!r}".format(round)) return sign + ":".join(parts) + + +def unpadded_b64_decode(s): + """Decode base64-encoded string that has had its padding removed""" + # right-pad with '=' to multiple of 4 + s = s + '=' * (- len(s) % 4) + return base64.b64decode(s, "-_") + + +class SegmentInfo( + namedtuple('SegmentInfoBase', [ + 'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash' + ]) +): + """Info parsed from a segment path, including original path. + Note that start time is a datetime and duration is a timedelta, and hash is a decoded binary string.""" + @property + def end(self): + return self.start + self.duration + + +def parse_segment_path(path): + """Parse segment path, returning a SegmentInfo. If path is only the trailing part, + eg. just a filename, it will leave unknown fields as None.""" + parts = path.split('/') + # left-pad parts with None up to 4 parts + parts = [None] * (4 - len(parts)) + parts + # pull info out of path parts + stream, variant, hour, filename = parts[-4:] + # split filename, which should be TIME-DURATION-TYPE-HASH.ts + try: + if not filename.endswith('.ts'): + raise ValueError("Does not end in .ts") + filename = filename[:-len('.ts')] # chop off .ts + parts = filename.split('-', 3) + if len(parts) != 4: + raise ValueError("Not enough dashes in filename") + time, duration, type, hash = parts + if type not in ('full', 'partial'): + raise ValueError("Unknown type {!r}".format(type)) + return SegmentInfo( + path = path, + stream = stream, + variant = variant, + start = dateutil.parser.parse("{}:{}".format(hour, time)), + duration = datetime.timedelta(seconds=float(duration)), + is_partial = type == "partial", + hash = unpadded_b64_decode(hash), + ) + except ValueError as e: + # wrap error but preserve original traceback + _, _, tb = sys.exc_info() + raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb + + +def get_best_segments(hours_path, start, end): + """Return a list of the best sequence of non-overlapping segments + we have for a given time range. Hours path should be the directory containing hour directories. + Time args start and end should be given as datetime objects. + The first segment may start before the time range, and the last may end after it. + The returned list contains items that are either: + SegmentInfo: a segment + None: represents a discontinuity between the previous segment and the next one. + ie. as long as two segments appear next to each other, we guarentee there is no gap between + them, the second one starts right as the first one finishes. + Similarly, unless the first item is None, the first segment starts <= the start of the time + range, and unless the last item is None, the last segment ends >= the end of the time range. + Example: + Suppose you ask for a time range from 10 to 60. We have 10-second segments covering + the following times: + 5 to 15 + 15 to 25 + 30 to 40 + 40 to 50 + Then the output would look like: + segment from 5 to 15 + segment from 15 to 25 + None, as the previous segment ends 5sec before the next one begins + segment from 30 to 40 + segment from 40 to 50 + None, as the previous segment ends 10sec before the requested end time of 60. + Note that any is_partial=True segment will be followed by a None, since we can't guarentee + it joins on to the next segment fully intact. + """ + # Note: The exact equality checks in this function are not vulnerable to floating point error, + # but only because all input dates and durations are only precise to the millisecond, and + # python's datetime types represent these as integer microseconds internally. So the parsing + # to these types is exact, and all operations on them are exact, so all operations are exact. + + result = [] + + for hour in hour_paths_for_range(hours_path, start, end): + # best_segments_by_start will give us the best available segment for each unique start time + for segment in best_segments_by_start(hour): + + # special case: first segment + if not result: + # first segment is allowed to be before start as long as it includes it + if segment.start <= start < segment.end: + # segment covers start + result.append(segment) + elif start < segment.start < end: + # segment is after start (but before end), so there was no segment that covers start + # so we begin with a None + result.append(None) + result.append(segment) + else: + # segment is before start, and doesn't cover start, or starts after end. + # ignore and go to next. + continue + else: + # normal case: check against previous segment end time + prev_end = result[-1].end + if segment.start < prev_end: + # Overlap! This shouldn't happen, though it might be possible due to weirdness + # if the stream drops then starts again quickly. We simply ignore the overlapping + # segment and let the algorithm continue. + logging.warning("Overlapping segments: {} overlaps end of {}".format(segment, result[-1])) + continue + if result[-1].is_partial or prev_end < segment.start: + # there's a gap between prev end and this start, so add a None + result.append(None) + result.append(segment) + + # check if we've reached the end + if end <= segment.end: + break + + # this is a weird little construct that says "if we broke from the inner loop, + # then also break from the outer one. otherwise continue." + else: + continue + break + + # check if we need a trailing None because last segment is partial or doesn't reach end + if result and (result[-1].is_partial or result[-1].end < end): + result.append(None) + + return result + + +def hour_paths_for_range(hours_path, start, end): + """Generate a list of hour paths to check when looking for segments between start and end.""" + # truncate start and end to the hour + def truncate(dt): + return dt.replace(microsecond=0, second=0, minute=0) + current = truncate(start) + end = truncate(end) + # Begin in the hour prior to start, as there may be a segment that starts in that hour + # but contains the start time, eg. if the start time is 01:00:01 and there's a segment + # at 00:59:59 which goes for 3 seconds. + # Checking the entire hour when in most cases it won't be needed is wasteful, but it's also + # pretty quick and the complexity of only checking this case when needed just isn't worth it. + current -= datetime.timedelta(hours=1) + while current <= end: + yield os.path.join(hours_path, current.strftime("%Y-%m-%dT%H")) + current += datetime.timedelta(hours=1) + + +def best_segments_by_start(hour): + """Within a given hour path, yield the "best" segment per unique segment start time. + Best is defined as non-partial, or failing that the longest partial. + Note this means this function may perform os.stat()s in order to find the longest partial. + """ + try: + segment_paths = os.listdir(hour) + except OSError as e: + if e.errno != errno.ENOENT: + raise + # path does not exist, treat it as having no files + return + segment_paths.sort() + # note we only parse them as we need them, which is unlikely to save us much time overall + # but is easy enough to do, so we might as well. + parsed = (parse_segment_path(os.path.join(hour, name)) for name in segment_paths) + for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start): + segments = list(segments) + full_segments = [segment for segment in segments if not segment.is_partial] + if full_segments: + if len(full_segments) != 1: + logging.warning("Multiple versions of full segment at start_time {}: {}".format( + start_time, ", ".join(map(str, segments)) + )) + # We've observed some cases where the same segment (with the same hash) will be reported + # with different durations (generally at stream end). Prefer the longer duration, + # as this will ensure that if hashes are different we get the most data, and if they + # are the same it should keep holes to a minimum. + # If same duration, we have to pick one, so pick highest-sorting hash just so we're consistent. + full_segments = [max(full_segments, key=lambda segment: (segment.duration, segment.hash))] + yield full_segments[0] + continue + # no full segments, fall back to measuring partials. + yield max(segments, key=lambda segment: os.stat(segment.path).st_size) diff --git a/restreamer/Dockerfile b/restreamer/Dockerfile new file mode 100644 index 0000000..84650bb --- /dev/null +++ b/restreamer/Dockerfile @@ -0,0 +1,15 @@ +FROM alpine:3.7 +RUN apk --update add py2-pip + +# Install common lib first as it changes less +COPY common /tmp/common +RUN pip install /tmp/common && rm -r /tmp/common + +# Install actual application +COPY restreamer /tmp/restreamer +RUN pip install /tmp/restreamer && rm -r /tmp/restreamer + +# Add config file last as it changes most +COPY config.yaml /etc/wubloader.yaml + +CMD python2 -m restreamer diff --git a/restreamer/restreamer/__init__.py b/restreamer/restreamer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/restreamer/restreamer/__main__.py b/restreamer/restreamer/__main__.py new file mode 100644 index 0000000..24935eb --- /dev/null +++ b/restreamer/restreamer/__main__.py @@ -0,0 +1,14 @@ + +import gevent.monkey +gevent.monkey.patch_all() + +import logging + +import argh + +from restreamer.main import main + +LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" + +logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +argh.dispatch_command(main) diff --git a/restreamer/restreamer/generate_hls.py b/restreamer/restreamer/generate_hls.py new file mode 100644 index 0000000..835daa2 --- /dev/null +++ b/restreamer/restreamer/generate_hls.py @@ -0,0 +1,79 @@ +import datetime +import os +import urllib +from collections import Counter + + +def generate_master(playlists): + """Generate master playlist. Playlists arg should be a map {name: url}. + Little validation or encoding is done - please try to keep the names valid + without escaping. + """ + lines = ["#EXTM3U"] + for name, url in playlists.items(): + lines += [ + # We name each variant with a VIDEO rendition with no url + '#EXT-X-MEDIA:TYPE=VIDEO,GROUP-ID="{name}",NAME="{name}",AUTOSELECT=YES,DEFAULT=YES'.format(name=name), + '#EXT-X-STREAM-INF:VIDEO="{name}"'.format(name=name), + url, + ] + return "\n".join(lines) + '\n' + + +def generate_media(segments, base_url): + """Generate a media playlist from a list of segments as returned by common.get_best_segments(). + Segments are specified as hour/name.ts relative to base_url. + """ + + # We have to pick a "target duration". in most circumstances almost all segments + # will be of that duration, so we get the most common duration out of all the segments + # and use that. + # If we have no segments, default to 6 seconds. + non_none_segments = [segment for segment in segments if segment is not None] + if non_none_segments: + # Note most_common returns [(value, count)] so we unpack. + ((target_duration, _),) = Counter(segment.duration for segment in non_none_segments).most_common(1) + else: + target_duration = datetime.timedelta(seconds=6) + + lines = [ + "#EXTM3U", + "#EXT-X-TARGETDURATION:{:.3f}".format(target_duration.total_seconds()), + ] + + # Note and remove any trailing None from the segment list - this indicates there is a hole + # at the end, which means we should mark the stream as incomplete but not include a discontinuity. + if segments and segments[-1] is None: + incomplete = True + segments = segments[:-1] + else: + incomplete = False + + # Remove any leading None from the segment list - this indicates there is a hole at the start, + # which isn't actually meaningful in any way to us. + # Note that in the case of segments = [None], we already removed it when we removed the trailing + # None, and segments is now []. This is fine. + if segments and segments[0] is None: + segments = segments[1:] + + for segment in segments: + if segment is None: + # Discontinuity. Adding this tag tells the client that we've missed something + # and it should start decoding fresh on the next segment. This is required when + # someone stops/starts a stream and a good idea if we're missing a segment in a + # continuous stream. + lines.append("#EXT-X-DISCONTINUITY") + else: + # Each segment has two prefixes: timestamp and duration. + # This tells the client exactly what time the segment represents, which is important + # for the editor since it needs to describe cut points in these times. + path = '/'.join(segment.path.split('/')[-2:]) + lines.append("#EXT-X-PROGRAM-DATE-TIME:{}".format(segment.start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))) + lines.append("#EXTINF:{:.3f},live".format(segment.duration.total_seconds())) + lines.append(urllib.quote(os.path.join(base_url, path))) + + # If stream is complete, add an ENDLIST marker to show this. + if not incomplete: + lines.append("#EXT-X-ENDLIST") + + return "\n".join(lines) + '\n' diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py new file mode 100644 index 0000000..41adfd0 --- /dev/null +++ b/restreamer/restreamer/main.py @@ -0,0 +1,169 @@ + +import datetime +import errno +import functools +import json +import os + +import dateutil.parser +from flask import Flask, url_for, request, abort +from gevent.pywsgi import WSGIServer + +from common import get_best_segments + +import generate_hls + + +app = Flask('restreamer', static_url_path='/segments') + + +""" +The restreamer is a simple http api for listing available segments and generating +HLS playlists for them. + +The segments themselves are ideally to be served by some external webserver +under "/segments////" (ie. with BASE_DIR under "/segments"), +though this server will also serve them if requested. +""" + + +def listdir(path, error=True): + """List files in path, excluding hidden files. + Behaviour when path doesn't exist depends on error arg. + If error is True, raise 404. Otherwise, return []. + """ + try: + return [name for name in os.listdir(path) if not name.startswith('.')] + except OSError as e: + if e.errno != errno.ENOENT: + raise + if error: + abort(404) + return [] + + +def has_path_args(fn): + """Decorator to wrap routes which take args which are to be used as parts of a filepath. + Disallows hidden folders and path traversal, and converts unicode to bytes. + """ + @functools.wraps(fn) + def _has_path_args(**kwargs): + kwargs = {key: value.encode('utf-8') for key, value in kwargs.items()} + for key, value in kwargs.items(): + # Disallowing a leading . prevents both hidden files and path traversal ("..") + if value.startswith('.'): + return "Bad {}: May not start with a period".format(key), 403 + return fn(**kwargs) + return _has_path_args + + +@app.route('/files//') +@has_path_args +def list_hours(stream, variant): + """Returns a JSON list of hours for the given stream and variant for which + there may be segments available. Returns empty list on non-existent streams, etc. + """ + path = os.path.join( + app.static_folder, + stream, + variant, + ) + return json.dumps(listdir(path, error=False)) + + +@app.route('/files///') +@has_path_args +def list_segments(stream, variant, hour): + """Returns a JSON list of segment files for a given stream, variant and hour. + Returns empty list on non-existant streams, etc. + """ + path = os.path.join( + app.static_folder, + stream, + variant, + hour, + ) + return json.dumps(listdir(path, error=False)) + + +def time_range_for_variant(stream, variant): + """Returns earliest and latest times that the given variant has segments for + (up to hour resolution), or 404 if it doesn't exist / is empty.""" + hours = listdir(os.path.join(app.static_folder, stream, variant)) + if not hours: + abort(404) + first, last = min(hours), max(hours) + # note last hour parses to _start_ of that hour, so we add 1h to go to end of that hour + return dateutil.parser.parse(first), dateutil.parser.parse(last) + datetime.timedelta(hours=1) + + +@app.route('/playlist/.m3u8') +@has_path_args +def generate_master_playlist(stream): + """Returns a HLS master playlist for the given stream. + Takes optional params: + start, end: The time to begin and end the stream at. + See generate_media_playlist for details. + """ + start = dateutil.parser.parse(request.args['start']) if 'start' in request.args else None + end = dateutil.parser.parse(request.args['end']) if 'end' in request.args else None + variants = listdir(os.path.join(app.static_folder, stream)) + + playlists = {} + for variant in variants: + # If start or end are given, try to restrict offered variants to ones which exist for that + # time range. + if start is not None or end is not None: + first, last = time_range_for_variant(stream, variant) + if start is not None and last < start: + continue # last time for variant is before our start time, don't offer variant + if end is not None and end < first: + continue # our end time is before first time for variant, don't offer variant + playlists[variant] = url_for( + 'generate_media_playlist', stream=stream, variant=variant, **request.args + ) + + return generate_hls.generate_master(playlists) + + +@app.route('/playlist//.m3u8') +@has_path_args +def generate_media_playlist(stream, variant): + """Returns a HLS media playlist for the given stream and variant. + Takes optional params: + start, end: The time to begin and end the stream at. + Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS). + If not given, effectively means "infinity", ie. no start means + any time ago, no end means any time in the future. + """ + + hours_path = os.path.join(app.static_folder, stream, variant) + if not os.path.isdir(hours_path): + abort(404) + + start = dateutil.parser.parse(request.args['start']) if 'start' in request.args else None + end = dateutil.parser.parse(request.args['end']) if 'end' in request.args else None + if start is None or end is None: + # If start or end are not given, use the earliest/latest time available + first, last = time_range_for_variant(stream, variant) + if start is None: + start = first + if end is None: + end = last + + # get_best_segments requires start be before end, special case that as no segments + # (not an error because someone might ask for a specific start, no end, but we ended up with + # end before start because that's the latest time we have) + if start < end: + segments = get_best_segments(hours_path, start, end) + else: + # Note the None to indicate there was a "hole" at both start and end + segments = [None] + + return generate_hls.generate_media(segments, os.path.join(app.static_url_path, stream, variant)) + + +def main(host='0.0.0.0', port=8000, base_dir='.'): + app.static_folder = base_dir + server = WSGIServer((host, port), app) + server.serve_forever() diff --git a/restreamer/setup.py b/restreamer/setup.py new file mode 100644 index 0000000..47a450f --- /dev/null +++ b/restreamer/setup.py @@ -0,0 +1,11 @@ +from setuptools import setup, find_packages + +setup( + name = "wubloader-restreamer", + version = "0.0.0", + packages = find_packages(), + install_requires = [ + "requests", + "wubloader-common", + ], +)