diff --git a/segment_coverage/segment_coverage/main.py b/segment_coverage/segment_coverage/main.py index dd6e4c4..71a35df 100644 --- a/segment_coverage/segment_coverage/main.py +++ b/segment_coverage/segment_coverage/main.py @@ -2,10 +2,11 @@ import datetime import itertools import logging import os -import random import signal +import uuid import argh +import dateutil import gevent.backdoor import matplotlib import matplotlib.image @@ -13,47 +14,30 @@ import numpy as np import prometheus_client as prom import common +from common import dateutil -full_segment_count_gauge = prom.Gauge( - 'full_segment_count', - 'Number of full segments in an hour', - ['channel', 'quality', 'hour'], -) - -partial_segment_count_gauge = prom.Gauge( - 'partial_segment_count', - 'Number of partial segments in an hour', - ['channel', 'quality', 'hour'], -) - -bad_segment_count_gauge = prom.Gauge( - 'bad_segment_count', - 'Number of segments that fail to parse in an hour', - ['channel', 'quality', 'hour'], +segment_count_gauge = prom.Gauge( + 'segment_count', + 'Number of segments in an hour', + ['channel', 'quality', 'hour', 'type'], ) -full_segment_duration_gauge = prom.Gauge( - 'full_segment_duration', - 'Full segment duration in an hour', - ['channel', 'quality', 'hour'], -) - -partial_segment_duration_gauge = prom.Gauge( - 'partial_segment_duration', - 'Partial segment duration in an hour', - ['channel', 'quality', 'hour'], +segment_duration_gauge = prom.Gauge( + 'segment_duration', + 'Segment duration in an hour', + ['channel', 'quality', 'hour', 'type'], ) raw_coverage_gauge = prom.Gauge( 'raw_coverage', - 'Raw coverage for the hour', + 'Total time covered by segments in an hour', ['channel', 'quality', 'hour'], ) editable_coverage_gauge = prom.Gauge( 'editable_coverage', - 'Editable coverage for the hour', + 'Non-overlapping time covered by segments in an hour', ['channel', 'quality', 'hour'], ) @@ -64,33 +48,21 @@ raw_holes_gauge = prom.Gauge( ) editable_holes_gauge = prom.Gauge( - 'editable_hole', + 'editable_holes', 'Number of holes in editable coverage for the hour', ['channel', 'quality', 'hour'], ) -full_overlap_count_gauge = prom.Gauge( - 'full_overlap_count', - 'Number of overlap full segments for the hour', - ['channel', 'quality', 'hour'], -) - -partial_overlap_count_gauge = prom.Gauge( - 'partial_overlap_count', - 'Number of overlap partial segments for the hour', - ['channel', 'quality', 'hour'], +overlap_count_gauge = prom.Gauge( + 'overlap_count', + 'Number of overlap segments for the hour', + ['channel', 'quality', 'hour', 'type'], ) -full_overlap_duration_gauge = prom.Gauge( - 'full_overlap_duration', - 'Duration of overlaping full segments for the hour', - ['channel', 'quality', 'hour'], -) - -partial_overlap_duration_gauge = prom.Gauge( - 'partial_overlap_duration', - 'Duration of overlaping partial segments for the hour', - ['channel', 'quality', 'hour'], +overlap_duration_gauge = prom.Gauge( + 'overlap_duration', + 'Duration of overlaping segments for the hour', + ['channel', 'quality', 'hour', 'type'], ) @@ -101,24 +73,28 @@ class CoverageChecker(object): CHECK_INTERVAL = 60 #seconds between checking coverage - def __init__(self, channel, qualities, base_dir): + def __init__(self, channel, qualities, base_dir, first_hour, last_hour): """Constructor for CoverageChecker. Creates a checker for a given channel with specified qualities.""" + self.base_dir = base_dir self.channel = channel self.qualities = qualities + self.first_hour = first_hour + self.last_hour = last_hour self.stopping = gevent.event.Event() self.logger = logging.getLogger('CoverageChecker({})'.format(channel)) def stop(self): """Stop checking coverage.""" + self.logger.info('Stopping') self.stopping.set() def create_coverage_map(self, quality, all_hour_holes, all_hour_partials, - pixel_length=2, hour_count=168, rows=300): + pixel_length=2, rows=300): """Create a PNG image showing segment coverage. Each pixel repersents pixel_length seconds, with time increasing from @@ -133,22 +109,32 @@ class CoverageChecker(object): all_hour_holes -- a dict mapping hours to lists of holes all_hour_holes -- a dict mapping hours to lists of partial segments pixel_length -- length of a pixel in seconds - hour_count -- number of hours to create the map for rows -- the height of the image""" - self.logger.info('Creating coverage map for {}'.format(quality)) - latest_hour = datetime.datetime.strptime(max(all_hour_holes.keys()), HOUR_FMT) - hours = [latest_hour - datetime.timedelta(hours=i) for i in range(hour_count - 1, -1, -1)] + if self.first_hour is None: + first_hour = datetime.datetime.strptime(min(all_hour_holes.keys()), HOUR_FMT) + else: + first_hour = self.first_hour.replace(minute=0, second=0, microsecond=0) + if self.last_hour is None: + last_hour = datetime.datetime.strptime(max(all_hour_holes.keys()), HOUR_FMT) + else: + last_hour = self.last_hour.replace(minute=0, second=0, microsecond=0) + self.logger.info('Creating coverage map for {} from {} to {}'.format(quality, + first_hour.strftime(HOUR_FMT), last_hour.strftime(HOUR_FMT))) + + hours = [first_hour] + latest_hour = first_hour + while latest_hour < last_hour: + latest_hour += datetime.timedelta(hours = 1) + hours.append(latest_hour) pixel_starts = np.arange(0, 3600, pixel_length) # start times of the pixels in an hour in seconds pixel_ends = np.arange(pixel_length, 3601, pixel_length) # end times of the pixels in an hour in seconds - pixel_count = 3600 / pixel_length # number of pixels in an hour - coverage_mask = np.zeros(hour_count * pixel_count, dtype=np.bool_) - partial_mask = np.zeros(hour_count * pixel_count, dtype=np.bool_) - for i in range(len(hours)): - hour = hours[i] + coverage_mask = np.zeros(len(hours) * pixel_count, dtype=np.bool_) + partial_mask = np.zeros(len(hours) * pixel_count, dtype=np.bool_) + for i, hour in enumerate(hours): hour_str = hour.strftime(HOUR_FMT) if hour_str in all_hour_holes: @@ -178,9 +164,9 @@ class CoverageChecker(object): colours[coverage_mask] = matplotlib.colors.to_rgb('tab:blue') colours[coverage_mask & partial_mask] = matplotlib.colors.to_rgb('tab:orange') # write the pixel array to a temporary file then atomically rename it - final_path = os.path.join(self.base_dir, 'coverage-maps', - '{}_{}_coverage.png'.format(self.channel, quality)) - temp_path = final_path.replace('_coverage', '_{}'.format(random.getrandbits(32))) + path_prefix = os.path.join(self.base_dir, 'coverage-maps', '{}_{}'.format(self.channel, quality)) + temp_path = '{}_{}.png'.format(path_prefix, uuid.uuid4()) + final_path = '{}_coverage.png'.format(path_prefix) common.ensure_directory(temp_path) matplotlib.image.imsave(temp_path, colours) os.rename(temp_path, final_path) @@ -218,8 +204,8 @@ class CoverageChecker(object): for name in segment_names: try: parsed.append(common.parse_segment_path(os.path.join(hour_path, name))) - except ValueError as e: - self.logger.warn(e) + except ValueError: + self.logger.warning("Failed to parse segment: {!r}".format(os.path.join(hour_path, name)), exc_info=True) bad_segment_count += 1 full_segment_count = 0 @@ -255,20 +241,16 @@ class CoverageChecker(object): partial_segment_count += 1 partial_segment_duration += segment.duration if full_segments: - if len(full_segments) == 1: - best_segment = full_segments[0] - else: - full_segments.sort(key=lambda segment: (segment.duration)) - best_segment = full_segments[-1] - for segment in full_segments[:-1]: - full_overlaps += 1 - full_overlap_duration += segment.duration - if partial_segments: - for segment in partial_segments: - partial_overlaps += 1 - partial_overlap_duration += segment.duration + full_segments.sort(key=lambda segment: (segment.duration)) + best_segment = full_segments[-1] + for segment in full_segments[:-1]: + full_overlaps += 1 + full_overlap_duration += segment.duration + for segment in partial_segments: + partial_overlaps += 1 + partial_overlap_duration += segment.duration else: - partial_segments.sort(key=lambda segment: (segment.duration)) + partial_segments.sort(key=lambda segment: os.stat(segment.path).st_size) best_segment = partial_segments[-1] only_partials.append((best_segment.start, best_segment.start + best_segment.duration)) for segment in partial_segments[:-1]: @@ -332,20 +314,20 @@ class CoverageChecker(object): editable_hole_duration += hour_end - end # update the large number of Prometheus guages - full_segment_count_gauge.labels( - channel=self.channel, quality=quality, hour=hour + segment_count_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='full' ).set(full_segment_count) - partial_segment_count_gauge.labels( - channel=self.channel, quality=quality, hour=hour + segment_count_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='partial' ).set(partial_segment_count) - bad_segment_count_gauge.labels( - channel=self.channel, quality=quality, hour=hour + segment_count_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='bad' ).set(bad_segment_count) - full_segment_duration_gauge.labels( - channel=self.channel, quality=quality, hour=hour + segment_duration_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='full' ).set(full_segment_duration.total_seconds()) - partial_segment_duration_gauge.labels( - channel=self.channel, quality=quality, hour=hour + segment_duration_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='partial' ).set(partial_segment_duration.total_seconds()) raw_coverage_gauge.labels( channel=self.channel, quality=quality, hour=hour @@ -359,17 +341,17 @@ class CoverageChecker(object): editable_holes_gauge.labels( channel=self.channel, quality=quality, hour=hour ).set(len(editable_holes)) - full_overlap_count_gauge.labels( - channel=self.channel, quality=quality, hour=hour + overlap_count_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='full' ).set(full_overlaps) - partial_overlap_count_gauge.labels( - channel=self.channel, quality=quality, hour=hour + overlap_count_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='partial' ).set(partial_overlaps) - full_overlap_duration_gauge.labels( - channel=self.channel, quality=quality, hour=hour + overlap_duration_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='full' ).set(full_overlap_duration.total_seconds()) - partial_overlap_duration_gauge.labels( - channel=self.channel, quality=quality, hour=hour + overlap_duration_gauge.labels( + channel=self.channel, quality=quality, hour=hour, type='partial' ).set(partial_overlap_duration.total_seconds()) # log the same information @@ -429,13 +411,20 @@ class CoverageChecker(object): @argh.arg('channels', nargs='*', help='Channels to check coverage of') @argh.arg('--base-dir', help='Directory where segments are stored. Default is current working directory.') @argh.arg('--qualities', help="Qualities of each channel to checked. Comma seperated if multiple. Default is 'source'.") +@argh.arg('--first-hour', help='First hour to compute coverage for. Default is earliest available hour.') +@argh.arg('--last-hour', help='Last hour to compute coverage for. Default is lastest available hour.') @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.') @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.') -def main(channels, base_dir='.', qualities='source', metrics_port=8006, - backdoor_port=0): +def main(channels, base_dir='.', qualities='source', first_hour=None, + last_hour=None, metrics_port=8006, backdoor_port=0): """Segment coverage service""" + qualities = qualities.split(',') if qualities else [] qualities = [quality.strip() for quality in qualities] + if first_hour is not None: + first_hour = dateutil.parse(first_hour) + if last_hour is not None: + last_hour = dateutil.parse(last_hour) common.PromLogCountsHandler.install() common.install_stacksampler() @@ -445,7 +434,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8006, workers = [] for channel in channels: logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir)) - manager = CoverageChecker(channel, qualities, base_dir) + manager = CoverageChecker(channel, qualities, base_dir, first_hour, last_hour) managers.append(manager) workers.append(gevent.spawn(manager.run)) diff --git a/segment_coverage/setup.py b/segment_coverage/setup.py index e110592..764a9bb 100644 --- a/segment_coverage/setup.py +++ b/segment_coverage/setup.py @@ -10,6 +10,7 @@ setup( 'matplotlib', 'numpy', 'prometheus-client', + 'python-dateutil', 'wubloader-common', ], )