modifications to the backfiller in response to ekimekim's comments

pull/18/head
Christopher Usher 6 years ago committed by Mike Lang
parent ba52bf7f5d
commit 7d26997b1f

@ -12,9 +12,12 @@ import os
import time import time
import datetime import datetime
import errno import errno
import uuid
import requests import requests
import common
HOUR_FMT = '%Y-%m-%dT%H' HOUR_FMT = '%Y-%m-%dT%H'
TIMEOUT = 5 TIMEOUT = 5
@ -26,6 +29,7 @@ def get_nodes():
# nodes so that # nodes so that
# as a prototype can just hardcode some addresses. # as a prototype can just hardcode some addresses.
# each element in nodes is a 'protocol://host:port/' string
nodes = [] nodes = []
@ -38,50 +42,28 @@ def list_local_segments(base_dir, stream, variant, hour):
# overheads # overheads
path = os.path.join(base_dir, stream, variant, hour) path = os.path.join(base_dir, stream, variant, hour)
try: try:
local_segments = [name for name in os.listdir(path) if not local_segments = [name for name in os.listdir(path) if not name.startswith('.')]
name.startswith('.')]
except OSError as e: except OSError as e:
if e.errno != errno.ENOENT: if e.errno != errno.ENOENT:
raise: raise
else:
local_segments = [] local_segments = []
return local_segments return local_segments
def list_remote_hours(node, stream, variant, timeout=TIMEOUT): def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
# just a wrapper around a call to restreamer.list_hours # just a wrapper around a call to restreamer.list_hours
uri = '{}/files/{}/{}'.format(node, stream, variant)
uri = 'https://{}/files/{}/{}'.format(node, stream, variant) resp = requests.get(uri, timeout=timeout)
try:
resp = requests.get(uri, timeout=timeout)
except requests.exceptions.Timeout:
return []
if resp.status_code != request.codes.ok:
return []
hours = resp.json() hours = resp.json()
return hours return hours
def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT): def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
# just a wrapper around a call to restreamer.list_segments # just a wrapper around a call to restreamer.list_segments
uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour_str)
uri = 'https://{}/files/{}/{}/{}'.format(node, stream, variant, hour_str) resp = requests.get(uri, timeout=timeout)
try:
resp = requests.get(uri, timeout=timeout)
except requests.exceptions.Timeout:
return []
if resp.status_code != request.codes.ok:
return []
remote_segments = resp.json() remote_segments = resp.json()
return remote_segments return remote_segments
@ -90,49 +72,28 @@ def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
timeout=TIMEOUT): timeout=TIMEOUT):
uri = 'https://{}/segments/{}/{}/{}/{}'.format(node, stream, variant,
hour, missing_segment)
resp = requests.get(uri, stream=True, path = os.path.join(base_dir, stream, variant, hour, missing_segment)
timeout=timeout) if os.path.exists(path):
try: return
resp = requests.get(uri, stream=True, timeout=timeout)
except requests.exceptions.Timeout: common.ensure_directory(path)
return False
if resp.status_code != requests.codes.ok: substrs = path.split('-')
return False temp_path = '-'.join(substrs[:-1] + [str(uuid.uuid4()) + '.st'])
temp_name = 'temp_backfill' uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
resp = requests.get(uri, stream=True, timeout=timeout)
with open(temp_name, 'w') as f: with open(temp_path, 'w') as f:
for chunk in resp.iter_content(8192): for chunk in resp.iter_content(8192):
f.write(chunk) f.write(chunk)
dir_path = os.path.join(base_dir, stream, variant, hour) common.rename(temp_path, path)
if not os.path.exists(dir_path):
try:
os.mkdir(dir_path)
except OSError as e:
# Ignore if EEXISTS. This is needed to avoid a race if a getter is running at the same time.
if e.errno != errno.EEXIST:
raise
path = os.path.join(dir_path, missing_segment)
os.rename(temp_name, path)
return True
def backfill(base_dir, stream, variants, hours=None, nodes=None):
def backfill(base_dir, stream, variants, hours=None, nodes=None,
failure_limit=5):
# if hours is int, backfill last hours hourdirs
# else if hours is None, backfill all hourdirs
# else assume hours is iterable and backfill those hourdirs
# loop over nodes asking for a list of segments then downloads any # loop over nodes asking for a list of segments then downloads any
# segments it doesn't have # segments it doesn't have
@ -140,75 +101,69 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None,
if nodes is None: if nodes is None:
nodes = get_nodes() nodes = get_nodes()
if isinstance(hours, int):
n_hours = hours
if n_hours < 1:
raise ValueError('Number of hours has to be 1 or greater')
now = datetime.datetime.utcnow() #ideally do this in parallel
for node in nodes:
now_str = now.strftime(HOUR_FMT) try:
backfill_node(base_dir, node, stream, variants, hours)
hours = [now_str] #need to replace this with a more sophisticated error handler
except Exception as e:
print node, e
for i in range(n_hours - 1):
previous_hour = datetime.strptime(hours[-1], HOUR_FMT) def backfill_node(base_dir, node, stream, variants, hours, recent_cutoff=60):
current_hour = previous_hour + datetime.timedelta(hours=-1)
hours.append(current_hour.strftime(HOUR_FMT))
for node in nodes:
backfill_node(base_dir, node, stream, variants, hours, # if hours is int, backfill last hours hourdirs
failure_limit) # else if hours is None, backfill all hourdirs
# else assume hours is iterable and backfill those hourdirs
def backfill_node(base_dir, node, stream, variants, hours, failure_limit): if isinstance(hours, int):
n_hours = hours
if n_hours < 1:
raise ValueError('Number of hours has to be 1 or greater')
# split into its own function to allow breaking out of two loops at once now = datetime.datetime.utcnow()
# count failures this node has and if too many occur, assume node isn't hours = [(now - i * timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
# working and move onto next
failures = 0
for variant in variants: for variant in variants:
if hours is None: if hours is None:
# if this fails, get an empty list back so function quickly
# finishes
node_hours = list_remote_hours(node, stream, variant) node_hours = list_remote_hours(node, stream, variant)
else: else:
node_hours = hours node_hours = hours
for hour in node_hours: for hour in node_hours:
local_segments = list_local_segments(base_dir, stream, variant, local_segments = list_local_segments(base_dir, stream, variant, hour)
hour)
local_segments = set(local_segments) local_segments = set(local_segments)
# if this fails, get an empty list back so no missing segments are
# requested
remote_segments = list_remote_segments(node, stream, variant, hour) remote_segments = list_remote_segments(node, stream, variant, hour)
remote_segments = set(remote_segments) remote_segments = set(remote_segments)
missing_segments = remote_segments - local_segments missing_segments = remote_segments - local_segments
for missing_segment in missing_segments: for missing_segment in missing_segments:
status = get_remote_segment(base_dir, node, stream, variant, #to avoid getting in the downloader's way ignore segments less than recent_cutoff old
hour, missing_segment) time_str = '{}:{}'.format(hour, missing_segment.split('-')[0])
segment_time = datetime.datetime.strptime(time_str, HOUR_FMT + ':%M:%S.%f')
if datetime.datetime.utcnow() - segment_time < datetime.timedelta(seconds=recent_cutoff):
continue
get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
if not status:
failures += 1
if failures > failure_limit:
return
# all wait times are in minutes # all wait times are in minutes
# obviously adjust default times in response to how long back filling actually # obviously adjust default times in response to how long back filling actually
# takes # takes
def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time=1):
sleep_time=1):
fill_start = datetime.datetime.now() fill_start = datetime.datetime.now()
full_fill_start = fill_start full_fill_start = fill_start
@ -237,7 +192,7 @@ def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180,
else: else:
time.sleep(60 * sleep_time) time.sleep(common.jitter(60 * sleep_time))

Loading…
Cancel
Save