From 728adb7c1dada47fcb518498cf7118fd0d909ab2 Mon Sep 17 00:00:00 2001
From: Christopher Usher
Date: Tue, 4 Jun 2019 00:05:26 +0100
Subject: [PATCH] improvements suggested by ekim

---
 backfiller/backfiller/main.py | 84 +++++++++++++++++++++--------------
 docker-compose.jsonnet        |  4 +-
 2 files changed, 52 insertions(+), 36 deletions(-)

diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index 2bc672d..15ce770 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -27,7 +27,8 @@ segments_backfilled = prom.Counter(
 
 HOUR_FMT = '%Y-%m-%dT%H'
 
-TIMEOUT = 5 #default timeout in seconds for remote requests
+TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions
+MAX_RETRIES = 4 #number of times to retry before stopping worker or manager
 
 
 def list_local_segments(base_dir, stream, variant, hour):
@@ -122,7 +123,6 @@ def list_hours(node, stream, variants, start=None):
 	hours = list(set().union(*hour_lists))
 	hours.sort(reverse=True) #latest hour first
 
-
 	if start is not None:
 		if not isinstance(start, datetime.datetime):
 			start = datetime.datetime.utcnow() - datetime.timedelta(hours=start)
@@ -169,7 +169,7 @@ class BackfillerManager(object):
 	def start_worker(self, node):
 		"""Start a new worker for given node."""
 		if self.stopping.is_set():
-			logging.debug("Refusing to create new worker because we're stopping")
+			logging.debug('Refusing to create new workers because we are stopping')
 			return
 		worker = BackfillerWorker(self, node)
 		assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node)
@@ -181,9 +181,27 @@ class BackfillerManager(object):
 		self.workers.pop(node).stop()
 
 	def run(self):
+		"""Stop and start workers based on results of get_nodes.
+
+		Regularly call get_nodes. Nodes returned by get_nodes not currently
+		running are started and currently running nodes not returned by
+		get_nodes are stopped. If self.run_once, only call get_nodes once.
+		Calling stop will exit the loop."""
 		self.logger.info('Starting')
+		failures = 0
+
 		while not self.stopping.is_set():
-			new_nodes = set(self.get_nodes())
+			try:
+				new_nodes = set(self.get_nodes())
+			except Exception:
+				failures += 1
+				if failures > MAX_RETRIES:
+					self.logger.exception('Maximum number of failures ({}) exceeded.'.format(MAX_RETRIES))
+					break
+				delay = common.jitter(TIMEOUT * 2**failures)
+				self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
+				self.stopping.wait(delay)
+				continue
 			exisiting_nodes = set(self.workers.keys())
 			to_start = new_nodes - exisiting_nodes
 			for node in to_start:
@@ -191,12 +209,13 @@ class BackfillerManager(object):
 			to_stop = exisiting_nodes - new_nodes
 			for node in to_stop:
 				self.stop_worker(node)
-
+			failures = 0 #reset failures on success
 			if self.run_once:
 				break
 
 			self.stopping.wait(common.jitter(self.NODE_INTERVAL))
 
+		#wait for all workers to finish
 		for worker in self.workers.values():
 			worker.done.wait()
 
@@ -207,27 +226,25 @@ class BackfillerManager(object):
 
 		If only has a URL, infer name from the hostname of the URL"""
-		nodes = {urlparse.urlparse(node).hostname:node for nodes in self.static_nodes}
+		nodes = {urlparse.urlparse(node).hostname:node for node in self.static_nodes}
 		if self.node_file is not None:
 			self.logger.info('Fetching list of nodes from {}'.format(self.node_file))
-			try:
-				with open(self.node_file) as f:
-					for line in f.readlines():
-						substrs = line.split()
-						if not len(line) or substrs[0][0] == '#':
-							continue
-						elif len(substrs) == 1:
-							nodes[urlparse.urlparse(substr[0]).hostname] = substr[0]
-						else:
-							nodes[substrs[0]] = substrs[1]
+			with open(self.node_file) as f:
+				for line in f.readlines():
+					substrs = line.split()
+					if not len(line) or substrs[0][0] == '#':
+						continue
+					elif len(substrs) == 1:
+						nodes[urlparse.urlparse(substrs[0]).hostname] = substrs[0]
+					else:
+						nodes[substrs[0]] = substrs[1]
 		if self.node_database is not None:
 			self.logger.info('Fetching list of nodes from {}'.format(self.node_database))
-			# query the database
-
-		nodes.pop(self.localhost, None)
+			# TODO query the database
+		nodes.pop(self.localhost, None)
 		return nodes.values()
 
 class BackfillerWorker(object):
@@ -237,8 +254,6 @@ class BackfillerWorker(object):
 	base_dir/stream for all variants. If run_once, only backfill once."""
 
 	WAIT_INTERVAL = 120 #seconds between backfills
-	RETRY_INTERVAL = 5 #seconds between retrying a failed backfill
-	MAX_RETRIES = 4 #number of times to retry before stopping worker
 
 	def __init__(self, manager, node):
 		self.manager = manager
@@ -299,7 +314,8 @@ class BackfillerWorker(object):
 				self.logger.warning('File {} invaid: {}'.format(path, e))
 				continue
 
-			#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
+			# to avoid getting in the downloader's way ignore segments
+			# less than recent_cutoff old
 			if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
 				self.logger.debug('Skipping {} as too recent'.format(path))
 				continue
@@ -322,9 +338,9 @@ class BackfillerWorker(object):
 			except Exception:
 				failures += 1
 				if failures > MAX_RETRIES:
-					self.logger.exception('Maximum number of failures ({}) exceed.'.format(MAX_RETRIES)
+					self.logger.exception('Maximum number of failures ({}) exceeded.'.format(MAX_RETRIES))
 					break
-				delay = common.jitter(self.RETRY_INTERVAL * 2**self.failures)
+				delay = common.jitter(TIMEOUT * 2**failures)
 				self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
 				self.stopping.wait(delay)
 
@@ -337,16 +353,16 @@ class BackfillerWorker(object):
 		del self.manager.workers[self.node]
 
 @argh.arg("streams", nargs="*")
-@argh.arg('--base-dir', 'Directory to which segments will be backfilled. Default is current working directory.')
-@argh.arg('--variants', "Variants of each stream to backfill. Comma seperated if multiple. Default is 'source'.")
-@argh.arg('--metrics-port', 'Port for Prometheus stats. Default is 8002.')
-@argh.arg('--static-nodes', 'Nodes to always backfill from. Comma seperated if multiple. By default empty.')
-@argh.arg('--backdoor_port' 'Port for gevent.backdoor access. By default disabled.')
-@argh.arg('--start', 'If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
-@argh.arg('--run_once', 'If True, backfill only once. By default False.')
-@argh.arg('--node_file', "Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
-@argh.arg('--node_database', 'Address of database node to fetch a list of nodes from. If None (default) do not get nodes from database.')
-@argh.arg('--localhost', 'Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()'
+@argh.arg('--base-dir', help='Directory to which segments will be backfilled. Default is current working directory.')
+@argh.arg('--variants', help="Variants of each stream to backfill. Comma separated if multiple. Default is 'source'.")
+@argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8002.')
+@argh.arg('--static-nodes', help='Nodes to always backfill from. Comma separated if multiple. By default empty.')
+@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
+@argh.arg('--start', help='If a datetime, only backfill hours after that datetime. If a number, backfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
+@argh.arg('--run-once', help='If True, backfill only once. By default False.')
+@argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
+@argh.arg('--node-database', help='Address of database node to fetch a list of nodes from. If None (default) do not get nodes from database.')
+@argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
 def main(streams, base_dir='.', variants='source', metrics_port=8002,
 		static_nodes='', backdoor_port=0, start=None, run_once=False,
 		node_file=None, node_database=None, localhost=socket.gethostname()):

diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet
index c0cc582..d368661 100644
--- a/docker-compose.jsonnet
+++ b/docker-compose.jsonnet
@@ -80,8 +80,8 @@
 			image: "quay.io/ekimekim/wubloader-backfiller:%s" % $.image_tag,
 			// Args for the backfiller: set channel and qualities
 			command: [
-				"--stream", $.channel,
-				"-v", std.join(",", $.qualities),
+				$.channel,
+				"--variants", std.join(",", $.qualities),
 				"--static-nodes", std.join(",", $.peers),
 				"--backdoor-port", std.toString($.backdoor_port),
 			],
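
Aside for reviewers: both retry loops in this patch reduce to the same pattern, exponential backoff seeded from TIMEOUT and capped at MAX_RETRIES consecutive failures, with common.jitter() spreading the delays so peers do not retry in lockstep. Below is a minimal standalone sketch of that pattern; jitter() is a hypothetical stand-in for common.jitter(), whose implementation is not part of this patch, and call_with_backoff() is an illustrative name rather than a function in the codebase.

    import random
    import time

    TIMEOUT = 5      # base delay in seconds, matching the patch
    MAX_RETRIES = 4  # consecutive failures tolerated before giving up

    def jitter(interval):
        # hypothetical stand-in for common.jitter(): assumed to scatter
        # the delay slightly around its input to desynchronize peers
        return interval * (0.9 + 0.2 * random.random())

    def call_with_backoff(fn):
        """Call fn() until it succeeds, sleeping jitter(TIMEOUT * 2**failures)
        between attempts; give up after MAX_RETRIES consecutive failures."""
        failures = 0
        while True:
            try:
                return fn()
            except Exception:
                failures += 1
                if failures > MAX_RETRIES:
                    raise  # the manager/worker log and break out of their loops here
                delay = jitter(TIMEOUT * 2 ** failures)
                time.sleep(delay)  # the services use stopping.wait(delay) instead

The services themselves keep looping after a success and reset failures to 0 rather than returning, and they wait on stopping.wait(delay) so that stop() interrupts the backoff; the sketch uses time.sleep() only to stay self-contained.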