diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index e0bee91..cb51ede 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -16,6 +16,7 @@ import argh
 import gevent.backdoor
 import gevent.pool
 import prometheus_client as prom
+from requests.adapters import HTTPAdapter
 
 import common
 from common import dateutil
@@ -23,8 +24,10 @@ from common import database
 from common.requests import InstrumentedSession
 from common.segments import list_segment_files, unpadded_b64_decode
 
-# Wraps all requests in some metric collection
+# Wraps all requests in some metric collection and connection pooling
 requests = InstrumentedSession()
+adapter = HTTPAdapter(pool_maxsize=100)
+requests.mount('https://', adapter)
 
 segments_backfilled = prom.Counter(
 	'segments_backfilled',
@@ -44,6 +47,12 @@ hash_mismatches = prom.Counter(
 	['remote', 'channel', 'quality', 'hour'],
 )
 
+small_difference_segments = prom.Gauge(
+	'small_difference_segments',
+	'Number of segments which were not pulled due to differing from existing segments by only a very small time difference',
+	['remote', 'channel', 'quality', 'hour'],
+)
+
 node_list_errors = prom.Counter(
 	'node_list_errors',
 	'Number of errors fetching a list of nodes',
@@ -504,9 +513,21 @@ class BackfillerWorker(object):
 		# multiple workers request the same segment at the same time
 		random.shuffle(missing_segments)
 
+		if quality != 'chat':
+			MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
+			EPSILON = 0.001
+			local_infos = []
+			for path in local_segments:
+				path = os.path.join(channel, quality, hour, path)
+				try:
+					local_infos.append(common.parse_segment_path(path))
+				except ValueError as e:
+					self.logger.warning('Local file {} could not be parsed: {}'.format(path, e))
+
 		pool = gevent.pool.Pool(self.download_concurrency)
 		workers = []
-
+		small_differences = 0
+
 		for missing_segment in missing_segments:
 
 			if self.stopping.is_set():
@@ -542,6 +563,22 @@
 			if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff):
 				self.logger.debug('Skipping {} as too recent'.format(path))
 				continue
+
+			# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
+			found = None
+			for local_segment in local_infos:
+				# if any fields differ, no match
+				if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS):
+					continue
+				# if time difference > epsilon, no match
+				if abs((segment.start - local_segment.start).total_seconds()) > EPSILON:
+					continue
+				found = local_segment
+				break
+			if found is not None:
+				small_differences += 1
+				self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}')
+				continue
 
 			# start segment as soon as a pool slot opens up, then track it in workers
 			workers.append(pool.spawn(
@@ -549,6 +586,8 @@
 				get_segment,
 				self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
 			))
 
+		small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences)
+
 		# verify that all the workers succeeded. if any failed, raise the exception from
 		# one of them arbitrarily.
 		for worker in workers:
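Note on the near-duplicate check above: a remote segment is skipped only when some local segment matches it on every identifying field and starts within EPSILON (1ms) of it. A minimal standalone sketch of that rule, using a hypothetical SegmentInfo stand-in for the objects common.parse_segment_path returns (field names and example values are illustrative, not taken from this diff):

    from datetime import datetime, timedelta
    from collections import namedtuple

    # Hypothetical stand-in exposing only the fields the matching rule reads.
    SegmentInfo = namedtuple("SegmentInfo", ["channel", "quality", "start", "duration", "type", "hash"])

    MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
    EPSILON = 0.001  # max allowed start-time difference, in seconds

    def is_near_duplicate(remote, local):
        """True if two segments differ only by a sub-epsilon start-time offset."""
        if any(getattr(remote, f) != getattr(local, f) for f in MATCH_FIELDS):
            return False
        return abs((remote.start - local.start).total_seconds()) <= EPSILON

    a = SegmentInfo("examplechannel", "source", datetime(2025, 1, 1, 0, 0, 10, 300000), timedelta(seconds=2), "full", "abc")
    b = SegmentInfo("examplechannel", "source", datetime(2025, 1, 1, 0, 0, 10, 300500), timedelta(seconds=2), "full", "abc")
    assert is_near_duplicate(a, b)  # 0.5ms apart and otherwise identical, so the backfiller would not re-pull it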
diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 3b9ae49..5186945 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -15,6 +15,7 @@ import gevent.backdoor
 import gevent.event
 import prometheus_client as prom
 import requests
+import requests.adapters
 from monotonic import monotonic
 
 import common
@@ -49,6 +50,31 @@ ad_segments_ignored = prom.Counter(
 	["channel", "quality"],
 )
 
+suspicious_skew_count = prom.Counter(
+	"suspicious_skew_count",
+	"Number of times a worker refresh was triggered due to suspicious segment time skew",
+	["channel", "quality"],
+)
+
+segment_time_skew = prom.Histogram(
+	"segment_time_skew",
+	"Difference in seconds between each segment's expected start time and the time reported by the playlist",
+	["channel", "quality", "worker"],
+	buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10],
+)
+
+segment_time_skew_non_zero_sum = prom.Gauge(
+	"segment_time_skew_non_zero_sum",
+	"Running total of non-zero segment time skews, in seconds",
+	["channel", "quality", "worker"],
+)
+
+segment_time_skew_non_zero_count = prom.Counter(
+	"segment_time_skew_non_zero_count",
+	"Number of segments observed with a non-zero time skew",
+	["channel", "quality", "worker"],
+)
+
 
 class TimedOutError(Exception):
 	pass
@@ -290,6 +316,11 @@
 	FETCH_RETRY_INTERVAL = 1
 	FETCH_POLL_INTERVAL = 2
 
+	# Max difference between a segment's time + duration and the next segment's time (in seconds)
+	# before we consider this "suspicious" and trigger a refresh.
+	# See https://github.com/dbvideostriketeam/wubloader/issues/539
+	MAX_SEGMENT_TIME_SKEW = 0.01
+
 	def __init__(self, manager, quality, url, url_time):
 		self.manager = manager
 		self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
@@ -305,6 +336,8 @@
 		# This worker's SegmentGetters will use its session by default for performance,
 		# but will fall back to a new one if something goes wrong.
 		self.session = common.requests.InstrumentedSession()
+		adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
+		self.session.mount('https://', adapter)
 		# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
 		# since it's generally the same but may occasionally change.
 		# We expect the map data to be very small so there is no eviction here.
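Both this worker session and the backfiller session above mount an HTTPAdapter with pool_maxsize=100. requests keeps a per-host connection pool whose default size is 10 (requests.adapters.DEFAULT_POOLSIZE); once more concurrent requests than that hit the same host, urllib3 discards and re-opens connections rather than reusing them. A minimal sketch of the pattern (the URL is a placeholder):

    import requests
    from requests.adapters import HTTPAdapter

    session = requests.Session()
    # Keep up to 100 connections alive per host instead of the default 10,
    # so highly concurrent fetches against one host can reuse connections.
    session.mount('https://', HTTPAdapter(pool_maxsize=100))

    resp = session.get('https://example.com/stream/segment.ts')  # placeholder URL
    resp.raise_for_status()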
@@ -347,12 +380,16 @@ def _run(self):
 		first = True
+		suspicious_skew = False
+		history = []
+		HISTORY_SIZE = 10
 		while not self.stopping.is_set():
 
 			self.logger.debug("Getting media playlist {}".format(self.url))
 			try:
 				with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
-					playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
+					playlist_time = datetime.datetime.utcnow()
+					raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
 			except Exception as e:
 				self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
 				self.trigger_new_worker()
@@ -368,8 +405,11 @@
 			# We successfully got the playlist at least once
 			first = False
 
+			history = [(playlist_time, raw_playlist)] + history[:HISTORY_SIZE]
+
 			# Start any new segment getters
 			date = None # tracks date in case some segment doesn't include it
+			prev_segment = None
 			for segment in playlist.segments:
 				if segment.ad_reason:
 					self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
 					continue
@@ -380,7 +420,27 @@
 				self.manager.mark_working(self)
 
 				if segment.date:
-					date = common.dateutil.parse(segment.date)
+					new_date = common.dateutil.parse(segment.date)
+					if date is not None:
+						# We have observed an issue on twitch where there will be a small time jump
+						# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
+						# and all subsequent segment timestamps will be out by this amount compared
+						# to what other downloader instances see. Our workaround for this issue is to
+						# watch for such gaps and:
+						# 1. trigger a worker refresh (which seems to fix the issue)
+						# 2. treat all subsequent segments as "suspect" so they are still saved
+						#    but only used if no other source is available.
+						skew = (date - new_date).total_seconds()
+						segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew)
+						if skew != 0:
+							segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
+							segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
+						if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
+							self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
+							self.trigger_new_worker()
+							suspicious_skew = True
+							suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
+					date = new_date
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
@@ -392,11 +452,14 @@
 						self.quality,
 						segment,
 						date,
+						suspicious_skew,
 						self.map_cache,
+						history,
 					)
 					gevent.spawn(self.getters[segment.uri].run)
 				if date is not None:
 					date += datetime.timedelta(seconds=segment.duration)
+				prev_segment = segment
 
 			# Clean up any old segment getters.
 			# Note use of list() to make a copy to avoid modification-during-iteration
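In the loop above, date carries the expected start of the next segment (the previous segment's parsed date plus its duration), so skew = (date - new_date).total_seconds() measures how far the playlist's reported timestamp drifts from that expectation; anything beyond MAX_SEGMENT_TIME_SKEW (10ms) flags the worker. A worked example using the timestamps from the comment in the diff (2s segments at 00:10.3, 00:12.3, then 00:14.7), purely illustrative:

    from datetime import datetime, timedelta

    MAX_SEGMENT_TIME_SKEW = 0.01

    prev_date = datetime(2025, 1, 1, 0, 0, 12, 300000)   # previous segment started at 00:12.3
    expected = prev_date + timedelta(seconds=2)           # so the next one should start at 00:14.3
    reported = datetime(2025, 1, 1, 0, 0, 14, 700000)     # but the playlist says 00:14.7

    skew = (expected - reported).total_seconds()          # -0.4
    assert abs(skew) > MAX_SEGMENT_TIME_SKEW              # suspicious: refresh the worker, mark later segments suspect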
@@ -448,19 +511,21 @@
 	# or so, to be paranoid we set it to considerably longer than that.
 	GIVE_UP_TIMEOUT = 20 * 60
 
-	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
+	def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
 		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.quality = quality
 		self.segment = segment
 		self.date = date
+		self.suspect = suspect
 		self.map_cache = map_cache
 		self.prefix = self.make_path_prefix()
 		self.retry = None # Event, set to begin retrying
 		self.done = gevent.event.Event() # set when file exists or we give up
 		# Our parent's connection pool, but we'll replace it if there's any issues
 		self.session = session
+		self.history = history
 
 	def run(self):
 		try:
@@ -517,6 +582,12 @@
 		Type may be:
 			full: Segment is complete. Hash is included.
 			suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
+				This currently triggers on two conditions:
+				- If a segment takes a very long time to download, which we've observed to result in
+				  partial files even though they appeared to end correctly.
+				- If the StreamWorker has encountered a time gap, then we suspect this segment to be
+				  mis-timed. We have observed this where there is a small (~0.5s) time jump, then
+				  all segments are consistently off by that amount compared to other nodes until refresh.
 			partial: Segment is incomplete. Hash is included.
 			temp: Segment has not been downloaded yet. A random uuid is added.
 		"""
@@ -601,7 +672,9 @@
 				raise e
 		else:
 			request_duration = monotonic() - start_time
-			segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
+			segment_type = "full"
+			if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
+				segment_type = "suspect"
 			full_path = self.make_path(segment_type, hash)
 			self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
 			common.rename(temp_path, full_path)
@@ -612,6 +685,19 @@
 			stat = latest_segment.labels(channel=self.channel, quality=self.quality)
 			timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
 			stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
+			self.write_history(full_path)
+
+	def write_history(self, segment_path):
+		segment_path = os.path.relpath(segment_path, self.base_dir)
+		history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
+		try:
+			os.makedirs(history_path)
+		except FileExistsError:
+			pass
+		for n, (timestamp, playlist) in enumerate(self.history):
+			filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
+			path = os.path.join(history_path, filename)
+			common.atomic_write(path, playlist)
 
 
 def parse_channel(channel):
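For reference, write_history above mirrors each saved segment's relative path under playlist-debug and writes one file per retained playlist fetch, newest fetch first. A sketch of the resulting layout with a made-up segment path (the exact segment filename format is illustrative, not taken from this diff):

    import os

    base_dir = "/storage"
    # Hypothetical segment path relative to base_dir.
    segment_path = "examplechannel/source/2025-01-01T00/00:10.300000-2.0-full-abcdef.ts"

    # write_history creates a directory named after the segment...
    history_dir = os.path.join(base_dir, "playlist-debug", segment_path)
    # ...containing files named "<n>_<playlist fetch timestamp>", where n=0 is the fetch
    # that listed this segment and higher n are progressively older fetches, e.g.:
    #   0_2025-01-01T00:00:14.123456
    #   1_2025-01-01T00:00:12.120042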
diff --git a/downloader/downloader/providers.py b/downloader/downloader/providers.py
index f791a33..996144c 100644
--- a/downloader/downloader/providers.py
+++ b/downloader/downloader/providers.py
@@ -29,7 +29,8 @@ class Provider:
 			session = InstrumentedSession()
 		resp = session.get(uri, metric_name='get_media_playlist')
 		resp.raise_for_status()
-		return hls_playlist.load(resp.text, base_uri=resp.url)
+		playlist = resp.text
+		return playlist, hls_playlist.load(playlist, base_uri=resp.url)
 
 
 class URLProvider(Provider):
diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py
index a198831..4f4ec16 100644
--- a/restreamer/restreamer/main.py
+++ b/restreamer/restreamer/main.py
@@ -91,10 +91,10 @@ def metrics():
 	"""Return current metrics in prometheus metrics format"""
 	return prom.generate_latest()
 
-# To make nginx proxying simpler, we want to allow /metrics/* to work
-@app.route('/metrics/<trailing>')
+# To make nginx proxying simpler, we want to allow /metrics/restreamer to work
+@app.route('/metrics/restreamer')
 @request_stats
-def metrics_with_trailing(trailing):
+def metrics_with_trailing():
 	"""Expose Prometheus metrics."""
 	return prom.generate_latest()