|
|
@ -68,7 +68,7 @@ def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
|
|
|
|
def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
|
|
|
|
timeout=TIMEOUT):
|
|
|
|
logger, timeout=TIMEOUT):
|
|
|
|
"""Get a segment from a node.
|
|
|
|
"""Get a segment from a node.
|
|
|
|
|
|
|
|
|
|
|
|
Fetches channel/quality/hour/missing_segment from node and puts it in
|
|
|
|
Fetches channel/quality/hour/missing_segment from node and puts it in
|
|
|
@ -107,7 +107,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
|
|
|
|
logging.debug('Saving completed segment {} as {}'.format(temp_path, path))
|
|
|
|
logging.debug('Saving completed segment {} as {}'.format(temp_path, path))
|
|
|
|
common.rename(temp_path, path)
|
|
|
|
common.rename(temp_path, path)
|
|
|
|
segments_backfilled.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
|
|
|
|
segments_backfilled.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
|
|
|
|
|
|
|
|
logger.info('Segment {}/{}/{} backfilled'.format(quality, hour, missing_segment))
|
|
|
|
|
|
|
|
|
|
|
|
def list_hours(node, channel, qualities, start=None):
|
|
|
|
def list_hours(node, channel, qualities, start=None):
|
|
|
|
"""Return a list of all available hours from a node.
|
|
|
|
"""Return a list of all available hours from a node.
|
|
|
@ -270,6 +270,7 @@ class BackfillerManager(object):
|
|
|
|
for row in results:
|
|
|
|
for row in results:
|
|
|
|
nodes[row.name] = row.url
|
|
|
|
nodes[row.name] = row.url
|
|
|
|
nodes.pop(self.localhost, None)
|
|
|
|
nodes.pop(self.localhost, None)
|
|
|
|
|
|
|
|
self.logger.info('Nodes fetched: {}'.format(nodes.keys()))
|
|
|
|
return nodes.values()
|
|
|
|
return nodes.values()
|
|
|
|
|
|
|
|
|
|
|
|
class BackfillerWorker(object):
|
|
|
|
class BackfillerWorker(object):
|
|
|
@ -317,7 +318,7 @@ class BackfillerWorker(object):
|
|
|
|
|
|
|
|
|
|
|
|
for hour in hours:
|
|
|
|
for hour in hours:
|
|
|
|
|
|
|
|
|
|
|
|
self.logger.debug('Backfilling {}/{}'.format(quality, hour))
|
|
|
|
self.logger.info('Backfilling {}/{}'.format(quality, hour))
|
|
|
|
|
|
|
|
|
|
|
|
local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour))
|
|
|
|
local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour))
|
|
|
|
remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour))
|
|
|
|
remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour))
|
|
|
@ -358,7 +359,7 @@ class BackfillerWorker(object):
|
|
|
|
# start segment as soon as a pool slot opens up, then track it in workers
|
|
|
|
# start segment as soon as a pool slot opens up, then track it in workers
|
|
|
|
workers.append(pool.spawn(
|
|
|
|
workers.append(pool.spawn(
|
|
|
|
get_remote_segment,
|
|
|
|
get_remote_segment,
|
|
|
|
self.base_dir, self.node, self.channel, quality, hour, missing_segment
|
|
|
|
self.base_dir, self.node, self.channel, quality, hour, missing_segment, self.logger
|
|
|
|
))
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
|
|
# verify that all the workers succeeded. if any failed, raise the exception from
|
|
|
|
# verify that all the workers succeeded. if any failed, raise the exception from
|
|
|
@ -375,7 +376,9 @@ class BackfillerWorker(object):
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
|
|
|
|
self.logger.info('Starting backfill')
|
|
|
|
self.backfill(list_hours(self.node, self.channel, self.qualities, self.start))
|
|
|
|
self.backfill(list_hours(self.node, self.channel, self.qualities, self.start))
|
|
|
|
|
|
|
|
self.logger.info('Backfill complete')
|
|
|
|
failures = 0 #reset failure count on a successful backfill
|
|
|
|
failures = 0 #reset failure count on a successful backfill
|
|
|
|
if not self.run_once:
|
|
|
|
if not self.run_once:
|
|
|
|
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
|
|
|
|
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
|
|
|
|