def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args):
    """Return a Popen object which is ffmpeg cutting from stdin.

    This is used when doing a full cut.
    Note the explicit output file object instead of using a pipe,
    because most video formats require a seekable file.

    Args:
        output_file: Passed to Popen as ffmpeg's stdout; ffmpeg writes its
            output there via the /proc/self/fd/1 path.
        cut_start: Value given to ffmpeg's -ss option (converted with str()).
        duration: Value given to ffmpeg's -t option (converted with str()).
        encode_args: Iterable of extra ffmpeg arguments, placed immediately
            before the output path.

    Returns:
        The started Popen object. Its stdin is a pipe the caller must feed
        and close; its stdout is output_file, not a pipe.
    """
    args = [
        'ffmpeg',
        '-hide_banner', '-loglevel', 'error',  # suppress noisy output
        # NOTE(review): this input line sits in the gap between the two
        # visible diff hunks; reconstructed as "read from stdin" -- confirm
        # against the full file.
        '-i', '-',
        '-ss', cut_start,
        '-t', duration,
    ] + list(encode_args) + [
        # We want ffmpeg to write to our tempfile, which is its stdout.
        # However, it assumes that '-' means the output is not seekable.
        # We trick it into understanding that its stdout is seekable by
        # telling it to write to the fd via its /proc/self filename.
        '/proc/self/fd/1',
        # But of course, that file "already exists", so we need to give it
        # permission to "overwrite" it.
        '-y',
    ]
    # Build a real list rather than using map(): on Python 3 map() is a
    # one-shot iterator, so the join in the log call below would exhaust it
    # and Popen would be handed an empty argument list.
    args = [str(arg) for arg in args]
    logging.info("Running full cut with args: {}".format(" ".join(args)))
    return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file)
- This is used to provide input to ffmpeg in a full cut.""" - for segment in segments: - with open(segment.path) as f: - try: - shutil.copyfileobj(f, pipe) - except OSError as e: - # ignore EPIPE, as this just means the end cut meant we didn't need all input - if e.errno != errno.EPIPE: - raise - pipe.close() - - def full_cut_segments(segments, start, end, encode_args): # how far into the first segment to begin cut_start = max(0, (start - segments[0].start).total_seconds()) @@ -405,21 +402,33 @@ def full_cut_segments(segments, start, end, encode_args): duration = (end - start).total_seconds() ffmpeg = None - input_feeder = None try: - ffmpeg = ffmpeg_cut_stdin(cut_start, duration, encode_args) - input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) - # stream the output until it is closed - for chunk in read_chunks(ffmpeg.stdout): - yield chunk + # Most ffmpeg output formats require a seekable file. + # For the same reason, it's not safe to begin uploading until ffmpeg + # has finished. We create a temporary file for this. + tempfile = TemporaryFile() + ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, encode_args) + + # stream the input + for segment in segments: + with open(segment.path) as f: + try: + shutil.copyfileobj(f, ffmpeg.stdin) + except OSError as e: + # ignore EPIPE, as this just means the end cut meant we didn't need all input + if e.errno != errno.EPIPE: + raise + ffmpeg.stdin.close() + # check if any errors occurred in input writing, or if ffmpeg exited non-success. 
if ffmpeg.wait() != 0: raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() + + # Now actually yield the resulting file + for chunk in read_chunks(tempfile): + yield chunk finally: # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() if ffmpeg is not None and ffmpeg.poll() is None: for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): try: