From e4364b75b153981db0ea5aa41f681795e791935c Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Tue, 7 May 2019 13:16:10 +0100 Subject: [PATCH] options to change where the node list is coming from --- backfiller/backfiller/main.py | 36 ++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 837c3df..e42251d 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -1,5 +1,5 @@ """Download segments from other nodes to catch stuff this node missed.""" -# TODO more logging, better exception handling +# TODO more logging import datetime import errno @@ -36,19 +36,27 @@ def encode_strings(o): return o.encode('utf-8') return o -def get_nodes(): +def get_nodes(node_source=None): """List address of other wubloaders. This returns a list of the other wubloaders as strings of the form 'protocol://host:port/'""" - # either read a config file or query the database to get the addresses - # of the other nodes - # figure out some way that the local machine isn't in the list of returned - # nodes - logging.info('Fetching list of other nodes') - nodes = [] + if node_source == 'file': + logging.info('Fetching list of other nodes from file') + # read a config file + # should I allow any filename? + pass + + elif node_source == 'database': + logging.info('Fetching list of other nodes from database') + # query the database + pass + + else: + nodes = [] + return nodes @@ -163,7 +171,7 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False): + def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_source=None): """Constructor for BackfillerManager. Creates a manager for a given stream with specified variants. If static_nodes is None, manager""" @@ -173,6 +181,7 @@ class BackfillerManager(object): self.static_nodes = static_nodes self.start = start self.run_once = run_once + self.node_source self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} @@ -186,7 +195,7 @@ class BackfillerManager(object): def start_worker(self, node): """Start a new worker for given node.""" - worker = BackfillerWorker(self, self.base_dir, node) + worker = BackfillerWorker(self, node) assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node) self.workers[node] = worker gevent.spawn(worker.run) @@ -198,7 +207,7 @@ class BackfillerManager(object): def run(self): self.logger.info('Starting') while not self.stopping.is_set(): - new_nodes = set(get_nodes() + self.static_nodes) + new_nodes = set(get_nodes(self.node_source) + self.static_nodes) exisiting_nodes = set(self.workers.keys()) to_start = new_nodes - exisiting_nodes for node in to_start: @@ -322,7 +331,8 @@ class BackfillerWorker(object): def main(base_dir='.', stream='', variants='', metrics_port=8002, - static_nodes='', backdoor_port=0, start=None, run_once=False): + static_nodes='', backdoor_port=0, start=None, run_once=False, + node_source=None): """Backfiller service.""" variants = variants.split(',') if variants else [] @@ -334,7 +344,7 @@ def main(base_dir='.', stream='', variants='', metrics_port=8002, except ValueError: start = dateutil.parser.parse(start) - manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source) gevent.signal(signal.SIGTERM, manager.stop) common.PromLogCountsHandler.install()