diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 7a34234..ec55c98 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -177,6 +177,7 @@ class BackfillerManager(object): self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} + self.logger.info('Starting') def stop(self): """Shut down all workers and stop backfilling.""" @@ -196,6 +197,13 @@ class BackfillerManager(object): """Stop the worker for given node.""" self.workers.pop(node).stop() + def restart_worker(self, node): + """Restart the worker for given node.""" + self.stop_worker(node) + self.stopping.wait(common.jitter(self.RESTART_INTERVAL)) + self.start_worker(node) + + def run(self): while not self.stopping.is_set(): new_nodes = set(get_nodes() + self.static_nodes) @@ -226,6 +234,7 @@ class BackfillerWorker(object): base_dir/stream for all variants. If run_once, only backfill once.""" WAIT_INTERVAL = 120 #seconds between backfills + RETRY_INTERVAL = 5 #seconds between retrying a failed backfill def __init__(self, manager, base_dir, node, stream, variants, run_once=False): self.manager = manager @@ -237,6 +246,9 @@ class BackfillerWorker(object): self.run_once = run_once self.stopping = gevent.event.Event() self.done = gevent.event.Event() + self.failures = 0 + + self.logger.info('Starting') def __repr__(self): return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) @@ -254,10 +266,6 @@ class BackfillerWorker(object): hour in hours. Keyword arguments: - segment_order -- If 'random', randomise the order of segments (default). - If 'forward', sort the segment in ascending order. If 'reverse', - sort the segments in descending order. Otherwise, do not change the - order of segments. recent_cutoff -- Skip backfilling segments younger than this number of seconds to prioritise letting the downloader grab these segments.""" @@ -273,13 +281,9 @@ class BackfillerWorker(object): remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour)) missing_segments = list(remote_segments - local_segments) - # useful if running in parallel so multiple nodes don't request the same segment at the same time - if segment_order == 'random': - random.shuffle(missing_segments) - elif segment_order == 'forward': - missing_segments.sort() - elif segment_order == 'reverse': - missing_segments.sort(reverse=True) + # randomise the order of the segments to reduce the chance that + # multiple workers request the same segment at the same time + random.shuffle(missing_segments) for missing_segment in missing_segments: @@ -307,8 +311,17 @@ class BackfillerWorker(object): self.logger.info('Worker starting') while not self.stopping.is_set(): - self.backfill(list_hours(self.node, self.stream, self.variants)) - self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) + + try: + self.backfill(list_hours(self.node, self.stream, self.variants)) + self.failures = 0 #reset failure count on a successful backfill + self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) + + except Exception: + self.failures += 1 + delay = common.jitter(self.RETRY_INTERVAL * 2**self.failures) + self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay)) + self.stopping.wait(delay) if self.run_once: break