diff --git a/common/common/segments.py b/common/common/segments.py index 34137b8..fd326b1 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -9,6 +9,7 @@ import itertools import json import logging import os +import shutil import sys from collections import namedtuple from contextlib import closing @@ -256,7 +257,9 @@ def streams_info(segment): def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): - """Return a Popen object which is ffmpeg cutting the given segment""" + """Return a Popen object which is ffmpeg cutting the given single segment. + This is used when doing a fast cut. + """ args = [ 'ffmpeg', '-hide_banner', '-loglevel', 'fatal', # suppress noisy output @@ -283,6 +286,22 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): return subprocess.Popen(args, stdout=subprocess.PIPE) +def ffmpeg_cut_stdin(cut_start, cut_end, encode_args): + """Return a Popen object which is ffmpeg cutting from stdin. + This is used when doing a full cut.""" + args = [ + 'ffmpeg', + '-hide_banner', '-loglevel', 'fatal', # suppress noisy output + '-i', '-' + '-ss', cut_start, + '-to', cut_end, + ] + list(encode_args) + [ + '-', # output to stdout + ] + logging.info("Running full cut with args: {}".format(" ".join(args))) + return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + def read_chunks(fileobj, chunk_size=16*1024): """Read fileobj until EOF, yielding chunk_size sized chunks of data.""" while True: @@ -292,7 +311,7 @@ def read_chunks(fileobj, chunk_size=16*1024): yield chunk -def cut_segments(segments, start, end): +def fast_cut_segments(segments, start, end): """Yields chunks of a MPEGTS video file covering the exact timestamp range. segments should be a list of segments as returned by get_best_segments(). This method works by only cutting the first and last segments, and concatenating the rest. @@ -362,3 +381,47 @@ def cut_segments(segments, start, end): with open(segment.path) as f: for chunk in read_chunks(f): yield chunk + + +def feed_input(segments, pipe): + """Write each segment's data into the given pipe in order. + This is used to provide input to ffmpeg in a full cut.""" + for segment in segments: + with open(segment.path) as f: + try: + shutil.copyfileobj(f, pipe) + except OSError as e: + # ignore EPIPE, as this just means the end cut meant we didn't need all input + if e.errno != errno.EPIPE: + raise + pipe.close() + + +def full_cut_segments(segments, start, end, encode_args): + # how far into the first segment to begin + cut_start = max(0, (start - segments[0].start).total_seconds()) + # how much of final segment should be cut off + cut_end = max(0, (segments[-1].end - end).total_seconds()) + + ffmpeg = None + input_feeder = None + try: + ffmpeg = ffmpeg_cut_stdin(cut_start, cut_end, encode_args) + input_feeder = gevent.spawn(feed_input, ffmpeg.stdin) + # stream the output until it is closed + for chunk in read_chunks(ffmpeg.stdout): + yield chunk + # check if any errors occurred in input writing, or if ffmpeg exited non-success. + if ffmpeg.wait() != 0: + raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) + input_feeder.get() # re-raise any errors from feed_input() + finally: + # if something goes wrong, try to clean up ignoring errors + if input_feeder is not None: + input_feeder.kill() + if ffmpeg is not None and ffmpeg.poll() is None: + for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): + try: + action() + except (OSError, IOError): + pass diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 5dfe7c1..373ba07 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -16,7 +16,7 @@ from psycopg2 import sql import common from common.database import DBManager, query -from common.segments import get_best_segments, cut_segments, ContainsHoles +from common.segments import get_best_segments, fast_cut_segments, ContainsHoles from .upload_backends import Youtube, Local @@ -267,7 +267,7 @@ class Cutter(object): upload_backend = self.upload_locations[job.upload_location] self.logger.info("Cutting and uploading job {} to {}".format(format_job(job), upload_backend)) - cut = cut_segments(job.segments, job.video_start, job.video_end) + cut = fast_cut_segments(job.segments, job.video_start, job.video_end) # This flag tracks whether we've told requests to finalize the upload, # and serves to detect whether errors from the request call are recoverable. diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 3ecef7f..47e6d76 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -13,7 +13,7 @@ import prometheus_client as prom from flask import Flask, url_for, request, abort, Response from gevent.pywsgi import WSGIServer -from common import dateutil, get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler +from common import dateutil, get_best_segments, fast_cut_segments, PromLogCountsHandler, install_stacksampler from common.flask_stats import request_stats, after_request import generate_hls @@ -257,7 +257,7 @@ def cut(channel, quality): if not any(segment is not None for segment in segments): return "We have no content available within the requested time range.", 406 - return Response(cut_segments(segments, start, end), mimetype='video/MP2T') + return Response(fast_cut_segments(segments, start, end), mimetype='video/MP2T') def main(host='0.0.0.0', port=8000, base_dir='.', backdoor_port=0):