rewrite fast cuts to support transitions being allowed later

In theory there should be no change in actual output for no-transition cuts,
even though we're handling the logic in a very different way.

This doesn't actually allow transitions, but sets up most of what is needed
Mike Lang 10 months ago committed by Mike Lang
parent 066d10f94a
commit c8724a1e63

@ -448,6 +448,8 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
taking care to preserve stream order and other metadata.
Used when doing a fast cut.
logging.debug(f"Will cut segment for range ({cut_start}, {cut_end}): {segment.path}")
args = []
# output from ffprobe is generally already sorted but let's be paranoid,
# because the order of map args matters.
@ -593,30 +595,177 @@ def rough_cut_segments(segment_ranges, ranges):
This method works by simply concatenating all the segments, without any re-encoding.
for segments in segment_ranges:
for segment in segments:
if segment is None:
with open(segment.path, 'rb') as f:
for chunk in read_chunks(f):
yield chunk
yield from read_segments(segments)
@timed('cut', cut_type='fast', normalize=lambda ret, sr, ranges: range_total(ranges))
def fast_cut_segments(segment_ranges, ranges):
def fast_cut_segments(segment_ranges, ranges, transitions, smart=False):
"""Yields chunks of a MPEGTS video file covering the exact timestamp ranges.
segments should be a list of segment lists as returned by get_best_segments() for each range.
This method works by only cutting the first and last segments of each range,
and concatenating everything together. Ranges are cut between with no transitions.
This method works by only cutting the first and last segments of each range
(or more if there are longer transitions), and concatenating everything together.
This only works if the same codec settings etc are used across all segments.
This should almost always be true but may cause weird results if not.
if len(segment_ranges) != len(ranges):
raise ValueError("You need to provide one segment list for each range")
for segments, (start, end) in zip(segment_ranges, ranges):
# We could potentially optimize here by cutting all firsts/lasts in parallel
# instead of doing them in order, but that's probably not that helpful and would
# greatly complicate things.
yield from fast_cut_range(segments, start, end)
if not (len(segment_ranges) == len(ranges) == len(transitions) + 1):
raise ValueError("Cut input length mismatch: {} segment ranges, {} time ranges, {} transitions".format(
# We subdivide each range into a start, middle and end.
# The start covers any incoming transition + a partial first segment.
# The end covers any outgoing transition + a partial last segment.
# The middle is everything else, split on any discontinuities.
# Furthermore, one range's start may be cut together with the previous range's end
# if there is a transition.
# We collect iterators covering all of these sections into a single ordered list.
parts = []
# In each iteration we handle:
# - The transition, including previous range's end + this range's start, if there is one
# - Otherwise, we handle this range's start but assume previous range's end is already done
# - This range's middle
# - This range's end, unless it's part of the next transition.
# We pass the previous range's end segments and offset to the next iteration via prev_end
prev_end = None # None | (segments, offset)
for segments, (start, end), in_transition, out_transition in zip(
# pad transitions with an implicit "hard cut" before and after the actual ranges.
[None] + transitions,
transitions + [None],
# prev_end should be set if and only if there is an in transition
assert (in_transition is None) == (prev_end is None)
# Determine start and end cut points, unless we start/end with a discontinuity
cut_start = None
cut_end = None
if segments[0] is not None:
cut_start = (start - segments[0].start).total_seconds()
if cut_start < 0:
raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated")
if segments[-1] is not None:
cut_end = (end - segments[-1].start).total_seconds()
if cut_end < 0:
raise ValueError("Last segment ends before cut end, but no trailing hole indicated")
# Handle start
start_cut_end = None
if in_transition is not None:
video_type, duration = in_transition
# Get start segments, and split them from the full range of segments
# by finding the first segment that starts after the transition ends
transition_end = start + datetime.timedelta(seconds=duration)
for i, segment in enumerate(segments):
if segment is not None and segment.start > transition_end:
start_segments = segments[:i]
segments = segments[i:]
# Unlikely,but in this case the start transition is the entire range
# and we should include the end cut.
start_segments = segments
segments = []
start_cut_end = cut_end
cut_end = None
if len(start_segments) == 0:
raise ValueError(f"Could not find any video data for {duration}s transition into range ({start}, {end})")
raise NotImplementedError("no transitions yet")
# TODO append ffmpeg with transition, start_segments, prev_end, cut_start, start_cut_end
elif cut_start is not None and cut_start > 0:
# Cut start segment, unless it is a discontinuity or doesn't need cutting
segment = segments[0]
segments = segments[1:]
if not segments:
# The whole thing is only one segment, so also apply end cut
start_cut_end = cut_end
cut_end = None
ffmpeg = ffmpeg_cut_segment(segment, cut_start, start_cut_end)
pass # No cutting required at start
prev_end = None
# Handle end, but don't append it to parts list just yet.
end_cut_part = None
if out_transition is not None:
video_type, duration = out_transition
# Get end segments, and split them from the full range of segments
# by finding the last segment that ends before the transition starts
transition_start = end - datetime.timedelta(seconds=duration)
for i, segment in enumerate(segments):
if segment is not None and segment.end >= transition_start:
end_segments = segments[i:]
segments = segments[:i]
raise ValueError(f"Could not find any video data for {duration}s transition out of range ({start}, {end}) that is not already part of another transition")
# offset is how many seconds into end_segments the transition should start.
# We know that end_segments is not empty as otherwise we would have hit the for/else condition.
# We also know that end_segments[0] is not None as it is one of our conditions for picking it.
# It is however possible for end_segments[0] to start AFTER the transition should have started
# due to a hole immediately before. Technically a negative offset is still correct in this case,
# and ideally would cause ffmpeg to treat the transition as "already in progress" when we start.
# I haven't tested this behaviour but the alternative is to just fail so it's worth a shot.
offset = (transition_start - end_segments[0].start).total_seconds()
prev_end = end_segments, offset
elif cut_end is not None:
# Cut end segment, unless it is a discontinuity, doesn't need cutting, or was already
# cut as part of start (which sets cut_end to None).
assert segments # Either we should still have segments left, or the start cut should have included the end
segment = segments[-1]
segments = segments[:-1]
ffmpeg = ffmpeg_cut_segment(segment, cut_end=cut_end)
end_cut_part = read_from_stdout(ffmpeg)
# For each remaining segment in the middle, append a part per run without a discontinuity
run = []
for segment in segments:
if segment is None:
if run:
run = []
if run:
if end_cut_part is not None:
# after all that, double check the last range had no transition and we carried nothing over
assert prev_end is None
# yield from each part in order, applying fixts if needed
fixts = FixTSSequence() if smart else None
for part in parts:
for i, chunk in enumerate(part):
# Since long smart cuts can be CPU and disk bound for quite a while,
# yield to give other things a chance to run. Note this will run on the first
# iteration so every part switch also introduces an idle yield.
if i % 1000 == 0:
yield fixts.feed(chunk) if smart else chunk
if fixts:
def read_from_stdout(ffmpeg_context):
"""Takes a ffmpeg context manager as returned by ffmpeg_cut_many() and its wrapper functions,
and yields data chunks from ffmpeg's stdout."""
with ffmpeg_context as ffmpeg:
yield from read_chunks(ffmpeg.stdout)
def read_segments(segments):
"""Takes a list of segment files and yields data chunks from each one in order. Ignores holes."""
for segment in segments:
if segment is None:
with open(segment.path, 'rb') as f:
yield from read_chunks(f)
class FixTSSequence:
@ -640,85 +789,12 @@ class FixTSSequence:
@timed('cut', cut_type='smart', normalize=lambda ret, sr, ranges: range_total(ranges))
def smart_cut_segments(segment_ranges, ranges):
def smart_cut_segments(segment_ranges, ranges, transitions):
As per fast_cut_segments(), except we also do a "fix" pass over the resulting video stream
to re-time internal timestamps to avoid discontinuities and make sure the video starts at t=0.
if len(segment_ranges) != len(ranges):
raise ValueError("You need to provide one segment list for each range")
fixts = FixTSSequence()
for segments, (start, end) in zip(segment_ranges, ranges):
yield from fast_cut_range(segments, start, end, fixts=fixts)
@timed('cut_range', cut_type='fast', normalize=lambda _, segments, start, end, **k: (end - start).total_seconds())
def fast_cut_range(segments, start, end, fixts=None):
"""Does a fast cut for an individual range of segments.
If a FixTSSequence is given, fixes timestamps to avoid discontinuities
between cut segments and passed through segments.
# how far into the first segment to begin (if no hole at start)
cut_start = None
if segments[0] is not None:
cut_start = (start - segments[0].start).total_seconds()
if cut_start < 0:
raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated")
# how far into the final segment to end (if no hole at end)
cut_end = None
if segments[-1] is not None:
cut_end = (end - segments[-1].start).total_seconds()
if cut_end < 0:
raise ValueError("Last segment ends before cut end, but no trailing hole indicated")
# Set first and last only if they actually need cutting.
# Note this handles both the cut_start = None (no first segment to cut)
# and cut_start = 0 (first segment already starts on time) cases.
first = segments[0] if cut_start else None
last = segments[-1] if cut_end else None
for i, segment in enumerate(segments):
# Since long smart cuts can be CPU and disk bound for quite a while,
# yield to give other things a chance to run.
if i % 1000 == 0:
if segment is None:
logging.debug("Skipping discontinuity while cutting")
# TODO: If we want to be safe against the possibility of codecs changing,
# we should check the streams_info() after each discontinuity.
# To keep our output clean, we reset our FixTS so the output doesn't contain
# the discontinuity. The video just cuts to the next segment.
if fixts:
# note first and last might be the same segment.
# note a segment will only match if cutting actually needs to be done
# (ie. cut_start or cut_end is not 0)
if segment in (first, last):
if fixts:
with ffmpeg_cut_segment(
cut_start if segment == first else None,
cut_end if segment == last else None,
) as ffmpeg:
for chunk in read_chunks(ffmpeg.stdout):
yield fixts.feed(chunk) if fixts else chunk
if fixts:
# no cutting needed, just serve the file
with open(segment.path, 'rb') as f:
for chunk in read_chunks(f):
yield fixts.feed(chunk) if fixts else chunk
if fixts:
# check for errors and indicate range is finished
return fast_cut_segments(segment_ranges, ranges, transitions, smart=True)
def feed_input(segments, pipe):

@ -396,23 +396,24 @@ class Cutter(object):
nonlocal upload_finished
if upload_backend.encoding_settings in ("fast", "smart", "archive"):
if upload_backend.encoding_settings in ("fast", "smart"):
self.logger.debug(f"Using {upload_backend.encoding_settings} cut")
if any(transition is not None for transition in job.video_transitions):
raise ValueError("Fast cuts do not support complex transitions")
cut_fn = {
"fast": fast_cut_segments,
"smart": smart_cut_segments,
# Note archive cuts return a list of filenames instead of data chunks.
# We assume the upload location expects this.
# We use segments_path as a tempdir path under the assumption that:
# a) it has plenty of space
# b) for a Local upload location, it will be on the same filesystem as the
# final desired path.
"archive": lambda sr, vr: archive_cut_segments(sr, vr, self.segments_path),
cut = cut_fn(job.segment_ranges, job.video_ranges)
cut = cut_fn(job.segment_ranges, job.video_ranges, job.video_transitions)
elif upload_backend.encoding_settings == "archive":
self.logger.debug("Using archive cut")
if any(transition is not None for transition in job.video_transitions):
raise ValueError("Archive cuts do not support complex transitions")
# Note archive cuts return a list of filenames instead of data chunks.
# We assume the upload location expects this.
# We use segments_path as a tempdir path under the assumption that:
# a) it has plenty of space
# b) for a Local upload location, it will be on the same filesystem as the
# final desired path.
cut = archive_cut_segments(sr, vr, self.segments_path)
self.logger.debug("Using encoding settings for {} cut: {}".format(
"streamable" if upload_backend.encoding_streamable else "non-streamable",

@ -415,13 +415,9 @@ def cut(channel, quality):
return "Cannot do rough cut with transitions", 400
return Response(rough_cut_segments(segment_ranges, ranges), mimetype='video/MP2T')
elif type == 'fast':
if has_transitions:
return "Cannot do fast cut with transitions", 400
return Response(fast_cut_segments(segment_ranges, ranges), mimetype='video/MP2T')
return Response(fast_cut_segments(segment_ranges, ranges, transitions), mimetype='video/MP2T')
elif type == 'smart':
if has_transitions:
return "Cannot do smart cut with transitions", 400
return Response(smart_cut_segments(segment_ranges, ranges), mimetype='video/MP2T')
return Response(smart_cut_segments(segment_ranges, ranges, transitions), mimetype='video/MP2T')
elif type in ('mpegts', 'mp4'):
if type == 'mp4':
return "mp4 type has been disabled due to the load it causes", 400
