Merge pull request #5 from ekimekim/mike/restreamer/initial

Initial work on restreamer
pull/10/head
Christopher Usher 6 years ago committed by GitHub
commit 4981c6521b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,7 +2,14 @@
"""A place for common utilities between wubloader components"""
import base64
import datetime
import errno
import itertools
import logging
import os
import sys
from collections import namedtuple
import dateutil.parser
@ -52,3 +59,190 @@ def format_bustime(bustime, round="millisecond"):
else:
raise ValueError("Bad rounding value: {!r}".format(round))
return sign + ":".join(parts)
def unpadded_b64_decode(s):
"""Decode base64-encoded string that has had its padding removed"""
# right-pad with '=' to multiple of 4
s = s + '=' * (- len(s) % 4)
return base64.b64decode(s, "-_")
class SegmentInfo(
namedtuple('SegmentInfoBase', [
'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash'
])
):
"""Info parsed from a segment path, including original path."""
@property
def end(self):
return self.start + self.duration
def parse_segment_path(path):
"""Parse segment path, returning a SegmentInfo. If path is only the trailing part,
eg. just a filename, it will leave unknown fields as None."""
parts = path.split('/')
# left-pad parts with None up to 4 parts
parts = [None] * (4 - len(parts)) + parts
# pull info out of path parts
stream, variant, hour, filename = parts[-4:]
# split filename, which should be TIME-DURATION-TYPE-HASH.ts
try:
if not filename.endswith('.ts'):
raise ValueError("Does not end in .ts")
filename = filename[:-len('.ts')] # chop off .ts
parts = filename.split('-', 3)
if len(parts) != 4:
raise ValueError("Not enough dashes in filename")
time, duration, type, hash = parts
if type not in ('full', 'partial'):
raise ValueError("Unknown type {!r}".format(type))
return SegmentInfo(
path = path,
stream = stream,
variant = variant,
start = dateutil.parser.parse("{}:{}".format(hour, time)),
duration = datetime.timedelta(seconds=float(duration)),
is_partial = type == "partial",
hash = unpadded_b64_decode(hash),
)
except ValueError as e:
# wrap error but preserve original traceback
_, _, tb = sys.exc_info()
raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb
def get_best_segments(hours_path, start, end):
"""Return a list of the best sequence of non-overlapping segments
we have for a given time range. Hours path should be the directory containing hour directories.
The first segment may start before the time range, and the last may end after it.
The returned list contains items that are either:
SegmentInfo: a segment
None: represents a discontinuity between the previous segment and the next one.
ie. as long as two segments appear next to each other, we guarentee there is no gap between
them, the second one starts right as the first one finishes.
Similarly, unless the first item is None, the first segment starts <= the start of the time
range, and unless the last item is None, the last segment ends >= the end of the time range.
Example:
Suppose you ask for a time range from 10 to 60. We have 10-second segments covering
the following times:
5 to 15
15 to 25
30 to 40
40 to 50
Then the output would look like:
segment from 5 to 15
segment from 15 to 25
None, as the previous segment ends 5sec before the next one begins
segment from 30 to 40
segment from 40 to 50
None, as the previous segment ends 10sec before the requested end time of 60.
Note that any is_partial=True segment will be followed by a None, since we can't guarentee
it joins on to the next segment fully intact.
"""
# Note: The exact equality checks in this function are not vulnerable to floating point error,
# but only because all input dates and durations are only precise to the millisecond, and
# python's datetime types represent these as integer microseconds internally. So the parsing
# to these types is exact, and all operations on them are exact, so all operations are exact.
result = []
for hour in hour_paths_for_range(hours_path, start, end):
# best_segments_by_start will give us the best available segment for each unique start time
for segment in best_segments_by_start(hour):
# special case: first segment
if not result:
# first segment is allowed to be before start as long as it includes it
if segment.start <= start < segment.end:
# segment covers start
result.append(segment)
elif start < segment.start < end:
# segment is after start (but before end), so there was no segment that covers start
# so we begin with a None
result.append(None)
result.append(segment)
else:
# segment is before start, and doesn't cover start, or starts after end.
# ignore and go to next.
continue
else:
# normal case: check against previous segment end time
prev_end = result[-1].end
if segment.start < prev_end:
# Overlap! This shouldn't happen, though it might be possible due to weirdness
# if the stream drops then starts again quickly. We simply ignore the overlapping
# segment and let the algorithm continue.
logging.warning("Overlapping segments: {} overlaps end of {}".format(segment, result[-1]))
continue
if result[-1].is_partial or prev_end < segment.start:
# there's a gap between prev end and this start, so add a None
result.append(None)
result.append(segment)
# check if we've reached the end
if end <= segment.end:
break
# this is a weird little construct that says "if we broke from the inner loop,
# then also break from the outer one. otherwise continue."
else:
continue
break
# check if we need a trailing None because last segment is partial or doesn't reach end
if result and (result[-1].is_partial or result[-1].end < end):
result.append(None)
return result
def hour_paths_for_range(hours_path, start, end):
"""Generate a list of hour paths to check when looking for segments between start and end."""
# truncate start and end to the hour
def truncate(dt):
return dt.replace(microsecond=0, second=0, minute=0)
current = truncate(start)
end = truncate(end)
# Begin in the hour prior to start, as there may be a segment that starts in that hour
# but contains the start time, eg. if the start time is 01:00:01 and there's a segment
# at 00:59:59 which goes for 3 seconds.
# Checking the entire hour when in most cases it won't be needed is wasteful, but it's also
# pretty quick and the complexity of only checking this case when needed just isn't worth it.
current -= datetime.timedelta(hours=1)
while current <= end:
yield os.path.join(hours_path, current.strftime("%Y-%m-%dT%H"))
current += datetime.timedelta(hours=1)
def best_segments_by_start(hour):
"""Within a given hour path, yield the "best" segment per unique segment start time.
Best is defined as non-partial, or failing that the longest partial.
Note this means this function may perform os.stat()s in order to find the longest partial.
"""
try:
segment_paths = os.listdir(hour)
except OSError as e:
if e.errno != errno.ENOENT:
raise
# path does not exist, treat it as having no files
return
segment_paths.sort()
# note we only parse them as we need them, which is unlikely to save us much time overall
# but is easy enough to do, so we might as well.
parsed = (parse_segment_path(os.path.join(hour, name)) for name in segment_paths)
for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start):
segments = list(segments)
full_segments = [segment for segment in segments if not segment.is_partial]
if full_segments:
if len(full_segments) != 1:
logging.warning("Multiple versions of full segment at start_time {}: {}".format(
start_time, ", ".join(map(str, segments))
))
# we have to pick one, so might as well make it consistent by sorting by path
full_segments.sort(key=lambda segment: segment.path)
yield full_segments[0]
continue
# no full segments, fall back to measuring partials.
yield max(segments, key=lambda segment: os.stat(segment.path).st_size)

