From 3371502ea9c2a33523ab0ae7ef5e277efe47ec34 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 10 Jun 2019 03:50:45 -0700 Subject: [PATCH] Port existing cutting code from restreamer into common Note this moves over the 'experimental' cutter and deletes the original cutter that concatenates entire videos before cutting. We may eventually want to revive that method if the experimental cutter turns out to introduce too many issues. We move most of the code over verbatim, but adjust it such that it acts as a generic iterator that can be used in a variety of contexts. Some other changes made during the move include telling ffmpeg to be quieter (don't output version info and junk, only log if something goes wrong), and avoiding errors during cleanup. --- common/common/__init__.py | 2 +- common/common/segments.py | 123 ++++++++++++++++++++++++++ restreamer/restreamer/main.py | 158 +--------------------------------- 3 files changed, 127 insertions(+), 156 deletions(-) diff --git a/common/common/__init__.py b/common/common/__init__.py index 0b345e3..940ab13 100644 --- a/common/common/__init__.py +++ b/common/common/__init__.py @@ -7,7 +7,7 @@ import errno import os import random -from .segments import get_best_segments, parse_segment_path, SegmentInfo +from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo from .stats import timed, PromLogCountsHandler, install_stacksampler diff --git a/common/common/segments.py b/common/common/segments.py index 6e42acb..842319e 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -6,12 +6,15 @@ import base64 import datetime import errno import itertools +import json import logging import os import sys from collections import namedtuple +from contextlib import closing import gevent +from gevent import subprocess from .stats import timed @@ -232,3 +235,123 @@ def best_segments_by_start(hour): continue # no full segments, fall back to measuring partials. yield max(segments, key=lambda segment: os.stat(segment.path).st_size) + + +def streams_info(segment): + """Return ffprobe's info on streams as a list of dicts""" + output = subprocess.check_output([ + 'ffprobe', + '-hide_banner', '-loglevel', 'fatal', # suppress noisy output + '-of', 'json', '-show_streams', # get streams info as json + segment.path, + ]) + return json.loads(output)['streams'] + + +def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): + """Return a Popen object which is ffmpeg cutting the given segment""" + args = [ + 'ffmpeg', + '-hide_banner', '-loglevel', 'fatal', # suppress noisy output + '-i', segment.path, + ] + # output from ffprobe is generally already sorted but let's be paranoid, + # because the order of map args matters. + for stream in sorted(streams_info(segment), key=lambda stream: stream['index']): + # map the same stream in the same position from input to output + args += ['-map', '0:{}'.format(stream['index'])] + if stream['codec_type'] in ('video', 'audio'): + # for non-metadata streams, make sure we use the same codec (metadata streams + # are a bit weirder, and ffmpeg will do the right thing anyway) + args += ['-codec:{}'.format(stream['index']), stream['codec_name']] + # now add trim args + if cut_start: + args += ['-ss', str(cut_start)] + if cut_end: + args += ['-to', str(cut_end)] + # output to stdout as MPEG-TS + args += ['-f', 'mpegts', '-'] + # run it + logging.info("Running segment cut with args: {}".format(" ".join(args))) + return subprocess.Popen(args, stdout=subprocess.PIPE) + + +def read_chunks(fileobj, chunk_size=16*1024): + """Read fileobj until EOF, yielding chunk_size sized chunks of data.""" + while True: + chunk = fileobj.read(chunk_size) + if not chunk: + break + yield chunk + + +def cut_segments(segments, start, end): + """Yields chunks of a MPEGTS video file covering the exact timestamp range. + segments should be a list of segments as returned by get_best_segments(). + This method works by only cutting the first and last segments, and concatenating the rest. + This only works if the same codec settings etc are used across all segments. + This should almost always be true but may cause weird results if not. + """ + + # how far into the first segment to begin (if no hole at start) + cut_start = None + if segments[0] is not None: + cut_start = (start - segments[0].start).total_seconds() + if cut_start < 0: + raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated") + + # how far into the final segment to end (if no hole at end) + cut_end = None + if segments[-1] is not None: + cut_end = (end - segments[-1].start).total_seconds() + if cut_end < 0: + raise ValueError("Last segment ends before cut end, but no trailing hole indicated") + + # Set first and last only if they actually need cutting. + # Note this handles both the cut_start = None (no first segment to cut) + # and cut_start = 0 (first segment already starts on time) cases. + first = segments[0] if cut_start else None + last = segments[-1] if cut_end else None + + for segment in segments: + if segment is None: + logging.debug("Skipping discontinuity while cutting") + # TODO: If we want to be safe against the possibility of codecs changing, + # we should check the streams_info() after each discontinuity. + continue + + # note first and last might be the same segment. + # note a segment will only match if cutting actually needs to be done + # (ie. cut_start or cut_end is not 0) + if segment in (first, last): + proc = None + try: + proc = ffmpeg_cut_segment( + segment, + cut_start if segment == first else None, + cut_end if segment == last else None, + ) + with closing(proc.stdout): + for chunk in read_chunks(proc.stdout): + yield chunk + proc.wait() + except Exception: + ex, ex_type, tb = sys.exc_info() + # try to clean up proc, ignoring errors + if proc is not None: + try: + proc.kill() + except OSError: + pass + raise ex, ex_type, tb + else: + # check if ffmpeg had errors + if proc.returncode != 0: + raise Exception( + "Error while streaming cut: ffmpeg exited {}".format(proc.returncode) + ) + else: + # no cutting needed, just serve the file + with open(segment.path) as f: + for chunk in read_chunks(f): + yield chunk diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 18b70c3..ff1e42f 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -5,19 +5,16 @@ import functools import json import logging import os -import shutil import signal -from contextlib import closing import dateutil.parser import gevent import gevent.backdoor import prometheus_client as prom from flask import Flask, url_for, request, abort, Response -from gevent import subprocess from gevent.pywsgi import WSGIServer -from common import get_best_segments, PromLogCountsHandler, install_stacksampler +from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler import generate_hls from stats import stats, after_request @@ -237,8 +234,6 @@ def cut(stream, variant): if any holes are detected, rather than producing a video with missing parts. Set to true by passing "true" (case insensitive). Even if holes are allowed, a 406 may result if the resulting video would be empty. - experimental: Optional, default false. If true, use the new, much faster, but experimental - method of cutting. """ start = dateutil.parser.parse(request.args['start']) end = dateutil.parser.parse(request.args['end']) @@ -258,157 +253,10 @@ def cut(stream, variant): if not allow_holes and None in segments: return "Requested time range contains holes or is incomplete.", 406 - segments = [segment for segment in segments if segment is not None] - - if not segments: + if not any(segment is not None for segment in segments): return "We have no content available within the requested time range.", 406 - # how far into the first segment to begin - cut_start = max(0, (start - segments[0].start).total_seconds()) - # calculate full uncut duration of content, ie. without holes. - full_duration = sum(segment.duration.total_seconds() for segment in segments) - # calculate how much of final segment should be cut off - cut_end = max(0, (segments[-1].end - end).total_seconds()) - # finally, calculate actual output duration, which is what ffmpeg will use - duration = full_duration - cut_start - cut_end - - # possibly defer to experiemntal version now that we've parsed our inputs. - # we'll clean up this whole flow later. - if request.args.get('experimental') == 'true': - return cut_experimental(segments, cut_start, cut_end) - - def feed_input(pipe): - # pass each segment into ffmpeg's stdin in order, while outputing everything on stdout. - for segment in segments: - with open(segment.path) as f: - try: - shutil.copyfileobj(f, pipe) - except OSError as e: - # ignore EPIPE, as this just means the end cut meant we didn't need all input - if e.errno != errno.EPIPE: - raise - pipe.close() - - def _cut(): - ffmpeg = None - input_feeder = None - try: - ffmpeg = subprocess.Popen([ - "ffmpeg", - "-i", "-", # read from stdin - "-ss", str(cut_start), # seconds to cut from start - "-t", str(duration), # total duration, which says when to cut at end - "-f", "mpegts", # output as MPEG-TS format - "-", # output to stdout - ], stdin=subprocess.PIPE, stdout=subprocess.PIPE) - input_feeder = gevent.spawn(feed_input, ffmpeg.stdin) - # stream the output until it is closed - while True: - chunk = ffmpeg.stdout.read(16*1024) - if not chunk: - break - yield chunk - # check if any errors occurred in input writing, or if ffmpeg exited non-success. - # raising an error mid-streaming-response will get flask to abort the response - # uncleanly, which tells the client that something went wrong. - if ffmpeg.wait() != 0: - raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() - finally: - # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() - if ffmpeg is not None and ffmpeg.poll() is None: - for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): - try: - action() - except (OSError, IOError): - pass - - return Response(_cut(), mimetype='video/MP2T') - - -def cut_experimental(segments, cut_start, cut_end): - """Experimental cutting method where we cut the first and last segments only, - then cat them all together.""" - # Note: assumes codecs don't change from segment to segment. - - def streams_info(segment): - """Return ffprobe's info on streams as a list of dicts""" - output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path]) - return json.loads(output)['streams'] - - def ffmpeg(segment, cut_start=None, cut_end=None): - """Return a Popen object which is ffmpeg cutting the given segment""" - args = ['ffmpeg', '-i', segment.path] - # output from ffprobe is generally already sorted but let's be paranoid, - # because the order of map args matters. - for stream in sorted(streams_info(segment), key=lambda stream: stream['index']): - # map the same stream in the same position from input to output - args += ['-map', '0:{}'.format(stream['index'])] - if stream['codec_type'] in ('video', 'audio'): - # for non-metadata streams, make sure we use the same codec (metadata streams - # are a bit weirder, and ffmpeg will do the right thing anyway) - args += ['-codec:{}'.format(stream['index']), stream['codec_name']] - # now add trim args - if cut_start: - args += ['-ss', str(cut_start)] - if cut_end: - args += ['-to', str(cut_end)] - # output to stdout as MPEG-TS - args += ['-f', 'mpegts', '-'] - # run it - logging.info("Running segment cut with args: {}".format(" ".join(args))) - return subprocess.Popen(args, stdout=subprocess.PIPE) - - def chunks(fileobj, chunk_size=16*1024): - """Read fileobj until EOF, yielding chunk_size sized chunks of data.""" - while True: - chunk = fileobj.read(chunk_size) - if not chunk: - break - yield chunk - - def _cut(): - # set first and last only if they actually need cutting - first = segments[0] if cut_start else None - last = segments[-1] if cut_end else None - for segment in segments: - # note first and last might be the same segment. - # note a segment will only match if cutting actually needs to be done - # (ie. cut_start or cut_end is not 0) - if segment in (first, last): - proc = None - try: - proc = ffmpeg( - segment, - cut_start if segment == first else None, - cut_end if segment == last else None, - ) - with closing(proc.stdout): - for chunk in chunks(proc.stdout): - yield chunk - proc.wait() - except Exception: - # try to clean up proc, ignoring errors - try: - proc.kill() - except OSError: - pass - else: - # check if ffmpeg had errors - if proc.returncode != 0: - raise Exception( - "Error while streaming cut: ffmpeg exited {}".format(proc.returncode) - ) - else: - # no cutting needed, just serve the file - with open(segment.path) as f: - for chunk in chunks(f): - yield chunk - - return Response(_cut(), mimetype='video/MP2T') - + return Response(cut_segments(segments, start, end), mimetype='video/MP2T') def main(host='0.0.0.0', port=8000, base_dir='.', backdoor_port=0):