Compare commits

...

8 Commits

Author SHA1 Message Date
Mike Lang 871a2e1909 Fix connection pool warnings by increasing pool size
in backfiller and downloader, the things making lots of outgoing http requests.

We want these larger sizes anyway to improve performance in downloader and backfiller.
2 months ago
Mike Lang 8eec87355c backfiller: Don't backfill segments that only differ from existing segment by 1ms
In the wild we've seen different servers get timestamps that differ by 1ms for segments
that are otherwise identical - same content, same duration.

The allowable fudge factor for segments is already 10ms, so having timing be 1ms different
between servers shouldn't cause any problems.
Worst case, there's a slight chance you'll get an adjacent frame when picking a cut point / thumbnail.
2 months ago
Mike Lang c13aa26726 Don't have restreamer respond with metrics to non-restreamer metrics requests 2 months ago
Mike Lang a0c0698446 playlist debug fix 2 months ago
Mike Lang ebc5afd58b Temporary debugging: Write out last 10 media playlists after fetching each segment 2 months ago
Mike Lang 37e225adab more debugging logs 2 months ago
Mike Lang 2ecd4e0a3e more metrics for tracking skew 2 months ago
Mike Lang 663449498c downloader: Re-connect when we see a time error over 0.01s
We have observed an issue on twitch where there will be a small time jump
(eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see. Our workaround for this issue is to
watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
but only used if no other source is available.
2 months ago

@ -16,6 +16,7 @@ import argh
import gevent.backdoor
import gevent.pool
import prometheus_client as prom
from requests.adapters import HTTPAdapter
import common
from common import dateutil
@ -23,8 +24,10 @@ from common import database
from common.requests import InstrumentedSession
from common.segments import list_segment_files, unpadded_b64_decode
# Wraps all requests in some metric collection
# Wraps all requests in some metric collection and connection pooling
requests = InstrumentedSession()
adapter = HTTPAdapter(pool_maxsize=100)
requests.mount('https://', adapter)
segments_backfilled = prom.Counter(
'segments_backfilled',
@ -44,6 +47,12 @@ hash_mismatches = prom.Counter(
['remote', 'channel', 'quality', 'hour'],
)
small_difference_segments = prom.Gauge(
'small_difference_segments',
'Number of segments which were not pulled due to differing from existing segments by only a very small time difference',
['remote', 'channel', 'quality', 'hour'],
)
node_list_errors = prom.Counter(
'node_list_errors',
'Number of errors fetching a list of nodes',
@ -504,9 +513,21 @@ class BackfillerWorker(object):
# multiple workers request the same segment at the same time
random.shuffle(missing_segments)
if quality != 'chat':
MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
EPSILON = 0.001
local_infos = []
for path in local_segments:
path = os.path.join(channel, quality, hour, path)
try:
local_infos.append(common.parse_segment_path(path))
except ValueError as e:
self.logger.warning('Local file {} could not be parsed: {}'.format(path, e))
pool = gevent.pool.Pool(self.download_concurrency)
workers = []
small_differences = 0
for missing_segment in missing_segments:
if self.stopping.is_set():
@ -542,6 +563,21 @@ class BackfillerWorker(object):
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff):
self.logger.debug('Skipping {} as too recent'.format(path))
continue
# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
found = None
for local_segment in local_infos:
# if any fields differ, no match
if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS):
continue
# if time difference > epsilon, no match
if abs((segment.start - local_segment.start).total_seconds()) > EPSILON:
continue
found = local_segment
break
if found is not None:
self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}')
continue
# start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn(
@ -549,6 +585,8 @@ class BackfillerWorker(object):
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
))
small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences)
# verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily.
for worker in workers:

