diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 7463379..365f2ff 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -3,6 +3,7 @@ import datetime import errno import hashlib +import itertools import logging import os import random @@ -50,7 +51,7 @@ node_list_errors = prom.Counter( backfill_errors = prom.Counter( 'backfill_errors', 'Number of errors backfilling', - ['remote', 'channel'], + ['remote'], ) segments_deleted = prom.Counter( @@ -169,10 +170,10 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment, logger.info('Segment {}/{}/{} backfilled'.format(quality, hour, missing_segment)) -def list_hours(node, channel, qualities, start=None): +def list_hours(node, channel, quality, start=None): """Return a list of all available hours from a node. - List all hours available from node/channel for each quality in qualities + List all hours available from node/channel ordered from newest to oldest. Keyword arguments: @@ -180,8 +181,7 @@ def list_hours(node, channel, qualities, start=None): return hours more recent than that number of hours ago. If None (default), all hours are returned.""" - hour_lists = [list_remote_hours(node, channel, quality) for quality in qualities] - hours = list(set().union(*hour_lists)) + hours = list_remote_hours(node, channel, quality) hours.sort(reverse=True) #latest hour first if start is not None: @@ -204,7 +204,7 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, channel, qualities, static_nodes=[], + def __init__(self, base_dir, channels, qualities, static_nodes=[], start=None, delete_old=False, run_once=False, node_file=None, node_database=None, localhost=None, download_concurrency=5, recent_cutoff=120): @@ -212,7 +212,7 @@ class BackfillerManager(object): Creates a manager for a given channel with specified qualities.""" self.base_dir = base_dir - self.channel = channel + self.channels = channels self.qualities = qualities self.static_nodes = static_nodes self.start = start @@ -225,7 +225,7 @@ class BackfillerManager(object): self.download_concurrency = download_concurrency self.recent_cutoff = recent_cutoff self.stopping = gevent.event.Event() - self.logger = logging.getLogger("BackfillerManager({})".format(channel)) + self.logger = logging.getLogger("BackfillerManager") self.workers = {} # {node url: worker} def stop(self): @@ -257,8 +257,8 @@ class BackfillerManager(object): else: self.logger.info('Deleting hours older than {} hours ago'.format(self.start)) - for quality in self.qualities: - hours = list_local_hours(self.base_dir, self.channel, quality) + for channel, quality in itertools.product(self.channels, self.qualities): + hours = list_local_hours(self.base_dir, channel, quality) if not isinstance(self.start, datetime.datetime): cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start) else: @@ -270,13 +270,13 @@ class BackfillerManager(object): # deleting segments can take a bit time but is less important # than the actually backfilling so we yield gevent.idle() - path = os.path.join(self.base_dir, self.channel, quality, hour) + path = os.path.join(self.base_dir, channel, quality, hour) self.logger.info('Deleting {}'.format(path)) - segments = list_local_segments(self.base_dir, self.channel, quality, hour) + segments = list_local_segments(self.base_dir, channel, quality, hour) for segment in segments: try: os.remove(os.path.join(path, segment)) - segments_deleted.labels(channel=self.channel, quality=quality, hour=hour).inc() + segments_deleted.labels(channel=channel, quality=quality, hour=hour).inc() except OSError as e: # ignore error when the file is already gone if e.errno != errno.ENOENT: @@ -411,7 +411,7 @@ class BackfillerWorker(object): self.base_dir = manager.base_dir self.node = node self.download_concurrency = manager.download_concurrency - self.channel = manager.channel + self.channels = manager.channels self.qualities = manager.qualities self.start = manager.start self.run_once = manager.run_once @@ -420,7 +420,7 @@ class BackfillerWorker(object): self.done = gevent.event.Event() def __repr__(self): - return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.channel) + return '<{} at 0x{:x} for {!r}>'.format(type(self).__name__, id(self), self.node) __str__ = __repr__ def stop(self): @@ -428,15 +428,14 @@ class BackfillerWorker(object): self.logger.info('Stopping') self.stopping.set() - def backfill(self, hours): + def backfill(self): """Backfill from remote node. Backfill from node/channel/qualities to base_dir/channel/qualities for each hour in hours. """ - for quality in self.qualities: - - for hour in hours: + for channel, quality in itertools.product(self.channels, self.qualities): + for hour in list_hours(self.node, channel, quality, self.start): # since backfilling can take a long time, recheck whether this # hour is after the start if self.start is not None: @@ -449,8 +448,8 @@ class BackfillerWorker(object): self.logger.info('Backfilling {}/{}'.format(quality, hour)) - local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour)) - remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour)) + local_segments = set(list_local_segments(self.base_dir, channel, quality, hour)) + remote_segments = set(list_remote_segments(self.node, channel, quality, hour)) missing_segments = list(remote_segments - local_segments) # randomise the order of the segments to reduce the chance that @@ -465,7 +464,7 @@ class BackfillerWorker(object): if self.stopping.is_set(): return - path = os.path.join(self.channel, quality, hour, missing_segment) + path = os.path.join(channel, quality, hour, missing_segment) # test to see if file is a segment and get the segments start time try: @@ -488,7 +487,7 @@ class BackfillerWorker(object): # start segment as soon as a pool slot opens up, then track it in workers workers.append(pool.spawn( get_remote_segment, - self.base_dir, self.node, self.channel, quality, hour, missing_segment, self.logger + self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger )) # verify that all the workers succeeded. if any failed, raise the exception from @@ -497,7 +496,7 @@ class BackfillerWorker(object): worker.get() # re-raise error, if any self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour)) - hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc() + hours_backfilled.labels(remote=self.node, channel=channel, quality=quality).inc() def run(self): @@ -507,7 +506,7 @@ class BackfillerWorker(object): while not self.stopping.is_set(): try: self.logger.info('Starting backfill') - self.backfill(list_hours(self.node, self.channel, self.qualities, self.start)) + self.backfill() self.logger.info('Backfill complete') failures = 0 #reset failure count on a successful backfill if not self.run_once: @@ -518,7 +517,7 @@ class BackfillerWorker(object): failures += 1 delay = common.jitter(TIMEOUT * 2**failures) self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay)) - backfill_errors.labels(remote=self.node, channel=self.channel).inc() + backfill_errors.labels(remote=self.node).inc() self.stopping.wait(delay) if self.run_once: @@ -566,38 +565,19 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002, common.install_stacksampler() prom.start_http_server(metrics_port) - managers = [] - workers = [] - for channel in channels: - logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir)) - manager = BackfillerManager(base_dir, channel, qualities, static_nodes, - start, delete_old, run_once, node_file, node_database, - localhost, download_concurrency, recent_cutoff) - managers.append(manager) - workers.append(gevent.spawn(manager.run)) + logging.info('Starting backfilling {} with {} as qualities to {}'.format(', '.join(channels), ', '.join(qualities), base_dir)) + manager = BackfillerManager(base_dir, channels, qualities, static_nodes, + start, delete_old, run_once, node_file, node_database, + localhost, download_concurrency, recent_cutoff) def stop(): - for manager in managers: - manager.stop() + manager.stop() gevent.signal(signal.SIGTERM, stop) if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() - # Wait for any to die - gevent.wait(workers, count=1) - # If one has stopped, either: - # 1. stop() was called and all are stopping - # 2. one errored and we should stop all remaining and report the error - # Our behaviour in both cases is the same: - # 1. Tell all managers to gracefully stop - stop() - # 2. Wait (with timeout) until they've stopped - gevent.wait(workers) - # 3. Check if any of them failed. If they did, report it. If mulitple - # failed, we report one arbitrarily. - for worker in workers: - worker.get() + manager.run() logging.info('Gracefully stopped')