docstings and a few minor feature additions to the backfiller

pull/18/head
Christopher Usher 6 years ago committed by Mike Lang
parent a59f6e1569
commit 0b524a72cb

@ -1,12 +1,5 @@
# this is a prototype of the backfiller
# lots about web services and the like I don't know
# needs logging, exception handling and the like
# also proper doc strings
# when starting the backfiller and every few hours, backfill everything
# more frequently, backfill the last couple hours
# (last three hour directories so always at least two hours).
"""Download segments from other nodes to catch stuff this node missed."""
# TODO logging, better exception handling
import datetime
import errno
@ -21,26 +14,30 @@ import common
HOUR_FMT = '%Y-%m-%dT%H'
TIMEOUT = 5 #default timeout for remote requests
def get_nodes():
"""List address of other wubloaders.
This returns a list of the other wubloaders as strings of the form 'protocol://host:port/'"""
# either read a config file or query the database to get the addresses
# of the other nodes
# figure out some way that the local machine isn't in the list of returned
# nodes so that
# as a prototype can just hardcode some addresses.
# each element in nodes is a 'protocol://host:port/' string
nodes = []
return nodes
def list_local_segments(base_dir, stream, variant, hour):
# based on restreamer.list_segments
# could just call restreamer.list_segments but this avoids http/json overheads
"""List segments in a given hour directory.
For a given base_dir/stream/variant/hour directory return a list of non-hidden files. If the directory path is not found, return an empty list.
Based on based on restreamer.list_segments. We could just call restreamer.list_segments but this avoids HTTP/JSON overheads."""
path = os.path.join(base_dir, stream, variant, hour)
try:
return [name for name in os.listdir(path) if not name.startswith('.')]
@ -52,23 +49,24 @@ def list_local_segments(base_dir, stream, variant, hour):
def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
# just a wrapper around a call to restreamer.list_hours
"""Wrapper around a call to restreamer.list_hours."""
uri = '{}/files/{}/{}'.format(node, stream, variant)
resp = requests.get(uri, timeout=timeout)
return resp.json()
def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
# just a wrapper around a call to restreamer.list_segments
"""Wrapper around a call to restreamer.list_segments."""
uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
resp = requests.get(uri, timeout=timeout)
return resp.json()
# based on _get_segment in downloader/main
# very basic error handling
def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
timeout=TIMEOUT):
"""Get a segment from a node.
Fetches stream/variant/hour/missing_segment from node and puts it in base_dir/stream/variant/hour/missing_segment. If the segment already exists locally, this does not attempt to fetch it."""
path = os.path.join(base_dir, stream, variant, hour, missing_segment)
# check to see if file already exists to avoid unnecessarily copying it
@ -91,6 +89,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
for chunk in resp.iter_content(8192):
f.write(chunk)
#try to get rid of the temp file if an exception is raised.
except Exception:
if os.path.exists(temp_path):
os.remove(temp_path)
@ -99,11 +98,11 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
common.rename(temp_path, path)
def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None,
stop=None, order=None):
"""Loop over nodes backfilling from each.
# loop over nodes backfilling from each
Backfill from node/stream/variants to base_dir/stream/variants for each node in nodes. If nodes is None, use get_nodes() to get a list of nodes to backfill from. Passes hours, start, stop and order to backfill_node to control which hours are backfilled and in which order. By default all hours are backfilled. If backfilling from a node raises an exception, this just goes onto the next node."""
if nodes is None:
nodes = get_nodes()
@ -112,12 +111,12 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None,
for node in nodes:
try:
backfill_node(base_dir, node, stream, variants, hours, start, stop, order=order)
#need to replace this with a more sophisticated error handler
except Exception as e:
print node, e
def is_iterable(x):
"""Test whether input is iterable."""
try:
iter(x)
except TypeError:
@ -126,27 +125,31 @@ def is_iterable(x):
def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
stop=None, recent_cutoff=60, order=None):
stop=None, order=None, recent_cutoff=60):
"""Backfill from remote node.
Backfill from node/stream/variants to base_dir/stream/variants.
Keyword arguments:
hours -- If None (default), backfill all available hours. If iterable, backfill only hours in iterable. Otherwise backfill the last N hours, starting with the lastest.
start -- Only backfill hours starting after or equal to this datetime object. If None (default), backfill all hours.
stop -- Only backfill hours starting before or equal to this datetime object. If None (default), backfill all hours.
order -- If 'random', randomise the order of hours. If 'forward', sort the hours in acceding order. If 'reverse', sort the hours in descending order. Otherwise, do not change the order of hours (default).
recent_cutoff -- Skip backfilling segments younger than this number of seconds to prioritise letting the downloader grab these segments."""
# if hours is None, backfill all hourdirs
if hours is None:
hours = list_remote_hours(node, stream, variant)
# if hours is iterable, backfill those hourdirs
elif is_iterable(hours):
None
# assume int and backfill last hours hourdirs
else:
n_hours = hours
if n_hours < 1:
raise ValueError('Number of hours has to be 1 or greater')
now = datetime.datetime.utcnow()
hours = [(now - i * timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
if start is not None:
hours = [hour for hour in hours if hour >= start]
if stop is not None:
hours = [hour for hour in hours if hour <= stop]
@ -155,6 +158,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
# same hour at the same time
if order == 'random':
hours = random.shuffle(hours)
elif order == 'forward':
sort(hours)
elif order == 'reverse':
sort(hours, reverse=True)
for variant in variants:
@ -170,6 +177,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
if 'temp' in missing_segment:
continue
#only get '*.ts' files to try to only get segments
if missing_segment[-3:] != '.ts':
continue
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
time_str = '{}:{}'.format(hour, missing_segment.split('-')[0])
segment_time = datetime.datetime.strptime(time_str, HOUR_FMT + ':%M:%S.%f')
@ -179,15 +190,17 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
# all wait times are in minutes
# obviously adjust default times in response to how long back filling actually
# takes
def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time=1):
"""Prototype backfiller service.
Do a full backfill of stream/variants from all nodes initially. Then every sleep_time minutes check to see if more than fill_wait minutes have passed since the last backfill. If so do a backfill of the last 3 hours. Also check whether it has been more than full_fill_wait minutes since the last full backfill; if so, do a full backfill."""
# TODO replace this with a more robust event based service and backfill from multiple nodes in parallel
# stretch goal: provide an interface to trigger backfills manually
# stretch goal: use the backfiller to monitor the restreamer
fill_start = datetime.datetime.now()
full_fill_start = fill_start
# Do a full backfill at start
backfill(base_dir, stream, variants, order='random')
# I'm sure there is a module that does this in a more robust way

Loading…
Cancel
Save