Compare commits

...

5 Commits

Author SHA1 Message Date
Mike Lang 472164278b optionally enabled debugging: Write out last 10 media playlists after fetching each segment 3 weeks ago
Mike Lang e225b397eb downloader: Re-connect when we see a time error over 0.01s
We have observed an issue on twitch where there will be a small time jump
(eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see. Our workaround for this issue is to
watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
but only used if no other source is available.
3 weeks ago
Mike Lang 753bea306a backfiller: Don't backfill segments that only differ from existing segment by 1ms
In the wild we've seen different servers get timestamps that differ by 1ms for segments
that are otherwise identical - same content, same duration.

The allowable fudge factor for segments is already 10ms, so having timing be 1ms different
between servers shouldn't cause any problems.
Worst case, there's a slight chance you'll get an adjacent frame when picking a cut point / thumbnail.
3 weeks ago
Mike Lang 316a19614c Fix connection pool warnings by increasing pool size
in backfiller and downloader, the things making lots of outgoing http requests.

We want these larger sizes anyway to improve performance in downloader and backfiller.
3 weeks ago
Mike Lang 19ad28d686 Don't have restreamer respond with metrics to non-restreamer metrics requests 3 weeks ago

@ -16,6 +16,7 @@ import argh
import gevent.backdoor import gevent.backdoor
import gevent.pool import gevent.pool
import prometheus_client as prom import prometheus_client as prom
from requests.adapters import HTTPAdapter
import common import common
from common import dateutil from common import dateutil
@ -23,8 +24,10 @@ from common import database
from common.requests import InstrumentedSession from common.requests import InstrumentedSession
from common.segments import list_segment_files, unpadded_b64_decode from common.segments import list_segment_files, unpadded_b64_decode
# Wraps all requests in some metric collection # Wraps all requests in some metric collection and connection pooling
requests = InstrumentedSession() requests = InstrumentedSession()
adapter = HTTPAdapter(pool_maxsize=100)
requests.mount('https://', adapter)
segments_backfilled = prom.Counter( segments_backfilled = prom.Counter(
'segments_backfilled', 'segments_backfilled',
@ -44,6 +47,12 @@ hash_mismatches = prom.Counter(
['remote', 'channel', 'quality', 'hour'], ['remote', 'channel', 'quality', 'hour'],
) )
small_difference_segments = prom.Gauge(
'small_difference_segments',
'Number of segments which were not pulled due to differing from existing segments by only a very small time difference',
['remote', 'channel', 'quality', 'hour'],
)
node_list_errors = prom.Counter( node_list_errors = prom.Counter(
'node_list_errors', 'node_list_errors',
'Number of errors fetching a list of nodes', 'Number of errors fetching a list of nodes',
@ -504,9 +513,21 @@ class BackfillerWorker(object):
# multiple workers request the same segment at the same time # multiple workers request the same segment at the same time
random.shuffle(missing_segments) random.shuffle(missing_segments)
if quality != 'chat':
MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
EPSILON = 0.001
local_infos = []
for path in local_segments:
path = os.path.join(channel, quality, hour, path)
try:
local_infos.append(common.parse_segment_path(path))
except ValueError as e:
self.logger.warning('Local file {} could not be parsed: {}'.format(path, e))
pool = gevent.pool.Pool(self.download_concurrency) pool = gevent.pool.Pool(self.download_concurrency)
workers = [] workers = []
small_differences = 0
for missing_segment in missing_segments: for missing_segment in missing_segments:
if self.stopping.is_set(): if self.stopping.is_set():
@ -542,6 +563,21 @@ class BackfillerWorker(object):
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff): if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff):
self.logger.debug('Skipping {} as too recent'.format(path)) self.logger.debug('Skipping {} as too recent'.format(path))
continue continue
# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
found = None
for local_segment in local_infos:
# if any fields differ, no match
if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS):
continue
# if time difference > epsilon, no match
if abs((segment.start - local_segment.start).total_seconds()) > EPSILON:
continue
found = local_segment
break
if found is not None:
self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}')
continue
# start segment as soon as a pool slot opens up, then track it in workers # start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn( workers.append(pool.spawn(
@ -549,6 +585,8 @@ class BackfillerWorker(object):
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
)) ))
small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences)
# verify that all the workers succeeded. if any failed, raise the exception from # verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily. # one of them arbitrarily.
for worker in workers: for worker in workers:

