def streams_info(segment):
    """Return ffprobe's info on the streams of the given segment as a list of dicts.

    Runs ffprobe against segment.path and returns the 'streams' entry of its JSON output.
    """
    output = subprocess.check_output([
        'ffprobe',
        '-hide_banner', '-loglevel', 'fatal', # suppress noisy output
        '-of', 'json', '-show_streams', # get streams info as json
        segment.path,
    ])
    return json.loads(output)['streams']


def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
    """Return a Popen object which is ffmpeg cutting the given segment.

    cut_start and cut_end are offsets in seconds from the start of the segment.
    A value of None (or 0) means no trimming is requested on that side.
    The cut video is written to the returned process's stdout as MPEG-TS.
    """
    args = [
        'ffmpeg',
        '-hide_banner', '-loglevel', 'fatal', # suppress noisy output
        '-i', segment.path,
    ]
    # output from ffprobe is generally already sorted but let's be paranoid,
    # because the order of map args matters.
    for stream in sorted(streams_info(segment), key=lambda stream: stream['index']):
        # map the same stream in the same position from input to output
        args += ['-map', '0:{}'.format(stream['index'])]
        if stream['codec_type'] in ('video', 'audio'):
            # for non-metadata streams, make sure we use the same codec (metadata streams
            # are a bit weirder, and ffmpeg will do the right thing anyway)
            args += ['-codec:{}'.format(stream['index']), stream['codec_name']]
    # now add trim args
    if cut_start:
        args += ['-ss', str(cut_start)]
    if cut_end:
        args += ['-to', str(cut_end)]
    # output to stdout as MPEG-TS
    args += ['-f', 'mpegts', '-']
    # run it
    logging.info("Running segment cut with args: %s", " ".join(args))
    return subprocess.Popen(args, stdout=subprocess.PIPE)


def read_chunks(fileobj, chunk_size=16*1024):
    """Read fileobj until EOF, yielding chunks of at most chunk_size."""
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            break
        yield chunk


def cut_segments(segments, start, end):
    """Yields chunks of a MPEGTS video file covering the exact timestamp range.

    segments should be a non-empty list of segments as returned by get_best_segments().
    Entries may be None, which indicates a hole; holes are skipped.
    start and end are the datetimes bounding the requested range.

    This method works by only cutting the first and last segments, and concatenating the rest.
    This only works if the same codec settings etc are used across all segments.
    This should almost always be true but may cause weird results if not.

    Raises ValueError if the first (or last) segment starts after the requested start
    (or end) without a hole being indicated at that position.
    Raises Exception if ffmpeg exits with a non-zero status.
    As this is a generator, errors are raised during iteration, not at call time.
    """

    # how far into the first segment to begin (if no hole at start)
    cut_start = None
    if segments[0] is not None:
        cut_start = (start - segments[0].start).total_seconds()
        if cut_start < 0:
            raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated")

    # how far into the final segment to end (if no hole at end)
    cut_end = None
    if segments[-1] is not None:
        cut_end = (end - segments[-1].start).total_seconds()
        if cut_end < 0:
            raise ValueError("Last segment doesn't begin until after cut end, but no trailing hole indicated")

    # Set first and last only if they actually need cutting.
    # Note this handles both the cut_start = None (no first segment to cut)
    # and cut_start = 0 (first segment already starts on time) cases.
    first = segments[0] if cut_start else None
    last = segments[-1] if cut_end else None

    for segment in segments:
        if segment is None:
            logging.debug("Skipping discontinuity while cutting")
            # TODO: If we want to be safe against the possibility of codecs changing,
            # we should check the streams_info() after each discontinuity.
            continue

        # note first and last might be the same segment.
        # note a segment will only match if cutting actually needs to be done
        # (ie. cut_start or cut_end is not 0)
        if segment in (first, last):
            proc = None
            try:
                proc = ffmpeg_cut_segment(
                    segment,
                    cut_start if segment == first else None,
                    cut_end if segment == last else None,
                )
                with closing(proc.stdout):
                    for chunk in read_chunks(proc.stdout):
                        yield chunk
                proc.wait()
            finally:
                # This runs on success, on errors, and also when the generator is closed
                # early (GeneratorExit is not an Exception subclass, so an
                # "except Exception" would miss it and leave ffmpeg running).
                # Only kill if the process is still alive, ignoring errors.
                if proc is not None and proc.poll() is None:
                    try:
                        proc.kill()
                    except OSError:
                        pass
            # check if ffmpeg had errors
            if proc.returncode != 0:
                raise Exception(
                    "Error while streaming cut: ffmpeg exited {}".format(proc.returncode)
                )
        else:
            # no cutting needed, just serve the file.
            # Binary mode, as segment files are raw MPEG-TS data, not text.
            with open(segment.path, 'rb') as f:
                for chunk in read_chunks(f):
                    yield chunk
