diff --git a/common/common/segments.py b/common/common/segments.py index 0f068c8..4f9947c 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -448,6 +448,8 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): taking care to preserve stream order and other metadata. Used when doing a fast cut. """ + logging.debug(f"Will cut segment for range ({cut_start}, {cut_end}): {segment.path}") + args = [] # output from ffprobe is generally already sorted but let's be paranoid, # because the order of map args matters. @@ -593,30 +595,177 @@ def rough_cut_segments(segment_ranges, ranges): This method works by simply concatenating all the segments, without any re-encoding. """ for segments in segment_ranges: - for segment in segments: - if segment is None: - continue - with open(segment.path, 'rb') as f: - for chunk in read_chunks(f): - yield chunk + yield from read_segments(segments) @timed('cut', cut_type='fast', normalize=lambda ret, sr, ranges: range_total(ranges)) -def fast_cut_segments(segment_ranges, ranges): +def fast_cut_segments(segment_ranges, ranges, transitions, smart=False): """Yields chunks of a MPEGTS video file covering the exact timestamp ranges. segments should be a list of segment lists as returned by get_best_segments() for each range. - This method works by only cutting the first and last segments of each range, - and concatenating everything together. Ranges are cut between with no transitions. + This method works by only cutting the first and last segments of each range + (or more if there are longer transitions), and concatenating everything together. This only works if the same codec settings etc are used across all segments. This should almost always be true but may cause weird results if not. """ - if len(segment_ranges) != len(ranges): - raise ValueError("You need to provide one segment list for each range") - for segments, (start, end) in zip(segment_ranges, ranges): - # We could potentially optimize here by cutting all firsts/lasts in parallel - # instead of doing them in order, but that's probably not that helpful and would - # greatly complicate things. - yield from fast_cut_range(segments, start, end) + if not (len(segment_ranges) == len(ranges) == len(transitions) + 1): + raise ValueError("Cut input length mismatch: {} segment ranges, {} time ranges, {} transitions".format( + len(segment_ranges), + len(ranges), + len(transitions), + )) + + # We subdivide each range into a start, middle and end. + # The start covers any incoming transition + a partial first segment. + # The end covers any outgoing transition + a partial last segment. + # The middle is everything else, split on any discontinuities. + # Furthermore, one range's start may be cut together with the previous range's end + # if there is a transition. + # We collect iterators covering all of these sections into a single ordered list. + parts = [] + # In each iteration we handle: + # - The transition, including previous range's end + this range's start, if there is one + # - Otherwise, we handle this range's start but assume previous range's end is already done + # - This range's middle + # - This range's end, unless it's part of the next transition. + # We pass the previous range's end segments and offset to the next iteration via prev_end + prev_end = None # None | (segments, offset) + for segments, (start, end), in_transition, out_transition in zip( + segment_ranges, + ranges, + # pad transitions with an implicit "hard cut" before and after the actual ranges. + [None] + transitions, + transitions + [None], + ): + # prev_end should be set if and only if there is an in transition + assert (in_transition is None) == (prev_end is None) + + # Determine start and end cut points, unless we start/end with a discontinuity + cut_start = None + cut_end = None + if segments[0] is not None: + cut_start = (start - segments[0].start).total_seconds() + if cut_start < 0: + raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated") + if segments[-1] is not None: + cut_end = (end - segments[-1].start).total_seconds() + if cut_end < 0: + raise ValueError("Last segment ends before cut end, but no trailing hole indicated") + + # Handle start + start_cut_end = None + if in_transition is not None: + video_type, duration = in_transition + # Get start segments, and split them from the full range of segments + # by finding the first segment that starts after the transition ends + transition_end = start + datetime.timedelta(seconds=duration) + for i, segment in enumerate(segments): + if segment is not None and segment.start > transition_end: + start_segments = segments[:i] + segments = segments[i:] + break + else: + # Unlikely,but in this case the start transition is the entire range + # and we should include the end cut. + start_segments = segments + segments = [] + start_cut_end = cut_end + cut_end = None + if len(start_segments) == 0: + raise ValueError(f"Could not find any video data for {duration}s transition into range ({start}, {end})") + raise NotImplementedError("no transitions yet") + # TODO append ffmpeg with transition, start_segments, prev_end, cut_start, start_cut_end + elif cut_start is not None and cut_start > 0: + # Cut start segment, unless it is a discontinuity or doesn't need cutting + segment = segments[0] + segments = segments[1:] + if not segments: + # The whole thing is only one segment, so also apply end cut + start_cut_end = cut_end + cut_end = None + ffmpeg = ffmpeg_cut_segment(segment, cut_start, start_cut_end) + parts.append(read_from_stdout(ffmpeg)) + else: + pass # No cutting required at start + + prev_end = None + + # Handle end, but don't append it to parts list just yet. + end_cut_part = None + if out_transition is not None: + video_type, duration = out_transition + # Get end segments, and split them from the full range of segments + # by finding the last segment that ends before the transition starts + transition_start = end - datetime.timedelta(seconds=duration) + for i, segment in enumerate(segments): + if segment is not None and segment.end >= transition_start: + end_segments = segments[i:] + segments = segments[:i] + else: + raise ValueError(f"Could not find any video data for {duration}s transition out of range ({start}, {end}) that is not already part of another transition") + # offset is how many seconds into end_segments the transition should start. + # We know that end_segments is not empty as otherwise we would have hit the for/else condition. + # We also know that end_segments[0] is not None as it is one of our conditions for picking it. + # It is however possible for end_segments[0] to start AFTER the transition should have started + # due to a hole immediately before. Technically a negative offset is still correct in this case, + # and ideally would cause ffmpeg to treat the transition as "already in progress" when we start. + # I haven't tested this behaviour but the alternative is to just fail so it's worth a shot. + offset = (transition_start - end_segments[0].start).total_seconds() + prev_end = end_segments, offset + elif cut_end is not None: + # Cut end segment, unless it is a discontinuity, doesn't need cutting, or was already + # cut as part of start (which sets cut_end to None). + assert segments # Either we should still have segments left, or the start cut should have included the end + segment = segments[-1] + segments = segments[:-1] + ffmpeg = ffmpeg_cut_segment(segment, cut_end=cut_end) + end_cut_part = read_from_stdout(ffmpeg) + + # For each remaining segment in the middle, append a part per run without a discontinuity + run = [] + for segment in segments: + if segment is None: + if run: + parts.append(read_segments(run)) + run = [] + else: + run.append(segment) + if run: + parts.append(read_segments(run)) + + if end_cut_part is not None: + parts.append(end_cut_part) + + # after all that, double check the last range had no transition and we carried nothing over + assert prev_end is None + + # yield from each part in order, applying fixts if needed + fixts = FixTSSequence() if smart else None + for part in parts: + for i, chunk in enumerate(part): + # Since long smart cuts can be CPU and disk bound for quite a while, + # yield to give other things a chance to run. Note this will run on the first + # iteration so every part switch also introduces an idle yield. + if i % 1000 == 0: + gevent.idle() + yield fixts.feed(chunk) if smart else chunk + if fixts: + fixts.next() + + +def read_from_stdout(ffmpeg_context): + """Takes a ffmpeg context manager as returned by ffmpeg_cut_many() and its wrapper functions, + and yields data chunks from ffmpeg's stdout.""" + with ffmpeg_context as ffmpeg: + yield from read_chunks(ffmpeg.stdout) + + +def read_segments(segments): + """Takes a list of segment files and yields data chunks from each one in order. Ignores holes.""" + for segment in segments: + if segment is None: + continue + with open(segment.path, 'rb') as f: + yield from read_chunks(f) class FixTSSequence: @@ -640,85 +789,12 @@ class FixTSSequence: @timed('cut', cut_type='smart', normalize=lambda ret, sr, ranges: range_total(ranges)) -def smart_cut_segments(segment_ranges, ranges): +def smart_cut_segments(segment_ranges, ranges, transitions): """ As per fast_cut_segments(), except we also do a "fix" pass over the resulting video stream to re-time internal timestamps to avoid discontinuities and make sure the video starts at t=0. """ - if len(segment_ranges) != len(ranges): - raise ValueError("You need to provide one segment list for each range") - fixts = FixTSSequence() - for segments, (start, end) in zip(segment_ranges, ranges): - yield from fast_cut_range(segments, start, end, fixts=fixts) - - -@timed('cut_range', cut_type='fast', normalize=lambda _, segments, start, end, **k: (end - start).total_seconds()) -def fast_cut_range(segments, start, end, fixts=None): - """Does a fast cut for an individual range of segments. - If a FixTSSequence is given, fixes timestamps to avoid discontinuities - between cut segments and passed through segments. - """ - - # how far into the first segment to begin (if no hole at start) - cut_start = None - if segments[0] is not None: - cut_start = (start - segments[0].start).total_seconds() - if cut_start < 0: - raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated") - - # how far into the final segment to end (if no hole at end) - cut_end = None - if segments[-1] is not None: - cut_end = (end - segments[-1].start).total_seconds() - if cut_end < 0: - raise ValueError("Last segment ends before cut end, but no trailing hole indicated") - - # Set first and last only if they actually need cutting. - # Note this handles both the cut_start = None (no first segment to cut) - # and cut_start = 0 (first segment already starts on time) cases. - first = segments[0] if cut_start else None - last = segments[-1] if cut_end else None - - for i, segment in enumerate(segments): - # Since long smart cuts can be CPU and disk bound for quite a while, - # yield to give other things a chance to run. - if i % 1000 == 0: - gevent.idle() - - if segment is None: - logging.debug("Skipping discontinuity while cutting") - # TODO: If we want to be safe against the possibility of codecs changing, - # we should check the streams_info() after each discontinuity. - - # To keep our output clean, we reset our FixTS so the output doesn't contain - # the discontinuity. The video just cuts to the next segment. - if fixts: - fixts.next() - continue - - # note first and last might be the same segment. - # note a segment will only match if cutting actually needs to be done - # (ie. cut_start or cut_end is not 0) - if segment in (first, last): - if fixts: - fixts.next() - with ffmpeg_cut_segment( - segment, - cut_start if segment == first else None, - cut_end if segment == last else None, - ) as ffmpeg: - for chunk in read_chunks(ffmpeg.stdout): - yield fixts.feed(chunk) if fixts else chunk - if fixts: - fixts.next() - else: - # no cutting needed, just serve the file - with open(segment.path, 'rb') as f: - for chunk in read_chunks(f): - yield fixts.feed(chunk) if fixts else chunk - if fixts: - # check for errors and indicate range is finished - fixts.next() + return fast_cut_segments(segment_ranges, ranges, transitions, smart=True) def feed_input(segments, pipe): diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 887a4c6..fdb781d 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -396,23 +396,24 @@ class Cutter(object): nonlocal upload_finished try: - if upload_backend.encoding_settings in ("fast", "smart", "archive"): + if upload_backend.encoding_settings in ("fast", "smart"): self.logger.debug(f"Using {upload_backend.encoding_settings} cut") - if any(transition is not None for transition in job.video_transitions): - raise ValueError("Fast cuts do not support complex transitions") - cut_fn = { "fast": fast_cut_segments, "smart": smart_cut_segments, - # Note archive cuts return a list of filenames instead of data chunks. - # We assume the upload location expects this. - # We use segments_path as a tempdir path under the assumption that: - # a) it has plenty of space - # b) for a Local upload location, it will be on the same filesystem as the - # final desired path. - "archive": lambda sr, vr: archive_cut_segments(sr, vr, self.segments_path), }[upload_backend.encoding_settings] - cut = cut_fn(job.segment_ranges, job.video_ranges) + cut = cut_fn(job.segment_ranges, job.video_ranges, job.video_transitions) + elif upload_backend.encoding_settings == "archive": + self.logger.debug("Using archive cut") + if any(transition is not None for transition in job.video_transitions): + raise ValueError("Archive cuts do not support complex transitions") + # Note archive cuts return a list of filenames instead of data chunks. + # We assume the upload location expects this. + # We use segments_path as a tempdir path under the assumption that: + # a) it has plenty of space + # b) for a Local upload location, it will be on the same filesystem as the + # final desired path. + cut = archive_cut_segments(sr, vr, self.segments_path) else: self.logger.debug("Using encoding settings for {} cut: {}".format( "streamable" if upload_backend.encoding_streamable else "non-streamable", diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 50aed08..69f665b 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -415,13 +415,9 @@ def cut(channel, quality): return "Cannot do rough cut with transitions", 400 return Response(rough_cut_segments(segment_ranges, ranges), mimetype='video/MP2T') elif type == 'fast': - if has_transitions: - return "Cannot do fast cut with transitions", 400 - return Response(fast_cut_segments(segment_ranges, ranges), mimetype='video/MP2T') + return Response(fast_cut_segments(segment_ranges, ranges, transitions), mimetype='video/MP2T') elif type == 'smart': - if has_transitions: - return "Cannot do smart cut with transitions", 400 - return Response(smart_cut_segments(segment_ranges, ranges), mimetype='video/MP2T') + return Response(smart_cut_segments(segment_ranges, ranges, transitions), mimetype='video/MP2T') elif type in ('mpegts', 'mp4'): if type == 'mp4': return "mp4 type has been disabled due to the load it causes", 400