From 71333cf8266bc4861e42cdad00fc7b241b7b99a5 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 3 Nov 2019 01:41:29 -0700 Subject: [PATCH] backfiller: Only run one manager, not one per channel Then treat backfilling each channel just like backfilling each quality. This is conceptually simpler (only one kind of thing, a (channel, quality)) and has better behaviour when a node is down (we only have one lot of error handling around it). It also means we aren't asking the database for the same info once per channel, and cuts down on logging noise. --- backfiller/backfiller/main.py | 82 +++++++++++++---------------------- 1 file changed, 31 insertions(+), 51 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 7463379..365f2ff 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -3,6 +3,7 @@ import datetime import errno import hashlib +import itertools import logging import os import random @@ -50,7 +51,7 @@ node_list_errors = prom.Counter( backfill_errors = prom.Counter( 'backfill_errors', 'Number of errors backfilling', - ['remote', 'channel'], + ['remote'], ) segments_deleted = prom.Counter( @@ -169,10 +170,10 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment, logger.info('Segment {}/{}/{} backfilled'.format(quality, hour, missing_segment)) -def list_hours(node, channel, qualities, start=None): +def list_hours(node, channel, quality, start=None): """Return a list of all available hours from a node. - List all hours available from node/channel for each quality in qualities + List all hours available from node/channel ordered from newest to oldest. Keyword arguments: @@ -180,8 +181,7 @@ def list_hours(node, channel, qualities, start=None): return hours more recent than that number of hours ago. If None (default), all hours are returned.""" - hour_lists = [list_remote_hours(node, channel, quality) for quality in qualities] - hours = list(set().union(*hour_lists)) + hours = list_remote_hours(node, channel, quality) hours.sort(reverse=True) #latest hour first if start is not None: @@ -204,7 +204,7 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, channel, qualities, static_nodes=[], + def __init__(self, base_dir, channels, qualities, static_nodes=[], start=None, delete_old=False, run_once=False, node_file=None, node_database=None, localhost=None, download_concurrency=5, recent_cutoff=120): @@ -212,7 +212,7 @@ class BackfillerManager(object): Creates a manager for a given channel with specified qualities.""" self.base_dir = base_dir - self.channel = channel + self.channels = channels self.qualities = qualities self.static_nodes = static_nodes self.start = start @@ -225,7 +225,7 @@ class BackfillerManager(object): self.download_concurrency = download_concurrency self.recent_cutoff = recent_cutoff self.stopping = gevent.event.Event() - self.logger = logging.getLogger("BackfillerManager({})".format(channel)) + self.logger = logging.getLogger("BackfillerManager") self.workers = {} # {node url: worker} def stop(self): @@ -257,8 +257,8 @@ class BackfillerManager(object): else: self.logger.info('Deleting hours older than {} hours ago'.format(self.start)) - for quality in self.qualities: - hours = list_local_hours(self.base_dir, self.channel, quality) + for channel, quality in itertools.product(self.channels, self.qualities): + hours = list_local_hours(self.base_dir, channel, quality) if not isinstance(self.start, datetime.datetime): cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start) else: @@ -270,13 +270,13 @@ class BackfillerManager(object): # deleting segments can take a bit time but is less important # than the actually backfilling so we yield gevent.idle() - path = os.path.join(self.base_dir, self.channel, quality, hour) + path = os.path.join(self.base_dir, channel, quality, hour) self.logger.info('Deleting {}'.format(path)) - segments = list_local_segments(self.base_dir, self.channel, quality, hour) + segments = list_local_segments(self.base_dir, channel, quality, hour) for segment in segments: try: os.remove(os.path.join(path, segment)) - segments_deleted.labels(channel=self.channel, quality=quality, hour=hour).inc() + segments_deleted.labels(channel=channel, quality=quality, hour=hour).inc() except OSError as e: # ignore error when the file is already gone if e.errno != errno.ENOENT: @@ -411,7 +411,7 @@ class BackfillerWorker(object): self.base_dir = manager.base_dir self.node = node self.download_concurrency = manager.download_concurrency - self.channel = manager.channel + self.channels = manager.channels self.qualities = manager.qualities self.start = manager.start self.run_once = manager.run_once @@ -420,7 +420,7 @@ class BackfillerWorker(object): self.done = gevent.event.Event() def __repr__(self): - return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.channel) + return '<{} at 0x{:x} for {!r}>'.format(type(self).__name__, id(self), self.node) __str__ = __repr__ def stop(self): @@ -428,15 +428,14 @@ class BackfillerWorker(object): self.logger.info('Stopping') self.stopping.set() - def backfill(self, hours): + def backfill(self): """Backfill from remote node. Backfill from node/channel/qualities to base_dir/channel/qualities for each hour in hours. """ - for quality in self.qualities: - - for hour in hours: + for channel, quality in itertools.product(self.channels, self.qualities): + for hour in list_hours(self.node, channel, quality, self.start): # since backfilling can take a long time, recheck whether this # hour is after the start if self.start is not None: @@ -449,8 +448,8 @@ class BackfillerWorker(object): self.logger.info('Backfilling {}/{}'.format(quality, hour)) - local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour)) - remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour)) + local_segments = set(list_local_segments(self.base_dir, channel, quality, hour)) + remote_segments = set(list_remote_segments(self.node, channel, quality, hour)) missing_segments = list(remote_segments - local_segments) # randomise the order of the segments to reduce the chance that @@ -465,7 +464,7 @@ class BackfillerWorker(object): if self.stopping.is_set(): return - path = os.path.join(self.channel, quality, hour, missing_segment) + path = os.path.join(channel, quality, hour, missing_segment) # test to see if file is a segment and get the segments start time try: @@ -488,7 +487,7 @@ class BackfillerWorker(object): # start segment as soon as a pool slot opens up, then track it in workers workers.append(pool.spawn( get_remote_segment, - self.base_dir, self.node, self.channel, quality, hour, missing_segment, self.logger + self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger )) # verify that all the workers succeeded. if any failed, raise the exception from @@ -497,7 +496,7 @@ class BackfillerWorker(object): worker.get() # re-raise error, if any self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour)) - hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc() + hours_backfilled.labels(remote=self.node, channel=channel, quality=quality).inc() def run(self): @@ -507,7 +506,7 @@ class BackfillerWorker(object): while not self.stopping.is_set(): try: self.logger.info('Starting backfill') - self.backfill(list_hours(self.node, self.channel, self.qualities, self.start)) + self.backfill() self.logger.info('Backfill complete') failures = 0 #reset failure count on a successful backfill if not self.run_once: @@ -518,7 +517,7 @@ class BackfillerWorker(object): failures += 1 delay = common.jitter(TIMEOUT * 2**failures) self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay)) - backfill_errors.labels(remote=self.node, channel=self.channel).inc() + backfill_errors.labels(remote=self.node).inc() self.stopping.wait(delay) if self.run_once: @@ -566,38 +565,19 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002, common.install_stacksampler() prom.start_http_server(metrics_port) - managers = [] - workers = [] - for channel in channels: - logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir)) - manager = BackfillerManager(base_dir, channel, qualities, static_nodes, - start, delete_old, run_once, node_file, node_database, - localhost, download_concurrency, recent_cutoff) - managers.append(manager) - workers.append(gevent.spawn(manager.run)) + logging.info('Starting backfilling {} with {} as qualities to {}'.format(', '.join(channels), ', '.join(qualities), base_dir)) + manager = BackfillerManager(base_dir, channels, qualities, static_nodes, + start, delete_old, run_once, node_file, node_database, + localhost, download_concurrency, recent_cutoff) def stop(): - for manager in managers: - manager.stop() + manager.stop() gevent.signal(signal.SIGTERM, stop) if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() - # Wait for any to die - gevent.wait(workers, count=1) - # If one has stopped, either: - # 1. stop() was called and all are stopping - # 2. one errored and we should stop all remaining and report the error - # Our behaviour in both cases is the same: - # 1. Tell all managers to gracefully stop - stop() - # 2. Wait (with timeout) until they've stopped - gevent.wait(workers) - # 3. Check if any of them failed. If they did, report it. If mulitple - # failed, we report one arbitrarily. - for worker in workers: - worker.get() + manager.run() logging.info('Gracefully stopped')