move restreamer.stats to common.stats

pull/68/head
Christopher Usher 5 years ago
parent 6858c2e2de
commit 6c633df3ee

@ -8,7 +8,7 @@ import os
import random import random
from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo
from .stats import timed, PromLogCountsHandler, install_stacksampler from .stats import timed, PromLogCountsHandler, install_stacksampler, request_stats, after_request
def dt_to_bustime(start, dt): def dt_to_bustime(start, dt):

@ -7,9 +7,11 @@ import signal
import sys import sys
import gevent.lock import gevent.lock
from flask import request
import prometheus_client as prom from flask import g as request_store
from monotonic import monotonic from monotonic import monotonic
import prometheus_client as prom
def timed(name=None, def timed(name=None,
@ -236,3 +238,106 @@ def install_stacksampler(interval=0.005):
signal.signal(signal.SIGVTALRM, sample) signal.signal(signal.SIGVTALRM, sample)
# deliver the first signal in INTERVAL seconds # deliver the first signal in INTERVAL seconds
signal.setitimer(signal.ITIMER_VIRTUAL, interval) signal.setitimer(signal.ITIMER_VIRTUAL, interval)
# Generic metrics that all http requests get logged to (see below for specific metrics per endpoint)
LATENCY_HELP = "Time taken to run the request handler and create a response"
# buckets: very long playlists / cutting can be quite slow,
# so we have a wider range of latencies than default, up to 10min.
LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600]
generic_latency = prom.Histogram(
'http_request_latency_all', LATENCY_HELP,
['endpoint', 'method', 'status'],
buckets=LATENCY_BUCKETS,
)
SIZE_HELP = 'Size in bytes of response body for non-chunked responses'
# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
SIZE_BUCKETS = [4**i for i in range(16)]
generic_size = prom.Histogram(
'http_response_size_all', SIZE_HELP,
['endpoint', 'method', 'status'],
buckets=SIZE_BUCKETS,
)
CONCURRENT_HELP = 'Number of requests currently ongoing'
generic_concurrent = prom.Gauge(
'http_request_concurrency_all', CONCURRENT_HELP,
['endpoint', 'method'],
)
def request_stats(fn):
"""Decorator that wraps a handler func to collect metrics.
Adds handler func args as labels, along with 'endpoint' label using func's name,
method and response status where applicable."""
# We have to jump through some hoops here, because the prometheus client lib demands
# we pre-define our label names, but we don't know the names of the handler kwargs
# until the first time the function's called. So we delay defining the metrics until
# first call.
# In addition, it doesn't let us have different sets of labels with the same name.
# So we record everything twice: Once under a generic name with only endpoint, method
# and status, and once under a name specific to the endpoint with the full set of labels.
metrics = {}
endpoint = fn.__name__
@functools.wraps(fn)
def _stats(**kwargs):
if not metrics:
# first call, set up metrics
labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
labels = labels_no_status + ['status']
metrics['latency'] = prom.Histogram(
'http_request_latency_{}'.format(endpoint), LATENCY_HELP,
labels, buckets=LATENCY_BUCKETS,
)
metrics['size'] = prom.Histogram(
'http_response_size_{}'.format(endpoint), SIZE_HELP,
labels, buckets=SIZE_BUCKETS,
)
metrics['concurrent'] = prom.Gauge(
'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP,
labels_no_status,
)
request_store.metrics = metrics
request_store.endpoint = endpoint
request_store.method = request.method
request_store.labels = {k: str(v) for k, v in kwargs.items()}
generic_concurrent.labels(endpoint=endpoint, method=request.method).inc()
metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc()
request_store.start_time = monotonic()
return fn(**kwargs)
return _stats
def after_request(response):
"""Must be registered to run after requests. Finishes tracking the request
and logs most of the metrics.
We do it in this way, instead of inside the request_stats wrapper, because it lets flask
normalize the handler result into a Response object.
"""
if 'metrics' not in request_store:
return response # untracked handler
end_time = monotonic()
metrics = request_store.metrics
endpoint = request_store.endpoint
method = request_store.method
labels = request_store.labels
start_time = request_store.start_time
generic_concurrent.labels(endpoint=endpoint, method=method).dec()
metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec()
status = str(response.status_code)
generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time)
metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time)
size = response.calculate_content_length()
if size is not None:
generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size)
metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size)
return response

