diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index 5423c96..718d237 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -11,11 +11,12 @@
 import os
 import time
 import datetime
+import errno
 
 import requests
 
-hour_fmt = '%Y-%m-%dT%H'
-
+HOUR_FMT = '%Y-%m-%dT%H'
+TIMEOUT = 5
 
 
 def get_nodes():
@@ -36,38 +37,71 @@ def list_local_segments(base_dir, stream, variant, hour):
 	# could just call restreamer.list_segments but this avoids http/json
 	# overheads
 	path = os.path.join(base_dir, stream, variant, hour)
-	local_segments = [name for name in os.listdir(path) if not
+	try:
+		local_segments = [name for name in os.listdir(path) if not
 			name.startswith('.')]
-	return local_segments
+	except OSError as e:
+		if e.errno != errno.ENOENT:
+			raise
+		else:
+			local_segments = []
 
-def list_remote_hours(node, stream, variant):
+	return local_segments
 
+def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
+
 	# just a wrapper around a call to restreamer.list_hours
-	# TODO if the call fails, log it and just return an empty list
-	resp = requests.get('https://{}/files/{}/{}'.format(node, stream, variant))
+	uri = 'https://{}/files/{}/{}'.format(node, stream, variant)
+
+	try:
+		resp = requests.get(uri, timeout=timeout)
+
+	except requests.exceptions.Timeout:
+		return []
+
+	if resp.status_code != requests.codes.ok:
+		return []
+
 	hours = resp.json()
 	return hours
 
-def list_remote_segments(node, stream, variant, hour):
+def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
 	# just a wrapper around a call to restreamer.list_segments
-	# TODO if the call fails, log it and just return an empty list
-	resp = requests.get('https://{}/files/{}/{}/{}'.format(node, stream,
-		variant, hour_str))
+	uri = 'https://{}/files/{}/{}/{}'.format(node, stream, variant, hour)
+
+	try:
+		resp = requests.get(uri, timeout=timeout)
+
+	except requests.exceptions.Timeout:
+		return []
+
+	if resp.status_code != requests.codes.ok:
+		return []
+
 	remote_segments = resp.json()
 	return remote_segments
 
 # based on _get_segment in downloader/main
 # very basic error handling
-def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment):
+def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
+		timeout=TIMEOUT):
 
-	resp = requests.get('https://{}/segments/{}/{}/{}/{}'.format(node, stream,
-		variant, hour, missing_segment), stream=True)
+	uri = 'https://{}/segments/{}/{}/{}/{}'.format(node, stream, variant,
+		hour, missing_segment)
 
-	if resp.status_code != 200:
+	try:
+		resp = requests.get(uri, stream=True, timeout=timeout)
+
+	except requests.exceptions.Timeout:
+		return False
+
+	if resp.status_code != requests.codes.ok:
 		return False
 
 	temp_name = 'temp_backfill'
@@ -82,7 +116,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment):
 	try:
 		os.mkdir(dir_path)
 	except OSError as e:
-		# Ignore if EEXISTS. This is needed to avoid a race if two getters run at once.
+		# Ignore if EEXIST. This is needed to avoid a race if another getter is running at the same time.
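+		# (e.g. the downloader's getter creating the same hour directory concurrently)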
 		if e.errno != errno.EEXIST:
 			raise
 
@@ -114,16 +148,15 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None,
 
 		now = datetime.datetime.utcnow()
-		now_str = now.strftime(hour_fmt)
-		now_hour = datetime.strptime(now_str, hour_fmt)
+		now_str = now.strftime(HOUR_FMT)
 
 		hours = [now_str]
 		for i in range(n_hours - 1):
-			previous_hour = datetime.strptime(hours[-1], hour_fmt)
+			previous_hour = datetime.datetime.strptime(hours[-1], HOUR_FMT)
 			current_hour = previous_hour + datetime.timedelta(hours=-1)
-			hours.append(current_hour.strftime(hour_fmt))
+			hours.append(current_hour.strftime(HOUR_FMT))
 
 	for node in nodes:
@@ -148,12 +181,12 @@ def backfill_node(base_dir, node, stream, variants, hours, failure_limit):
 			node_hours = hours
 
 		for hour in node_hours:
-			# if this fails, get an empty list back so this loop quickly
-			# finishes
+
 			local_segments = list_local_segments(base_dir, stream, variant, hour)
 			local_segments = set(local_segments)
 
-			#should include the result of this in the failure count
+			# if this fails, we get an empty list back so no missing segments
+			# are requested
 			remote_segments = list_remote_segments(node, stream, variant, hour)
 			remote_segments = set(remote_segments)
 			missing_segments = remote_segments - local_segments
@@ -168,6 +201,8 @@ def backfill_node(base_dir, node, stream, variants, hours, failure_limit):
 
 		if failures > failure_limit:
 			return
+
+
 # all wait times are in minutes
 # obviously adjust default times in response to how long back filling actually