From c65eb2c565a3dd586ad7fecbb9eee2661f9883a7 Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Sun, 27 Jan 2019 20:42:56 +0000 Subject: [PATCH] added workers and a worker manager --- backfiller/backfiller/main.py | 133 +++++++++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 1 deletion(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index f537326..0e26f0e 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -187,7 +187,6 @@ def list_hours(node, stream, variants, order='forward', start=None): return hours - def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60): """Backfill from remote node. @@ -243,6 +242,138 @@ def backfill(base_dir, node, stream, variants, hours, segment_order='random', re logging.info('Finished backfilling from {}'.format(node)) +class BackfillerManager(object): + """Manages BackfillerWorkers to backfill from a pool of nodes. + + The manager regularly calls get_nodes to an up to date list of nodes. If no + worker exists for a node, the manager starts one. If a worker corresponds to + a node not in the list, the manager stops it.""" + + NODE_INTERVAL = 5 #minutes between updating list of nodes + + def __init__(self, base_dir, stream, variants, input_nodes=None): + """Constructor for BackfillerManager.""" + self.base_dir = base_dir + self.stream = stream + self.variants = variants + self.stopping = gevent.event.Event() + self.workers = {} # {node url: worker} + + + def stop(self): + """Shut down all workers and stop backfilling.""" + self.logger.info('Stopping') + self.stopping.set() + + + def start_worker(self, node): + """Start a new worker for given node.""" + worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants) + if node in self.workers: + self.workers[node].stop() #only one worker per node + self.workers[node] = worker + gevent.spawn(worker.run) + + + def stop_worker(self, node): + """Stop the worker for given node.""" + self.workers[node].stop() + del self.workers[node] + + + def run(self): + while not self.stopping.is_set(): + new_nodes = set(get_nodes()) + exisiting_nodes = set(self.workers.keys()) + + to_start = new_nodes - exisiting_nodes + for node in to_start: + self.start_worker(node) + + to_stop = exisiting_nodes - new_nodes + for node in to_stop: + self.stop_worker(node) + + self.stopping.wait(common.jitter(self.NODE_INTERVAL * 60)) + + for worker in self.workers: + worker.stop() + for worker in self.workers: + worker.done.wait() + + +class BackfillerWorker(object): + """Backfills segments from a node. + + Backfills all segments from node/stream to base_dir/stream for all variants. + Every SMALL_INTERVAL minutes backfill the last three hours starting from the + most recent one (a 'small backfill'). When not doing a small backfill, + backfill all segments starting with the most recent one (a 'large backfill') + unless a large backfill has occured less than LARGE_INTERVAL ago.""" + + SMALL_INTERVAL = 5 #minutes between small backfills + LARGE_INTERVAL = 60 #minutes between large backfills + WAIT_INTERVAL = 1 #seconds between backfill actions + + def __init__(self, manager, base_dir, node, stream, variants): + """Constructor for BackfillerWorker""" + self.manager = manager + self.logger = manager.logger.getChild('BackfillerWorker({}/{})@{:x}'.format(node, stream, id(self))) + self.base_dir = base_dir + self.node = node + self.stream = stream + self.variants = variants + self.stopping = gevent.event.Event() + self.done = gevent.event.Event() + + def __repr__(self): + return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) + __str__ = __repr__ + + def stop(self): + """Tell the worker to shut down""" + self.stopping.set() + + def trigger_new_worker(self): + self.manager.trigger_new_worker(self) + + def run(self): + self.logger.info('Worker starting') + try: + self._run() + except Exception: + self.logger.exception('Worker failed') + else: + self.logger.info('Worker stopped') + finally: + self.done.set() + del self.manager.workers[self.node] + + def _run(self): + + last_small_backfill = datetime.datetime.now() + datetime.timedetla(-1) + last_large_backfill = datetime.datetime.now() + datetime.timedetla(-1) + large_hours = [] + + while not self.stopping.is_set(): + + now = datetime.datetime.now() + + if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL): + backfill(self.base_dir, self.node, self.stream, self.variants, last_hours()) + last_small_backfill = now + + elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours): + if not len(large_hours): + large_hours = list_hours(self.node, self.stream, self.variants) + last_large_backfill = now + + this_hour = large_hours[-1:] + large_hours = large_hours[:-1] + backfill(self.base_dir, self.node, self.stream, self.variants, this_hour) + else: + self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) + def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None): """Prototype backfiller service.