improvements suggested by ekim

pull/43/head
Christopher Usher 6 years ago
parent 530b9f7d5e
commit 728adb7c1d

@ -27,7 +27,8 @@ segments_backfilled = prom.Counter(
HOUR_FMT = '%Y-%m-%dT%H' HOUR_FMT = '%Y-%m-%dT%H'
TIMEOUT = 5 #default timeout in seconds for remote requests TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions
MAX_RETRIES = 4 #number of times to retry before stopping worker or manager
def list_local_segments(base_dir, stream, variant, hour): def list_local_segments(base_dir, stream, variant, hour):
@ -122,7 +123,6 @@ def list_hours(node, stream, variants, start=None):
hours = list(set().union(*hour_lists)) hours = list(set().union(*hour_lists))
hours.sort(reverse=True) #latest hour first hours.sort(reverse=True) #latest hour first
if start is not None: if start is not None:
if not isinstance(start, datetime.datetime): if not isinstance(start, datetime.datetime):
start = datetime.datetime.utcnow() - datetime.timedelta(hours=start) start = datetime.datetime.utcnow() - datetime.timedelta(hours=start)
@ -169,7 +169,7 @@ class BackfillerManager(object):
def start_worker(self, node): def start_worker(self, node):
"""Start a new worker for given node.""" """Start a new worker for given node."""
if self.stopping.is_set(): if self.stopping.is_set():
logging.debug("Refusing to create new worker because we're stopping") logging.debug('Refusing to create new workers because we are stopping')
return return
worker = BackfillerWorker(self, node) worker = BackfillerWorker(self, node)
assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node) assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node)
@ -181,9 +181,27 @@ class BackfillerManager(object):
self.workers.pop(node).stop() self.workers.pop(node).stop()
def run(self): def run(self):
"""Stop and start workers based on results of get_nodes.
Regularly call get_nodes. Nodes returned by get_nodes not currently
running are started and currently running nodes not returned by
get_nodes are stopped. If self.run_once, only call nodes once. Calling
stop will exit the loop."""
self.logger.info('Starting') self.logger.info('Starting')
failures = 0
while not self.stopping.is_set(): while not self.stopping.is_set():
try:
new_nodes = set(self.get_nodes()) new_nodes = set(self.get_nodes())
except Exception:
failures += 1
if failures > MAX_RETRIES:
self.logger.exception('Maximum number of failures ({}) exceed.'.format(MAX_RETRIES))
break
delay = common.jitter(TIMEOUT * 2**failures)
self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
self.stopping.wait(delay)
continue
exisiting_nodes = set(self.workers.keys()) exisiting_nodes = set(self.workers.keys())
to_start = new_nodes - exisiting_nodes to_start = new_nodes - exisiting_nodes
for node in to_start: for node in to_start:
@ -191,12 +209,13 @@ class BackfillerManager(object):
to_stop = exisiting_nodes - new_nodes to_stop = exisiting_nodes - new_nodes
for node in to_stop: for node in to_stop:
self.stop_worker(node) self.stop_worker(node)
failures = 0 #reset failures on success
if self.run_once: if self.run_once:
break break
self.stopping.wait(common.jitter(self.NODE_INTERVAL)) self.stopping.wait(common.jitter(self.NODE_INTERVAL))
#wait for all workers to finish
for worker in self.workers.values(): for worker in self.workers.values():
worker.done.wait() worker.done.wait()
@ -207,27 +226,25 @@ class BackfillerManager(object):
If only has a URL, infer name from the hostname of the URL""" If only has a URL, infer name from the hostname of the URL"""
nodes = {urlparse.urlparse(node).hostname:node for nodes in self.static_nodes} nodes = {urlparse.urlparse(node).hostname:node for node in self.static_nodes}
if self.node_file is not None: if self.node_file is not None:
self.logger.info('Fetching list of nodes from {}'.format(self.node_file)) self.logger.info('Fetching list of nodes from {}'.format(self.node_file))
try:
with open(self.node_file) as f: with open(self.node_file) as f:
for line in f.readlines(): for line in f.readlines():
substrs = line.split() substrs = line.split()
if not len(line) or substrs[0][0] == '#': if not len(line) or substrs[0][0] == '#':
continue continue
elif len(substrs) == 1: elif len(substrs) == 1:
nodes[urlparse.urlparse(substr[0]).hostname] = substr[0] nodes[urlparse.urlparse(substrs[0]).hostname] = substrs[0]
else: else:
nodes[substrs[0]] = substrs[1] nodes[substrs[0]] = substrs[1]
if self.node_database is not None: if self.node_database is not None:
self.logger.info('Fetching list of nodes from {}'.format(self.node_database)) self.logger.info('Fetching list of nodes from {}'.format(self.node_database))
# query the database # TODO query the database
nodes.pop(self.localhost, None) nodes.pop(self.localhost, None)
return nodes.values() return nodes.values()
class BackfillerWorker(object): class BackfillerWorker(object):
@ -237,8 +254,6 @@ class BackfillerWorker(object):
base_dir/stream for all variants. If run_once, only backfill once.""" base_dir/stream for all variants. If run_once, only backfill once."""
WAIT_INTERVAL = 120 #seconds between backfills WAIT_INTERVAL = 120 #seconds between backfills
RETRY_INTERVAL = 5 #seconds between retrying a failed backfill
MAX_RETRIES = 4 #number of times to retry before stopping worker
def __init__(self, manager, node): def __init__(self, manager, node):
self.manager = manager self.manager = manager
@ -299,7 +314,8 @@ class BackfillerWorker(object):
self.logger.warning('File {} invaid: {}'.format(path, e)) self.logger.warning('File {} invaid: {}'.format(path, e))
continue continue
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old # to avoid getting in the downloader's way ignore segments
# less than recent_cutoff old
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff): if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
self.logger.debug('Skipping {} as too recent'.format(path)) self.logger.debug('Skipping {} as too recent'.format(path))
continue continue
@ -322,9 +338,9 @@ class BackfillerWorker(object):
except Exception: except Exception:
failures += 1 failures += 1
if failures > MAX_RETRIES: if failures > MAX_RETRIES:
self.logger.exception('Maximum number of failures ({}) exceed.'.format(MAX_RETRIES) self.logger.exception('Maximum number of failures ({}) exceed.'.format(MAX_RETRIES))
break break
delay = common.jitter(self.RETRY_INTERVAL * 2**self.failures) delay = common.jitter(TIMEOUT * 2**failures)
self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay)) self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
self.stopping.wait(delay) self.stopping.wait(delay)
@ -337,16 +353,16 @@ class BackfillerWorker(object):
del self.manager.workers[self.node] del self.manager.workers[self.node]
@argh.arg("streams", nargs="*") @argh.arg("streams", nargs="*")
@argh.arg('--base-dir', 'Directory to which segments will be backfilled. Default is current working directory.') @argh.arg('--base-dir', help='Directory to which segments will be backfilled. Default is current working directory.')
@argh.arg('--variants', "Variants of each stream to backfill. Comma seperated if multiple. Default is 'source'.") @argh.arg('--variants', help="Variants of each stream to backfill. Comma seperated if multiple. Default is 'source'.")
@argh.arg('--metrics-port', 'Port for Prometheus stats. Default is 8002.') @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8002.')
@argh.arg('--static-nodes', 'Nodes to always backfill from. Comma seperated if multiple. By default empty.') @argh.arg('--static-nodes', help='Nodes to always backfill from. Comma seperated if multiple. By default empty.')
@argh.arg('--backdoor_port' 'Port for gevent.backdoor access. By default disabled.') @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
@argh.arg('--start', 'If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.') @argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
@argh.arg('--run_once', 'If True, backfill only once. By default False.') @argh.arg('--run-once', help='If True, backfill only once. By default False.')
@argh.arg('--node_file', "Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.") @argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
@argh.arg('--node_database', 'Address of database node to fetch a list of nodes from. If None (default) do not get nodes from database.') @argh.arg('--node-database', help='Address of database node to fetch a list of nodes from. If None (default) do not get nodes from database.')
@argh.arg('--localhost', 'Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()' @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
def main(streams, base_dir='.', variants='source', metrics_port=8002, def main(streams, base_dir='.', variants='source', metrics_port=8002,
static_nodes='', backdoor_port=0, start=None, run_once=False, static_nodes='', backdoor_port=0, start=None, run_once=False,
node_file=None, node_database=None, localhost=socket.gethostname()): node_file=None, node_database=None, localhost=socket.gethostname()):

@ -80,8 +80,8 @@
image: "quay.io/ekimekim/wubloader-backfiller:%s" % $.image_tag, image: "quay.io/ekimekim/wubloader-backfiller:%s" % $.image_tag,
// Args for the backfiller: set channel and qualities // Args for the backfiller: set channel and qualities
command: [ command: [
"--stream", $.channel, $.channel,
"-v", std.join(",", $.qualities), "--variants", std.join(",", $.qualities),
"--static-nodes", std.join(",", $.peers), "--static-nodes", std.join(",", $.peers),
"--backdoor-port", std.toString($.backdoor_port), "--backdoor-port", std.toString($.backdoor_port),
], ],

Loading…
Cancel
Save