Started addressing ekim's review comments.

pull/43/head
Chris Usher 6 years ago committed by Christopher Usher
parent 2857d3fb9f
commit 332e03de80

@ -1,5 +1,4 @@
"""Download segments from other nodes to catch stuff this node missed.""" """Download segments from other nodes to catch stuff this node missed."""
# TODO more logging
import datetime import datetime
import errno import errno
@ -7,8 +6,10 @@ import logging
import os import os
import random import random
import signal import signal
import socket
import uuid import uuid
import argh
import dateutil.parser import dateutil.parser
import gevent.backdoor import gevent.backdoor
import prometheus_client as prom import prometheus_client as prom
@ -27,15 +28,6 @@ segments_backfilled = prom.Counter(
HOUR_FMT = '%Y-%m-%dT%H' HOUR_FMT = '%Y-%m-%dT%H'
TIMEOUT = 5 #default timeout in seconds for remote requests TIMEOUT = 5 #default timeout in seconds for remote requests
def encode_strings(o):
    """Recursively convert unicode strings inside o to UTF-8 byte strings.

    Lists are rebuilt with each element converted; dicts get their keys
    encoded and their values converted recursively; bare unicode strings
    are encoded directly. Anything else is returned unchanged.
    (Python 2 only: relies on the `unicode` builtin.)
    """
    if isinstance(o, unicode):
        return o.encode('utf-8')
    if isinstance(o, dict):
        return dict((key.encode('utf-8'), encode_strings(value)) for key, value in o.items())
    if isinstance(o, list):
        return [encode_strings(item) for item in o]
    return o
def list_local_segments(base_dir, stream, variant, hour): def list_local_segments(base_dir, stream, variant, hour):
"""List segments in a given hour directory. """List segments in a given hour directory.
@ -175,6 +167,9 @@ class BackfillerManager(object):
def start_worker(self, node): def start_worker(self, node):
"""Start a new worker for given node.""" """Start a new worker for given node."""
if self.stopping.is_set():
logging.debug("Refusing to create new worker because we're stopping")
return
worker = BackfillerWorker(self, node) worker = BackfillerWorker(self, node)
assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node) assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node)
self.workers[node] = worker self.workers[node] = worker
@ -201,17 +196,13 @@ class BackfillerManager(object):
self.stopping.wait(common.jitter(self.NODE_INTERVAL)) self.stopping.wait(common.jitter(self.NODE_INTERVAL))
else:
self.stop()
for worker in self.workers.values(): for worker in self.workers.values():
worker.done.wait() worker.done.wait()
def get_nodes(self): def get_nodes(self):
"""List address of other wubloaders. """List address of other wubloaders.
This returns a list of the other wubloaders as strings of the form This returns a list of the other wubloaders as URI strings"""
'protocol://host:port/'"""
nodes = self.static_nodes + [] nodes = self.static_nodes + []
if self.node_file is not None: if self.node_file is not None:
@ -242,7 +233,8 @@ class BackfillerWorker(object):
base_dir/stream for all variants. If run_once, only backfill once.""" base_dir/stream for all variants. If run_once, only backfill once."""
WAIT_INTERVAL = 120 #seconds between backfills WAIT_INTERVAL = 120 #seconds between backfills
RETRY_INTERVAL = 5 #seconds between retrying a failed backfill RETRY_INTERVAL = 5 #seconds between retrying a failed backfill
MAX_RETRIES = 4 #number of times to retry before stopping worker
def __init__(self, manager, node): def __init__(self, manager, node):
self.manager = manager self.manager = manager
@ -255,8 +247,6 @@ class BackfillerWorker(object):
self.run_once = manager.run_once self.run_once = manager.run_once
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.done = gevent.event.Event() self.done = gevent.event.Event()
self.failures = 0
def __repr__(self): def __repr__(self):
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
@ -315,17 +305,21 @@ class BackfillerWorker(object):
def run(self): def run(self):
self.logger.info('Starting') self.logger.info('Starting')
failures = 0
while not self.stopping.is_set(): while not self.stopping.is_set():
try: try:
self.backfill(list_hours(self.node, self.stream, self.variants, self.start)) self.backfill(list_hours(self.node, self.stream, self.variants, self.start))
self.failures = 0 #reset failure count on a successful backfill failures = 0 #reset failure count on a successful backfill
if not self.run_once: if not self.run_once:
self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
except Exception: except Exception:
self.failures += 1 failures += 1
if failures > MAX_RETRIES:
self.logger.exception('Maximum number of failures ({}) exceed.'.format(MAX_RETRIES)
break
delay = common.jitter(self.RETRY_INTERVAL * 2**self.failures) delay = common.jitter(self.RETRY_INTERVAL * 2**self.failures)
self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay)) self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
self.stopping.wait(delay) self.stopping.wait(delay)
@ -338,37 +332,22 @@ class BackfillerWorker(object):
if self.node in self.manager.workers: if self.node in self.manager.workers:
del self.manager.workers[self.node] del self.manager.workers[self.node]
@argh.arg("streams", nargs="*")
def main(base_dir='.', streams='', variants='', metrics_port=8002, @argh.arg('--base-dir', 'Directory to which segments will be backfilled. Default is current working directory.')
@argh.arg('--variants', "Variants of each stream to backfill. Comma seperated if multiple. Default is 'source'.")
@argh.arg('--metrics-port', 'Port for Prometheus stats. Default is 8002.')
@argh.arg('--static-nodes', 'Nodes to always backfill from. Comma seperated if multiple. By default empty.')
@argh.arg('--backdoor_port' 'Port for gevent.backdoor access. By default disabled.')
@argh.arg('--start', 'If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
@argh.arg('--run_once', 'If True, backfill only once. By default False.')
@argh.arg('--node_file', "Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
@argh.arg('--node_database', 'Address of database node to fetch a list of nodes from. If None (default) do not get nodes from database.')
@argh.arg('--localhost', 'Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()'
def main(streams, base_dir='.', variants='source', metrics_port=8002,
static_nodes='', backdoor_port=0, start=None, run_once=False, static_nodes='', backdoor_port=0, start=None, run_once=False,
node_file=None, node_database=None, localhost=None): node_file=None, node_database=None, localhost=socket.gethostname()):
"""Backfiller service. """Backfiller service."""
Keyword arguments:
base_dir -- Directory to which segments will be backfilled. Default current
working directory.
streams -- Streams to backfill. Comma seperated if multiple.
variants -- Variants of each stream to backfill. Comma seperated if
multiple.
metrics_port -- Port for Prometheus stats. Default 8002.
static_nodes -- Nodes to always backfill from. Comma seperated if multiple.
By default empty.
backdoor_port -- Port for gevent.backdoor access. By default disabled.
start -- If a datetime only backfill hours after that datetime. If a number,
bacfill hours more recent than that number of hours ago. If None
(default), all hours are backfilled.
run_once -- If True, backfill only once. By default False.
node_file -- Name of file listing nodes to backfill from. One node per line
with whitespace only lines or lines starting with '#' ignored. If None
(default) do not get nodes from a file.
node_database -- Address of database node to fetch a list of nodes from. If
None (default) do not get nodes from database.
localhost -- Address of local machine. Used to prevent backfilling from
itself. Only works if this address matches the address for this node in
static_nodes or the node_file or the node_database."""
streams = streams.split(',') if streams else []
streams = [stream.strip() for stream in streams] # get rid of any whitespace
variants = variants.split(',') if variants else [] variants = variants.split(',') if variants else []
variants = [variant.strip() for variant in variants] variants = [variant.strip() for variant in variants]
static_nodes = static_nodes.split(',') if static_nodes else [] static_nodes = static_nodes.split(',') if static_nodes else []

Loading…
Cancel
Save