@ -0,0 +1,15 @@
FROM alpine:3.7
RUN apk --update add py2-pip
# Install common lib first as it changes less
COPY common /tmp/common
RUN pip install /tmp/common && rm -r /tmp/common
# Install actual application
COPY restreamer /tmp/restreamer
RUN pip install /tmp/restreamer && rm -r /tmp/restreamer
# Add config file last as it changes most
COPY config.yaml /etc/wubloader.yaml
CMD python2 -m restreamer

@ -0,0 +1,14 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import argh
from restreamer.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
argh.dispatch_command(main)

@ -0,0 +1,35 @@
import os
import urllib
def generate_master(playlists):
"""Generate master playlist. Playlists arg should be a map {name: url}.
Little validation or encoding is done - please try to keep the names valid
without escaping.
"""
lines = ["#EXTM3U"]
for name, url in playlists.items():
lines += [
# We name each variant with a VIDEO rendition with no url
'#EXT-X-MEDIA:TYPE=VIDEO,GROUP-ID="{name}",NAME="{name}",AUTOSELECT=YES,DEFAULT=YES'.format(name=name),
'#EXT-X-STREAM-INF:VIDEO="{name}"'.format(name=name),
url,
]
return "\n".join(lines) + '\n'
def generate_media(segments, base_url):
"""Generate a media playlist from a list of segments as returned by common.get_best_segments().
Segments are specified as hour/name.ts relative to base_url.
"""
lines = [
"#EXTM3U",
"#EXT-X-TARGETDURATION:6",
]
for segment in segments:
# TODO handle missing bits, stream endings, other stuff
if segment is not None:
path = '/'.join(segment.path.split('/')[-2:])
lines.append("#EXTINF:{:.3f},live".format(segment.duration.total_seconds()))
lines.append(urllib.quote(os.path.join(base_url, path)))
return "\n".join(lines) + '\n'

