diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 8788c30..f537326 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -101,7 +101,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, # check to see if file was created since we listed the local segments to # avoid unnecessarily copying if os.path.exists(path): - logging.debug('Skipping exisiting segment {}'.format(path)) + logging.debug('Skipping existing segment {}'.format(path)) return dir_name = os.path.dirname(path) @@ -131,7 +131,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc() -def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None): +def backfill_nodes(base_dir, stream, variants, hours=None, nodes=None, start=None): """Loop over nodes backfilling from each. Backfill from node/stream/variants to base_dir/stream/variants for each node @@ -145,62 +145,68 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None): #ideally do this in parallel for node in nodes: try: - backfill_node(base_dir, node, stream, variants, hours) + backfill(base_dir, node, stream, variants, hours) except Exception: logging.exception("Error while backfilling node {}".format(node)) -def is_iterable(x): - """Test whether input is iterable.""" - try: - iter(x) - except TypeError: - return False - return True +def last_hours(n_hours=3): + """Return of a list of the last n_hours in descending order.""" + if n_hours < 1: + raise ValueError('Number of hours has to be 1 or greater') + now = datetime.datetime.utcnow() + return [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] + + +def list_hours(node, stream, variants, order='forward', start=None): + """Return a list of all available hours from a node. + + List all hours available from node/stream for each variant in variants. + + Keyword arguments: + order -- If 'random', randomise the order of segments. If 'forward', sort + the hours in ascending order. If 'reverse' (default), sort the + hours in descending order. Otherwise, do not change the order of the + hours. + start -- Only return hours after this time. If None (default), all hours are + returned.""" + hour_lists = [list_remote_hours(node, stream, variant) for variant in variants] + hours = list(set().union(*hour_lists)) -def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='random', recent_cutoff=60, start=None): + if start is not None: + hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < start] + + if order == 'random': + random.shuffle(hours) + elif order == 'forward': + hours.sort() + elif order == 'reverse': + hours.sort(reverse=True) + + return hours + + + +def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60): """Backfill from remote node. - Backfill from node/stream/variants to base_dir/stream/variants. + Backfill from node/stream/variants to base_dir/stream/variants for each hour + in hours. Keyword arguments: - hours -- If None (default), backfill all available hours. If iterable, - backfill only hours in iterable. Otherwise backfill the last N hours, - starting with the lastest. segment_order -- If 'random', randomise the order of segments (default). If 'forward', sort the segment in ascending order. If 'reverse', sort the segments in descending order. Otherwise, do not change the order of segments. recent_cutoff -- Skip backfilling segments younger than this number of - seconds to prioritise letting the downloader grab these segments. - start -- Do not backfill hours starting before this time. If None (default), - all hours are backfilled""" + seconds to prioritise letting the downloader grab these segments.""" logging.info('Starting backfilling from {}'.format(node)) - if hours is None: - # gather all available hours from all variants and take the union - hours = set().union(*[ - list_remote_hours(node, stream, variant) - for variant in variants - ]) - elif is_iterable(hours): - hours = list(hours) # coerce to list so it can be sorted - else: - n_hours = hours - if n_hours < 1: - raise ValueError('Number of hours has to be 1 or greater') - now = datetime.datetime.utcnow() - hours = [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] - for variant in variants: for hour in hours: - hour_time = datetime.datetime.strptime(hour, HOUR_FMT) - if start is not None and hour_time < start: - logging.debug('Skipping {}/{}/{} as before start'.format(stream, variant, hour)) - continue logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour))