Port existing cutting code from restreamer into common

Note this moves over the 'experimental' cutter and deletes the original cutter
that concatenates entire videos before cutting.
We may eventually want to revive that method if the experimental cutter turns out
to introduce too many issues.

We move most of the code over verbatim, but adjust it such that it acts
as a generic iterator that can be used in a variety of contexts.

Some other changes made during the move include telling ffmpeg to be quieter
(don't output version info and junk, only log if something goes wrong),
and avoiding errors during cleanup.
pull/47/head
Mike Lang 6 years ago committed by Mike Lang
parent 3d9ba77745
commit dfc64481a6

@ -7,7 +7,7 @@ import errno
import os
import random
from .segments import get_best_segments, parse_segment_path, SegmentInfo
from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo
from .stats import timed, PromLogCountsHandler, install_stacksampler

@ -6,12 +6,15 @@ import base64
import datetime
import errno
import itertools
import json
import logging
import os
import sys
from collections import namedtuple
from contextlib import closing
import gevent
from gevent import subprocess
from .stats import timed
@ -232,3 +235,123 @@ def best_segments_by_start(hour):
continue
# no full segments, fall back to measuring partials.
yield max(segments, key=lambda segment: os.stat(segment.path).st_size)
def streams_info(segment):
    """Return ffprobe's info on streams as a list of dicts.

    Runs ffprobe against segment.path, asking for a JSON report, and returns
    the 'streams' entry of that report.
    """
    command = ['ffprobe']
    # suppress noisy output
    command += ['-hide_banner', '-loglevel', 'fatal']
    # get streams info as json
    command += ['-of', 'json', '-show_streams']
    command.append(segment.path)
    report = json.loads(subprocess.check_output(command))
    return report['streams']
def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
    """Start ffmpeg cutting the given segment and return its Popen object.

    cut_start and cut_end are handed to ffmpeg as its -ss and -to values
    respectively. A falsy value (None or 0) leaves that side untrimmed.
    The result is written as MPEG-TS to the process's stdout, which is a pipe.
    """
    command = ['ffmpeg']
    command.extend(['-hide_banner', '-loglevel', 'fatal'])  # suppress noisy output
    command.extend(['-i', segment.path])

    # The order of -map args matters. ffprobe's output is generally sorted
    # already, but sort by stream index anyway to be paranoid.
    streams = sorted(streams_info(segment), key=lambda info: info['index'])
    for info in streams:
        index = info['index']
        # map the same stream in the same position from input to output
        command.extend(['-map', '0:{}'.format(index)])
        if info['codec_type'] not in ('video', 'audio'):
            # metadata streams are a bit weirder, and ffmpeg will do
            # the right thing with them anyway
            continue
        # for non-metadata streams, make sure we use the same codec
        command.extend(['-codec:{}'.format(index), info['codec_name']])

    # trim args, only for the sides that actually need trimming
    if cut_start:
        command.extend(['-ss', str(cut_start)])
    if cut_end:
        command.extend(['-to', str(cut_end)])

    # output to stdout as MPEG-TS
    command.extend(['-f', 'mpegts', '-'])

    logging.info("Running segment cut with args: {}".format(" ".join(command)))
    return subprocess.Popen(command, stdout=subprocess.PIPE)
def read_chunks(fileobj, chunk_size=16*1024):
    """Yield the contents of fileobj piece by piece until EOF.

    Each piece is the result of one fileobj.read(chunk_size) call.
    Iteration stops at the first empty read.
    """
    piece = fileobj.read(chunk_size)
    while piece:
        yield piece
        piece = fileobj.read(chunk_size)
def cut_segments(segments, start, end):
    """Yields chunks of an MPEG-TS video file covering the exact timestamp range.

    segments should be a list of segments as returned by get_best_segments().
    A None entry marks a hole (discontinuity) and is skipped.
    start and end are the timestamps to cut at; they are compared against
    the .start of the first and last segments.

    This method works by only cutting the first and last segments, and
    concatenating the rest. This only works if the same codec settings etc
    are used across all segments. This should almost always be true but may
    cause weird results if not.

    Because this is a generator, nothing runs until it is first iterated.
    On iteration it raises ValueError if the first segment starts after
    `start`, or the last segment starts after `end`, with no hole indicated.
    It raises Exception if ffmpeg exits with a non-zero status.
    An empty segments list yields nothing.
    """
    if not segments:
        # nothing to serve; also avoids an IndexError on segments[0] below
        return

    # how far into the first segment to begin (if no hole at start)
    cut_start = None
    if segments[0] is not None:
        cut_start = (start - segments[0].start).total_seconds()
        if cut_start < 0:
            raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated")

    # how far into the final segment to end (if no hole at end)
    cut_end = None
    if segments[-1] is not None:
        cut_end = (end - segments[-1].start).total_seconds()
        # NOTE(review): this condition actually means "the last segment starts
        # after the cut end", which is not quite what the message says. Confirm
        # which of the two is intended before changing either.
        if cut_end < 0:
            raise ValueError("Last segment ends before cut end, but no trailing hole indicated")

    # Set first and last only if they actually need cutting.
    # Note this handles both the cut_start = None (no first segment to cut)
    # and cut_start = 0 (first segment already starts on time) cases.
    first = segments[0] if cut_start else None
    last = segments[-1] if cut_end else None

    for segment in segments:
        if segment is None:
            logging.debug("Skipping discontinuity while cutting")
            # TODO: If we want to be safe against the possibility of codecs changing,
            # we should check the streams_info() after each discontinuity.
            continue

        # note first and last might be the same segment.
        # note a segment will only match if cutting actually needs to be done
        # (ie. cut_start or cut_end is not 0)
        if segment in (first, last):
            proc = None
            finished = False
            try:
                proc = ffmpeg_cut_segment(
                    segment,
                    cut_start if segment == first else None,
                    cut_end if segment == last else None,
                )
                with closing(proc.stdout):
                    for chunk in read_chunks(proc.stdout):
                        yield chunk
                proc.wait()
                finished = True
            finally:
                # Clean up proc on any abnormal exit, ignoring errors. A finally
                # block (rather than "except Exception") also covers the consumer
                # closing this generator early (GeneratorExit), which would
                # otherwise leave ffmpeg behind. The original error, if any,
                # keeps propagating once this block ends.
                if not finished and proc is not None:
                    try:
                        proc.kill()
                    except OSError:
                        pass
            # check if ffmpeg had errors
            if proc.returncode != 0:
                raise Exception(
                    "Error while streaming cut: ffmpeg exited {}".format(proc.returncode)
                )
        else:
            # no cutting needed, just serve the file.
            # Binary mode: the content is video data, not text.
            with open(segment.path, 'rb') as f:
                for chunk in read_chunks(f):
                    yield chunk

@ -5,19 +5,16 @@ import functools
import json
import logging
import os
import shutil
import signal
from contextlib import closing
import dateutil.parser
import gevent
import gevent.backdoor
import prometheus_client as prom
from flask import Flask, url_for, request, abort, Response
from gevent import subprocess
from gevent.pywsgi import WSGIServer
from common import get_best_segments, PromLogCountsHandler, install_stacksampler
from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler
import generate_hls
from stats import stats, after_request
@ -237,8 +234,6 @@ def cut(stream, variant):
if any holes are detected, rather than producing a video with missing parts.
Set to true by passing "true" (case insensitive).
Even if holes are allowed, a 406 may result if the resulting video would be empty.
experimental: Optional, default false. If true, use the new, much faster, but experimental
method of cutting.
"""
start = dateutil.parser.parse(request.args['start'])
end = dateutil.parser.parse(request.args['end'])
@ -258,157 +253,10 @@ def cut(stream, variant):
if not allow_holes and None in segments:
return "Requested time range contains holes or is incomplete.", 406
segments = [segment for segment in segments if segment is not None]
if not segments:
if not any(segment is not None for segment in segments):
return "We have no content available within the requested time range.", 406
# how far into the first segment to begin
cut_start = max(0, (start - segments[0].start).total_seconds())
# calculate full uncut duration of content, ie. without holes.
full_duration = sum(segment.duration.total_seconds() for segment in segments)
# calculate how much of final segment should be cut off
cut_end = max(0, (segments[-1].end - end).total_seconds())
# finally, calculate actual output duration, which is what ffmpeg will use
duration = full_duration - cut_start - cut_end
# possibly defer to experimental version now that we've parsed our inputs.
# we'll clean up this whole flow later.
if request.args.get('experimental') == 'true':
return cut_experimental(segments, cut_start, cut_end)
def feed_input(pipe):
# pass each segment into ffmpeg's stdin in order, while outputting everything on stdout.
for segment in segments:
with open(segment.path) as f:
try:
shutil.copyfileobj(f, pipe)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all input
if e.errno != errno.EPIPE:
raise
pipe.close()
def _cut():
ffmpeg = None
input_feeder = None
try:
ffmpeg = subprocess.Popen([
"ffmpeg",
"-i", "-", # read from stdin
"-ss", str(cut_start), # seconds to cut from start
"-t", str(duration), # total duration, which says when to cut at end
"-f", "mpegts", # output as MPEG-TS format
"-", # output to stdout
], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
input_feeder = gevent.spawn(feed_input, ffmpeg.stdin)
# stream the output until it is closed
while True:
chunk = ffmpeg.stdout.read(16*1024)
if not chunk:
break
yield chunk
# check if any errors occurred in input writing, or if ffmpeg exited non-success.
# raising an error mid-streaming-response will get flask to abort the response
# uncleanly, which tells the client that something went wrong.
if ffmpeg.wait() != 0:
raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode))
input_feeder.get() # re-raise any errors from feed_input()
finally:
# if something goes wrong, try to clean up ignoring errors
if input_feeder is not None:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try:
action()
except (OSError, IOError):
pass
return Response(_cut(), mimetype='video/MP2T')
def cut_experimental(segments, cut_start, cut_end):
    """Experimental cutting method where we cut the first and last segments only,
    then cat them all together.

    segments is the list of segments to serve, in order.
    cut_start and cut_end are passed to ffmpeg as the -ss value for the first
    segment and the -to value for the last segment respectively. A falsy
    value means that segment is served without cutting.

    Returns a Response that streams the result with mimetype video/MP2T.
    If ffmpeg fails mid-stream, the error is raised from the response
    iterator rather than being swallowed.
    """
    # Note: assumes codecs don't change from segment to segment.
    # NOTE(review): the visible caller computes cut_end as "seconds to trim off
    # the end of the last segment", but it is used below as ffmpeg's -to, which
    # is a stop position. Confirm the intended meaning with the caller.

    def streams_info(segment):
        """Return ffprobe's info on streams as a list of dicts"""
        output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path])
        return json.loads(output)['streams']

    def ffmpeg(segment, cut_start=None, cut_end=None):
        """Return a Popen object which is ffmpeg cutting the given segment"""
        args = ['ffmpeg', '-i', segment.path]
        # output from ffprobe is generally already sorted but let's be paranoid,
        # because the order of map args matters.
        for stream in sorted(streams_info(segment), key=lambda stream: stream['index']):
            # map the same stream in the same position from input to output
            args += ['-map', '0:{}'.format(stream['index'])]
            if stream['codec_type'] in ('video', 'audio'):
                # for non-metadata streams, make sure we use the same codec (metadata streams
                # are a bit weirder, and ffmpeg will do the right thing anyway)
                args += ['-codec:{}'.format(stream['index']), stream['codec_name']]
        # now add trim args
        if cut_start:
            args += ['-ss', str(cut_start)]
        if cut_end:
            args += ['-to', str(cut_end)]
        # output to stdout as MPEG-TS
        args += ['-f', 'mpegts', '-']
        # run it
        logging.info("Running segment cut with args: {}".format(" ".join(args)))
        return subprocess.Popen(args, stdout=subprocess.PIPE)

    def chunks(fileobj, chunk_size=16*1024):
        """Read fileobj until EOF, yielding chunk_size sized chunks of data."""
        while True:
            chunk = fileobj.read(chunk_size)
            if not chunk:
                break
            yield chunk

    def _cut():
        """Yield the output data segment by segment, cutting only where needed."""
        # set first and last only if they actually need cutting
        first = segments[0] if cut_start else None
        last = segments[-1] if cut_end else None
        for segment in segments:
            # note first and last might be the same segment.
            # note a segment will only match if cutting actually needs to be done
            # (ie. cut_start or cut_end is not 0)
            if segment in (first, last):
                proc = None
                finished = False
                try:
                    proc = ffmpeg(
                        segment,
                        cut_start if segment == first else None,
                        cut_end if segment == last else None,
                    )
                    with closing(proc.stdout):
                        for chunk in chunks(proc.stdout):
                            yield chunk
                    proc.wait()
                    finished = True
                finally:
                    # Clean up proc on any abnormal exit, ignoring errors.
                    # proc is still None if ffmpeg could not be started at all.
                    # The original error keeps propagating after this block, so
                    # a failure aborts the stream instead of silently carrying
                    # on with the next segment.
                    if not finished and proc is not None:
                        try:
                            proc.kill()
                        except OSError:
                            pass
                # check if ffmpeg had errors
                if proc.returncode != 0:
                    raise Exception(
                        "Error while streaming cut: ffmpeg exited {}".format(proc.returncode)
                    )
            else:
                # no cutting needed, just serve the file.
                # Binary mode: the content is video data, not text.
                with open(segment.path, 'rb') as f:
                    for chunk in chunks(f):
                        yield chunk

    return Response(_cut(), mimetype='video/MP2T')
return Response(cut_segments(segments, start, end), mimetype='video/MP2T')
def main(host='0.0.0.0', port=8000, base_dir='.', backdoor_port=0):

Loading…
Cancel
Save