fixes based on ekim's suggestions

pull/128/head
Christopher Usher 5 years ago
parent bea876e0cc
commit a1880b2414

@ -2,10 +2,11 @@ import datetime
import itertools import itertools
import logging import logging
import os import os
import random
import signal import signal
import uuid
import argh import argh
import dateutil
import gevent.backdoor import gevent.backdoor
import matplotlib import matplotlib
import matplotlib.image import matplotlib.image
@ -13,47 +14,30 @@ import numpy as np
import prometheus_client as prom import prometheus_client as prom
import common import common
from common import dateutil
full_segment_count_gauge = prom.Gauge( segment_count_gauge = prom.Gauge(
'full_segment_count', 'segment_count',
'Number of full segments in an hour', 'Number of segments in an hour',
['channel', 'quality', 'hour'], ['channel', 'quality', 'hour', 'type'],
)
partial_segment_count_gauge = prom.Gauge(
'partial_segment_count',
'Number of partial segments in an hour',
['channel', 'quality', 'hour'],
)
bad_segment_count_gauge = prom.Gauge(
'bad_segment_count',
'Number of segments that fail to parse in an hour',
['channel', 'quality', 'hour'],
) )
full_segment_duration_gauge = prom.Gauge( segment_duration_gauge = prom.Gauge(
'full_segment_duration', 'segment_duration',
'Full segment duration in an hour', 'Segment duration in an hour',
['channel', 'quality', 'hour'], ['channel', 'quality', 'hour', 'type'],
)
partial_segment_duration_gauge = prom.Gauge(
'partial_segment_duration',
'Partial segment duration in an hour',
['channel', 'quality', 'hour'],
) )
raw_coverage_gauge = prom.Gauge( raw_coverage_gauge = prom.Gauge(
'raw_coverage', 'raw_coverage',
'Raw coverage for the hour', 'Total time covered by segments in an hour',
['channel', 'quality', 'hour'], ['channel', 'quality', 'hour'],
) )
editable_coverage_gauge = prom.Gauge( editable_coverage_gauge = prom.Gauge(
'editable_coverage', 'editable_coverage',
'Editable coverage for the hour', 'Non-overlapping time covered by segments in an hour',
['channel', 'quality', 'hour'], ['channel', 'quality', 'hour'],
) )
@ -64,33 +48,21 @@ raw_holes_gauge = prom.Gauge(
) )
editable_holes_gauge = prom.Gauge( editable_holes_gauge = prom.Gauge(
'editable_hole', 'editable_holes',
'Number of holes in editable coverage for the hour', 'Number of holes in editable coverage for the hour',
['channel', 'quality', 'hour'], ['channel', 'quality', 'hour'],
) )
full_overlap_count_gauge = prom.Gauge( overlap_count_gauge = prom.Gauge(
'full_overlap_count', 'overlap_count',
'Number of overlap full segments for the hour', 'Number of overlap segments for the hour',
['channel', 'quality', 'hour'], ['channel', 'quality', 'hour', 'type'],
)
partial_overlap_count_gauge = prom.Gauge(
'partial_overlap_count',
'Number of overlap partial segments for the hour',
['channel', 'quality', 'hour'],
) )
full_overlap_duration_gauge = prom.Gauge( overlap_duration_gauge = prom.Gauge(
'full_overlap_duration', 'overlap_duration',
'Duration of overlaping full segments for the hour', 'Duration of overlaping segments for the hour',
['channel', 'quality', 'hour'], ['channel', 'quality', 'hour', 'type'],
)
partial_overlap_duration_gauge = prom.Gauge(
'partial_overlap_duration',
'Duration of overlaping partial segments for the hour',
['channel', 'quality', 'hour'],
) )
@ -101,24 +73,28 @@ class CoverageChecker(object):
CHECK_INTERVAL = 60 #seconds between checking coverage CHECK_INTERVAL = 60 #seconds between checking coverage
def __init__(self, channel, qualities, base_dir): def __init__(self, channel, qualities, base_dir, first_hour, last_hour):
"""Constructor for CoverageChecker. """Constructor for CoverageChecker.
Creates a checker for a given channel with specified qualities.""" Creates a checker for a given channel with specified qualities."""
self.base_dir = base_dir self.base_dir = base_dir
self.channel = channel self.channel = channel
self.qualities = qualities self.qualities = qualities
self.first_hour = first_hour
self.last_hour = last_hour
self.stopping = gevent.event.Event() self.stopping = gevent.event.Event()
self.logger = logging.getLogger('CoverageChecker({})'.format(channel)) self.logger = logging.getLogger('CoverageChecker({})'.format(channel))
def stop(self): def stop(self):
"""Stop checking coverage.""" """Stop checking coverage."""
self.logger.info('Stopping') self.logger.info('Stopping')
self.stopping.set() self.stopping.set()
def create_coverage_map(self, quality, all_hour_holes, all_hour_partials, def create_coverage_map(self, quality, all_hour_holes, all_hour_partials,
pixel_length=2, hour_count=168, rows=300): pixel_length=2, rows=300):
"""Create a PNG image showing segment coverage. """Create a PNG image showing segment coverage.
Each pixel represents pixel_length seconds, with time increasing from Each pixel represents pixel_length seconds, with time increasing from
@ -133,22 +109,32 @@ class CoverageChecker(object):
all_hour_holes -- a dict mapping hours to lists of holes all_hour_holes -- a dict mapping hours to lists of holes
all_hour_partials -- a dict mapping hours to lists of partial segments all_hour_partials -- a dict mapping hours to lists of partial segments
pixel_length -- length of a pixel in seconds pixel_length -- length of a pixel in seconds
hour_count -- number of hours to create the map for
rows -- the height of the image""" rows -- the height of the image"""
self.logger.info('Creating coverage map for {}'.format(quality))
latest_hour = datetime.datetime.strptime(max(all_hour_holes.keys()), HOUR_FMT) if self.first_hour is None:
hours = [latest_hour - datetime.timedelta(hours=i) for i in range(hour_count - 1, -1, -1)] first_hour = datetime.datetime.strptime(min(all_hour_holes.keys()), HOUR_FMT)
else:
first_hour = self.first_hour.replace(minute=0, second=0, microsecond=0)
if self.last_hour is None:
last_hour = datetime.datetime.strptime(max(all_hour_holes.keys()), HOUR_FMT)
else:
last_hour = self.last_hour.replace(minute=0, second=0, microsecond=0)
self.logger.info('Creating coverage map for {} from {} to {}'.format(quality,
first_hour.strftime(HOUR_FMT), last_hour.strftime(HOUR_FMT)))
hours = [first_hour]
latest_hour = first_hour
while latest_hour < last_hour:
latest_hour += datetime.timedelta(hours = 1)
hours.append(latest_hour)
pixel_starts = np.arange(0, 3600, pixel_length) # start times of the pixels in an hour in seconds pixel_starts = np.arange(0, 3600, pixel_length) # start times of the pixels in an hour in seconds
pixel_ends = np.arange(pixel_length, 3601, pixel_length) # end times of the pixels in an hour in seconds pixel_ends = np.arange(pixel_length, 3601, pixel_length) # end times of the pixels in an hour in seconds
pixel_count = 3600 / pixel_length # number of pixels in an hour pixel_count = 3600 / pixel_length # number of pixels in an hour
coverage_mask = np.zeros(hour_count * pixel_count, dtype=np.bool_) coverage_mask = np.zeros(len(hours) * pixel_count, dtype=np.bool_)
partial_mask = np.zeros(hour_count * pixel_count, dtype=np.bool_) partial_mask = np.zeros(len(hours) * pixel_count, dtype=np.bool_)
for i in range(len(hours)): for i, hour in enumerate(hours):
hour = hours[i]
hour_str = hour.strftime(HOUR_FMT) hour_str = hour.strftime(HOUR_FMT)
if hour_str in all_hour_holes: if hour_str in all_hour_holes:
@ -178,9 +164,9 @@ class CoverageChecker(object):
colours[coverage_mask] = matplotlib.colors.to_rgb('tab:blue') colours[coverage_mask] = matplotlib.colors.to_rgb('tab:blue')
colours[coverage_mask & partial_mask] = matplotlib.colors.to_rgb('tab:orange') colours[coverage_mask & partial_mask] = matplotlib.colors.to_rgb('tab:orange')
# write the pixel array to a temporary file then atomically rename it # write the pixel array to a temporary file then atomically rename it
final_path = os.path.join(self.base_dir, 'coverage-maps', path_prefix = os.path.join(self.base_dir, 'coverage-maps', '{}_{}'.format(self.channel, quality))
'{}_{}_coverage.png'.format(self.channel, quality)) temp_path = '{}_{}.png'.format(path_prefix, uuid.uuid4())
temp_path = final_path.replace('_coverage', '_{}'.format(random.getrandbits(32))) final_path = '{}_coverage.png'.format(path_prefix)
common.ensure_directory(temp_path) common.ensure_directory(temp_path)
matplotlib.image.imsave(temp_path, colours) matplotlib.image.imsave(temp_path, colours)
os.rename(temp_path, final_path) os.rename(temp_path, final_path)
@ -218,8 +204,8 @@ class CoverageChecker(object):
for name in segment_names: for name in segment_names:
try: try:
parsed.append(common.parse_segment_path(os.path.join(hour_path, name))) parsed.append(common.parse_segment_path(os.path.join(hour_path, name)))
except ValueError as e: except ValueError:
self.logger.warn(e) self.logger.warning("Failed to parse segment: {!r}".format(os.path.join(hour_path, name)), exc_info=True)
bad_segment_count += 1 bad_segment_count += 1
full_segment_count = 0 full_segment_count = 0
@ -255,20 +241,16 @@ class CoverageChecker(object):
partial_segment_count += 1 partial_segment_count += 1
partial_segment_duration += segment.duration partial_segment_duration += segment.duration
if full_segments: if full_segments:
if len(full_segments) == 1: full_segments.sort(key=lambda segment: (segment.duration))
best_segment = full_segments[0] best_segment = full_segments[-1]
else: for segment in full_segments[:-1]:
full_segments.sort(key=lambda segment: (segment.duration)) full_overlaps += 1
best_segment = full_segments[-1] full_overlap_duration += segment.duration
for segment in full_segments[:-1]: for segment in partial_segments:
full_overlaps += 1 partial_overlaps += 1
full_overlap_duration += segment.duration partial_overlap_duration += segment.duration
if partial_segments:
for segment in partial_segments:
partial_overlaps += 1
partial_overlap_duration += segment.duration
else: else:
partial_segments.sort(key=lambda segment: (segment.duration)) partial_segments.sort(key=lambda segment: os.stat(segment.path).st_size)
best_segment = partial_segments[-1] best_segment = partial_segments[-1]
only_partials.append((best_segment.start, best_segment.start + best_segment.duration)) only_partials.append((best_segment.start, best_segment.start + best_segment.duration))
for segment in partial_segments[:-1]: for segment in partial_segments[:-1]:
@ -332,20 +314,20 @@ class CoverageChecker(object):
editable_hole_duration += hour_end - end editable_hole_duration += hour_end - end
# update the large number of Prometheus gauges # update the large number of Prometheus gauges
full_segment_count_gauge.labels( segment_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_segment_count) ).set(full_segment_count)
partial_segment_count_gauge.labels( segment_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_segment_count) ).set(partial_segment_count)
bad_segment_count_gauge.labels( segment_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='bad'
).set(bad_segment_count) ).set(bad_segment_count)
full_segment_duration_gauge.labels( segment_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_segment_duration.total_seconds()) ).set(full_segment_duration.total_seconds())
partial_segment_duration_gauge.labels( segment_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_segment_duration.total_seconds()) ).set(partial_segment_duration.total_seconds())
raw_coverage_gauge.labels( raw_coverage_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour
@ -359,17 +341,17 @@ class CoverageChecker(object):
editable_holes_gauge.labels( editable_holes_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour
).set(len(editable_holes)) ).set(len(editable_holes))
full_overlap_count_gauge.labels( overlap_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_overlaps) ).set(full_overlaps)
partial_overlap_count_gauge.labels( overlap_count_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_overlaps) ).set(partial_overlaps)
full_overlap_duration_gauge.labels( overlap_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='full'
).set(full_overlap_duration.total_seconds()) ).set(full_overlap_duration.total_seconds())
partial_overlap_duration_gauge.labels( overlap_duration_gauge.labels(
channel=self.channel, quality=quality, hour=hour channel=self.channel, quality=quality, hour=hour, type='partial'
).set(partial_overlap_duration.total_seconds()) ).set(partial_overlap_duration.total_seconds())
# log the same information # log the same information
@ -429,13 +411,20 @@ class CoverageChecker(object):
@argh.arg('channels', nargs='*', help='Channels to check coverage of') @argh.arg('channels', nargs='*', help='Channels to check coverage of')
@argh.arg('--base-dir', help='Directory where segments are stored. Default is current working directory.') @argh.arg('--base-dir', help='Directory where segments are stored. Default is current working directory.')
@argh.arg('--qualities', help="Qualities of each channel to checked. Comma seperated if multiple. Default is 'source'.") @argh.arg('--qualities', help="Qualities of each channel to checked. Comma seperated if multiple. Default is 'source'.")
@argh.arg('--first-hour', help='First hour to compute coverage for. Default is earliest available hour.')
@argh.arg('--last-hour', help='Last hour to compute coverage for. Default is lastest available hour.')
@argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.') @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.')
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.') @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
def main(channels, base_dir='.', qualities='source', metrics_port=8006, def main(channels, base_dir='.', qualities='source', first_hour=None,
backdoor_port=0): last_hour=None, metrics_port=8006, backdoor_port=0):
"""Segment coverage service""" """Segment coverage service"""
qualities = qualities.split(',') if qualities else [] qualities = qualities.split(',') if qualities else []
qualities = [quality.strip() for quality in qualities] qualities = [quality.strip() for quality in qualities]
if first_hour is not None:
first_hour = dateutil.parse(first_hour)
if last_hour is not None:
last_hour = dateutil.parse(last_hour)
common.PromLogCountsHandler.install() common.PromLogCountsHandler.install()
common.install_stacksampler() common.install_stacksampler()
@ -445,7 +434,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8006,
workers = [] workers = []
for channel in channels: for channel in channels:
logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir)) logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir))
manager = CoverageChecker(channel, qualities, base_dir) manager = CoverageChecker(channel, qualities, base_dir, first_hour, last_hour)
managers.append(manager) managers.append(manager)
workers.append(gevent.spawn(manager.run)) workers.append(gevent.spawn(manager.run))

@ -10,6 +10,7 @@ setup(
'matplotlib', 'matplotlib',
'numpy', 'numpy',
'prometheus-client', 'prometheus-client',
'python-dateutil',
'wubloader-common', 'wubloader-common',
], ],
) )

Loading…
Cancel
Save