From f5e6af9befcffd3247eab39792ebec6adb99f0b6 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 26 May 2024 11:43:44 +1000 Subject: [PATCH] Replace ffmpeg_cut_stdin() with ffmpeg_cut_many() This is a more featureful wrapper around ffmpeg with notable differences: - It's used as a context manager, and so can manage its own cleanup - It takes care of input feeding - It can handle multiple inputs (via pipes), instead of one (via stdin) This drastically reduces the setup and cleanup code required for most basic usage, and the multi-input support will be used in followup changes. --- common/common/segments.py | 258 ++++++++++++++++++-------------------- 1 file changed, 121 insertions(+), 137 deletions(-) diff --git a/common/common/segments.py b/common/common/segments.py index ffc66fb..c5dc0a5 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -11,12 +11,13 @@ import logging import os import shutil from collections import namedtuple -from contextlib import closing +from contextlib import closing, contextmanager from tempfile import TemporaryFile from uuid import uuid4 import gevent from gevent import subprocess +from gevent.fileobject import FileObject from .cached_iterator import CachedIterator from .stats import timed @@ -390,24 +391,30 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): return subprocess.Popen(args, stdout=subprocess.PIPE) -def ffmpeg_cut_stdin(output_file, encode_args): - """Return a Popen object which is ffmpeg cutting from stdin. - This is used when doing a full cut, plus various functions for transforming segment video without doing a multi-range cut. - If output_file is not subprocess.PIPE, - uses explicit output file object instead of using a pipe, - because some output formats require a seekable file. +@contextmanager +def ffmpeg_cut_many(inputs, output_file, encode_args): """ - args = [ + Context manager that produces a Popen object which is ffmpeg cutting the given inputs. + + INPUTS is a list of (segments, input_args). The list of segments will be fed as input data, + preceeded by the given input args. + OUTPUT_FILE may be a writable file object (with fileno) or subprocess.PIPE. + If subprocess.PIPE, then output can be read from the Popen object's stdout. + Using a stdout pipe is preferred but a file can be useful if the output needs to be seekable. + + Upon successful context exit, we block until ffmpeg finishes and raise if anything errored. + Upon unsuccessful exit, ffmpeg will be killed if still running. + In either case all files will be closed and everything cleaned up. + """ + BASE_ARGS = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error', # suppress noisy output - '-i', '-', ] - args += list(encode_args) if output_file is subprocess.PIPE: - args.append('-') # output to stdout + output_args = ['-'] # output to stdout else: - args += [ + output_args = [ # We want ffmpeg to write to our tempfile, which is its stdout. # However, it assumes that '-' means the output is not seekable. # We trick it into understanding that its stdout is seekable by @@ -417,9 +424,63 @@ def ffmpeg_cut_stdin(output_file, encode_args): # permission to "overwrite" it. '-y', ] - args = list(map(str, args)) - logging.info("Cutting from stdin with args: {}".format(" ".join(args))) - return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file) + + input_pipes = [] + input_feeders = [] + ffmpeg = None + try: + + # Create pipes and feeders, and prepare input args + all_input_args = [] + for segments, input_args in inputs: + # prepare the input pipe + read_fd, write_fd = os.pipe() + input_pipes.append(read_fd) + # set up the writer to fill the pipe + write_file = FileObject(write_fd, 'wb') + input_feeder = gevent.spawn(feed_input, segments, write_file) + input_feeders.append(input_feeder) + # add input file to ffmpeg args + all_input_args += input_args + ["-i", "/proc/self/fd/{}".format(read_fd)] + + # Prepare final arg list and spawn the process + args = BASE_ARGS + all_input_args + encode_args + output_args + args = list(map(str, args)) + logging.info("Running ffmpeg with args: {}".format(" ".join(args))) + ffmpeg = subprocess.Popen(args, stdout=output_file, pass_fds=input_pipes) + + # Close input fds now that the child is holding them. + # Note we remove them from the list one at a time so any failure in a close() + # call will still close the rest of them during cleanup. + while input_pipes: + fd = input_pipes.pop() + os.close(fd) + + # produce context manager result, everything after this only applies if + # the context block succeeds + yield ffmpeg + + # check if any errors occurred in input writing, or if ffmpeg exited non-success. + if ffmpeg.wait() != 0: + raise Exception("Error while cutting: ffmpeg exited {}".format(ffmpeg.returncode)) + for input_feeder in input_feeders: + input_feeder.get() # re-raise any errors from feed_input() calls + + finally: + # if something goes wrong, try to clean up ignoring errors + for input_feeder in input_feeders: + input_feeder.kill() + if ffmpeg is not None and ffmpeg.poll() is None: + for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): + try: + action() + except (OSError, IOError): + pass + for fd in input_pipes: + try: + os.close(fd) + except (OSError, IOError): + pass def read_chunks(fileobj, chunk_size=16*1024): @@ -622,52 +683,33 @@ def full_cut_segments(segments, start, end, encode_args, stream=False): # duration duration = (end - start).total_seconds() - ffmpeg = None - input_feeder = None - try: - - if stream: - # When streaming, we can just use a pipe - tempfile = subprocess.PIPE - else: - # Some ffmpeg output formats require a seekable file. - # For the same reason, it's not safe to begin uploading until ffmpeg - # has finished. We create a temporary file for this. - tempfile = TemporaryFile() - - args = [] - if cut_start is not None: - args += ['-ss', cut_start] - if duration is not None: - args += ['-t', duration] - args += list(encode_args) - ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, args) - input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) + if stream: + # When streaming, we can just use a pipe + tempfile = subprocess.PIPE + else: + # Some ffmpeg output formats require a seekable file. + # For the same reason, it's not safe to begin uploading until ffmpeg + # has finished. We create a temporary file for this. + tempfile = TemporaryFile() + + args = [] + if cut_start is not None: + args += ['-ss', cut_start] + if duration is not None: + args += ['-t', duration] + args += list(encode_args) + with ffmpeg_cut_many([segments, ()], tempfile, args) as ffmpeg: # When streaming, we can return data as it is available + # Otherwise, just exit the context manager so tempfile is fully written. if stream: for chunk in read_chunks(ffmpeg.stdout): yield chunk - # check if any errors occurred in input writing, or if ffmpeg exited non-success. - if ffmpeg.wait() != 0: - raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() - - # When not streaming, we can only return the data once ffmpeg has exited - if not stream: - for chunk in read_chunks(tempfile): - yield chunk - finally: - # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() - if ffmpeg is not None and ffmpeg.poll() is None: - for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): - try: - action() - except (OSError, IOError): - pass + # When not streaming, we can only return the data once ffmpeg has exited + if not stream: + for chunk in read_chunks(tempfile): + yield chunk @timed('cut', cut_type='archive', normalize=lambda ret, sr, ranges: range_total(ranges)) @@ -692,32 +734,15 @@ def archive_cut_segments(segment_ranges, ranges, tempdir): for segments in segment_ranges: contiguous_ranges += list(split_contiguous(segments)) for segments in contiguous_ranges: - ffmpeg = None - input_feeder = None tempfile_name = os.path.join(tempdir, "archive-temp-{}.mkv".format(uuid4())) try: - tempfile = open(tempfile_name, "wb") - - ffmpeg = ffmpeg_cut_stdin(tempfile, encode_args) - input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) - - # since we've now handed off the tempfile fd to ffmpeg, close ours - tempfile.close() - - # check if any errors occurred in input writing, or if ffmpeg exited non-success. - if ffmpeg.wait() != 0: - raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() + with open(tempfile_name, "wb") as tempfile: + with ffmpeg_cut_many([(segments, [])], tempfile, encode_args): + # We just want ffmpeg to run to completion, which ffmpeg_cut_many() + # will do on exit for us. + pass except: - # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() - if ffmpeg is not None and ffmpeg.poll() is None: - for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): - try: - action() - except (OSError, IOError): - pass + # if something goes wrong, try to delete the tempfile try: os.remove(tempfile_name) except (OSError, IOError): @@ -742,39 +767,19 @@ def render_segments_waveform(segments, size=(1024, 128), scale='sqrt', color='#0 # Remove holes segments = [segment for segment in segments if segment is not None] - ffmpeg = None - input_feeder = None - try: - args = [ - # create waveform from input audio - '-filter_complex', - f'[0:a]showwavespic=size={width}x{height}:colors={color}:scale={scale}[out]', - # use created waveform as our output - '-map', '[out]', - # output as png - '-f', 'image2', '-c', 'png', - ] - ffmpeg = ffmpeg_cut_stdin(subprocess.PIPE, args) - input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) - + args = [ + # create waveform from input audio + '-filter_complex', + f'[0:a]showwavespic=size={width}x{height}:colors={color}:scale={scale}[out]', + # use created waveform as our output + '-map', '[out]', + # output as png + '-f', 'image2', '-c', 'png', + ] + with ffmpeg_cut_many([(segments, [])], subprocess.PIPE, args) as ffmpeg: for chunk in read_chunks(ffmpeg.stdout): yield chunk - # check if any errors occurred in input writing, or if ffmpeg exited non-success. - if ffmpeg.wait() != 0: - raise Exception("Error while rendering waveform: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() - finally: - # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() - if ffmpeg is not None and ffmpeg.poll() is None: - for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): - try: - action() - except (OSError, IOError): - pass - @timed('extract_frame') def extract_frame(segments, timestamp): @@ -790,39 +795,18 @@ def extract_frame(segments, timestamp): # "cut" input so that first frame is our target frame cut_start = (timestamp - segments[0].start).total_seconds() + input_args = ["-ss", cut_start] - ffmpeg = None - input_feeder = None - try: - args = [ - # cut to correct start frame - "-ss", cut_start, - # get a single frame - '-vframes', '1', - # output as png - '-f', 'image2', '-c', 'png', - ] - ffmpeg = ffmpeg_cut_stdin(subprocess.PIPE, args) - input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) - + args = [ + # get a single frame + '-vframes', '1', + # output as png + '-f', 'image2', '-c', 'png', + ] + with ffmpeg_cut_many([(segments, input_args)], subprocess.PIPE, args) as ffmpeg: for chunk in read_chunks(ffmpeg.stdout): yield chunk - # check if any errors occurred in input writing, or if ffmpeg exited non-success. - if ffmpeg.wait() != 0: - raise Exception("Error while extracting frame: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() - finally: - # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() - if ffmpeg is not None and ffmpeg.poll() is None: - for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): - try: - action() - except (OSError, IOError): - pass - def split_contiguous(segments): """For a list of segments, return a list of contiguous ranges of segments.