fixed white space and the like

pull/18/head
Authored by Christopher Usher 6 years ago, committed by Mike Lang
parent afd948576d
commit fec0975d18

@@ -8,18 +8,19 @@
 # more frequently, backfill the last couple hours
 # (last three hour directories so always at least two hours).
-import os
-import time
 import datetime
 import errno
+import os
+import time
 import uuid

 import requests

 import common

 HOUR_FMT = '%Y-%m-%dT%H'
-TIMEOUT = 5
+TIMEOUT = 5 # default timeout for remote requests

 def get_nodes():
@@ -32,77 +33,70 @@ def get_nodes():
 	# each element in nodes is a 'protocol://host:port/' string
 	nodes = []
 	return nodes

 def list_local_segments(base_dir, stream, variant, hour):
 	# based on restreamer.list_segments
-	# could just call restreamer.list_segments but this avoids http/json
-	# overheads
+	# could just call restreamer.list_segments but this avoids http/json overheads
 	path = os.path.join(base_dir, stream, variant, hour)
 	try:
-		local_segments = [name for name in os.listdir(path) if not name.startswith('.')]
+		return [name for name in os.listdir(path) if not name.startswith('.')]
 	except OSError as e:
 		if e.errno != errno.ENOENT:
 			raise
-		local_segments = []
-	return local_segments
+		return []

 def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
 	# just a wrapper around a call to restreamer.list_hours
 	uri = '{}/files/{}/{}'.format(node, stream, variant)
 	resp = requests.get(uri, timeout=timeout)
-	hours = resp.json()
-	return hours
+	return resp.json()

 def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
 	# just a wrapper around a call to restreamer.list_segments
-	uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour_str)
+	uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
 	resp = requests.get(uri, timeout=timeout)
-	remote_segments = resp.json()
-	return remote_segments
+	return resp.json()

 # based on _get_segment in downloader/main
 # very basic error handling
 def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 		timeout=TIMEOUT):
-	file_created = False
 	path = os.path.join(base_dir, stream, variant, hour, missing_segment)
+	# check to see if file already exists to avoid unnecessarily copying it
 	if os.path.exists(path):
 		return
-	substrs = path.split('-')
-	temp_path = '-'.join(substrs[:-1] + [str(uuid.uuid4()) + '.st'])
+	dir_name = os.path.dirname(path)
+	date, duration, _ = os.path.basename(path).split('-', 2)
+	temp_name = "-".join([date, duration, "temp", str(uuid.uuid4())])
+	temp_path = os.path.join(dir_name, "{}.ts".format(temp_name))
 	common.ensure_directory(temp_path)
 	try:
 		uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
 		resp = requests.get(uri, stream=True, timeout=timeout)
+		resp.raise_for_status()
 		with open(temp_path, 'w') as f:
-			file_created = True
 			for chunk in resp.iter_content(8192):
 				f.write(chunk)
 	except Exception:
-		ex_type, ex, tb = sys.exc_info()
-		if file_created:
+		if os.path.exists(temp_path):
 			os.remove(temp_path)
-		raise ex_type, ex, tb
+		raise
 	common.rename(temp_path, path)

 def backfill(base_dir, stream, variants, hours=None, nodes=None):
 	# loop over nodes backfilling from each
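A note on the temp-file handling above: downloading into a uniquely named temp file in the destination directory and renaming it into place means readers never observe a partially written segment. A minimal sketch of the same pattern, independent of this codebase (the atomic_write helper and its arguments are made up for illustration):

import os
import uuid

def atomic_write(path, data):
    # Hypothetical helper, not part of this commit: write to a uniquely
    # named temp file in the same directory, then rename into place.
    temp_path = os.path.join(os.path.dirname(path), '{}.temp'.format(uuid.uuid4()))
    try:
        with open(temp_path, 'wb') as f:
            f.write(data)
    except Exception:
        # only the temp file is ever cleaned up; the final path is untouched
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
    # os.rename is atomic on POSIX when both paths are on the same
    # filesystem, so the final path appears fully written or not at all
    os.rename(temp_path, path)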
@@ -112,24 +106,31 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None):
 	#ideally do this in parallel
 	for node in nodes:
 		try:
 			backfill_node(base_dir, node, stream, variants, hours)
 		#need to replace this with a more sophisticated error handler
 		except Exception as e:
 			print node, e

-def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60):
-	if isinstance(hours, int):
+def is_iterable(x):
+	try:
+		iter(x)
+	except TypeError:
+		return False
+	return True
+
+# if hours is int, backfill last hours hourdirs
+# else if hours is None, backfill all hourdirs
+# else assume hours is iterable and backfill those hourdirs
+def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60):
+	# if hours is None, backfill all hourdirs
+	if hours is None:
+		hours = list_remote_hours(node, stream, variant)
+	# if hours is iterable, backfill those hourdirs
+	elif is_iterable(hours):
+		None
+	# assume int and backfill last hours hourdirs
+	else:
 		n_hours = hours
 		if n_hours < 1:
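For review context, backfill_node now accepts hours in three forms: None, an iterable of hour strings, or an int. A usage sketch with made-up values (node URL, base dir, and stream names are for illustration only):

node = 'http://example-node:8000'  # hypothetical node URL

# backfill every hour directory the node advertises
backfill_node('/srv/segments', node, 'mystream', ['source'], hours=None)

# backfill two specific hour directories (HOUR_FMT strings)
backfill_node('/srv/segments', node, 'mystream', ['source'],
              hours=['2019-01-01T10', '2019-01-01T11'])

# backfill the three most recent hour directories
backfill_node('/srv/segments', node, 'mystream', ['source'], hours=3)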
@@ -140,17 +141,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60
 	for variant in variants:
-		if hours is None:
-			node_hours = list_remote_hours(node, stream, variant)
-		else:
-			node_hours = hours
-		for hour in node_hours:
-			local_segments = list_local_segments(base_dir, stream, variant, hour)
-			local_segments = set(local_segments)
-			remote_segments = list_remote_segments(node, stream, variant, hour)
-			remote_segments = set(remote_segments)
+		for hour in hours:
+			local_segments = set(list_local_segments(base_dir, stream, variant, hour))
+			remote_segments = set(list_remote_segments(node, stream, variant, hour))
 			missing_segments = remote_segments - local_segments
 			for missing_segment in missing_segments:
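The rewritten loop reduces the missing-segment computation to a plain set difference; a tiny sketch with made-up segment names:

local_segments = {'seg-a.ts', 'seg-b.ts'}
remote_segments = {'seg-a.ts', 'seg-b.ts', 'seg-c.ts'}
# everything the remote node has that we don't
missing_segments = remote_segments - local_segments
assert missing_segments == {'seg-c.ts'}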
@@ -163,9 +157,6 @@ def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60
 				get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)

 # all wait times are in minutes
 # obviously adjust default times in response to how long back filling actually
@@ -197,15 +188,6 @@ def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time
 			fill_start = now
 		else:
 			time.sleep(common.jitter(60 * sleep_time))

@@ -266,6 +266,7 @@ def rename(old, new):
 		raise
 	os.remove(old)
+
 def ensure_directory(path):
 	"""Create directory that contains path, as well as any parent directories,
 	if they don't already exist."""
@@ -280,6 +281,7 @@ def ensure_directory(path):
 		if e.errno != errno.EEXIST:
 			raise
+
 def jitter(interval):
 	"""Apply some 'jitter' to an interval. This is a random +/- 10% change in order to
 	smooth out patterns and prevent everything from retrying at the same time.
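The jitter docstring above describes a random +/- 10% change; a minimal sketch of one way to implement that, assuming a uniform distribution (the actual implementation in common may differ):

import random

def jitter(interval):
    # scale by a random factor in [0.9, 1.1]
    return interval * (0.9 + 0.2 * random.random())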

@@ -461,7 +461,7 @@ class SegmentGetter(object):
 			logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
 			return
 		resp.raise_for_status()
-		ensure_directory(temp_path)
+		common.ensure_directory(temp_path)
 		with open(temp_path, 'w') as f:
 			file_created = True
 			# We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway,
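The chunk-wise read mentioned in the trailing context is the standard requests streaming idiom; a minimal standalone sketch (URL and filename are made up for illustration):

import requests

resp = requests.get('http://example.com/segment.ts', stream=True, timeout=5)
resp.raise_for_status()
with open('segment.ts', 'wb') as f:
    # iter_content yields the body in 8 KiB pieces instead of
    # buffering the whole response in memory
    for chunk in resp.iter_content(8192):
        f.write(chunk)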
