diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 0e26f0e..fe16cec 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -6,7 +6,7 @@ import errno import logging import os import random -import time +import signal import uuid import dateutil.parser @@ -131,25 +131,6 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc() -def backfill_nodes(base_dir, stream, variants, hours=None, nodes=None, start=None): - """Loop over nodes backfilling from each. - - Backfill from node/stream/variants to base_dir/stream/variants for each node - in nodes. If nodes is None, use get_nodes() to get a list of nodes to - backfill from. By default all hours are backfilled. If backfilling from a - node raises an exception, this just goes onto the next node.""" - - if nodes is None: - nodes = get_nodes() - - #ideally do this in parallel - for node in nodes: - try: - backfill(base_dir, node, stream, variants, hours) - except Exception: - logging.exception("Error while backfilling node {}".format(node)) - - def last_hours(n_hours=3): """Return of a list of the last n_hours in descending order.""" if n_hours < 1: @@ -187,7 +168,7 @@ def list_hours(node, stream, variants, order='forward', start=None): return hours -def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60): +def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60, stopping=None): """Backfill from remote node. Backfill from node/stream/variants to base_dir/stream/variants for each hour @@ -207,6 +188,9 @@ def backfill(base_dir, node, stream, variants, hours, segment_order='random', re for hour in hours: + if stopping is not None and stopping.is_set(): + return + logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour)) local_segments = set(list_local_segments(base_dir, stream, variant, hour)) @@ -251,11 +235,12 @@ class BackfillerManager(object): NODE_INTERVAL = 5 #minutes between updating list of nodes - def __init__(self, base_dir, stream, variants, input_nodes=None): + def __init__(self, base_dir, stream, variants, nodes=None): """Constructor for BackfillerManager.""" self.base_dir = base_dir self.stream = stream self.variants = variants + self.nodes = nodes self.stopping = gevent.event.Event() self.workers = {} # {node url: worker} @@ -283,7 +268,10 @@ class BackfillerManager(object): def run(self): while not self.stopping.is_set(): - new_nodes = set(get_nodes()) + if self.nodes is None: + new_nodes = set(get_nodes()) + else: + new_nodes = self.nodes exisiting_nodes = set(self.workers.keys()) to_start = new_nodes - exisiting_nodes @@ -334,8 +322,6 @@ class BackfillerWorker(object): """Tell the worker to shut down""" self.stopping.set() - def trigger_new_worker(self): - self.manager.trigger_new_worker(self) def run(self): self.logger.info('Worker starting') @@ -350,7 +336,6 @@ class BackfillerWorker(object): del self.manager.workers[self.node] def _run(self): - last_small_backfill = datetime.datetime.now() + datetime.timedetla(-1) last_large_backfill = datetime.datetime.now() + datetime.timedetla(-1) large_hours = [] @@ -360,7 +345,7 @@ class BackfillerWorker(object): now = datetime.datetime.now() if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL): - backfill(self.base_dir, self.node, self.stream, self.variants, last_hours()) + backfill(self.base_dir, self.node, self.stream, self.variants, last_hours(), stopping=self.stopping) last_small_backfill = now elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours): @@ -370,21 +355,13 @@ class BackfillerWorker(object): this_hour = large_hours[-1:] large_hours = large_hours[:-1] - backfill(self.base_dir, self.node, self.stream, self.variants, this_hour) + backfill(self.base_dir, self.node, self.stream, self.variants, this_hour, stopping=self.stopping) else: self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None): - """Prototype backfiller service. - - Do a backfill of the last 3 hours from stream/variants from all nodes - initially before doing a full backfill from all nodes. Then every sleep_time - minutes check to see if more than fill_wait minutes have passed since the - last backfill. If so do a backfill of the last 3 hours. Also check whether - it has been more than full_fill_wait minutes since the last full backfill; - if so, do a full backfill.""" - # TODO replace this with a more robust event based service and backfill from multiple nodes in parallel + """Backfiller service.""" # stretch goal: provide an interface to trigger backfills manually # stretch goal: use the backfiller to monitor the restreamer @@ -394,6 +371,9 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, if start is not None: start = dateutil.parser.parse(start) + manager = BackfillerManager(base_dir, stream, variants, nodes) + gevent.signal(signal.SIGTERM, manager.stop) + common.PromLogCountsHandler.install() common.install_stacksampler() prom.start_http_server(metrics_port) @@ -402,34 +382,5 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) - - fill_start = datetime.datetime.now() - full_fill_start = fill_start - - - backfill(base_dir, stream, variants, 3, nodes=nodes, start=start) - - backfill(base_dir, stream, variants, nodes=nodes, start=start) - - # I'm sure there is a module that does this in a more robust way - # but I understand this and it gives the behaviour I want - while True: - - now = datetime.datetime.now() - - if now - full_fill_start > datetime.timedelta(minutes=full_fill_wait): - - backfill(base_dir, stream, variants, nodes=nodes, start=start) - - fill_start = now - full_fill_start = fill_start - - elif now - fill_start > datetime.timedelta(minutes=fill_wait): - - backfill(base_dir, stream, variants, 3, nodes=nodes, start=start) - - fill_start = now - - else: - time.sleep(common.jitter(60 * sleep_time)) - + manager.run() + logging.info('Gracefully stopped')