Merge pull request #136 from ekimekim/mike/cut-maybe-stream

full cut: Optionally use seekable file OR directly stream
pull/137/head
Mike Lang 5 years ago committed by GitHub
commit 0305c0797f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -295,8 +295,9 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args): def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args):
"""Return a Popen object which is ffmpeg cutting from stdin. """Return a Popen object which is ffmpeg cutting from stdin.
This is used when doing a full cut. This is used when doing a full cut.
Note the explicit output file object instead of using a pipe, If output_file is not subprocess.PIPE,
because most video formats require a seekable file. uses explicit output file object instead of using a pipe,
because some video formats require a seekable file.
""" """
args = [ args = [
'ffmpeg', 'ffmpeg',
@ -304,16 +305,20 @@ def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args):
'-i', '-', '-i', '-',
'-ss', cut_start, '-ss', cut_start,
'-t', duration, '-t', duration,
] + list(encode_args) + [ ] + list(encode_args)
# We want ffmpeg to write to our tempfile, which is its stdout. if output_file is subprocess.PIPE:
# However, it assumes that '-' means the output is not seekable. args.append('-') # output to stdout
# We trick it into understanding that its stdout is seekable by else:
# telling it to write to the fd via its /proc/self filename. args += [
'/proc/self/fd/1', # We want ffmpeg to write to our tempfile, which is its stdout.
# But of course, that file "already exists", so we need to give it # However, it assumes that '-' means the output is not seekable.
# permission to "overwrite" it. # We trick it into understanding that its stdout is seekable by
'-y', # telling it to write to the fd via its /proc/self filename.
] '/proc/self/fd/1',
# But of course, that file "already exists", so we need to give it
# permission to "overwrite" it.
'-y',
]
args = map(str, args) args = map(str, args)
logging.info("Running full cut with args: {}".format(" ".join(args))) logging.info("Running full cut with args: {}".format(" ".join(args)))
return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file) return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file)
@ -401,41 +406,67 @@ def fast_cut_segments(segments, start, end):
yield chunk yield chunk
@timed('cut', type='full', normalize=lambda _, segments, start, end, encode_args: (end - start).total_seconds()) def feed_input(segments, pipe):
def full_cut_segments(segments, start, end, encode_args): """Write each segment's data into the given pipe in order.
This is used to provide input to ffmpeg in a full cut."""
for segment in segments:
with open(segment.path) as f:
try:
shutil.copyfileobj(f, pipe)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all it
if e.errno != errno.EPIPE:
raise
pipe.close()
@timed('cut',
type=lambda _, segments, start, end, encode_args, stream=False: "full-streamed" if stream else "full-buffered",
normalize=lambda _, segments, start, end, *a, **k: (end - start).total_seconds()),
)
def full_cut_segments(segments, start, end, encode_args, stream=False):
"""If stream=true, assume encode_args gives a streamable format,
and begin returning output immediately instead of waiting for ffmpeg to finish
and buffering to disk."""
# how far into the first segment to begin # how far into the first segment to begin
cut_start = max(0, (start - segments[0].start).total_seconds()) cut_start = max(0, (start - segments[0].start).total_seconds())
# duration # duration
duration = (end - start).total_seconds() duration = (end - start).total_seconds()
ffmpeg = None ffmpeg = None
input_feeder = None
try: try:
# Most ffmpeg output formats require a seekable file.
# For the same reason, it's not safe to begin uploading until ffmpeg if stream:
# has finished. We create a temporary file for this. # When streaming, we can just use a pipe
tempfile = TemporaryFile() tempfile = subprocess.PIPE
else:
# Some ffmpeg output formats require a seekable file.
# For the same reason, it's not safe to begin uploading until ffmpeg
# has finished. We create a temporary file for this.
tempfile = TemporaryFile()
ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, encode_args) ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, encode_args)
input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin)
# stream the input # When streaming, we can return data as it is available
for segment in segments: if stream:
with open(segment.path) as f: for chunk in read_chunks(ffmpeg.stdout):
try: yield chunk
shutil.copyfileobj(f, ffmpeg.stdin)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all input
if e.errno != errno.EPIPE:
raise
ffmpeg.stdin.close()
# check if any errors occurred in input writing, or if ffmpeg exited non-success. # check if any errors occurred in input writing, or if ffmpeg exited non-success.
if ffmpeg.wait() != 0: if ffmpeg.wait() != 0:
raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode))
input_feeder.get() # re-raise any errors from feed_input()
# Now actually yield the resulting file # When not streaming, we can only return the data once ffmpeg has exited
for chunk in read_chunks(tempfile): if not stream:
yield chunk for chunk in read_chunks(tempfile):
yield chunk
finally: finally:
# if something goes wrong, try to clean up ignoring errors # if something goes wrong, try to clean up ignoring errors
if input_feeder is not None:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None: if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try: try:

