Hacky workaround for segments that can't be played independently

Mike Lang 1 year ago
parent 590e056f7a
commit aa89c358da

@ -33,7 +33,7 @@ def unpadded_b64_decode(s):
class SegmentInfo(
namedtuple('SegmentInfoBase', [
'path', 'channel', 'quality', 'start', 'duration', 'type', 'hash'
'paths', 'channel', 'quality', 'start', 'duration', 'type', 'hash'
"""Info parsed from a segment path, including original path.
@ -45,6 +45,9 @@ class SegmentInfo(
def is_partial(self):
"""Note that suspect is considered partial"""
return self.type != "full"
def path(self):
return self.paths[0]
def parse_segment_timestamp(hour_str, min_str):
@ -84,7 +87,7 @@ def parse_segment_path(path):
hash = None if type == 'temp' else unpadded_b64_decode(hash)
start = None if hour is None else parse_segment_timestamp(hour, time)
return SegmentInfo(
path = path,
paths = (path,),
channel = channel,
quality = quality,
start = start,
@ -101,12 +104,26 @@ class ContainsHoles(Exception):
"""Raised by get_best_segments() when a hole is found and allow_holes is False"""
def merge_segments(a, b):
"""Extremely hacky thing that merges two SegmentInfos into one "segment" with two paths.
Assumes the segments have the same channel, quality, and no gap between them.
TYPES = ["full", "suspect", "partial"]
return a._replace(
paths = a.paths + b.paths,
duration = a.duration + b.duration,
hash = None,
# Use the worst of the two types
type = TYPES[max(TYPES.index(a.type), TYPES.index(b.type))],
hours_path=lambda ret, hours_path, *args, **kwargs: hours_path,
has_holes=lambda ret, *args, **kwargs: None in ret,
normalize=lambda ret, *args, **kwargs: len([x for x in ret if x is not None]),
def get_best_segments(hours_path, start, end, allow_holes=True):
def get_best_segments(hours_path, start, end, allow_holes=True, no_hack=False):
"""Return a list of the best sequence of non-overlapping segments
we have for a given time range. Hours path should be the directory containing hour directories.
Time args start and end should be given as datetime objects.
@ -145,6 +162,7 @@ def get_best_segments(hours_path, start, end, allow_holes=True):
# to these types is exact, and all operations on them are exact, so all operations are exact.
result = []
prev_segment = None
for hour in hour_paths_for_range(hours_path, start, end):
# Especially when processing multiple hours, this routine can take a signifigant amount
@ -154,6 +172,21 @@ def get_best_segments(hours_path, start, end, allow_holes=True):
# best_segments_by_start will give us the best available segment for each unique start time
for segment in best_segments_by_start(hour):
# === HACK ===
# We see segments in pairs of duration 5.2 and 3.133.
# The ones with duration 3.133 are not playable without the previous 5.2 segment.
# So if our segment is a 3.133, we instead combine it with the previous one
# and we treat them as an indivisible unit.
# ============
if (3.1 < segment.duration.total_seconds() < 3.2) and not no_hack:
if prev_segment is not None and prev_segment.end == segment.start:
segment = merge_segments(prev_segment, segment)
# If the segment we merged into was already in result, roll it back
# so we can re-run the logic with the new segment instead.
if result and result[-1] is prev_segment:
prev_segment = segment
# special case: first segment
if not result:
@ -350,7 +383,7 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
args = [
'-hide_banner', '-loglevel', 'error', # suppress noisy output
'-i', segment.path,
'-i', "-",
# output from ffprobe is generally already sorted but let's be paranoid,
# because the order of map args matters.
@ -374,7 +407,9 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
args += ['-f', 'mpegts', '-']
# run it
logging.info("Running segment cut with args: {}".format(" ".join(args)))
return subprocess.Popen(args, stdout=subprocess.PIPE)
proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
gevent.spawn(feed_input, [segment], proc.stdin)
return proc
def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args):
@ -439,9 +474,10 @@ def rough_cut_segments(segment_ranges, ranges):
for segment in segments:
if segment is None:
with open(segment.path, 'rb') as f:
for chunk in read_chunks(f):
yield chunk
for path in segment.paths:
with open(path, 'rb') as f:
for chunk in read_chunks(f):
yield chunk
@timed('cut', cut_type='fast', normalize=lambda ret, sr, ranges: range_total(ranges))
@ -569,9 +605,10 @@ def fast_cut_range(segments, start, end, fixts=None):
# no cutting needed, just serve the file
with open(segment.path, 'rb') as f:
for chunk in read_chunks(f):
yield fixts.feed(chunk) if fixts else chunk
for path in segment.paths:
with open(path, 'rb') as f:
for chunk in read_chunks(f):
yield fixts.feed(chunk) if fixts else chunk
if fixts:
# check for errors and indicate range is finished
@ -581,13 +618,14 @@ def feed_input(segments, pipe):
"""Write each segment's data into the given pipe in order.
This is used to provide input to ffmpeg in a full cut."""
for segment in segments:
with open(segment.path, 'rb') as f:
shutil.copyfileobj(f, pipe)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all it
if e.errno != errno.EPIPE:
for path in segment.paths:
with open(path, 'rb') as f:
shutil.copyfileobj(f, pipe)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all it
if e.errno != errno.EPIPE:

@ -270,7 +270,7 @@ def generate_media_playlist(channel, quality):
# (not an error because someone might ask for a specific start, no end, but we ended up with
# end before start because that's the latest time we have)
if start < end:
segments = get_best_segments(hours_path, start, end)
segments = get_best_segments(hours_path, start, end, no_hack=True)
# Note the None to indicate there was a "hole" at both start and end
segments = [None]
