diff --git a/common/common/segments.py b/common/common/segments.py index 2b7e0df..f75eb2b 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -40,6 +40,7 @@ class SegmentInfo( return self.start + self.duration @property def is_partial(self): + """Note that suspect is considered partial""" return self.type != "full" @@ -60,7 +61,7 @@ def parse_segment_path(path): if len(parts) != 4: raise ValueError("Not enough dashes in filename") time, duration, type, hash = parts - if type not in ('full', 'partial', 'temp'): + if type not in ('full', 'suspect', 'partial', 'temp'): raise ValueError("Unknown type {!r}".format(type)) hash = None if type == 'temp' else unpadded_b64_decode(hash) start = None if hour is None else datetime.datetime.strptime("{}:{}".format(hour, time), "%Y-%m-%dT%H:%M:%S.%f") @@ -210,8 +211,8 @@ def hour_paths_for_range(hours_path, start, end): def best_segments_by_start(hour): """Within a given hour path, yield the "best" segment per unique segment start time. - Best is defined as non-partial, or failing that the longest partial. - Note this means this function may perform os.stat()s in order to find the longest partial. + Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial. + Note this means this function may perform os.stat()s. """ try: segment_paths = os.listdir(hour) @@ -251,8 +252,11 @@ def best_segments_by_start(hour): full_segments = [max(full_segments, key=lambda segment: (segment.duration, sizes[segment], segment.hash))] yield full_segments[0] continue - # no full segments, fall back to measuring partials. - yield max(segments, key=lambda segment: os.stat(segment.path).st_size) + # no full segments, fall back to measuring partials. Prefer suspect over partial. + yield max(segments, key=lambda segment: ( + 1 if segment.type == 'suspect' else 0, + os.stat(segment.path).st_size, + )) def streams_info(segment): diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 03bf30f..060b3b3 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -27,14 +27,14 @@ import common.requests segments_downloaded = prom.Counter( "segments_downloaded", "Number of segments either partially or fully downloaded", - ["partial", "channel", "quality"], + ["type", "channel", "quality"], ) segment_duration_downloaded = prom.Counter( "segment_duration_downloaded", "Total duration of all segments partially or fully downloaded. " "Note partial segments still count the full duration.", - ["partial", "channel", "quality"], + ["type", "channel", "quality"], ) latest_segment = prom.Gauge( @@ -420,6 +420,11 @@ class SegmentGetter(object): # full timeout is for the entire download and stream to disk. FETCH_HEADERS_TIMEOUTS = 5, 60 FETCH_FULL_TIMEOUTS = 15, 240 + # Experimentally, we've observed that after 60s, stuck requests will be terminated + # by twitch with incomplete but valid data, without anything indicating an error. + # We assume anything longer than 60s is "suspect", not to be used if we have a + # version that was fetched in a more timely manner. + FETCH_SUSPECT_TIME = 59 # Overall timeout on the Getter before giving up, to prevent always-failing Getters # from growing without bound and causing resource exhaustion issues. # The longest we've observed in the wild before a segment goes un-fetchable is 7min @@ -493,6 +498,7 @@ class SegmentGetter(object): """Generate filepath for the segment. Type may be: full: Segment is complete. Hash is included. + suspect: Segment appears to be complete, but we suspect it is not. Hash is included. partial: Segment is incomplete. Hash is included. temp: Segment has not been downloaded yet. A random uuid is added. """ @@ -500,7 +506,7 @@ class SegmentGetter(object): return "{}-{}-{}.ts".format(self.prefix, type, arg) def exists(self): - """Look for an existing, full (non-partial) copy of this segment. Return bool.""" + """Look for an existing, full (non-partial, non-suspect) copy of this segment. Return bool.""" dirname = os.path.dirname(self.prefix) try: candidates = os.listdir(dirname) @@ -530,6 +536,7 @@ class SegmentGetter(object): file_created = False try: self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path)) + start_time = monotonic() with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment') @@ -556,15 +563,17 @@ class SegmentGetter(object): partial_path = self.make_path("partial", hash) self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path)) common.rename(temp_path, partial_path) - segments_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc() - segment_duration_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc(self.segment.duration) + segments_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc() + segment_duration_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc(self.segment.duration) raise ex_type, ex, tb else: - full_path = self.make_path("full", hash) + request_duration = monotonic() - start_time + segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect" + full_path = self.make_path(segment_type, hash) self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) common.rename(temp_path, full_path) - segments_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc() - segment_duration_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc(self.segment.duration) + segments_downloaded.labels(type=segment_type, channel=self.channel, quality=self.quality).inc() + segment_duration_downloaded.labels(type=segment_type, channel=self.channel, quality=self.quality).inc(self.segment.duration) # Prom doesn't provide a way to compare value to gauge's existing value, # we need to reach into internals stat = latest_segment.labels(channel=self.channel, quality=self.quality)