""" start = dateutil.parser.parse(request.args['start']) end = dateutil.parser.parse(request.args['end']) @@ -258,157 +253,10 @@ def cut(stream, variant): if not allow_holes and None in segments: return "Requested time range contains holes or is incomplete.", 406 - segments = [segment for segment in segments if segment is not None] - - if not segments: + if not any(segment is not None for segment in segments): return "We have no content available within the requested time range.", 406 - # how far into the first segment to begin - cut_start = max(0, (start - segments[0].start).total_seconds()) - # calculate full uncut duration of content, ie. without holes. - full_duration = sum(segment.duration.total_seconds() for segment in segments) - # calculate how much of final segment should be cut off - cut_end = max(0, (segments[-1].end - end).total_seconds()) - # finally, calculate actual output duration, which is what ffmpeg will use - duration = full_duration - cut_start - cut_end - - # possibly defer to experiemntal version now that we've parsed our inputs. - # we'll clean up this whole flow later. - if request.args.get('experimental') == 'true': - return cut_experimental(segments, cut_start, cut_end) - - def feed_input(pipe): - # pass each segment into ffmpeg's stdin in order, while outputing everything on stdout. 
- for segment in segments: - with open(segment.path) as f: - try: - shutil.copyfileobj(f, pipe) - except OSError as e: - # ignore EPIPE, as this just means the end cut meant we didn't need all input - if e.errno != errno.EPIPE: - raise - pipe.close() - - def _cut(): - ffmpeg = None - input_feeder = None - try: - ffmpeg = subprocess.Popen([ - "ffmpeg", - "-i", "-", # read from stdin - "-ss", str(cut_start), # seconds to cut from start - "-t", str(duration), # total duration, which says when to cut at end - "-f", "mpegts", # output as MPEG-TS format - "-", # output to stdout - ], stdin=subprocess.PIPE, stdout=subprocess.PIPE) - input_feeder = gevent.spawn(feed_input, ffmpeg.stdin) - # stream the output until it is closed - while True: - chunk = ffmpeg.stdout.read(16*1024) - if not chunk: - break - yield chunk - # check if any errors occurred in input writing, or if ffmpeg exited non-success. - # raising an error mid-streaming-response will get flask to abort the response - # uncleanly, which tells the client that something went wrong. - if ffmpeg.wait() != 0: - raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() - finally: - # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() - if ffmpeg is not None and ffmpeg.poll() is None: - for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): - try: - action() - except (OSError, IOError): - pass - - return Response(_cut(), mimetype='video/MP2T') - - -def cut_experimental(segments, cut_start, cut_end): - """Experimental cutting method where we cut the first and last segments only, - then cat them all together.""" - # Note: assumes codecs don't change from segment to segment. 
- - def streams_info(segment): - """Return ffprobe's info on streams as a list of dicts""" - output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path]) - return json.loads(output)['streams'] - - def ffmpeg(segment, cut_start=None, cut_end=None): - """Return a Popen object which is ffmpeg cutting the given segment""" - args = ['ffmpeg', '-i', segment.path] - # output from ffprobe is generally already sorted but let's be paranoid, - # because the order of map args matters. - for stream in sorted(streams_info(segment), key=lambda stream: stream['index']): - # map the same stream in the same position from input to output - args += ['-map', '0:{}'.format(stream['index'])] - if stream['codec_type'] in ('video', 'audio'): - # for non-metadata streams, make sure we use the same codec (metadata streams - # are a bit weirder, and ffmpeg will do the right thing anyway) - args += ['-codec:{}'.format(stream['index']), stream['codec_name']] - # now add trim args - if cut_start: - args += ['-ss', str(cut_start)] - if cut_end: - args += ['-to', str(cut_end)] - # output to stdout as MPEG-TS - args += ['-f', 'mpegts', '-'] - # run it - logging.info("Running segment cut with args: {}".format(" ".join(args))) - return subprocess.Popen(args, stdout=subprocess.PIPE) - - def chunks(fileobj, chunk_size=16*1024): - """Read fileobj until EOF, yielding chunk_size sized chunks of data.""" - while True: - chunk = fileobj.read(chunk_size) - if not chunk: - break - yield chunk - - def _cut(): - # set first and last only if they actually need cutting - first = segments[0] if cut_start else None - last = segments[-1] if cut_end else None - for segment in segments: - # note first and last might be the same segment. - # note a segment will only match if cutting actually needs to be done - # (ie. 
cut_start or cut_end is not 0) - if segment in (first, last): - proc = None - try: - proc = ffmpeg( - segment, - cut_start if segment == first else None, - cut_end if segment == last else None, - ) - with closing(proc.stdout): - for chunk in chunks(proc.stdout): - yield chunk - proc.wait() - except Exception: - # try to clean up proc, ignoring errors - try: - proc.kill() - except OSError: - pass - else: - # check if ffmpeg had errors - if proc.returncode != 0: - raise Exception( - "Error while streaming cut: ffmpeg exited {}".format(proc.returncode) - ) - else: - # no cutting needed, just serve the file - with open(segment.path) as f: - for chunk in chunks(f): - yield chunk - - return Response(_cut(), mimetype='video/MP2T') - + return Response(cut_segments(segments, start, end), mimetype='video/MP2T') def main(host='0.0.0.0', port=8000, base_dir='.', backdoor_port=0):