diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 4a27411..9bdf41e 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -36,29 +36,6 @@ def encode_strings(o): return o.encode('utf-8') return o -def get_nodes(node_source=None): - """List address of other wubloaders. - - This returns a list of the other wubloaders as strings of the form - 'protocol://host:port/'""" - - - if node_source == 'file': - logging.info('Fetching list of other nodes from file') - # read a config file - # should I allow any filename? - pass - - elif node_source == 'database': - logging.info('Fetching list of other nodes from database') - # query the database - pass - - else: - nodes = [] - - return nodes - def list_local_segments(base_dir, stream, variant, hour): """List segments in a given hour directory. @@ -171,7 +148,7 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_source=None): + def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_file=None, node_database=None): """Constructor for BackfillerManager. Creates a manager for a given stream with specified variants. If static_nodes is None, manager""" @@ -181,7 +158,8 @@ class BackfillerManager(object): self.static_nodes = static_nodes self.start = start self.run_once = run_once - self.node_source = node_source + self.node_file = node_file + self.node_database = node_database self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} @@ -207,7 +185,7 @@ class BackfillerManager(object): def run(self): self.logger.info('Starting') while not self.stopping.is_set(): - new_nodes = set(get_nodes(self.node_source) + self.static_nodes) + new_nodes = set(self.get_nodes()) exisiting_nodes = set(self.workers.keys()) to_start = new_nodes - exisiting_nodes for node in to_start: @@ -227,6 +205,30 @@ class BackfillerManager(object): for worker in self.workers.values(): worker.done.wait() + def get_nodes(self): + """List address of other wubloaders. + + This returns a list of the other wubloaders as strings of the form + 'protocol://host:port/'""" + nodes = self.static_nodes + [] + + if self.node_file is not None: + self.logger.info('Fetching list of nodes from {}'.format(self.node_file)) + try: + with open(self.node_file) as f: + for line in f.readlines(): + substrs = line.split() + if not len(line) or substrs[0][0] == '#': + continue + nodes.append(substrs[0]) + except IOError: + self.logger.info('{} not found'.format(self.node_file)) + + if self.node_database: + self.logger.info('Fetching list of nodes from {}'.format(self.node_database)) + # query the database + + return nodes class BackfillerWorker(object): """Backfills segments from a node. @@ -328,12 +330,13 @@ class BackfillerWorker(object): self.logger.info('Worker stopped') self.done.set() - del self.manager.workers[self.node] + if self.node in self.manager.workers: + del self.manager.workers[self.node] def main(base_dir='.', streams='', variants='', metrics_port=8002, static_nodes='', backdoor_port=0, start=None, run_once=False, - node_source=None): + node_file=None, node_database=None): """Backfiller service.""" streams = streams.split(',') if variants else [] @@ -356,7 +359,7 @@ def main(base_dir='.', streams='', variants='', metrics_port=8002, workers = [] for stream in streams: logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) - manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database) managers.append(manager) workers.append(gevent.spawn(manager.run))