From 794b5fec23b910832c6033a624c09181a3b8dfb0 Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Tue, 22 Oct 2019 03:42:23 +0100 Subject: [PATCH] Added the ability to delete old hours --- backfiller/backfiller/main.py | 57 ++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index ec3d023..215ff1f 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -6,6 +6,7 @@ import hashlib import logging import os import random +import shutil import signal import socket import urlparse @@ -55,6 +56,24 @@ HOUR_FMT = '%Y-%m-%dT%H' TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions MAX_BACKOFF = 4 #number of times to back off +def list_local_hours(base_dir, channel, quality): + """List hours in a given quality directory. + + For a given base_dir/channel/quality directory return a list of + non-hidden files. If the directory path is not found, return an empty list. + + Based on based on restreamer.list_hours. We could just call + restreamer.list_hours but this avoids HTTP/JSON overheads.""" + + path = os.path.join(base_dir, channel, quality) + try: + return [name for name in os.listdir(path) if not name.startswith('.')] + + except OSError as e: + if e.errno != errno.ENOENT: + raise + return [] + def list_local_segments(base_dir, channel, quality, hour): """List segments in a given hour directory. @@ -176,8 +195,9 @@ class BackfillerManager(object): NODE_INTERVAL = 300 #seconds between updating list of nodes def __init__(self, base_dir, channel, qualities, static_nodes=[], - start=None, run_once=False, node_file=None, node_database=None, - localhost=None, download_concurrency=5, recent_cutoff=120): + start=None, delete_before=0, run_once=False, node_file=None, + node_database=None, localhost=None, download_concurrency=5, + recent_cutoff=120): """Constructor for BackfillerManager. Creates a manager for a given channel with specified qualities.""" @@ -186,6 +206,7 @@ class BackfillerManager(object): self.qualities = qualities self.static_nodes = static_nodes self.start = start + self.delete_before = delete_before self.run_once = run_once self.node_file = node_file self.db_manager = None if node_database is None else database.DBManager(dsn=node_database) @@ -218,6 +239,23 @@ class BackfillerManager(object): """Stop the worker for given node.""" self.workers.pop(node).stop() + def delete(self): + """Delete old hours.""" + + self.logger.info('Deleting hours older than {} hours ago'.format(self.delete_before)) + + for quality in self.qualities: + hours = list_local_hours(self.base_dir, self.channel, quality) + + cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.delete_before) + hours = [hour for hour in hours if hour < cutoff] + + for hour in hours: + self.logger.info('Deleting {}/{}'.format(quality, hour)) + shutil.rmtree(os.path.join(self.base_dir, self.channel, quality)) + + self.logger.info('Deleting old hours complete') + def run(self): """Stop and start workers based on results of get_nodes. @@ -349,7 +387,7 @@ class BackfillerWorker(object): for quality in self.qualities: for hour in hours: - + self.logger.info('Backfilling {}/{}'.format(quality, hour)) local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour)) @@ -402,12 +440,12 @@ class BackfillerWorker(object): self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour)) hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc() + def run(self): self.logger.info('Starting') failures = 0 while not self.stopping.is_set(): - try: self.logger.info('Starting backfill') self.backfill(list_hours(self.node, self.channel, self.qualities, self.start)) @@ -439,6 +477,7 @@ class BackfillerWorker(object): @argh.arg('--static-nodes', help='Nodes to always backfill from. Comma seperated if multiple. By default empty.') @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.') @argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.') +@argh.arg('--delete-before', help='Delete hours older than this number of hours ago. If 0 (default) do not delete any hours.') @argh.arg('--run-once', help='If True, backfill only once. By default False.') @argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.") @argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.') @@ -446,9 +485,9 @@ class BackfillerWorker(object): @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.') @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.') def main(channels, base_dir='.', qualities='source', metrics_port=8002, - static_nodes='', backdoor_port=0, start=None, run_once=False, - node_file=None, node_database=None, localhost=socket.gethostname(), - download_concurrency=5, recent_cutoff=120): + static_nodes='', backdoor_port=0, start=None, delete_before=0, + run_once=False, node_file=None, node_database=None, + localhost=socket.gethostname(), download_concurrency=5, recent_cutoff=120): """Backfiller service.""" qualities = qualities.split(',') if qualities else [] @@ -472,7 +511,9 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002, workers = [] for channel in channels: logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir)) - manager = BackfillerManager(base_dir, channel, qualities, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff) + manager = BackfillerManager(base_dir, channel, qualities, static_nodes, + start, delete_before, run_once, node_file, node_database, + localhost, download_concurrency, recent_cutoff) managers.append(manager) workers.append(gevent.spawn(manager.run))