cutter: Introduce an alternate cutting approach that is much faster

This cutter works by only cutting the first and last segments to size,
then concatting them with the other segments, so we only ever process a few seconds
of video instead of the entire video duration.
However, to make this work, care must be taken that the cut segments use the same codecs
as the other segments.

The reason it's experimental is that we are not yet confident in its ability
to cut accurately and without sync issues. We have seen some minor issues when trying to play
back the raw output files, but youtube's re-encoding has consistently smoothed out those issues
and they seem to be highly player-specific. Vigorous testing is needed.

Also note that both methods right now (cat then cut, and cut then cat) only work if all the segments
are cattable, that is they all use the same codecs, have the same resolution, etc.
If a stream were to change its encoding settings, and we were cutting over that change,
both approaches would not work. We should add checks for that scenario (which can only happen
over a stream drop), and if so fallback to a slow method using ffmpeg's concat filter,
which will work even for disparate codecs, though reconciling mismatched resolutions or frame rates
may require further work.
pull/23/head
Mike Lang 6 years ago committed by Christopher Usher
parent 6815924097
commit 6bf709287a

@ -7,6 +7,7 @@ import logging
import os import os
import shutil import shutil
import signal import signal
from contextlib import closing
import dateutil.parser import dateutil.parser
import gevent import gevent
@ -198,6 +199,8 @@ def cut(stream, variant):
if any holes are detected, rather than producing a video with missing parts. if any holes are detected, rather than producing a video with missing parts.
Set to true by passing "true" (case insensitive). Set to true by passing "true" (case insensitive).
Even if holes are allowed, a 406 may result if the resulting video would be empty. Even if holes are allowed, a 406 may result if the resulting video would be empty.
experimental: Optional, default false. If true, use the new, much faster, but experimental
method of cutting.
""" """
start = dateutil.parser.parse(request.args['start']) start = dateutil.parser.parse(request.args['start'])
end = dateutil.parser.parse(request.args['end']) end = dateutil.parser.parse(request.args['end'])
@ -231,6 +234,11 @@ def cut(stream, variant):
# finally, calculate actual output duration, which is what ffmpeg will use # finally, calculate actual output duration, which is what ffmpeg will use
duration = full_duration - cut_start - cut_end duration = full_duration - cut_start - cut_end
# possibly defer to experiemntal version now that we've parsed our inputs.
# we'll clean up this whole flow later.
if request.args.get('experimental') == 'true':
return cut_experimental(segments, cut_start, cut_end)
def feed_input(pipe): def feed_input(pipe):
# pass each segment into ffmpeg's stdin in order, while outputing everything on stdout. # pass each segment into ffmpeg's stdin in order, while outputing everything on stdout.
for segment in segments: for segment in segments:
@ -277,6 +285,89 @@ def cut(stream, variant):
return Response(_cut(), mimetype='video/MP2T') return Response(_cut(), mimetype='video/MP2T')
def cut_experimental(segments, cut_start, cut_end):
"""Experimental cutting method where we cut the first and last segments only,
then cat them all together."""
# Note: assumes codecs don't change from segment to segment.
def streams_info(segment):
"""Return ffprobe's info on streams as a list of dicts"""
output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path])
return json.loads(output)['streams']
def ffmpeg(segment, cut_start=None, cut_end=None):
"""Return a Popen object which is ffmpeg cutting the given segment"""
args = ['ffmpeg', '-i', segment.path]
# output from ffprobe is generally already sorted but let's be paranoid,
# because the order of map args matters.
for stream in sorted(streams_info(segment), key=lambda stream: stream['index']):
# map the same stream in the same position from input to output
args += ['-map', '0:{}'.format(stream['index'])]
if stream['codec_type'] in ('video', 'audio'):
# for non-metadata streams, make sure we use the same codec (metadata streams
# are a bit weirder, and ffmpeg will do the right thing anyway)
args += ['-codec:{}'.format(stream['index']), stream['codec_name']]
# now add trim args
if cut_start:
args += ['-ss', str(cut_start)]
if cut_end:
args += ['-to', str(cut_end)]
# output to stdout as MPEG-TS
args += ['-f', 'mpegts', '-']
# run it
logging.info("Running segment cut with args: {}".format(" ".join(args)))
return subprocess.Popen(args, stdout=subprocess.PIPE)
def chunks(fileobj, chunk_size=16*1024):
"""Read fileobj until EOF, yielding chunk_size sized chunks of data."""
while True:
chunk = fileobj.read(chunk_size)
if not chunk:
break
yield chunk
def _cut():
# set first and last only if they actually need cutting
first = segments[0] if cut_start else None
last = segments[-1] if cut_end else None
for segment in segments:
# note first and last might be the same segment.
# note a segment will only match if cutting actually needs to be done
# (ie. cut_start or cut_end is not 0)
if segment in (first, last):
proc = None
try:
proc = ffmpeg(
segment,
cut_start if segment == first else None,
cut_end if segment == last else None,
)
with closing(proc.stdout):
for chunk in chunks(proc.stdout):
yield chunk
proc.wait()
except Exception:
# try to clean up proc, ignoring errors
try:
proc.kill()
except OSError:
pass
else:
# check if ffmpeg had errors
if proc.returncode != 0:
raise Exception(
"Error while streaming cut: ffmpeg exited {}".format(proc.returncode)
)
else:
# no cutting needed, just serve the file
with open(segment.path) as f:
for chunk in chunks(f):
yield chunk
return Response(_cut(), mimetype='video/MP2T')
def main(host='0.0.0.0', port=8000, base_dir='.'): def main(host='0.0.0.0', port=8000, base_dir='.'):
app.static_folder = base_dir app.static_folder = base_dir
server = WSGIServer((host, port), cors(app)) server = WSGIServer((host, port), cors(app))

Loading…
Cancel
Save