started on the segment_coverage service

pull/128/head
Christopher Usher 5 years ago
parent 095e391b60
commit 929308f3e7

@ -10,7 +10,6 @@ import signal
import socket import socket
import urlparse import urlparse
import uuid import uuid
from base64 import b64encode
import argh import argh
import gevent.backdoor import gevent.backdoor

@ -0,0 +1,14 @@
FROM alpine:3.7
# dependencies needed for compiling c extensions
# also busybox-extras for telnet for easier use of backdoor
RUN apk --update add py2-pip gcc python-dev musl-dev busybox-extras
# Install common lib first as it changes less
COPY common /tmp/common
RUN pip install /tmp/common && rm -r /tmp/common
# Install actual application
COPY segment_coverage /tmp/segment_coverage
RUN pip install /tmp/segment_coverage && rm -r /tmp/segment_coverage
ENTRYPOINT ["python2", "-m", "segment_coverage"]

@ -0,0 +1,17 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import os
import argh
from segment_coverage.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level, format=LOG_FORMAT)
argh.dispatch_command(main)

@ -0,0 +1,101 @@
import glob
import logging
import os
import argh
import gevent.backdoor
import prometheus_client as prom
import common
class CoverageChecker(object):
"""Checks the segment coverage for a given channel in a a given directoy."""
CHECK_INTERVAL = 60 #seconds between checking coverage
def __init__(channel, qualities, base_dir, recent_cutoff):
"""Constructor for CoverageChecker.
Creates a checker for a given channel with specified qualities."""
self.base_dir = base_dir
self.channel = channel
self.qualities = qualities
self.recent_cutoff = recent_cutoff
self.stopping = gevent.event.Event()
self.logger = logging.getLogger('CoverageChecker({})'.format(channel))
def stop(self):
"""Stop checking coverage."""
self.logger.info('Stopping')
self.stopping.set()
def run(self):
"""Loop over available hours for each quality, checking segment coverage."""
self.logger.info('Starting')
while not self.stopping.is_set():
for quality in self.qualities:
hour_dirs = glob.glob('{}/{}/{}/????-??-??T??'.format(self.basedir, self.channel, quality))
for hour_dir in hour_dirs:
segments = glob.glob('{}/??:*-*-*.ts')
for segment in segments:
seg = common.parse_segment_path(segment)
@argh.arg('channels', nargs='*', help='Channels to check coverage of')
@argh.arg('--base-dir', help='Directory where segments are stored. Default is current working directory.')
@argh.arg('--qualities', help="Qualities of each channel to checked. Comma seperated if multiple. Default is 'source'.")
@argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8006.')
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
@argh.arg('--recent-cutoff', help='Ignore segments younger than this when computing coverage. Expressed as number of seconds.')
def main(channels, base_dir='.', qualities='source', metrics_port=8006,
backdoor_port=0, recent_cutoff=120):
"""Segment coverage service"""
qualities = qualities.split(',') if qualities else []
qualities = [quality.strip() for quality in qualities]
common.PromLogCountsHandler.install()
common.install_stacksampler()
prom.start_http_server(metrics_port)
managers = []
workers = []
for channel in channels:
logging.info('Starting coverage checks {} with {} as qualities in {}'.format(channel, ', '.join(qualities), base_dir))
manager = CoverageChecker(channel, qualities, base_dir, recent_cutoff)
managers.append(manager)
workers.append(gevent.spawn(manager.run))
def stop():
for manager in managers:
manager.stop()
gevent.signal(signal.SIGTERM, stop)
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
# Wait for any to die
gevent.wait(workers, count=1)
# If one has stopped, either:
# 1. stop() was called and all are stopping
# 2. one errored and we should stop all remaining and report the error
# Our behaviour in both cases is the same:
# 1. Tell all managers to gracefully stop
stop()
# 2. Wait (with timeout) until they've stopped
gevent.wait(workers)
# 3. Check if any of them failed. If they did, report it. If mulitple
# failed, we report one arbitrarily.
for worker in workers:
worker.get()
logging.info('Gracefully stopped')

@ -0,0 +1,14 @@
from setuptools import setup, find_packages
setup(
name = 'wubloader-segment_coverage',
version = '0.0.0',
packages = find_packages(),
install_requires = [
'argh',
'gevent',
'prometheus-client',
'python-dateutil',
'wubloader-common',
],
)
Loading…
Cancel
Save