diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index be69d13..17b8a89 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -168,64 +168,6 @@ def list_hours(node, stream, variants, order='forward', start=None):
 	return hours
 
-def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60, stopping=None):
-	"""Backfill from remote node.
-
-	Backfill from node/stream/variants to base_dir/stream/variants for each hour
-	in hours.
-
-	Keyword arguments:
-	segment_order -- If 'random', randomise the order of segments (default).
-	If 'forward', sort the segment in ascending order. If 'reverse', sort
-	the segments in descending order. Otherwise, do not change the order of
-	segments.
-	recent_cutoff -- Skip backfilling segments younger than this number of
-	seconds to prioritise letting the downloader grab these segments."""
-
-	#logging.info('Starting backfilling from {}'.format(node))
-
-	for variant in variants:
-
-		for hour in hours:
-
-			if stopping is not None and stopping.is_set():
-				return
-
-			logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour))
-
-			local_segments = set(list_local_segments(base_dir, stream, variant, hour))
-			remote_segments = set(list_remote_segments(node, stream, variant, hour))
-			missing_segments = list(remote_segments - local_segments)
-
-			# useful if running in parallel so multiple nodes don't request the same segment at the same time
-			if segment_order == 'random':
-				random.shuffle(missing_segments)
-			elif segment_order == 'forward':
-				missing_segments.sort()
-			elif segment_order == 'reverse':
-				missing_segments.sort(reverse=True)
-
-			for missing_segment in missing_segments:
-
-				path = os.path.join(stream, variant, hour, missing_segment)
-
-				# test to see if file is a segment and get the segments start time
-				try:
-					segment = common.parse_segment_path(path)
-				except ValueError as e:
-					logging.warning('File {} invaid: {}'.format(path, e))
-					continue
-
-				#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
-				if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
-					logging.debug('Skipping {} as too recent'.format(path))
-					continue
-
-				get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
-
-			logging.info('{} segments in {}/{}/{} backfilled'.format(len(missing_segments), stream, variant, hour))
-
-	#logging.info('Finished backfilling from {}'.format(node))
-
 class BackfillerManager(object):
 	"""Manages BackfillerWorkers to backfill from a pool of nodes.
@@ -315,15 +257,74 @@ class BackfillerWorker(object):
 		self.stopping = gevent.event.Event()
 		self.done = gevent.event.Event()
 
+
 	def __repr__(self):
 		return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
 	__str__ = __repr__
 
+
 	def stop(self):
 		"""Tell the worker to shut down"""
 		self.stopping.set()
 
+
+	def backfill(self, hours, segment_order='random', recent_cutoff=60):
+		"""Backfill from remote node.
+
+		Backfill from node/stream/variants to base_dir/stream/variants for each
+		hour in hours.
+
+		Keyword arguments:
+		segment_order -- If 'random', randomise the order of segments (default).
+			If 'forward', sort the segments in ascending order. If 'reverse',
+			sort the segments in descending order. Otherwise, do not change the
+			order of segments.
+		recent_cutoff -- Skip backfilling segments younger than this number of
+			seconds to prioritise letting the downloader grab these segments."""
+
+		#logging.info('Starting backfilling from {}'.format(node))
+
+		for variant in self.variants:
+
+			for hour in hours:
+
+				logging.info('Backfilling {}/{}'.format(variant, hour))
+
+				local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour))
+				remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
+				missing_segments = list(remote_segments - local_segments)
+
+				# useful if running in parallel so multiple nodes don't request the same segment at the same time
+				if segment_order == 'random':
+					random.shuffle(missing_segments)
+				elif segment_order == 'forward':
+					missing_segments.sort()
+				elif segment_order == 'reverse':
+					missing_segments.sort(reverse=True)
+
+				for missing_segment in missing_segments:
+
+					if self.stopping.is_set():
+						return
+
+					path = os.path.join(self.stream, variant, hour, missing_segment)
+
+					# test to see if the file is a segment and get the segment's start time
+					try:
+						segment = common.parse_segment_path(path)
+					except ValueError as e:
+						logging.warning('File {} invalid: {}'.format(path, e))
+						continue
+
+					# to avoid getting in the downloader's way, ignore segments less than recent_cutoff seconds old
+					if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
+						logging.debug('Skipping {} as too recent'.format(path))
+						continue
+
+					get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment)
+
+				logging.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour))
+
+
 	def run(self):
 		self.logger.info('Worker starting')
 		try:
@@ -347,7 +348,7 @@ class BackfillerWorker(object):
 
 			if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL):
 				self.logger.info('Starting backfilling last 3 hours')
-				backfill(self.base_dir, self.node, self.stream, self.variants, last_hours(), stopping=self.stopping)
+				self.backfill(last_hours())
 				self.logger.info('Finished backfilling last 3 hours')
 				last_small_backfill = now
 
@@ -359,7 +360,7 @@ class BackfillerWorker(object):
 				this_hour = large_hours[-1:]
 				large_hours = large_hours[:-1]
 				self.logger.info('Starting full backfill hour: {}'.format(this_hour[0]))
-				backfill(self.base_dir, self.node, self.stream, self.variants, this_hour, stopping=self.stopping)
+				self.backfill(this_hour)
 				self.logger.info('Finished full backfill hour: {}'.format(this_hour[0]))
 			else:
 				self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
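Reviewer note on the segment selection above: the heart of the new BackfillerWorker.backfill is a remote-minus-local set difference, an optional reordering, and a recent-segment cutoff. The sketch below restates that selection outside the class so it can be run and tested in isolation. It is an illustration only: select_missing and its (name, start_time) pair inputs are hypothetical stand-ins for list_local_segments, list_remote_segments and common.parse_segment_path.

import datetime
import random

def select_missing(local, remote, segment_order='random', recent_cutoff=60, now=None):
	# local: iterable of segment names already on disk.
	# remote: iterable of (name, start_time) pairs from the other node.
	now = now or datetime.datetime.utcnow()
	local = set(local)
	starts = dict(remote)
	missing = [name for name in starts if name not in local]

	# random order keeps parallel nodes from all fetching the same segment first
	if segment_order == 'random':
		random.shuffle(missing)
	elif segment_order == 'forward':
		missing.sort()
	elif segment_order == 'reverse':
		missing.sort(reverse=True)

	# leave very recent segments to the downloader, mirroring recent_cutoff above
	return [name for name in missing
		if now - starts[name] >= datetime.timedelta(seconds=recent_cutoff)]

For example, with one segment already local, one missing, and one too recent:

now = datetime.datetime.utcnow()
old = now - datetime.timedelta(minutes=5)
remote = [('a.ts', old), ('b.ts', old), ('c.ts', now)]
print(select_missing(['a.ts'], remote))  # ['b.ts'] (a.ts is local, c.ts too recent)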
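One design point in run() worth calling out: self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) is not just a sleep. gevent's Event.wait(timeout) returns as soon as stop() sets the event, so a worker idling between backfill rounds still shuts down promptly, and the per-segment self.stopping.is_set() check bounds shutdown latency to one segment download. A minimal, self-contained sketch of the pattern (this Worker class and its interval are placeholders, not the backfiller's):

import gevent
import gevent.event

class Worker(object):
	WAIT_INTERVAL = 1  # seconds between rounds; placeholder value

	def __init__(self):
		self.stopping = gevent.event.Event()

	def stop(self):
		self.stopping.set()

	def run(self):
		while not self.stopping.is_set():
			print('one round of work')
			# interruptible sleep: returns early once stop() is called
			self.stopping.wait(self.WAIT_INTERVAL)

worker = Worker()
greenlet = gevent.spawn(worker.run)
gevent.sleep(2.5)
worker.stop()   # run() wakes immediately and exits
greenlet.join()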