@ -6,6 +6,7 @@ setup(
packages = find_packages(), packages = find_packages(),
install_requires = [ install_requires = [
"gevent", "gevent",
"flask",
"monotonic", "monotonic",
"prometheus-client", "prometheus-client",
], ],

@ -14,10 +14,9 @@ from flask import Flask, url_for, request, abort, Response
from gevent.pywsgi import WSGIServer from gevent.pywsgi import WSGIServer
import common.dateutil import common.dateutil
from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler, request_stats, after_request
import generate_hls import generate_hls
from stats import stats, after_request
app = Flask('restreamer', static_url_path='/segments') app = Flask('restreamer', static_url_path='/segments')
@ -82,13 +81,13 @@ def cors(app):
@app.route('/metrics') @app.route('/metrics')
@stats @request_stats
def metrics(): def metrics():
"""Return current metrics in prometheus metrics format""" """Return current metrics in prometheus metrics format"""
return prom.generate_latest() return prom.generate_latest()
@app.route('/files') @app.route('/files')
@stats @request_stats
def list_channels(): def list_channels():
"""Returns a JSON list of channels for which there may be segments available. """Returns a JSON list of channels for which there may be segments available.
Returns empty list if no channels are available. Returns empty list if no channels are available.
@ -98,7 +97,7 @@ def list_channels():
@app.route('/files/<channel>') @app.route('/files/<channel>')
@stats @request_stats
@has_path_args @has_path_args
def list_qualities(channel): def list_qualities(channel):
"""Returns a JSON list of qualities for the given channel for which there """Returns a JSON list of qualities for the given channel for which there
@ -110,7 +109,7 @@ def list_qualities(channel):
return json.dumps(listdir(path, error=False)) return json.dumps(listdir(path, error=False))
@app.route('/files/<channel>/<quality>') @app.route('/files/<channel>/<quality>')
@stats @request_stats
@has_path_args @has_path_args
def list_hours(channel, quality): def list_hours(channel, quality):
"""Returns a JSON list of hours for the given channel and quality for which """Returns a JSON list of hours for the given channel and quality for which
@ -126,7 +125,7 @@ def list_hours(channel, quality):
@app.route('/files/<channel>/<quality>/<hour>') @app.route('/files/<channel>/<quality>/<hour>')
@stats @request_stats
@has_path_args @has_path_args
def list_segments(channel, quality, hour): def list_segments(channel, quality, hour):
"""Returns a JSON list of segment files for a given channel, quality and """Returns a JSON list of segment files for a given channel, quality and
@ -155,7 +154,7 @@ def time_range_for_quality(channel, quality):
@app.route('/playlist/<channel>.m3u8') @app.route('/playlist/<channel>.m3u8')
@stats @request_stats
@has_path_args @has_path_args
def generate_master_playlist(channel): def generate_master_playlist(channel):
"""Returns a HLS master playlist for the given channel. """Returns a HLS master playlist for the given channel.
@ -185,7 +184,7 @@ def generate_master_playlist(channel):
@app.route('/playlist/<channel>/<quality>.m3u8') @app.route('/playlist/<channel>/<quality>.m3u8')
@stats @request_stats
@has_path_args @has_path_args
def generate_media_playlist(channel, quality): def generate_media_playlist(channel, quality):
"""Returns a HLS media playlist for the given channel and quality. """Returns a HLS media playlist for the given channel and quality.
@ -225,7 +224,7 @@ def generate_media_playlist(channel, quality):
@app.route('/cut/<channel>/<quality>.ts') @app.route('/cut/<channel>/<quality>.ts')
@stats @request_stats
@has_path_args @has_path_args
def cut(channel, quality): def cut(channel, quality):
"""Return a MPEGTS video file covering the exact timestamp range. """Return a MPEGTS video file covering the exact timestamp range.

@ -1,111 +0,0 @@
import functools
import prometheus_client as prom
from flask import request
from flask import g as request_store
from monotonic import monotonic
# Generic metrics that all requests get logged to (see below for specific metrics per endpoint)
LATENCY_HELP = "Time taken to run the request handler and create a response"
# buckets: very long playlists / cutting can be quite slow,
# so we have a wider range of latencies than default, up to 10min.
LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600]
generic_latency = prom.Histogram(
'http_request_latency_all', LATENCY_HELP,
['endpoint', 'method', 'status'],
buckets=LATENCY_BUCKETS,
)
SIZE_HELP = 'Size in bytes of response body for non-chunked responses'
# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
SIZE_BUCKETS = [4**i for i in range(16)]
generic_size = prom.Histogram(
'http_response_size_all', SIZE_HELP,
['endpoint', 'method', 'status'],
buckets=SIZE_BUCKETS,
)
CONCURRENT_HELP = 'Number of requests currently ongoing'
generic_concurrent = prom.Gauge(
'http_request_concurrency_all', CONCURRENT_HELP,
['endpoint', 'method'],
)
def stats(fn):
"""Decorator that wraps a handler func to collect metrics.
Adds handler func args as labels, along with 'endpoint' label using func's name,
method and response status where applicable."""
# We have to jump through some hoops here, because the prometheus client lib demands
# we pre-define our label names, but we don't know the names of the handler kwargs
# until the first time the function's called. So we delay defining the metrics until
# first call.
# In addition, it doesn't let us have different sets of labels with the same name.
# So we record everything twice: Once under a generic name with only endpoint, method
# and status, and once under a name specific to the endpoint with the full set of labels.
metrics = {}
endpoint = fn.__name__
@functools.wraps(fn)
def _stats(**kwargs):
if not metrics:
# first call, set up metrics
labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
labels = labels_no_status + ['status']
metrics['latency'] = prom.Histogram(
'http_request_latency_{}'.format(endpoint), LATENCY_HELP,
labels, buckets=LATENCY_BUCKETS,
)
metrics['size'] = prom.Histogram(
'http_response_size_{}'.format(endpoint), SIZE_HELP,
labels, buckets=SIZE_BUCKETS,
)
metrics['concurrent'] = prom.Gauge(
'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP,
labels_no_status,
)
request_store.metrics = metrics
request_store.endpoint = endpoint
request_store.method = request.method
request_store.labels = {k: str(v) for k, v in kwargs.items()}
generic_concurrent.labels(endpoint=endpoint, method=request.method).inc()
metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc()
request_store.start_time = monotonic()
return fn(**kwargs)
return _stats
def after_request(response):
"""Must be registered to run after requests. Finishes tracking the request
and logs most of the metrics.
We do it in this way, instead of inside the stats wrapper, because it lets flask
normalize the handler result into a Response object.
"""
if 'metrics' not in request_store:
return response # untracked handler
end_time = monotonic()
metrics = request_store.metrics
endpoint = request_store.endpoint
method = request_store.method
labels = request_store.labels
start_time = request_store.start_time
generic_concurrent.labels(endpoint=endpoint, method=method).dec()
metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec()
status = str(response.status_code)
generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time)
metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time)
size = response.calculate_content_length()
if size is not None:
generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size)
metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size)
return response
Loading…
Cancel
Save