From 184eccd06982085e210e146de6419c06e5ae4d39 Mon Sep 17 00:00:00 2001
From: Christopher Usher
Date: Mon, 28 Oct 2019 02:18:27 +0000
Subject: [PATCH] documentation for --delete-old; check that start is not None

---
 backfiller/backfiller/main.py | 77 ++++++++++++++++++++++++++---------
 1 file changed, 58 insertions(+), 19 deletions(-)

diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index 9a27a03..849787c 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -6,7 +6,6 @@ import hashlib
 import logging
 import os
 import random
-import shutil
 import signal
 import socket
 import urlparse
@@ -161,6 +160,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
 	segments_backfilled.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
 	logger.info('Segment {}/{}/{} backfilled'.format(quality, hour, missing_segment))
 
+
 def list_hours(node, channel, qualities, start=None):
 	"""Return a list of all available hours from a node.
 
@@ -190,12 +190,14 @@ class BackfillerManager(object):
 	The manager regularly calls get_nodes to get an up-to-date list of nodes. If
 	no worker exists for a node in this list or in the static_node list, the
 	manager starts one. If a worker corresponds to a node not in either list,
-	the manager stops it. If run_once, only backfill once."""
+	the manager stops it. If run_once, only backfill once. If delete_old,
+	delete hours older than start. The deletion is handled by the Manager as
+	having the Workers do it could lead to race conditions."""
 
 	NODE_INTERVAL = 300 #seconds between updating list of nodes
 
 	def __init__(self, base_dir, channel, qualities, static_nodes=[],
-			start=None, keep_hours=0, run_once=False, node_file=None,
+			start=None, delete_old=False, run_once=False, node_file=None,
 			node_database=None, localhost=None, download_concurrency=5,
 			recent_cutoff=120):
 		"""Constructor for BackfillerManager.
@@ -206,7 +208,7 @@ class BackfillerManager(object):
 		self.qualities = qualities
 		self.static_nodes = static_nodes
 		self.start = start
-		self.keep_hours = keep_hours
+		self.delete_old = delete_old
 		self.run_once = run_once
 		self.node_file = node_file
 		self.db_manager = None if node_database is None else database.DBManager(dsn=node_database)
@@ -239,20 +241,50 @@ class BackfillerManager(object):
 		"""Stop the worker for given node."""
 		self.workers.pop(node).stop()
 
-	def delete(self):
-		"""Delete hours older than self.keep_hours ago."""
+	def delete_hours(self):
+		"""Delete hours older than the cutoff implied by self.start."""
 
-		self.logger.info('Deleting hours older than {} hours ago'.format(self.keep_hours))
+		if isinstance(self.start, datetime.datetime):
+			self.logger.info('Deleting hours older than {}'.format(self.start.strftime(HOUR_FMT)))
+		else:
+			self.logger.info('Deleting hours older than {} hours ago'.format(self.start))
 
 		for quality in self.qualities:
 			hours = list_local_hours(self.base_dir, self.channel, quality)
-
-			cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.keep_hours)
-			hours = [hour for hour in hours if hour < cutoff]
+			if not isinstance(self.start, datetime.datetime):
+				cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start)
+			else:
+				cutoff = self.start
+			hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < cutoff]
+			hours.sort()
 
 			for hour in hours:
-				self.logger.info('Deleting {}/{}'.format(quality, hour))
-				shutil.rmtree(os.path.join(self.base_dir, self.channel, quality))
+				path = os.path.join(self.base_dir, self.channel, quality, hour)
+				self.logger.info('Deleting {}'.format(path))
+				segments = list_local_segments(self.base_dir, self.channel, quality, hour)
+				for segment in segments:
+					try:
+						os.remove(os.path.join(path, segment))
+					except OSError as e:
+						# ignore error when the file is already gone
+						if e.errno != errno.ENOENT:
+							raise
+
+				try:
+					os.rmdir(path)
+				except OSError as e:
+					# ignore error when the folder is already deleted
+					if e.errno == errno.ENOENT:
+						self.logger.warn('{} already deleted'.format(path))
+
+					# warn if not empty (will try to delete folder again next time)
+					elif e.errno == errno.ENOTEMPTY:
+						self.logger.warn('Failed to delete non-empty folder {}'.format(path))
+					else:
+						raise
+				else:
+					self.logger.info('{} deleted'.format(path))
 
 		self.logger.info('Deleting old hours complete')
@@ -291,6 +323,9 @@ class BackfillerManager(object):
 			if self.run_once:
 				break
 
+			if self.delete_old and self.start:
+				self.delete_hours()
+
 			self.stopping.wait(common.jitter(self.NODE_INTERVAL))
 
 		#wait for all workers to finish
@@ -387,6 +422,14 @@ class BackfillerWorker(object):
 
 		for quality in self.qualities:
 			for hour in hours:
+				# since backfilling can take a long time, recheck whether this
+				# hour is after the start
+				if not isinstance(self.start, datetime.datetime):
+					start_hour = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start)
+				else:
+					start_hour = self.start
+				if datetime.datetime.strptime(hour, HOUR_FMT) < start_hour:
+					break
 
 				self.logger.info('Backfilling {}/{}'.format(quality, hour))
 
@@ -477,7 +520,7 @@ class BackfillerWorker(object):
 @argh.arg('--static-nodes', help='Nodes to always backfill from. Comma separated if multiple. By default empty.')
 @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
 @argh.arg('--start', help='If a datetime, only backfill hours after that datetime. If a number, backfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
-@argh.arg('--keep-hours', help='Only keep this number of hours. If 0 (default) keep all hours.')
+@argh.arg('--delete-old', help='If True, delete hours older than start. By default False.')
 @argh.arg('--run-once', help='If True, backfill only once. By default False.')
 @argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace-only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
 @argh.arg('--node-database', help='Postgres connection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
@@ -485,7 +528,7 @@ class BackfillerWorker(object):
 @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
 @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as a number of seconds.')
 def main(channels, base_dir='.', qualities='source', metrics_port=8002,
-		static_nodes='', backdoor_port=0, start=None, keep_hours=0,
+		static_nodes='', backdoor_port=0, start=None, delete_old=False,
 		run_once=False, node_file=None, node_database=None,
 		localhost=socket.gethostname(), download_concurrency=5,
 		recent_cutoff=120):
 	"""Backfiller service."""
@@ -499,13 +542,9 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
 	try:
 		start = float(start)
 		logging.info('Backfilling last {} hours'.format(start))
-		if keep_hours and start > keep_hours:
-			logging.warn('Keeping fewer hours ({}) than backfilling ({})'.format(keep_hours, start))
 	except ValueError:
 		start = dateutil.parse(start)
 		logging.info('Backfilling since {}'.format(start))
-		if keep_hours:
-			logging.warn('Only keeping {} hours when backfilling since {}'.format(keep_hours, start))
 
 	common.PromLogCountsHandler.install()
 	common.install_stacksampler()
@@ -516,7 +555,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
 	for channel in channels:
 		logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir))
 		manager = BackfillerManager(base_dir, channel, qualities, static_nodes,
-			start, keep_hours, run_once, node_file, node_database,
+			start, delete_old, run_once, node_file, node_database,
 			localhost, download_concurrency, recent_cutoff)
 		managers.append(manager)
 		workers.append(gevent.spawn(manager.run))
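
Reviewer note: the cutoff computation this patch adds appears twice, once in
BackfillerManager.delete_hours and once in the worker's per-hour recheck. A
minimal standalone sketch of that shared logic follows. It assumes HOUR_FMT
is the '%Y-%m-%dT%H' format used for hour directory names elsewhere in the
module, and hours_to_delete is a hypothetical helper for illustration, not
part of the patch:

	import datetime

	HOUR_FMT = '%Y-%m-%dT%H'  # assumed hour directory name format

	def hours_to_delete(hours, start):
		"""Given hour directory names and a start that is either a datetime
		(absolute cutoff) or a number of hours (relative cutoff), return the
		hours older than the cutoff, oldest first."""
		if isinstance(start, datetime.datetime):
			cutoff = start
		else:
			# a numeric start means "this many hours before now"
			cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=start)
		return sorted(
			hour for hour in hours
			if datetime.datetime.strptime(hour, HOUR_FMT) < cutoff
		)

For example, with start=24.0 only hour directories more than a day old are
selected, which matches the behaviour documented for --delete-old together
with --start.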