@ -294,8 +294,14 @@ class Cutter(object):
self.logger.debug("No encoding settings, using fast cut") self.logger.debug("No encoding settings, using fast cut")
cut = fast_cut_segments(job.segments, job.video_start, job.video_end) cut = fast_cut_segments(job.segments, job.video_start, job.video_end)
else: else:
self.logger.debug("Using encoding settings for cut: {}".format(upload_backend.encoding_settings)) self.logger.debug("Using encoding settings for {} cut: {}".format(
cut = full_cut_segments(job.segments, job.video_start, job.video_end, upload_backend.encoding_settings) "streamable" if upload_backend.encoding_streamable else "non-streamable",
upload_backend.encoding_settings,
))
cut = full_cut_segments(
job.segments, job.video_start, job.video_end,
upload_backend.encoding_settings, stream=upload_backend.encoding_streamable,
)
# This flag tracks whether we've told requests to finalize the upload, # This flag tracks whether we've told requests to finalize the upload,
# and serves to detect whether errors from the request call are recoverable. # and serves to detect whether errors from the request call are recoverable.

@ -67,12 +67,17 @@ class UploadBackend(object):
under the 'encoding_settings' attribute. under the 'encoding_settings' attribute.
If this is None, instead uses the 'fast cut' strategy where nothing If this is None, instead uses the 'fast cut' strategy where nothing
is transcoded. is transcoded.
In addition, if the output format doesn't need a seekable file,
you should set encoding_streamable = True so we know we can stream the output directly.
""" """
needs_transcode = False needs_transcode = False
# reasonable default if settings don't otherwise matter # reasonable default if settings don't otherwise matter:
encoding_settings = ['-f', 'mp4'] # high-quality mpegts, without wasting too much cpu on encoding
encoding_args = ['-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '0', '-f', 'mpegts']
encoding_streamable = True
def upload_video(self, title, description, tags, data): def upload_video(self, title, description, tags, data):
raise NotImplementedError raise NotImplementedError
@ -92,10 +97,14 @@ class Youtube(UploadBackend):
language: language:
The language code to describe all videos as. The language code to describe all videos as.
Default is "en", ie. English. Set to null to not set. Default is "en", ie. English. Set to null to not set.
use_yt_recommended_encoding:
Default False. If True, use the ffmpeg settings that Youtube recommends for
fast processing once uploaded. We suggest not bothering, as it doesn't appear
to make much difference.
""" """
needs_transcode = True needs_transcode = True
encoding_settings = [ recommended_settings = [
# Youtube's recommended settings: # Youtube's recommended settings:
'-codec:v', 'libx264', # Make the video codec x264 '-codec:v', 'libx264', # Make the video codec x264
'-crf', '21', # Set the video quality, this produces the bitrate range that YT likes '-crf', '21', # Set the video quality, this produces the bitrate range that YT likes
@ -108,7 +117,7 @@ class Youtube(UploadBackend):
'-movflags', 'faststart', # put MOOV atom at the front of the file, as requested '-movflags', 'faststart', # put MOOV atom at the front of the file, as requested
] ]
def __init__(self, credentials, hidden=False, category_id=23, language="en"): def __init__(self, credentials, hidden=False, category_id=23, language="en", use_yt_recommended_encoding=False):
self.logger = logging.getLogger(type(self).__name__) self.logger = logging.getLogger(type(self).__name__)
self.client = GoogleAPIClient( self.client = GoogleAPIClient(
credentials['client_id'], credentials['client_id'],
@ -118,6 +127,9 @@ class Youtube(UploadBackend):
self.hidden = hidden self.hidden = hidden
self.category_id = category_id self.category_id = category_id
self.language = language self.language = language
if use_yt_recommended_encoding:
self.encoding_settings = self.recommended_settings
self.encoding_streamable = False
def upload_video(self, title, description, tags, data): def upload_video(self, title, description, tags, data):
json = { json = {

@ -237,7 +237,6 @@ def cut(channel, quality):
Even if holes are allowed, a 406 may result if the resulting video would be empty. Even if holes are allowed, a 406 may result if the resulting video would be empty.
type: One of "fast" or "full". Default to "fast". type: One of "fast" or "full". Default to "fast".
A fast cut is much faster but minor artifacting may be present near the start and end. A fast cut is much faster but minor artifacting may be present near the start and end.
A fast cut is encoded as MPEG-TS, a full as mp4.
""" """
start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None
@ -272,8 +271,9 @@ def cut(channel, quality):
if type == 'fast': if type == 'fast':
return Response(fast_cut_segments(segments, start, end), mimetype='video/MP2T') return Response(fast_cut_segments(segments, start, end), mimetype='video/MP2T')
elif type == 'full': elif type == 'full':
# output as mp4 with no more specific encoding args # output as high-quality mpegts, without wasting too much cpu on encoding
return Response(full_cut_segments(segments, start, end, ['-f', 'mp4']), mimetype='video/mp4') encoding_args = ['-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '0', '-f', 'mpegts']
return Response(full_cut_segments(segments, start, end, encoding_args, stream=True), mimetype='video/MP2T')
else: else:
return "Unknown type {!r}. Must be 'fast' or 'full'.".format(type), 400 return "Unknown type {!r}. Must be 'fast' or 'full'.".format(type), 400

Loading…
Cancel
Save