clean up for new gevent based backfiller.

pull/43/head
Christopher Usher 6 years ago
parent 7d9a5b4626
commit c9f6ee95c5

@@ -6,7 +6,7 @@ import errno
import logging
import os
import random
import time
import signal
import uuid
import dateutil.parser
@@ -131,25 +131,6 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc()
def backfill_nodes(base_dir, stream, variants, hours=None, nodes=None, start=None):
"""Loop over nodes backfilling from each.
Backfill from node/stream/variants to base_dir/stream/variants for each node
in nodes. If nodes is None, use get_nodes() to get a list of nodes to
backfill from. By default all hours are backfilled. If backfilling from a
node raises an exception, this just moves on to the next node."""
if nodes is None:
nodes = get_nodes()
#ideally do this in parallel
for node in nodes:
try:
backfill(base_dir, node, stream, variants, hours)
except Exception:
logging.exception("Error while backfilling node {}".format(node))
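The "#ideally do this in parallel" note above is exactly what the new gevent-based manager addresses: one worker per node instead of this serial loop. As a rough illustration only, not code from this commit, the same loop could be run concurrently with a gevent pool; backfill_nodes_parallel and its concurrency argument are made-up names for the sketch:

import logging

import gevent.pool

def backfill_nodes_parallel(base_dir, stream, variants, nodes, hours=None, concurrency=4):
    """Backfill from each node concurrently; a failure on one node does not stop the others."""
    pool = gevent.pool.Pool(concurrency)

    def backfill_one(node):
        try:
            backfill(base_dir, node, stream, variants, hours)
        except Exception:
            logging.exception("Error while backfilling node {}".format(node))

    for node in nodes:
        pool.spawn(backfill_one, node)
    pool.join()  # wait for every node's backfill greenlet to finish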
def last_hours(n_hours=3):
"""Return of a list of the last n_hours in descending order."""
if n_hours < 1:
@@ -187,7 +168,7 @@ def list_hours(node, stream, variants, order='forward', start=None):
return hours
def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60):
def backfill(base_dir, node, stream, variants, hours, segment_order='random', recent_cutoff=60, stopping=None):
"""Backfill from remote node.
Backfill from node/stream/variants to base_dir/stream/variants for each hour
@@ -207,6 +188,9 @@ def backfill(base_dir, node, stream, variants, hours, segment_order='random', re
for hour in hours:
if stopping is not None and stopping.is_set():
return
logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour))
local_segments = set(list_local_segments(base_dir, stream, variant, hour))
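The new stopping argument gives backfill() cooperative cancellation: the manager's workers share a gevent.event.Event, and the loop checks it once per hour of segments so a shutdown request is honoured at the next safe point. A minimal standalone sketch of that pattern, assuming nothing beyond gevent itself (long_job is a made-up stand-in for the real backfill loop):

import gevent
import gevent.event

def long_job(stopping):
    for step in range(1000):
        # Check the shared event at a safe point in each iteration and
        # return promptly once a shutdown has been requested.
        if stopping is not None and stopping.is_set():
            return
        gevent.sleep(0.1)  # stand-in for real work (downloading a segment, etc.)

stopping = gevent.event.Event()
worker = gevent.spawn(long_job, stopping)
stopping.set()  # ask the worker to stop at its next check
worker.join()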
@@ -251,11 +235,12 @@ class BackfillerManager(object):
NODE_INTERVAL = 5 #minutes between updating list of nodes
def __init__(self, base_dir, stream, variants, input_nodes=None):
def __init__(self, base_dir, stream, variants, nodes=None):
"""Constructor for BackfillerManager."""
self.base_dir = base_dir
self.stream = stream
self.variants = variants
self.nodes = nodes
self.stopping = gevent.event.Event()
self.workers = {} # {node url: worker}
@@ -283,7 +268,10 @@
def run(self):
while not self.stopping.is_set():
if self.nodes is None:
new_nodes = set(get_nodes())
else:
new_nodes = self.nodes
existing_nodes = set(self.workers.keys())
to_start = new_nodes - existing_nodes
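The run loop above reconciles the desired node set against the running workers with set arithmetic, starting a worker for any node that lacks one. A hedged, self-contained sketch of that reconciliation pattern, where start_worker and stop_worker are hypothetical stand-ins for the manager's methods:

def reconcile(desired_nodes, workers, start_worker, stop_worker):
    """Bring the workers dict in line with desired_nodes."""
    desired = set(desired_nodes)
    existing = set(workers.keys())
    for node in desired - existing:    # nodes that have no worker yet
        workers[node] = start_worker(node)
    for node in existing - desired:    # workers whose node went away
        stop_worker(workers.pop(node))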
@@ -334,8 +322,6 @@ class BackfillerWorker(object):
"""Tell the worker to shut down"""
self.stopping.set()
def trigger_new_worker(self):
self.manager.trigger_new_worker(self)
def run(self):
self.logger.info('Worker starting')
@@ -350,7 +336,6 @@ class BackfillerWorker(object):
del self.manager.workers[self.node]
def _run(self):
# Set both timestamps a day in the past so the first backfills run immediately.
last_small_backfill = datetime.datetime.now() + datetime.timedelta(-1)
last_large_backfill = datetime.datetime.now() + datetime.timedelta(-1)
large_hours = []
@@ -360,7 +345,7 @@ class BackfillerWorker(object):
now = datetime.datetime.now()
if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL):
backfill(self.base_dir, self.node, self.stream, self.variants, last_hours())
backfill(self.base_dir, self.node, self.stream, self.variants, last_hours(), stopping=self.stopping)
last_small_backfill = now
elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours):
@@ -370,21 +355,13 @@ class BackfillerWorker(object):
this_hour = large_hours[-1:]
large_hours = large_hours[:-1]
backfill(self.base_dir, self.node, self.stream, self.variants, this_hour)
backfill(self.base_dir, self.node, self.stream, self.variants, this_hour, stopping=self.stopping)
else:
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
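The worker's _run loop is a simple two-rate scheduler: frequent small backfills of the most recent hours take priority, the slow full backfill is drained one hour per iteration so it can be interrupted quickly, and otherwise the loop idles with jitter. A rough, self-contained skeleton of that shape (the interval defaults and callback names here are assumptions, not the commit's code):

import datetime

def run_schedule(stopping, do_small, list_all_hours, do_one_hour, jitter,
                 small_interval=5, large_interval=180, wait_interval=1):
    # Start both timestamps in the past so the first pass backfills immediately.
    long_ago = datetime.datetime.now() - datetime.timedelta(days=1)
    last_small = last_large = long_ago
    pending_hours = []
    while not stopping.is_set():
        now = datetime.datetime.now()
        if now - last_small > datetime.timedelta(minutes=small_interval):
            do_small()  # cheap backfill of the most recent hours
            last_small = now
        elif pending_hours or now - last_large > datetime.timedelta(minutes=large_interval):
            if not pending_hours:
                pending_hours = list_all_hours()
                last_large = now
            do_one_hour(pending_hours.pop())  # drain the full backfill one hour at a time
        else:
            stopping.wait(jitter(wait_interval))  # jittered idle wait, wakes early on stop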
def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None):
"""Prototype backfiller service.
Do a backfill of the last 3 hours from stream/variants from all nodes
initially before doing a full backfill from all nodes. Then every sleep_time
minutes check to see if more than fill_wait minutes have passed since the
last backfill. If so do a backfill of the last 3 hours. Also check whether
it has been more than full_fill_wait minutes since the last full backfill;
if so, do a full backfill."""
# TODO replace this with a more robust event based service and backfill from multiple nodes in parallel
"""Backfiller service."""
# stretch goal: provide an interface to trigger backfills manually
# stretch goal: use the backfiller to monitor the restreamer
@@ -394,6 +371,9 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
if start is not None:
start = dateutil.parser.parse(start)
manager = BackfillerManager(base_dir, stream, variants, nodes)
gevent.signal(signal.SIGTERM, manager.stop)
common.PromLogCountsHandler.install()
common.install_stacksampler()
prom.start_http_server(metrics_port)
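Shutdown is now signal-driven: SIGTERM sets the manager's stopping event via a gevent-aware handler and run() exits on its own. A minimal sketch of that wiring, assuming only gevent and the standard library; Manager is a stand-in for BackfillerManager, and newer gevent renames the callable gevent.signal to gevent.signal_handler:

import signal

import gevent
import gevent.event

class Manager(object):
    def __init__(self):
        self.stopping = gevent.event.Event()

    def stop(self):
        self.stopping.set()

    def run(self):
        while not self.stopping.is_set():
            self.stopping.wait(1)  # real work would go here

# Prefer the newer spelling when available, fall back to the one used in this diff.
signal_handler = getattr(gevent, 'signal_handler', gevent.signal)

manager = Manager()
signal_handler(signal.SIGTERM, manager.stop)
manager.run()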
@@ -402,34 +382,5 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
fill_start = datetime.datetime.now()
full_fill_start = fill_start
backfill(base_dir, stream, variants, 3, nodes=nodes, start=start)
backfill(base_dir, stream, variants, nodes=nodes, start=start)
# I'm sure there is a module that does this in a more robust way
# but I understand this and it gives the behaviour I want
while True:
now = datetime.datetime.now()
if now - full_fill_start > datetime.timedelta(minutes=full_fill_wait):
backfill(base_dir, stream, variants, nodes=nodes, start=start)
fill_start = now
full_fill_start = fill_start
elif now - fill_start > datetime.timedelta(minutes=fill_wait):
backfill(base_dir, stream, variants, 3, nodes=nodes, start=start)
fill_start = now
else:
time.sleep(common.jitter(60 * sleep_time))
manager.run()
logging.info('Gracefully stopped')
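The backdoor server kept in main() above is useful for inspecting a running backfiller: it serves a live Python REPL on a local TCP port. An illustrative usage sketch, where the port and the exposed locals are arbitrary choices rather than the service's defaults:

import gevent.backdoor

state = {'note': 'anything passed via locals is visible in the REPL'}
server = gevent.backdoor.BackdoorServer(('127.0.0.1', 1234), locals={'state': state})
server.serve_forever()  # then connect with: telnet 127.0.0.1 1234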
