diff --git a/common/common/__init__.py b/common/common/__init__.py
index e961fc1..cc7f6a1 100644
--- a/common/common/__init__.py
+++ b/common/common/__init__.py
@@ -8,7 +8,7 @@
 import os
 import random
 from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo
-from .stats import timed, PromLogCountsHandler, install_stacksampler
+from .stats import timed, PromLogCountsHandler, install_stacksampler, request_stats, after_request


 def dt_to_bustime(start, dt):
diff --git a/common/common/stats.py b/common/common/stats.py
index 34c482c..b255c97 100644
--- a/common/common/stats.py
+++ b/common/common/stats.py
@@ -7,9 +7,11 @@
 import signal
 import sys
 import gevent.lock
-
-import prometheus_client as prom
+from flask import request
+from flask import g as request_store
 from monotonic import monotonic
+import prometheus_client as prom
+

 def timed(
 	name=None,
@@ -236,3 +238,106 @@
 	signal.signal(signal.SIGVTALRM, sample)
 	# deliver the first signal in INTERVAL seconds
 	signal.setitimer(signal.ITIMER_VIRTUAL, interval)
+
+
+# Generic metrics that all http requests get logged to (see below for specific metrics per endpoint)
+
+LATENCY_HELP = "Time taken to run the request handler and create a response"
+# buckets: very long playlists / cutting can be quite slow,
+# so we have a wider range of latencies than default, up to 10min.
+LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600]
+generic_latency = prom.Histogram(
+	'http_request_latency_all', LATENCY_HELP,
+	['endpoint', 'method', 'status'],
+	buckets=LATENCY_BUCKETS,
+)
+
+SIZE_HELP = 'Size in bytes of response body for non-chunked responses'
+# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
+SIZE_BUCKETS = [4**i for i in range(16)]
+generic_size = prom.Histogram(
+	'http_response_size_all', SIZE_HELP,
+	['endpoint', 'method', 'status'],
+	buckets=SIZE_BUCKETS,
+)
+
+CONCURRENT_HELP = 'Number of requests currently ongoing'
+generic_concurrent = prom.Gauge(
+	'http_request_concurrency_all', CONCURRENT_HELP,
+	['endpoint', 'method'],
+)
+
+
+def request_stats(fn):
+	"""Decorator that wraps a handler func to collect metrics.
+	Adds handler func args as labels, along with 'endpoint' label using func's name,
+	method and response status where applicable."""
+	# We have to jump through some hoops here, because the prometheus client lib demands
+	# we pre-define our label names, but we don't know the names of the handler kwargs
+	# until the first time the function's called. So we delay defining the metrics until
+	# first call.
+	# In addition, it doesn't let us have different sets of labels with the same name.
+	# So we record everything twice: Once under a generic name with only endpoint, method
+	# and status, and once under a name specific to the endpoint with the full set of labels.
+	metrics = {}
+	endpoint = fn.__name__
+
+	@functools.wraps(fn)
+	def _stats(**kwargs):
+		if not metrics:
+			# first call, set up metrics
+			labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
+			labels = labels_no_status + ['status']
+			metrics['latency'] = prom.Histogram(
+				'http_request_latency_{}'.format(endpoint), LATENCY_HELP,
+				labels, buckets=LATENCY_BUCKETS,
+			)
+			metrics['size'] = prom.Histogram(
+				'http_response_size_{}'.format(endpoint), SIZE_HELP,
+				labels, buckets=SIZE_BUCKETS,
+			)
+			metrics['concurrent'] = prom.Gauge(
+				'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP,
+				labels_no_status,
+			)
+
+		request_store.metrics = metrics
+		request_store.endpoint = endpoint
+		request_store.method = request.method
+		request_store.labels = {k: str(v) for k, v in kwargs.items()}
+		generic_concurrent.labels(endpoint=endpoint, method=request.method).inc()
+		metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc()
+		request_store.start_time = monotonic()
+		return fn(**kwargs)
+
+	return _stats
+
+
+def after_request(response):
+	"""Must be registered to run after requests. Finishes tracking the request
+	and logs most of the metrics.
+	We do it in this way, instead of inside the request_stats wrapper, because it lets flask
+	normalize the handler result into a Response object.
+	"""
+	if 'metrics' not in request_store:
+		return response # untracked handler
+
+	end_time = monotonic()
+	metrics = request_store.metrics
+	endpoint = request_store.endpoint
+	method = request_store.method
+	labels = request_store.labels
+	start_time = request_store.start_time
+
+	generic_concurrent.labels(endpoint=endpoint, method=method).dec()
+	metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec()
+
+	status = str(response.status_code)
+	generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time)
+	metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time)
+	size = response.calculate_content_length()
+	if size is not None:
+		generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size)
+		metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size)
+
+	return response
diff --git a/common/setup.py b/common/setup.py
index ae2a724..471c214 100644
--- a/common/setup.py
+++ b/common/setup.py
@@ -6,6 +6,7 @@ setup(
 	packages = find_packages(),
 	install_requires = [
 		"gevent",
+		"flask",
 		"monotonic",
 		"prometheus-client",
 	],
diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py
index a05874e..f18d8cc 100644
--- a/restreamer/restreamer/main.py
+++ b/restreamer/restreamer/main.py
@@ -14,10 +14,9 @@
 from flask import Flask, url_for, request, abort, Response
 from gevent.pywsgi import WSGIServer
 import common.dateutil
-from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler
+from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler, request_stats, after_request

 import generate_hls
-from stats import stats, after_request


 app = Flask('restreamer', static_url_path='/segments')
@@ -82,13 +81,13 @@ def cors(app):


 @app.route('/metrics')
-@stats
+@request_stats
 def metrics():
 	"""Return current metrics in prometheus metrics format"""
 	return prom.generate_latest()

 @app.route('/files')
-@stats
+@request_stats
 def list_channels():
 	"""Returns a JSON list of channels for which there may be segments
 	available. Returns empty list if no channels are available.
@@ -98,7 +97,7 @@ def list_channels():


 @app.route('/files/<channel>')
-@stats
+@request_stats
 @has_path_args
 def list_qualities(channel):
 	"""Returns a JSON list of qualities for the given channel for which there
@@ -110,7 +109,7 @@ def list_qualities(channel):
 	return json.dumps(listdir(path, error=False))

 @app.route('/files/<channel>/<quality>')
-@stats
+@request_stats
 @has_path_args
 def list_hours(channel, quality):
 	"""Returns a JSON list of hours for the given channel and quality for which
@@ -126,7 +125,7 @@ def list_hours(channel, quality):


 @app.route('/files/<channel>/<quality>/<hour>')
-@stats
+@request_stats
 @has_path_args
 def list_segments(channel, quality, hour):
 	"""Returns a JSON list of segment files for a given channel, quality and
@@ -155,7 +154,7 @@ def time_range_for_quality(channel, quality):


 @app.route('/playlist/<channel>.m3u8')
-@stats
+@request_stats
 @has_path_args
 def generate_master_playlist(channel):
 	"""Returns a HLS master playlist for the given channel.
@@ -185,7 +184,7 @@ def generate_master_playlist(channel):


 @app.route('/playlist/<channel>/<quality>.m3u8')
-@stats
+@request_stats
 @has_path_args
 def generate_media_playlist(channel, quality):
 	"""Returns a HLS media playlist for the given channel and quality.
@@ -225,7 +224,7 @@ def generate_media_playlist(channel, quality):


 @app.route('/cut/<channel>/<quality>.ts')
-@stats
+@request_stats
 @has_path_args
 def cut(channel, quality):
 	"""Return a MPEGTS video file covering the exact timestamp range.
diff --git a/restreamer/restreamer/stats.py b/restreamer/restreamer/stats.py
deleted file mode 100644
index e43cf93..0000000
--- a/restreamer/restreamer/stats.py
+++ /dev/null
@@ -1,111 +0,0 @@
-
-import functools
-
-import prometheus_client as prom
-from flask import request
-from flask import g as request_store
-from monotonic import monotonic
-
-
-# Generic metrics that all requests get logged to (see below for specific metrics per endpoint)
-
-LATENCY_HELP = "Time taken to run the request handler and create a response"
-# buckets: very long playlists / cutting can be quite slow,
-# so we have a wider range of latencies than default, up to 10min.
-LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600]
-generic_latency = prom.Histogram(
-	'http_request_latency_all', LATENCY_HELP,
-	['endpoint', 'method', 'status'],
-	buckets=LATENCY_BUCKETS,
-)
-
-SIZE_HELP = 'Size in bytes of response body for non-chunked responses'
-# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
-SIZE_BUCKETS = [4**i for i in range(16)]
-generic_size = prom.Histogram(
-	'http_response_size_all', SIZE_HELP,
-	['endpoint', 'method', 'status'],
-	buckets=SIZE_BUCKETS,
-)
-
-CONCURRENT_HELP = 'Number of requests currently ongoing'
-generic_concurrent = prom.Gauge(
-	'http_request_concurrency_all', CONCURRENT_HELP,
-	['endpoint', 'method'],
-)
-
-
-
-def stats(fn):
-	"""Decorator that wraps a handler func to collect metrics.
-	Adds handler func args as labels, along with 'endpoint' label using func's name,
-	method and response status where applicable."""
-	# We have to jump through some hoops here, because the prometheus client lib demands
-	# we pre-define our label names, but we don't know the names of the handler kwargs
-	# until the first time the function's called. So we delay defining the metrics until
-	# first call.
-	# In addition, it doesn't let us have different sets of labels with the same name.
-	# So we record everything twice: Once under a generic name with only endpoint, method
-	# and status, and once under a name specific to the endpoint with the full set of labels.
-	metrics = {}
-	endpoint = fn.__name__
-
-	@functools.wraps(fn)
-	def _stats(**kwargs):
-		if not metrics:
-			# first call, set up metrics
-			labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
-			labels = labels_no_status + ['status']
-			metrics['latency'] = prom.Histogram(
-				'http_request_latency_{}'.format(endpoint), LATENCY_HELP,
-				labels, buckets=LATENCY_BUCKETS,
-			)
-			metrics['size'] = prom.Histogram(
-				'http_response_size_{}'.format(endpoint), SIZE_HELP,
-				labels, buckets=SIZE_BUCKETS,
-			)
-			metrics['concurrent'] = prom.Gauge(
-				'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP,
-				labels_no_status,
-			)
-
-		request_store.metrics = metrics
-		request_store.endpoint = endpoint
-		request_store.method = request.method
-		request_store.labels = {k: str(v) for k, v in kwargs.items()}
-		generic_concurrent.labels(endpoint=endpoint, method=request.method).inc()
-		metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc()
-		request_store.start_time = monotonic()
-		return fn(**kwargs)
-
-	return _stats
-
-
-def after_request(response):
-	"""Must be registered to run after requests. Finishes tracking the request
-	and logs most of the metrics.
-	We do it in this way, instead of inside the stats wrapper, because it lets flask
-	normalize the handler result into a Response object.
-	"""
-	if 'metrics' not in request_store:
-		return response # untracked handler
-
-	end_time = monotonic()
-	metrics = request_store.metrics
-	endpoint = request_store.endpoint
-	method = request_store.method
-	labels = request_store.labels
-	start_time = request_store.start_time
-
-	generic_concurrent.labels(endpoint=endpoint, method=method).dec()
-	metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec()
-
-	status = str(response.status_code)
-	generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time)
-	metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time)
-	size = response.calculate_content_length()
-	if size is not None:
-		generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size)
-		metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size)
-
-	return response
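
A minimal sketch of how a service wires up the now-shared helpers after this change, assuming a plain Flask app. The app.after_request registration in restreamer/main.py sits outside the hunks above, so that call is shown here purely for illustration; the '/metrics' handler mirrors the one in the diff:

	import prometheus_client as prom
	from flask import Flask

	from common import request_stats, after_request

	app = Flask('restreamer', static_url_path='/segments')

	# Finish tracking each request (concurrency, latency, response size) once
	# Flask has normalized whatever the handler returned into a Response.
	app.after_request(after_request)

	@app.route('/metrics')
	@request_stats
	def metrics():
		"""Return current metrics in prometheus metrics format"""
		# With no handler kwargs, the per-endpoint series (http_request_latency_metrics,
		# http_response_size_metrics, http_request_concurrency_metrics) carry only the
		# endpoint/method (and status) labels, alongside the generic *_all series.
		return prom.generate_latest()

Keeping the finish-up logic in after_request rather than in the decorator is what lets Flask turn whatever the handler returned into a Response object before the status code and body size are read.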