@ -15,6 +15,7 @@ import gevent.backdoor
import gevent.event import gevent.event
import prometheus_client as prom import prometheus_client as prom
import requests import requests
import requests.adapters
from monotonic import monotonic from monotonic import monotonic
import common import common
@ -49,6 +50,24 @@ ad_segments_ignored = prom.Counter(
["channel", "quality"], ["channel", "quality"],
) )
suspicious_skew_count = prom.Counter(
"suspicious_skew_count",
"Number of times we've restarted a worker due to suspicious skew",
["channel", "quality"],
)
segment_time_skew_non_zero_sum = prom.Gauge(
"segment_time_skew_non_zero_sum",
"Sum of all observed segment skew amounts for worker",
["channel", "quality", "worker"],
)
segment_time_skew_non_zero_count = prom.Counter(
"segment_time_skew_non_zero_count",
"Count of segments with non-zero skew for worker",
["channel", "quality", "worker"],
)
class TimedOutError(Exception): class TimedOutError(Exception):
pass pass
@ -122,7 +141,7 @@ class StreamsManager(object):
FETCH_TIMEOUTS = 5, 30 FETCH_TIMEOUTS = 5, 30
def __init__(self, provider, channel, base_dir, qualities, important=False): def __init__(self, provider, channel, base_dir, qualities, important=False, history_size=0):
self.provider = provider self.provider = provider
self.channel = channel self.channel = channel
self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.logger = logging.getLogger("StreamsManager({})".format(channel))
@ -203,7 +222,7 @@ class StreamsManager(object):
self.logger.info("Ignoring worker start as we are stopping") self.logger.info("Ignoring worker start as we are stopping")
return return
url_time, url = self.latest_urls[quality] url_time, url = self.latest_urls[quality]
worker = StreamWorker(self, quality, url, url_time) worker = StreamWorker(self, quality, url, url_time, self.history_size)
self.stream_workers[quality].append(worker) self.stream_workers[quality].append(worker)
gevent.spawn(worker.run) gevent.spawn(worker.run)
@ -290,7 +309,12 @@ class StreamWorker(object):
FETCH_RETRY_INTERVAL = 1 FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2 FETCH_POLL_INTERVAL = 2
def __init__(self, manager, quality, url, url_time): # Max difference between a segment's time + duration and the next segment's time (in seconds)
# before we consider this "suspicious" and trigger a refresh.
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__(self, manager, quality, url, url_time, history_size):
self.manager = manager self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self))) self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
self.quality = quality self.quality = quality
@ -305,11 +329,16 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance, # This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong. # but will fall back to a new one if something goes wrong.
self.session = common.requests.InstrumentedSession() self.session = common.requests.InstrumentedSession()
adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
self.session.mount('https://', adapter)
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment, # Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change. # since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here. # We expect the map data to be very small so there is no eviction here.
# {uri: data} # {uri: data}
self.map_cache = {} self.map_cache = {}
# If enabled, playlist history is saved after each segment fetch,
# showing the last N playlist fetches up until the one that resulted in that fetch.
self.history_size = history_size
def __repr__(self): def __repr__(self):
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality) return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@ -347,12 +376,15 @@ class StreamWorker(object):
def _run(self): def _run(self):
first = True first = True
suspicious_skew = False
history = []
while not self.stopping.is_set(): while not self.stopping.is_set():
self.logger.debug("Getting media playlist {}".format(self.url)) self.logger.debug("Getting media playlist {}".format(self.url))
try: try:
with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
playlist = self.manager.provider.get_media_playlist(self.url, session=self.session) playlist_time = datetime.datetime.utcnow()
raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
except Exception as e: except Exception as e:
self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
self.trigger_new_worker() self.trigger_new_worker()
@ -368,8 +400,12 @@ class StreamWorker(object):
# We successfully got the playlist at least once # We successfully got the playlist at least once
first = False first = False
if self.history_size > 0:
history = [(playlist_time, raw_playlist)] + history[:self.history_size]
# Start any new segment getters # Start any new segment getters
date = None # tracks date in case some segment doesn't include it date = None # tracks date in case some segment doesn't include it
prev_segment = None
for segment in playlist.segments: for segment in playlist.segments:
if segment.ad_reason: if segment.ad_reason:
self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason)) self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
@ -380,7 +416,26 @@ class StreamWorker(object):
self.manager.mark_working(self) self.manager.mark_working(self)
if segment.date: if segment.date:
date = common.dateutil.parse(segment.date) new_date = common.dateutil.parse(segment.date)
if date is not None:
# We have observed an issue on twitch where there will be a small time jump
# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
# and all subsequent segment timestamps will be out by this amount compared
# to what other downloader instances see. Our workaround for this issue is to
# watch for such gaps and:
# 1. trigger a worker refresh (which seems to fix the issue)
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = (date - new_date).total_seconds()
if skew != 0:
segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
self.trigger_new_worker()
suspicious_skew = True
suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
date = new_date
if segment.uri not in self.getters: if segment.uri not in self.getters:
if date is None: if date is None:
raise ValueError("Cannot determine date of segment") raise ValueError("Cannot determine date of segment")
@ -392,11 +447,14 @@ class StreamWorker(object):
self.quality, self.quality,
segment, segment,
date, date,
suspicious_skew,
self.map_cache, self.map_cache,
history,
) )
gevent.spawn(self.getters[segment.uri].run) gevent.spawn(self.getters[segment.uri].run)
if date is not None: if date is not None:
date += datetime.timedelta(seconds=segment.duration) date += datetime.timedelta(seconds=segment.duration)
prev_segment = segment
# Clean up any old segment getters. # Clean up any old segment getters.
# Note use of list() to make a copy to avoid modification-during-iteration # Note use of list() to make a copy to avoid modification-during-iteration
@ -448,19 +506,21 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that. # or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60 GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache): def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir self.base_dir = base_dir
self.channel = channel self.channel = channel
self.quality = quality self.quality = quality
self.segment = segment self.segment = segment
self.date = date self.date = date
self.suspect = suspect
self.map_cache = map_cache self.map_cache = map_cache
self.prefix = self.make_path_prefix() self.prefix = self.make_path_prefix()
self.retry = None # Event, set to begin retrying self.retry = None # Event, set to begin retrying
self.done = gevent.event.Event() # set when file exists or we give up self.done = gevent.event.Event() # set when file exists or we give up
# Our parent's connection pool, but we'll replace it if there's any issues # Our parent's connection pool, but we'll replace it if there's any issues
self.session = session self.session = session
self.history = history
def run(self): def run(self):
try: try:
@ -517,6 +577,12 @@ class SegmentGetter(object):
Type may be: Type may be:
full: Segment is complete. Hash is included. full: Segment is complete. Hash is included.
suspect: Segment appears to be complete, but we suspect it is not. Hash is included. suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
This currently triggers on two conditions:
- If a segment takes a very long time to download, which we've observed to result in
partial files even though they appeared to end correctly.
- If the StreamWorker has encountered a time gap, then we suspect this segment to be
mis-timed. We have observed this where there is a small (~0.5s) time jump, then
all segments are consistently off by that amount compared to other nodes until refresh.
partial: Segment is incomplete. Hash is included. partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added. temp: Segment has not been downloaded yet. A random uuid is added.
""" """
@ -601,7 +667,9 @@ class SegmentGetter(object):
raise e raise e
else: else:
request_duration = monotonic() - start_time request_duration = monotonic() - start_time
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect" segment_type = "full"
if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
segment_type = "suspect"
full_path = self.make_path(segment_type, hash) full_path = self.make_path(segment_type, hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path) common.rename(temp_path, full_path)
@ -612,6 +680,20 @@ class SegmentGetter(object):
stat = latest_segment.labels(channel=self.channel, quality=self.quality) stat = latest_segment.labels(channel=self.channel, quality=self.quality)
timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds() timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
if self.history:
self.write_history(full_path)
def write_history(self, segment_path):
segment_path = os.path.relpath(segment_path, self.base_dir)
history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
try:
os.makedirs(history_path)
except FileExistsError:
pass
for n, (timestamp, playlist) in enumerate(self.history):
filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
path = os.path.join(history_path, filename)
common.atomic_write(path, playlist)
def parse_channel(channel): def parse_channel(channel):
@ -630,7 +712,7 @@ def parse_channel(channel):
"This affects retry interval, error reporting and monitoring. " "This affects retry interval, error reporting and monitoring. "
"Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL" "Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL"
) )
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None): def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None, playlist_debug=0):
qualities = qualities.split(",") if qualities else [] qualities = qualities.split(",") if qualities else []
twitch_auth_token = None twitch_auth_token = None
@ -651,7 +733,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
channel_qualities = ["source"] channel_qualities = ["source"]
else: else:
raise ValueError(f"Unknown type {type!r}") raise ValueError(f"Unknown type {type!r}")
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important) manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important, history_size=playlist_debug)
managers.append(manager) managers.append(manager)
def stop(): def stop():

@ -29,7 +29,8 @@ class Provider:
session = InstrumentedSession() session = InstrumentedSession()
resp = session.get(uri, metric_name='get_media_playlist') resp = session.get(uri, metric_name='get_media_playlist')
resp.raise_for_status() resp.raise_for_status()
return hls_playlist.load(resp.text, base_uri=resp.url) playlist = resp.text
return playlist, hls_playlist.load(playlist, base_uri=resp.url)
class URLProvider(Provider): class URLProvider(Provider):

@ -91,10 +91,10 @@ def metrics():
"""Return current metrics in prometheus metrics format""" """Return current metrics in prometheus metrics format"""
return prom.generate_latest() return prom.generate_latest()
# To make nginx proxying simpler, we want to allow /metrics/* to work # To make nginx proxying simpler, we want to allow /metrics/restreamer to work
@app.route('/metrics/<trailing>') @app.route('/metrics/restreamer')
@request_stats @request_stats
def metrics_with_trailing(trailing): def metrics_with_trailing():
"""Expose Prometheus metrics.""" """Expose Prometheus metrics."""
return prom.generate_latest() return prom.generate_latest()

Loading…
Cancel
Save