|
|
|
@ -1,4 +1,4 @@
|
|
|
|
|
import glob
|
|
|
|
|
import datatime
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
|
@ -10,20 +10,18 @@ import common
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CoverageChecker(object):
|
|
|
|
|
"""Checks the segment coverage for a given channel in a a given directoy."""
|
|
|
|
|
|
|
|
|
|
CHECK_INTERVAL = 60 #seconds between checking coverage
|
|
|
|
|
|
|
|
|
|
def __init__(channel, qualities, base_dir, recent_cutoff):
|
|
|
|
|
def __init__(channel, qualities, base_dir):
|
|
|
|
|
"""Constructor for CoverageChecker.
|
|
|
|
|
|
|
|
|
|
Creates a checker for a given channel with specified qualities."""
|
|
|
|
|
self.base_dir = base_dir
|
|
|
|
|
self.channel = channel
|
|
|
|
|
self.qualities = qualities
|
|
|
|
|
self.recent_cutoff = recent_cutoff
|
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
|
self.logger = logging.getLogger('CoverageChecker({})'.format(channel))
|
|
|
|
|
|
|
|
|
@ -33,6 +31,7 @@ class CoverageChecker(object):
|
|
|
|
|
self.logger.info('Stopping')
|
|
|
|
|
self.stopping.set()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
"""Loop over available hours for each quality, checking segment coverage."""
|
|
|
|
|
self.logger.info('Starting')
|
|
|
|
@ -40,24 +39,44 @@ class CoverageChecker(object):
|
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
|
|
|
|
|
|
for quality in self.qualities:
|
|
|
|
|
hour_dirs = glob.glob('{}/{}/{}/????-??-??T??'.format(self.basedir, self.channel, quality))
|
|
|
|
|
for hour_dir in hour_dirs:
|
|
|
|
|
segments = glob.glob('{}/??:*-*-*.ts')
|
|
|
|
|
if self.stopping.is_set():
|
|
|
|
|
break
|
|
|
|
|
path = os.path.join(self.basedir, self.channel, quality)
|
|
|
|
|
hours = [name for name in os.listdir(path) if not name.startswith('.')]
|
|
|
|
|
hours.sort()
|
|
|
|
|
for hour in hours:
|
|
|
|
|
if self.stopping.is_set():
|
|
|
|
|
break
|
|
|
|
|
self.logger.info('Checking {}/{}'.format(quality, hour))
|
|
|
|
|
path = os.path.join(self.basedir, self.channel, quality, hour)
|
|
|
|
|
segment_names = [name for name in os.listdir(path) if not name.startswith('.')]
|
|
|
|
|
segment_names.sort()
|
|
|
|
|
segments = []
|
|
|
|
|
for name in segment_names:
|
|
|
|
|
path = os.path.join(hour, name)
|
|
|
|
|
try:
|
|
|
|
|
segments.append(parse_segment_path(path))
|
|
|
|
|
except ValueError:
|
|
|
|
|
self.logger.warning('Skipping segment {} with invalid format'.format(path))
|
|
|
|
|
|
|
|
|
|
full_segments = [segment for segment in segments if segment.type == 'full']
|
|
|
|
|
partial_segments = [segment for segment in segments if segment.type == 'partial']
|
|
|
|
|
full_segments_duration = sum([segment.duration.seconds for segment in full_segments])
|
|
|
|
|
partial_segments_duration = sum([segment.duration.seconds for segment in partial_segments])
|
|
|
|
|
self.logger.info('{}/{}: {} full segments totalling {} s'.format(quality, hour, len(full_segments), full_segments_duration)
|
|
|
|
|
self.logger.info('{}/{}: {} partial segments totalling {} s'.format(quality, hour,len(partial_segments), partial_segments_duration)
|
|
|
|
|
|
|
|
|
|
self.stopping.wait(common.jitter(self.CHECK_INTERVAL))
|
|
|
|
|
|
|
|
|
|
for segment in segments:
|
|
|
|
|
seg = common.parse_segment_path(segment)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@argh.arg('channels', nargs='*', help='Channels to check coverage of')
|
|
|
|
|
@argh.arg('--base-dir', help='Directory where segments are stored. Default is current working directory.')
|
|
|
|
|
@argh.arg('--qualities', help="Qualities of each channel to checked. Comma seperated if multiple. Default is 'source'.")
|
|
|
|
|
@argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.')
|
|
|
|
|
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
|
|
|
|
|
@argh.arg('--recent-cutoff', help='Ignore segments younger than this when computing coverage. Expressed as number of seconds.')
|
|
|
|
|
|
|
|
|
|
def main(channels, base_dir='.', qualities='source', metrics_port=8006,
|
|
|
|
|
backdoor_port=0, recent_cutoff=120):
|
|
|
|
|
backdoor_port=0):
|
|
|
|
|
"""Segment coverage service"""
|
|
|
|
|
qualities = qualities.split(',') if qualities else []
|
|
|
|
|
qualities = [quality.strip() for quality in qualities]
|
|
|
|
@ -70,7 +89,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8006,
|
|
|
|
|
workers = []
|
|
|
|
|
for channel in channels:
|
|
|
|
|
logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir))
|
|
|
|
|
manager = CoverageChecker(channel, qualities, base_dir, recent_cutoff)
|
|
|
|
|
manager = CoverageChecker(channel, qualities, base_dir)
|
|
|
|
|
managers.append(manager)
|
|
|
|
|
workers.append(gevent.spawn(manager.run))
|
|
|
|
|
|
|
|
|
|