@ -15,6 +15,7 @@ import gevent.backdoor
import gevent.event
import prometheus_client as prom
import requests
import requests.adapters
from monotonic import monotonic
import common
@ -49,6 +50,31 @@ ad_segments_ignored = prom.Counter(
["channel", "quality"],
)
suspicious_skew_count = prom.Counter(
"suspicious_skew_count",
"",
["channel", "quality"],
)
segment_time_skew = prom.Histogram(
"segment_time_skew",
"",
["channel", "quality", "worker"],
buckets=[-10, -1, -0.5, -0.1, -0.01, -0.001, 0, 0.001, 0.01, 0.1, 0.5, 1, 10],
)
segment_time_skew_non_zero_sum = prom.Gauge(
"segment_time_skew_non_zero_sum",
"",
["channel", "quality", "worker"],
)
segment_time_skew_non_zero_count = prom.Counter(
"segment_time_skew_non_zero_count",
"",
["channel", "quality", "worker"],
)
class TimedOutError(Exception):
pass
@ -290,6 +316,11 @@ class StreamWorker(object):
FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2
# Max difference between a segment's time + duration and the next segment's time (in seconds)
# before we consider this "suspicious" and trigger a refresh.
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__(self, manager, quality, url, url_time):
self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
@ -305,6 +336,8 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong.
self.session = common.requests.InstrumentedSession()
adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
self.session.mount('https://', adapter)
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here.
@ -347,12 +380,16 @@ class StreamWorker(object):
def _run(self):
first = True
suspicious_skew = False
history = []
HISTORY_SIZE = 10
while not self.stopping.is_set():
self.logger.debug("Getting media playlist {}".format(self.url))
try:
with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
playlist_time = datetime.datetime.utcnow()
raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
except Exception as e:
self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
self.trigger_new_worker()
@ -368,8 +405,11 @@ class StreamWorker(object):
# We successfully got the playlist at least once
first = False
history = [(playlist_time, raw_playlist)] + history[:HISTORY_SIZE]
# Start any new segment getters
date = None # tracks date in case some segment doesn't include it
prev_segment = None
for segment in playlist.segments:
if segment.ad_reason:
self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
@ -380,7 +420,27 @@ class StreamWorker(object):
self.manager.mark_working(self)
if segment.date:
date = common.dateutil.parse(segment.date)
new_date = common.dateutil.parse(segment.date)
if date is not None:
# We have observed an issue on twitch where there will be a small time jump
# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
# and all subsequent segment timestamps will be out by this amount compared
# to what other downloader instances see. Our workaround for this issue is to
# watch for such gaps and:
# 1. trigger a worker refresh (which seems to fix the issue)
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = (date - new_date).total_seconds()
segment_time_skew.labels(self.manager.channel, self.quality, f"{id(self):x}").observe(skew)
if skew != 0:
segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
self.trigger_new_worker()
suspicious_skew = True
suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
date = new_date
if segment.uri not in self.getters:
if date is None:
raise ValueError("Cannot determine date of segment")
@ -392,11 +452,14 @@ class StreamWorker(object):
self.quality,
segment,
date,
suspicious_skew,
self.map_cache,
history,
)
gevent.spawn(self.getters[segment.uri].run)
if date is not None:
date += datetime.timedelta(seconds=segment.duration)
prev_segment = segment
# Clean up any old segment getters.
# Note use of list() to make a copy to avoid modification-during-iteration
@ -448,19 +511,21 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir
self.channel = channel
self.quality = quality
self.segment = segment
self.date = date
self.suspect = suspect
self.map_cache = map_cache
self.prefix = self.make_path_prefix()
self.retry = None # Event, set to begin retrying
self.done = gevent.event.Event() # set when file exists or we give up
# Our parent's connection pool, but we'll replace it if there's any issues
self.session = session
self.history = history
def run(self):
try:
@ -517,6 +582,12 @@ class SegmentGetter(object):
Type may be:
full: Segment is complete. Hash is included.
suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
This currently triggers on two conditions:
- If a segment takes a very long time to download, which we've observed to result in
partial files even though they appeared to end correctly.
- If the StreamWorker has encountered a time gap, then we suspect this segment to be
mis-timed. We have observed this where there is a small (~0.5s) time jump, then
all segments are consistently off by that amount compared to other nodes until refresh.
partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added.
"""
@ -601,7 +672,9 @@ class SegmentGetter(object):
raise e
else:
request_duration = monotonic() - start_time
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
segment_type = "full"
if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
segment_type = "suspect"
full_path = self.make_path(segment_type, hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path)
@ -612,6 +685,19 @@ class SegmentGetter(object):
stat = latest_segment.labels(channel=self.channel, quality=self.quality)
timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
self.write_history(full_path)
def write_history(self, segment_path):
segment_path = os.path.relpath(segment_path, self.base_dir)
history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
try:
os.makedirs(history_path)
except FileExistsError:
pass
for n, (timestamp, playlist) in enumerate(self.history):
filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
path = os.path.join(history_path, filename)
common.atomic_write(path, playlist)
def parse_channel(channel):

@ -29,7 +29,8 @@ class Provider:
session = InstrumentedSession()
resp = session.get(uri, metric_name='get_media_playlist')
resp.raise_for_status()
return hls_playlist.load(resp.text, base_uri=resp.url)
playlist = resp.text
return playlist, hls_playlist.load(playlist, base_uri=resp.url)
class URLProvider(Provider):

@ -91,10 +91,10 @@ def metrics():
"""Return current metrics in prometheus metrics format"""
return prom.generate_latest()
# To make nginx proxying simpler, we want to allow /metrics/* to work
@app.route('/metrics/<trailing>')
# To make nginx proxying simpler, we want to allow /metrics/restreamer to work
@app.route('/metrics/restreamer')
@request_stats
def metrics_with_trailing(trailing):
def metrics_with_trailing():
"""Expose Prometheus metrics."""
return prom.generate_latest()

Loading…
Cancel
Save