backfiller: Expose recent_cutoff as CLI arg and increase it to 120s default

In testing, GDQ's stream delay rose above one minute, which caused backfillers to backfill
segments at the same time as the downloader was fetching them. We increase the default
cutoff from 60s to 120s for now, and also make it configurable via a CLI argument.
pull/66/head
Mike Lang 5 years ago committed by Christopher Usher
parent 6fa9d9d388
commit f50276bd01

@ -144,7 +144,7 @@ class BackfillerManager(object):
def __init__(self, base_dir, stream, variants, static_nodes=[], start=None, def __init__(self, base_dir, stream, variants, static_nodes=[], start=None,
run_once=False, node_file=None, node_database=None, localhost=None, run_once=False, node_file=None, node_database=None, localhost=None,
download_concurrency=5): download_concurrency=5, recent_cutoff=120):
"""Constructor for BackfillerManager. """Constructor for BackfillerManager.
Creates a manager for a given stream with specified variants. If Creates a manager for a given stream with specified variants. If
static_nodes is None, manager""" static_nodes is None, manager"""
@ -160,6 +160,7 @@ class BackfillerManager(object):
self.db_manager = database.DBManager(dsn=self.node_database) self.db_manager = database.DBManager(dsn=self.node_database)
self.localhost = localhost self.localhost = localhost
self.download_concurrency = download_concurrency self.download_concurrency = download_concurrency
self.recent_cutoff = recent_cutoff
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.logger = logging.getLogger("BackfillerManager({})".format(stream))
self.workers = {} # {node url: worker} self.workers = {} # {node url: worker}
@ -276,7 +277,11 @@ class BackfillerWorker(object):
"""Backfills segments from a node. """Backfills segments from a node.
Backfills every WAIT_INTERVAL all segments from node/stream to Backfills every WAIT_INTERVAL all segments from node/stream to
base_dir/stream for all variants. If run_once, only backfill once.""" base_dir/stream for all variants. If run_once, only backfill once.
recent_cutoff -- Skip backfilling segments younger than this number of
seconds to prioritise letting the downloader grab these segments.
"""
WAIT_INTERVAL = 120 #seconds between backfills WAIT_INTERVAL = 120 #seconds between backfills
@ -290,6 +295,7 @@ class BackfillerWorker(object):
self.variants = manager.variants self.variants = manager.variants
self.start = manager.start self.start = manager.start
self.run_once = manager.run_once self.run_once = manager.run_once
self.recent_cutoff = manager.recent_cutoff
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.done = gevent.event.Event() self.done = gevent.event.Event()
@ -302,16 +308,12 @@ class BackfillerWorker(object):
self.logger.info('Stopping') self.logger.info('Stopping')
self.stopping.set() self.stopping.set()
def backfill(self, hours, segment_order='random', recent_cutoff=60): def backfill(self, hours):
"""Backfill from remote node. """Backfill from remote node.
Backfill from node/stream/variants to base_dir/stream/variants for each Backfill from node/stream/variants to base_dir/stream/variants for each
hour in hours. hour in hours.
"""
Keyword arguments:
recent_cutoff -- Skip backfilling segments younger than this number of
seconds to prioritise letting the downloader grab these segments."""
for variant in self.variants: for variant in self.variants:
for hour in hours: for hour in hours:
@ -350,7 +352,7 @@ class BackfillerWorker(object):
# to avoid getting in the downloader's way ignore segments # to avoid getting in the downloader's way ignore segments
# less than recent_cutoff old # less than recent_cutoff old
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff): if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff):
self.logger.debug('Skipping {} as too recent'.format(path)) self.logger.debug('Skipping {} as too recent'.format(path))
continue continue
@ -406,10 +408,11 @@ class BackfillerWorker(object):
@argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.') @argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
@argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()') @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
@argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.') @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
@argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.')
def main(streams, base_dir='.', variants='source', metrics_port=8002, def main(streams, base_dir='.', variants='source', metrics_port=8002,
static_nodes='', backdoor_port=0, start=None, run_once=False, static_nodes='', backdoor_port=0, start=None, run_once=False,
node_file=None, node_database=None, localhost=socket.gethostname(), node_file=None, node_database=None, localhost=socket.gethostname(),
download_concurrency=5): download_concurrency=5, recent_cutoff=120):
"""Backfiller service.""" """Backfiller service."""
variants = variants.split(',') if variants else [] variants = variants.split(',') if variants else []
@ -433,7 +436,7 @@ def main(streams, base_dir='.', variants='source', metrics_port=8002,
workers = [] workers = []
for stream in streams: for stream in streams:
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency) manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff)
managers.append(manager) managers.append(manager)
workers.append(gevent.spawn(manager.run)) workers.append(gevent.spawn(manager.run))

Loading…
Cancel
Save