|
|
@ -17,6 +17,14 @@ import common
|
|
|
|
HOUR_FMT = '%Y-%m-%dT%H'
|
|
|
|
HOUR_FMT = '%Y-%m-%dT%H'
|
|
|
|
TIMEOUT = 5 #default timeout for remote requests
|
|
|
|
TIMEOUT = 5 #default timeout for remote requests
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def encode_strings(o):
|
|
|
|
|
|
|
|
if isinstance(o, list):
|
|
|
|
|
|
|
|
return [encode_strings(x) for x in o]
|
|
|
|
|
|
|
|
if isinstance(o, dict):
|
|
|
|
|
|
|
|
return {k.encode('utf-8'): encode_strings(v) for k, v in o.items()}
|
|
|
|
|
|
|
|
if isinstance(o, unicode):
|
|
|
|
|
|
|
|
return o.encode('utf-8')
|
|
|
|
|
|
|
|
return o
|
|
|
|
|
|
|
|
|
|
|
|
def get_nodes():
|
|
|
|
def get_nodes():
|
|
|
|
"""List address of other wubloaders.
|
|
|
|
"""List address of other wubloaders.
|
|
|
@ -31,7 +39,7 @@ def get_nodes():
|
|
|
|
|
|
|
|
|
|
|
|
logging.info('Fetching list of other nodes')
|
|
|
|
logging.info('Fetching list of other nodes')
|
|
|
|
|
|
|
|
|
|
|
|
nodes = ['toodles.videostrike.team:1337/']
|
|
|
|
nodes = ['http://toodles.videostrike.team:1337/']
|
|
|
|
#nodes = []
|
|
|
|
#nodes = []
|
|
|
|
return nodes
|
|
|
|
return nodes
|
|
|
|
|
|
|
|
|
|
|
@ -60,7 +68,7 @@ def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
|
|
|
|
uri = '{}/files/{}/{}'.format(node, stream, variant)
|
|
|
|
uri = '{}/files/{}/{}'.format(node, stream, variant)
|
|
|
|
logging.debug('Getting list of hours from {}'.format(uri))
|
|
|
|
logging.debug('Getting list of hours from {}'.format(uri))
|
|
|
|
resp = requests.get(uri, timeout=timeout)
|
|
|
|
resp = requests.get(uri, timeout=timeout)
|
|
|
|
return resp.json()
|
|
|
|
return common.encode_strings(resp.json())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
|
|
|
|
def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
|
|
|
@ -68,7 +76,7 @@ def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
|
|
|
|
uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
|
|
|
|
uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
|
|
|
|
logging.debug('Getting list of segments from {}'.format(uri))
|
|
|
|
logging.debug('Getting list of segments from {}'.format(uri))
|
|
|
|
resp = requests.get(uri, timeout=timeout)
|
|
|
|
resp = requests.get(uri, timeout=timeout)
|
|
|
|
return resp.json()
|
|
|
|
return common.encode_strings(resp.json())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
|
|
|
|
def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
|
|
|
@ -176,6 +184,7 @@ def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='r
|
|
|
|
for variant in variants:
|
|
|
|
for variant in variants:
|
|
|
|
|
|
|
|
|
|
|
|
for hour in hours:
|
|
|
|
for hour in hours:
|
|
|
|
|
|
|
|
logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour))
|
|
|
|
|
|
|
|
|
|
|
|
local_segments = set(list_local_segments(base_dir, stream, variant, hour))
|
|
|
|
local_segments = set(list_local_segments(base_dir, stream, variant, hour))
|
|
|
|
remote_segments = set(list_remote_segments(node, stream, variant, hour))
|
|
|
|
remote_segments = set(list_remote_segments(node, stream, variant, hour))
|
|
|
@ -201,10 +210,11 @@ def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='r
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
|
|
|
|
#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
|
|
|
|
if datetime.datetime.utcnow() - segment['start'] < datetime.timedelta(seconds=recent_cutoff):
|
|
|
|
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
|
|
|
|
get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
|
|
|
|
|
|
|
|
logging.info('{} segments in {}/{}/{} backfilled'.format(len(missing_segments), stream, variant, hour))
|
|
|
|
|
|
|
|
|
|
|
|
logging.info('Finished backfilling from {}'.format(node))
|
|
|
|
logging.info('Finished backfilling from {}'.format(node))
|
|
|
|
|
|
|
|
|
|
|
@ -214,7 +224,9 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
|
|
|
|
|
|
|
|
|
|
|
|
Do a backfill of the last 3 hours from stream/variants from all nodes
|
|
|
|
Do a backfill of the last 3 hours from stream/variants from all nodes
|
|
|
|
initially before doing a full backfill from all nodes. Then every sleep_time
|
|
|
|
initially before doing a full backfill from all nodes. Then every sleep_time
|
|
|
|
minutes check to see if more than fill_wait minutes have passed since the last backfill. If so do a backfill of the last 3 hours. Also check whether it has been more than full_fill_wait minutes since the last full backfill;
|
|
|
|
minutes check to see if more than fill_wait minutes have passed since the
|
|
|
|
|
|
|
|
last backfill. If so do a backfill of the last 3 hours. Also check whether
|
|
|
|
|
|
|
|
it has been more than full_fill_wait minutes since the last full backfill;
|
|
|
|
if so, do a full backfill."""
|
|
|
|
if so, do a full backfill."""
|
|
|
|
# TODO replace this with a more robust event based service and backfill from multiple nodes in parallel
|
|
|
|
# TODO replace this with a more robust event based service and backfill from multiple nodes in parallel
|
|
|
|
# stretch goal: provide an interface to trigger backfills manually
|
|
|
|
# stretch goal: provide an interface to trigger backfills manually
|
|
|
@ -227,7 +239,9 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
|
|
|
|
fill_start = datetime.datetime.now()
|
|
|
|
fill_start = datetime.datetime.now()
|
|
|
|
full_fill_start = fill_start
|
|
|
|
full_fill_start = fill_start
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
backfill(base_dir, stream, variants, 3)
|
|
|
|
backfill(base_dir, stream, variants, 3)
|
|
|
|
|
|
|
|
|
|
|
|
backfill(base_dir, stream, variants)
|
|
|
|
backfill(base_dir, stream, variants)
|
|
|
|
|
|
|
|
|
|
|
|
# I'm sure there is a module that does this in a more robust way
|
|
|
|
# I'm sure there is a module that does this in a more robust way
|
|
|
|