From 537be1a8aee76ffe0711ae89928aa0b8d78f20ad Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Mon, 20 May 2019 22:36:28 +0100 Subject: [PATCH] bug fixing after testing --- backfiller/backfiller/main.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 0dcafef..4a27411 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -156,7 +156,7 @@ def list_hours(node, stream, variants, start=None): if start is not None: if not isinstance(start, datetime.datetime): start = datetime.datetime.utcnow() - datetime.timedelta(hours=start) - hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < start] + hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) > start] return hours @@ -181,7 +181,7 @@ class BackfillerManager(object): self.static_nodes = static_nodes self.start = start self.run_once = run_once - self.node_source + self.node_source = node_source self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} @@ -189,8 +189,8 @@ class BackfillerManager(object): def stop(self): """Shut down all workers and stop backfilling.""" self.logger.info('Stopping') - for worker in self.workers: - worker.stop() + for node in self.workers.keys(): + self.stop_worker(node) self.stopping.set() def start_worker(self, node): @@ -224,7 +224,7 @@ class BackfillerManager(object): else: self.stop() - for worker in self.workers: + for worker in self.workers.values(): worker.done.wait() @@ -314,7 +314,8 @@ class BackfillerWorker(object): try: self.backfill(list_hours(self.node, self.stream, self.variants, self.start)) self.failures = 0 #reset failure count on a successful backfill - self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) + if not self.run_once: + self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) except Exception: self.failures += 1 @@ -342,8 +343,10 @@ def main(base_dir='.', streams='', variants='', metrics_port=8002, if start is not None: try: start = float(start) + logging.info('Backfilling last {} hours'.format(start)) except ValueError: start = dateutil.parser.parse(start) + logging.info('Backfilling since {}'.format(start)) common.PromLogCountsHandler.install() common.install_stacksampler()