diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 9bdf41e..4598912 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -148,7 +148,8 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_file=None, node_database=None): + def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, + run_once=False, node_file=None, node_database=None, localhost=None): """Constructor for BackfillerManager. Creates a manager for a given stream with specified variants. If static_nodes is None, manager""" @@ -160,6 +161,7 @@ class BackfillerManager(object): self.run_once = run_once self.node_file = node_file self.node_database = node_database + self.localhost = localhost self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} @@ -228,6 +230,9 @@ class BackfillerManager(object): self.logger.info('Fetching list of nodes from {}'.format(self.node_database)) # query the database + if self.localhost is not None: + nodes = [node for node in nodes if self.localhost not in node] + return nodes class BackfillerWorker(object): @@ -336,7 +341,7 @@ class BackfillerWorker(object): def main(base_dir='.', streams='', variants='', metrics_port=8002, static_nodes='', backdoor_port=0, start=None, run_once=False, - node_file=None, node_database=None): + node_file=None, node_database=None, localhost=None): """Backfiller service.""" streams = streams.split(',') if variants else [] @@ -359,7 +364,7 @@ def main(base_dir='.', streams='', variants='', metrics_port=8002, workers = [] for stream in streams: logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) - manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost) managers.append(manager) workers.append(gevent.spawn(manager.run))