diff --git a/common/common/segments.py b/common/common/segments.py index 52936c6..fcd04fe 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -421,6 +421,114 @@ def fast_cut_segments(segments, start, end): yield chunk +@timed('cut', cut_type='smart', normalize=lambda _, segments, start, end: (end - start).total_seconds()) +def smart_cut_segments(segments, start, end): + """As fast_cut_segments(), but concats segments by using ffmpeg's concat demuxer, + which should make timestamps etc behave correctly.""" + + # how far into the first segment to begin (if no hole at start) + cut_start = None + if segments[0] is not None: + cut_start = (start - segments[0].start).total_seconds() + if cut_start < 0: + raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated") + + # how far into the final segment to end (if no hole at end) + cut_end = None + if segments[-1] is not None: + cut_end = (end - segments[-1].start).total_seconds() + if cut_end < 0: + raise ValueError("Last segment ends before cut end, but no trailing hole indicated") + + # Set first and last only if they actually need cutting. + # Note this handles both the cut_start = None (no first segment to cut) + # and cut_start = 0 (first segment already starts on time) cases. + first = segments[0] if cut_start else None + last = segments[-1] if cut_end else None + + # We start up to three ffmpeg processes: + # Two that cut the first and last segments to size + # One that concats the outputs of the first two procs, along with all the segments from disk + # We pass the output pipe of the first two procs directly to the third. + + concat_entries = [] # the lines we will pass to the concat demuxer, either 'pipe:FD' or 'file:PATH' + pipes = [] # the pipes referenced in concat_entries + procs = [] # the ffmpeg processes + input_feeder = None + concat_proc = None + try: + for segment in segments: + if segment is None: + logging.debug("Skipping discontinuity while cutting") + # TODO: If we want to be safe against the possibility of codecs changing, + # we should check the streams_info() after each discontinuity. + continue + # note first and last might be the same segment. + # note a segment will only match if cutting actually needs to be done + # (ie. cut_start or cut_end is not 0) + if segment in (first, last): + proc = ffmpeg_cut_segment( + segment, + cut_start if segment == first else None, + cut_end if segment == last else None, + ) + procs.append(proc) + pipes.append(proc.stdout) + concat_entries.append('pipe:{}'.format(proc.stdout.fileno())) + else: + # just pass the file directly + concat_entries.append('file:{}'.format(segment.path)) + + concat_config = ''.join("file '{}'\n".format(entry) for entry in concat_entries) + args = [ + 'ffmpeg', + '-hide_banner', '-loglevel', 'error', # suppress noisy output + '-f', 'concat', '-', # read concat config from stdin + '-safe', 0, # trust weird filenames + '-protocol_whitelist', 'file,pipe', # need to explicitly allow pipe + '-c', 'copy', # don't re-encode the actual video + '-fflags', '+genpts', # this does something to do with timestamps? + '-', + ] + concat_proc = subprocess.Popen(args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + close_fds=False, # don't close non-stdin/out/err before execing, so that pipe: works + ) + procs.append(concat_proc) + + def write_config(): + concat_proc.stdin.write(concat_config) + concat_proc.stdin.close() + + input_feeder = gevent.spawn(write_config) + + # we can now close our output pipes since ffmpeg will read them directly + for pipe in pipes: + pipe.close() + + # now stream results + for chunk in read_chunks(concat_proc.stdout): + yield chunk + + # check if any errors occurred in input writing, or if anything exited non-success. + input_feeder.get() + for i, proc in enumerate(procs): + if proc.wait() != 0: + raise Exception("Smart cut ffmpeg process {}/{} exited {}".format(i, len(procs), proc.returncode)) + finally: + # if something goes wrong, try to clean up ignoring errors + if input_feeder is not None: + input_feeder.kill() + for proc in procs: + if proc.poll() is not None: + for action in [proc.kill, proc.stdout.close] + [proc.stdin.close if proc is concat_proc else []]: + try: + action() + except (OSError, IOError): + pass + + def feed_input(segments, pipe): """Write each segment's data into the given pipe in order. This is used to provide input to ffmpeg in a full cut.""" @@ -443,6 +551,10 @@ def full_cut_segments(segments, start, end, encode_args, stream=False): """If stream=true, assume encode_args gives a streamable format, and begin returning output immediately instead of waiting for ffmpeg to finish and buffering to disk.""" + + # Remove holes + segments = [segment for segment in segments if segment is not None] + # how far into the first segment to begin cut_start = max(0, (start - segments[0].start).total_seconds()) # duration diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 22c00b2..d80c44c 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -277,8 +277,10 @@ def cut(channel, quality): type = request.args.get('type', 'fast') if type == 'rough': return Response(rough_cut_segments(segments, start, end), mimetype='video/MP2T') - if type == 'fast': + elif type == 'fast': return Response(fast_cut_segments(segments, start, end), mimetype='video/MP2T') + elif type == 'smart': + return Response(smart_cut_segments(segments, start, end), mimetype='video/MP2T') elif type in ('mpegts', 'mp4'): if type == 'mp4': return "mp4 type has been disabled due to the load it causes", 400