diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 4a41909..1b03b64 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -144,7 +144,7 @@ class BackfillerManager(object): def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_file=None, node_database=None, localhost=None, - download_concurrency=5): + download_concurrency=5, recent_cutoff=120): """Constructor for BackfillerManager. Creates a manager for a given stream with specified variants. If static_nodes is None, manager""" @@ -160,6 +160,7 @@ class BackfillerManager(object): self.db_manager = database.DBManager(dsn=self.node_database) self.localhost = localhost self.download_concurrency = download_concurrency + self.recent_cutoff = recent_cutoff self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} @@ -276,7 +277,11 @@ class BackfillerWorker(object): """Backfills segments from a node. Backfills every WAIT_INTERVAL all segments from node/stream to - base_dir/stream for all variants. If run_once, only backfill once.""" + base_dir/stream for all variants. If run_once, only backfill once. + + recent_cutoff -- Skip backfilling segments younger than this number of + seconds to prioritise letting the downloader grab these segments. + """ WAIT_INTERVAL = 120 #seconds between backfills @@ -290,6 +295,7 @@ class BackfillerWorker(object): self.variants = manager.variants self.start = manager.start self.run_once = manager.run_once + self.recent_cutoff = manager.recent_cutoff self.stopping = gevent.event.Event() self.done = gevent.event.Event() @@ -302,16 +308,12 @@ class BackfillerWorker(object): self.logger.info('Stopping') self.stopping.set() - def backfill(self, hours, segment_order='random', recent_cutoff=60): + def backfill(self, hours): """Backfill from remote node. Backfill from node/stream/variants to base_dir/stream/variants for each hour in hours. - - Keyword arguments: - recent_cutoff -- Skip backfilling segments younger than this number of - seconds to prioritise letting the downloader grab these segments.""" - + """ for variant in self.variants: for hour in hours: @@ -350,7 +352,7 @@ class BackfillerWorker(object): # to avoid getting in the downloader's way ignore segments # less than recent_cutoff old - if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff): + if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff): self.logger.debug('Skipping {} as too recent'.format(path)) continue @@ -406,10 +408,11 @@ class BackfillerWorker(object): @argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.') @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()') @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.') +@argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.') def main(streams, base_dir='.', variants='source', metrics_port=8002, static_nodes='', backdoor_port=0, start=None, run_once=False, node_file=None, node_database=None, localhost=socket.gethostname(), - download_concurrency=5): + download_concurrency=5, recent_cutoff=120): """Backfiller service.""" variants = variants.split(',') if variants else [] @@ -433,7 +436,7 @@ def main(streams, base_dir='.', variants='source', metrics_port=8002, workers = [] for stream in streams: logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) - manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff) managers.append(manager) workers.append(gevent.spawn(manager.run))