hopefully did break anything with this refactor

pull/43/head
Christopher Usher 6 years ago
parent 1f53fa8d29
commit f4385ad4e3

@ -168,64 +168,6 @@ def list_hours(node, stream, variants, order='forward', start=None):
return hours return hours
def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60, stopping=None):
"""Backfill from remote node.
Backfill from node/stream/variants to base_dir/stream/variants for each hour
in hours.
Keyword arguments:
segment_order -- If 'random', randomise the order of segments (default).
If 'forward', sort the segment in ascending order. If 'reverse', sort
the segments in descending order. Otherwise, do not change the order of
segments.
recent_cutoff -- Skip backfilling segments younger than this number of
seconds to prioritise letting the downloader grab these segments."""
#logging.info('Starting backfilling from {}'.format(node))
for variant in variants:
for hour in hours:
if stopping is not None and stopping.is_set():
return
logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour))
local_segments = set(list_local_segments(base_dir, stream, variant, hour))
remote_segments = set(list_remote_segments(node, stream, variant, hour))
missing_segments = list(remote_segments - local_segments)
# useful if running in parallel so multiple nodes don't request the same segment at the same time
if segment_order == 'random':
random.shuffle(missing_segments)
elif segment_order == 'forward':
missing_segments.sort()
elif segment_order == 'reverse':
missing_segments.sort(reverse=True)
for missing_segment in missing_segments:
path = os.path.join(stream, variant, hour, missing_segment)
# test to see if file is a segment and get the segments start time
try:
segment = common.parse_segment_path(path)
except ValueError as e:
logging.warning('File {} invaid: {}'.format(path, e))
continue
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
logging.debug('Skipping {} as too recent'.format(path))
continue
get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
logging.info('{} segments in {}/{}/{} backfilled'.format(len(missing_segments), stream, variant, hour))
#logging.info('Finished backfilling from {}'.format(node))
class BackfillerManager(object): class BackfillerManager(object):
"""Manages BackfillerWorkers to backfill from a pool of nodes. """Manages BackfillerWorkers to backfill from a pool of nodes.
@ -315,15 +257,74 @@ class BackfillerWorker(object):
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.done = gevent.event.Event() self.done = gevent.event.Event()
def __repr__(self): def __repr__(self):
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
__str__ = __repr__ __str__ = __repr__
def stop(self): def stop(self):
"""Tell the worker to shut down""" """Tell the worker to shut down"""
self.stopping.set() self.stopping.set()
def backfill(self, hours, segment_order='random', recent_cutoff=60):
"""Backfill from remote node.
Backfill from node/stream/variants to base_dir/stream/variants for each
hour in hours.
Keyword arguments:
segment_order -- If 'random', randomise the order of segments (default).
If 'forward', sort the segment in ascending order. If 'reverse',
sort the segments in descending order. Otherwise, do not change the
order of segments.
recent_cutoff -- Skip backfilling segments younger than this number of
seconds to prioritise letting the downloader grab these segments."""
#logging.info('Starting backfilling from {}'.format(node))
for variant in self.variants:
for hour in hours:
logging.info('Backfilling {}/{}'.format(variant, hour))
local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour))
remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
missing_segments = list(remote_segments - local_segments)
# useful if running in parallel so multiple nodes don't request the same segment at the same time
if segment_order == 'random':
random.shuffle(missing_segments)
elif segment_order == 'forward':
missing_segments.sort()
elif segment_order == 'reverse':
missing_segments.sort(reverse=True)
for missing_segment in missing_segments:
if self.stopping.is_set():
return
path = os.path.join(self.stream, variant, hour, missing_segment)
# test to see if file is a segment and get the segments start time
try:
segment = common.parse_segment_path(path)
except ValueError as e:
logging.warning('File {} invaid: {}'.format(path, e))
continue
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
logging.debug('Skipping {} as too recent'.format(path))
continue
get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment)
logging.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour))
def run(self): def run(self):
self.logger.info('Worker starting') self.logger.info('Worker starting')
try: try:
@ -347,7 +348,7 @@ class BackfillerWorker(object):
if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL): if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL):
self.logger.info('Starting backfilling last 3 hours') self.logger.info('Starting backfilling last 3 hours')
backfill(self.base_dir, self.node, self.stream, self.variants, last_hours(), stopping=self.stopping) self.backfill(last_hours())
self.logger.info('Finished backfilling last 3 hours') self.logger.info('Finished backfilling last 3 hours')
last_small_backfill = now last_small_backfill = now
@ -359,7 +360,7 @@ class BackfillerWorker(object):
this_hour = large_hours[-1:] this_hour = large_hours[-1:]
large_hours = large_hours[:-1] large_hours = large_hours[:-1]
self.logger.info('Starting full backfill hour: {}'.format(this_hour[0])) self.logger.info('Starting full backfill hour: {}'.format(this_hour[0]))
backfill(self.base_dir, self.node, self.stream, self.variants, this_hour, stopping=self.stopping) self.backfill(this_hour)
self.logger.info('Finished full backfill hour: {}'.format(this_hour[0])) self.logger.info('Finished full backfill hour: {}'.format(this_hour[0]))
else: else:
self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) self.stopping.wait(common.jitter(self.WAIT_INTERVAL))

Loading…
Cancel
Save