|
|
|
@ -132,7 +132,8 @@ def is_iterable(x):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
stop=None, order=None, recent_cutoff=60):
|
|
|
|
|
stop=None, hour_order=None, segment_order='random', get_full_hour=True,
|
|
|
|
|
recent_cutoff=60):
|
|
|
|
|
"""Backfill from remote node.
|
|
|
|
|
|
|
|
|
|
Backfill from node/stream/variants to base_dir/stream/variants.
|
|
|
|
@ -142,14 +143,22 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
backfill only hours in iterable. Otherwise backfill the last N hours,
|
|
|
|
|
starting with the lastest.
|
|
|
|
|
start -- Only backfill hours starting after or equal to this datetime
|
|
|
|
|
object. If None (default), backfill all hours.
|
|
|
|
|
object. If None (default), backfill all hours. If get_full_hour is
|
|
|
|
|
False, only segments starting after start will be backfilled.
|
|
|
|
|
stop -- Only backfill hours starting before or equal to this datetime
|
|
|
|
|
object. If None (default), backfill all hours.
|
|
|
|
|
order -- If 'random', randomise the order of hours. If 'forward', sort the
|
|
|
|
|
hours in ascending order. If 'reverse', sort the hours in descending
|
|
|
|
|
order. Otherwise, do not change the order of hours (default).
|
|
|
|
|
object. If None (default), backfill all hours. If get_full_hour is
|
|
|
|
|
False, only segments finishing before stop will be backfilled.
|
|
|
|
|
hour_order -- If 'random', randomise the order of hours. If 'forward', sort
|
|
|
|
|
the hours in ascending order. If 'reverse', sort the hours in descending order. Otherwise, do not change the order of hours (default).
|
|
|
|
|
segment_order -- If 'random', randomise the order of segments (default).
|
|
|
|
|
If 'forward', sort the segment in ascending order. If 'reverse', sort
|
|
|
|
|
the segments in descending order. Otherwise, do not change the order of
|
|
|
|
|
segments.
|
|
|
|
|
get_full_hour -- If True (default), get all segments in an hour. If False,
|
|
|
|
|
use start and stop to limit which segments are backfilled.
|
|
|
|
|
recent_cutoff -- Skip backfilling segments younger than this number of
|
|
|
|
|
seconds to prioritise letting the downloader grab these segments."""
|
|
|
|
|
|
|
|
|
|
logging.info('Starting backfilling from {}'.format(node))
|
|
|
|
|
|
|
|
|
|
if hours is None:
|
|
|
|
@ -172,14 +181,12 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
if stop is not None:
|
|
|
|
|
hours = [hour for hour in hours if hour <= stop]
|
|
|
|
|
|
|
|
|
|
# useful if running in parallel and expecting to fetch a segments from
|
|
|
|
|
# multiple hours (say on start up) so that you don't try to backfill the
|
|
|
|
|
# same hour at the same time
|
|
|
|
|
if order == 'random':
|
|
|
|
|
# useful if running in parallel so multiple nodes don't request the same hour at the same time
|
|
|
|
|
if hour_order == 'random':
|
|
|
|
|
random.shuffle(hours)
|
|
|
|
|
elif order == 'forward':
|
|
|
|
|
elif hour_order == 'forward':
|
|
|
|
|
hours.sort()
|
|
|
|
|
elif order == 'reverse':
|
|
|
|
|
elif hour_order == 'reverse':
|
|
|
|
|
hours.sort(reverse=True)
|
|
|
|
|
|
|
|
|
|
for variant in variants:
|
|
|
|
@ -188,7 +195,15 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
|
|
|
|
|
local_segments = set(list_local_segments(base_dir, stream, variant, hour))
|
|
|
|
|
remote_segments = set(list_remote_segments(node, stream, variant, hour))
|
|
|
|
|
missing_segments = remote_segments - local_segments
|
|
|
|
|
missing_segments = list(remote_segments - local_segments)
|
|
|
|
|
|
|
|
|
|
# useful if running in parallel so multiple nodes don't request the same segment at the same time
|
|
|
|
|
if segment_order == 'random':
|
|
|
|
|
random.shuffle(missing_segments)
|
|
|
|
|
elif segment_order == 'forward':
|
|
|
|
|
missing_segments.sort()
|
|
|
|
|
elif segment_order == 'reverse':
|
|
|
|
|
missing_segments.sort(reverse=True)
|
|
|
|
|
|
|
|
|
|
for missing_segment in missing_segments:
|
|
|
|
|
|
|
|
|
@ -200,7 +215,12 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
except ValueError as e:
|
|
|
|
|
logging.warning('File {} invaid: {}'.format(path, e.value))
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not get_full_hour and start is not None and segment['start'] < start:
|
|
|
|
|
continue
|
|
|
|
|
if not get_full_hour and start is not None and segment['end'] > stop:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
|
|
|
|
|
if datetime.datetime.utcnow() - segment['start'] < datetime.timedelta(seconds=recent_cutoff):
|
|
|
|
|
continue
|
|
|
|
|