downloader: Trigger a worker refresh when we see a segment timestamp skew over 0.01s

We have observed an issue on twitch where there will be a small time jump
(eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see. Our workaround for this issue is to
watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
but only used if no other source is available.
Mike Lang 2 months ago
parent d7facca842
commit 663449498c

@ -49,6 +49,19 @@ ad_segments_ignored = prom.Counter(
["channel", "quality"], ["channel", "quality"],
) )
# Counts how many times a worker observed a suspicious segment-time skew
# and triggered a refresh. See https://github.com/dbvideostriketeam/wubloader/issues/539
suspicious_skew_count = prom.Counter(
	"suspicious_skew_count",
	"Number of times a worker saw a suspicious time skew between segments and triggered a worker refresh",
	["channel", "quality"],
)
# Distribution of the difference (in seconds) between a segment's expected
# start time (previous segment time + duration) and its reported start time.
# Negative buckets are included because the skew can go in either direction.
# NOTE: labelled per-worker (worker id), so cardinality grows with worker churn.
segment_time_skew = prom.Histogram(
	"segment_time_skew",
	"Difference in seconds between a segment's expected start time and its actual reported start time",
	["channel", "quality", "worker"],
	buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10],
)
class TimedOutError(Exception): class TimedOutError(Exception):
pass pass
@ -290,6 +303,11 @@ class StreamWorker(object):
FETCH_RETRY_INTERVAL = 1 FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2 FETCH_POLL_INTERVAL = 2
# Max difference between a segment's time + duration and the next segment's time (in seconds)
# before we consider this "suspicious" and trigger a refresh.
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__(self, manager, quality, url, url_time): def __init__(self, manager, quality, url, url_time):
self.manager = manager self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self))) self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
@ -347,6 +365,7 @@ class StreamWorker(object):
def _run(self): def _run(self):
first = True first = True
suspicious_skew = False
while not self.stopping.is_set(): while not self.stopping.is_set():
self.logger.debug("Getting media playlist {}".format(self.url)) self.logger.debug("Getting media playlist {}".format(self.url))
@ -380,7 +399,23 @@ class StreamWorker(object):
self.manager.mark_working(self) self.manager.mark_working(self)
if segment.date: if segment.date:
date = common.dateutil.parse(segment.date) new_date = common.dateutil.parse(segment.date)
if date is not None:
# We have observed an issue on twitch where there will be a small time jump
# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
# and all subsequent segment timestamps will be out by this amount compared
# to what other downloader instances see. Our workaround for this issue is to
# watch for such gaps and:
# 1. trigger a worker refresh (which seems to fix the issue)
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = (date - new_date).total_seconds()
segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew)
if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
self.trigger_new_worker()
suspicious_skew = True
suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
date = new_date
if segment.uri not in self.getters: if segment.uri not in self.getters:
if date is None: if date is None:
raise ValueError("Cannot determine date of segment") raise ValueError("Cannot determine date of segment")
@ -392,6 +427,7 @@ class StreamWorker(object):
self.quality, self.quality,
segment, segment,
date, date,
suspicious_skew,
self.map_cache, self.map_cache,
) )
gevent.spawn(self.getters[segment.uri].run) gevent.spawn(self.getters[segment.uri].run)
@ -448,13 +484,14 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that. # or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60 GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache): def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir self.base_dir = base_dir
self.channel = channel self.channel = channel
self.quality = quality self.quality = quality
self.segment = segment self.segment = segment
self.date = date self.date = date
self.suspect = suspect
self.map_cache = map_cache self.map_cache = map_cache
self.prefix = self.make_path_prefix() self.prefix = self.make_path_prefix()
self.retry = None # Event, set to begin retrying self.retry = None # Event, set to begin retrying
@ -517,6 +554,12 @@ class SegmentGetter(object):
Type may be: Type may be:
full: Segment is complete. Hash is included. full: Segment is complete. Hash is included.
suspect: Segment appears to be complete, but we suspect it is not. Hash is included. suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
This currently triggers on two conditions:
- If a segment takes a very long time to download, which we've observed to result in
partial files even though they appeared to end correctly.
- If the StreamWorker has encountered a time gap, then we suspect this segment to be
mis-timed. We have observed this where there is a small (~0.5s) time jump, then
all segments are consistently off by that amount compared to other nodes until refresh.
partial: Segment is incomplete. Hash is included. partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added. temp: Segment has not been downloaded yet. A random uuid is added.
""" """
@ -601,7 +644,9 @@ class SegmentGetter(object):
raise e raise e
else: else:
request_duration = monotonic() - start_time request_duration = monotonic() - start_time
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect" segment_type = "full"
if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
segment_type = "suspect"
full_path = self.make_path(segment_type, hash) full_path = self.make_path(segment_type, hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path) common.rename(temp_path, full_path)

Loading…
Cancel
Save