backfiller: Allow multiple concurrent segment downloads

This will significantly increase throughput when downloading
large ranges of segments.

The max concurrency is exposed as a CLI arg.

We also adjust the logged info so it reports the number of segments actually
downloaded, rather than the number of missing segments (some of which we may
skip downloading for various reasons).
commit 29040a166c (parent ec5a545fd2)
Author: Mike Lang (6 years ago)
Branch: pull/61/head
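
For context on the mechanism: the change relies on gevent.pool.Pool, whose
spawn() blocks while all slots are busy, and on Greenlet.get(), which
re-raises any exception the worker hit. Below is a minimal, self-contained
sketch of that pattern; fetch() and the URLs are hypothetical stand-ins for
get_remote_segment() and real segment paths, not code from this commit:

# Bounded-concurrency download sketch (illustrative only).
from gevent import monkey; monkey.patch_all()  # make requests cooperative

import gevent.pool
import requests

def fetch(url):
	return requests.get(url, timeout=5).content

pool = gevent.pool.Pool(5)  # at most 5 downloads in flight at once
greenlets = []
for i in range(20):
	url = 'http://example-node/segments/{}.ts'.format(i)  # hypothetical
	# spawn() blocks while the pool is full, so the loop itself provides
	# backpressure instead of queueing everything up front
	greenlets.append(pool.spawn(fetch, url))

for g in greenlets:
	g.get()  # re-raises the exception from a failed download, if any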

@@ -12,6 +12,7 @@ import uuid
 import argh
 import gevent.backdoor
+import gevent.pool
 import prometheus_client as prom
 import requests
@@ -142,7 +143,8 @@ class BackfillerManager(object):
 	NODE_INTERVAL = 300 #seconds between updating list of nodes
 
 	def __init__(self, base_dir, stream, variants, static_nodes=[], start=None,
-			run_once=False, node_file=None, node_database=None, localhost=None):
+			run_once=False, node_file=None, node_database=None, localhost=None,
+			download_concurrency=5):
 		"""Constructor for BackfillerManager.
 		Creates a manager for a given stream with specified variants. If
 		static_nodes is None, manager"""
@@ -157,6 +159,7 @@ class BackfillerManager(object):
 		if self.node_database is not None:
 			self.db_manager = database.DBManager(dsn=self.node_database)
 		self.localhost = localhost
+		self.download_concurrency = download_concurrency
 		self.stopping = gevent.event.Event()
 		self.logger = logging.getLogger("BackfillerManager({})".format(stream))
 		self.workers = {} # {node url: worker}
@@ -282,6 +285,7 @@ class BackfillerWorker(object):
 		self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node))
 		self.base_dir = manager.base_dir
 		self.node = node
+		self.download_concurrency = manager.download_concurrency
 		self.stream = manager.stream
 		self.variants = manager.variants
 		self.start = manager.start
@@ -312,7 +316,7 @@ class BackfillerWorker(object):
 		for hour in hours:
-			self.logger.info('Backfilling {}/{}'.format(variant, hour))
+			self.logger.debug('Backfilling {}/{}'.format(variant, hour))
 			local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour))
 			remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
@@ -322,6 +326,9 @@
 			# multiple workers request the same segment at the same time
 			random.shuffle(missing_segments)
 
+			pool = gevent.pool.Pool(self.download_concurrency)
+			workers = []
+
 			for missing_segment in missing_segments:
 
 				if self.stopping.is_set():
@@ -347,8 +354,18 @@
 					self.logger.debug('Skipping {} as too recent'.format(path))
 					continue
 
-				get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment)
-			self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour))
+				# start segment as soon as a pool slot opens up, then track it in workers
+				workers.append(pool.spawn(
+					get_remote_segment,
+					self.base_dir, self.node, self.stream, variant, hour, missing_segment
+				))
+
+			# verify that all the workers succeeded. if any failed, raise the exception from
+			# one of them arbitrarily.
+			for worker in workers:
+				worker.get() # re-raise error, if any
+
+			self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), variant, hour))
 
 	def run(self):
 		self.logger.info('Starting')
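
Worth noting on the hunk above: gevent's pool.join() only waits for the
greenlets to finish and does not surface their failures, which is why each
greenlet is get()-ed individually instead. A tiny sketch of the difference,
with boom() as a made-up failing task:

import gevent.pool

def boom():
	raise ValueError('download failed')

pool = gevent.pool.Pool(2)
g = pool.spawn(boom)
pool.join()  # returns normally; the failure is only reported by the hub
g.get()      # re-raises ValueError('download failed') in the caller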
@@ -388,9 +405,11 @@ class BackfillerWorker(object):
 @argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
 @argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
 @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
+@argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
 def main(streams, base_dir='.', variants='source', metrics_port=8002,
 		static_nodes='', backdoor_port=0, start=None, run_once=False,
-		node_file=None, node_database=None, localhost=socket.gethostname()):
+		node_file=None, node_database=None, localhost=socket.gethostname(),
+		download_concurrency=5):
 	"""Backfiller service."""
 
 	variants = variants.split(',') if variants else []
@@ -414,7 +433,7 @@ def main(streams, base_dir='.', variants='source', metrics_port=8002,
 	workers = []
 	for stream in streams:
 		logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
-		manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost)
+		manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency)
 		managers.append(manager)
 		workers.append(gevent.spawn(manager.run))
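
Beyond the new --download-concurrency flag, the knob can also be passed
directly when constructing a manager, as the last hunk shows. A hypothetical
sketch of that; the import path and all values here are assumptions, not part
of this change:

import gevent
from backfiller.main import BackfillerManager  # hypothetical import path

manager = BackfillerManager(
	'/var/lib/wubloader',     # base_dir (hypothetical)
	'mystream',               # stream (hypothetical)
	['source'],               # variants
	download_concurrency=10,  # new keyword arg; defaults to 5
)
worker = gevent.spawn(manager.run)
worker.get()  # run until stopped; re-raises if the manager crashes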
