diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index 9f09e0a..17b7fb0 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -8,18 +8,19 @@
 # more frequently, backfill the last couple hours
 # (last three hour directories so always at least two hours).
 
-import os
-import time
 import datetime
 import errno
+import os
+import time
 import uuid
 
 import requests
 
 import common
 
+
 HOUR_FMT = '%Y-%m-%dT%H'
-TIMEOUT = 5
+TIMEOUT = 5  # default timeout for remote requests
 
 
 def get_nodes():
@@ -32,77 +33,70 @@ def get_nodes():
 
 	# each element in nodes is a 'protocol://host:port/' string
 	nodes = []
-
 	return nodes
 
 
-def list_local_segments(base_dir, stream, variant, hour):	
+def list_local_segments(base_dir, stream, variant, hour):
 	# based on restreamer.list_segments
-	# could just call restreamer.list_segments but this avoids http/json
-	# overheads
+	# could just call restreamer.list_segments but this avoids http/json overheads
 	path = os.path.join(base_dir, stream, variant, hour)
 	try:
-		local_segments = [name for name in os.listdir(path) if not name.startswith('.')]
+		return [name for name in os.listdir(path) if not name.startswith('.')]
 	except OSError as e:
 		if e.errno != errno.ENOENT:
 			raise
-		local_segments = []
-
-	return local_segments
+		return []
 
 
 def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
-
 	# just a wrapper around a call to restreamer.list_hours
 	uri = '{}/files/{}/{}'.format(node, stream, variant)
 	resp = requests.get(uri, timeout=timeout)
-	hours = resp.json()
-	return hours
+	return resp.json()
 
 
-def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):	
+def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
 	# just a wrapper around a call to restreamer.list_segments
-	uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour_str)
+	uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
 	resp = requests.get(uri, timeout=timeout)
-	remote_segments = resp.json()
-	return remote_segments
+	return resp.json()
+
 
 # based on _get_segment in downloader/main
 # very basic error handling
 def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 		timeout=TIMEOUT):
-	file_created = False
 	path = os.path.join(base_dir, stream, variant, hour, missing_segment)
+	# check to see if file already exists to avoid unnecessarily copying it
 	if os.path.exists(path):
 		return
 
-	substrs = path.split('-')
-	temp_path = '-'.join(substrs[:-1] + [str(uuid.uuid4()) + '.st'])
+	dir_name = os.path.dirname(path)
+	date, duration, _ = os.path.basename(path).split('-', 2)
+	temp_name = "-".join([date, duration, "temp", str(uuid.uuid4())])
+	temp_path = os.path.join(dir_name, "{}.ts".format(temp_name))
 	common.ensure_directory(temp_path)
 
 	try:
 		uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
 		resp = requests.get(uri, stream=True, timeout=timeout)
+		resp.raise_for_status()
+
 		with open(temp_path, 'w') as f:
-			file_created = True
 			for chunk in resp.iter_content(8192):
 				f.write(chunk)
 	except Exception:
-		ex_type, ex, tb = sys.exc_info()
-		if file_created:
+		if os.path.exists(temp_path):
 			os.remove(temp_path)
-
-		raise ex_type, ex, tb
-
-
+		raise
 	common.rename(temp_path, path)
 
-
 def backfill(base_dir, stream, variants, hours=None, nodes=None):
 	# loop over nodes backfilling from each
@@ -112,24 +106,31 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None):
 
 	#ideally do this in parallel
 	for node in nodes:
-
 		try:
 			backfill_node(base_dir, node, stream, variants, hours)
-		#need to replace this with a more sophisticated error handler
 		except Exception as e:
 			print node, e
 
 
-def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60):
-
+def is_iterable(x):
+	try:
+		iter(x)
+	except TypeError:
+		return False
+	return True
 
-	# if hours is int, backfill last hours hourdirs
-	# else if hours is None, backfill all hourdirs
-	# else assume hours is iterable and backfill those hourdirs
 
+def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60):
 
-	if isinstance(hours, int):
+	# if hours is None, backfill all hourdirs
+	if hours is None:
+		hours = list_remote_hours(node, stream, variants[0])  # assume hour dirs match across variants
+	# if hours is iterable, backfill those hourdirs
+	elif is_iterable(hours):
+		pass
+	# assume int and backfill last hours hourdirs
+	else:
 		n_hours = hours
 
 		if n_hours < 1:
@@ -140,17 +141,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60
 
 	for variant in variants:
 
-		if hours is None:
-			node_hours = list_remote_hours(node, stream, variant)
-		else:
-			node_hours = hours
-
-		for hour in node_hours:
+		for hour in hours:
 
-			local_segments = list_local_segments(base_dir, stream, variant, hour)
-			local_segments = set(local_segments)
-			remote_segments = list_remote_segments(node, stream, variant, hour)
-			remote_segments = set(remote_segments)
+			local_segments = set(list_local_segments(base_dir, stream, variant, hour))
+			remote_segments = set(list_remote_segments(node, stream, variant, hour))
 			missing_segments = remote_segments - local_segments
 
 			for missing_segment in missing_segments:
@@ -163,9 +157,6 @@ def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60
 
 				get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
 
-
-
-
 # all wait times are in minutes
 # obviously adjust default times in response to how long back filling actually
@@ -197,15 +188,6 @@ def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time
 
 			fill_start = now
 
-
 		else:
 			time.sleep(common.jitter(60 * sleep_time))
-
-
-
-
-
-
-
-
diff --git a/common/common.py b/common/common.py
index 558181e..acb6b08 100644
--- a/common/common.py
+++ b/common/common.py
@@ -266,6 +266,7 @@ def rename(old, new):
 			raise
 		os.remove(old)
 
+
 def ensure_directory(path):
 	"""Create directory that contains path, as well as any parent directories,
 	if they don't already exist."""
@@ -280,6 +281,7 @@ def ensure_directory(path):
 		if e.errno != errno.EEXIST:
 			raise
 
+
 def jitter(interval):
 	"""Apply some 'jitter' to an interval. This is a random +/- 10% change in order to
 	smooth out patterns and prevent everything from retrying at the same time.
diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 544613b..d06cc50 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -461,7 +461,7 @@ class SegmentGetter(object):
 				logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
 				return
 			resp.raise_for_status()
-			ensure_directory(temp_path)
+			common.ensure_directory(temp_path)
 			with open(temp_path, 'w') as f:
 				file_created = True
 				# We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway,