|
|
@ -11,6 +11,7 @@
|
|
|
|
import datetime
|
|
|
|
import datetime
|
|
|
|
import errno
|
|
|
|
import errno
|
|
|
|
import os
|
|
|
|
import os
|
|
|
|
|
|
|
|
import random
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
import uuid
|
|
|
|
import uuid
|
|
|
|
|
|
|
|
|
|
|
@ -20,6 +21,7 @@ import common
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
HOUR_FMT = '%Y-%m-%dT%H'
|
|
|
|
HOUR_FMT = '%Y-%m-%dT%H'
|
|
|
|
|
|
|
|
|
|
|
|
TIMEOUT = 5 #default timeout for remote requests
|
|
|
|
TIMEOUT = 5 #default timeout for remote requests
|
|
|
|
|
|
|
|
|
|
|
|
def get_nodes():
|
|
|
|
def get_nodes():
|
|
|
@ -42,10 +44,10 @@ def list_local_segments(base_dir, stream, variant, hour):
|
|
|
|
path = os.path.join(base_dir, stream, variant, hour)
|
|
|
|
path = os.path.join(base_dir, stream, variant, hour)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
return [name for name in os.listdir(path) if not name.startswith('.')]
|
|
|
|
return [name for name in os.listdir(path) if not name.startswith('.')]
|
|
|
|
|
|
|
|
|
|
|
|
except OSError as e:
|
|
|
|
except OSError as e:
|
|
|
|
if e.errno != errno.ENOENT:
|
|
|
|
if e.errno != errno.ENOENT:
|
|
|
|
raise
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|
|
return []
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -97,7 +99,9 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
|
|
|
|
common.rename(temp_path, path)
|
|
|
|
common.rename(temp_path, path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def backfill(base_dir, stream, variants, hours=None, nodes=None):
|
|
|
|
|
|
|
|
|
|
|
|
def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None,
|
|
|
|
|
|
|
|
stop=None, order=None):
|
|
|
|
|
|
|
|
|
|
|
|
# loop over nodes backfilling from each
|
|
|
|
# loop over nodes backfilling from each
|
|
|
|
|
|
|
|
|
|
|
@ -107,7 +111,7 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None):
|
|
|
|
#ideally do this in parallel
|
|
|
|
#ideally do this in parallel
|
|
|
|
for node in nodes:
|
|
|
|
for node in nodes:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
backfill_node(base_dir, node, stream, variants, hours)
|
|
|
|
backfill_node(base_dir, node, stream, variants, hours, start, stop, order=order)
|
|
|
|
#need to replace this with a more sophisticated error handler
|
|
|
|
#need to replace this with a more sophisticated error handler
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
print node, e
|
|
|
|
print node, e
|
|
|
@ -121,7 +125,8 @@ def is_iterable(x):
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60):
|
|
|
|
def backfill_node(base_dir, node, stream, variants, hours=None, start=None,
|
|
|
|
|
|
|
|
stop=None, recent_cutoff=60, order=None):
|
|
|
|
|
|
|
|
|
|
|
|
# if hours is None, backfill all hourdirs
|
|
|
|
# if hours is None, backfill all hourdirs
|
|
|
|
if hours is None:
|
|
|
|
if hours is None:
|
|
|
@ -139,6 +144,18 @@ def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60
|
|
|
|
now = datetime.datetime.utcnow()
|
|
|
|
now = datetime.datetime.utcnow()
|
|
|
|
hours = [(now - i * timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
|
|
|
|
hours = [(now - i * timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if start is not None:
|
|
|
|
|
|
|
|
hours = [hour for hour in hours if hour >= start]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if stop is not None:
|
|
|
|
|
|
|
|
hours = [hour for hour in hours if hour <= stop]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# useful if running in parallel and expecting to fetch a segments from
|
|
|
|
|
|
|
|
# multiple hours (say on start up) so that you don't try to backfill the
|
|
|
|
|
|
|
|
# same hour at the same time
|
|
|
|
|
|
|
|
if order == 'random':
|
|
|
|
|
|
|
|
hours = random.shuffle(hours)
|
|
|
|
|
|
|
|
|
|
|
|
for variant in variants:
|
|
|
|
for variant in variants:
|
|
|
|
|
|
|
|
|
|
|
|
for hour in hours:
|
|
|
|
for hour in hours:
|
|
|
@ -167,7 +184,7 @@ def main(base_dir, stream, variants, fill_wait=5, full_fill_wait=180, sleep_time
|
|
|
|
full_fill_start = fill_start
|
|
|
|
full_fill_start = fill_start
|
|
|
|
|
|
|
|
|
|
|
|
# Do a full backfill at start
|
|
|
|
# Do a full backfill at start
|
|
|
|
backfill(base_dir, stream, variants)
|
|
|
|
backfill(base_dir, stream, variants, order='random')
|
|
|
|
|
|
|
|
|
|
|
|
# I'm sure there is a module that does this in a more robust way
|
|
|
|
# I'm sure there is a module that does this in a more robust way
|
|
|
|
# but I understand this and it gives the behaviour I want
|
|
|
|
# but I understand this and it gives the behaviour I want
|
|
|
|