diff --git a/backfiller/backfiller.py b/backfiller/backfiller.py index a26e5bc..5423c96 100644 --- a/backfiller/backfiller.py +++ b/backfiller/backfiller.py @@ -2,13 +2,18 @@ # this is a prototype of the backfiller # lots about web services and the like I don't know # needs logging, exception handling and the like +# also proper doc strings # when starting the backfiller and every few hours, backfill everything # more frequently, backfill the last couple hours # (last three hour directories so always at least two hours). -import requests import os +import errno +import time +import datetime + +import requests hour_fmt = '%Y-%m-%dT%H' @@ -28,12 +32,18 @@ def get_nodes(): def list_local_segments(base_dir, stream, variant, hour): + # based on restreamer.list_segments + # could just call restreamer.list_segments but this avoids http/json + # overheads path = os.path.join(base_dir, stream, variant, hour) local_segments = [name for name in os.listdir(path) if not name.startswith('.')] return local_segments -def get_hours(node, stream, variant): +def list_remote_hours(node, stream, variant): + + # just a wrapper around a call to restreamer.list_hours + # TODO if the call fails, log it and just return an empty list resp = requests.get('https://{}/files/{}/{}'.format(node, stream, variant)) hours = resp.json() @@ -42,13 +52,16 @@ def get_hours(node, stream, variant): def list_remote_segments(node, stream, variant, hour): + # just a wrapper around a call to restreamer.list_segments + # TODO if the call fails, log it and just return an empty list + resp = requests.get('https://{}/files/{}/{}/{}'.format(node, stream, variant, hour)) - remote_segments = resp.json() #replace with appropriate parser - + remote_segments = resp.json() return remote_segments -#based on _get_segment in downloader/main +# based on _get_segment in downloader/main +# very basic error handling def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment): resp = 
requests.get('https://{}/segments/{}/{}/{}/{}'.format(node, stream, @@ -63,19 +76,29 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment): for chunk in resp.iter_content(8192): f.write(chunk) - path = os.path.join(base_dir, stream, variant, hour, missing_segment) - os.rename(temp_name, segment) + dir_path = os.path.join(base_dir, stream, variant, hour) + + if not os.path.exists(dir_path): + try: + os.mkdir(dir_path) + except OSError as e: + # Ignore if EEXIST. This is needed to avoid a race if two getters run at once. + if e.errno != errno.EEXIST: + raise + + path = os.path.join(dir_path, missing_segment) + os.rename(temp_name, path) return True -def back_fill(static_folder, stream, variants, hours=None, nodes=None, + +def backfill(base_dir, stream, variants, hours=None, nodes=None, failure_limit=5): - # if variants is None, backfill all versions - # if hours is None, backfill all hourdirs - # if hours is iterable, backfill those hourdirs # if hours is int, backfill last hours hourdirs + # else if hours is None, backfill all hourdirs + # else assume hours is iterable and backfill those hourdirs # loop over nodes asking for a list of segments then downloads any # segments it doesn't have @@ -104,29 +127,33 @@ def back_fill(static_folder, stream, variants, hours=None, nodes=None, for node in nodes: - back_fill_node(static_folder, node, stream, variants, hours, + backfill_node(base_dir, node, stream, variants, hours, failure_limit) -def back_fill_node(base_dir, node, stream, variants, hours, failure_limit): +def backfill_node(base_dir, node, stream, variants, hours, failure_limit): - # need to figure out how to properly check whether this node is the same - if is_local_host(node): - return + # split into its own function to allow breaking out of two loops at once + # count failures this node has and if too many occur, assume node isn't + # working and move onto next failures = 0 for variant in variants: if hours is None: - node_hours = 
get_hours(node, stream, variant) + # if this fails, get an empty list back so function quickly + # finishes + node_hours = list_remote_hours(node, stream, variant) else: node_hours = hours for hour in node_hours: - + # if this fails, get an empty list back so this loop quickly + # finishes local_segments = list_local_segments(base_dir, stream, variant, hour) local_segments = set(local_segments) + #should include the result of this in the failure count remote_segments = list_remote_segments(node, stream, variant, hour) remote_segments = set(remote_segments) missing_segments = remote_segments - local_segments @@ -142,13 +169,46 @@ def back_fill_node(base_dir, node, stream, variants, hours, failure_limit): if failures > failure_limit: return +# all wait times are in minutes +# obviously adjust default times in response to how long back filling actually +# takes +def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, + sleep_time=1): + + fill_start = datetime.datetime.now() + full_fill_start = fill_start + + # Do a full backfill at start + backfill(base_dir, stream, variants) + + # I'm sure there is a module that does this in a more robust way + # but I understand this and it gives the behaviour I want + while True: + + now = datetime.datetime.now() + + if now - full_fill_start > datetime.timedelta(minutes=full_fill_wait): + + backfill(base_dir, stream, variants) + + fill_start = now + full_fill_start = fill_start + + elif now - fill_start > datetime.timedelta(minutes=fill_wait): + + backfill(base_dir, stream, variants, 3) + + fill_start = now + + + else: + time.sleep(60 * sleep_time) + + + + + + + -def main(base_dir, wait_time=60): - None - # every wait_time call back_fill - # time from start of back_fill - # to keep things simple don't try two back_fills at the same time - # wait for previous one to start before launching second. - # if it's taken more than wait_time for back_fill to run, start - # immediately