Merge pull request #6 from ekimekim/mike/restreamer/improvements

restreamer: Multiple improvements and general "finishing"
pull/10/head
Christopher Usher 6 years ago committed by GitHub
commit 9782a3ebd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -73,7 +73,8 @@ class SegmentInfo(
'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash' 'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash'
]) ])
): ):
"""Info parsed from a segment path, including original path.""" """Info parsed from a segment path, including original path.
Note that start time is a datetime and duration is a timedelta, and hash is a decoded binary string."""
@property @property
def end(self): def end(self):
return self.start + self.duration return self.start + self.duration
@ -116,6 +117,7 @@ def parse_segment_path(path):
def get_best_segments(hours_path, start, end): def get_best_segments(hours_path, start, end):
"""Return a list of the best sequence of non-overlapping segments """Return a list of the best sequence of non-overlapping segments
we have for a given time range. Hours path should be the directory containing hour directories. we have for a given time range. Hours path should be the directory containing hour directories.
Time args start and end should be given as datetime objects.
The first segment may start before the time range, and the last may end after it. The first segment may start before the time range, and the last may end after it.
The returned list contains items that are either: The returned list contains items that are either:
SegmentInfo: a segment SegmentInfo: a segment
@ -240,8 +242,12 @@ def best_segments_by_start(hour):
logging.warning("Multiple versions of full segment at start_time {}: {}".format( logging.warning("Multiple versions of full segment at start_time {}: {}".format(
start_time, ", ".join(map(str, segments)) start_time, ", ".join(map(str, segments))
)) ))
# we have to pick one, so might as well make it consistent by sorting by path # We've observed some cases where the same segment (with the same hash) will be reported
full_segments.sort(key=lambda segment: segment.path) # with different durations (generally at stream end). Prefer the longer duration,
# as this will ensure that if hashes are different we get the most data, and if they
# are the same it should keep holes to a minimum.
# If same duration, we have to pick one, so pick highest-sorting hash just so we're consistent.
full_segments = [max(full_segments, key=lambda segment: (segment.duration, segment.hash))]
yield full_segments[0] yield full_segments[0]
continue continue
# no full segments, fall back to measuring partials. # no full segments, fall back to measuring partials.

