|
|
|
@ -36,29 +36,6 @@ def encode_strings(o):
|
|
|
|
|
return o.encode('utf-8')
|
|
|
|
|
return o
|
|
|
|
|
|
|
|
|
|
def get_nodes(node_source=None):
    """List address of other wubloaders.

    This returns a list of the other wubloaders as strings of the form
    'protocol://host:port/'

    node_source selects where the list would come from: 'file' or
    'database'. Neither source is implemented yet, so an empty list is
    returned for every value of node_source."""
    # Bind the result before branching so every path returns a list.
    # Previously only the fallback branch assigned `nodes`, so the 'file'
    # and 'database' branches raised UnboundLocalError at the return.
    nodes = []

    if node_source == 'file':
        logging.info('Fetching list of other nodes from file')
        # TODO: read a config file (open question: allow any filename?)
    elif node_source == 'database':
        logging.info('Fetching list of other nodes from database')
        # TODO: query the database

    return nodes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_local_segments(base_dir, stream, variant, hour):
|
|
|
|
|
"""List segments in a given hour directory.
|
|
|
|
@ -171,7 +148,7 @@ class BackfillerManager(object):
|
|
|
|
|
|
|
|
|
|
NODE_INTERVAL = 300 #seconds between updating list of nodes
|
|
|
|
|
|
|
|
|
|
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_source=None):
|
|
|
|
|
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_file=None, node_database=None):
|
|
|
|
|
"""Constructor for BackfillerManager.
|
|
|
|
|
Creates a manager for a given stream with specified variants. If
|
|
|
|
|
static_nodes is None, manager"""
|
|
|
|
@ -181,7 +158,8 @@ class BackfillerManager(object):
|
|
|
|
|
self.static_nodes = static_nodes
|
|
|
|
|
self.start = start
|
|
|
|
|
self.run_once = run_once
|
|
|
|
|
self.node_source = node_source
|
|
|
|
|
self.node_file = node_file
|
|
|
|
|
self.node_database = node_database
|
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
|
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
|
|
|
|
|
self.workers = {} # {node url: worker}
|
|
|
|
@ -207,7 +185,7 @@ class BackfillerManager(object):
|
|
|
|
|
def run(self):
|
|
|
|
|
self.logger.info('Starting')
|
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
|
new_nodes = set(get_nodes(self.node_source) + self.static_nodes)
|
|
|
|
|
new_nodes = set(self.get_nodes())
|
|
|
|
|
exisiting_nodes = set(self.workers.keys())
|
|
|
|
|
to_start = new_nodes - exisiting_nodes
|
|
|
|
|
for node in to_start:
|
|
|
|
@ -227,6 +205,30 @@ class BackfillerManager(object):
|
|
|
|
|
for worker in self.workers.values():
|
|
|
|
|
worker.done.wait()
|
|
|
|
|
|
|
|
|
|
def get_nodes(self):
    """List address of other wubloaders.

    This returns a list of the other wubloaders as strings of the form
    'protocol://host:port/'

    The result starts as a copy of self.static_nodes. If self.node_file
    is not None, the first whitespace-separated token of each line of that
    file is appended; blank lines and lines whose first token starts with
    '#' are skipped. If the file cannot be opened or read (IOError), that
    is logged and whatever was collected so far is returned. Fetching
    from self.node_database is not implemented yet; it is only logged."""
    # Copy so that appending below never mutates self.static_nodes.
    nodes = list(self.static_nodes)

    if self.node_file is not None:
        self.logger.info('Fetching list of nodes from {}'.format(self.node_file))
        try:
            with open(self.node_file) as f:
                for line in f:
                    substrs = line.split()
                    # Lines read from a file keep their newline, so a
                    # blank line is never zero-length; check the split
                    # result instead to avoid IndexError on substrs[0].
                    if not substrs or substrs[0].startswith('#'):
                        continue
                    nodes.append(substrs[0])
        except IOError:
            self.logger.info('{} not found'.format(self.node_file))

    if self.node_database:
        self.logger.info('Fetching list of nodes from {}'.format(self.node_database))
        # TODO: query the database

    return nodes
|
|
|
|
|
|
|
|
|
|
class BackfillerWorker(object):
|
|
|
|
|
"""Backfills segments from a node.
|
|
|
|
@ -328,12 +330,13 @@ class BackfillerWorker(object):
|
|
|
|
|
|
|
|
|
|
self.logger.info('Worker stopped')
|
|
|
|
|
self.done.set()
|
|
|
|
|
if self.node in self.manager.workers:
|
|
|
|
|
del self.manager.workers[self.node]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(base_dir='.', streams='', variants='', metrics_port=8002,
|
|
|
|
|
static_nodes='', backdoor_port=0, start=None, run_once=False,
|
|
|
|
|
node_source=None):
|
|
|
|
|
node_file=None, node_database=None):
|
|
|
|
|
"""Backfiller service."""
|
|
|
|
|
|
|
|
|
|
streams = streams.split(',') if variants else []
|
|
|
|
@ -356,7 +359,7 @@ def main(base_dir='.', streams='', variants='', metrics_port=8002,
|
|
|
|
|
workers = []
|
|
|
|
|
for stream in streams:
|
|
|
|
|
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
|
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source)
|
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database)
|
|
|
|
|
managers.append(manager)
|
|
|
|
|
workers.append(gevent.spawn(manager.run))
|
|
|
|
|
|
|
|
|
|