diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 17b7fb0..bfc627f 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -11,6 +11,7 @@ import datetime import errno import os +import random import time import uuid @@ -20,6 +21,7 @@ import common HOUR_FMT = '%Y-%m-%dT%H' + TIMEOUT = 5 #default timeout for remote requests def get_nodes(): @@ -42,10 +44,10 @@ def list_local_segments(base_dir, stream, variant, hour): path = os.path.join(base_dir, stream, variant, hour) try: return [name for name in os.listdir(path) if not name.startswith('.')] + except OSError as e: if e.errno != errno.ENOENT: raise - return [] @@ -97,7 +99,9 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, common.rename(temp_path, path) -def backfill(base_dir, stream, variants, hours=None, nodes=None): + +def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None, + stop=None, order=None): # loop over nodes backfilling from each @@ -107,7 +111,7 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None): #ideally do this in parallel for node in nodes: try: - backfill_node(base_dir, node, stream, variants, hours) + backfill_node(base_dir, node, stream, variants, hours, start, stop, order=order) #need to replace this with a more sophisticated error handler except Exception as e: print node, e @@ -121,7 +125,8 @@ def is_iterable(x): return True -def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60): +def backfill_node(base_dir, node, stream, variants, hours=None, start=None, + stop=None, recent_cutoff=60, order=None): # if hours is None, backfill all hourdirs if hours is None: @@ -139,6 +144,18 @@ def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60 now = datetime.datetime.utcnow() hours = [(now - i * timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] + if start is not None: + hours = [hour for hour in hours if hour >= start] + + if stop is not None: + hours = [hour for hour in hours if hour <= stop] + + # useful if running in parallel and expecting to fetch a segments from + # multiple hours (say on start up) so that you don't try to backfill the + # same hour at the same time + if order == 'random': + hours = random.shuffle(hours) + for variant in variants: for hour in hours: @@ -167,7 +184,7 @@ def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time full_fill_start = fill_start # Do a full backfill at start - backfill(base_dir, stream, variants) + backfill(base_dir, stream, variants, order='random') # I'm sure there is a module that does this in a more robust way # but I understand this and it gives the behaviour I want