backfiller: Allow backfilling of non-segment directories

These are referred to as "extras": all files in all of their subdirectories are backfilled if not already present locally.
Branch: pull/315/merge
Author: Mike Lang
parent 4fb039d7d9
commit 9f523c65cd
@@ -203,6 +203,33 @@ def list_hours(node, channel, quality, start=None):
 	return hours


+def list_local_extras(base_dir, dir):
+	root = os.path.join(base_dir, dir)
+	for path, subdirs, files in os.walk(root):
+		relpath = os.path.relpath(path, root)
+		for file in files:
+			yield os.path.normpath(os.path.join(relpath, file))
+
+
+def list_remote_extras(node, dir, timeout=TIMEOUT):
+	uri = f'{node}/extras/{dir}'
+	resp = requests.get(uri, timeout=timeout, metric_name='list_remote_extras')
+	resp.raise_for_status()
+	return resp.json()
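Worth spelling out the wire-format assumption in list_remote_extras: resp.json() is expected to be a flat JSON list of file paths relative to dir, in the same normalised form that list_local_extras yields locally, since backfill_extras below compares the two listings as sets. A hedged illustration (directory and file names invented):

	import json

	# Hypothetical body of a GET <node>/extras/emotes response:
	body = '["pogchamp.png", "subdir/kappa.png"]'
	remote_files = set(json.loads(body))  # as consumed by backfill_extras
	assert "subdir/kappa.png" in remote_files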
+def get_remote_extra(base_dir, node, path, timeout=TIMEOUT):
+	local_path = os.path.join(base_dir, path)
+	if os.path.exists(local_path):
+		return
+	uri = f'{node}/segments/{path}'
+	resp = requests.get(uri, timeout=timeout, metric_name='get_remote_extra')
+	resp.raise_for_status()
+	common.atomic_write(local_path, resp.content)
+
+
 class BackfillerManager(object):
 	"""Manages BackfillerWorkers to backfill from a pool of nodes.
@@ -215,7 +242,7 @@ class BackfillerManager(object):
 	NODE_INTERVAL = 300 #seconds between updating list of nodes

-	def __init__(self, base_dir, channels, qualities, static_nodes=[],
+	def __init__(self, base_dir, channels, qualities, extras=[], static_nodes=[],
 			start=None, delete_old=False, run_once=False, node_file=None,
 			node_database=None, localhost=None, download_concurrency=5,
 			recent_cutoff=120):
@@ -225,6 +252,7 @@ class BackfillerManager(object):
 		self.base_dir = base_dir
 		self.channels = channels
 		self.qualities = qualities
+		self.extras = extras
 		self.static_nodes = static_nodes
 		self.start = start
 		self.delete_old = delete_old
@@ -428,6 +456,7 @@ class BackfillerWorker(object):
 		self.download_concurrency = manager.download_concurrency
 		self.channels = manager.channels
 		self.qualities = manager.qualities
+		self.extras = manager.extras
 		self.start = manager.start
 		self.run_once = manager.run_once
 		self.recent_cutoff = manager.recent_cutoff
@@ -443,7 +472,7 @@ class BackfillerWorker(object):
 		self.logger.info('Stopping')
 		self.stopping.set()

-	def backfill(self):
+	def backfill_segments(self):
 		"""Backfill from remote node.

 		Backfill from node/channel/qualities to base_dir/channel/qualities for
@@ -529,6 +558,36 @@ class BackfillerWorker(object):
 					hours_backfilled.labels(remote=self.node, channel=channel, quality=quality).inc()

+	def backfill_extras(self):
+		for dir in self.extras:
+			self.logger.info(f'Backfilling extra directory {dir}')
+			local_files = set(list_local_extras(self.base_dir, dir))
+			remote_files = set(list_remote_extras(self.node, dir))
+			missing_files = list(remote_files - local_files)
+			# randomise the order of the files to reduce the chance that
+			# multiple workers request the same file at the same time
+			random.shuffle(missing_files)
+			pool = gevent.pool.Pool(self.download_concurrency)
+			workers = []
+			for file in missing_files:
+				if self.stopping.is_set():
+					return
+				path = os.path.join(dir, file)
+				self.logger.info(f"Backfilling {path}")
+				workers.append(pool.spawn(
+					get_remote_extra, self.base_dir, self.node, path,
+				))
+			for worker in workers:
+				worker.get()
+			self.logger.info("Backfilled {} extras".format(len(missing_files)))
+
 	def run(self):
 		self.logger.info('Starting')
 		failures = 0
@@ -536,7 +595,8 @@ class BackfillerWorker(object):
 		while not self.stopping.is_set():
 			try:
 				self.logger.info('Starting backfill')
-				self.backfill()
+				self.backfill_segments()
+				self.backfill_extras()
 				self.logger.info('Backfill complete')
 				failures = 0 #reset failure count on a successful backfill
 				if not self.run_once:
@@ -558,11 +618,17 @@ class BackfillerWorker(object):
 		if self.node in self.manager.workers:
 			del self.manager.workers[self.node]

+
+def comma_seperated(value):
+	return [part.strip() for part in value.split(",")] if value else []
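With type=comma_seperated (function name as in the commit), argparse applies the function to the flag's string value, so main receives ready-made lists; the defaults in main below change from comma strings to lists to match. For illustration:

	def comma_seperated(value):
		return [part.strip() for part in value.split(",")] if value else []

	assert comma_seperated("source, 480p") == ["source", "480p"]
	assert comma_seperated("") == []  # an empty flag value becomes an empty list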
 @argh.arg('channels', nargs='*', help='Channels to backfill from')
 @argh.arg('--base-dir', help='Directory to which segments will be backfilled. Default is current working directory.')
-@argh.arg('--qualities', help="Qualities of each channel to backfill. Comma separated if multiple. Default is 'source'.")
+@argh.arg('--qualities', help="Qualities of each channel to backfill. Comma separated if multiple. Default is 'source'.", type=comma_seperated)
+@argh.arg('--extras', help="Extra non-segment directories to backfill. Comma separated if multiple.", type=comma_seperated)
 @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8002.')
-@argh.arg('--static-nodes', help='Nodes to always backfill from. Comma separated if multiple. By default empty.')
+@argh.arg('--static-nodes', help='Nodes to always backfill from. Comma separated if multiple. By default empty.', type=comma_seperated)
 @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
 @argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, backfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
 @argh.arg('--delete-old', help='If True, delete hours older than start. By default False.')
@@ -572,17 +638,12 @@ class BackfillerWorker(object):
 @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
 @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
 @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.')
-def main(channels, base_dir='.', qualities='source', metrics_port=8002,
-		static_nodes='', backdoor_port=0, start=None, delete_old=False,
+def main(channels, base_dir='.', qualities=['source'], extras=[], metrics_port=8002,
+		static_nodes=[], backdoor_port=0, start=None, delete_old=False,
 		run_once=False, node_file=None, node_database=None,
 		localhost=socket.gethostname(), download_concurrency=5, recent_cutoff=120):
 	"""Backfiller service."""
-	qualities = qualities.split(',') if qualities else []
-	qualities = [quality.strip() for quality in qualities]
-	static_nodes = static_nodes.split(',') if static_nodes else []
-	static_nodes = [static_node.strip() for static_node in static_nodes]
 	if start is not None:
 		try:
 			start = float(start)
@@ -596,7 +657,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
 	prom.start_http_server(metrics_port)
 	logging.info('Starting backfilling {} with {} as qualities to {}'.format(', '.join(channels), ', '.join(qualities), base_dir))
-	manager = BackfillerManager(base_dir, channels, qualities, static_nodes,
+	manager = BackfillerManager(base_dir, channels, qualities, extras, static_nodes,
 		start, delete_old, run_once, node_file, node_database,
 		localhost, download_concurrency, recent_cutoff)
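Since splitting now happens during flag parsing, anything invoking main directly would pass lists rather than comma-joined strings. A hedged sketch with invented values:

	# Hypothetical direct invocation, roughly equivalent to:
	#   backfiller mychannel --extras emotes,thumbnails --run-once
	main(["mychannel"], base_dir="/storage", extras=["emotes", "thumbnails"], run_once=True)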
