From 7525b7c13570edd4f1caa42395fa96c811435529 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 3 Jan 2019 21:10:23 -0800 Subject: [PATCH] restreamer: Add basic prometheus stats to all endpoints I had to go to some effort to get nice labelling, which also meant none of the existing libs for this were any good, but this works well enough. Exposes the metrics on /metrics. --- restreamer/restreamer/main.py | 14 ++++++ restreamer/restreamer/stats.py | 80 ++++++++++++++++++++++++++++++++++ restreamer/setup.py | 1 + 3 files changed, 95 insertions(+) create mode 100644 restreamer/restreamer/stats.py diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 7fba30b..9b354ef 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -11,6 +11,7 @@ from contextlib import closing import dateutil.parser import gevent +import prometheus_client as prom from flask import Flask, url_for, request, abort, Response from gevent import subprocess from gevent.pywsgi import WSGIServer @@ -18,9 +19,11 @@ from gevent.pywsgi import WSGIServer from common import get_best_segments import generate_hls +from stats import stats, after_request app = Flask('restreamer', static_url_path='/segments') +app.after_request(after_request) """ @@ -80,6 +83,13 @@ def cors(app): return handle +@app.route('/metrics') +@stats +def metrics(): + """Return current metrics in prometheus metrics format""" + return prom.generate_latest() + + @app.route('/files//') @has_path_args def list_hours(stream, variant): @@ -95,6 +105,7 @@ def list_hours(stream, variant): @app.route('/files///') +@stats @has_path_args def list_segments(stream, variant, hour): """Returns a JSON list of segment files for a given stream, variant and hour. @@ -121,6 +132,7 @@ def time_range_for_variant(stream, variant): @app.route('/playlist/.m3u8') +@stats @has_path_args def generate_master_playlist(stream): """Returns a HLS master playlist for the given stream. @@ -150,6 +162,7 @@ def generate_master_playlist(stream): @app.route('/playlist//.m3u8') +@stats @has_path_args def generate_media_playlist(stream, variant): """Returns a HLS media playlist for the given stream and variant. @@ -189,6 +202,7 @@ def generate_media_playlist(stream, variant): @app.route('/cut//.ts') +@stats @has_path_args def cut(stream, variant): """Return a MPEGTS video file covering the exact timestamp range. diff --git a/restreamer/restreamer/stats.py b/restreamer/restreamer/stats.py new file mode 100644 index 0000000..b1e2a24 --- /dev/null +++ b/restreamer/restreamer/stats.py @@ -0,0 +1,80 @@ + +import functools + +import prometheus_client as prom +from flask import request +from flask import g as request_store +from monotonic import monotonic + + +def stats(fn): + """Decorator that wraps a handler func to collect metrics. + Adds handler func args as labels, along with 'endpoint' label using func's name, + method and response status where applicable.""" + # We have to jump through some hoops here, because the prometheus client lib demands + # we pre-define our label names, but we don't know the names of the handler kwargs + # until the first time the function's called. So we delay defining the metrics until + # first call. + metrics = {} + endpoint = fn.__name__ + + @functools.wraps(fn) + def _stats(**kwargs): + if not metrics: + # first call, set up metrics + labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method'] + labels = labels_no_status + ['status'] + metrics['latency'] = prom.Histogram( + 'http_request_latency', + 'Time taken to run the request handler and create a response', + labels, + # buckets: very long playlists / cutting can be quite slow, + # so we have a wider range of latencies than default, up to 10min. + buckets=[.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600], + ) + metrics['size'] = prom.Histogram( + 'http_response_size', + 'Size in bytes of response body for non-chunked responses', + labels, + # buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...) + buckets=[4**i for i in range(16)], + ) + metrics['concurrent'] = prom.Gauge( + 'http_request_concurrency', + 'Number of requests currently ongoing', + labels_no_status, + ) + + request_store.metrics = metrics + request_store.labels = {k: str(v) for k, v in kwargs.items()} + request_store.labels.update(endpoint=endpoint, method=request.method) + metrics['concurrent'].labels(**request_store.labels).inc() + request_store.start_time = monotonic() + return fn(**kwargs) + + return _stats + + +def after_request(response): + """Must be registered to run after requests. Finishes tracking the request + and logs most of the metrics. + We do it in this way, instead of inside the stats wrapper, because it lets flask + normalize the handler result into a Response object. + """ + if 'metrics' not in request_store: + return # untracked handler + + end_time = monotonic() + metrics = request_store.metrics + labels = request_store.labels + start_time = request_store.start_time + + metrics['concurrent'].labels(**labels).dec() + + labels['status'] = str(response.status_code) + metrics['latency'].labels(**labels).observe(end_time - start_time) + size = response.calculate_content_length() + if size is not None: + metrics['size'].labels(**labels).observe(size) + + return response diff --git a/restreamer/setup.py b/restreamer/setup.py index 999c956..7e1344e 100644 --- a/restreamer/setup.py +++ b/restreamer/setup.py @@ -8,6 +8,7 @@ setup( "argh", "flask", "gevent", + "prometheus-client", "wubloader-common", ], )