fixed start not propagating to list_hours plus some refactorting

pull/43/head
Christopher Usher 6 years ago
parent 57bb74632f
commit a8cb1ff370

@ -165,7 +165,7 @@ class BackfillerManager(object):
NODE_INTERVAL = 300 #seconds between updating list of nodes NODE_INTERVAL = 300 #seconds between updating list of nodes
def __init__(self, base_dir, stream, variants, static_nodes=[], run_once=False): def __init__(self, base_dir, stream, variants, static_nodes=[], start, run_once=False):
"""Constructor for BackfillerManager. """Constructor for BackfillerManager.
Creates a manager for a given stream with specified variants. If Creates a manager for a given stream with specified variants. If
static_nodes is None, manager""" static_nodes is None, manager"""
@ -173,11 +173,11 @@ class BackfillerManager(object):
self.stream = stream self.stream = stream
self.variants = variants self.variants = variants
self.static_nodes = static_nodes self.static_nodes = static_nodes
self.start = start
self.run_once = run_once self.run_once = run_once
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.logger = logging.getLogger("BackfillerManager({})".format(stream))
self.workers = {} # {node url: worker} self.workers = {} # {node url: worker}
self.logger.info('Starting')
def stop(self): def stop(self):
"""Shut down all workers and stop backfilling.""" """Shut down all workers and stop backfilling."""
@ -188,7 +188,7 @@ class BackfillerManager(object):
def start_worker(self, node): def start_worker(self, node):
"""Start a new worker for given node.""" """Start a new worker for given node."""
worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants, self) worker = BackfillerWorker(self, self.base_dir, node)
assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node) assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node)
self.workers[node] = worker self.workers[node] = worker
gevent.spawn(worker.run) gevent.spawn(worker.run)
@ -198,6 +198,7 @@ class BackfillerManager(object):
self.workers.pop(node).stop() self.workers.pop(node).stop()
def run(self): def run(self):
self.logger.info('Starting')
while not self.stopping.is_set(): while not self.stopping.is_set():
new_nodes = set(get_nodes() + self.static_nodes) new_nodes = set(get_nodes() + self.static_nodes)
exisiting_nodes = set(self.workers.keys()) exisiting_nodes = set(self.workers.keys())
@ -229,19 +230,19 @@ class BackfillerWorker(object):
WAIT_INTERVAL = 120 #seconds between backfills WAIT_INTERVAL = 120 #seconds between backfills
RETRY_INTERVAL = 5 #seconds between retrying a failed backfill RETRY_INTERVAL = 5 #seconds between retrying a failed backfill
def __init__(self, manager, base_dir, node, stream, variants, run_once=False): def __init__(self, manager, node):
self.manager = manager self.manager = manager
self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node)) self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node))
self.base_dir = base_dir self.base_dir = manager.base_dir
self.node = node self.node = node
self.stream = stream self.stream = manager.stream
self.variants = variants self.variants = manager.variants
self.run_once = run_once self.start = manager.start
self.run_once = manager.run_once
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.done = gevent.event.Event() self.done = gevent.event.Event()
self.failures = 0 self.failures = 0
self.logger.info('Starting')
def __repr__(self): def __repr__(self):
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
@ -299,12 +300,12 @@ class BackfillerWorker(object):
self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour)) self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour))
def run(self): def run(self):
self.logger.info('Worker starting') self.logger.info('Starting')
while not self.stopping.is_set(): while not self.stopping.is_set():
try: try:
self.backfill(list_hours(self.node, self.stream, self.variants)) self.backfill(list_hours(self.node, self.stream, self.variants, self.start))
self.failures = 0 #reset failure count on a successful backfill self.failures = 0 #reset failure count on a successful backfill
self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
@ -331,7 +332,7 @@ def main(base_dir='.', stream='', variants='', metrics_port=8002,
if start is not None: if start is not None:
start = dateutil.parser.parse(start) start = dateutil.parser.parse(start)
manager = BackfillerManager(base_dir, stream, variants, static_nodes, run_once) manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once)
gevent.signal(signal.SIGTERM, manager.stop) gevent.signal(signal.SIGTERM, manager.stop)
common.PromLogCountsHandler.install() common.PromLogCountsHandler.install()

Loading…
Cancel
Save