|
|
@ -177,6 +177,7 @@ class BackfillerManager(object):
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
|
|
|
|
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
|
|
|
|
self.workers = {} # {node url: worker}
|
|
|
|
self.workers = {} # {node url: worker}
|
|
|
|
|
|
|
|
self.logger.info('Starting')
|
|
|
|
|
|
|
|
|
|
|
|
def stop(self):
|
|
|
|
def stop(self):
|
|
|
|
"""Shut down all workers and stop backfilling."""
|
|
|
|
"""Shut down all workers and stop backfilling."""
|
|
|
@ -196,6 +197,13 @@ class BackfillerManager(object):
|
|
|
|
"""Stop the worker for given node."""
|
|
|
|
"""Stop the worker for given node."""
|
|
|
|
self.workers.pop(node).stop()
|
|
|
|
self.workers.pop(node).stop()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def restart_worker(self, node):
|
|
|
|
|
|
|
|
"""Restart the worker for given node."""
|
|
|
|
|
|
|
|
self.stop_worker(node)
|
|
|
|
|
|
|
|
self.stopping.wait(common.jitter(self.RESTART_INTERVAL))
|
|
|
|
|
|
|
|
self.start_worker(node)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
def run(self):
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
new_nodes = set(get_nodes() + self.static_nodes)
|
|
|
|
new_nodes = set(get_nodes() + self.static_nodes)
|
|
|
@ -226,6 +234,7 @@ class BackfillerWorker(object):
|
|
|
|
base_dir/stream for all variants. If run_once, only backfill once."""
|
|
|
|
base_dir/stream for all variants. If run_once, only backfill once."""
|
|
|
|
|
|
|
|
|
|
|
|
WAIT_INTERVAL = 120 #seconds between backfills
|
|
|
|
WAIT_INTERVAL = 120 #seconds between backfills
|
|
|
|
|
|
|
|
RETRY_INTERVAL = 5 #seconds between retrying a failed backfill
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, manager, base_dir, node, stream, variants, run_once=False):
|
|
|
|
def __init__(self, manager, base_dir, node, stream, variants, run_once=False):
|
|
|
|
self.manager = manager
|
|
|
|
self.manager = manager
|
|
|
@ -237,6 +246,9 @@ class BackfillerWorker(object):
|
|
|
|
self.run_once = run_once
|
|
|
|
self.run_once = run_once
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.done = gevent.event.Event()
|
|
|
|
self.done = gevent.event.Event()
|
|
|
|
|
|
|
|
self.failures = 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.logger.info('Starting')
|
|
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
def __repr__(self):
|
|
|
|
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
|
|
|
|
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
|
|
|
@ -254,10 +266,6 @@ class BackfillerWorker(object):
|
|
|
|
hour in hours.
|
|
|
|
hour in hours.
|
|
|
|
|
|
|
|
|
|
|
|
Keyword arguments:
|
|
|
|
Keyword arguments:
|
|
|
|
segment_order -- If 'random', randomise the order of segments (default).
|
|
|
|
|
|
|
|
If 'forward', sort the segment in ascending order. If 'reverse',
|
|
|
|
|
|
|
|
sort the segments in descending order. Otherwise, do not change the
|
|
|
|
|
|
|
|
order of segments.
|
|
|
|
|
|
|
|
recent_cutoff -- Skip backfilling segments younger than this number of
|
|
|
|
recent_cutoff -- Skip backfilling segments younger than this number of
|
|
|
|
seconds to prioritise letting the downloader grab these segments."""
|
|
|
|
seconds to prioritise letting the downloader grab these segments."""
|
|
|
|
|
|
|
|
|
|
|
@ -273,13 +281,9 @@ class BackfillerWorker(object):
|
|
|
|
remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
|
|
|
|
remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
|
|
|
|
missing_segments = list(remote_segments - local_segments)
|
|
|
|
missing_segments = list(remote_segments - local_segments)
|
|
|
|
|
|
|
|
|
|
|
|
# useful if running in parallel so multiple nodes don't request the same segment at the same time
|
|
|
|
# randomise the order of the segments to reduce the chance that
|
|
|
|
if segment_order == 'random':
|
|
|
|
# multiple workers request the same segment at the same time
|
|
|
|
random.shuffle(missing_segments)
|
|
|
|
random.shuffle(missing_segments)
|
|
|
|
elif segment_order == 'forward':
|
|
|
|
|
|
|
|
missing_segments.sort()
|
|
|
|
|
|
|
|
elif segment_order == 'reverse':
|
|
|
|
|
|
|
|
missing_segments.sort(reverse=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for missing_segment in missing_segments:
|
|
|
|
for missing_segment in missing_segments:
|
|
|
|
|
|
|
|
|
|
|
@ -307,8 +311,17 @@ class BackfillerWorker(object):
|
|
|
|
self.logger.info('Worker starting')
|
|
|
|
self.logger.info('Worker starting')
|
|
|
|
|
|
|
|
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
self.backfill(list_hours(self.node, self.stream, self.variants))
|
|
|
|
|
|
|
|
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
|
|
|
|
try:
|
|
|
|
|
|
|
|
self.backfill(list_hours(self.node, self.stream, self.variants))
|
|
|
|
|
|
|
|
self.failures = 0 #reset failure count on a successful backfill
|
|
|
|
|
|
|
|
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
|
|
self.failures += 1
|
|
|
|
|
|
|
|
delay = common.jitter(self.RETRY_INTERVAL * 2**self.failures)
|
|
|
|
|
|
|
|
self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
|
|
|
|
|
|
|
|
self.stopping.wait(delay)
|
|
|
|
|
|
|
|
|
|
|
|
if self.run_once:
|
|
|
|
if self.run_once:
|
|
|
|
break
|
|
|
|
break
|
|
|
|