From 79ef0b89e49c19bd8ea7dfbc2832a5b166c306d9 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 11 Nov 2019 05:18:27 -0800 Subject: [PATCH] Add new segment type "suspect" We've noticed that when nodes have connection problems, they get full segments with different hashes. Inspection of these segments shows that they all have identical data up to a point. Segments that fetched normally will then have the remainder of the data. Segments that had issues will have a slightly corrupted end. The data is still valid, and no errors are raised. It just doesn't have all the data. We noticed that these corrupted segments all were cut off exactly 60sec after their requests began. We believe this is a server-side timeout on the request that returns whatever data it has, then closes the container file cleanly before returning successfully. We detect segments that take > 59 seconds to recieve, and label them as "suspect". Suspect segments are treated identically to partial segments, except they are always preferred over partials. --- common/common/segments.py | 14 +++++++++----- downloader/downloader/main.py | 25 +++++++++++++++++-------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/common/common/segments.py b/common/common/segments.py index 2b7e0df..f75eb2b 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -40,6 +40,7 @@ class SegmentInfo( return self.start + self.duration @property def is_partial(self): + """Note that suspect is considered partial""" return self.type != "full" @@ -60,7 +61,7 @@ def parse_segment_path(path): if len(parts) != 4: raise ValueError("Not enough dashes in filename") time, duration, type, hash = parts - if type not in ('full', 'partial', 'temp'): + if type not in ('full', 'suspect', 'partial', 'temp'): raise ValueError("Unknown type {!r}".format(type)) hash = None if type == 'temp' else unpadded_b64_decode(hash) start = None if hour is None else datetime.datetime.strptime("{}:{}".format(hour, time), "%Y-%m-%dT%H:%M:%S.%f") @@ -210,8 +211,8 @@ def hour_paths_for_range(hours_path, start, end): def best_segments_by_start(hour): """Within a given hour path, yield the "best" segment per unique segment start time. - Best is defined as non-partial, or failing that the longest partial. - Note this means this function may perform os.stat()s in order to find the longest partial. + Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial. + Note this means this function may perform os.stat()s. """ try: segment_paths = os.listdir(hour) @@ -251,8 +252,11 @@ def best_segments_by_start(hour): full_segments = [max(full_segments, key=lambda segment: (segment.duration, sizes[segment], segment.hash))] yield full_segments[0] continue - # no full segments, fall back to measuring partials. - yield max(segments, key=lambda segment: os.stat(segment.path).st_size) + # no full segments, fall back to measuring partials. Prefer suspect over partial. + yield max(segments, key=lambda segment: ( + 1 if segment.type == 'suspect' else 0, + os.stat(segment.path).st_size, + )) def streams_info(segment): diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 03bf30f..060b3b3 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -27,14 +27,14 @@ import common.requests segments_downloaded = prom.Counter( "segments_downloaded", "Number of segments either partially or fully downloaded", - ["partial", "channel", "quality"], + ["type", "channel", "quality"], ) segment_duration_downloaded = prom.Counter( "segment_duration_downloaded", "Total duration of all segments partially or fully downloaded. " "Note partial segments still count the full duration.", - ["partial", "channel", "quality"], + ["type", "channel", "quality"], ) latest_segment = prom.Gauge( @@ -420,6 +420,11 @@ class SegmentGetter(object): # full timeout is for the entire download and stream to disk. FETCH_HEADERS_TIMEOUTS = 5, 60 FETCH_FULL_TIMEOUTS = 15, 240 + # Experimentally, we've observed that after 60s, stuck requests will be terminated + # by twitch with incomplete but valid data, without anything indicating an error. + # We assume anything longer than 60s is "suspect", not to be used if we have a + # version that was fetched in a more timely manner. + FETCH_SUSPECT_TIME = 59 # Overall timeout on the Getter before giving up, to prevent always-failing Getters # from growing without bound and causing resource exhaustion issues. # The longest we've observed in the wild before a segment goes un-fetchable is 7min @@ -493,6 +498,7 @@ class SegmentGetter(object): """Generate filepath for the segment. Type may be: full: Segment is complete. Hash is included. + suspect: Segment appears to be complete, but we suspect it is not. Hash is included. partial: Segment is incomplete. Hash is included. temp: Segment has not been downloaded yet. A random uuid is added. """ @@ -500,7 +506,7 @@ class SegmentGetter(object): return "{}-{}-{}.ts".format(self.prefix, type, arg) def exists(self): - """Look for an existing, full (non-partial) copy of this segment. Return bool.""" + """Look for an existing, full (non-partial, non-suspect) copy of this segment. Return bool.""" dirname = os.path.dirname(self.prefix) try: candidates = os.listdir(dirname) @@ -530,6 +536,7 @@ class SegmentGetter(object): file_created = False try: self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path)) + start_time = monotonic() with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment') @@ -556,15 +563,17 @@ class SegmentGetter(object): partial_path = self.make_path("partial", hash) self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path)) common.rename(temp_path, partial_path) - segments_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc() - segment_duration_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc(self.segment.duration) + segments_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc() + segment_duration_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc(self.segment.duration) raise ex_type, ex, tb else: - full_path = self.make_path("full", hash) + request_duration = monotonic() - start_time + segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect" + full_path = self.make_path(segment_type, hash) self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) common.rename(temp_path, full_path) - segments_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc() - segment_duration_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc(self.segment.duration) + segments_downloaded.labels(type=segment_type, channel=self.channel, quality=self.quality).inc() + segment_duration_downloaded.labels(type=segment_type, channel=self.channel, quality=self.quality).inc(self.segment.duration) # Prom doesn't provide a way to compare value to gauge's existing value, # we need to reach into internals stat = latest_segment.labels(channel=self.channel, quality=self.quality)