diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 1c56126..7a34234 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -25,7 +25,7 @@ segments_backfilled = prom.Counter( HOUR_FMT = '%Y-%m-%dT%H' -TIMEOUT = 5 #default timeout for remote requests +TIMEOUT = 5 #default timeout in seconds for remote requests def encode_strings(o): if isinstance(o, list): @@ -44,13 +44,11 @@ def get_nodes(): # either read a config file or query the database to get the addresses # of the other nodes # figure out some way that the local machine isn't in the list of returned - # nodes so that - # as a prototype can just hardcode some addresses. + # nodes logging.info('Fetching list of other nodes') - nodes = ['http://toodles.videostrike.team:1337/'] - #nodes = [] + nodes = [] return nodes @@ -133,37 +131,26 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, def last_hours(n_hours=3): """Return of a list of the last n_hours in descending order.""" - if n_hours < 1: - raise ValueError('Number of hours has to be 1 or greater') now = datetime.datetime.utcnow() return [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] -def list_hours(node, stream, variants, order='forward', start=None): +def list_hours(node, stream, variants, start=None): """Return a list of all available hours from a node. - List all hours available from node/stream for each variant in variants. + List all hours available from node/stream for each variant in variants + ordered from newest to oldest. Keyword arguments: - order -- If 'random', randomise the order of segments. If 'forward', sort - the hours in ascending order. If 'reverse' (default), sort the - hours in descending order. Otherwise, do not change the order of the - hours. start -- Only return hours after this time. If None (default), all hours are returned.""" hour_lists = [list_remote_hours(node, stream, variant) for variant in variants] hours = list(set().union(*hour_lists)) + hours.sort(reverse=True) #latest hour first if start is not None: hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < start] - - if order == 'random': - random.shuffle(hours) - elif order == 'forward': - hours.sort() - elif order == 'reverse': - hours.sort(reverse=True) return hours @@ -172,63 +159,62 @@ class BackfillerManager(object): """Manages BackfillerWorkers to backfill from a pool of nodes. The manager regularly calls get_nodes to an up to date list of nodes. If no - worker exists for a node, the manager starts one. If a worker corresponds to - a node not in the list, the manager stops it.""" + worker exists for a node in this list or in the static_node list, the + manager starts one. If a worker corresponds to a node not in either list, + the manager stops it. If run_once, only backfill once.""" - NODE_INTERVAL = 1 #minutes between updating list of nodes + NODE_INTERVAL = 300 #seconds between updating list of nodes - def __init__(self, base_dir, stream, variants, nodes=None): - """Constructor for BackfillerManager.""" + def __init__(self, base_dir, stream, variants, static_nodes=[], run_once=False): + """Constructor for BackfillerManager. + Creates a manager for a given stream with specified variants. If + static_nodes is None, manager""" self.base_dir = base_dir self.stream = stream self.variants = variants - self.nodes = nodes + self.static_nodes = static_nodes + self.run_once = run_once self.stopping = gevent.event.Event() self.logger = logging.getLogger("BackfillerManager({})".format(stream)) self.workers = {} # {node url: worker} - def stop(self): """Shut down all workers and stop backfilling.""" self.logger.info('Stopping') + for worker in self.workers: + worker.stop() self.stopping.set() - def start_worker(self, node): """Start a new worker for given node.""" - worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants) - if node in self.workers: - self.workers[node].stop() #only one worker per node + worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants, self) + assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node) self.workers[node] = worker gevent.spawn(worker.run) - def stop_worker(self, node): """Stop the worker for given node.""" - self.workers[node].stop() - del self.workers[node] - + self.workers.pop(node).stop() def run(self): while not self.stopping.is_set(): - if self.nodes is None: - new_nodes = set(get_nodes()) - else: - new_nodes = set(self.nodes) + new_nodes = set(get_nodes() + self.static_nodes) exisiting_nodes = set(self.workers.keys()) - to_start = new_nodes - exisiting_nodes for node in to_start: self.start_worker(node) - to_stop = exisiting_nodes - new_nodes for node in to_stop: self.stop_worker(node) - self.stopping.wait(common.jitter(self.NODE_INTERVAL * 60)) + if self.run_once: + break + + self.stopping.wait(common.jitter(self.NODE_INTERVAL)) + + else: + self.stop() - for worker in self.workers: - worker.stop() for worker in self.workers: worker.done.wait() @@ -236,39 +222,31 @@ class BackfillerManager(object): class BackfillerWorker(object): """Backfills segments from a node. - Backfills all segments from node/stream to base_dir/stream for all variants. - Every SMALL_INTERVAL minutes backfill the last three hours starting from the - most recent one (a 'small backfill'). When not doing a small backfill, - backfill all segments starting with the most recent one (a 'large backfill') - unless a large backfill has occured less than LARGE_INTERVAL ago.""" + Backfills every WAIT_INTERVAL all segments from node/stream to + base_dir/stream for all variants. If run_once, only backfill once.""" - SMALL_INTERVAL = 5 #minutes between small backfills - LARGE_INTERVAL = 60 #minutes between large backfills - WAIT_INTERVAL = 1 #seconds between backfill actions + WAIT_INTERVAL = 120 #seconds between backfills - def __init__(self, manager, base_dir, node, stream, variants): - """Constructor for BackfillerWorker""" + def __init__(self, manager, base_dir, node, stream, variants, run_once=False): self.manager = manager self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node)) self.base_dir = base_dir self.node = node self.stream = stream self.variants = variants + self.run_once = run_once self.stopping = gevent.event.Event() self.done = gevent.event.Event() - def __repr__(self): return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream) __str__ = __repr__ - def stop(self): """Tell the worker to shut down""" self.logger.info('Stopping') self.stopping.set() - def backfill(self, hours, segment_order='random', recent_cutoff=60): """Backfill from remote node. @@ -325,60 +303,31 @@ class BackfillerWorker(object): get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment) self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour)) - def run(self): self.logger.info('Worker starting') - try: - self._run() - except Exception: - self.logger.exception('Worker failed') - else: - self.logger.info('Worker stopped') - finally: - self.done.set() - del self.manager.workers[self.node] - - def _run(self): - last_small_backfill = datetime.datetime.now() + datetime.timedelta(-1) - last_large_backfill = datetime.datetime.now() + datetime.timedelta(-1) - large_hours = [] while not self.stopping.is_set(): + self.backfill(list_hours(self.node, self.stream, self.variants)) + self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) - now = datetime.datetime.now() - - if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL): - self.logger.info('Starting backfilling last 3 hours') - self.backfill(last_hours()) - self.logger.info('Finished backfilling last 3 hours') - last_small_backfill = now - - elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours): - if not len(large_hours): - large_hours = list_hours(self.node, self.stream, self.variants) - last_large_backfill = now - - this_hour = large_hours[-1:] - large_hours = large_hours[:-1] - self.logger.info('Starting full backfill hour: {}'.format(this_hour[0])) - self.backfill(this_hour) - self.logger.info('Finished full backfill hour: {}'.format(this_hour[0])) - else: - self.stopping.wait(common.jitter(self.WAIT_INTERVAL)) + if self.run_once: + break + + self.logger.info('Worker stopped') + self.done.set() + del self.manager.workers[self.node] -def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None): +def main(base_dir='.', stream='', variants='', metrics_port=8002, + static_nodes='', backdoor_port=0, start=None, run_once=False): """Backfiller service.""" - # stretch goal: provide an interface to trigger backfills manually - # stretch goal: use the backfiller to monitor the restreamer variants = variants.split(',') if variants else [] - if nodes is not None: - nodes = nodes.split(',') if nodes else [] + static_nodes = static_nodes.split(',') if static_nodes else [] if start is not None: start = dateutil.parser.parse(start) - manager = BackfillerManager(base_dir, stream, variants, nodes) + manager = BackfillerManager(base_dir, stream, variants, static_nodes, run_once) gevent.signal(signal.SIGTERM, manager.stop) common.PromLogCountsHandler.install()