You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
wubloader/segment_coverage/segment_coverage/main.py

594 lines
23 KiB
Python

import errno
import datetime
import itertools
import logging
import os
import signal
import uuid
import argh
import gevent.backdoor
import matplotlib
import matplotlib.image
import numpy as np
import prometheus_client as prom
import common
from common import dateutil
from common import database
segment_count_gauge = prom.Gauge(
'segment_count',
'Number of segments in an hour',
['channel', 'quality', 'hour', 'type'],
)
segment_duration_gauge = prom.Gauge(
'segment_duration',
'Segment duration in an hour',
['channel', 'quality', 'hour', 'type'],
)
raw_coverage_gauge = prom.Gauge(
'raw_coverage',
'Total time covered by segments in an hour',
['channel', 'quality', 'hour'],
)
editable_coverage_gauge = prom.Gauge(
'editable_coverage',
'Non-overlapping time covered by segments in an hour',
['channel', 'quality', 'hour'],
)
raw_holes_gauge = prom.Gauge(
'raw_holes',
'Number of holes in raw coverage for the hour',
['channel', 'quality', 'hour'],
)
editable_holes_gauge = prom.Gauge(
'editable_holes',
'Number of holes in editable coverage for the hour',
['channel', 'quality', 'hour'],
)
overlap_count_gauge = prom.Gauge(
'overlap_count',
'Number of overlap segments for the hour',
['channel', 'quality', 'hour', 'type'],
)
overlap_duration_gauge = prom.Gauge(
'overlap_duration',
'Duration of overlaping segments for the hour',
['channel', 'quality', 'hour', 'type'],
)
HOUR_FMT = '%Y-%m-%dT%H'
class CoverageChecker(object):
"""Checks the segment coverage for a given channel in a a given directoy."""
def __init__(self, channel, qualities, base_dir, first_hour, last_hour,
make_page, connection_string, check_interval):
"""Constructor for CoverageChecker.
Creates a checker for a given channel with specified qualities."""
self.base_dir = base_dir
self.channel = channel
self.qualities = qualities
self.first_hour = first_hour
self.last_hour = last_hour
self.make_page = make_page
self.db_manager = None if connection_string is None else database.DBManager(dsn=connection_string)
self.check_interval = check_interval
self.stopping = gevent.event.Event()
self.logger = logging.getLogger('CoverageChecker({})'.format(channel))
def stop(self):
"""Stop checking coverage."""
self.logger.info('Stopping')
self.stopping.set()
def create_coverage_map(self, quality, all_hour_holes, all_hour_partials,
pixel_length=2, rows=300):
"""Create a PNG image showing segment coverage.
Each pixel repersents pixel_length seconds, with time increasing from
top to bottom along each column then right to left. By default each
pixel is 2 s and each column of the image repersents 10 min. White
pixels have no coverage, orange pixels only have coverage by partial
segments and blue pixels have coverage by full segments. If any part
of a pixel does not have coverage, it is marked as not having coverage.
Likewise, if only a partial segment is available for any part of a
pixel, it is marked as partial.
all_hour_holes -- a dict mapping hours to lists of holes
all_hour_holes -- a dict mapping hours to lists of partial segments
pixel_length -- length of a pixel in seconds
rows -- the height of the image"""
if not all_hour_holes:
self.logger.info('No hours to generate coverage map from')
return
if self.first_hour is None:
first_hour = datetime.datetime.strptime(min(all_hour_holes.keys()), HOUR_FMT)
else:
first_hour = self.first_hour.replace(minute=0, second=0, microsecond=0)
if self.last_hour is None:
last_hour = datetime.datetime.strptime(max(all_hour_holes.keys()), HOUR_FMT)
else:
last_hour = self.last_hour.replace(minute=0, second=0, microsecond=0)
self.logger.info('Creating coverage map for {} from {} to {}'.format(quality,
first_hour.strftime(HOUR_FMT), last_hour.strftime(HOUR_FMT)))
hours = []
latest_hour = first_hour
while latest_hour <= last_hour:
hours.append(latest_hour)
latest_hour += datetime.timedelta(hours = 1)
pixel_starts = np.arange(0, 3600, pixel_length) # start times of the pixels in an hour in seconds
pixel_ends = np.arange(pixel_length, 3601, pixel_length) # end times of the pixels in an hour in seconds
pixel_count = 3600 // pixel_length # number of pixels in an hour
coverage_mask = np.zeros(len(hours) * pixel_count, dtype=np.bool_)
partial_mask = np.zeros(len(hours) * pixel_count, dtype=np.bool_)
for i, hour in enumerate(hours):
hour_str = hour.strftime(HOUR_FMT)
if hour_str in all_hour_holes:
hour_coverage = np.ones(pixel_count, dtype=np.bool_)
hour_partial = np.zeros(pixel_count, dtype=np.bool_)
for hole in all_hour_holes[hour_str]:
hole_start = np.floor((hole[0] - hour).total_seconds() / pixel_length) * pixel_length # the start of the pixel containing the start of the hole
hole_end = np.ceil((hole[1] - hour).total_seconds() / pixel_length) * pixel_length # the end of the pixel containing the end of the hole
hour_coverage = hour_coverage & ((pixel_starts < hole_start) | (pixel_ends > hole_end))
for partial in all_hour_partials[hour_str]:
partial_start = np.floor((partial[0] - hour).total_seconds() / pixel_length) * pixel_length # the start of the pixel containing the start of the partial segment
partial_end = np.ceil((partial[1] - hour).total_seconds() / pixel_length) * pixel_length # the end of the pixel containing the end of the partial segment
hour_partial = hour_partial | ((pixel_starts >= partial_start) & (pixel_ends <= partial_end))
coverage_mask[i * pixel_count:(i + 1) * pixel_count] = hour_coverage
partial_mask[i * pixel_count:(i + 1) * pixel_count] = hour_partial
# convert the flat masks into 2-D arrays
columns = coverage_mask.size // rows
coverage_mask = coverage_mask.reshape((columns, rows)).T
partial_mask = partial_mask.reshape((columns, rows)).T
# use the masks to set the actual pixel colours
colours = np.ones((rows, columns, 3))
colours[coverage_mask] = matplotlib.colors.to_rgb('tab:blue')
colours[coverage_mask & partial_mask] = matplotlib.colors.to_rgb('tab:orange')
# write the pixel array to a temporary file then atomically rename it
path_prefix = os.path.join(self.base_dir, 'coverage-maps', '{}_{}'.format(self.channel, quality))
temp_path = '{}_{}.png'.format(path_prefix, uuid.uuid4())
final_path = '{}_coverage.png'.format(path_prefix)
common.ensure_directory(temp_path)
matplotlib.image.imsave(temp_path, colours)
os.rename(temp_path, final_path)
self.logger.info('Coverage map for {} created'.format(quality))
def create_coverage_page(self, quality):
nodes = {}
try:
connection = self.db_manager.get_conn()
host = [s.split('=')[-1] for s in connection.dsn.split() if 'host' in s][0]
self.logger.info('Fetching list of nodes from {}'.format(host))
results = database.query(connection, """
SELECT name, url
FROM nodes
WHERE backfill_from""")
for row in results:
nodes[row.name] = row.url
except:
self.logger.exception('Getting nodes failed.', exc_info=True)
return
self.logger.info('Nodes fetched: {}'.format(list(nodes.keys())))
html = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="refresh" content="30"/>
<title>{0} {1} Segment Coverage Maps</title>
<style>
html {{ background-color: #222;}}
h1 {{ color: #eee;
text-align: center;
font-family: sans-serif;}}
h3 {{ color: #eee;
text-align: center;
font-family: sans-serif;}}
img {{ display: block;
margin-left: auto;
margin-right: auto;}}
</style>
</head>
<body>
<h1>{0} {1}</h1>""".format(self.channel, quality)
for node in sorted(nodes.keys()):
html += """ <h3>{}</h3>
<img src="{}/segments/coverage-maps/{}_{}_coverage.png" alt="{}">
""".format(node, nodes[node], self.channel, quality, node)
html += """ </body>
</html>"""
path_prefix = os.path.join(self.base_dir, 'coverage-maps', '{}_{}'.format(self.channel, quality))
temp_path = '{}_{}.html'.format(path_prefix, uuid.uuid4())
final_path = '{}_coverage.html'.format(path_prefix)
common.ensure_directory(temp_path)
with open(temp_path, 'w') as f:
common.writeall(f.write, html)
os.rename(temp_path, final_path)
self.logger.info('Coverage page for {} created'.format(quality))
def run(self):
"""Loop over available hours for each quality, checking segment coverage."""
self.logger.info('Starting')
while not self.stopping.is_set():
for quality in self.qualities:
if self.stopping.is_set():
break
path = os.path.join(self.base_dir, self.channel, quality)
try:
hours = [name for name in os.listdir(path) if not name.startswith('.')]
except OSError as e:
if e.errno == errno.ENOENT:
self.logger.info('{} does not exist, skipping'.format(path))
continue
hours.sort()
previous_hour_segments = None
all_hour_holes = {}
all_hour_partials = {}
for hour in hours:
# Let other things run, to avoid starving them with CPU-heavy workload
# (in particular the metrics server can have issues responding in time
# otherwise).
gevent.idle()
if self.stopping.is_set():
break
self.logger.info('Checking {}/{}'.format(quality, hour))
# based on common.segments.best_segments_by_start
# but more complicated to capture more detailed metrics
hour_path = os.path.join(self.base_dir, self.channel, quality, hour)
try:
segment_names = [name for name in os.listdir(hour_path) if not name.startswith('.')]
except OSError as e:
if e.errno == errno.ENOENT:
self.logger.warning('Hour {} was deleted between finding it and processing it, ignoring'.format(hour))
continue
segment_names.sort()
parsed = []
bad_segment_count = 0
for name in segment_names:
try:
parsed.append(common.parse_segment_path(os.path.join(hour_path, name)))
except ValueError:
self.logger.warning("Failed to parse segment: {!r}".format(os.path.join(hour_path, name)), exc_info=True)
bad_segment_count += 1
full_segment_count = 0
suspect_segment_count = 0
partial_segment_count = 0
full_segment_duration = datetime.timedelta()
suspect_segment_duration = datetime.timedelta()
partial_segment_duration = datetime.timedelta()
full_overlaps = 0
full_overlap_duration = datetime.timedelta()
suspect_overlaps = 0
suspect_overlap_duration = datetime.timedelta()
partial_overlaps = 0
partial_overlap_duration = datetime.timedelta()
best_segments = []
holes = []
editable_holes = []
previous = None
previous_editable = None
coverage = datetime.timedelta()
editable_coverage = datetime.timedelta()
only_partials = []
# loop over all start times
# first select the best segment for a start time
# then update coverage
for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start):
full_segments = []
suspect_segments = []
partial_segments = []
for segment in segments:
if segment.type == 'full':
full_segments.append(segment)
full_segment_count += 1
full_segment_duration += segment.duration
elif segment.type == 'suspect':
suspect_segments.append(segment)
suspect_segment_count += 1
suspect_segment_duration += segment.duration
elif segment.type == 'partial':
partial_segments.append(segment)
partial_segment_count += 1
partial_segment_duration += segment.duration
if full_segments:
full_segments.sort(key=lambda segment: (segment.duration))
best_segment = full_segments[-1]
for segment in full_segments[:-1]:
full_overlaps += 1
full_overlap_duration += segment.duration
for segment in partial_segments:
partial_overlaps += 1
partial_overlap_duration += segment.duration
elif suspect_segments:
suspect_segments.sort(key=lambda segment: os.stat(segment.path).st_size)
best_segment = suspect_segments[-1]
only_partials.append((best_segment.start, best_segment.start + best_segment.duration))
for segment in suspect_segments[:-1]:
suspect_overlaps += 1
suspect_overlap_duration += segment.duration
elif partial_segments:
partial_segments.sort(key=lambda segment: os.stat(segment.path).st_size)
best_segment = partial_segments[-1]
only_partials.append((best_segment.start, best_segment.start + best_segment.duration))
for segment in partial_segments[:-1]:
partial_overlaps += 1
partial_overlap_duration += segment.duration
else:
# ignore any start times with only temporary segments
continue
self.logger.debug(best_segment.path.split('/')[-1])
best_segments.append(best_segment)
# now update coverage, overlaps and holes
if previous is None:
coverage += best_segment.duration
editable_coverage += best_segment.duration
previous_editable = best_segment
else:
previous_end = previous.start + previous.duration
if segment.start < previous_end:
if segment.type == 'full':
full_overlaps += 1
full_overlap_duration += previous_end - segment.start
elif segment.type == 'suspect':
suspect_overlaps += 1
suspect_overlap_duration += previous_end - segment.start
else:
partial_overlaps += 1
partial_overlap_duration += previous_end - segment.start
coverage += segment.start - previous_end + segment.duration
else:
coverage += segment.duration
editable_coverage += segment.duration
if segment.start > previous_end:
holes.append((previous_end, segment.start))
previous_editable_end = previous_editable.start + previous_editable.duration
if segment.start > previous_editable_end:
editable_holes.append((previous_editable_end, segment.start))
previous_editable = best_segment
previous = best_segment
if best_segments:
start = best_segments[0].start
end = best_segments[-1].start + best_segments[-1].duration
hole_duration = end - start - coverage
editable_hole_duration = end - start - editable_coverage
hour_start = datetime.datetime.strptime(hour, HOUR_FMT)
hour_end = hour_start + datetime.timedelta(hours=1)
# handle the case when there is a hole between the last segment of the previous hour and the first of this
if previous_hour_segments:
last_segment = previous_hour_segments[-1]
if best_segments[0].start > last_segment.start + last_segment.duration:
holes.append((hour_start, start))
hole_duration += start - hour_start
editable_holes.append((hour_start, start))
editable_hole_duration += start - hour_start
# handle the case when there is a hole between the last segment and the end of the hour if not the last hour
if hour != hours[-1] and end < hour_end:
holes.append((end, hour_end))
hole_duration += hour_end - end
editable_holes.append((end, hour_end))
editable_hole_duration += hour_end - end
# update the large number of Prometheus guages
segment_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_segment_count)
segment_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='suspect'
).set(suspect_segment_count)
segment_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_segment_count)
segment_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='bad'
).set(bad_segment_count)
segment_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_segment_duration.total_seconds())
segment_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='suspect'
).set(suspect_segment_duration.total_seconds())
segment_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_segment_duration.total_seconds())
raw_coverage_gauge.labels(
channel=self.channel, quality=quality, hour=hour
).set(coverage.total_seconds())
editable_coverage_gauge.labels(
channel=self.channel, quality=quality, hour=hour
).set(editable_coverage.total_seconds())
raw_holes_gauge.labels(
channel=self.channel, quality=quality, hour=hour
).set(len(holes))
editable_holes_gauge.labels(
channel=self.channel, quality=quality, hour=hour
).set(len(editable_holes))
overlap_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_overlaps)
overlap_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='suspect'
).set(suspect_overlaps)
overlap_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_overlaps)
overlap_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_overlap_duration.total_seconds())
overlap_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='suspect'
).set(suspect_overlap_duration.total_seconds())
overlap_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_overlap_duration.total_seconds())
# log the same information
if best_segments:
self.logger.info('{}/{}: Start: {} End: {} ({} s)'.format(
quality, hour, start, end,
(end - start).total_seconds()))
self.logger.info('{}/{}: {} full segments totalling {} s'.format(
quality, hour, full_segment_count,
full_segment_duration.total_seconds()))
self.logger.info('{}/{}: {} bad segments'.format(
quality, hour, bad_segment_count))
self.logger.info('{}/{}: {} overlapping full segments totalling {} s'.format(
quality, hour, full_overlaps,
full_overlap_duration.total_seconds()))
self.logger.info('{}/{}: {} suspect segments totalling {} s'.format(
quality, hour, suspect_segment_count,
suspect_segment_duration.total_seconds()))
self.logger.info('{}/{}: {} overlapping suspect segments totalling {} s'.format(
quality, hour, suspect_overlaps,
suspect_overlap_duration.total_seconds()))
self.logger.info('{}/{}: {} partial segments totalling {} s'.format(
quality, hour, partial_segment_count,
partial_segment_duration.total_seconds()))
self.logger.info('{}/{}: {} overlapping partial segments totalling {} s'.format(
quality, hour, partial_overlaps,
partial_overlap_duration.total_seconds()))
self.logger.info('{}/{}: raw coverage {} s, editable coverage {} s '.format(
quality, hour, coverage.total_seconds(),
editable_coverage.total_seconds()))
self.logger.info('{}/{}: {} holes totalling {} s '.format(
quality, hour, len(holes),
hole_duration.total_seconds()))
self.logger.info('{}/{}: {} editable holes totalling {} s '.format(
quality, hour, len(editable_holes),
editable_hole_duration.total_seconds()))
self.logger.info('Checking {}/{} complete'.format(
quality, hour))
# add holes for the start and end hours for the
# coverage map. do this after updating gauges and
# logging as these aren't likely real holes, just the
# start and end of the stream.
if previous_hour_segments is None:
holes.append((hour_start, start))
if hour == hours[-1]:
holes.append((end, hour_end))
all_hour_holes[hour] = holes
all_hour_partials[hour] = only_partials
previous_hour_segments = best_segments
else:
self.logger.info('{}/{} is empty'.format(quality, hour))
self.create_coverage_map(quality, all_hour_holes, all_hour_partials)
if self.make_page:
self.create_coverage_page(quality)
self.stopping.wait(common.jitter(self.check_interval))
@argh.arg('channels', nargs='*', help='Channels to check coverage of')
@argh.arg('--base-dir', help='Directory where segments are stored. Default is current working directory.')
@argh.arg('--qualities', help="Qualities of each channel to checked. Comma seperated if multiple. Default is 'source'.")
@argh.arg('--first-hour', help='First hour to compute coverage for. Default is earliest available hour.')
@argh.arg('--last-hour', help='Last hour to compute coverage for. Default is lastest available hour.')
@argh.arg('--make-page', help='Make a html page displaying coverage maps for all nodes in database')
@argh.arg('--connection-string', help='Postgres connection string, which is either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE')
@argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.')
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
@argh.arg('--check-interval', help='How many seconds to wait in between doing checks.')
def main(channels, base_dir='.', qualities='source', first_hour=None,
last_hour=None, make_page=False, connection_string=None,
metrics_port=8006, backdoor_port=0, check_interval=300):
"""Segment coverage service"""
qualities = qualities.split(',') if qualities else []
qualities = [quality.strip() for quality in qualities]
if first_hour is not None:
first_hour = dateutil.parse(first_hour)
if last_hour is not None:
last_hour = dateutil.parse(last_hour)
common.PromLogCountsHandler.install()
common.install_stacksampler()
prom.start_http_server(metrics_port)
managers = []
workers = []
for channel in channels:
logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir))
manager = CoverageChecker(channel, qualities, base_dir, first_hour,
last_hour, make_page, connection_string, check_interval)
managers.append(manager)
workers.append(gevent.spawn(manager.run))
def stop():
for manager in managers:
manager.stop()
gevent.signal_handler(signal.SIGTERM, stop)
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
# Wait for any to die
gevent.wait(workers, count=1)
# If one has stopped, either:
# 1. stop() was called and all are stopping
# 2. one errored and we should stop all remaining and report the error
# Our behaviour in both cases is the same:
# 1. Tell all managers to gracefully stop
stop()
# 2. Wait (with timeout) until they've stopped
gevent.wait(workers)
# 3. Check if any of them failed. If they did, report it. If mulitple
# failed, we report one arbitrarily.
for worker in workers:
worker.get()
logging.info('Gracefully stopped')