diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index bd47aaf..b982180 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -203,6 +203,33 @@ def list_hours(node, channel, quality, start=None): return hours +def list_local_extras(base_dir, dir): + root = os.path.join(base_dir, dir) + for path, subdirs, files in os.walk(root): + relpath = os.path.relpath(path, root) + for file in files: + yield os.path.normpath(os.path.join(relpath, file)) + + +def list_remote_extras(node, dir, timeout=TIMEOUT): + uri = f'{node}/extras/{dir}' + resp = requests.get(uri, timeout=timeout, metric_name='list_remote_extras') + resp.raise_for_status() + return resp.json() + + +def get_remote_extra(base_dir, node, path, timeout=TIMEOUT): + local_path = os.path.join(base_dir, path) + if os.path.exists(local_path): + return + + uri = f'{node}/segments/{path}' + resp = requests.get(uri, timeout=timeout, metric_name='get_remote_extra') + resp.raise_for_status() + + common.atomic_write(local_path, resp.content) + + class BackfillerManager(object): """Manages BackfillerWorkers to backfill from a pool of nodes. @@ -215,7 +242,7 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, channels, qualities, static_nodes=[], + def __init__(self, base_dir, channels, qualities, extras=[], static_nodes=[], start=None, delete_old=False, run_once=False, node_file=None, node_database=None, localhost=None, download_concurrency=5, recent_cutoff=120): @@ -225,6 +252,7 @@ class BackfillerManager(object): self.base_dir = base_dir self.channels = channels self.qualities = qualities + self.extras = extras self.static_nodes = static_nodes self.start = start self.delete_old = delete_old @@ -428,6 +456,7 @@ class BackfillerWorker(object): self.download_concurrency = manager.download_concurrency self.channels = manager.channels self.qualities = manager.qualities + self.extras = manager.extras self.start = manager.start self.run_once = manager.run_once self.recent_cutoff = manager.recent_cutoff @@ -443,7 +472,7 @@ class BackfillerWorker(object): self.logger.info('Stopping') self.stopping.set() - def backfill(self): + def backfill_segments(self): """Backfill from remote node. Backfill from node/channel/qualities to base_dir/channel/qualities for @@ -529,6 +558,36 @@ class BackfillerWorker(object): hours_backfilled.labels(remote=self.node, channel=channel, quality=quality).inc() + def backfill_extras(self): + for dir in self.extras: + self.logger.info(f'Backfilling extra directory {dir}') + + local_files = set(list_local_extras(self.base_dir, dir)) + remote_files = set(list_remote_extras(self.node, dir)) + missing_files = list(remote_files - local_files) + + # randomise the order of the files to reduce the chance that + # multiple workers request the same file at the same time + random.shuffle(missing_files) + + pool = gevent.pool.Pool(self.download_concurrency) + workers = [] + + for file in missing_files: + if self.stopping.is_set(): + return + path = os.path.join(dir, file) + self.logger.info(f"Backfilling {path}") + workers.append(pool.spawn( + get_remote_extra(self.base_dir, self.node, path) + )) + + for worker in workers: + worker.get() + + self.logger.info("Backfilled {} extras".format(len(missing_files))) + + def run(self): self.logger.info('Starting') failures = 0 @@ -536,7 +595,8 @@ class BackfillerWorker(object): while not self.stopping.is_set(): try: self.logger.info('Starting backfill') - self.backfill() + self.backfill_segments() + self.backfill_extras() self.logger.info('Backfill complete') failures = 0 #reset failure count on a successful backfill if not self.run_once: @@ -558,11 +618,17 @@ class BackfillerWorker(object): if self.node in self.manager.workers: del self.manager.workers[self.node] + +def comma_seperated(value): + return [part.strip() for part in value.split(",")] if value else [] + + @argh.arg('channels', nargs='*', help='Channels to backfill from') @argh.arg('--base-dir', help='Directory to which segments will be backfilled. Default is current working directory.') -@argh.arg('--qualities', help="Qualities of each channel to backfill. Comma seperated if multiple. Default is 'source'.") +@argh.arg('--qualities', help="Qualities of each channel to backfill. Comma seperated if multiple. Default is 'source'.", type=comma_seperated) +@argh.arg('--extras', help="Extra non-segment directories to backfill. Comma seperated if multiple.", type=comma_seperated) @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8002.') -@argh.arg('--static-nodes', help='Nodes to always backfill from. Comma seperated if multiple. By default empty.') +@argh.arg('--static-nodes', help='Nodes to always backfill from. Comma seperated if multiple. By default empty.', type=comma_seperated) @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.') @argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.') @argh.arg('--delete-old', help='If True, delete hours older than start. By default False.') @@ -572,17 +638,12 @@ class BackfillerWorker(object): @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()') @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.') @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.') -def main(channels, base_dir='.', qualities='source', metrics_port=8002, - static_nodes='', backdoor_port=0, start=None, delete_old=False, +def main(channels, base_dir='.', qualities=['source'], extras=[], metrics_port=8002, + static_nodes=[], backdoor_port=0, start=None, delete_old=False, run_once=False, node_file=None, node_database=None, localhost=socket.gethostname(), download_concurrency=5, recent_cutoff=120): """Backfiller service.""" - qualities = qualities.split(',') if qualities else [] - qualities = [quality.strip() for quality in qualities] - static_nodes = static_nodes.split(',') if static_nodes else [] - static_nodes = [static_node.strip() for static_node in static_nodes] - if start is not None: try: start = float(start) @@ -596,7 +657,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002, prom.start_http_server(metrics_port) logging.info('Starting backfilling {} with {} as qualities to {}'.format(', '.join(channels), ', '.join(qualities), base_dir)) - manager = BackfillerManager(base_dir, channels, qualities, static_nodes, + manager = BackfillerManager(base_dir, channels, qualities, extras, static_nodes, start, delete_old, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff)