From f50276bd01c25cb94c1376b92a7fa369d48a1c29 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 27 Jun 2019 16:58:34 -0700 Subject: [PATCH] backfiller: Expose recent_cutoff as CLI arg and increase it to 120s default In testing, GDQ's stream delay went up over 1min, which caused backfillers to backfill segments at the same time they were downloaded. We increase the window for now, and also make it configurable. --- backfiller/backfiller/main.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 4a41909..1b03b64 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -144,7 +144,7 @@ class BackfillerManager(object): def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, run_once=False, node_file=None, node_database=None, localhost=None, - download_concurrency=5): + download_concurrency=5, recent_cutoff=120): """Constructor for BackfillerManager. Creates a manager for a given stream with specified variants. If static_nodes is None, manager""" @@ -160,6 +160,7 @@ class BackfillerManager(object): self.db_manager = database.DBManager(dsn=self.node_database) self.localhost = localhost self.download_concurrency = download_concurrency + self.recent_cutoff = recent_cutoff self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} @@ -276,7 +277,11 @@ class BackfillerWorker(object): """Backfills segments from a node. Backfills every WAIT_INTERVAL all segments from node/stream to - base_dir/stream for all variants. If run_once, only backfill once.""" + base_dir/stream for all variants. If run_once, only backfill once. + + recent_cutoff -- Skip backfilling segments younger than this number of + seconds to prioritise letting the downloader grab these segments. + """ WAIT_INTERVAL = 120 #seconds between backfills @@ -290,6 +295,7 @@ class BackfillerWorker(object): self.variants = manager.variants self.start = manager.start self.run_once = manager.run_once + self.recent_cutoff = manager.recent_cutoff self.stopping = gevent.event.Event() self.done = gevent.event.Event() @@ -302,16 +308,12 @@ class BackfillerWorker(object): self.logger.info('Stopping') self.stopping.set() - def backfill(self, hours, segment_order='random', recent_cutoff=60): + def backfill(self, hours): """Backfill from remote node. Backfill from node/stream/variants to base_dir/stream/variants for each hour in hours. - - Keyword arguments: - recent_cutoff -- Skip backfilling segments younger than this number of - seconds to prioritise letting the downloader grab these segments.""" - + """ for variant in self.variants: for hour in hours: @@ -350,7 +352,7 @@ class BackfillerWorker(object): # to avoid getting in the downloader's way ignore segments # less than recent_cutoff old - if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff): + if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff): self.logger.debug('Skipping {} as too recent'.format(path)) continue @@ -406,10 +408,11 @@ class BackfillerWorker(object): @argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.') @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()') @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.') +@argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.') def main(streams, base_dir='.', variants='source', metrics_port=8002, static_nodes='', backdoor_port=0, start=None, run_once=False, node_file=None, node_database=None, localhost=socket.gethostname(), - download_concurrency=5): + download_concurrency=5, recent_cutoff=120): """Backfiller service.""" variants = variants.split(',') if variants else [] @@ -433,7 +436,7 @@ def main(streams, base_dir='.', variants='source', metrics_port=8002, workers = [] for stream in streams: logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) - manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff) managers.append(manager) workers.append(gevent.spawn(manager.run))