Compare commits

...

2 Commits

Author SHA1 Message Date
Mike Lang 871a2e1909 Fix connection pool warnings by increasing pool size
in backfiller and downloader, the things making lots of outgoing http requests.

We want these larger sizes anyway to improve performance in downloader and backfiller.
2 months ago
Mike Lang 8eec87355c backfiller: Don't backfill segments that only differ from existing segment by 1ms
In the wild we've seen different servers get timestamps that differ by 1ms for segments
that are otherwise identical - same content, same duration.

The allowable fudge factor for segments is already 10ms, so having timing be 1ms different
between servers shouldn't cause any problems.
Worst case, there's a slight chance you'll get an adjacent frame when picking a cut point / thumbnail.
2 months ago

@ -16,6 +16,7 @@ import argh
import gevent.backdoor
import gevent.pool
import prometheus_client as prom
from requests.adapters import HTTPAdapter
import common
from common import dateutil
@ -23,8 +24,10 @@ from common import database
from common.requests import InstrumentedSession
from common.segments import list_segment_files, unpadded_b64_decode
# Wraps all requests in some metric collection
# Wraps all requests in some metric collection and connection pooling
requests = InstrumentedSession()
adapter = HTTPAdapter(pool_maxsize=100)
requests.mount('https://', adapter)
segments_backfilled = prom.Counter(
'segments_backfilled',
@ -44,6 +47,12 @@ hash_mismatches = prom.Counter(
['remote', 'channel', 'quality', 'hour'],
)
small_difference_segments = prom.Gauge(
'small_difference_segments',
'Number of segments which were not pulled due to differing from existing segments by only a very small time difference',
['remote', 'channel', 'quality', 'hour'],
)
node_list_errors = prom.Counter(
'node_list_errors',
'Number of errors fetching a list of nodes',
@ -504,9 +513,21 @@ class BackfillerWorker(object):
# multiple workers request the same segment at the same time
random.shuffle(missing_segments)
if quality != 'chat':
MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
EPSILON = 0.001
local_infos = []
for path in local_segments:
path = os.path.join(channel, quality, hour, path)
try:
local_infos.append(common.parse_segment_path(path))
except ValueError as e:
self.logger.warning('Local file {} could not be parsed: {}'.format(path, e))
pool = gevent.pool.Pool(self.download_concurrency)
workers = []
small_differences = 0
for missing_segment in missing_segments:
if self.stopping.is_set():
@ -542,6 +563,21 @@ class BackfillerWorker(object):
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff):
self.logger.debug('Skipping {} as too recent'.format(path))
continue
# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
found = None
for local_segment in local_infos:
# if any fields differ, no match
if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS):
continue
# if time difference > epsilon, no match
if abs((segment.start - local_segment.start).total_seconds()) > EPSILON:
continue
found = local_segment
break
if found is not None:
self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}')
continue
# start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn(
@ -549,6 +585,8 @@ class BackfillerWorker(object):
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
))
small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences)
# verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily.
for worker in workers:

@ -15,6 +15,7 @@ import gevent.backdoor
import gevent.event
import prometheus_client as prom
import requests
import requests.adapters
from monotonic import monotonic
import common
@ -335,6 +336,8 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong.
self.session = common.requests.InstrumentedSession()
adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
self.session.mount('https://', adapter)
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here.

Loading…
Cancel
Save