diff --git a/common/common/__init__.py b/common/common/__init__.py index cc7f6a1..e961fc1 100644 --- a/common/common/__init__.py +++ b/common/common/__init__.py @@ -8,7 +8,7 @@ import os import random from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo -from .stats import timed, PromLogCountsHandler, install_stacksampler, request_stats, after_request +from .stats import timed, PromLogCountsHandler, install_stacksampler def dt_to_bustime(start, dt): diff --git a/common/common/flask_stats.py b/common/common/flask_stats.py new file mode 100644 index 0000000..52b2a2a --- /dev/null +++ b/common/common/flask_stats.py @@ -0,0 +1,109 @@ +import functools + +from flask import request +from flask import g as request_store +from monotonic import monotonic +import prometheus_client as prom + + +# Generic metrics that all http requests get logged to (see below for specific metrics per endpoint) + +LATENCY_HELP = "Time taken to run the request handler and create a response" +# buckets: very long playlists / cutting can be quite slow, +# so we have a wider range of latencies than default, up to 10min. +LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600] +generic_latency = prom.Histogram( + 'http_request_latency_all', LATENCY_HELP, + ['endpoint', 'method', 'status'], + buckets=LATENCY_BUCKETS, +) + +SIZE_HELP = 'Size in bytes of response body for non-chunked responses' +# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...) +SIZE_BUCKETS = [4**i for i in range(16)] +generic_size = prom.Histogram( + 'http_response_size_all', SIZE_HELP, + ['endpoint', 'method', 'status'], + buckets=SIZE_BUCKETS, +) + +CONCURRENT_HELP = 'Number of requests currently ongoing' +generic_concurrent = prom.Gauge( + 'http_request_concurrency_all', CONCURRENT_HELP, + ['endpoint', 'method'], +) + + +def request_stats(fn): + """Decorator that wraps a handler func to collect metrics. + Adds handler func args as labels, along with 'endpoint' label using func's name, + method and response status where applicable.""" + # We have to jump through some hoops here, because the prometheus client lib demands + # we pre-define our label names, but we don't know the names of the handler kwargs + # until the first time the function's called. So we delay defining the metrics until + # first call. + # In addition, it doesn't let us have different sets of labels with the same name. + # So we record everything twice: Once under a generic name with only endpoint, method + # and status, and once under a name specific to the endpoint with the full set of labels. + metrics = {} + endpoint = fn.__name__ + + @functools.wraps(fn) + def _stats(**kwargs): + if not metrics: + # first call, set up metrics + labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method'] + labels = labels_no_status + ['status'] + metrics['latency'] = prom.Histogram( + 'http_request_latency_{}'.format(endpoint), LATENCY_HELP, + labels, buckets=LATENCY_BUCKETS, + ) + metrics['size'] = prom.Histogram( + 'http_response_size_{}'.format(endpoint), SIZE_HELP, + labels, buckets=SIZE_BUCKETS, + ) + metrics['concurrent'] = prom.Gauge( + 'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP, + labels_no_status, + ) + + request_store.metrics = metrics + request_store.endpoint = endpoint + request_store.method = request.method + request_store.labels = {k: str(v) for k, v in kwargs.items()} + generic_concurrent.labels(endpoint=endpoint, method=request.method).inc() + metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc() + request_store.start_time = monotonic() + return fn(**kwargs) + + return _stats + + +def after_request(response): + """Must be registered to run after requests. Finishes tracking the request + and logs most of the metrics. + We do it in this way, instead of inside the request_stats wrapper, because it lets flask + normalize the handler result into a Response object. + """ + if 'metrics' not in request_store: + return response # untracked handler + + end_time = monotonic() + metrics = request_store.metrics + endpoint = request_store.endpoint + method = request_store.method + labels = request_store.labels + start_time = request_store.start_time + + generic_concurrent.labels(endpoint=endpoint, method=method).dec() + metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec() + + status = str(response.status_code) + generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time) + metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time) + size = response.calculate_content_length() + if size is not None: + generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size) + metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size) + + return response diff --git a/common/common/stats.py b/common/common/stats.py index b255c97..bd3a854 100644 --- a/common/common/stats.py +++ b/common/common/stats.py @@ -7,8 +7,6 @@ import signal import sys import gevent.lock -from flask import request -from flask import g as request_store from monotonic import monotonic import prometheus_client as prom @@ -240,104 +238,4 @@ def install_stacksampler(interval=0.005): signal.setitimer(signal.ITIMER_VIRTUAL, interval) -# Generic metrics that all http requests get logged to (see below for specific metrics per endpoint) - -LATENCY_HELP = "Time taken to run the request handler and create a response" -# buckets: very long playlists / cutting can be quite slow, -# so we have a wider range of latencies than default, up to 10min. -LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600] -generic_latency = prom.Histogram( - 'http_request_latency_all', LATENCY_HELP, - ['endpoint', 'method', 'status'], - buckets=LATENCY_BUCKETS, -) - -SIZE_HELP = 'Size in bytes of response body for non-chunked responses' -# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...) -SIZE_BUCKETS = [4**i for i in range(16)] -generic_size = prom.Histogram( - 'http_response_size_all', SIZE_HELP, - ['endpoint', 'method', 'status'], - buckets=SIZE_BUCKETS, -) - -CONCURRENT_HELP = 'Number of requests currently ongoing' -generic_concurrent = prom.Gauge( - 'http_request_concurrency_all', CONCURRENT_HELP, - ['endpoint', 'method'], -) - - -def request_stats(fn): - """Decorator that wraps a handler func to collect metrics. - Adds handler func args as labels, along with 'endpoint' label using func's name, - method and response status where applicable.""" - # We have to jump through some hoops here, because the prometheus client lib demands - # we pre-define our label names, but we don't know the names of the handler kwargs - # until the first time the function's called. So we delay defining the metrics until - # first call. - # In addition, it doesn't let us have different sets of labels with the same name. - # So we record everything twice: Once under a generic name with only endpoint, method - # and status, and once under a name specific to the endpoint with the full set of labels. - metrics = {} - endpoint = fn.__name__ - - @functools.wraps(fn) - def _stats(**kwargs): - if not metrics: - # first call, set up metrics - labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method'] - labels = labels_no_status + ['status'] - metrics['latency'] = prom.Histogram( - 'http_request_latency_{}'.format(endpoint), LATENCY_HELP, - labels, buckets=LATENCY_BUCKETS, - ) - metrics['size'] = prom.Histogram( - 'http_response_size_{}'.format(endpoint), SIZE_HELP, - labels, buckets=SIZE_BUCKETS, - ) - metrics['concurrent'] = prom.Gauge( - 'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP, - labels_no_status, - ) - - request_store.metrics = metrics - request_store.endpoint = endpoint - request_store.method = request.method - request_store.labels = {k: str(v) for k, v in kwargs.items()} - generic_concurrent.labels(endpoint=endpoint, method=request.method).inc() - metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc() - request_store.start_time = monotonic() - return fn(**kwargs) - return _stats - - -def after_request(response): - """Must be registered to run after requests. Finishes tracking the request - and logs most of the metrics. - We do it in this way, instead of inside the request_stats wrapper, because it lets flask - normalize the handler result into a Response object. - """ - if 'metrics' not in request_store: - return response # untracked handler - - end_time = monotonic() - metrics = request_store.metrics - endpoint = request_store.endpoint - method = request_store.method - labels = request_store.labels - start_time = request_store.start_time - - generic_concurrent.labels(endpoint=endpoint, method=method).dec() - metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec() - - status = str(response.status_code) - generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time) - metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time) - size = response.calculate_content_length() - if size is not None: - generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size) - metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size) - - return response diff --git a/common/setup.py b/common/setup.py index 471c214..ae2a724 100644 --- a/common/setup.py +++ b/common/setup.py @@ -6,7 +6,6 @@ setup( packages = find_packages(), install_requires = [ "gevent", - "flask", "monotonic", "prometheus-client", ], diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index f18d8cc..3ecef7f 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -13,8 +13,8 @@ import prometheus_client as prom from flask import Flask, url_for, request, abort, Response from gevent.pywsgi import WSGIServer -import common.dateutil -from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler, request_stats, after_request +from common import dateutil, get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler +from common.flask_stats import request_stats, after_request import generate_hls @@ -162,8 +162,8 @@ def generate_master_playlist(channel): start, end: The time to begin and end the stream at. See generate_media_playlist for details. """ - start = common.dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None - end = common.dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None + start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None + end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None qualities = listdir(os.path.join(app.static_folder, channel)) playlists = {} @@ -201,8 +201,8 @@ def generate_media_playlist(channel, quality): if not os.path.isdir(hours_path): abort(404) - start = common.dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None - end = common.dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None + start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None + end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None if start is None or end is None: # If start or end are not given, use the earliest/latest time available first, last = time_range_for_quality(channel, quality) @@ -236,8 +236,8 @@ def cut(channel, quality): Set to true by passing "true" (case insensitive). Even if holes are allowed, a 406 may result if the resulting video would be empty. """ - start = common.dateutil.parse_utc_only(request.args['start']) - end = common.dateutil.parse_utc_only(request.args['end']) + start = dateutil.parse_utc_only(request.args['start']) + end = dateutil.parse_utc_only(request.args['end']) if end <= start: return "End must be after start", 400 diff --git a/thrimshim/thrimshim/main.py b/thrimshim/thrimshim/main.py index 2d89380..39543f9 100644 --- a/thrimshim/thrimshim/main.py +++ b/thrimshim/thrimshim/main.py @@ -12,7 +12,8 @@ import prometheus_client import psycopg2 from psycopg2 import sql -from common import database, PromLogCountsHandler, install_stacksampler, request_stats, after_request +from common import database, PromLogCountsHandler, install_stacksampler +from common.flask_stats import request_stats, after_request psycopg2.extras.register_uuid() app = flask.Flask('thrimshim')