diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index a2e229a..398bed8 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -1,5 +1,4 @@ """Download segments from other nodes to catch stuff this node missed.""" -# TODO more logging import datetime import errno @@ -7,8 +6,10 @@ import logging import os import random import signal +import socket import uuid +import argh import dateutil.parser import gevent.backdoor import prometheus_client as prom @@ -27,15 +28,6 @@ segments_backfilled = prom.Counter( HOUR_FMT = '%Y-%m-%dT%H' TIMEOUT = 5 #default timeout in seconds for remote requests -def encode_strings(o): - if isinstance(o, list): - return [encode_strings(x) for x in o] - if isinstance(o, dict): - return {k.encode('utf-8'): encode_strings(v) for k, v in o.items()} - if isinstance(o, unicode): - return o.encode('utf-8') - return o - def list_local_segments(base_dir, stream, variant, hour): """List segments in a given hour directory. @@ -175,6 +167,9 @@ class BackfillerManager(object): def start_worker(self, node): """Start a new worker for given node.""" + if self.stopping.is_set(): + logging.debug("Refusing to create new worker because we're stopping") + return worker = BackfillerWorker(self, node) assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node) self.workers[node] = worker @@ -201,17 +196,13 @@ class BackfillerManager(object): self.stopping.wait(common.jitter(self.NODE_INTERVAL)) - else: - self.stop() - for worker in self.workers.values(): worker.done.wait() def get_nodes(self): """List address of other wubloaders. - This returns a list of the other wubloaders as strings of the form - 'protocol://host:port/'""" + This returns a list of the other wubloaders as URI strings""" nodes = self.static_nodes + [] if self.node_file is not None: @@ -242,7 +233,8 @@ class BackfillerWorker(object): base_dir/stream for all variants. 
If run_once, only backfill once.""" WAIT_INTERVAL = 120 #seconds between backfills - RETRY_INTERVAL = 5 #seconds between retrying a failed backfill + RETRY_INTERVAL = 5 #seconds between retrying a failed backfill + MAX_RETRIES = 4 #number of times to retry before stopping worker def __init__(self, manager, node): self.manager = manager @@ -255,8 +247,6 @@ class BackfillerWorker(object): self.run_once = manager.run_once self.stopping = gevent.event.Event() self.done = gevent.event.Event() - self.failures = 0 - def __repr__(self): return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) @@ -315,17 +305,21 @@ class BackfillerWorker(object): def run(self): self.logger.info('Starting') + failures = 0 while not self.stopping.is_set(): try: self.backfill(list_hours(self.node, self.stream, self.variants, self.start)) - self.failures = 0 #reset failure count on a successful backfill + failures = 0 #reset failure count on a successful backfill if not self.run_once: self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) except Exception: - self.failures += 1 + failures += 1 + if failures > self.MAX_RETRIES: + self.logger.exception('Maximum number of failures ({}) exceeded.'.format(self.MAX_RETRIES)) + break - delay = common.jitter(self.RETRY_INTERVAL * 2**self.failures) + delay = common.jitter(self.RETRY_INTERVAL * 2**failures) self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay)) self.stopping.wait(delay) @@ -338,37 +332,22 @@ class BackfillerWorker(object): if self.node in self.manager.workers: del self.manager.workers[self.node] - -def main(base_dir='.', streams='', variants='', metrics_port=8002, +@argh.arg("streams", nargs="*") +@argh.arg('--base-dir', help='Directory to which segments will be backfilled. Default is current working directory.') +@argh.arg('--variants', help="Variants of each stream to backfill. Comma separated if multiple. Default is 'source'.") +@argh.arg('--metrics-port', help='Port for Prometheus stats. 
Default is 8002.') +@argh.arg('--static-nodes', help='Nodes to always backfill from. Comma separated if multiple. By default empty.') +@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.') +@argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, backfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.') +@argh.arg('--run-once', help='If True, backfill only once. By default False.') +@argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.") +@argh.arg('--node-database', help='Address of database node to fetch a list of nodes from. If None (default) do not get nodes from database.') +@argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()') +def main(streams, base_dir='.', variants='source', metrics_port=8002, static_nodes='', backdoor_port=0, start=None, run_once=False, - node_file=None, node_database=None, localhost=None): - """Backfiller service. + node_file=None, node_database=None, localhost=socket.gethostname()): + """Backfiller service.""" - Keyword arguments: - base_dir -- Directory to which segments will be backfilled. Default current - working directory. - streams -- Streams to backfill. Comma seperated if multiple. - variants -- Variants of each stream to backfill. Comma seperated if - multiple. - metrics_port -- Port for Prometheus stats. Default 8002. - static_nodes -- Nodes to always backfill from. Comma seperated if multiple. - By default empty. - backdoor_port -- Port for gevent.backdoor access. By default disabled. - start -- If a datetime only backfill hours after that datetime. If a number, - bacfill hours more recent than that number of hours ago. If None - (default), all hours are backfilled. 
- run_once -- If True, backfill only once. By default False. - node_file -- Name of file listing nodes to backfill from. One node per line - with whitespace only lines or lines starting with '#' ignored. If None - (default) do not get nodes from a file. - node_database -- Address of database node to fetch a list of nodes from. If - None (default) do not get nodes from database. - localhost -- Address of local machine. Used to prevent backfilling from - itself. Only works if this address matches the address for this node in - static_nodes or the node_file or the node_database.""" - - streams = streams.split(',') if streams else [] - streams = [stream.strip() for stream in streams] # get rid of any whitespace variants = variants.split(',') if variants else [] variants = [variant.strip() for variant in variants] static_nodes = static_nodes.split(',') if static_nodes else []