Merge pull request #61 from ekimekim/mike/backfiller/concurrent

backfiller: Allow multiple concurrent segment downloads
pull/65/head
Mike Lang 5 years ago committed by GitHub
commit 612e34b88d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -12,6 +12,7 @@ import uuid
import argh
import gevent.backdoor
import gevent.pool
import prometheus_client as prom
import requests
@ -142,7 +143,8 @@ class BackfillerManager(object):
NODE_INTERVAL = 300 #seconds between updating list of nodes
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None,
run_once=False, node_file=None, node_database=None, localhost=None):
run_once=False, node_file=None, node_database=None, localhost=None,
download_concurrency=5):
"""Constructor for BackfillerManager.
Creates a manager for a given stream with specified variants. If
static_nodes is None, manager"""
@ -157,6 +159,7 @@ class BackfillerManager(object):
if self.node_database is not None:
self.db_manager = database.DBManager(dsn=self.node_database)
self.localhost = localhost
self.download_concurrency = download_concurrency
self.stopping = gevent.event.Event()
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
self.workers = {} # {node url: worker}
@ -282,6 +285,7 @@ class BackfillerWorker(object):
self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node))
self.base_dir = manager.base_dir
self.node = node
self.download_concurrency = manager.download_concurrency
self.stream = manager.stream
self.variants = manager.variants
self.start = manager.start
@ -312,7 +316,7 @@ class BackfillerWorker(object):
for hour in hours:
self.logger.info('Backfilling {}/{}'.format(variant, hour))
self.logger.debug('Backfilling {}/{}'.format(variant, hour))
local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour))
remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
@ -321,6 +325,9 @@ class BackfillerWorker(object):
# randomise the order of the segments to reduce the chance that
# multiple workers request the same segment at the same time
random.shuffle(missing_segments)
pool = gevent.pool.Pool(self.download_concurrency)
workers = []
for missing_segment in missing_segments:
@ -347,8 +354,18 @@ class BackfillerWorker(object):
self.logger.debug('Skipping {} as too recent'.format(path))
continue
get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment)
self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour))
# start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn(
get_remote_segment,
self.base_dir, self.node, self.stream, variant, hour, missing_segment
))
# verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily.
for worker in workers:
worker.get() # re-raise error, if any
self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), variant, hour))
def run(self):
self.logger.info('Starting')
@ -388,9 +405,11 @@ class BackfillerWorker(object):
@argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
@argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
@argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
@argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
def main(streams, base_dir='.', variants='source', metrics_port=8002,
static_nodes='', backdoor_port=0, start=None, run_once=False,
node_file=None, node_database=None, localhost=socket.gethostname()):
node_file=None, node_database=None, localhost=socket.gethostname(),
download_concurrency=5):
"""Backfiller service."""
variants = variants.split(',') if variants else []
@ -414,7 +433,7 @@ def main(streams, base_dir='.', variants='source', metrics_port=8002,
workers = []
for stream in streams:
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost)
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency)
managers.append(manager)
workers.append(gevent.spawn(manager.run))

Loading…
Cancel
Save