|
|
@ -148,7 +148,8 @@ class BackfillerManager(object):
|
|
|
|
|
|
|
|
|
|
|
|
NODE_INTERVAL = 300 #seconds between updating list of nodes
|
|
|
|
NODE_INTERVAL = 300 #seconds between updating list of nodes
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_file=None, node_database=None):
|
|
|
|
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None,
|
|
|
|
|
|
|
|
run_once=False, node_file=None, node_database=None, localhost=None):
|
|
|
|
"""Constructor for BackfillerManager.
|
|
|
|
"""Constructor for BackfillerManager.
|
|
|
|
Creates a manager for a given stream with specified variants. If
|
|
|
|
Creates a manager for a given stream with specified variants. If
|
|
|
|
static_nodes is None, manager"""
|
|
|
|
static_nodes is None, manager"""
|
|
|
@ -160,6 +161,7 @@ class BackfillerManager(object):
|
|
|
|
self.run_once = run_once
|
|
|
|
self.run_once = run_once
|
|
|
|
self.node_file = node_file
|
|
|
|
self.node_file = node_file
|
|
|
|
self.node_database = node_database
|
|
|
|
self.node_database = node_database
|
|
|
|
|
|
|
|
self.localhost = localhost
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
|
|
|
|
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
|
|
|
|
self.workers = {} # {node url: worker}
|
|
|
|
self.workers = {} # {node url: worker}
|
|
|
@ -228,6 +230,9 @@ class BackfillerManager(object):
|
|
|
|
self.logger.info('Fetching list of nodes from {}'.format(self.node_database))
|
|
|
|
self.logger.info('Fetching list of nodes from {}'.format(self.node_database))
|
|
|
|
# query the database
|
|
|
|
# query the database
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.localhost is not None:
|
|
|
|
|
|
|
|
nodes = [node for node in nodes if self.localhost not in node]
|
|
|
|
|
|
|
|
|
|
|
|
return nodes
|
|
|
|
return nodes
|
|
|
|
|
|
|
|
|
|
|
|
class BackfillerWorker(object):
|
|
|
|
class BackfillerWorker(object):
|
|
|
@ -336,7 +341,7 @@ class BackfillerWorker(object):
|
|
|
|
|
|
|
|
|
|
|
|
def main(base_dir='.', streams='', variants='', metrics_port=8002,
|
|
|
|
def main(base_dir='.', streams='', variants='', metrics_port=8002,
|
|
|
|
static_nodes='', backdoor_port=0, start=None, run_once=False,
|
|
|
|
static_nodes='', backdoor_port=0, start=None, run_once=False,
|
|
|
|
node_file=None, node_database=None):
|
|
|
|
node_file=None, node_database=None, localhost=None):
|
|
|
|
"""Backfiller service."""
|
|
|
|
"""Backfiller service."""
|
|
|
|
|
|
|
|
|
|
|
|
streams = streams.split(',') if variants else []
|
|
|
|
streams = streams.split(',') if variants else []
|
|
|
@ -359,7 +364,7 @@ def main(base_dir='.', streams='', variants='', metrics_port=8002,
|
|
|
|
workers = []
|
|
|
|
workers = []
|
|
|
|
for stream in streams:
|
|
|
|
for stream in streams:
|
|
|
|
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
|
|
|
|
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database)
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost)
|
|
|
|
managers.append(manager)
|
|
|
|
managers.append(manager)
|
|
|
|
workers.append(gevent.spawn(manager.run))
|
|
|
|
workers.append(gevent.spawn(manager.run))
|
|
|
|
|
|
|
|
|
|
|
|