backfiller working in parallel

pull/43/head
Christopher Usher 6 years ago
parent f4385ad4e3
commit 4eac6189ce

@ -175,7 +175,7 @@ class BackfillerManager(object):
worker exists for a node, the manager starts one. If a worker corresponds to worker exists for a node, the manager starts one. If a worker corresponds to
a node not in the list, the manager stops it.""" a node not in the list, the manager stops it."""
NODE_INTERVAL = 5 #minutes between updating list of nodes NODE_INTERVAL = 1 #minutes between updating list of nodes
def __init__(self, base_dir, stream, variants, nodes=None): def __init__(self, base_dir, stream, variants, nodes=None):
"""Constructor for BackfillerManager.""" """Constructor for BackfillerManager."""
@ -184,7 +184,7 @@ class BackfillerManager(object):
self.variants = variants self.variants = variants
self.nodes = nodes self.nodes = nodes
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.logger = self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.logger = logging.getLogger("BackfillerManager({})".format(stream))
self.workers = {} # {node url: worker} self.workers = {} # {node url: worker}
@ -249,7 +249,7 @@ class BackfillerWorker(object):
def __init__(self, manager, base_dir, node, stream, variants): def __init__(self, manager, base_dir, node, stream, variants):
"""Constructor for BackfillerWorker""" """Constructor for BackfillerWorker"""
self.manager = manager self.manager = manager
self.logger = manager.logger.getChild('BackfillerWorker({}/{})@{:x}'.format(node, stream, id(self))) self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node))
self.base_dir = base_dir self.base_dir = base_dir
self.node = node self.node = node
self.stream = stream self.stream = stream
@ -265,6 +265,7 @@ class BackfillerWorker(object):
def stop(self): def stop(self):
"""Tell the worker to shut down""" """Tell the worker to shut down"""
self.logger.info('Stopping')
self.stopping.set() self.stopping.set()
@ -288,7 +289,7 @@ class BackfillerWorker(object):
for hour in hours: for hour in hours:
logging.info('Backfilling {}/{}'.format(variant, hour)) self.logger.info('Backfilling {}/{}'.format(variant, hour))
local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour)) local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour))
remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour)) remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
@ -313,16 +314,16 @@ class BackfillerWorker(object):
try: try:
segment = common.parse_segment_path(path) segment = common.parse_segment_path(path)
except ValueError as e: except ValueError as e:
logging.warning('File {} invaid: {}'.format(path, e)) self.logger.warning('File {} invaid: {}'.format(path, e))
continue continue
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old #to avoid getting in the downloader's way ignore segments less than recent_cutoff old
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff): if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
logging.debug('Skipping {} as too recent'.format(path)) self.logger.debug('Skipping {} as too recent'.format(path))
continue continue
get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment) get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment)
logging.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour)) self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour))
def run(self): def run(self):

Loading…
Cancel
Save