Add new segment type "suspect"

We've noticed that when nodes have connection problems, they get full segments
with different hashes. Inspection of these segments shows that
they all have identical data up to a point.

Segments that fetched normally will then have the remainder of the data.
Segments that had issues will have a slightly corrupted end.
The data is still valid, and no errors are raised. It just doesn't have all the data.

We noticed that these corrupted segments all were cut off exactly 60sec after their requests
began. We believe this is a server-side timeout on the request that returns whatever data
it has, then closes the container file cleanly before returning successfully.

We detect segments that take > 59 seconds to receive, and label them as "suspect".

Suspect segments are treated identically to partial segments, except they are always preferred
over partials.
pull/158/head
Mike Lang 5 years ago
parent bb05e37ae4
commit 79ef0b89e4

@ -40,6 +40,7 @@ class SegmentInfo(
return self.start + self.duration return self.start + self.duration
@property @property
def is_partial(self): def is_partial(self):
"""Note that suspect is considered partial"""
return self.type != "full" return self.type != "full"
@ -60,7 +61,7 @@ def parse_segment_path(path):
if len(parts) != 4: if len(parts) != 4:
raise ValueError("Not enough dashes in filename") raise ValueError("Not enough dashes in filename")
time, duration, type, hash = parts time, duration, type, hash = parts
if type not in ('full', 'partial', 'temp'): if type not in ('full', 'suspect', 'partial', 'temp'):
raise ValueError("Unknown type {!r}".format(type)) raise ValueError("Unknown type {!r}".format(type))
hash = None if type == 'temp' else unpadded_b64_decode(hash) hash = None if type == 'temp' else unpadded_b64_decode(hash)
start = None if hour is None else datetime.datetime.strptime("{}:{}".format(hour, time), "%Y-%m-%dT%H:%M:%S.%f") start = None if hour is None else datetime.datetime.strptime("{}:{}".format(hour, time), "%Y-%m-%dT%H:%M:%S.%f")
@ -210,8 +211,8 @@ def hour_paths_for_range(hours_path, start, end):
def best_segments_by_start(hour): def best_segments_by_start(hour):
"""Within a given hour path, yield the "best" segment per unique segment start time. """Within a given hour path, yield the "best" segment per unique segment start time.
Best is defined as non-partial, or failing that the longest partial. Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial.
Note this means this function may perform os.stat()s in order to find the longest partial. Note this means this function may perform os.stat()s.
""" """
try: try:
segment_paths = os.listdir(hour) segment_paths = os.listdir(hour)
@ -251,8 +252,11 @@ def best_segments_by_start(hour):
full_segments = [max(full_segments, key=lambda segment: (segment.duration, sizes[segment], segment.hash))] full_segments = [max(full_segments, key=lambda segment: (segment.duration, sizes[segment], segment.hash))]
yield full_segments[0] yield full_segments[0]
continue continue
# no full segments, fall back to measuring partials. # no full segments, fall back to measuring partials. Prefer suspect over partial.
yield max(segments, key=lambda segment: os.stat(segment.path).st_size) yield max(segments, key=lambda segment: (
1 if segment.type == 'suspect' else 0,
os.stat(segment.path).st_size,
))
def streams_info(segment): def streams_info(segment):

@ -27,14 +27,14 @@ import common.requests
segments_downloaded = prom.Counter( segments_downloaded = prom.Counter(
"segments_downloaded", "segments_downloaded",
"Number of segments either partially or fully downloaded", "Number of segments either partially or fully downloaded",
["partial", "channel", "quality"], ["type", "channel", "quality"],
) )
segment_duration_downloaded = prom.Counter( segment_duration_downloaded = prom.Counter(
"segment_duration_downloaded", "segment_duration_downloaded",
"Total duration of all segments partially or fully downloaded. " "Total duration of all segments partially or fully downloaded. "
"Note partial segments still count the full duration.", "Note partial segments still count the full duration.",
["partial", "channel", "quality"], ["type", "channel", "quality"],
) )
latest_segment = prom.Gauge( latest_segment = prom.Gauge(
@ -420,6 +420,11 @@ class SegmentGetter(object):
# full timeout is for the entire download and stream to disk. # full timeout is for the entire download and stream to disk.
FETCH_HEADERS_TIMEOUTS = 5, 60 FETCH_HEADERS_TIMEOUTS = 5, 60
FETCH_FULL_TIMEOUTS = 15, 240 FETCH_FULL_TIMEOUTS = 15, 240
# Experimentally, we've observed that after 60s, stuck requests will be terminated
# by twitch with incomplete but valid data, without anything indicating an error.
# We assume anything longer than 60s is "suspect", not to be used if we have a
# version that was fetched in a more timely manner.
FETCH_SUSPECT_TIME = 59
# Overall timeout on the Getter before giving up, to prevent always-failing Getters # Overall timeout on the Getter before giving up, to prevent always-failing Getters
# from growing without bound and causing resource exhaustion issues. # from growing without bound and causing resource exhaustion issues.
# The longest we've observed in the wild before a segment goes un-fetchable is 7min # The longest we've observed in the wild before a segment goes un-fetchable is 7min
@ -493,6 +498,7 @@ class SegmentGetter(object):
"""Generate filepath for the segment. """Generate filepath for the segment.
Type may be: Type may be:
full: Segment is complete. Hash is included. full: Segment is complete. Hash is included.
suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
partial: Segment is incomplete. Hash is included. partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added. temp: Segment has not been downloaded yet. A random uuid is added.
""" """
@ -500,7 +506,7 @@ class SegmentGetter(object):
return "{}-{}-{}.ts".format(self.prefix, type, arg) return "{}-{}-{}.ts".format(self.prefix, type, arg)
def exists(self): def exists(self):
"""Look for an existing, full (non-partial) copy of this segment. Return bool.""" """Look for an existing, full (non-partial, non-suspect) copy of this segment. Return bool."""
dirname = os.path.dirname(self.prefix) dirname = os.path.dirname(self.prefix)
try: try:
candidates = os.listdir(dirname) candidates = os.listdir(dirname)
@ -530,6 +536,7 @@ class SegmentGetter(object):
file_created = False file_created = False
try: try:
self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path)) self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
start_time = monotonic()
with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment') resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment')
@ -556,15 +563,17 @@ class SegmentGetter(object):
partial_path = self.make_path("partial", hash) partial_path = self.make_path("partial", hash)
self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path)) self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
common.rename(temp_path, partial_path) common.rename(temp_path, partial_path)
segments_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc() segments_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc()
segment_duration_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc(self.segment.duration) segment_duration_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc(self.segment.duration)
raise ex_type, ex, tb raise ex_type, ex, tb
else: else:
full_path = self.make_path("full", hash) request_duration = monotonic() - start_time
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
full_path = self.make_path(segment_type, hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path) common.rename(temp_path, full_path)
segments_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc() segments_downloaded.labels(type=segment_type, channel=self.channel, quality=self.quality).inc()
segment_duration_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc(self.segment.duration) segment_duration_downloaded.labels(type=segment_type, channel=self.channel, quality=self.quality).inc(self.segment.duration)
# Prom doesn't provide a way to compare value to gauge's existing value, # Prom doesn't provide a way to compare value to gauge's existing value,
# we need to reach into internals # we need to reach into internals
stat = latest_segment.labels(channel=self.channel, quality=self.quality) stat = latest_segment.labels(channel=self.channel, quality=self.quality)

Loading…
Cancel
Save