From 6bf709287a7212abda1234494f9e5f40c4d8b7cb Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 2 Jan 2019 06:36:27 -0800 Subject: [PATCH] cutter: Introduce an alternate cutting approach that is much faster This cutter works by only cutting the first and last segments to size, then concatting them with the other segments, so we only ever process a few seconds of video instead of the entire video duration. However, to make this work, care must be taken that the cut segments use the same codecs as the other segments. The reason it's experimental is that we are not yet confident in its ability to cut accurately and without sync issues. We have seen some minor issues when trying to play back the raw output files, but youtube's re-encoding has consistently smoothed out those issues and they seem to be highly player-specific. Vigorous testing is needed. Also note that both methods right now (cat then cut, and cut then cat) only work if all the segments are cattable, that is they all use the same codecs, have the same resolution, etc. If a stream were to change its encoding settings, and we were cutting over that change, both approaches would not work. We should add checks for that scenario (which can only happen over a stream drop), and if so fallback to a slow method using ffmpeg's concat filter, which will work even for disparate codecs, though reconciling mismatched resolutions or frame rates may require further work. --- restreamer/restreamer/main.py | 91 +++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 1e01d08..657a926 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -7,6 +7,7 @@ import logging import os import shutil import signal +from contextlib import closing import dateutil.parser import gevent @@ -198,6 +199,8 @@ def cut(stream, variant): if any holes are detected, rather than producing a video with missing parts. Set to true by passing "true" (case insensitive). Even if holes are allowed, a 406 may result if the resulting video would be empty. + experimental: Optional, default false. If true, use the new, much faster, but experimental + method of cutting. """ start = dateutil.parser.parse(request.args['start']) end = dateutil.parser.parse(request.args['end']) @@ -231,6 +234,11 @@ def cut(stream, variant): # finally, calculate actual output duration, which is what ffmpeg will use duration = full_duration - cut_start - cut_end + # possibly defer to experiemntal version now that we've parsed our inputs. + # we'll clean up this whole flow later. + if request.args.get('experimental') == 'true': + return cut_experimental(segments, cut_start, cut_end) + def feed_input(pipe): # pass each segment into ffmpeg's stdin in order, while outputing everything on stdout. for segment in segments: @@ -277,6 +285,89 @@ def cut(stream, variant): return Response(_cut(), mimetype='video/MP2T') +def cut_experimental(segments, cut_start, cut_end): + """Experimental cutting method where we cut the first and last segments only, + then cat them all together.""" + # Note: assumes codecs don't change from segment to segment. + + def streams_info(segment): + """Return ffprobe's info on streams as a list of dicts""" + output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path]) + return json.loads(output)['streams'] + + def ffmpeg(segment, cut_start=None, cut_end=None): + """Return a Popen object which is ffmpeg cutting the given segment""" + args = ['ffmpeg', '-i', segment.path] + # output from ffprobe is generally already sorted but let's be paranoid, + # because the order of map args matters. + for stream in sorted(streams_info(segment), key=lambda stream: stream['index']): + # map the same stream in the same position from input to output + args += ['-map', '0:{}'.format(stream['index'])] + if stream['codec_type'] in ('video', 'audio'): + # for non-metadata streams, make sure we use the same codec (metadata streams + # are a bit weirder, and ffmpeg will do the right thing anyway) + args += ['-codec:{}'.format(stream['index']), stream['codec_name']] + # now add trim args + if cut_start: + args += ['-ss', str(cut_start)] + if cut_end: + args += ['-to', str(cut_end)] + # output to stdout as MPEG-TS + args += ['-f', 'mpegts', '-'] + # run it + logging.info("Running segment cut with args: {}".format(" ".join(args))) + return subprocess.Popen(args, stdout=subprocess.PIPE) + + def chunks(fileobj, chunk_size=16*1024): + """Read fileobj until EOF, yielding chunk_size sized chunks of data.""" + while True: + chunk = fileobj.read(chunk_size) + if not chunk: + break + yield chunk + + def _cut(): + # set first and last only if they actually need cutting + first = segments[0] if cut_start else None + last = segments[-1] if cut_end else None + for segment in segments: + # note first and last might be the same segment. + # note a segment will only match if cutting actually needs to be done + # (ie. cut_start or cut_end is not 0) + if segment in (first, last): + proc = None + try: + proc = ffmpeg( + segment, + cut_start if segment == first else None, + cut_end if segment == last else None, + ) + with closing(proc.stdout): + for chunk in chunks(proc.stdout): + yield chunk + proc.wait() + except Exception: + # try to clean up proc, ignoring errors + try: + proc.kill() + except OSError: + pass + else: + # check if ffmpeg had errors + if proc.returncode != 0: + raise Exception( + "Error while streaming cut: ffmpeg exited {}".format(proc.returncode) + ) + else: + # no cutting needed, just serve the file + with open(segment.path) as f: + for chunk in chunks(f): + yield chunk + + return Response(_cut(), mimetype='video/MP2T') + + + def main(host='0.0.0.0', port=8000, base_dir='.'): app.static_folder = base_dir server = WSGIServer((host, port), cors(app))