From 0b524a72cb9986e8bc093ea4adfcdc89b3650ef9 Mon Sep 17 00:00:00 2001
From: Christopher Usher
Date: Sun, 30 Dec 2018 17:24:28 -0800
Subject: [PATCH] docstrings and a few minor feature additions to the backfiller

---
 backfiller/backfiller/main.py | 89 ++++++++++++++++++++++-------------
 1 file changed, 55 insertions(+), 34 deletions(-)

diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index 73895cd..0fe27f6 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -1,12 +1,5 @@
-
-# this is a prototype of the backfiller
-# lots about web services and the like I don't know
-# needs logging, exception handling and the like
-# also proper doc strings
-
-# when starting the backfiller and every few hours, backfill everything
-# more frequently, backfill the last couple hours
-# (last three hour directories so always at least two hours).
+"""Download segments from other nodes to catch stuff this node missed."""
+# TODO logging, better exception handling
 
 import datetime
 import errno
@@ -21,26 +14,30 @@
 import common
 
 HOUR_FMT = '%Y-%m-%dT%H'
-
 TIMEOUT = 5 #default timeout for remote requests
 
 
-def get_nodes():
+def get_nodes():
+	"""List addresses of other wubloaders.
+
+	This returns a list of the other wubloaders as strings of the form 'protocol://host:port/'."""
 	# either read a config file or query the database to get the addresses
 	# of the other nodes
 	# figure out some way that the local machine isn't in the list of returned
 	# nodes so that
 
-	# as a prototype can just hardcode some addresses.
-	# each element in nodes is a 'protocol://host:port/' string
 	nodes = []
 	return nodes
 
 
 def list_local_segments(base_dir, stream, variant, hour):
+	"""List segments in a given hour directory.
+
+	For a given base_dir/stream/variant/hour directory, return a list of non-hidden files. If the directory path is not found, return an empty list.
+
+	Based on restreamer.list_segments. We could just call restreamer.list_segments, but this avoids HTTP/JSON overheads."""
+	path = os.path.join(base_dir, stream, variant, hour)
 	try:
 		return [name for name in os.listdir(path) if not name.startswith('.')]
 	except OSError as e:
@@ -52,23 +49,27 @@
 def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
-	# just a wrapper around a call to restreamer.list_hours
+	"""Wrapper around a call to restreamer.list_hours."""
 	uri = '{}/files/{}/{}'.format(node, stream, variant)
 	resp = requests.get(uri, timeout=timeout)
 	return resp.json()
 
 
 def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
-	# just a wrapper around a call to restreamer.list_segments
+	"""Wrapper around a call to restreamer.list_segments."""
 	uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
 	resp = requests.get(uri, timeout=timeout)
 	return resp.json()
 
 
-# based on _get_segment in downloader/main
-# very basic error handling
 def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 		timeout=TIMEOUT):
+	"""Get a segment from a node.
+
+	Fetches stream/variant/hour/missing_segment from node and puts it in base_dir/stream/variant/hour/missing_segment. If the segment already exists locally, this does not attempt to fetch it."""
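+	# Illustrative call; the node address and segment name below are hypothetical:
+	#   get_remote_segment('/storage', 'http://wubloader-2:8080', 'desertbus', 'source',
+	#       '2018-12-30T17', '23:00.000000-2.0-abc123.ts')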
 	path = os.path.join(base_dir, stream, variant, hour, missing_segment)
 
 	# check to see if file already exists to avoid unnecessarily copying it
 	if os.path.exists(path):
 		return
@@ -91,6 +92,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 			for chunk in resp.iter_content(8192):
 				f.write(chunk)
 
+	#try to get rid of the temp file if an exception is raised.
 	except Exception:
 		if os.path.exists(temp_path):
 			os.remove(temp_path)
@@ -99,11 +101,11 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 		raise
 
 	common.rename(temp_path, path)
-
 
 def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None,
 		stop=None, order=None):
-
-	# loop over nodes backfilling from each
+	"""Loop over nodes backfilling from each.
+
+	Backfill from node/stream/variants to base_dir/stream/variants for each node in nodes. If nodes is None, use get_nodes() to get a list of nodes to backfill from. Passes hours, start, stop and order through to backfill_node to control which hours are backfilled and in which order. By default all hours are backfilled. If backfilling from a node raises an exception, this just moves on to the next node."""
 	if nodes is None:
 		nodes = get_nodes()
 
@@ -112,12 +114,12 @@
 	for node in nodes:
 		try:
 			backfill_node(base_dir, node, stream, variants, hours, start, stop, order=order)
-		#need to replace this with a more sophisticated error handler
 		except Exception as e:
 			print node, e
 
 
 def is_iterable(x):
+	"""Test whether input is iterable."""
 	try:
 		iter(x)
 	except TypeError:
@@ -126,27 +128,35 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
-		stop=None, recent_cutoff=60, order=None):
+		stop=None, order=None, recent_cutoff=60):
+	"""Backfill from a remote node.
+
+	Backfill from node/stream/variants to base_dir/stream/variants.
+
+	Keyword arguments:
+	hours -- If None (default), backfill all available hours. If an iterable, backfill only the hours it contains. Otherwise, backfill the last n hours, starting with the latest.
+	start -- Only backfill hours starting at or after this datetime object. If None (default), backfill all hours.
+	stop -- Only backfill hours starting at or before this datetime object. If None (default), backfill all hours.
+	order -- If 'random', randomise the order of the hours. If 'forward', sort the hours in ascending order. If 'reverse', sort the hours in descending order. Otherwise (default), leave the order of the hours unchanged.
+	recent_cutoff -- Skip backfilling segments younger than this number of seconds to prioritise letting the downloader grab these segments."""
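+	# Illustrative example (the node address is hypothetical): backfill the
+	# three most recent hours from one node, newest first:
+	#   backfill_node('/storage', 'http://wubloader-2:8080', 'desertbus', ['source'],
+	#       hours=3, order='reverse')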
 
-	# if hours is None, backfill all hourdirs
 	if hours is None:
 		hours = list_remote_hours(node, stream, variant)
-	# if hours is iterable, backfill those hourdirs
 	elif is_iterable(hours):
 		None
-	# assume int and backfill last hours hourdirs
 	else:
 		n_hours = hours
-
 		if n_hours < 1:
 			raise ValueError('Number of hours has to be 1 or greater')
-
 		now = datetime.datetime.utcnow()
 		hours = [(now - i * timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
 
 	if start is not None:
 		hours = [hour for hour in hours if hour >= start]
-
 	if stop is not None:
 		hours = [hour for hour in hours if hour <= stop]
@@ -155,6 +165,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
 	# same hour at the same time
 	if order == 'random':
 		hours = random.shuffle(hours)
+	elif order == 'forward':
+		hours.sort()
+	elif order == 'reverse':
+		hours.sort(reverse=True)
 
 
 	for variant in variants:
@@ -170,6 +184,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
 			if 'temp' in missing_segment:
 				continue
 
+			#only get '.ts' files so that only actual segments are backfilled
+			if not missing_segment.endswith('.ts'):
+				continue
+
 			#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
 			time_str = '{}:{}'.format(hour, missing_segment.split('-')[0])
 			segment_time = datetime.datetime.strptime(time_str, HOUR_FMT + ':%M:%S.%f')
@@ -179,15 +197,18 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
 				get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
 
 
-# all wait times are in minutes
-# obviously adjust default times in response to how long back filling actually
-# takes
 def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time=1):
+	"""Prototype backfiller service.
+
+	Do a full backfill of stream/variants from all nodes initially. Then, every sleep_time minutes, check whether more than fill_wait minutes have passed since the last backfill; if so, backfill the last 3 hours. Also check whether it has been more than full_fill_wait minutes since the last full backfill; if so, do a full backfill."""
+	# TODO replace this with a more robust event-based service and backfill from multiple nodes in parallel
+	# stretch goal: provide an interface to trigger backfills manually
+	# stretch goal: use the backfiller to monitor the restreamer
 	fill_start = datetime.datetime.now()
 	full_fill_start = fill_start
 
-	# Do a full backfill at start
 	backfill(base_dir, stream, variants, order='random')
 
 	# I'm sure there is a module that does this in a more robust way
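+	# (e.g. the sched module in the standard library, or a small event loop)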