diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index aff1dc0..32b2466 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -132,7 +132,8 @@ def is_iterable(x): def backfill_node(base_dir, node, stream, variants, hours=None, start=None, - stop=None, order=None, recent_cutoff=60): + stop=None, hour_order=None, segment_order='random', get_full_hour=True, + recent_cutoff=60): """Backfill from remote node. Backfill from node/stream/variants to base_dir/stream/variants. @@ -142,14 +143,22 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, backfill only hours in iterable. Otherwise backfill the last N hours, starting with the lastest. start -- Only backfill hours starting after or equal to this datetime - object. If None (default), backfill all hours. + object. If None (default), backfill all hours. If get_full_hour is + False, only segments starting after start will be backfilled. stop -- Only backfill hours starting before or equal to this datetime - object. If None (default), backfill all hours. - order -- If 'random', randomise the order of hours. If 'forward', sort the - hours in ascending order. If 'reverse', sort the hours in descending - order. Otherwise, do not change the order of hours (default). + object. If None (default), backfill all hours. If get_full_hour is + False, only segments finishing before stop will be backfilled. + hour_order -- If 'random', randomise the order of hours. If 'forward', sort + the hours in ascending order. If 'reverse', sort the hours in descending order. Otherwise, do not change the order of hours (default). + segment_order -- If 'random', randomise the order of segments (default). + If 'forward', sort the segment in ascending order. If 'reverse', sort + the segments in descending order. Otherwise, do not change the order of + segments. + get_full_hour -- If True (default), get all segments in an hour. If False, + use start and stop to limit which segments are backfilled. recent_cutoff -- Skip backfilling segments younger than this number of seconds to prioritise letting the downloader grab these segments.""" + logging.info('Starting backfilling from {}'.format(node)) if hours is None: @@ -172,14 +181,12 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, if stop is not None: hours = [hour for hour in hours if hour <= stop] - # useful if running in parallel and expecting to fetch a segments from - # multiple hours (say on start up) so that you don't try to backfill the - # same hour at the same time - if order == 'random': + # useful if running in parallel so multiple nodes don't request the same hour at the same time + if hour_order == 'random': random.shuffle(hours) - elif order == 'forward': + elif hour_order == 'forward': hours.sort() - elif order == 'reverse': + elif hour_order == 'reverse': hours.sort(reverse=True) for variant in variants: @@ -188,7 +195,15 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, local_segments = set(list_local_segments(base_dir, stream, variant, hour)) remote_segments = set(list_remote_segments(node, stream, variant, hour)) - missing_segments = remote_segments - local_segments + missing_segments = list(remote_segments - local_segments) + + # useful if running in parallel so multiple nodes don't request the same segment at the same time + if segment_order == 'random': + random.shuffle(missing_segments) + elif segment_order == 'forward': + missing_segments.sort() + elif segment_order == 'reverse': + missing_segments.sort(reverse=True) for missing_segment in missing_segments: @@ -200,7 +215,12 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, except ValueError as e: logging.warning('File {} invaid: {}'.format(path, e.value)) continue - + + if not get_full_hour and start is not None and segment['start'] < start: + continue + if not get_full_hour and start is not None and segment['end'] > stop: + continue + #to avoid getting in the downloader's way ignore segments less than recent_cutoff old if datetime.datetime.utcnow() - segment['start'] < datetime.timedelta(seconds=recent_cutoff): continue