Compare commits

..

5 Commits

Author SHA1 Message Date
Mike Lang 472164278b optionally enable debugging: Write out last 10 media playlists after fetching each segment 3 weeks ago
Mike Lang e225b397eb downloader: Re-connect when we see a time error over 0.01s
We have observed an issue on twitch where there will be a small time jump
(e.g. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see. Our workaround for this issue is to
watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
but only used if no other source is available.
3 weeks ago
Mike Lang 753bea306a backfiller: Don't backfill segments that only differ from existing segment by 1ms
In the wild we've seen different servers get timestamps that differ by 1ms for segments
that are otherwise identical - same content, same duration.

The allowable fudge factor for segments is already 10ms, so having timing be 1ms different
between servers shouldn't cause any problems.
Worst case, there's a slight chance you'll get an adjacent frame when picking a cut point / thumbnail.
3 weeks ago
Mike Lang 316a19614c Fix connection pool warnings by increasing pool size
in backfiller and downloader, the things making lots of outgoing http requests.

We want these larger sizes anyway to improve performance in downloader and backfiller.
3 weeks ago
Mike Lang 19ad28d686 Don't have restreamer respond with metrics to non-restreamer metrics requests 3 weeks ago

@ -52,26 +52,19 @@ ad_segments_ignored = prom.Counter(
suspicious_skew_count = prom.Counter(
"suspicious_skew_count",
"",
"Number of times we've restarted a worker due to suspicious skew",
["channel", "quality"],
)
segment_time_skew = prom.Histogram(
"segment_time_skew",
"",
["channel", "quality", "worker"],
buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10],
)
segment_time_skew_non_zero_sum = prom.Gauge(
"segment_time_skew_non_zero_sum",
"",
"Sum of all observed segment skew amounts for worker",
["channel", "quality", "worker"],
)
segment_time_skew_non_zero_count = prom.Counter(
"segment_time_skew_non_zero_count",
"",
"Count of segments with non-zero skew for worker",
["channel", "quality", "worker"],
)
@ -148,7 +141,7 @@ class StreamsManager(object):
FETCH_TIMEOUTS = 5, 30
def __init__(self, provider, channel, base_dir, qualities, important=False):
def __init__(self, provider, channel, base_dir, qualities, important=False, history_size=0):
self.provider = provider
self.channel = channel
self.logger = logging.getLogger("StreamsManager({})".format(channel))
@ -229,7 +222,7 @@ class StreamsManager(object):
self.logger.info("Ignoring worker start as we are stopping")
return
url_time, url = self.latest_urls[quality]
worker = StreamWorker(self, quality, url, url_time)
worker = StreamWorker(self, quality, url, url_time, self.history_size)
self.stream_workers[quality].append(worker)
gevent.spawn(worker.run)
@ -321,7 +314,7 @@ class StreamWorker(object):
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__(self, manager, quality, url, url_time):
def __init__(self, manager, quality, url, url_time, history_size):
self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
self.quality = quality
@ -343,6 +336,9 @@ class StreamWorker(object):
# We expect the map data to be very small so there is no eviction here.
# {uri: data}
self.map_cache = {}
# If enabled, playlist history is saved after each segment fetch,
# showing the last N playlist fetches up until the one that resulted in that fetch.
self.history_size = history_size
def __repr__(self):
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@ -382,7 +378,6 @@ class StreamWorker(object):
first = True
suspicious_skew = False
history = []
HISTORY_SIZE = 10
while not self.stopping.is_set():
self.logger.debug("Getting media playlist {}".format(self.url))
@ -405,7 +400,8 @@ class StreamWorker(object):
# We successfully got the playlist at least once
first = False
history = [(playlist_time, raw_playlist)] + history[:HISTORY_SIZE]
if self.history_size > 0:
history = [(playlist_time, raw_playlist)] + history[:self.history_size]
# Start any new segment getters
date = None # tracks date in case some segment doesn't include it
@ -431,7 +427,6 @@ class StreamWorker(object):
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = (date - new_date).total_seconds()
segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew)
if skew != 0:
segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
@ -685,7 +680,8 @@ class SegmentGetter(object):
stat = latest_segment.labels(channel=self.channel, quality=self.quality)
timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
self.write_history(full_path)
if self.history:
self.write_history(full_path)
def write_history(self, segment_path):
segment_path = os.path.relpath(segment_path, self.base_dir)
@ -716,7 +712,7 @@ def parse_channel(channel):
"This affects retry interval, error reporting and monitoring. "
"Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL"
)
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None):
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None, playlist_debug=0):
qualities = qualities.split(",") if qualities else []
twitch_auth_token = None
@ -737,7 +733,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
channel_qualities = ["source"]
else:
raise ValueError(f"Unknown type {type!r}")
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important)
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important, history_size=playlist_debug)
managers.append(manager)
def stop():

Loading…
Cancel
Save