|
|
|
@ -11,6 +11,11 @@ import glob
|
|
|
|
|
import requests
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
|
|
hour_fmt = '%Y-%m-%dT%H'
|
|
|
|
|
|
|
|
|
|
def is_localhost(node):
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def get_nodes():
|
|
|
|
|
|
|
|
|
@ -23,32 +28,48 @@ def get_nodes():
|
|
|
|
|
|
|
|
|
|
return nodes
|
|
|
|
|
|
|
|
|
|
def list_remote_segments(node):
|
|
|
|
|
def list_local_segments(base_dir, stream, variant, hour):
|
|
|
|
|
|
|
|
|
|
path = os.path.join(base_dir, stream, variant, hour)
|
|
|
|
|
return [name for name in os.listdir(path) if not name.startswith('.')]
|
|
|
|
|
|
|
|
|
|
def get_hours(node, stream, variant):
|
|
|
|
|
|
|
|
|
|
resp = requests.get('https://{}/files/{}/{}'.format(node, stream, variant))
|
|
|
|
|
hours = resp.json()
|
|
|
|
|
|
|
|
|
|
# return a list of paths
|
|
|
|
|
# obviously update path with real one
|
|
|
|
|
return hours
|
|
|
|
|
|
|
|
|
|
resp = requests.get(node + '/wubloader/segment_list')
|
|
|
|
|
def list_remote_segments(node, stream, variant, hour):
|
|
|
|
|
|
|
|
|
|
resp = requests.get('https://{}/files/{}/{}/{}'.format(node, stream,
|
|
|
|
|
variant, hour_str))
|
|
|
|
|
remote_segments = resp.json() #replace with appropriate parser
|
|
|
|
|
|
|
|
|
|
return remote_segments
|
|
|
|
|
|
|
|
|
|
#based on _get_segment in downloader/main
|
|
|
|
|
#should have a more general shared version of this
|
|
|
|
|
def get_remote_segment(node, segment):
|
|
|
|
|
def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment):
|
|
|
|
|
|
|
|
|
|
# obviously update path with real one
|
|
|
|
|
resp = requests.get(node + '/wubloader/segments/' + segment, stream=True)
|
|
|
|
|
resp = requests.get('https://{}/segments/{}/{}/{}/{}'.format(node, stream, variant,
|
|
|
|
|
hour, missing_segment), stream=True)
|
|
|
|
|
|
|
|
|
|
with open('temp_backfill', 'w') as f:
|
|
|
|
|
for chunk in resp.iter_content(8192):
|
|
|
|
|
f.write(chunk)
|
|
|
|
|
|
|
|
|
|
path = os.path.join(base_dir, stream, variant, hour, missing_segment)
|
|
|
|
|
os.rename(temp, segment)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def back_fill(base_dir, nodes=None):
|
|
|
|
|
def back_fill(static_folder, stream, variants, hours=None, nodes=None,
|
|
|
|
|
failure_limit=5):
|
|
|
|
|
|
|
|
|
|
# if variants is None, backfill all versions
|
|
|
|
|
# if hours is None, backfill all hourdirs
|
|
|
|
|
# if hours is iterable, backfill those hourdirs
|
|
|
|
|
# if hours is int, backfill last hours hourdirs
|
|
|
|
|
|
|
|
|
|
# loop over nodes asking for a list of segments then downloads any
|
|
|
|
|
# segments it doesn't have
|
|
|
|
@ -56,26 +77,64 @@ def back_fill(base_dir, nodes=None):
|
|
|
|
|
if nodes is None:
|
|
|
|
|
nodes = get_nodes()
|
|
|
|
|
|
|
|
|
|
if isinstance(hours, int):
|
|
|
|
|
n_hours = hours
|
|
|
|
|
|
|
|
|
|
if n_hours < 1:
|
|
|
|
|
raise ValueError('Number of hours has to be 1 or greater')
|
|
|
|
|
|
|
|
|
|
now = datetime.datetime.utcnow()
|
|
|
|
|
|
|
|
|
|
now_str = now.strftime(hour_fmt)
|
|
|
|
|
now_hour = datetime.strptime(now_str, hour_fmt)
|
|
|
|
|
|
|
|
|
|
hours = [now_str]
|
|
|
|
|
|
|
|
|
|
for i in range(n_hours - 1):
|
|
|
|
|
|
|
|
|
|
previous_hour = datetime.strptime(hours[-1], hour_fmt)
|
|
|
|
|
current_hour = previous_hour + datetime.timedelta(hours=-1)
|
|
|
|
|
hours.append(current_hour.strftime(hour_fmt))
|
|
|
|
|
|
|
|
|
|
for node in nodes:
|
|
|
|
|
|
|
|
|
|
# need to figure out how to properly check whether this node is
|
|
|
|
|
# the same
|
|
|
|
|
if node == 'localhost':
|
|
|
|
|
continue
|
|
|
|
|
back_fill_node(static_folder, node, stream, variants, hours,
|
|
|
|
|
failure_limit)
|
|
|
|
|
|
|
|
|
|
# not sure how much we want to hard code the search
|
|
|
|
|
# replace with something from the restreamer
|
|
|
|
|
local_segments = set(glob.glob(base_dir + '/*/*/*.ts'))
|
|
|
|
|
|
|
|
|
|
remote_segments = set(list_remote_segments(node))
|
|
|
|
|
def back_fill_node(base_dir, node, stream, variants, hours, failure_limit):
|
|
|
|
|
|
|
|
|
|
missing_segments = remote_segments - local_segments
|
|
|
|
|
# need to figure out how to properly check whether this node is the same
|
|
|
|
|
if is_local_host(node):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
failures = 0
|
|
|
|
|
for variant in variants:
|
|
|
|
|
|
|
|
|
|
if hours is None:
|
|
|
|
|
node_hours = get_hours(node, stream, variant)
|
|
|
|
|
else:
|
|
|
|
|
node_hours = hours
|
|
|
|
|
|
|
|
|
|
for hour in node_hours:
|
|
|
|
|
|
|
|
|
|
local_segments = list_local_segments(base_dir, stream, variant,
|
|
|
|
|
hour)
|
|
|
|
|
local_segments = set(local_segments)
|
|
|
|
|
remote_segments = list_remote_segments(node, stream, variant, hour)
|
|
|
|
|
remote_segments = set(remote_segments)
|
|
|
|
|
missing_segments = remote_segments - local_segments
|
|
|
|
|
|
|
|
|
|
for missing_segment in missing_segments:
|
|
|
|
|
get_remote_segment(node, missing_segment)
|
|
|
|
|
|
|
|
|
|
status = get_remote_segment(base_dir, node, stream, variant,
|
|
|
|
|
hour, missing_segment)
|
|
|
|
|
|
|
|
|
|
if not status:
|
|
|
|
|
failures += 1
|
|
|
|
|
|
|
|
|
|
if failures > failure_limit:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(base_dir, wait_time=60):
|
|
|
|
|