Replace ffmpeg_cut_stdin() with ffmpeg_cut_many()

This is a more featureful wrapper around ffmpeg with notable differences:
- It's used as a context manager, and so can manage its own cleanup
- It takes care of input feeding
- It can handle multiple inputs (via pipes), instead of one (via stdin)

This drastically reduces the setup and cleanup code required for most basic usage,
and the multi-input support will be used in followup changes.
pull/400/head
Mike Lang 6 months ago committed by Mike Lang
parent d571bbe81e
commit e65145bcad

@ -11,12 +11,13 @@ import logging
import os import os
import shutil import shutil
from collections import namedtuple from collections import namedtuple
from contextlib import closing from contextlib import closing, contextmanager
from tempfile import TemporaryFile from tempfile import TemporaryFile
from uuid import uuid4 from uuid import uuid4
import gevent import gevent
from gevent import subprocess from gevent import subprocess
from gevent.fileobject import FileObject
from .cached_iterator import CachedIterator from .cached_iterator import CachedIterator
from .stats import timed from .stats import timed
@ -390,24 +391,30 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
return subprocess.Popen(args, stdout=subprocess.PIPE) return subprocess.Popen(args, stdout=subprocess.PIPE)
def ffmpeg_cut_stdin(output_file, encode_args): @contextmanager
"""Return a Popen object which is ffmpeg cutting from stdin. def ffmpeg_cut_many(inputs, output_file, encode_args):
This is used when doing a full cut, plus various functions for transforming segment video without doing a multi-range cut.
If output_file is not subprocess.PIPE,
uses explicit output file object instead of using a pipe,
because some output formats require a seekable file.
""" """
args = [ Context manager that produces a Popen object which is ffmpeg cutting the given inputs.
INPUTS is a list of (segments, input_args). The list of segments will be fed as input data,
preceeded by the given input args.
OUTPUT_FILE may be a writable file object (with fileno) or subprocess.PIPE.
If subprocess.PIPE, then output can be read from the Popen object's stdout.
Using a stdout pipe is preferred but a file can be useful if the output needs to be seekable.
Upon successful context exit, we block until ffmpeg finishes and raise if anything errored.
Upon unsuccessful exit, ffmpeg will be killed if still running.
In either case all files will be closed and everything cleaned up.
"""
BASE_ARGS = [
'ffmpeg', 'ffmpeg',
'-hide_banner', '-loglevel', 'error', # suppress noisy output '-hide_banner', '-loglevel', 'error', # suppress noisy output
'-i', '-',
] ]
args += list(encode_args)
if output_file is subprocess.PIPE: if output_file is subprocess.PIPE:
args.append('-') # output to stdout output_args = ['-'] # output to stdout
else: else:
args += [ output_args = [
# We want ffmpeg to write to our tempfile, which is its stdout. # We want ffmpeg to write to our tempfile, which is its stdout.
# However, it assumes that '-' means the output is not seekable. # However, it assumes that '-' means the output is not seekable.
# We trick it into understanding that its stdout is seekable by # We trick it into understanding that its stdout is seekable by
@ -417,9 +424,63 @@ def ffmpeg_cut_stdin(output_file, encode_args):
# permission to "overwrite" it. # permission to "overwrite" it.
'-y', '-y',
] ]
input_pipes = []
input_feeders = []
ffmpeg = None
try:
# Create pipes and feeders, and prepare input args
all_input_args = []
for segments, input_args in inputs:
# prepare the input pipe
read_fd, write_fd = os.pipe()
input_pipes.append(read_fd)
# set up the writer to fill the pipe
write_file = FileObject(write_fd, 'wb')
input_feeder = gevent.spawn(feed_input, segments, write_file)
input_feeders.append(input_feeder)
# add input file to ffmpeg args
all_input_args += input_args + ["-i", "/proc/self/fd/{}".format(read_fd)]
# Prepare final arg list and spawn the process
args = BASE_ARGS + all_input_args + encode_args + output_args
args = list(map(str, args)) args = list(map(str, args))
logging.info("Cutting from stdin with args: {}".format(" ".join(args))) logging.info("Running ffmpeg with args: {}".format(" ".join(args)))
return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file) ffmpeg = subprocess.Popen(args, stdout=output_file, pass_fds=input_pipes)
# Close input fds now that the child is holding them.
# Note we remove them from the list one at a time so any failure in a close()
# call will still close the rest of them during cleanup.
while input_pipes:
fd = input_pipes.pop()
os.close(fd)
# produce context manager result, everything after this only applies if
# the context block succeeds
yield ffmpeg
# check if any errors occurred in input writing, or if ffmpeg exited non-success.
if ffmpeg.wait() != 0:
raise Exception("Error while cutting: ffmpeg exited {}".format(ffmpeg.returncode))
for input_feeder in input_feeders:
input_feeder.get() # re-raise any errors from feed_input() calls
finally:
# if something goes wrong, try to clean up ignoring errors
for input_feeder in input_feeders:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try:
action()
except (OSError, IOError):
pass
for fd in input_pipes:
try:
os.close(fd)
except (OSError, IOError):
pass
def read_chunks(fileobj, chunk_size=16*1024): def read_chunks(fileobj, chunk_size=16*1024):
@ -622,10 +683,6 @@ def full_cut_segments(segments, start, end, encode_args, stream=False):
# duration # duration
duration = (end - start).total_seconds() duration = (end - start).total_seconds()
ffmpeg = None
input_feeder = None
try:
if stream: if stream:
# When streaming, we can just use a pipe # When streaming, we can just use a pipe
tempfile = subprocess.PIPE tempfile = subprocess.PIPE
@ -641,33 +698,18 @@ def full_cut_segments(segments, start, end, encode_args, stream=False):
if duration is not None: if duration is not None:
args += ['-t', duration] args += ['-t', duration]
args += list(encode_args) args += list(encode_args)
ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, args)
input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin)
with ffmpeg_cut_many([segments, ()], tempfile, args) as ffmpeg:
# When streaming, we can return data as it is available # When streaming, we can return data as it is available
# Otherwise, just exit the context manager so tempfile is fully written.
if stream: if stream:
for chunk in read_chunks(ffmpeg.stdout): for chunk in read_chunks(ffmpeg.stdout):
yield chunk yield chunk
# check if any errors occurred in input writing, or if ffmpeg exited non-success.
if ffmpeg.wait() != 0:
raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode))
input_feeder.get() # re-raise any errors from feed_input()
# When not streaming, we can only return the data once ffmpeg has exited # When not streaming, we can only return the data once ffmpeg has exited
if not stream: if not stream:
for chunk in read_chunks(tempfile): for chunk in read_chunks(tempfile):
yield chunk yield chunk
finally:
# if something goes wrong, try to clean up ignoring errors
if input_feeder is not None:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try:
action()
except (OSError, IOError):
pass
@timed('cut', cut_type='archive', normalize=lambda ret, sr, ranges: range_total(ranges)) @timed('cut', cut_type='archive', normalize=lambda ret, sr, ranges: range_total(ranges))
@ -692,32 +734,15 @@ def archive_cut_segments(segment_ranges, ranges, tempdir):
for segments in segment_ranges: for segments in segment_ranges:
contiguous_ranges += list(split_contiguous(segments)) contiguous_ranges += list(split_contiguous(segments))
for segments in contiguous_ranges: for segments in contiguous_ranges:
ffmpeg = None
input_feeder = None
tempfile_name = os.path.join(tempdir, "archive-temp-{}.mkv".format(uuid4())) tempfile_name = os.path.join(tempdir, "archive-temp-{}.mkv".format(uuid4()))
try: try:
tempfile = open(tempfile_name, "wb") with open(tempfile_name, "wb") as tempfile:
with ffmpeg_cut_many([(segments, [])], tempfile, encode_args):
ffmpeg = ffmpeg_cut_stdin(tempfile, encode_args) # We just want ffmpeg to run to completion, which ffmpeg_cut_many()
input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) # will do on exit for us.
# since we've now handed off the tempfile fd to ffmpeg, close ours
tempfile.close()
# check if any errors occurred in input writing, or if ffmpeg exited non-success.
if ffmpeg.wait() != 0:
raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode))
input_feeder.get() # re-raise any errors from feed_input()
except:
# if something goes wrong, try to clean up ignoring errors
if input_feeder is not None:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try:
action()
except (OSError, IOError):
pass pass
except:
# if something goes wrong, try to delete the tempfile
try: try:
os.remove(tempfile_name) os.remove(tempfile_name)
except (OSError, IOError): except (OSError, IOError):
@ -742,9 +767,6 @@ def render_segments_waveform(segments, size=(1024, 128), scale='sqrt', color='#0
# Remove holes # Remove holes
segments = [segment for segment in segments if segment is not None] segments = [segment for segment in segments if segment is not None]
ffmpeg = None
input_feeder = None
try:
args = [ args = [
# create waveform from input audio # create waveform from input audio
'-filter_complex', '-filter_complex',
@ -754,27 +776,10 @@ def render_segments_waveform(segments, size=(1024, 128), scale='sqrt', color='#0
# output as png # output as png
'-f', 'image2', '-c', 'png', '-f', 'image2', '-c', 'png',
] ]
ffmpeg = ffmpeg_cut_stdin(subprocess.PIPE, args) with ffmpeg_cut_many([(segments, [])], subprocess.PIPE, args) as ffmpeg:
input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin)
for chunk in read_chunks(ffmpeg.stdout): for chunk in read_chunks(ffmpeg.stdout):
yield chunk yield chunk
# check if any errors occurred in input writing, or if ffmpeg exited non-success.
if ffmpeg.wait() != 0:
raise Exception("Error while rendering waveform: ffmpeg exited {}".format(ffmpeg.returncode))
input_feeder.get() # re-raise any errors from feed_input()
finally:
# if something goes wrong, try to clean up ignoring errors
if input_feeder is not None:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try:
action()
except (OSError, IOError):
pass
@timed('extract_frame') @timed('extract_frame')
def extract_frame(segments, timestamp): def extract_frame(segments, timestamp):
@ -790,39 +795,18 @@ def extract_frame(segments, timestamp):
# "cut" input so that first frame is our target frame # "cut" input so that first frame is our target frame
cut_start = (timestamp - segments[0].start).total_seconds() cut_start = (timestamp - segments[0].start).total_seconds()
input_args = ["-ss", cut_start]
ffmpeg = None
input_feeder = None
try:
args = [ args = [
# cut to correct start frame
"-ss", cut_start,
# get a single frame # get a single frame
'-vframes', '1', '-vframes', '1',
# output as png # output as png
'-f', 'image2', '-c', 'png', '-f', 'image2', '-c', 'png',
] ]
ffmpeg = ffmpeg_cut_stdin(subprocess.PIPE, args) with ffmpeg_cut_many([(segments, input_args)], subprocess.PIPE, args) as ffmpeg:
input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin)
for chunk in read_chunks(ffmpeg.stdout): for chunk in read_chunks(ffmpeg.stdout):
yield chunk yield chunk
# check if any errors occurred in input writing, or if ffmpeg exited non-success.
if ffmpeg.wait() != 0:
raise Exception("Error while extracting frame: ffmpeg exited {}".format(ffmpeg.returncode))
input_feeder.get() # re-raise any errors from feed_input()
finally:
# if something goes wrong, try to clean up ignoring errors
if input_feeder is not None:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try:
action()
except (OSError, IOError):
pass
def split_contiguous(segments): def split_contiguous(segments):
"""For a list of segments, return a list of contiguous ranges of segments. """For a list of segments, return a list of contiguous ranges of segments.

Loading…
Cancel
Save