From 3618510f35a34bf082484edd407bf99dcaf0e5d1 Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Sun, 13 Oct 2019 01:19:32 +0100 Subject: [PATCH] basic functionality --- segment_coverage/segment_coverage/main.py | 47 ++++++++++++++++------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/segment_coverage/segment_coverage/main.py b/segment_coverage/segment_coverage/main.py index e356edc..4809e85 100644 --- a/segment_coverage/segment_coverage/main.py +++ b/segment_coverage/segment_coverage/main.py @@ -1,4 +1,4 @@ -import glob +import datatime import logging import os @@ -10,20 +10,18 @@ import common - class CoverageChecker(object): """Checks the segment coverage for a given channel in a a given directoy.""" CHECK_INTERVAL = 60 #seconds between checking coverage - def __init__(channel, qualities, base_dir, recent_cutoff): + def __init__(channel, qualities, base_dir): """Constructor for CoverageChecker. Creates a checker for a given channel with specified qualities.""" self.base_dir = base_dir self.channel = channel self.qualities = qualities - self.recent_cutoff = recent_cutoff self.stopping = gevent.event.Event() self.logger = logging.getLogger('CoverageChecker({})'.format(channel)) @@ -33,6 +31,7 @@ class CoverageChecker(object): self.logger.info('Stopping') self.stopping.set() + def run(self): """Loop over available hours for each quality, checking segment coverage.""" self.logger.info('Starting') @@ -40,24 +39,44 @@ class CoverageChecker(object): while not self.stopping.is_set(): for quality in self.qualities: - hour_dirs = glob.glob('{}/{}/{}/????-??-??T??'.format(self.basedir, self.channel, quality)) - for hour_dir in hour_dirs: - segments = glob.glob('{}/??:*-*-*.ts') + if self.stopping.is_set(): + break + path = os.path.join(self.basedir, self.channel, quality) + hours = [name for name in os.listdir(path) if not name.startswith('.')] + hours.sort() + for hour in hours: + if self.stopping.is_set(): + break + self.logger.info('Checking {}/{}'.format(quality, hour)) + path = os.path.join(self.basedir, self.channel, quality, hour) + segment_names = [name for name in os.listdir(path) if not name.startswith('.')] + segment_names.sort() + segments = [] + for name in segment_names: + path = os.path.join(hour, name) + try: + segments.append(parse_segment_path(path)) + except ValueError: + self.logger.warning('Skipping segment {} with invalid format'.format(path)) + + full_segments = [segment for segment in segments if segment.type == 'full'] + partial_segments = [segment for segment in segments if segment.type == 'partial'] + full_segments_duration = sum([segment.duration.seconds for segment in full_segments]) + partial_segments_duration = sum([segment.duration.seconds for segment in partial_segments]) + self.logger.info('{}/{}: {} full segments totalling {} s'.format(quality, hour, len(full_segments), full_segments_duration) + self.logger.info('{}/{}: {} partial segments totalling {} s'.format(quality, hour,len(partial_segments), partial_segments_duration) + + self.stopping.wait(common.jitter(self.CHECK_INTERVAL)) - for segment in segments: - seg = common.parse_segment_path(segment) - @argh.arg('channels', nargs='*', help='Channels to check coverage of') @argh.arg('--base-dir', help='Directory where segments are stored. Default is current working directory.') @argh.arg('--qualities', help="Qualities of each channel to checked. Comma seperated if multiple. Default is 'source'.") @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.') @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.') -@argh.arg('--recent-cutoff', help='Ignore segments younger than this when computing coverage. Expressed as number of seconds.') - def main(channels, base_dir='.', qualities='source', metrics_port=8006, - backdoor_port=0, recent_cutoff=120): + backdoor_port=0): """Segment coverage service""" qualities = qualities.split(',') if qualities else [] qualities = [quality.strip() for quality in qualities] @@ -70,7 +89,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8006, workers = [] for channel in channels: logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir)) - manager = CoverageChecker(channel, qualities, base_dir, recent_cutoff) + manager = CoverageChecker(channel, qualities, base_dir) managers.append(manager) workers.append(gevent.spawn(manager.run))