diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 1e01d08..657a926 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -7,6 +7,7 @@ import logging import os import shutil import signal +from contextlib import closing import dateutil.parser import gevent @@ -198,6 +199,8 @@ def cut(stream, variant): if any holes are detected, rather than producing a video with missing parts. Set to true by passing "true" (case insensitive). Even if holes are allowed, a 406 may result if the resulting video would be empty. + experimental: Optional, default false. If true, use the new, much faster, but experimental + method of cutting. """ start = dateutil.parser.parse(request.args['start']) end = dateutil.parser.parse(request.args['end']) @@ -231,6 +234,11 @@ def cut(stream, variant): # finally, calculate actual output duration, which is what ffmpeg will use duration = full_duration - cut_start - cut_end + # possibly defer to experiemntal version now that we've parsed our inputs. + # we'll clean up this whole flow later. + if request.args.get('experimental') == 'true': + return cut_experimental(segments, cut_start, cut_end) + def feed_input(pipe): # pass each segment into ffmpeg's stdin in order, while outputing everything on stdout. for segment in segments: @@ -277,6 +285,89 @@ def cut(stream, variant): return Response(_cut(), mimetype='video/MP2T') +def cut_experimental(segments, cut_start, cut_end): + """Experimental cutting method where we cut the first and last segments only, + then cat them all together.""" + # Note: assumes codecs don't change from segment to segment. + + def streams_info(segment): + """Return ffprobe's info on streams as a list of dicts""" + output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path]) + return json.loads(output)['streams'] + + def ffmpeg(segment, cut_start=None, cut_end=None): + """Return a Popen object which is ffmpeg cutting the given segment""" + args = ['ffmpeg', '-i', segment.path] + # output from ffprobe is generally already sorted but let's be paranoid, + # because the order of map args matters. + for stream in sorted(streams_info(segment), key=lambda stream: stream['index']): + # map the same stream in the same position from input to output + args += ['-map', '0:{}'.format(stream['index'])] + if stream['codec_type'] in ('video', 'audio'): + # for non-metadata streams, make sure we use the same codec (metadata streams + # are a bit weirder, and ffmpeg will do the right thing anyway) + args += ['-codec:{}'.format(stream['index']), stream['codec_name']] + # now add trim args + if cut_start: + args += ['-ss', str(cut_start)] + if cut_end: + args += ['-to', str(cut_end)] + # output to stdout as MPEG-TS + args += ['-f', 'mpegts', '-'] + # run it + logging.info("Running segment cut with args: {}".format(" ".join(args))) + return subprocess.Popen(args, stdout=subprocess.PIPE) + + def chunks(fileobj, chunk_size=16*1024): + """Read fileobj until EOF, yielding chunk_size sized chunks of data.""" + while True: + chunk = fileobj.read(chunk_size) + if not chunk: + break + yield chunk + + def _cut(): + # set first and last only if they actually need cutting + first = segments[0] if cut_start else None + last = segments[-1] if cut_end else None + for segment in segments: + # note first and last might be the same segment. + # note a segment will only match if cutting actually needs to be done + # (ie. cut_start or cut_end is not 0) + if segment in (first, last): + proc = None + try: + proc = ffmpeg( + segment, + cut_start if segment == first else None, + cut_end if segment == last else None, + ) + with closing(proc.stdout): + for chunk in chunks(proc.stdout): + yield chunk + proc.wait() + except Exception: + # try to clean up proc, ignoring errors + try: + proc.kill() + except OSError: + pass + else: + # check if ffmpeg had errors + if proc.returncode != 0: + raise Exception( + "Error while streaming cut: ffmpeg exited {}".format(proc.returncode) + ) + else: + # no cutting needed, just serve the file + with open(segment.path) as f: + for chunk in chunks(f): + yield chunk + + return Response(_cut(), mimetype='video/MP2T') + + + def main(host='0.0.0.0', port=8000, base_dir='.'): app.static_folder = base_dir server = WSGIServer((host, port), cors(app))