Bug fixes and logging improvements to the backfiller

pull/43/head
Christopher Usher 6 years ago
parent c9f6ee95c5
commit 1f53fa8d29

@ -182,7 +182,7 @@ def backfill(base_dir, node, stream, variants, hours, segment_order='random', re
recent_cutoff -- Skip backfilling segments younger than this number of recent_cutoff -- Skip backfilling segments younger than this number of
seconds to prioritise letting the downloader grab these segments.""" seconds to prioritise letting the downloader grab these segments."""
logging.info('Starting backfilling from {}'.format(node)) #logging.info('Starting backfilling from {}'.format(node))
for variant in variants: for variant in variants:
@ -224,7 +224,7 @@ def backfill(base_dir, node, stream, variants, hours, segment_order='random', re
get_remote_segment(base_dir, node, stream, variant, hour, missing_segment) get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
logging.info('{} segments in {}/{}/{} backfilled'.format(len(missing_segments), stream, variant, hour)) logging.info('{} segments in {}/{}/{} backfilled'.format(len(missing_segments), stream, variant, hour))
logging.info('Finished backfilling from {}'.format(node)) #logging.info('Finished backfilling from {}'.format(node))
class BackfillerManager(object): class BackfillerManager(object):
"""Manages BackfillerWorkers to backfill from a pool of nodes. """Manages BackfillerWorkers to backfill from a pool of nodes.
@ -242,6 +242,7 @@ class BackfillerManager(object):
self.variants = variants self.variants = variants
self.nodes = nodes self.nodes = nodes
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.logger = self.logger = logging.getLogger("BackfillerManager({})".format(stream))
self.workers = {} # {node url: worker} self.workers = {} # {node url: worker}
@ -271,7 +272,7 @@ class BackfillerManager(object):
if self.nodes is None: if self.nodes is None:
new_nodes = set(get_nodes()) new_nodes = set(get_nodes())
else: else:
new_nodes = self.nodes new_nodes = set(self.nodes)
exisiting_nodes = set(self.workers.keys()) exisiting_nodes = set(self.workers.keys())
to_start = new_nodes - exisiting_nodes to_start = new_nodes - exisiting_nodes
@ -336,8 +337,8 @@ class BackfillerWorker(object):
del self.manager.workers[self.node] del self.manager.workers[self.node]
def _run(self): def _run(self):
last_small_backfill = datetime.datetime.now() + datetime.timedetla(-1) last_small_backfill = datetime.datetime.now() + datetime.timedelta(-1)
last_large_backfill = datetime.datetime.now() + datetime.timedetla(-1) last_large_backfill = datetime.datetime.now() + datetime.timedelta(-1)
large_hours = [] large_hours = []
while not self.stopping.is_set(): while not self.stopping.is_set():
@ -345,7 +346,9 @@ class BackfillerWorker(object):
now = datetime.datetime.now() now = datetime.datetime.now()
if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL): if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL):
self.logger.info('Starting backfilling last 3 hours')
backfill(self.base_dir, self.node, self.stream, self.variants, last_hours(), stopping=self.stopping) backfill(self.base_dir, self.node, self.stream, self.variants, last_hours(), stopping=self.stopping)
self.logger.info('Finished backfilling last 3 hours')
last_small_backfill = now last_small_backfill = now
elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours): elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours):
@ -355,7 +358,9 @@ class BackfillerWorker(object):
this_hour = large_hours[-1:] this_hour = large_hours[-1:]
large_hours = large_hours[:-1] large_hours = large_hours[:-1]
self.logger.info('Starting full backfill hour: {}'.format(this_hour[0]))
backfill(self.base_dir, self.node, self.stream, self.variants, this_hour, stopping=self.stopping) backfill(self.base_dir, self.node, self.stream, self.variants, this_hour, stopping=self.stopping)
self.logger.info('Finished full backfill hour: {}'.format(this_hour[0]))
else: else:
self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) self.stopping.wait(common.jitter(self.WAIT_INTERVAL))

Loading…
Cancel
Save