|
|
@ -267,7 +267,7 @@ class BackfillerManager(object):
|
|
|
|
hours.sort()
|
|
|
|
hours.sort()
|
|
|
|
|
|
|
|
|
|
|
|
for hour in hours:
|
|
|
|
for hour in hours:
|
|
|
|
# deleting segments can take a bit time but is less important
|
|
|
|
# deleting segments can take a bit of time but is less important
|
|
|
|
# than the actual backfilling so we yield
|
|
|
|
# than the actual backfilling so we yield
|
|
|
|
gevent.idle()
|
|
|
|
gevent.idle()
|
|
|
|
path = os.path.join(self.base_dir, channel, quality, hour)
|
|
|
|
path = os.path.join(self.base_dir, channel, quality, hour)
|
|
|
@ -428,6 +428,19 @@ class BackfillerWorker(object):
|
|
|
|
self.logger.info('Stopping')
|
|
|
|
self.logger.info('Stopping')
|
|
|
|
self.stopping.set()
|
|
|
|
self.stopping.set()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# retries a failed function call once after TIMEOUT
|
|
|
|
|
|
|
|
def retry_handler(self, function, *args):
    """Call function(*args), retrying once after a delay if it fails.

    If the first attempt raises an Exception, the failure is logged with
    its traceback, the backfill_errors counter labelled with this worker's
    node is incremented, and the call is attempted a second time after
    waiting for a delay of common.jitter(TIMEOUT).

    Args:
        function: callable to invoke.
        *args: positional arguments passed through to function unchanged.

    Returns:
        The return value of the first attempt that succeeds.

    Raises:
        Exception: if the second attempt fails, its exception propagates
            to the caller. If a stop was requested before the retry could
            start, the first attempt's exception is re-raised instead of
            retrying.
    """
    try:
        return function(*args)
    except Exception:
        delay = common.jitter(TIMEOUT)
        self.logger.exception('Backfill operation failed. Retrying in {:.0f} s'.format(delay))
        backfill_errors.labels(remote=self.node).inc()
        # wait on the stopping event instead of sleeping so that a stop
        # request cuts the delay short
        self.stopping.wait(delay)
        if self.stopping.is_set():
            # we have been asked to stop: do not start new work, surface
            # the original failure to the caller instead
            raise
    return function(*args)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def backfill(self):
|
|
|
|
def backfill(self):
|
|
|
|
"""Backfill from remote node.
|
|
|
|
"""Backfill from remote node.
|
|
|
|
|
|
|
|
|
|
|
@ -435,7 +448,7 @@ class BackfillerWorker(object):
|
|
|
|
each hour in hours.
|
|
|
|
each hour in hours.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
for channel, quality in itertools.product(self.channels, self.qualities):
|
|
|
|
for channel, quality in itertools.product(self.channels, self.qualities):
|
|
|
|
for hour in list_hours(self.node, channel, quality, self.start):
|
|
|
|
for hour in self.retry_handler(list_hours, self.node, channel, quality, self.start):
|
|
|
|
# since backfilling can take a long time, recheck whether this
|
|
|
|
# since backfilling can take a long time, recheck whether this
|
|
|
|
# hour is after the start
|
|
|
|
# hour is after the start
|
|
|
|
if self.start is not None:
|
|
|
|
if self.start is not None:
|
|
|
@ -448,7 +461,7 @@ class BackfillerWorker(object):
|
|
|
|
|
|
|
|
|
|
|
|
self.logger.info('Backfilling {}/{}'.format(quality, hour))
|
|
|
|
self.logger.info('Backfilling {}/{}'.format(quality, hour))
|
|
|
|
|
|
|
|
|
|
|
|
local_segments = set(list_local_segments(self.base_dir, channel, quality, hour))
|
|
|
|
local_segments = set(self.retry_handler(list_local_segments, self.base_dir, channel, quality, hour))
|
|
|
|
remote_segments = set(list_remote_segments(self.node, channel, quality, hour))
|
|
|
|
remote_segments = set(list_remote_segments(self.node, channel, quality, hour))
|
|
|
|
missing_segments = list(remote_segments - local_segments)
|
|
|
|
missing_segments = list(remote_segments - local_segments)
|
|
|
|
|
|
|
|
|
|
|
@ -486,8 +499,8 @@ class BackfillerWorker(object):
|
|
|
|
|
|
|
|
|
|
|
|
# start segment as soon as a pool slot opens up, then track it in workers
|
|
|
|
# start segment as soon as a pool slot opens up, then track it in workers
|
|
|
|
workers.append(pool.spawn(
|
|
|
|
workers.append(pool.spawn(
|
|
|
|
get_remote_segment,
|
|
|
|
self.retry_handler,
|
|
|
|
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
|
|
|
|
get_remote_segment, self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
|
|
|
|
))
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
|
|
# verify that all the workers succeeded. if any failed, raise the exception from
|
|
|
|
# verify that all the workers succeeded. if any failed, raise the exception from
|
|
|
|