renamed delete_before to keep_hours

pull/129/head
Christopher Usher 5 years ago
parent be562b8448
commit a361249145

@@ -195,7 +195,7 @@ class BackfillerManager(object):
 	NODE_INTERVAL = 300 #seconds between updating list of nodes
 	def __init__(self, base_dir, channel, qualities, static_nodes=[],
-			start=None, delete_before=0, run_once=False, node_file=None,
+			start=None, keep_hours=0, run_once=False, node_file=None,
			node_database=None, localhost=None, download_concurrency=5,
 			recent_cutoff=120):
 		"""Constructor for BackfillerManager.
@@ -206,7 +206,7 @@ class BackfillerManager(object):
 		self.qualities = qualities
 		self.static_nodes = static_nodes
 		self.start = start
-		self.delete_before = delete_before
+		self.keep_hours = keep_hours
 		self.run_once = run_once
 		self.node_file = node_file
 		self.db_manager = None if node_database is None else database.DBManager(dsn=node_database)
@@ -240,14 +240,14 @@ class BackfillerManager(object):
 			self.workers.pop(node).stop()
 
 	def delete(self):
-		"""Delete old hours."""
-		self.logger.info('Deleting hours older than {} hours ago'.format(self.delete_before))
+		"""Delete hours older than self.keep_hours ago."""
+		self.logger.info('Deleting hours older than {} hours ago'.format(self.keep_hours))
 		for quality in self.qualities:
 			hours = list_local_hours(self.base_dir, self.channel, quality)
-			cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.delete_before)
+			cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.keep_hours)
 			hours = [hour for hour in hours if hour < cutoff]
 			for hour in hours:
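For reference, a minimal standalone sketch of the pruning rule this hunk implements: an hour is deleted when it falls before a cutoff of keep_hours ago. The sample ages below are invented for illustration; in the real code the list comes from list_local_hours.

import datetime

# Invented sample data: hours as datetimes, 1/24/72/100 hours old.
keep_hours = 48
now = datetime.datetime.utcnow()
hours = [now - datetime.timedelta(hours=h) for h in (1, 24, 72, 100)]

# Same rule as the diff: anything strictly older than keep_hours ago is dropped.
cutoff = now - datetime.timedelta(hours=keep_hours)
to_delete = [hour for hour in hours if hour < cutoff]
print(to_delete)  # prints the 72- and 100-hour-old entries

Note that with keep_hours=0 this cutoff would be "now" and match every past hour; per the --keep-hours help text below, 0 instead means "keep all hours", so delete() is presumably only invoked when keep_hours is non-zero.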
@@ -477,7 +477,7 @@ class BackfillerWorker(object):
 @argh.arg('--static-nodes', help='Nodes to always backfill from. Comma seperated if multiple. By default empty.')
 @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
 @argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
-@argh.arg('--delete-before', help='Delete hours older than this number of hours ago. If 0 (default) do not delete any hours.')
+@argh.arg('--keep-hours', help='Only keep this number of hours. If 0 (default) keep all hours.')
 @argh.arg('--run-once', help='If True, backfill only once. By default False.')
 @argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
 @argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
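For anyone unfamiliar with argh: the renamed flag maps onto the renamed keyword argument by name, with dashes converted to underscores, which is why the decorator and the main() signature must change together. A toy sketch (not from this repo) of that wiring:

import argh

@argh.arg('--keep-hours', help='Only keep this number of hours. If 0 (default) keep all hours.')
def demo(keep_hours=0):
	# argh matches --keep-hours to this keyword argument automatically.
	print('keeping {} hours'.format(keep_hours))

if __name__ == '__main__':
	argh.dispatch_command(demo)  # e.g. python demo.py --keep-hours 48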
@@ -485,7 +485,7 @@ class BackfillerWorker(object):
 @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
 @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.')
 def main(channels, base_dir='.', qualities='source', metrics_port=8002,
-		static_nodes='', backdoor_port=0, start=None, delete_before=0,
+		static_nodes='', backdoor_port=0, start=None, keep_hours=0,
 		run_once=False, node_file=None, node_database=None,
 		localhost=socket.gethostname(), download_concurrency=5, recent_cutoff=120):
 	"""Backfiller service."""
@@ -499,13 +499,13 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
 		try:
 			start = float(start)
 			logging.info('Backfilling last {} hours'.format(start))
-			if delete_before and start > delete_before:
-				logging.warn('Keeping fewer hours ({}) than backfilling ({})'.format(delete_before, start))
+			if keep_hours and start > keep_hours:
+				logging.warn('Keeping fewer hours ({}) than backfilling ({})'.format(keep_hours, start))
 		except ValueError:
 			start = dateutil.parse(start)
 			logging.info('Backfilling since {}'.format(start))
-			if delete_before:
-				logging.warn('Only keeping {} hours when backfilling since {}'.format(delete_before, start))
+			if keep_hours:
+				logging.warn('Only keeping {} hours when backfilling since {}'.format(keep_hours, start))
 
 	common.PromLogCountsHandler.install()
 	common.install_stacksampler()
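A hedged sketch of the two-way --start parsing above: a value that parses as a float means "this many hours ago", anything else falls through to a datetime parse. This assumes the dateutil name in the diff resolves to python-dateutil's parser; the real import may differ.

from dateutil import parser as dateutil  # assumption: python-dateutil's parser

def parse_start(start):
	try:
		return float(start)  # e.g. '12' -> backfill the last 12 hours
	except ValueError:
		return dateutil.parse(start)  # e.g. '2019-01-01T00:00' -> backfill since then

print(parse_start('12'))
print(parse_start('2019-01-01T00:00'))

The keep_hours warnings sit inside both branches because either form of start can ask for more history than keep_hours will retain.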
@@ -516,7 +516,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
 	for channel in channels:
 		logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir))
 		manager = BackfillerManager(base_dir, channel, qualities, static_nodes,
-			start, delete_before, run_once, node_file, node_database,
+			start, keep_hours, run_once, node_file, node_database,
 			localhost, download_concurrency, recent_cutoff)
 		managers.append(manager)
 		workers.append(gevent.spawn(manager.run))
