From 663449498c6911042c7ba0b869a19f59b6c1e4eb Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Mon, 28 Jul 2025 00:02:16 +1000
Subject: [PATCH 1/6] downloader: Re-connect when we see a time error over
 0.01s

We have observed an issue on twitch where there will be a small time jump
(e.g. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see.

Our workaround for this issue is to watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved,
   but only used if no other source is available.
---
 downloader/downloader/main.py | 51 ++++++++++++++++++++++++++++++++---
 1 file changed, 48 insertions(+), 3 deletions(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 3b9ae49..18ef73e 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -49,6 +49,19 @@ ad_segments_ignored = prom.Counter(
 	["channel", "quality"],
 )
 
+suspicious_skew_count = prom.Counter(
+	"suspicious_skew_count",
+	"Times a worker saw a suspicious timestamp skew and triggered a refresh",
+	["channel", "quality"],
+)
+
+segment_time_skew = prom.Histogram(
+	"segment_time_skew",
+	"Difference in seconds between a segment's expected start time and its reported start time",
+	["channel", "quality", "worker"],
+	buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10],
+)
+
 
 class TimedOutError(Exception):
 	pass
@@ -290,6 +303,11 @@ class StreamWorker(object):
 	FETCH_RETRY_INTERVAL = 1
 	FETCH_POLL_INTERVAL = 2
 
+	# Max difference between a segment's time + duration and the next segment's time (in seconds)
+	# before we consider this "suspicious" and trigger a refresh.
+	# See https://github.com/dbvideostriketeam/wubloader/issues/539
+	MAX_SEGMENT_TIME_SKEW = 0.01
+
 	def __init__(self, manager, quality, url, url_time):
 		self.manager = manager
 		self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
@@ -347,6 +365,7 @@ class StreamWorker(object):
 
 	def _run(self):
 		first = True
+		suspicious_skew = False
 		while not self.stopping.is_set():
 
 			self.logger.debug("Getting media playlist {}".format(self.url))
@@ -380,7 +399,23 @@ class StreamWorker(object):
 				self.manager.mark_working(self)
 
 				if segment.date:
-					date = common.dateutil.parse(segment.date)
+					new_date = common.dateutil.parse(segment.date)
+					if date is not None:
+						# We have observed an issue on twitch where there will be a small time jump
+						# (e.g. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
+						# and all subsequent segment timestamps will be out by this amount compared
+						# to what other downloader instances see. Our workaround for this issue is to
+						# watch for such gaps and:
+						# 1. trigger a worker refresh (which seems to fix the issue)
+						# 2. treat all subsequent segments as "suspect" so they are still saved,
+						#    but only used if no other source is available.
+						skew = (date - new_date).total_seconds()
+						segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew)
+						if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
+							self.trigger_new_worker()
+							suspicious_skew = True
+							suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
+					date = new_date
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
@@ -392,6 +427,7 @@ class StreamWorker(object):
 						self.quality,
 						segment,
 						date,
+						suspicious_skew,
 						self.map_cache,
 					)
 					gevent.spawn(self.getters[segment.uri].run)
@@ -448,13 +484,14 @@ class SegmentGetter(object):
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.quality = quality
 		self.segment = segment
 		self.date = date
+		self.suspect = suspect
 		self.map_cache = map_cache
 		self.prefix = self.make_path_prefix()
 		self.retry = None # Event, set to begin retrying
@@ -517,6 +554,12 @@ class SegmentGetter(object):
 		Type may be:
 			full: Segment is complete. Hash is included.
 			suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
+				This currently triggers on two conditions:
+				- If a segment takes a very long time to download, which we've observed to result in
+				  partial files even though they appeared to end correctly.
+				- If the StreamWorker has encountered a time gap, then we suspect this segment to be
+				  mis-timed. We have observed this where there is a small (~0.5s) time jump, then
+				  all segments are consistently off by that amount compared to other nodes until refresh.
 			partial: Segment is incomplete. Hash is included.
 			temp: Segment has not been downloaded yet. A random uuid is added.
 		"""
@@ -601,7 +644,9 @@ class SegmentGetter(object):
 				raise e
 			else:
 				request_duration = monotonic() - start_time
-				segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
+				segment_type = "full"
+				if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
+					segment_type = "suspect"
 				full_path = self.make_path(segment_type, hash)
 				self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
 				common.rename(temp_path, full_path)

From 2ecd4e0a3ead5d64585275ede3210282614ff8a8 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 29 Jul 2025 15:39:47 +1000
Subject: [PATCH 2/6] more metrics for tracking skew

---
 downloader/downloader/main.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 18ef73e..bf7a2ac 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -62,6 +62,18 @@ segment_time_skew = prom.Histogram(
 	buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10],
 )
 
+segment_time_skew_non_zero_sum = prom.Gauge(
+	"segment_time_skew_non_zero_sum",
+	"Running total of non-zero segment time skews seen by a worker, in seconds",
+	["channel", "quality", "worker"],
+)
+
+segment_time_skew_non_zero_count = prom.Counter(
+	"segment_time_skew_non_zero_count",
+	"Number of segments with a non-zero time skew seen by a worker",
+	["channel", "quality", "worker"],
+)
+
 
 class TimedOutError(Exception):
 	pass
@@ -411,6 +423,9 @@ class StreamWorker(object):
 					#    but only used if no other source is available.
 						skew = (date - new_date).total_seconds()
 						segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew)
+						if skew != 0:
+							segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
+							segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
 						if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
 							self.trigger_new_worker()
 							suspicious_skew = True

From 37e225adabcea16fc732890efe64d1101cc37db5 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 29 Jul 2025 16:12:34 +1000
Subject: [PATCH 3/6] more debugging logs

---
 downloader/downloader/main.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index bf7a2ac..1db7856 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -401,6 +401,7 @@ class StreamWorker(object):
 
 			# Start any new segment getters
 			date = None # tracks date in case some segment doesn't include it
+			prev_segment = None
 			for segment in playlist.segments:
 				if segment.ad_reason:
 					self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
@@ -427,6 +428,7 @@ class StreamWorker(object):
 							segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
 							segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
 						if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
+							self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
 							self.trigger_new_worker()
 							suspicious_skew = True
 							suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
@@ -448,6 +450,7 @@ class StreamWorker(object):
 					gevent.spawn(self.getters[segment.uri].run)
 				if date is not None:
 					date += datetime.timedelta(seconds=segment.duration)
+				prev_segment = segment
 
 			# Clean up any old segment getters.
 			# Note use of list() to make a copy to avoid modification-during-iteration

From ebc5afd58b3eabe9fe0cb2b87a683629aed94d1a Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 17 Oct 2023 18:25:01 +1100
Subject: [PATCH 4/6] Temporary debugging: Write out last 10 media playlists
 after fetching each segment

---
 downloader/downloader/main.py      | 21 +++++++++++++++++++--
 downloader/downloader/providers.py |  3 ++-
 2 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 1db7856..17ae89d 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -378,12 +378,15 @@ class StreamWorker(object):
 	def _run(self):
 		first = True
 		suspicious_skew = False
+		history = []
+		HISTORY_SIZE = 10
 		while not self.stopping.is_set():
 
 			self.logger.debug("Getting media playlist {}".format(self.url))
 			try:
 				with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
-					playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
+					playlist_time = datetime.datetime.utcnow()
+					raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
 			except Exception as e:
 				self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
 				self.trigger_new_worker()
@@ -399,6 +402,8 @@ class StreamWorker(object):
 			# We successfully got the playlist at least once
 			first = False
 
+			history = [(playlist_time, raw_playlist)] + history[:HISTORY_SIZE]
+
 			# Start any new segment getters
 			date = None # tracks date in case some segment doesn't include it
 			prev_segment = None
@@ -446,6 +451,7 @@ class StreamWorker(object):
 						date,
 						suspicious_skew,
 						self.map_cache,
+						history,
 					)
 					gevent.spawn(self.getters[segment.uri].run)
 				if date is not None:
@@ -502,7 +508,7 @@ class SegmentGetter(object):
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
@@ -516,6 +522,7 @@ class SegmentGetter(object):
 		self.done = gevent.event.Event() # set when file exists or we give up
 		# Our parent's connection pool, but we'll replace it if there's any issues
 		self.session = session
+		self.history = history
 
 	def run(self):
 		try:
@@ -675,6 +682,16 @@ class SegmentGetter(object):
 			stat = latest_segment.labels(channel=self.channel, quality=self.quality)
 			timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
 			stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
+			self.write_history(full_path)
+
+	def write_history(self, segment_path):
+		segment_path = os.path.relpath(segment_path, self.base_dir)
+		history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
+		os.makedirs(history_path)
+		for n, (timestamp, playlist) in enumerate(self.history):
+			filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
+			path = os.path.join(history_path, filename)
+			common.atomic_write(path, playlist)
 
 
 def parse_channel(channel):
diff --git a/downloader/downloader/providers.py b/downloader/downloader/providers.py
index f791a33..996144c 100644
--- a/downloader/downloader/providers.py
+++ b/downloader/downloader/providers.py
@@ -29,7 +29,8 @@ class Provider:
 		session = InstrumentedSession()
 		resp = session.get(uri, metric_name='get_media_playlist')
 		resp.raise_for_status()
-		return hls_playlist.load(resp.text, base_uri=resp.url)
+		playlist = resp.text
+		return playlist, hls_playlist.load(playlist, base_uri=resp.url)
 
 
 class URLProvider(Provider):

From a0c06984469c6774ee3dc1833bc1f63140c71aff Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 29 Jul 2025 17:10:15 +1000
Subject: [PATCH 5/6] playlist debug fix

---
 downloader/downloader/main.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 17ae89d..f36d70b 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -687,7 +687,10 @@ class SegmentGetter(object):
 	def write_history(self, segment_path):
 		segment_path = os.path.relpath(segment_path, self.base_dir)
 		history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
-		os.makedirs(history_path)
+		try:
+			os.makedirs(history_path)
+		except FileExistsError:
+			pass
 		for n, (timestamp, playlist) in enumerate(self.history):
 			filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
 			path = os.path.join(history_path, filename)

From c13aa26726beb2353f7d223db0e32f2a4f84974a Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Wed, 30 Jul 2025 19:24:04 +1000
Subject: [PATCH 6/6] Don't have restreamer respond with metrics to
 non-restreamer metrics requests

---
 restreamer/restreamer/main.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py
index a198831..4f4ec16 100644
--- a/restreamer/restreamer/main.py
+++ b/restreamer/restreamer/main.py
@@ -91,10 +91,10 @@ def metrics():
 	"""Return current metrics in prometheus metrics format"""
 	return prom.generate_latest()
 
-# To make nginx proxying simpler, we want to allow /metrics/* to work
-@app.route('/metrics/<trailing>')
+# To make nginx proxying simpler, we want to allow /metrics/restreamer to work
+@app.route('/metrics/restreamer')
 @request_stats
-def metrics_with_trailing(trailing):
+def metrics_with_trailing():
 	"""Expose Prometheus metrics."""
 	return prom.generate_latest()
 
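
A note for reviewers on the detection logic in PATCH 1: the worker tracks the expected start time of the next segment (the previous segment's date plus its duration) and compares that against the date the playlist actually reports; any difference beyond MAX_SEGMENT_TIME_SKEW marks the worker as suspect. Below is a minimal sketch of that check pulled out of the worker loop; find_first_skew and its (date, duration) input format are illustrative only, not part of the codebase.

    from datetime import datetime, timedelta

    MAX_SEGMENT_TIME_SKEW = 0.01  # seconds, as on StreamWorker above

    def find_first_skew(segments):
        """segments: list of (date, duration) pairs, where date is the datetime
        parsed from the playlist and duration is the segment length in seconds.
        Returns the first suspicious skew in seconds, or None."""
        expected = None
        for date, duration in segments:
            if expected is not None:
                # Matches the worker: skew = expected time - reported time,
                # so a positive skew means the segment started earlier than expected.
                skew = (expected - date).total_seconds()
                if abs(skew) > MAX_SEGMENT_TIME_SKEW:
                    return skew
            expected = date + timedelta(seconds=duration)
        return None

Run against the example from the commit message (2s segments at 00:10.3, 00:12.3, 00:14.7), the expected third timestamp is 00:14.3 but the reported one is 00:14.7, giving a skew of -0.4s, well over the threshold.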
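On the metric pair in PATCH 2: the histogram only captures the distribution of skews per bucket, while the sum/count pair lets the mean non-zero skew per worker be recovered by dividing segment_time_skew_non_zero_sum by segment_time_skew_non_zero_count. The sum is presumably a Gauge rather than a Counter because skews can be negative, and prometheus_client rejects decrementing a Counter.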
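And one on the playlist-history debugging added in PATCHES 4 and 5: every playlist fetch prepends the raw playlist text to a small newest-first list, and when a segment finishes downloading, those recent playlists are written out beside it under playlist-debug/ for later inspection. A minimal standalone sketch under assumed names (push_history and dump_history are hypothetical, and plain open() stands in for common.atomic_write):

    import os

    HISTORY_SIZE = 10

    def push_history(history, playlist_time, raw_playlist):
        # Newest first. As written in PATCH 4 this retains up to
        # HISTORY_SIZE + 1 entries: the new playlist plus ten older ones.
        return [(playlist_time, raw_playlist)] + history[:HISTORY_SIZE]

    def dump_history(base_dir, segment_path, history):
        # One directory per segment under playlist-debug/, one numbered file
        # per retained playlist, e.g. "0_2025-07-29T15:39:47.000123".
        rel = os.path.relpath(segment_path, base_dir)
        history_dir = os.path.join(base_dir, "playlist-debug", rel)
        os.makedirs(history_dir, exist_ok=True)  # same effect as the try/except in PATCH 5
        for n, (timestamp, playlist) in enumerate(history):
            filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
            with open(os.path.join(history_dir, filename), "w") as f:
                f.write(playlist)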