From 663449498c6911042c7ba0b869a19f59b6c1e4eb Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Mon, 28 Jul 2025 00:02:16 +1000
Subject: [PATCH] downloader: Re-connect when we see a time error over 0.01s

We have observed an issue on twitch where there will be a small time jump
(eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see.

Our workaround for this issue is to watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
   but only used if no other source is available.
---
 downloader/downloader/main.py | 51 ++++++++++++++++++++++++++++++++---
 1 file changed, 48 insertions(+), 3 deletions(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 3b9ae49..18ef73e 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -49,6 +49,19 @@ ad_segments_ignored = prom.Counter(
 	["channel", "quality"],
 )
 
+suspicious_skew_count = prom.Counter(
+	"suspicious_skew_count",
+	"",
+	["channel", "quality"],
+)
+
+segment_time_skew = prom.Histogram(
+	"segment_time_skew",
+	"",
+	["channel", "quality", "worker"],
+	buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10],
+)
+
 
 class TimedOutError(Exception):
 	pass
@@ -290,6 +303,11 @@ class StreamWorker(object):
 	FETCH_RETRY_INTERVAL = 1
 	FETCH_POLL_INTERVAL = 2
 
+	# Max difference between a segment's time + duration and the next segment's time (in seconds)
+	# before we consider this "suspicious" and trigger a refresh.
+	# See https://github.com/dbvideostriketeam/wubloader/issues/539
+	MAX_SEGMENT_TIME_SKEW = 0.01
+
 	def __init__(self, manager, quality, url, url_time):
 		self.manager = manager
 		self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
@@ -347,6 +365,7 @@ class StreamWorker(object):
 
 	def _run(self):
 		first = True
+		suspicious_skew = False
 		while not self.stopping.is_set():
 
 			self.logger.debug("Getting media playlist {}".format(self.url))
@@ -380,7 +399,23 @@
 				self.manager.mark_working(self)
 
 				if segment.date:
-					date = common.dateutil.parse(segment.date)
+					new_date = common.dateutil.parse(segment.date)
+					if date is not None:
+						# We have observed an issue on twitch where there will be a small time jump
+						# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
+						# and all subsequent segment timestamps will be out by this amount compared
+						# to what other downloader instances see. Our workaround for this issue is to
+						# watch for such gaps and:
+						#  1. trigger a worker refresh (which seems to fix the issue)
+						#  2. treat all subsequent segments as "suspect" so they are still saved
+						#     but only used if no other source is available.
+						skew = (date - new_date).total_seconds()
+						segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew)
+						if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
+							self.trigger_new_worker()
+							suspicious_skew = True
+							suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
+					date = new_date
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
@@ -392,6 +427,7 @@
 						self.quality,
 						segment,
 						date,
+						suspicious_skew,
 						self.map_cache,
 					)
 					gevent.spawn(self.getters[segment.uri].run)
@@ -448,13 +484,14 @@ class SegmentGetter(object):
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.quality = quality
 		self.segment = segment
 		self.date = date
+		self.suspect = suspect
 		self.map_cache = map_cache
 		self.prefix = self.make_path_prefix()
 		self.retry = None # Event, set to begin retrying
@@ -517,6 +554,12 @@ class SegmentGetter(object):
 		Type may be:
 			full: Segment is complete. Hash is included.
 			suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
+				This currently triggers on two conditions:
+				- If a segment takes a very long time to download, which we've observed to result in
+				  partial files even though they appeared to end correctly.
+				- If the StreamWorker has encountered a time gap, then we suspect this segment to be
+				  mis-timed. We have observed this where there is a small (~0.5s) time jump, then
+				  all segments are consistently off by that amount compared to other nodes until refresh.
 			partial: Segment is incomplete. Hash is included.
 			temp: Segment has not been downloaded yet. A random uuid is added.
 		"""
@@ -601,7 +644,9 @@ class SegmentGetter(object):
 			raise e
 		else:
 			request_duration = monotonic() - start_time
-			segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
+			segment_type = "full"
+			if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
+				segment_type = "suspect"
 			full_path = self.make_path(segment_type, hash)
 			self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
 			common.rename(temp_path, full_path)
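
Illustration only, not part of the patch: a minimal standalone sketch of the gap check added
in the StreamWorker hunk above. The skew formula (expected time minus reported time) and the
0.01s threshold mirror MAX_SEGMENT_TIME_SKEW in the diff; the find_skew helper, the segments
list and the sample timestamps are invented for the example.

from datetime import datetime, timedelta

MAX_SEGMENT_TIME_SKEW = 0.01  # seconds, same threshold as the patch

def find_skew(segments):
	"""Yield (index, skew_seconds) wherever a segment's reported timestamp differs from
	the previous segment's timestamp + duration by more than the threshold.
	Later segments track the new (shifted) timeline, so a single jump is reported once,
	matching the behaviour described in the commit message."""
	expected = None
	for i, (date, duration) in enumerate(segments):
		if expected is not None:
			skew = (expected - date).total_seconds()
			if abs(skew) > MAX_SEGMENT_TIME_SKEW:
				yield i, skew
		expected = date + timedelta(seconds=duration)

# The example from the commit message: 2s segments at 00:10.3, 00:12.3, 00:14.7, 00:16.7.
base = datetime(2025, 7, 28, 0, 0, 10, 300000)
segments = [
	(base, 2.0),
	(base + timedelta(seconds=2.0), 2.0),
	(base + timedelta(seconds=4.4), 2.0),  # 0.4s jump
	(base + timedelta(seconds=6.4), 2.0),
]
print(list(find_skew(segments)))  # [(2, -0.4)]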