|
|
|
@ -142,14 +142,18 @@ def list_hours(node, stream, variants, start=None):
|
|
|
|
|
ordered from newest to oldest.
|
|
|
|
|
|
|
|
|
|
Keyword arguments:
|
|
|
|
|
start -- Only return hours after this time. If None (default), all hours are
|
|
|
|
|
returned."""
|
|
|
|
|
start -- If a datetime return hours after that datetime. If a number,
|
|
|
|
|
return hours more recent than that number of hours ago. If None (default),
|
|
|
|
|
all hours are returned."""
|
|
|
|
|
|
|
|
|
|
hour_lists = [list_remote_hours(node, stream, variant) for variant in variants]
|
|
|
|
|
hours = list(set().union(*hour_lists))
|
|
|
|
|
hours.sort(reverse=True) #latest hour first
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if start is not None:
|
|
|
|
|
if not isinstance(start, datetime.datetime):
|
|
|
|
|
start = datetime.datetime.utcnow() - datetime.timedelta(hours=start)
|
|
|
|
|
hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < start]
|
|
|
|
|
|
|
|
|
|
return hours
|
|
|
|
@ -329,8 +333,12 @@ def main(base_dir='.', stream='', variants='', metrics_port=8002,
|
|
|
|
|
|
|
|
|
|
variants = variants.split(',') if variants else []
|
|
|
|
|
static_nodes = static_nodes.split(',') if static_nodes else []
|
|
|
|
|
|
|
|
|
|
if start is not None:
|
|
|
|
|
start = dateutil.parser.parse(start)
|
|
|
|
|
try:
|
|
|
|
|
start = float(start)
|
|
|
|
|
except ValueError:
|
|
|
|
|
start = dateutil.parser.parse(start)
|
|
|
|
|
|
|
|
|
|
manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once)
|
|
|
|
|
gevent.signal(signal.SIGTERM, manager.stop)
|
|
|
|
|