From 2fb17fff59056faa21bb3f473fee16028235ea1f Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Fri, 28 Dec 2018 04:18:06 -0800 Subject: [PATCH] much closer to being functional --- backfiller/backfiller.py | 101 +++++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 21 deletions(-) diff --git a/backfiller/backfiller.py b/backfiller/backfiller.py index 40773fc..8c4ba7c 100644 --- a/backfiller/backfiller.py +++ b/backfiller/backfiller.py @@ -11,6 +11,11 @@ import glob import requests import os +hour_fmt = '%Y-%m-%dT%H' + +def is_localhost(node): + + return None def get_nodes(): @@ -23,60 +28,114 @@ def get_nodes(): return nodes -def list_remote_segments(node): +def list_local_segments(base_dir, stream, variant, hour): + + path = os.path.join(base_dir, stream, variant, hour) + return [name for name in os.listdir(path) if not name.startswith('.')] + +def get_hours(node, stream, variant): + + resp = requests.get('https://{}/files/{}/{}'.format(node, stream, variant)) + hours = resp.json() - # return a list of paths - # obviously update path with real one + return hours - resp = requests.get(node + '/wubloader/segment_list') +def list_remote_segments(node, stream, variant, hour): + + resp = requests.get('https://{}/files/{}/{}/{}'.format(node, stream, + variant, hour_str)) remote_segments = resp.json() #replace with appropriate parser return remote_segments #based on _get_segment in downloader/main -#should have a more general shared version of this -def get_remote_segment(node, segment): +def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment): - # obviously update path with real one - resp = requests.get(node + '/wubloader/segments/' + segment, stream=True) + resp = requests.get('https://{}/segments/{}/{}/{}/{}'.format(node, stream, variant, + hour, missing_segment), stream=True) with open('temp_backfill', 'w') as f: for chunk in resp.iter_content(8192): f.write(chunk) + path = os.path.join(base_dir, stream, variant, hour, missing_segment) os.rename(temp, segment) -def back_fill(base_dir, nodes=None): +def back_fill(static_folder, stream, variants, hours=None, nodes=None, + failure_limit=5): + # if variants is None, backfill all versions + # if hours is None, backfill all hourdirs + # if hours is iterable, backfill those hourdirs + # if hours is int, backfill last hours hourdirs + # loop over nodes asking for a list of segments then downloads any # segments it doesn't have if nodes is None: nodes = get_nodes() + if isinstance(hours, int): + n_hours = hours + + if n_hours < 1: + raise ValueError('Number of hours has to be 1 or greater') + + now = datetime.datetime.utcnow() + + now_str = now.strftime(hour_fmt) + now_hour = datetime.strptime(now_str, hour_fmt) + + hours = [now_str] + + for i in range(n_hours - 1): + + previous_hour = datetime.strptime(hours[-1], hour_fmt) + current_hour = previous_hour + datetime.timedelta(hours=-1) + hours.append(current_hour.strftime(hour_fmt)) for node in nodes: - # need to figure out how to properly check whether this node is - # the same - if node == 'localhost': - continue + back_fill_node(static_folder, node, stream, variants, hours, + failure_limit) - # not sure how much we want to hard code the search - # replace with something from the restreamer - local_segments = set(glob.glob(base_dir + '/*/*/*.ts')) - remote_segments = set(list_remote_segments(node)) +def back_fill_node(base_dir, node, stream, variants, hours, failure_limit): - missing_segments = remote_segments - local_segments + # need to figure out how to properly check whether this node is the same + if is_local_host(node): + return - - for missing_segment in missing_segments: - get_remote_segment(node, missing_segment) + failures = 0 + for variant in variants: + + if hours is None: + node_hours = get_hours(node, stream, variant) + else: + node_hours = hours + + for hour in node_hours: + + local_segments = list_local_segments(base_dir, stream, variant, + hour) + local_segments = set(local_segments) + remote_segments = list_remote_segments(node, stream, variant, hour) + remote_segments = set(remote_segments) + missing_segments = remote_segments - local_segments + + for missing_segment in missing_segments: + + status = get_remote_segment(base_dir, node, stream, variant, + hour, missing_segment) + if not status: + failures += 1 + if failures > failure_limit: + return + def main(base_dir, wait_time=60):