@ -1,5 +1,7 @@
import datetime
import os import os
import urllib import urllib
from collections import Counter
def generate_master(playlists): def generate_master(playlists):
@ -22,14 +24,56 @@ def generate_media(segments, base_url):
"""Generate a media playlist from a list of segments as returned by common.get_best_segments(). """Generate a media playlist from a list of segments as returned by common.get_best_segments().
Segments are specified as hour/name.ts relative to base_url. Segments are specified as hour/name.ts relative to base_url.
""" """
# We have to pick a "target duration". in most circumstances almost all segments
# will be of that duration, so we get the most common duration out of all the segments
# and use that.
# If we have no segments, default to 6 seconds.
non_none_segments = [segment for segment in segments if segment is not None]
if non_none_segments:
# Note most_common returns [(value, count)] so we unpack.
((target_duration, _),) = Counter(segment.duration for segment in non_none_segments).most_common(1)
else:
target_duration = datetime.timedelta(seconds=6)
lines = [ lines = [
"#EXTM3U", "#EXTM3U",
"#EXT-X-TARGETDURATION:6", "#EXT-X-TARGETDURATION:{:.3f}".format(target_duration.total_seconds()),
] ]
# Note and remove any trailing None from the segment list - this indicates there is a hole
# at the end, which means we should mark the stream as incomplete but not include a discontinuity.
if segments and segments[-1] is None:
incomplete = True
segments = segments[:-1]
else:
incomplete = False
# Remove any leading None from the segment list - this indicates there is a hole at the start,
# which isn't actually meaningful in any way to us.
# Note that in the case of segments = [None], we already removed it when we removed the trailing
# None, and segments is now []. This is fine.
if segments and segments[0] is None:
segments = segments[1:]
for segment in segments: for segment in segments:
# TODO handle missing bits, stream endings, other stuff if segment is None:
if segment is not None: # Discontinuity. Adding this tag tells the client that we've missed something
# and it should start decoding fresh on the next segment. This is required when
# someone stops/starts a stream and a good idea if we're missing a segment in a
# continuous stream.
lines.append("#EXT-X-DISCONTINUITY")
else:
# Each segment has two prefixes: timestamp and duration.
# This tells the client exactly what time the segment represents, which is important
# for the editor since it needs to describe cut points in these times.
path = '/'.join(segment.path.split('/')[-2:]) path = '/'.join(segment.path.split('/')[-2:])
lines.append("#EXT-X-PROGRAM-DATE-TIME:{}".format(segment.start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")))
lines.append("#EXTINF:{:.3f},live".format(segment.duration.total_seconds())) lines.append("#EXTINF:{:.3f},live".format(segment.duration.total_seconds()))
lines.append(urllib.quote(os.path.join(base_url, path))) lines.append(urllib.quote(os.path.join(base_url, path)))
# If stream is complete, add an ENDLIST marker to show this.
if not incomplete:
lines.append("#EXT-X-ENDLIST")
return "\n".join(lines) + '\n' return "\n".join(lines) + '\n'

@ -1,4 +1,5 @@
import datetime
import errno import errno
import functools import functools
import json import json
@ -85,6 +86,17 @@ def list_segments(stream, variant, hour):
return json.dumps(listdir(path, error=False)) return json.dumps(listdir(path, error=False))
def time_range_for_variant(stream, variant):
"""Returns earliest and latest times that the given variant has segments for
(up to hour resolution), or 404 if it doesn't exist / is empty."""
hours = listdir(os.path.join(app.static_folder, stream, variant))
if not hours:
abort(404)
first, last = min(hours), max(hours)
# note last hour parses to _start_ of that hour, so we add 1h to go to end of that hour
return dateutil.parser.parse(first), dateutil.parser.parse(last) + datetime.timedelta(hours=1)
@app.route('/playlist/<stream>.m3u8') @app.route('/playlist/<stream>.m3u8')
@has_path_args @has_path_args
def generate_master_playlist(stream): def generate_master_playlist(stream):
@ -93,24 +105,61 @@ def generate_master_playlist(stream):
start, end: The time to begin and end the stream at. start, end: The time to begin and end the stream at.
See generate_media_playlist for details. See generate_media_playlist for details.
""" """
start = dateutil.parser.parse(request.args['start']) if 'start' in request.args else None
end = dateutil.parser.parse(request.args['end']) if 'end' in request.args else None
variants = listdir(os.path.join(app.static_folder, stream)) variants = listdir(os.path.join(app.static_folder, stream))
playlists = {
variant: url_for('generate_media_playlist', stream=stream, variant=variant, **request.args) playlists = {}
for variant in variants for variant in variants:
} # If start or end are given, try to restrict offered variants to ones which exist for that
# time range.
if start is not None or end is not None:
first, last = time_range_for_variant(stream, variant)
if start is not None and last < start:
continue # last time for variant is before our start time, don't offer variant
if end is not None and end < first:
continue # our end time is before first time for variant, don't offer variant
playlists[variant] = url_for(
'generate_media_playlist', stream=stream, variant=variant, **request.args
)
return generate_hls.generate_master(playlists) return generate_hls.generate_master(playlists)
@app.route('/playlist/<stream>/<variant>.m3u8') @app.route('/playlist/<stream>/<variant>.m3u8')
@has_path_args @has_path_args
def generate_media_playlist(stream, variant): def generate_media_playlist(stream, variant):
#TODO handle no start/end """Returns a HLS media playlist for the given stream and variant.
#TODO error handling of args Takes optional params:
# TODO lots of other stuff start, end: The time to begin and end the stream at.
start = dateutil.parser.parse(request.args['start']) Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS).
end = dateutil.parser.parse(request.args['end']) If not given, effectively means "infinity", ie. no start means
any time ago, no end means any time in the future.
"""
hours_path = os.path.join(app.static_folder, stream, variant) hours_path = os.path.join(app.static_folder, stream, variant)
segments = get_best_segments(hours_path, start, end) if not os.path.isdir(hours_path):
abort(404)
start = dateutil.parser.parse(request.args['start']) if 'start' in request.args else None
end = dateutil.parser.parse(request.args['end']) if 'end' in request.args else None
if start is None or end is None:
# If start or end are not given, use the earliest/latest time available
first, last = time_range_for_variant(stream, variant)
if start is None:
start = first
if end is None:
end = last
# get_best_segments requires start be before end, special case that as no segments
# (not an error because someone might ask for a specific start, no end, but we ended up with
# end before start because that's the latest time we have)
if start < end:
segments = get_best_segments(hours_path, start, end)
else:
# Note the None to indicate there was a "hole" at both start and end
segments = [None]
return generate_hls.generate_media(segments, os.path.join(app.static_url_path, stream, variant)) return generate_hls.generate_media(segments, os.path.join(app.static_url_path, stream, variant))

Loading…
Cancel
Save