|
|
@ -1,5 +1,5 @@
|
|
|
|
"""Download segments from other nodes to catch stuff this node missed."""
|
|
|
|
"""Download segments from other nodes to catch stuff this node missed."""
|
|
|
|
# TODO more logging, better exception handling
|
|
|
|
# TODO more logging
|
|
|
|
|
|
|
|
|
|
|
|
import datetime
|
|
|
|
import datetime
|
|
|
|
import errno
|
|
|
|
import errno
|
|
|
@ -36,19 +36,27 @@ def encode_strings(o):
|
|
|
|
return o.encode('utf-8')
|
|
|
|
return o.encode('utf-8')
|
|
|
|
return o
|
|
|
|
return o
|
|
|
|
|
|
|
|
|
|
|
|
def get_nodes():
|
|
|
|
def get_nodes(node_source=None):
|
|
|
|
"""List address of other wubloaders.
|
|
|
|
"""List address of other wubloaders.
|
|
|
|
|
|
|
|
|
|
|
|
This returns a list of the other wubloaders as strings of the form
|
|
|
|
This returns a list of the other wubloaders as strings of the form
|
|
|
|
'protocol://host:port/'"""
|
|
|
|
'protocol://host:port/'"""
|
|
|
|
# either read a config file or query the database to get the addresses
|
|
|
|
|
|
|
|
# of the other nodes
|
|
|
|
|
|
|
|
# figure out some way that the local machine isn't in the list of returned
|
|
|
|
|
|
|
|
# nodes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.info('Fetching list of other nodes')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
nodes = []
|
|
|
|
if node_source == 'file':
|
|
|
|
|
|
|
|
logging.info('Fetching list of other nodes from file')
|
|
|
|
|
|
|
|
# read a config file
|
|
|
|
|
|
|
|
# should I allow any filename?
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
elif node_source == 'database':
|
|
|
|
|
|
|
|
logging.info('Fetching list of other nodes from database')
|
|
|
|
|
|
|
|
# query the database
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
nodes = []
|
|
|
|
|
|
|
|
|
|
|
|
return nodes
|
|
|
|
return nodes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -163,7 +171,7 @@ class BackfillerManager(object):
|
|
|
|
|
|
|
|
|
|
|
|
NODE_INTERVAL = 300 #seconds between updating list of nodes
|
|
|
|
NODE_INTERVAL = 300 #seconds between updating list of nodes
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False):
|
|
|
|
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_source=None):
|
|
|
|
"""Constructor for BackfillerManager.
|
|
|
|
"""Constructor for BackfillerManager.
|
|
|
|
Creates a manager for a given stream with specified variants. If
|
|
|
|
Creates a manager for a given stream with specified variants. If
|
|
|
|
static_nodes is None, manager"""
|
|
|
|
static_nodes is None, manager"""
|
|
|
@ -173,6 +181,7 @@ class BackfillerManager(object):
|
|
|
|
self.static_nodes = static_nodes
|
|
|
|
self.static_nodes = static_nodes
|
|
|
|
self.start = start
|
|
|
|
self.start = start
|
|
|
|
self.run_once = run_once
|
|
|
|
self.run_once = run_once
|
|
|
|
|
|
|
|
self.node_source
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
|
|
|
|
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
|
|
|
|
self.workers = {} # {node url: worker}
|
|
|
|
self.workers = {} # {node url: worker}
|
|
|
@ -186,7 +195,7 @@ class BackfillerManager(object):
|
|
|
|
|
|
|
|
|
|
|
|
def start_worker(self, node):
|
|
|
|
def start_worker(self, node):
|
|
|
|
"""Start a new worker for given node."""
|
|
|
|
"""Start a new worker for given node."""
|
|
|
|
worker = BackfillerWorker(self, self.base_dir, node)
|
|
|
|
worker = BackfillerWorker(self, node)
|
|
|
|
assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node)
|
|
|
|
assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node)
|
|
|
|
self.workers[node] = worker
|
|
|
|
self.workers[node] = worker
|
|
|
|
gevent.spawn(worker.run)
|
|
|
|
gevent.spawn(worker.run)
|
|
|
@ -198,7 +207,7 @@ class BackfillerManager(object):
|
|
|
|
def run(self):
|
|
|
|
def run(self):
|
|
|
|
self.logger.info('Starting')
|
|
|
|
self.logger.info('Starting')
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
new_nodes = set(get_nodes() + self.static_nodes)
|
|
|
|
new_nodes = set(get_nodes(self.node_source) + self.static_nodes)
|
|
|
|
exisiting_nodes = set(self.workers.keys())
|
|
|
|
exisiting_nodes = set(self.workers.keys())
|
|
|
|
to_start = new_nodes - exisiting_nodes
|
|
|
|
to_start = new_nodes - exisiting_nodes
|
|
|
|
for node in to_start:
|
|
|
|
for node in to_start:
|
|
|
@ -322,7 +331,8 @@ class BackfillerWorker(object):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(base_dir='.', stream='', variants='', metrics_port=8002,
|
|
|
|
def main(base_dir='.', stream='', variants='', metrics_port=8002,
|
|
|
|
static_nodes='', backdoor_port=0, start=None, run_once=False):
|
|
|
|
static_nodes='', backdoor_port=0, start=None, run_once=False,
|
|
|
|
|
|
|
|
node_source=None):
|
|
|
|
"""Backfiller service."""
|
|
|
|
"""Backfiller service."""
|
|
|
|
|
|
|
|
|
|
|
|
variants = variants.split(',') if variants else []
|
|
|
|
variants = variants.split(',') if variants else []
|
|
|
@ -334,7 +344,7 @@ def main(base_dir='.', stream='', variants='', metrics_port=8002,
|
|
|
|
except ValueError:
|
|
|
|
except ValueError:
|
|
|
|
start = dateutil.parser.parse(start)
|
|
|
|
start = dateutil.parser.parse(start)
|
|
|
|
|
|
|
|
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once)
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source)
|
|
|
|
gevent.signal(signal.SIGTERM, manager.stop)
|
|
|
|
gevent.signal(signal.SIGTERM, manager.stop)
|
|
|
|
|
|
|
|
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|