hopefully more robust

pull/18/head
Christopher Usher 6 years ago committed by Mike Lang
parent 50bcb84c0c
commit ba52bf7f5d

@@ -11,11 +11,12 @@
 import os
 import time
 import datetime
+import errno
 import requests
 
-hour_fmt = '%Y-%m-%dT%H'
+HOUR_FMT = '%Y-%m-%dT%H'
+TIMEOUT = 5
 
 def get_nodes():
@@ -36,38 +37,71 @@ def list_local_segments(base_dir, stream, variant, hour):
 	# could just call restreamer.list_segments but this avoids http/json
 	# overheads
 	path = os.path.join(base_dir, stream, variant, hour)
-	local_segments = [name for name in os.listdir(path) if not
-		name.startswith('.')]
+	try:
+		local_segments = [name for name in os.listdir(path) if not
+			name.startswith('.')]
+	except OSError as e:
+		if e.errno != errno.ENOENT:
+			raise
+		else:
+			local_segments = []
 	return local_segments
-def list_remote_hours(node, stream, variant):
+def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
 	# just a wrapper around a call to restreamer.list_hours
-	resp = requests.get('https://{}/files/{}/{}'.format(node, stream, variant))
+	# TODO if the call fails, log it and just return an empty list
+	uri = 'https://{}/files/{}/{}'.format(node, stream, variant)
+	try:
+		resp = requests.get(uri, timeout=timeout)
+	except requests.exceptions.Timeout:
+		return []
+	if resp.status_code != requests.codes.ok:
+		return []
 	hours = resp.json()
 	return hours
-def list_remote_segments(node, stream, variant, hour):
+def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
 	# just a wrapper around a call to restreamer.list_segments
-	resp = requests.get('https://{}/files/{}/{}/{}'.format(node, stream,
-		variant, hour_str))
+	# TODO if the call fails, log it and just return an empty list
+	uri = 'https://{}/files/{}/{}/{}'.format(node, stream, variant, hour)
+	try:
+		resp = requests.get(uri, timeout=timeout)
+	except requests.exceptions.Timeout:
+		return []
+	if resp.status_code != requests.codes.ok:
+		return []
 	remote_segments = resp.json()
 	return remote_segments
 # based on _get_segment in downloader/main
 # very basic error handling
-def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment):
+def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
+		timeout=TIMEOUT):
 
-	resp = requests.get('https://{}/segments/{}/{}/{}/{}'.format(node, stream,
-		variant, hour, missing_segment), stream=True)
-	if resp.status_code != 200:
+	uri = 'https://{}/segments/{}/{}/{}/{}'.format(node, stream, variant,
+		hour, missing_segment)
+	try:
+		resp = requests.get(uri, stream=True, timeout=timeout)
+	except requests.exceptions.Timeout:
+		return False
+	if resp.status_code != requests.codes.ok:
 		return False
 
 	temp_name = 'temp_backfill'
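
All three remote helpers now share the same guard: wrap the request in a try/except for timeouts, then check the status code before trusting the body. A minimal standalone sketch of that pattern, assuming a hypothetical fetch_json helper that is not part of this commit:

import requests

TIMEOUT = 5  # seconds, matching the constant added above

def fetch_json(uri, timeout=TIMEOUT):
	# Treat a slow or failing node like an empty listing so the caller
	# can skip it instead of crashing the whole backfill pass.
	try:
		resp = requests.get(uri, timeout=timeout)
	except requests.exceptions.Timeout:
		return []
	if resp.status_code != requests.codes.ok:
		return []
	return resp.json()

# e.g. list_remote_hours could then reduce to:
# hours = fetch_json('https://{}/files/{}/{}'.format(node, stream, variant))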
@@ -82,7 +116,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment):
 	try:
 		os.mkdir(dir_path)
 	except OSError as e:
-		# Ignore if EEXISTS. This is needed to avoid a race if two getters run at once.
+		# Ignore if EEXIST. This is needed to avoid a race if a getter is running at the same time.
 		if e.errno != errno.EEXIST:
 			raise
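
The mkdir call above relies on the standard race-safe idiom: attempt the creation unconditionally and ignore only EEXIST. A self-contained sketch (the ensure_dir name is illustrative, not from this codebase):

import errno
import os

def ensure_dir(path):
	# If a concurrent getter created path first, os.mkdir raises
	# OSError with errno EEXIST; swallow only that case.
	try:
		os.mkdir(path)
	except OSError as e:
		if e.errno != errno.EEXIST:
			raise

On Python 3, os.makedirs(path, exist_ok=True) achieves the same effect, but the errno check also works on Python 2.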
@@ -114,16 +148,15 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None,
 		now = datetime.datetime.utcnow()
-		now_str = now.strftime(hour_fmt)
-		now_hour = datetime.strptime(now_str, hour_fmt)
+		now_str = now.strftime(HOUR_FMT)
 		hours = [now_str]
 		for i in range(n_hours - 1):
-			previous_hour = datetime.strptime(hours[-1], hour_fmt)
+			previous_hour = datetime.datetime.strptime(hours[-1], HOUR_FMT)
 			current_hour = previous_hour + datetime.timedelta(hours=-1)
-			hours.append(current_hour.strftime(hour_fmt))
+			hours.append(current_hour.strftime(HOUR_FMT))
 
 	for node in nodes:
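
The loop above builds the list of hour strings newest-first by parsing the previous entry and stepping back one hour at a time. A standalone sketch of the same logic, assuming the HOUR_FMT constant from this commit (the last_hours name is illustrative):

import datetime

HOUR_FMT = '%Y-%m-%dT%H'

def last_hours(n_hours):
	# Most recent hour first, then walk backwards one hour per iteration.
	now = datetime.datetime.utcnow()
	hours = [now.strftime(HOUR_FMT)]
	for _ in range(n_hours - 1):
		previous_hour = datetime.datetime.strptime(hours[-1], HOUR_FMT)
		hours.append((previous_hour + datetime.timedelta(hours=-1)).strftime(HOUR_FMT))
	return hours

# last_hours(3) around 2018-06-01T12 UTC gives
# ['2018-06-01T12', '2018-06-01T11', '2018-06-01T10']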
@@ -148,12 +181,12 @@ def backfill_node(base_dir, node, stream, variants, hours, failure_limit):
 			node_hours = hours
 
 		for hour in node_hours:
 
+			# if this fails, get an empty list back so this loop quickly
+			# finishes
 			local_segments = list_local_segments(base_dir, stream, variant,
 				hour)
 			local_segments = set(local_segments)
 
-			#should include the result of this in the failure count
+			# if this fails, get an empty list back so no missing segments are
+			# requested
 			remote_segments = list_remote_segments(node, stream, variant, hour)
 			remote_segments = set(remote_segments)
 			missing_segments = remote_segments - local_segments
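
The missing-segment computation is a plain set difference: everything the remote node lists that is absent locally. A tiny illustration with made-up segment names:

local_segments = {'seg-a.ts', 'seg-b.ts'}
remote_segments = {'seg-a.ts', 'seg-b.ts', 'seg-c.ts'}
missing_segments = remote_segments - local_segments
# {'seg-c.ts'}: only this segment is passed to get_remote_segment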
@@ -169,6 +202,8 @@ def backfill_node(base_dir, node, stream, variants, hours, failure_limit):
 			if failures > failure_limit:
 				return
 
 # all wait times are in minutes
 # obviously adjust default times in response to how long back filling actually
 # takes
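
The trailing comment says the wait times are configured in minutes; presumably they get converted to seconds before sleeping. A hedged sketch of that convention (the name and default value are assumptions, not from the visible diff):

import time

WAIT_INTERVAL = 2  # minutes between backfill passes; tune to actual runtime

def wait(interval=WAIT_INTERVAL):
	# time.sleep takes seconds, so convert from the minute-based config.
	time.sleep(interval * 60)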
