From 065c72f71f028944d9d6880c8c418d058bd3af4a Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Mon, 28 Jul 2025 00:02:16 +1000
Subject: [PATCH] downloader: Re-connect when we see a time error over 0.01s

We have observed an issue on twitch where there will be a small time jump
(eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see.

Our workaround for this issue is to watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
   but only used if no other source is available.
---
 downloader/downloader/main.py | 61 +++++++++++++++++++++++++++++++++--
 1 file changed, 58 insertions(+), 3 deletions(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 33e6a52..35b0239 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -50,6 +50,24 @@ ad_segments_ignored = prom.Counter(
 	["channel", "quality"],
 )
 
+suspicious_skew_count = prom.Counter(
+	"suspicious_skew_count",
+	"Number of times we've restarted a worker due to suspicious skew",
+	["channel", "quality"],
+)
+
+segment_time_skew_non_zero_sum = prom.Gauge(
+	"segment_time_skew_non_zero_sum",
+	"Sum of all observed segment skew amounts for worker",
+	["channel", "quality", "worker"],
+)
+
+segment_time_skew_non_zero_count = prom.Counter(
+	"segment_time_skew_non_zero_count",
+	"Count of segments with non-zero skew for worker",
+	["channel", "quality", "worker"],
+)
+
 
 class TimedOutError(Exception):
 	pass
@@ -291,6 +309,11 @@ class StreamWorker(object):
 	FETCH_RETRY_INTERVAL = 1
 	FETCH_POLL_INTERVAL = 2
 
+	# Max difference between a segment's time + duration and the next segment's time (in seconds)
+	# before we consider this "suspicious" and trigger a refresh.
+	# See https://github.com/dbvideostriketeam/wubloader/issues/539
+	MAX_SEGMENT_TIME_SKEW = 0.01
+
 	def __init__(self, manager, quality, url, url_time):
 		self.manager = manager
 		self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
@@ -350,6 +373,7 @@ class StreamWorker(object):
 
 	def _run(self):
 		first = True
+		suspicious_skew = False
 		while not self.stopping.is_set():
 
 			self.logger.debug("Getting media playlist {}".format(self.url))
@@ -373,6 +397,7 @@ class StreamWorker(object):
 
 			# Start any new segment getters
 			date = None # tracks date in case some segment doesn't include it
+			prev_segment = None
 			for segment in playlist.segments:
 				if segment.ad_reason:
 					self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
@@ -383,7 +408,26 @@ class StreamWorker(object):
 				self.manager.mark_working(self)
 
 				if segment.date:
-					date = common.dateutil.parse(segment.date)
+					new_date = common.dateutil.parse(segment.date)
+					if date is not None:
+						# We have observed an issue on twitch where there will be a small time jump
+						# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
+						# and all subsequent segment timestamps will be out by this amount compared
+						# to what other downloader instances see. Our workaround for this issue is to
+						# watch for such gaps and:
+						# 1. trigger a worker refresh (which seems to fix the issue)
+						# 2. treat all subsequent segments as "suspect" so they are still saved
+						#    but only used if no other source is available.
+						skew = (date - new_date).total_seconds()
+						if skew != 0:
+							segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
+							segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
+						if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
+							self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
+							self.trigger_new_worker()
+							suspicious_skew = True
+							suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
+					date = new_date
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
@@ -395,11 +439,13 @@ class StreamWorker(object):
 						self.quality,
 						segment,
 						date,
+						suspicious_skew,
 						self.map_cache,
 					)
 					gevent.spawn(self.getters[segment.uri].run)
 				if date is not None:
 					date += datetime.timedelta(seconds=segment.duration)
+				prev_segment = segment
 
 		# Clean up any old segment getters.
 		# Note use of list() to make a copy to avoid modification-during-iteration
@@ -451,13 +497,14 @@ class SegmentGetter(object):
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.quality = quality
 		self.segment = segment
 		self.date = date
+		self.suspect = suspect
 		self.map_cache = map_cache
 		self.prefix = self.make_path_prefix()
 		self.retry = None # Event, set to begin retrying
@@ -520,6 +567,12 @@ class SegmentGetter(object):
 		Type may be:
 			full: Segment is complete. Hash is included.
 			suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
+				This currently triggers on two conditions:
+				- If a segment takes a very long time to download, which we've observed to result in
+				  partial files even though they appeared to end correctly.
+				- If the StreamWorker has encountered a time gap, then we suspect this segment to be
+				  mis-timed. We have observed this where there is a small (~0.5s) time jump, then
+				  all segments are consistently off by that amount compared to other nodes until refresh.
 			partial: Segment is incomplete. Hash is included.
 			temp: Segment has not been downloaded yet. A random uuid is added.
 		"""
@@ -604,7 +657,9 @@ class SegmentGetter(object):
 				raise e
 			else:
 				request_duration = monotonic() - start_time
-				segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
+				segment_type = "full"
+				if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
+					segment_type = "suspect"
 				full_path = self.make_path(segment_type, hash)
 				self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
 				common.rename(temp_path, full_path)
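Not part of the patch: the sketch below restates the skew check that the _run() hunk adds, in isolation, using the example timings from the commit message so the arithmetic is easy to follow. The function name detect_skew, its (start, duration) input format, and the sample datetimes are assumptions made purely for illustration; the real code works on HLS playlist segments and dates parsed by common.dateutil.

# Illustration only, not repository code.
from datetime import datetime, timedelta

MAX_SEGMENT_TIME_SKEW = 0.01  # seconds, mirroring the constant added in the patch

def detect_skew(segments, max_skew=MAX_SEGMENT_TIME_SKEW):
	"""Yield (index, skew_seconds) for each segment whose reported start time differs
	from the expected start (previous start + duration) by more than max_skew."""
	expected = None
	for i, (start, duration) in enumerate(segments):
		if expected is not None:
			# Same sign convention as the patch: skew = expected - reported.
			skew = (expected - start).total_seconds()
			if abs(skew) > max_skew:
				yield i, skew
		# Re-sync to the reported time (as the patch does via date = new_date),
		# then advance by the segment duration to get the next expected start.
		expected = start + timedelta(seconds=duration)

# The 2s-segment example from the commit message: 00:10.3, 00:12.3, 00:14.7, 00:16.7.
segments = [
	(datetime(2025, 7, 28, 0, 0, 10, 300000), 2.0),
	(datetime(2025, 7, 28, 0, 0, 12, 300000), 2.0),
	(datetime(2025, 7, 28, 0, 0, 14, 700000), 2.0),  # 0.4s jump: expected 00:14.3
	(datetime(2025, 7, 28, 0, 0, 16, 700000), 2.0),  # consistent with the new offset, not flagged
]
print(list(detect_skew(segments)))  # [(2, -0.4)]

Only the segment at the jump itself is flagged; every later segment is consistent with the shifted baseline, which is why the patch latches suspicious_skew as a flag and passes it to each subsequent SegmentGetter rather than re-checking every segment.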