fixes and improvements suggested by ekimekim

* simplied the backfiller local - now just a full backfill every couple minuteso
pull/43/head
Christopher Usher 6 years ago
parent 4eac6189ce
commit 09368d92e1

@ -25,7 +25,7 @@ segments_backfilled = prom.Counter(
HOUR_FMT = '%Y-%m-%dT%H'
TIMEOUT = 5 #default timeout for remote requests
TIMEOUT = 5 #default timeout in seconds for remote requests
def encode_strings(o):
if isinstance(o, list):
@ -44,13 +44,11 @@ def get_nodes():
# either read a config file or query the database to get the addresses
# of the other nodes
# figure out some way that the local machine isn't in the list of returned
# nodes so that
# as a prototype can just hardcode some addresses.
# nodes
logging.info('Fetching list of other nodes')
nodes = ['http://toodles.videostrike.team:1337/']
#nodes = []
nodes = []
return nodes
@ -133,38 +131,27 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
def last_hours(n_hours=3):
"""Return of a list of the last n_hours in descending order."""
if n_hours < 1:
raise ValueError('Number of hours has to be 1 or greater')
now = datetime.datetime.utcnow()
return [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]
def list_hours(node, stream, variants, order='forward', start=None):
def list_hours(node, stream, variants, start=None):
"""Return a list of all available hours from a node.
List all hours available from node/stream for each variant in variants.
List all hours available from node/stream for each variant in variants
ordered from newest to oldest.
Keyword arguments:
order -- If 'random', randomise the order of segments. If 'forward', sort
the hours in ascending order. If 'reverse' (default), sort the
hours in descending order. Otherwise, do not change the order of the
hours.
start -- Only return hours after this time. If None (default), all hours are
returned."""
hour_lists = [list_remote_hours(node, stream, variant) for variant in variants]
hours = list(set().union(*hour_lists))
hours.sort(reverse=True) #latest hour first
if start is not None:
hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < start]
if order == 'random':
random.shuffle(hours)
elif order == 'forward':
hours.sort()
elif order == 'reverse':
hours.sort(reverse=True)
return hours
@ -172,63 +159,62 @@ class BackfillerManager(object):
"""Manages BackfillerWorkers to backfill from a pool of nodes.
The manager regularly calls get_nodes to an up to date list of nodes. If no
worker exists for a node, the manager starts one. If a worker corresponds to
a node not in the list, the manager stops it."""
worker exists for a node in this list or in the static_node list, the
manager starts one. If a worker corresponds to a node not in either list,
the manager stops it. If run_once, only backfill once."""
NODE_INTERVAL = 1 #minutes between updating list of nodes
NODE_INTERVAL = 300 #seconds between updating list of nodes
def __init__(self, base_dir, stream, variants, nodes=None):
"""Constructor for BackfillerManager."""
def __init__(self, base_dir, stream, variants, static_nodes=[], run_once=False):
"""Constructor for BackfillerManager.
Creates a manager for a given stream with specified variants. If
static_nodes is None, manager"""
self.base_dir = base_dir
self.stream = stream
self.variants = variants
self.nodes = nodes
self.static_nodes = static_nodes
self.run_once = run_once
self.stopping = gevent.event.Event()
self.logger = logging.getLogger("BackfillerManager({})".format(stream))
self.workers = {} # {node url: worker}
def stop(self):
"""Shut down all workers and stop backfilling."""
self.logger.info('Stopping')
for worker in self.workers:
worker.stop()
self.stopping.set()
def start_worker(self, node):
"""Start a new worker for given node."""
worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants)
if node in self.workers:
self.workers[node].stop() #only one worker per node
worker = BackfillerWorker(self, self.base_dir, node, self.stream, self.variants, self)
assert node not in self.workers, "Tried to start worker for node {!r} that already has one".format(node)
self.workers[node] = worker
gevent.spawn(worker.run)
def stop_worker(self, node):
"""Stop the worker for given node."""
self.workers[node].stop()
del self.workers[node]
self.workers.pop(node).stop()
def run(self):
while not self.stopping.is_set():
if self.nodes is None:
new_nodes = set(get_nodes())
else:
new_nodes = set(self.nodes)
new_nodes = set(get_nodes() + self.static_nodes)
exisiting_nodes = set(self.workers.keys())
to_start = new_nodes - exisiting_nodes
for node in to_start:
self.start_worker(node)
to_stop = exisiting_nodes - new_nodes
for node in to_stop:
self.stop_worker(node)
self.stopping.wait(common.jitter(self.NODE_INTERVAL * 60))
if self.run_once:
break
self.stopping.wait(common.jitter(self.NODE_INTERVAL))
else:
self.stop()
for worker in self.workers:
worker.stop()
for worker in self.workers:
worker.done.wait()
@ -236,39 +222,31 @@ class BackfillerManager(object):
class BackfillerWorker(object):
"""Backfills segments from a node.
Backfills all segments from node/stream to base_dir/stream for all variants.
Every SMALL_INTERVAL minutes backfill the last three hours starting from the
most recent one (a 'small backfill'). When not doing a small backfill,
backfill all segments starting with the most recent one (a 'large backfill')
unless a large backfill has occured less than LARGE_INTERVAL ago."""
Backfills every WAIT_INTERVAL all segments from node/stream to
base_dir/stream for all variants. If run_once, only backfill once."""
SMALL_INTERVAL = 5 #minutes between small backfills
LARGE_INTERVAL = 60 #minutes between large backfills
WAIT_INTERVAL = 1 #seconds between backfill actions
WAIT_INTERVAL = 120 #seconds between backfills
def __init__(self, manager, base_dir, node, stream, variants):
"""Constructor for BackfillerWorker"""
def __init__(self, manager, base_dir, node, stream, variants, run_once=False):
self.manager = manager
self.logger = manager.logger.getChild('BackfillerWorker({})'.format(node))
self.base_dir = base_dir
self.node = node
self.stream = stream
self.variants = variants
self.run_once = run_once
self.stopping = gevent.event.Event()
self.done = gevent.event.Event()
def __repr__(self):
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
__str__ = __repr__
def stop(self):
"""Tell the worker to shut down"""
self.logger.info('Stopping')
self.stopping.set()
def backfill(self, hours, segment_order='random', recent_cutoff=60):
"""Backfill from remote node.
@ -325,60 +303,31 @@ class BackfillerWorker(object):
get_remote_segment(self.base_dir, self.node, self.stream, variant, hour, missing_segment)
self.logger.info('{} segments in {}/{} backfilled'.format(len(missing_segments), variant, hour))
def run(self):
self.logger.info('Worker starting')
try:
self._run()
except Exception:
self.logger.exception('Worker failed')
else:
self.logger.info('Worker stopped')
finally:
self.done.set()
del self.manager.workers[self.node]
def _run(self):
last_small_backfill = datetime.datetime.now() + datetime.timedelta(-1)
last_large_backfill = datetime.datetime.now() + datetime.timedelta(-1)
large_hours = []
while not self.stopping.is_set():
self.backfill(list_hours(self.node, self.stream, self.variants))
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
now = datetime.datetime.now()
if now - last_small_backfill > datetime.timedelta(minutes=self.SMALL_INTERVAL):
self.logger.info('Starting backfilling last 3 hours')
self.backfill(last_hours())
self.logger.info('Finished backfilling last 3 hours')
last_small_backfill = now
elif now - last_large_backfill > datetime.timedelta(minutes=self.LARGE_INTERVAL) or len(large_hours):
if not len(large_hours):
large_hours = list_hours(self.node, self.stream, self.variants)
last_large_backfill = now
if self.run_once:
break
this_hour = large_hours[-1:]
large_hours = large_hours[:-1]
self.logger.info('Starting full backfill hour: {}'.format(this_hour[0]))
self.backfill(this_hour)
self.logger.info('Finished full backfill hour: {}'.format(this_hour[0]))
else:
self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
self.logger.info('Worker stopped')
self.done.set()
del self.manager.workers[self.node]
def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None):
def main(base_dir='.', stream='', variants='', metrics_port=8002,
static_nodes='', backdoor_port=0, start=None, run_once=False):
"""Backfiller service."""
# stretch goal: provide an interface to trigger backfills manually
# stretch goal: use the backfiller to monitor the restreamer
variants = variants.split(',') if variants else []
if nodes is not None:
nodes = nodes.split(',') if nodes else []
static_nodes = static_nodes.split(',') if static_nodes else []
if start is not None:
start = dateutil.parser.parse(start)
manager = BackfillerManager(base_dir, stream, variants, nodes)
manager = BackfillerManager(base_dir, stream, variants, static_nodes, run_once)
gevent.signal(signal.SIGTERM, manager.stop)
common.PromLogCountsHandler.install()

Loading…
Cancel
Save