From 663449498c6911042c7ba0b869a19f59b6c1e4eb Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 28 Jul 2025 00:02:16 +1000 Subject: [PATCH 1/8] downloader: Re-connect when we see a time error over 0.01s We have observed an issue on twitch where there will be a small time jump (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7) and all subsequent segment timestamps will be out by this amount compared to what other downloader instances see. Our workaround for this issue is to watch for such gaps and: 1. trigger a worker refresh (which seems to fix the issue) 2. treat all subsequent segments as "suspect" so they are still saved but only used if no other source is available. --- downloader/downloader/main.py | 51 ++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 3b9ae49..18ef73e 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -49,6 +49,19 @@ ad_segments_ignored = prom.Counter( ["channel", "quality"], ) +suspicious_skew_count = prom.Counter( + "suspicious_skew_count", + "", + ["channel", "quality"], +) + +segment_time_skew = prom.Histogram( + "segment_time_skew", + "", + ["channel", "quality", "worker"], + buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10], +) + class TimedOutError(Exception): pass @@ -290,6 +303,11 @@ class StreamWorker(object): FETCH_RETRY_INTERVAL = 1 FETCH_POLL_INTERVAL = 2 + # Max difference between a segment's time + duration and the next segment's time (in seconds) + # before we consider this "suspicious" and trigger a refresh. + # See https://github.com/dbvideostriketeam/wubloader/issues/539 + MAX_SEGMENT_TIME_SKEW = 0.01 + def __init__(self, manager, quality, url, url_time): self.manager = manager self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self))) @@ -347,6 +365,7 @@ class StreamWorker(object): def _run(self): first = True + suspicious_skew = False while not self.stopping.is_set(): self.logger.debug("Getting media playlist {}".format(self.url)) @@ -380,7 +399,23 @@ class StreamWorker(object): self.manager.mark_working(self) if segment.date: - date = common.dateutil.parse(segment.date) + new_date = common.dateutil.parse(segment.date) + if date is not None: + # We have observed an issue on twitch where there will be a small time jump + # (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7) + # and all subsequent segment timestamps will be out by this amount compared + # to what other downloader instances see. Our workaround for this issue is to + # watch for such gaps and: + # 1. trigger a worker refresh (which seems to fix the issue) + # 2. treat all subsequent segments as "suspect" so they are still saved + # but only used if no other source is available. 
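To make the threshold concrete, here is the arithmetic behind the skew check that follows, worked through with the example timestamps from the commit message. This is a standalone illustration only; the timestamp values are made up for the example, and MAX_SEGMENT_TIME_SKEW is the constant the patch defines on StreamWorker.

    from datetime import datetime, timedelta

    # 2s segments: previous segment reported at 00:12.3, so the next one is expected
    # at 00:14.3, but the playlist reports 00:14.7 (times are illustrative).
    prev_date = datetime(2025, 7, 28, 0, 0, 12, 300000)
    duration = 2.0
    reported = datetime(2025, 7, 28, 0, 0, 14, 700000)

    expected = prev_date + timedelta(seconds=duration)   # 00:14.3
    skew = (expected - reported).total_seconds()         # -0.4

    MAX_SEGMENT_TIME_SKEW = 0.01
    if abs(skew) > MAX_SEGMENT_TIME_SKEW:
        # the worker refreshes itself and marks all later segments "suspect"
        print("suspicious skew of {}s".format(skew))

A 0.4s jump is well over the 0.01s threshold, so the worker refreshes and flags every later segment; smaller jitter is still recorded in the segment_time_skew histogram but does not trigger anything.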
+ skew = (date - new_date).total_seconds() + segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew) + if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew: + self.trigger_new_worker() + suspicious_skew = True + suspicious_skew_count.labels(self.manager.channel, self.quality).inc() + date = new_date if segment.uri not in self.getters: if date is None: raise ValueError("Cannot determine date of segment") @@ -392,6 +427,7 @@ class StreamWorker(object): self.quality, segment, date, + suspicious_skew, self.map_cache, ) gevent.spawn(self.getters[segment.uri].run) @@ -448,13 +484,14 @@ class SegmentGetter(object): # or so, to be paranoid we set it to considerably longer than that. GIVE_UP_TIMEOUT = 20 * 60 - def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache): + def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache): self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.base_dir = base_dir self.channel = channel self.quality = quality self.segment = segment self.date = date + self.suspect = suspect self.map_cache = map_cache self.prefix = self.make_path_prefix() self.retry = None # Event, set to begin retrying @@ -517,6 +554,12 @@ class SegmentGetter(object): Type may be: full: Segment is complete. Hash is included. suspect: Segment appears to be complete, but we suspect it is not. Hash is included. + This currently triggers on two conditions: + - If a segment takes a very long time to download, which we've observed to result in + partial files even though they appeared to end correctly. + - If the StreamWorker has encountered a time gap, then we suspect this segment to be + mis-timed. We have observed this where there is a small (~0.5s) time jump, then + all segments are consistently off by that amount compared to other nodes until refresh. partial: Segment is incomplete. Hash is included. temp: Segment has not been downloaded yet. A random uuid is added. """ @@ -601,7 +644,9 @@ class SegmentGetter(object): raise e else: request_duration = monotonic() - start_time - segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect" + segment_type = "full" + if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME: + segment_type = "suspect" full_path = self.make_path(segment_type, hash) self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) common.rename(temp_path, full_path) From 2ecd4e0a3ead5d64585275ede3210282614ff8a8 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 29 Jul 2025 15:39:47 +1000 Subject: [PATCH 2/8] more metrics for tracking skew --- downloader/downloader/main.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 18ef73e..bf7a2ac 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -62,6 +62,18 @@ segment_time_skew = prom.Histogram( buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10], ) +segment_time_skew_non_zero_sum = prom.Gauge( + "segment_time_skew_non_zero_sum", + "", + ["channel", "quality", "worker"], +) + +segment_time_skew_non_zero_count = prom.Counter( + "segment_time_skew_non_zero_count", + "", + ["channel", "quality", "worker"], +) + class TimedOutError(Exception): pass @@ -411,6 +423,9 @@ class StreamWorker(object): # but only used if no other source is available. 
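The sum/count pair added here is the usual way to track the average of a quantity that can be negative: Prometheus Counters can only go up, so the signed running total has to be a Gauge, while the number of observations stays a Counter, and the mean skew per worker can be recovered at query time by dividing the two series. A minimal sketch of the same pattern, with hypothetical metric names:

    import prometheus_client as prom

    # Stand-ins for the two series the patch adds, keyed here only by worker.
    skew_sum = prom.Gauge("example_skew_sum", "Running total of non-zero skews", ["worker"])
    skew_count = prom.Counter("example_skew_count", "Number of non-zero skew observations", ["worker"])

    def record_skew(worker, skew):
        if skew != 0:
            skew_sum.labels(worker).inc(skew)  # Gauge.inc() accepts negative amounts
            skew_count.labels(worker).inc()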
skew = (date - new_date).total_seconds() segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew) + if skew != 0: + segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew) + segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc() if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew: self.trigger_new_worker() suspicious_skew = True From 37e225adabcea16fc732890efe64d1101cc37db5 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 29 Jul 2025 16:12:34 +1000 Subject: [PATCH 3/8] more debugging logs --- downloader/downloader/main.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index bf7a2ac..1db7856 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -401,6 +401,7 @@ class StreamWorker(object): # Start any new segment getters date = None # tracks date in case some segment doesn't include it + prev_segment = None for segment in playlist.segments: if segment.ad_reason: self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason)) @@ -427,6 +428,7 @@ class StreamWorker(object): segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew) segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc() if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew: + self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}") self.trigger_new_worker() suspicious_skew = True suspicious_skew_count.labels(self.manager.channel, self.quality).inc() @@ -448,6 +450,7 @@ class StreamWorker(object): gevent.spawn(self.getters[segment.uri].run) if date is not None: date += datetime.timedelta(seconds=segment.duration) + prev_segment = segment # Clean up any old segment getters. 
# Note use of list() to make a copy to avoid modification-during-iteration From ebc5afd58b3eabe9fe0cb2b87a683629aed94d1a Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 17 Oct 2023 18:25:01 +1100 Subject: [PATCH 4/8] Temporary debugging: Write out last 10 media playlists after fetching each segment --- downloader/downloader/main.py | 21 +++++++++++++++++++-- downloader/downloader/providers.py | 3 ++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 1db7856..17ae89d 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -378,12 +378,15 @@ class StreamWorker(object): def _run(self): first = True suspicious_skew = False + history = [] + HISTORY_SIZE = 10 while not self.stopping.is_set(): self.logger.debug("Getting media playlist {}".format(self.url)) try: with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): - playlist = self.manager.provider.get_media_playlist(self.url, session=self.session) + playlist_time = datetime.datetime.utcnow() + raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session) except Exception as e: self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.trigger_new_worker() @@ -399,6 +402,8 @@ class StreamWorker(object): # We successfully got the playlist at least once first = False + history = [(playlist_time, raw_playlist)] + history[:HISTORY_SIZE] + # Start any new segment getters date = None # tracks date in case some segment doesn't include it prev_segment = None @@ -446,6 +451,7 @@ class StreamWorker(object): date, suspicious_skew, self.map_cache, + history, ) gevent.spawn(self.getters[segment.uri].run) if date is not None: @@ -502,7 +508,7 @@ class SegmentGetter(object): # or so, to be paranoid we set it to considerably longer than that. 
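The playlist history this patch threads through to SegmentGetter is a newest-first list of (fetch time, raw playlist text) pairs, capped at the current playlist plus HISTORY_SIZE older ones, which write_history() later dumps next to the saved segment under playlist-debug/. A rough standalone sketch of both halves, using plain stdlib calls in place of common.atomic_write and the makedirs error handling:

    import os

    HISTORY_SIZE = 10

    def remember(history, fetch_time, raw_playlist):
        # Newest first: the playlist just fetched plus up to HISTORY_SIZE older ones.
        return [(fetch_time, raw_playlist)] + history[:HISTORY_SIZE]

    def dump(history, base_dir, segment_relpath):
        # One file per remembered playlist, numbered 0 (newest) upwards, e.g.
        # <base_dir>/playlist-debug/<channel>/<quality>/<hour>/<segment>/0_<fetch time>
        out_dir = os.path.join(base_dir, "playlist-debug", segment_relpath)
        os.makedirs(out_dir, exist_ok=True)
        for n, (timestamp, playlist) in enumerate(history):
            name = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
            with open(os.path.join(out_dir, name), "w") as f:
                f.write(playlist)

remember() and dump() are hypothetical helper names; in the patch the equivalent logic lives inline in StreamWorker._run and in SegmentGetter.write_history.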
GIVE_UP_TIMEOUT = 20 * 60 - def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache): + def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history): self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.base_dir = base_dir self.channel = channel @@ -516,6 +522,7 @@ class SegmentGetter(object): self.done = gevent.event.Event() # set when file exists or we give up # Our parent's connection pool, but we'll replace it if there's any issues self.session = session + self.history = history def run(self): try: @@ -675,6 +682,16 @@ class SegmentGetter(object): stat = latest_segment.labels(channel=self.channel, quality=self.quality) timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds() stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe + self.write_history(full_path) + + def write_history(self, segment_path): + segment_path = os.path.relpath(segment_path, self.base_dir) + history_path = os.path.join(self.base_dir, "playlist-debug", segment_path) + os.makedirs(history_path) + for n, (timestamp, playlist) in enumerate(self.history): + filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")) + path = os.path.join(history_path, filename) + common.atomic_write(path, playlist) def parse_channel(channel): diff --git a/downloader/downloader/providers.py b/downloader/downloader/providers.py index f791a33..996144c 100644 --- a/downloader/downloader/providers.py +++ b/downloader/downloader/providers.py @@ -29,7 +29,8 @@ class Provider: session = InstrumentedSession() resp = session.get(uri, metric_name='get_media_playlist') resp.raise_for_status() - return hls_playlist.load(resp.text, base_uri=resp.url) + playlist = resp.text + return playlist, hls_playlist.load(playlist, base_uri=resp.url) class URLProvider(Provider): From a0c06984469c6774ee3dc1833bc1f63140c71aff Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 29 Jul 2025 17:10:15 +1000 Subject: [PATCH 5/8] playlist debug fix --- downloader/downloader/main.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 17ae89d..f36d70b 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -687,7 +687,10 @@ class SegmentGetter(object): def write_history(self, segment_path): segment_path = os.path.relpath(segment_path, self.base_dir) history_path = os.path.join(self.base_dir, "playlist-debug", segment_path) - os.makedirs(history_path) + try: + os.makedirs(history_path) + except FileExistsError: + pass for n, (timestamp, playlist) in enumerate(self.history): filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")) path = os.path.join(history_path, filename) From c13aa26726beb2353f7d223db0e32f2a4f84974a Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 30 Jul 2025 19:24:04 +1000 Subject: [PATCH 6/8] Don't have restreamer respond with metrics to non-restreamer metrics requests --- restreamer/restreamer/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index a198831..4f4ec16 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -91,10 +91,10 @@ def metrics(): """Return current metrics in prometheus metrics format""" return prom.generate_latest() -# To make nginx proxying simpler, we want to allow /metrics/* to work 
-@app.route('/metrics/') +# To make nginx proxying simpler, we want to allow /metrics/restreamer to work +@app.route('/metrics/restreamer') @request_stats -def metrics_with_trailing(trailing): +def metrics_with_trailing(): """Expose Prometheus metrics.""" return prom.generate_latest() From 8eec87355cd56072140421464e53c8f8855a522d Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Fri, 1 Aug 2025 12:57:47 +1000 Subject: [PATCH 7/8] backfiller: Don't backfill segments that only differ from existing segment by 1ms In the wild we've seen different servers get timestamps that differ by 1ms for segments that are otherwise identical - same content, same duration. The allowable fudge factor for segments is already 10ms, so having timing be 1ms different between servers shouldn't cause any problems. Worst case, there's a slight chance you'll get an adjacent frame when picking a cut point / thumbnail. --- backfiller/backfiller/main.py | 37 ++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index e0bee91..bec75a7 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -44,6 +44,12 @@ hash_mismatches = prom.Counter( ['remote', 'channel', 'quality', 'hour'], ) +small_difference_segments = prom.Gauge( + 'small_difference_segments', + 'Number of segments which were not pulled due to differing from existing segments by only a very small time difference', + ['remote', 'channel', 'quality', 'hour'], +) + node_list_errors = prom.Counter( 'node_list_errors', 'Number of errors fetching a list of nodes', @@ -504,9 +510,21 @@ class BackfillerWorker(object): # multiple workers request the same segment at the same time random.shuffle(missing_segments) + if quality != 'chat': + MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash") + EPSILON = 0.001 + local_infos = [] + for path in local_segments: + path = os.path.join(channel, quality, hour, path) + try: + local_infos.append(common.parse_segment_path(path)) + except ValueError as e: + self.logger.warning('Local file {} could not be parsed: {}'.format(path, e)) + pool = gevent.pool.Pool(self.download_concurrency) workers = [] - + small_differences = 0 + for missing_segment in missing_segments: if self.stopping.is_set(): @@ -542,6 +560,21 @@ class BackfillerWorker(object): if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff): self.logger.debug('Skipping {} as too recent'.format(path)) continue + + # if any local segment is within 1ms of the missing segment and otherwise identical, ignore it + found = None + for local_segment in local_infos: + # if any fields differ, no match + if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS): + continue + # if time difference > epsilon, no match + if abs((segment.start - local_segment.start).total_seconds()) > EPSILON: + continue + found = local_segment + break + if found is not None: + self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}') + continue # start segment as soon as a pool slot opens up, then track it in workers workers.append(pool.spawn( @@ -549,6 +582,8 @@ class BackfillerWorker(object): self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger )) + small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences) + # verify that all the workers succeeded. 
if any failed, raise the exception from # one of them arbitrarily. for worker in workers: From 871a2e190942212c5b1dd3a44ab8c2754df5ce42 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sat, 2 Aug 2025 02:36:59 +1000 Subject: [PATCH 8/8] Fix connection pool warnings by increasing pool size in backfiller and downloader, the things making lots of outgoing http requests. We want these larger sizes anyway to improve performance in downloader and backfiller. --- backfiller/backfiller/main.py | 5 ++++- downloader/downloader/main.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index bec75a7..cb51ede 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -16,6 +16,7 @@ import argh import gevent.backdoor import gevent.pool import prometheus_client as prom +from requests.adapters import HTTPAdapter import common from common import dateutil @@ -23,8 +24,10 @@ from common import database from common.requests import InstrumentedSession from common.segments import list_segment_files, unpadded_b64_decode -# Wraps all requests in some metric collection +# Wraps all requests in some metric collection and connection pooling requests = InstrumentedSession() +adapter = HTTPAdapter(pool_maxsize=100) +requests.mount('https://', adapter) segments_backfilled = prom.Counter( 'segments_backfilled', diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index f36d70b..5186945 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -15,6 +15,7 @@ import gevent.backdoor import gevent.event import prometheus_client as prom import requests +import requests.adapters from monotonic import monotonic import common @@ -335,6 +336,8 @@ class StreamWorker(object): # This worker's SegmentGetters will use its session by default for performance, # but will fall back to a new one if something goes wrong. self.session = common.requests.InstrumentedSession() + adapter = requests.adapters.HTTPAdapter(pool_maxsize=100) + self.session.mount('https://', adapter) # Map cache is a simple cache to avoid re-downloading the same map URI for every segment, # since it's generally the same but may occasionally change. # We expect the map data to be very small so there is no eviction here.
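The near-duplicate rule patch 7 applies in the backfiller reduces to: identical in every parsed field except start time, with start times no more than 1ms apart. A condensed sketch of that predicate, assuming segment info objects like those returned by common.parse_segment_path (a datetime start plus the listed fields):

    MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
    EPSILON = 0.001  # 1ms

    def is_near_duplicate(remote, local):
        # Identical in every compared field...
        if any(getattr(remote, f) != getattr(local, f) for f in MATCH_FIELDS):
            return False
        # ...and start times no more than 1ms apart.
        return abs((remote.start - local.start).total_seconds()) <= EPSILON

    def should_backfill(remote, local_infos):
        return not any(is_near_duplicate(remote, local) for local in local_infos)

The 1ms epsilon sits well inside the 10ms fudge factor the commit message cites, so a matching local segment can safely stand in for the remote one.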