added workers and a worker manager

pull/43/head
Christopher Usher 6 years ago
parent be8d40d1ba
commit 7d9a5b4626

@ -187,7 +187,6 @@ def list_hours(node, stream, variants, order='forward', start=None):
return hours return hours
def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60): def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60):
"""Backfill from remote node. """Backfill from remote node.
@ -243,6 +242,138 @@ def backfill(base_dir, node, stream, variants, hours, segment_order='random', re
logging.info('Finished backfilling from {}'.format(node)) logging.info('Finished backfilling from {}'.format(node))
class BackfillerManager(object):
"""Manages BackfillerWorkers to backfill from a pool of nodes.
The manager regularly calls get_nodes to an up to date list of nodes. If no
worker exists for a node, the manager starts one. If a worker corresponds to
a node not in the list, the manager stops it."""
NODE_INTERVAL = 5 #minutes between updating list of nodes
def __init__(self, base_dir, stream, variants, input_nodes=None):
"""Constructor for BackfillerManager."""
self.base_dir = base_dir
self.stream = stream
self.variants = variants
self.stopping = gevent.event.Event()
self.workers = {} # {node url: worker}
def stop(self):
"""Shut down all workers and stop backfilling."""
self.logger.info('Stopping')
self.stopping.set()
def start_worker(self, node):
"""Start a new worker for given node."""
worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants)
if node in self.workers:
self.workers[node].stop() #only one worker per node
self.workers[node] = worker
gevent.spawn(worker.run)
def stop_worker(self, node):
"""Stop the worker for given node."""
self.workers[node].stop()
del self.workers[node]
def run(self):
while not self.stopping.is_set():
new_nodes = set(get_nodes())
exisiting_nodes = set(self.workers.keys())
to_start = new_nodes - exisiting_nodes
for node in to_start:
self.start_worker(node)
to_stop = exisiting_nodes - new_nodes
for node in to_stop:
self.stop_worker(node)
self.stopping.wait(common.jitter(self.NODE_INTERVAL * 60))
for worker in self.workers:
worker.stop()
for worker in self.workers:
worker.done.wait()
class BackfillerWorker(object):
"""Backfills segments from a node.
Backfills all segments from node/stream to base_dir/stream for all variants.
Every SMALL_INTERVAL minutes backfill the last three hours starting from the
most recent one (a 'small backfill'). When not doing a small backfill,
backfill all segments starting with the most recent one (a 'large backfill')
unless a large backfill has occured less than LARGE_INTERVAL ago."""
SMALL_INTERVAL = 5 #minutes between small backfills
LARGE_INTERVAL = 60 #minutes between large backfills
WAIT_INTERVAL = 1 #seconds between backfill actions
def __init__(self, manager, base_dir, node, stream, variants):
"""Constructor for BackfillerWorker"""
self.manager = manager
self.logger = manager.logger.getChild('BackfillerWorker({}/{})@{:x}'.format(node, stream, id(self)))
self.base_dir = base_dir
self.node = node
self.stream = stream
self.variants = variants
self.stopping = gevent.event.Event()
self.done = gevent.event.Event()
def __repr__(self):
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
__str__ = __repr__
def stop(self):
"""Tell the worker to shut down"""
self.stopping.set()
def trigger_new_worker(self):
self.manager.trigger_new_worker(self)
def run(self):
self.logger.info('Worker starting')
try:
self._run()
except Exception:
self.logger.exception('Worker failed')
else:
self.logger.info('Worker stopped')
finally:
self.done.set()
del self.manager.workers[self.node]
def _run(self):
last_small_backfill = datetime.datetime.now() + datetime.timedetla(-1)
last_large_backfill = datetime.datetime.now() + datetime.timedetla(-1)
large_hours = []
while not self.stopping.is_set():
now = datetime.datetime.now()
if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL):
backfill(self.base_dir, self.node, self.stream, self.variants, last_hours())
last_small_backfill = now
elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours):
if not len(large_hours):
large_hours = list_hours(self.node, self.stream, self.variants)
last_large_backfill = now
this_hour = large_hours[-1:]
large_hours = large_hours[:-1]
backfill(self.base_dir, self.node, self.stream, self.variants, this_hour)
else:
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None): def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None):
"""Prototype backfiller service. """Prototype backfiller service.

Loading…
Cancel
Save