diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 718d237..bda5fa2 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -12,9 +12,12 @@ import os import time import datetime import errno +import uuid import requests +import common + HOUR_FMT = '%Y-%m-%dT%H' TIMEOUT = 5 @@ -26,6 +29,7 @@ def get_nodes(): # nodes so that # as a prototype can just hardcode some addresses. + # each element in nodes is a 'protocol://host:port/' string nodes = [] @@ -38,50 +42,28 @@ def list_local_segments(base_dir, stream, variant, hour): # overheads path = os.path.join(base_dir, stream, variant, hour) try: - local_segments = [name for name in os.listdir(path) if not - name.startswith('.')] + local_segments = [name for name in os.listdir(path) if not name.startswith('.')] except OSError as e: if e.errno != errno.ENOENT: - raise: - else: - local_segments = [] + raise + + local_segments = [] return local_segments def list_remote_hours(node, stream, variant, timeout=TIMEOUT): # just a wrapper around a call to restreamer.list_hours - - uri = 'https://{}/files/{}/{}'.format(node, stream, variant) - - try: - resp = requests.get(uri, timeout=timeout) - - except requests.exceptions.Timeout: - return [] - - if resp.status_code != request.codes.ok: - return [] - + uri = '{}/files/{}/{}'.format(node, stream, variant) + resp = requests.get(uri, timeout=timeout) hours = resp.json() - return hours def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT): # just a wrapper around a call to restreamer.list_segments - - uri = 'https://{}/files/{}/{}/{}'.format(node, stream, variant, hour_str) - - try: - resp = requests.get(uri, timeout=timeout) - - except requests.exceptions.Timeout: - return [] - - if resp.status_code != request.codes.ok: - return [] - + uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour) + resp = requests.get(uri, timeout=timeout) remote_segments = resp.json() return remote_segments @@ -90,49 -72,28 
@@ def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT): def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, timeout=TIMEOUT): - uri = 'https://{}/segments/{}/{}/{}/{}'.format(node, stream, variant, - hour, missing_segment) - resp = requests.get(uri, stream=True, - timeout=timeout) - try: - resp = requests.get(uri, stream=True, timeout=timeout) + path = os.path.join(base_dir, stream, variant, hour, missing_segment) + if os.path.exists(path): + return - except requests.exceptions.Timeout: - return False + common.ensure_directory(path) - if resp.status_code != requests.codes.ok: - return False + substrs = path.split('-') + temp_path = '-'.join(substrs[:-1] + [str(uuid.uuid4()) + '.st']) - temp_name = 'temp_backfill' + uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment) + resp = requests.get(uri, stream=True, timeout=timeout) - with open(temp_name, 'w') as f: + with open(temp_path, 'wb') as f: for chunk in resp.iter_content(8192): f.write(chunk) - dir_path = os.path.join(base_dir, stream, variant, hour) + common.rename(temp_path, path) - if not os.path.exists(dir_path): - try: - os.mkdir(dir_path) - except OSError as e: - # Ignore if EEXISTS. This is needed to avoid a race if a getter is running at the same time. 
- if e.errno != errno.EEXIST: - raise - path = os.path.join(dir_path, missing_segment) - os.rename(temp_name, path) - return True - - - -def backfill(base_dir, stream, variants, hours=None, nodes=None, - failure_limit=5): - - # if hours is int, backfill last hours hourdirs - # else if hours is None, backfill all hourdirs - # else assume hours is iterable and backfill those hourdirs +def backfill(base_dir, stream, variants, hours=None, nodes=None): # loop over nodes asking for a list of segments then downloads any # segments it doesn't have @@ -140,75 +101,69 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None, if nodes is None: nodes = get_nodes() - if isinstance(hours, int): - n_hours = hours - if n_hours < 1: - raise ValueError('Number of hours has to be 1 or greater') - - now = datetime.datetime.utcnow() - now_str = now.strftime(HOUR_FMT) + #ideally do this in parallel + for node in nodes: + + try: + backfill_node(base_dir, node, stream, variants, hours) - hours = [now_str] + #need to replace this with a more sophisticated error handler + except Exception as e: + print node, e - for i in range(n_hours - 1): - previous_hour = datetime.strptime(hours[-1], HOUR_FMT) - current_hour = previous_hour + datetime.timedelta(hours=-1) - hours.append(current_hour.strftime(HOUR_FMT)) +def backfill_node(base_dir, node, stream, variants, hours, recent_cutoff=60): - for node in nodes: - backfill_node(base_dir, node, stream, variants, hours, - failure_limit) + # if hours is int, backfill last hours hourdirs + # else if hours is None, backfill all hourdirs + # else assume hours is iterable and backfill those hourdirs -def backfill_node(base_dir, node, stream, variants, hours, failure_limit): + if isinstance(hours, int): + n_hours = hours - # split into its own function to allow breaking out of two loops at once - # count failures this node has and if too many occur, assume node isn't - # working and move onto next + if n_hours < 1: + raise ValueError('Number of 
hours has to be 1 or greater') + + now = datetime.datetime.utcnow() + hours = [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] - failures = 0 for variant in variants: if hours is None: - # if this fails, get an empty list back so function quickly - # finishes node_hours = list_remote_hours(node, stream, variant) else: node_hours = hours for hour in node_hours: - local_segments = list_local_segments(base_dir, stream, variant, - hour) + local_segments = list_local_segments(base_dir, stream, variant, hour) local_segments = set(local_segments) - # if this fails, get an empty list back so no missing segments are - # requested remote_segments = list_remote_segments(node, stream, variant, hour) remote_segments = set(remote_segments) missing_segments = remote_segments - local_segments for missing_segment in missing_segments: - status = get_remote_segment(base_dir, node, stream, variant, - hour, missing_segment) + #to avoid getting in the downloader's way ignore segments less than recent_cutoff old + time_str = '{}:{}'.format(hour, missing_segment.split('-')[0]) + segment_time = datetime.datetime.strptime(time_str, HOUR_FMT + ':%M:%S.%f') + if datetime.datetime.utcnow() - segment_time < datetime.timedelta(seconds=recent_cutoff): + continue + + get_remote_segment(base_dir, node, stream, variant, hour, missing_segment) - if not status: - failures += 1 - if failures > failure_limit: - return # all wait times are in minutes # obviously adjust default times in response to how long back filling actually # takes -def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, - sleep_time=1): +def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time=1): fill_start = datetime.datetime.now() full_fill_start = fill_start @@ -237,7 +192,7 @@ def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, else: - time.sleep(60 * sleep_time) + time.sleep(common.jitter(60 * sleep_time))