@ -0,0 +1,120 @@
import errno
import functools
import json
import os
import dateutil.parser
from flask import Flask, url_for, request, abort
from gevent.pywsgi import WSGIServer
from common import get_best_segments
import generate_hls
app = Flask('restreamer', static_url_path='/segments')
"""
The restreamer is a simple http api for listing available segments and generating
HLS playlists for them.
The segments themselves are ideally to be served by some external webserver
under "/segments/<stream>/<variant>/<hour>/<filename>" (ie. with BASE_DIR under "/segments"),
though this server will also serve them if requested.
"""
def listdir(path, error=True):
"""List files in path, excluding hidden files.
Behaviour when path doesn't exist depends on error arg.
If error is True, raise 404. Otherwise, return [].
"""
try:
return [name for name in os.listdir(path) if not name.startswith('.')]
except OSError as e:
if e.errno != errno.ENOENT:
raise
if error:
abort(404)
return []
def has_path_args(fn):
"""Decorator to wrap routes which take args which are to be used as parts of a filepath.
Disallows hidden folders and path traversal, and converts unicode to bytes.
"""
@functools.wraps(fn)
def _has_path_args(**kwargs):
kwargs = {key: value.encode('utf-8') for key, value in kwargs.items()}
for key, value in kwargs.items():
# Disallowing a leading . prevents both hidden files and path traversal ("..")
if value.startswith('.'):
return "Bad {}: May not start with a period".format(key), 403
return fn(**kwargs)
return _has_path_args
@app.route('/files/<stream>/<variant>')
@has_path_args
def list_hours(stream, variant):
"""Returns a JSON list of hours for the given stream and variant for which
there may be segments available. Returns empty list on non-existent streams, etc.
"""
path = os.path.join(
app.static_folder,
stream,
variant,
)
return json.dumps(listdir(path, error=False))
@app.route('/files/<stream>/<variant>/<hour>')
@has_path_args
def list_segments(stream, variant, hour):
"""Returns a JSON list of segment files for a given stream, variant and hour.
Returns empty list on non-existant streams, etc.
"""
path = os.path.join(
app.static_folder,
stream,
variant,
hour,
)
return json.dumps(listdir(path, error=False))
@app.route('/playlist/<stream>.m3u8')
@has_path_args
def generate_master_playlist(stream):
"""Returns a HLS master playlist for the given stream.
Takes optional params:
start, end: The time to begin and end the stream at.
See generate_media_playlist for details.
"""
variants = listdir(os.path.join(app.static_folder, stream))
playlists = {
variant: url_for('generate_media_playlist', stream=stream, variant=variant, **request.args)
for variant in variants
}
return generate_hls.generate_master(playlists)
@app.route('/playlist/<stream>/<variant>.m3u8')
@has_path_args
def generate_media_playlist(stream, variant):
#TODO handle no start/end
#TODO error handling of args
# TODO lots of other stuff
start = dateutil.parser.parse(request.args['start'])
end = dateutil.parser.parse(request.args['end'])
hours_path = os.path.join(app.static_folder, stream, variant)
segments = get_best_segments(hours_path, start, end)
return generate_hls.generate_media(segments, os.path.join(app.static_url_path, stream, variant))
def main(host='0.0.0.0', port=8000, base_dir='.'):
app.static_folder = base_dir
server = WSGIServer((host, port), app)
server.serve_forever()

@ -0,0 +1,11 @@
from setuptools import setup, find_packages
setup(
name = "wubloader-restreamer",
version = "0.0.0",
packages = find_packages(),
install_requires = [
"requests",
"wubloader-common",
],
)
Loading…
Cancel
Save