Move the code for calculating hours outside the code that backfills

pull/43/head
Christopher Usher 6 years ago
parent ed58b6e44d
commit be8d40d1ba

@ -101,7 +101,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
# check to see if file was created since we listed the local segments to # check to see if file was created since we listed the local segments to
# avoid unnecessarily copying # avoid unnecessarily copying
if os.path.exists(path): if os.path.exists(path):
logging.debug('Skipping exisiting segment {}'.format(path)) logging.debug('Skipping existing segment {}'.format(path))
return return
dir_name = os.path.dirname(path) dir_name = os.path.dirname(path)
@ -131,7 +131,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc() segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc()
def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None): def backfill_nodes(base_dir, stream, variants, hours=None, nodes=None, start=None):
"""Loop over nodes backfilling from each. """Loop over nodes backfilling from each.
Backfill from node/stream/variants to base_dir/stream/variants for each node Backfill from node/stream/variants to base_dir/stream/variants for each node
@ -145,62 +145,68 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None):
#ideally do this in parallel #ideally do this in parallel
for node in nodes: for node in nodes:
try: try:
backfill_node(base_dir, node, stream, variants, hours) backfill(base_dir, node, stream, variants, hours)
except Exception: except Exception:
logging.exception("Error while backfilling node {}".format(node)) logging.exception("Error while backfilling node {}".format(node))
def is_iterable(x): def last_hours(n_hours=3):
"""Test whether input is iterable.""" """Return of a list of the last n_hours in descending order."""
try: if n_hours < 1:
iter(x) raise ValueError('Number of hours has to be 1 or greater')
except TypeError: now = datetime.datetime.utcnow()
return False return [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
return True
def list_hours(node, stream, variants, order='forward', start=None):
"""Return a list of all available hours from a node.
List all hours available from node/stream for each variant in variants.
Keyword arguments:
order -- If 'random', randomise the order of segments. If 'forward', sort
the hours in ascending order. If 'reverse' (default), sort the
hours in descending order. Otherwise, do not change the order of the
hours.
start -- Only return hours after this time. If None (default), all hours are
returned."""
hour_lists = [list_remote_hours(node, stream, variant) for variant in variants]
hours = list(set().union(*hour_lists))
def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='random', recent_cutoff=60, start=None): if start is not None:
hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < start]
if order == 'random':
random.shuffle(hours)
elif order == 'forward':
hours.sort()
elif order == 'reverse':
hours.sort(reverse=True)
return hours
def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60):
"""Backfill from remote node. """Backfill from remote node.
Backfill from node/stream/variants to base_dir/stream/variants. Backfill from node/stream/variants to base_dir/stream/variants for each hour
in hours.
Keyword arguments: Keyword arguments:
hours -- If None (default), backfill all available hours. If iterable,
backfill only hours in iterable. Otherwise backfill the last N hours,
starting with the lastest.
segment_order -- If 'random', randomise the order of segments (default). segment_order -- If 'random', randomise the order of segments (default).
If 'forward', sort the segment in ascending order. If 'reverse', sort If 'forward', sort the segment in ascending order. If 'reverse', sort
the segments in descending order. Otherwise, do not change the order of the segments in descending order. Otherwise, do not change the order of
segments. segments.
recent_cutoff -- Skip backfilling segments younger than this number of recent_cutoff -- Skip backfilling segments younger than this number of
seconds to prioritise letting the downloader grab these segments. seconds to prioritise letting the downloader grab these segments."""
start -- Do not backfill hours starting before this time. If None (default),
all hours are backfilled"""
logging.info('Starting backfilling from {}'.format(node)) logging.info('Starting backfilling from {}'.format(node))
if hours is None:
# gather all available hours from all variants and take the union
hours = set().union(*[
list_remote_hours(node, stream, variant)
for variant in variants
])
elif is_iterable(hours):
hours = list(hours) # coerce to list so it can be sorted
else:
n_hours = hours
if n_hours < 1:
raise ValueError('Number of hours has to be 1 or greater')
now = datetime.datetime.utcnow()
hours = [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
for variant in variants: for variant in variants:
for hour in hours: for hour in hours:
hour_time = datetime.datetime.strptime(hour, HOUR_FMT)
if start is not None and hour_time < start:
logging.debug('Skipping {}/{}/{} as before start'.format(stream, variant, hour))
continue
logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour)) logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour))

Loading…
Cancel
Save