From a8cb1ff37018c1e58566da19a9aa47ce031a771b Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Sun, 28 Apr 2019 22:58:04 +0100 Subject: [PATCH] fixed start not propagating to list_hours plus some refactorting --- backfiller/backfiller/main.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 36b42b9..40e9b38 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -165,7 +165,7 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, stream, variants, static_nodes=[], run_once=False): + def __init__(self, base_dir, stream, variants, static_nodes=[], start, run_once=False): """Constructor for BackfillerManager. Creates a manager for a given stream with specified variants. If static_nodes is None, manager""" @@ -173,11 +173,11 @@ class BackfillerManager(object): self.stream = stream self.variants = variants self.static_nodes = static_nodes + self.start = start self.run_once = run_once self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} - self.logger.info('Starting') def stop(self): """Shut down all workers and stop backfilling.""" @@ -188,7 +188,7 @@ class BackfillerManager(object): def start_worker(self, node): """Start a new worker for given node.""" - worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants, self) + worker = BackfillerWorker(self, self.base_dir, node) assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node) self.workers[node] = worker gevent.spawn(worker.run) @@ -198,6 +198,7 @@ class BackfillerManager(object): self.workers.pop(node).stop() def run(self): + self.logger.info('Starting') while not self.stopping.is_set(): new_nodes = set(get_nodes() + self.static_nodes) exisiting_nodes = set(self.workers.keys()) @@ -229,19 +230,19 @@ class BackfillerWorker(object): WAIT_INTERVAL = 120 #seconds between backfills RETRY_INTERVAL = 5 #seconds between retrying a failed backfill - def __init__(self, manager, base_dir, node, stream, variants, run_once=False): + def __init__(self, manager, node): self.manager = manager self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node)) - self.base_dir = base_dir + self.base_dir = manager.base_dir self.node = node - self.stream = stream - self.variants = variants - self.run_once = run_once + self.stream = manager.stream + self.variants = manager.variants + self.start = manager.start + self.run_once = manager.run_once self.stopping = gevent.event.Event() self.done = gevent.event.Event() self.failures = 0 - self.logger.info('Starting') def __repr__(self): return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) @@ -299,12 +300,12 @@ class BackfillerWorker(object): self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour)) def run(self): - self.logger.info('Worker starting') + self.logger.info('Starting') while not self.stopping.is_set(): try: - self.backfill(list_hours(self.node, self.stream, self.variants)) + self.backfill(list_hours(self.node, self.stream, self.variants, self.start)) self.failures = 0 #reset failure count on a successful backfill self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) @@ -331,7 +332,7 @@ def main(base_dir='.', stream='', variants='', metrics_port=8002, if start is not None: start = dateutil.parser.parse(start) - manager = BackfillerManager(base_dir, stream, variants, static_nodes, run_once) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once) gevent.signal(signal.SIGTERM, manager.stop) common.PromLogCountsHandler.install()