|
|
|
@ -1,8 +1,9 @@
|
|
|
|
|
"""Download segments from other nodes to catch stuff this node missed."""
|
|
|
|
|
# TODO logging, better exception handling
|
|
|
|
|
# TODO more logging, better exception handling
|
|
|
|
|
|
|
|
|
|
import datetime
|
|
|
|
|
import errno
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import random
|
|
|
|
|
import time
|
|
|
|
@ -69,7 +70,8 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
|
|
|
|
|
Fetches stream/variant/hour/missing_segment from node and puts it in base_dir/stream/variant/hour/missing_segment. If the segment already exists locally, this does not attempt to fetch it."""
|
|
|
|
|
|
|
|
|
|
path = os.path.join(base_dir, stream, variant, hour, missing_segment)
|
|
|
|
|
# check to see if file already exists to avoid unnecessarily copying it
|
|
|
|
|
# check to see if file was created since we listed the local segments to
|
|
|
|
|
# avoid unnecessarily copying
|
|
|
|
|
if os.path.exists(path):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
@ -111,8 +113,8 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None,
|
|
|
|
|
for node in nodes:
|
|
|
|
|
try:
|
|
|
|
|
backfill_node(base_dir, node, stream, variants, hours, start, stop, order=order)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print node, e
|
|
|
|
|
except Exception:
|
|
|
|
|
logging.exception("Error while backfilling node {}".format(node))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_iterable(x):
|
|
|
|
@ -131,16 +133,23 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
Backfill from node/stream/variants to base_dir/stream/variants.
|
|
|
|
|
|
|
|
|
|
Keyword arguments:
|
|
|
|
|
hours -- If None (default), backfill all available hours. If iterable, backfill only hours in iterable. Otherwise backfill the last N hours, starting with the lastest.
|
|
|
|
|
start -- Only backfill hours starting after or equal to this datetime object. If None (default), backfill all hours.
|
|
|
|
|
stop -- Only backfill hours starting before or equal to this datetime object. If None (default), backfill all hours.
|
|
|
|
|
order -- If 'random', randomise the order of hours. If 'forward', sort the hours in acceding order. If 'reverse', sort the hours in descending order. Otherwise, do not change the order of hours (default).
|
|
|
|
|
recent_cutoff -- Skip backfilling segments younger than this number of seconds to prioritise letting the downloader grab these segments."""
|
|
|
|
|
hours -- If None (default), backfill all available hours. If iterable,
|
|
|
|
|
backfill only hours in iterable. Otherwise backfill the last N hours,
|
|
|
|
|
starting with the lastest.
|
|
|
|
|
start -- Only backfill hours starting after or equal to this datetime
|
|
|
|
|
object. If None (default), backfill all hours.
|
|
|
|
|
stop -- Only backfill hours starting before or equal to this datetime
|
|
|
|
|
object. If None (default), backfill all hours.
|
|
|
|
|
order -- If 'random', randomise the order of hours. If 'forward', sort the
|
|
|
|
|
hours in ascending order. If 'reverse', sort the hours in descending
|
|
|
|
|
order. Otherwise, do not change the order of hours (default).
|
|
|
|
|
recent_cutoff -- Skip backfilling segments younger than this number of
|
|
|
|
|
seconds to prioritise letting the downloader grab these segments."""
|
|
|
|
|
|
|
|
|
|
if hours is None:
|
|
|
|
|
hours = list_remote_hours(node, stream, variant)
|
|
|
|
|
elif is_iterable(hours):
|
|
|
|
|
None
|
|
|
|
|
pass # hours already in desired format
|
|
|
|
|
else:
|
|
|
|
|
n_hours = hours
|
|
|
|
|
if n_hours < 1:
|
|
|
|
@ -157,7 +166,7 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
# multiple hours (say on start up) so that you don't try to backfill the
|
|
|
|
|
# same hour at the same time
|
|
|
|
|
if order == 'random':
|
|
|
|
|
hours = random.shuffle(hours)
|
|
|
|
|
random.shuffle(hours)
|
|
|
|
|
elif order == 'forward':
|
|
|
|
|
sort(hours)
|
|
|
|
|
elif order == 'reverse':
|
|
|
|
@ -173,18 +182,16 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
|
|
|
|
|
for missing_segment in missing_segments:
|
|
|
|
|
|
|
|
|
|
#ignore temporary files
|
|
|
|
|
if 'temp' in missing_segment:
|
|
|
|
|
continue
|
|
|
|
|
path = os.path.join(stream, variant, hour, missing_segment)
|
|
|
|
|
|
|
|
|
|
#only get '*.ts' files to try to only get segments
|
|
|
|
|
if missing_segment[-3:] != '.ts':
|
|
|
|
|
# test to see if file is a segment and get the segments start time
|
|
|
|
|
try:
|
|
|
|
|
segment = common.parse_segment_path(path)
|
|
|
|
|
except ValueError:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
|
|
|
|
|
time_str = '{}:{}'.format(hour, missing_segment.split('-')[0])
|
|
|
|
|
segment_time = datetime.datetime.strptime(time_str, HOUR_FMT + ':%M:%S.%f')
|
|
|
|
|
if datetime.datetime.utcnow() - segment_time < datetime.timedelta(seconds=recent_cutoff):
|
|
|
|
|
if datetime.datetime.utcnow() - segment['start'] < datetime.timedelta(seconds=recent_cutoff):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
|
|
|
|
|