|
|
@ -74,7 +74,7 @@ class CoverageChecker(object):
|
|
|
|
"""Checks the segment coverage for a given channel in a a given directoy."""
|
|
|
|
"""Checks the segment coverage for a given channel in a a given directoy."""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, channel, qualities, base_dir, first_hour, last_hour,
|
|
|
|
def __init__(self, channel, qualities, base_dir, first_hour, last_hour,
|
|
|
|
make_page, connection_string, check_interval):
|
|
|
|
make_page, connection_string, check_interval, output_dir, run_once):
|
|
|
|
"""Constructor for CoverageChecker.
|
|
|
|
"""Constructor for CoverageChecker.
|
|
|
|
|
|
|
|
|
|
|
|
Creates a checker for a given channel with specified qualities."""
|
|
|
|
Creates a checker for a given channel with specified qualities."""
|
|
|
@ -87,6 +87,8 @@ class CoverageChecker(object):
|
|
|
|
self.make_page = make_page
|
|
|
|
self.make_page = make_page
|
|
|
|
self.db_manager = None if connection_string is None else database.DBManager(dsn=connection_string)
|
|
|
|
self.db_manager = None if connection_string is None else database.DBManager(dsn=connection_string)
|
|
|
|
self.check_interval = check_interval
|
|
|
|
self.check_interval = check_interval
|
|
|
|
|
|
|
|
self.output_dir = output_dir
|
|
|
|
|
|
|
|
self.run_once = run_once
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.logger = logging.getLogger('CoverageChecker({})'.format(channel))
|
|
|
|
self.logger = logging.getLogger('CoverageChecker({})'.format(channel))
|
|
|
|
|
|
|
|
|
|
|
@ -172,7 +174,7 @@ class CoverageChecker(object):
|
|
|
|
colours[coverage_mask] = matplotlib.colors.to_rgb('tab:blue')
|
|
|
|
colours[coverage_mask] = matplotlib.colors.to_rgb('tab:blue')
|
|
|
|
colours[coverage_mask & partial_mask] = matplotlib.colors.to_rgb('tab:orange')
|
|
|
|
colours[coverage_mask & partial_mask] = matplotlib.colors.to_rgb('tab:orange')
|
|
|
|
# write the pixel array to a temporary file then atomically rename it
|
|
|
|
# write the pixel array to a temporary file then atomically rename it
|
|
|
|
path_prefix = os.path.join(self.base_dir, 'coverage-maps', '{}_{}'.format(self.channel, quality))
|
|
|
|
path_prefix = os.path.join(self.base_dir, self.output_dir, '{}_{}'.format(self.channel, quality))
|
|
|
|
temp_path = '{}_{}.png'.format(path_prefix, uuid.uuid4())
|
|
|
|
temp_path = '{}_{}.png'.format(path_prefix, uuid.uuid4())
|
|
|
|
final_path = '{}_coverage.png'.format(path_prefix)
|
|
|
|
final_path = '{}_coverage.png'.format(path_prefix)
|
|
|
|
common.ensure_directory(temp_path)
|
|
|
|
common.ensure_directory(temp_path)
|
|
|
@ -232,13 +234,13 @@ class CoverageChecker(object):
|
|
|
|
|
|
|
|
|
|
|
|
for node in sorted(nodes.keys()):
|
|
|
|
for node in sorted(nodes.keys()):
|
|
|
|
html += """ <h3>{}</h3>
|
|
|
|
html += """ <h3>{}</h3>
|
|
|
|
<img src="{}/segments/coverage-maps/{}_{}_coverage.png" alt="{}">
|
|
|
|
<img src="{}/segments/{}/{}_{}_coverage.png" alt="{}">
|
|
|
|
""".format(node, nodes[node], self.channel, quality, node)
|
|
|
|
""".format(node, nodes[node], self.output_dir, self.channel, quality, node)
|
|
|
|
|
|
|
|
|
|
|
|
html += """ </body>
|
|
|
|
html += """ </body>
|
|
|
|
</html>"""
|
|
|
|
</html>"""
|
|
|
|
|
|
|
|
|
|
|
|
path_prefix = os.path.join(self.base_dir, 'coverage-maps', '{}_{}'.format(self.channel, quality))
|
|
|
|
path_prefix = os.path.join(self.base_dir, self.output_dir, '{}_{}'.format(self.channel, quality))
|
|
|
|
temp_path = '{}_{}.html'.format(path_prefix, uuid.uuid4())
|
|
|
|
temp_path = '{}_{}.html'.format(path_prefix, uuid.uuid4())
|
|
|
|
final_path = '{}_coverage.html'.format(path_prefix)
|
|
|
|
final_path = '{}_coverage.html'.format(path_prefix)
|
|
|
|
common.ensure_directory(temp_path)
|
|
|
|
common.ensure_directory(temp_path)
|
|
|
@ -534,6 +536,9 @@ class CoverageChecker(object):
|
|
|
|
if self.make_page:
|
|
|
|
if self.make_page:
|
|
|
|
self.create_coverage_page(quality)
|
|
|
|
self.create_coverage_page(quality)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.run_once:
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
self.stopping.wait(common.jitter(self.check_interval))
|
|
|
|
self.stopping.wait(common.jitter(self.check_interval))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -547,9 +552,11 @@ class CoverageChecker(object):
|
|
|
|
@argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.')
|
|
|
|
@argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.')
|
|
|
|
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
|
|
|
|
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
|
|
|
|
@argh.arg('--check-interval', help='How many seconds to wait in between doing checks.')
|
|
|
|
@argh.arg('--check-interval', help='How many seconds to wait in between doing checks.')
|
|
|
|
|
|
|
|
@argh.arg('--output-dir', help='Where to write coverage maps, relative to base dir.')
|
|
|
|
|
|
|
|
@argh.arg('--run-once', help='Exit after completing checks once.')
|
|
|
|
def main(channels, base_dir='.', qualities='source', first_hour=None,
|
|
|
|
def main(channels, base_dir='.', qualities='source', first_hour=None,
|
|
|
|
last_hour=None, make_page=False, connection_string=None,
|
|
|
|
last_hour=None, make_page=False, connection_string=None,
|
|
|
|
metrics_port=8006, backdoor_port=0, check_interval=300):
|
|
|
|
metrics_port=8006, backdoor_port=0, check_interval=300, output_dir="coverage-maps", run_once=False):
|
|
|
|
"""Segment coverage service"""
|
|
|
|
"""Segment coverage service"""
|
|
|
|
|
|
|
|
|
|
|
|
qualities = qualities.split(',') if qualities else []
|
|
|
|
qualities = qualities.split(',') if qualities else []
|
|
|
@ -568,7 +575,7 @@ def main(channels, base_dir='.', qualities='source', first_hour=None,
|
|
|
|
for channel in channels:
|
|
|
|
for channel in channels:
|
|
|
|
logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir))
|
|
|
|
logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir))
|
|
|
|
manager = CoverageChecker(channel, qualities, base_dir, first_hour,
|
|
|
|
manager = CoverageChecker(channel, qualities, base_dir, first_hour,
|
|
|
|
last_hour, make_page, connection_string, check_interval)
|
|
|
|
last_hour, make_page, connection_string, check_interval, output_dir, run_once)
|
|
|
|
managers.append(manager)
|
|
|
|
managers.append(manager)
|
|
|
|
workers.append(gevent.spawn(manager.run))
|
|
|
|
workers.append(gevent.spawn(manager.run))
|
|
|
|
|
|
|
|
|
|
|
@ -586,13 +593,14 @@ def main(channels, base_dir='.', qualities='source', first_hour=None,
|
|
|
|
# If one has stopped, either:
|
|
|
|
# If one has stopped, either:
|
|
|
|
# 1. stop() was called and all are stopping
|
|
|
|
# 1. stop() was called and all are stopping
|
|
|
|
# 2. one errored and we should stop all remaining and report the error
|
|
|
|
# 2. one errored and we should stop all remaining and report the error
|
|
|
|
# Our behaviour in both cases is the same:
|
|
|
|
# 3. We were only told to run once, and it's finished.
|
|
|
|
# 1. Tell all managers to gracefully stop
|
|
|
|
# For cases 1 & 2, tell everyone else to stop. In the run-once case, let everyone else finish naturally.
|
|
|
|
stop()
|
|
|
|
if not run_once:
|
|
|
|
# 2. Wait (with timeout) until they've stopped
|
|
|
|
stop()
|
|
|
|
|
|
|
|
# Then wait until they've stopped
|
|
|
|
gevent.wait(workers)
|
|
|
|
gevent.wait(workers)
|
|
|
|
# 3. Check if any of them failed. If they did, report it. If mulitple
|
|
|
|
# Finally, check if any of them failed. If they did, report it. If mulitple
|
|
|
|
# failed, we report one arbitrarily.
|
|
|
|
# failed, we report one arbitrarily.
|
|
|
|
for worker in workers:
|
|
|
|
for worker in workers:
|
|
|
|
worker.get()
|
|
|
|
worker.get()
|
|
|
|
|
|
|
|
|
|
|
|