|
|
@ -330,11 +330,12 @@ class BackfillerWorker(object):
|
|
|
|
del self.manager.workers[self.node]
|
|
|
|
del self.manager.workers[self.node]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(base_dir='.', stream='', variants='', metrics_port=8002,
|
|
|
|
def main(base_dir='.', streams='', variants='', metrics_port=8002,
|
|
|
|
static_nodes='', backdoor_port=0, start=None, run_once=False,
|
|
|
|
static_nodes='', backdoor_port=0, start=None, run_once=False,
|
|
|
|
node_source=None):
|
|
|
|
node_source=None):
|
|
|
|
"""Backfiller service."""
|
|
|
|
"""Backfiller service."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
streams = streams.split(',') if variants else []
|
|
|
|
variants = variants.split(',') if variants else []
|
|
|
|
variants = variants.split(',') if variants else []
|
|
|
|
static_nodes = static_nodes.split(',') if static_nodes else []
|
|
|
|
static_nodes = static_nodes.split(',') if static_nodes else []
|
|
|
|
|
|
|
|
|
|
|
@ -344,16 +345,40 @@ def main(base_dir='.', stream='', variants='', metrics_port=8002,
|
|
|
|
except ValueError:
|
|
|
|
except ValueError:
|
|
|
|
start = dateutil.parser.parse(start)
|
|
|
|
start = dateutil.parser.parse(start)
|
|
|
|
|
|
|
|
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source)
|
|
|
|
|
|
|
|
gevent.signal(signal.SIGTERM, manager.stop)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|
common.install_stacksampler()
|
|
|
|
common.install_stacksampler()
|
|
|
|
prom.start_http_server(metrics_port)
|
|
|
|
prom.start_http_server(metrics_port)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
managers = []
|
|
|
|
|
|
|
|
workers = []
|
|
|
|
|
|
|
|
for stream in streams:
|
|
|
|
|
|
|
|
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
|
|
|
|
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_source)
|
|
|
|
|
|
|
|
managers.append(manager)
|
|
|
|
|
|
|
|
workers.append(gevent.spawn(manager.run))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def stop():
|
|
|
|
|
|
|
|
for manager in managers:
|
|
|
|
|
|
|
|
manager.stop()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gevent.signal(signal.SIGTERM, stop)
|
|
|
|
|
|
|
|
|
|
|
|
if backdoor_port:
|
|
|
|
if backdoor_port:
|
|
|
|
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
|
|
|
|
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
|
|
|
|
|
|
|
|
|
|
|
|
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
|
|
|
|
# Wait for any to die
|
|
|
|
manager.run()
|
|
|
|
gevent.wait(workers, count=1)
|
|
|
|
|
|
|
|
# If one has stopped, either:
|
|
|
|
|
|
|
|
# 1. stop() was called and all are stopping
|
|
|
|
|
|
|
|
# 2. one errored and we should stop all remaining and report the error
|
|
|
|
|
|
|
|
# Our behaviour in both cases is the same:
|
|
|
|
|
|
|
|
# 1. Tell all managers to gracefully stop
|
|
|
|
|
|
|
|
stop()
|
|
|
|
|
|
|
|
# 2. Wait (with timeout) until they've stopped
|
|
|
|
|
|
|
|
gevent.wait(workers)
|
|
|
|
|
|
|
|
# 3. Check if any of them failed. If they did, report it. If mulitple
|
|
|
|
|
|
|
|
# failed, we report one arbitrarily.
|
|
|
|
|
|
|
|
for worker in workers:
|
|
|
|
|
|
|
|
worker.get()
|
|
|
|
|
|
|
|
|
|
|
|
logging.info('Gracefully stopped')
|
|
|
|
logging.info('Gracefully stopped')
|
|
|
|