|
|
@ -220,7 +220,7 @@ def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='r
|
|
|
|
logging.info('Finished backfilling from {}'.format(node))
|
|
|
|
logging.info('Finished backfilling from {}'.format(node))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002):
|
|
|
|
def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None):
|
|
|
|
"""Prototype backfiller service.
|
|
|
|
"""Prototype backfiller service.
|
|
|
|
|
|
|
|
|
|
|
|
Do a backfill of the last 3 hours from stream/variants from all nodes
|
|
|
|
Do a backfill of the last 3 hours from stream/variants from all nodes
|
|
|
@ -234,6 +234,8 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
|
|
|
|
# stretch goal: use the backfiller to monitor the restreamer
|
|
|
|
# stretch goal: use the backfiller to monitor the restreamer
|
|
|
|
|
|
|
|
|
|
|
|
variants = variants.split(',') if variants else []
|
|
|
|
variants = variants.split(',') if variants else []
|
|
|
|
|
|
|
|
if nodes is not None:
|
|
|
|
|
|
|
|
nodes = nodes.split(',') if nodes else []
|
|
|
|
|
|
|
|
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|
common.PromLogCountsHandler.install()
|
|
|
|
prom.start_http_server(metrics_port)
|
|
|
|
prom.start_http_server(metrics_port)
|
|
|
@ -244,9 +246,9 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
|
|
|
|
full_fill_start = fill_start
|
|
|
|
full_fill_start = fill_start
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
backfill(base_dir, stream, variants, 3)
|
|
|
|
backfill(base_dir, stream, variants, 3, nodes=nodes)
|
|
|
|
|
|
|
|
|
|
|
|
backfill(base_dir, stream, variants)
|
|
|
|
backfill(base_dir, stream, variants, nodes=nodes)
|
|
|
|
|
|
|
|
|
|
|
|
# I'm sure there is a module that does this in a more robust way
|
|
|
|
# I'm sure there is a module that does this in a more robust way
|
|
|
|
# but I understand this and it gives the behaviour I want
|
|
|
|
# but I understand this and it gives the behaviour I want
|
|
|
@ -256,14 +258,14 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
|
|
|
|
|
|
|
|
|
|
|
|
if now - full_fill_start > datetime.timedelta(minutes=full_fill_wait):
|
|
|
|
if now - full_fill_start > datetime.timedelta(minutes=full_fill_wait):
|
|
|
|
|
|
|
|
|
|
|
|
backfill(base_dir, stream, variants)
|
|
|
|
backfill(base_dir, stream, variants, nodes=nodes)
|
|
|
|
|
|
|
|
|
|
|
|
fill_start = now
|
|
|
|
fill_start = now
|
|
|
|
full_fill_start = fill_start
|
|
|
|
full_fill_start = fill_start
|
|
|
|
|
|
|
|
|
|
|
|
elif now - fill_start > datetime.timedelta(minutes=fill_wait):
|
|
|
|
elif now - fill_start > datetime.timedelta(minutes=fill_wait):
|
|
|
|
|
|
|
|
|
|
|
|
backfill(base_dir, stream, variants, 3)
|
|
|
|
backfill(base_dir, stream, variants, 3, nodes=nodes)
|
|
|
|
|
|
|
|
|
|
|
|
fill_start = now
|
|
|
|
fill_start = now
|
|
|
|
|
|
|
|
|
|
|
|