diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index e42251d..0dcafef 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -330,11 +330,12 @@ class BackfillerWorker(object): del self.manager.workers[self.node] -def main(base_dir='.', stream='', variants='', metrics_port=8002, +def main(base_dir='.', streams='', variants='', metrics_port=8002, static_nodes='', backdoor_port=0, start=None, run_once=False, node_source=None): """Backfiller service.""" + streams = streams.split(',') if variants else [] variants = variants.split(',') if variants else [] static_nodes = static_nodes.split(',') if static_nodes else [] @@ -344,16 +345,40 @@ def main(base_dir='.', stream='', variants='', metrics_port=8002, except ValueError: start = dateutil.parser.parse(start) - manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source) - gevent.signal(signal.SIGTERM, manager.stop) - common.PromLogCountsHandler.install() common.install_stacksampler() prom.start_http_server(metrics_port) + managers = [] + workers = [] + for stream in streams: + logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source) + managers.append(manager) + workers.append(gevent.spawn(manager.run)) + + def stop(): + for manager in managers: + manager.stop() + + gevent.signal(signal.SIGTERM, stop) + if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() - logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) - manager.run() + # Wait for any to die + gevent.wait(workers, count=1) + # If one has stopped, either: + # 1. stop() was called and all are stopping + # 2. one errored and we should stop all remaining and report the error + # Our behaviour in both cases is the same: + # 1. Tell all managers to gracefully stop + stop() + # 2. Wait (with timeout) until they've stopped + gevent.wait(workers) + # 3. Check if any of them failed. If they did, report it. If mulitple + # failed, we report one arbitrarily. + for worker in workers: + worker.get() + logging.info('Gracefully stopped')