diff --git a/backfiller/backfiller/__main__.py b/backfiller/backfiller/__main__.py index 9b9e890..e58d5f2 100644 --- a/backfiller/backfiller/__main__.py +++ b/backfiller/backfiller/__main__.py @@ -11,7 +11,6 @@ from backfiller.main import main LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" -logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() logging.basicConfig(level=level, format=LOG_FORMAT) argh.dispatch_command(main) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index 0a0b88a..6ed7410 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -21,7 +21,8 @@ TIMEOUT = 5 #default timeout for remote requests def get_nodes(): """List address of other wubloaders. - This returns a list of the other wubloaders as strings of the form 'protocol://host:port/'""" + This returns a list of the other wubloaders as strings of the form + 'protocol://host:port/'""" # either read a config file or query the database to get the addresses # of the other nodes # figure out some way that the local machine isn't in the list of returned @@ -30,16 +31,19 @@ def get_nodes(): logging.info('Fetching list of other nodes') - nodes = [] + nodes = ['toodles.videostrike.team:1337/'] + #nodes = [] return nodes def list_local_segments(base_dir, stream, variant, hour): """List segments in a given hour directory. - For a given base_dir/stream/variant/hour directory return a list of non-hidden files. If the directory path is not found, return an empty list. + For a given base_dir/stream/variant/hour directory return a list of + non-hidden files. If the directory path is not found, return an empty list. - Based on based on restreamer.list_segments. We could just call restreamer.list_segments but this avoids HTTP/JSON overheads.""" + Based on based on restreamer.list_segments. We could just call + restreamer.list_segments but this avoids HTTP/JSON overheads.""" path = os.path.join(base_dir, stream, variant, hour) try: @@ -71,7 +75,9 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, timeout=TIMEOUT): """Get a segment from a node. - Fetches stream/variant/hour/missing_segment from node and puts it in base_dir/stream/variant/hour/missing_segment. If the segment already exists locally, this does not attempt to fetch it.""" + Fetches stream/variant/hour/missing_segment from node and puts it in + base_dir/stream/variant/hour/missing_segment. If the segment already exists + locally, this does not attempt to fetch it.""" path = os.path.join(base_dir, stream, variant, hour, missing_segment) logging.debug('Getting segment {}'.format(path)) @@ -101,15 +107,17 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, if os.path.exists(temp_path): os.remove(temp_path) raise - + logging.debug('Saving completed segment {} as {}'.format(temp_path, path)) common.rename(temp_path, path) -def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None, - stop=None, order=None): +def backfill(base_dir, stream, variants, hours=None, nodes=None): """Loop over nodes backfilling from each. - Backfill from node/stream/variants to base_dir/stream/variants for each node in nodes. If nodes is None, use get_nodes() to get a list of nodes to backfill from. Passes hours, start, stop and order to backfill_node to control which hours are backfilled and in which order. By default all hours are backfilled. If backfilling from a node raises an exception, this just goes onto the next node.""" + Backfill from node/stream/variants to base_dir/stream/variants for each node + in nodes. If nodes is None, use get_nodes() to get a list of nodes to + backfill from. By default all hours are backfilled. If backfilling from a + node raises an exception, this just goes onto the next node.""" if nodes is None: nodes = get_nodes() @@ -117,7 +125,7 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None, #ideally do this in parallel for node in nodes: try: - backfill_node(base_dir, node, stream, variants, hours, start, stop, order=order) + backfill_node(base_dir, node, stream, variants, hours) except Exception: logging.exception("Error while backfilling node {}".format(node)) @@ -131,9 +139,7 @@ def is_iterable(x): return True -def backfill_node(base_dir, node, stream, variants, hours=None, start=None, - stop=None, hour_order=None, segment_order='random', get_full_hour=True, - recent_cutoff=60): +def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='random', recent_cutoff=60): """Backfill from remote node. Backfill from node/stream/variants to base_dir/stream/variants. @@ -142,19 +148,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, hours -- If None (default), backfill all available hours. If iterable, backfill only hours in iterable. Otherwise backfill the last N hours, starting with the lastest. - start -- Only backfill hours starting after or equal to this datetime - object. If None (default), backfill all hours. If get_full_hour is - False, only segments starting after start will be backfilled. - stop -- Only backfill hours starting before or equal to this datetime - object. If None (default), backfill all hours. If get_full_hour is - False, only segments finishing before stop will be backfilled. - hour_order -- If 'random', randomise the order of hours. If 'forward', sort - the hours in ascending order. If 'reverse', sort the hours in descending order. Otherwise, do not change the order of hours (default). segment_order -- If 'random', randomise the order of segments (default). If 'forward', sort the segment in ascending order. If 'reverse', sort the segments in descending order. Otherwise, do not change the order of segments. - get_full_hour -- If True (default), get all segments in an hour. If False, use start and stop to limit which segments are backfilled. recent_cutoff -- Skip backfilling segments younger than this number of seconds to prioritise letting the downloader grab these segments.""" @@ -176,19 +173,6 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, now = datetime.datetime.utcnow() hours = [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] - if start is not None: - hours = [hour for hour in hours if hour >= start] - if stop is not None: - hours = [hour for hour in hours if hour <= stop] - - # useful if running in parallel so multiple nodes don't request the same hour at the same time - if hour_order == 'random': - random.shuffle(hours) - elif hour_order == 'forward': - hours.sort() - elif hour_order == 'reverse': - hours.sort(reverse=True) - for variant in variants: for hour in hours: @@ -213,12 +197,7 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, try: segment = common.parse_segment_path(path) except ValueError as e: - logging.warning('File {} invaid: {}'.format(path, e.value)) - continue - - if not get_full_hour and start is not None and segment['start'] < start: - continue - if not get_full_hour and start is not None and segment['end'] > stop: + logging.warning('File {} invaid: {}'.format(path, e)) continue #to avoid getting in the downloader's way ignore segments less than recent_cutoff old @@ -230,19 +209,26 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, logging.info('Finished backfilling from {}'.format(node)) -def main(base_dir='/mnt', stream='desertbus', variants=['source'], fill_wait=5, full_fill_wait=180, sleep_time=1): +def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1): """Prototype backfiller service. - Do a backfill of the last 3 hours from stream/variants from all nodes initially before doing a full backfill from all nodes. Then every sleep_time minutes check to see if more than fill_wait minutes have passed since the last backfill. If so do a backfill of the last 3 hours. Also check whether it has been more than full_fill_wait minutes since the last full backfill; if so, do a full backfill.""" + Do a backfill of the last 3 hours from stream/variants from all nodes + initially before doing a full backfill from all nodes. Then every sleep_time + minutes check to see if more than fill_wait minutes have passed since the last backfill. If so do a backfill of the last 3 hours. Also check whether it has been more than full_fill_wait minutes since the last full backfill; + if so, do a full backfill.""" # TODO replace this with a more robust event based service and backfill from multiple nodes in parallel # stretch goal: provide an interface to trigger backfills manually # stretch goal: use the backfiller to monitor the restreamer + variants = variants.split(',') if variants else [] + + logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir)) + fill_start = datetime.datetime.now() full_fill_start = fill_start + backfill(base_dir, stream, variants, 3) backfill(base_dir, stream, variants) - backfill(base_dir, stream, variants, order='random') # I'm sure there is a module that does this in a more robust way # but I understand this and it gives the behaviour I want