moved flask monitoring to its own module

pull/68/head
Christopher Usher 5 years ago
parent 73541f852f
commit 76bc629720

@@ -8,7 +8,7 @@ import os
import random
from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo
from .stats import timed, PromLogCountsHandler, install_stacksampler, request_stats, after_request
from .stats import timed, PromLogCountsHandler, install_stacksampler
def dt_to_bustime(start, dt):

@@ -0,0 +1,109 @@
import functools

from flask import request
from flask import g as request_store
from monotonic import monotonic
import prometheus_client as prom


# Generic metrics that all http requests get logged to (see below for specific metrics per endpoint)

LATENCY_HELP = "Time taken to run the request handler and create a response"
# buckets: very long playlists / cutting can be quite slow,
# so we have a wider range of latencies than default, up to 10min.
LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600]
generic_latency = prom.Histogram(
	'http_request_latency_all', LATENCY_HELP,
	['endpoint', 'method', 'status'],
	buckets=LATENCY_BUCKETS,
)

SIZE_HELP = 'Size in bytes of response body for non-chunked responses'
# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
SIZE_BUCKETS = [4**i for i in range(16)]
generic_size = prom.Histogram(
	'http_response_size_all', SIZE_HELP,
	['endpoint', 'method', 'status'],
	buckets=SIZE_BUCKETS,
)

CONCURRENT_HELP = 'Number of requests currently ongoing'
generic_concurrent = prom.Gauge(
	'http_request_concurrency_all', CONCURRENT_HELP,
	['endpoint', 'method'],
)


def request_stats(fn):
	"""Decorator that wraps a handler func to collect metrics.
	Adds handler func args as labels, along with 'endpoint' label using func's name,
	method and response status where applicable."""
	# We have to jump through some hoops here, because the prometheus client lib demands
	# we pre-define our label names, but we don't know the names of the handler kwargs
	# until the first time the function's called. So we delay defining the metrics until
	# first call.
	# In addition, it doesn't let us have different sets of labels with the same name.
	# So we record everything twice: Once under a generic name with only endpoint, method
	# and status, and once under a name specific to the endpoint with the full set of labels.
	metrics = {}
	endpoint = fn.__name__

	@functools.wraps(fn)
	def _stats(**kwargs):
		if not metrics:
			# first call, set up metrics
			labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
			labels = labels_no_status + ['status']
			metrics['latency'] = prom.Histogram(
				'http_request_latency_{}'.format(endpoint), LATENCY_HELP,
				labels, buckets=LATENCY_BUCKETS,
			)
			metrics['size'] = prom.Histogram(
				'http_response_size_{}'.format(endpoint), SIZE_HELP,
				labels, buckets=SIZE_BUCKETS,
			)
			metrics['concurrent'] = prom.Gauge(
				'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP,
				labels_no_status,
			)

		request_store.metrics = metrics
		request_store.endpoint = endpoint
		request_store.method = request.method
		request_store.labels = {k: str(v) for k, v in kwargs.items()}
		generic_concurrent.labels(endpoint=endpoint, method=request.method).inc()
		metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc()
		request_store.start_time = monotonic()
		return fn(**kwargs)

	return _stats


def after_request(response):
	"""Must be registered to run after requests. Finishes tracking the request
	and logs most of the metrics.
	We do it in this way, instead of inside the request_stats wrapper, because it lets flask
	normalize the handler result into a Response object.
	"""
	if 'metrics' not in request_store:
		return response # untracked handler

	end_time = monotonic()
	metrics = request_store.metrics
	endpoint = request_store.endpoint
	method = request_store.method
	labels = request_store.labels
	start_time = request_store.start_time

	generic_concurrent.labels(endpoint=endpoint, method=method).dec()
	metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec()

	status = str(response.status_code)
	generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time)
	metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time)

	size = response.calculate_content_length()
	if size is not None:
		generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size)
		metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size)

	return response

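For orientation, here is a minimal sketch (not part of this commit) of how a service wires the new module in. Only the import path and the two functions come from the diff; the app name, route, and handler are illustrative assumptions.

# Hypothetical wiring sketch; names marked illustrative are invented.
import flask
from common.flask_stats import request_stats, after_request

app = flask.Flask('example_service')  # illustrative service name
# after_request must be registered so the metrics started in request_stats get observed.
app.after_request(after_request)

@app.route('/example/<channel>')  # illustrative route
@request_stats  # applied directly to the handler, so flask registers the stats-wrapped function
def example_handler(channel):  # illustrative handler; the 'endpoint' label becomes "example_handler"
	# 'channel' is recorded as an extra label on the endpoint-specific metrics
	# (http_request_latency_example_handler etc), but not on the *_all metrics.
	return "ok"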
@@ -7,8 +7,6 @@ import signal
import sys
import gevent.lock
from flask import request
from flask import g as request_store
from monotonic import monotonic
import prometheus_client as prom
@@ -240,104 +238,4 @@ def install_stacksampler(interval=0.005):
	signal.setitimer(signal.ITIMER_VIRTUAL, interval)


# Generic metrics that all http requests get logged to (see below for specific metrics per endpoint)

LATENCY_HELP = "Time taken to run the request handler and create a response"
# buckets: very long playlists / cutting can be quite slow,
# so we have a wider range of latencies than default, up to 10min.
LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600]
generic_latency = prom.Histogram(
	'http_request_latency_all', LATENCY_HELP,
	['endpoint', 'method', 'status'],
	buckets=LATENCY_BUCKETS,
)

SIZE_HELP = 'Size in bytes of response body for non-chunked responses'
# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
SIZE_BUCKETS = [4**i for i in range(16)]
generic_size = prom.Histogram(
	'http_response_size_all', SIZE_HELP,
	['endpoint', 'method', 'status'],
	buckets=SIZE_BUCKETS,
)

CONCURRENT_HELP = 'Number of requests currently ongoing'
generic_concurrent = prom.Gauge(
	'http_request_concurrency_all', CONCURRENT_HELP,
	['endpoint', 'method'],
)


def request_stats(fn):
	"""Decorator that wraps a handler func to collect metrics.
	Adds handler func args as labels, along with 'endpoint' label using func's name,
	method and response status where applicable."""
	# We have to jump through some hoops here, because the prometheus client lib demands
	# we pre-define our label names, but we don't know the names of the handler kwargs
	# until the first time the function's called. So we delay defining the metrics until
	# first call.
	# In addition, it doesn't let us have different sets of labels with the same name.
	# So we record everything twice: Once under a generic name with only endpoint, method
	# and status, and once under a name specific to the endpoint with the full set of labels.
	metrics = {}
	endpoint = fn.__name__

	@functools.wraps(fn)
	def _stats(**kwargs):
		if not metrics:
			# first call, set up metrics
			labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
			labels = labels_no_status + ['status']
			metrics['latency'] = prom.Histogram(
				'http_request_latency_{}'.format(endpoint), LATENCY_HELP,
				labels, buckets=LATENCY_BUCKETS,
			)
			metrics['size'] = prom.Histogram(
				'http_response_size_{}'.format(endpoint), SIZE_HELP,
				labels, buckets=SIZE_BUCKETS,
			)
			metrics['concurrent'] = prom.Gauge(
				'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP,
				labels_no_status,
			)

		request_store.metrics = metrics
		request_store.endpoint = endpoint
		request_store.method = request.method
		request_store.labels = {k: str(v) for k, v in kwargs.items()}
		generic_concurrent.labels(endpoint=endpoint, method=request.method).inc()
		metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc()
		request_store.start_time = monotonic()
		return fn(**kwargs)

	return _stats


def after_request(response):
	"""Must be registered to run after requests. Finishes tracking the request
	and logs most of the metrics.
	We do it in this way, instead of inside the request_stats wrapper, because it lets flask
	normalize the handler result into a Response object.
	"""
	if 'metrics' not in request_store:
		return response # untracked handler

	end_time = monotonic()
	metrics = request_store.metrics
	endpoint = request_store.endpoint
	method = request_store.method
	labels = request_store.labels
	start_time = request_store.start_time

	generic_concurrent.labels(endpoint=endpoint, method=method).dec()
	metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec()

	status = str(response.status_code)
	generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time)
	metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time)

	size = response.calculate_content_length()
	if size is not None:
		generic_size.labels(endpoint=endpoint, method=method, status=status).observe(size)
		metrics['size'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(size)

	return response

@@ -6,7 +6,6 @@ setup(
	packages = find_packages(),
	install_requires = [
		"gevent",
		"flask",
		"monotonic",
		"prometheus-client",
	],

@@ -13,8 +13,8 @@ import prometheus_client as prom
from flask import Flask, url_for, request, abort, Response
from gevent.pywsgi import WSGIServer
import common.dateutil
from common import get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler, request_stats, after_request
from common import dateutil, get_best_segments, cut_segments, PromLogCountsHandler, install_stacksampler
from common.flask_stats import request_stats, after_request
import generate_hls
@@ -162,8 +162,8 @@ def generate_master_playlist(channel):
	start, end: The time to begin and end the stream at.
	See generate_media_playlist for details.
	"""
	start = common.dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
	end = common.dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None
	start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
	end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None
	qualities = listdir(os.path.join(app.static_folder, channel))
	playlists = {}
@@ -201,8 +201,8 @@ def generate_media_playlist(channel, quality):
	if not os.path.isdir(hours_path):
		abort(404)
	start = common.dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
	end = common.dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None
	start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
	end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None
	if start is None or end is None:
		# If start or end are not given, use the earliest/latest time available
		first, last = time_range_for_quality(channel, quality)
@@ -236,8 +236,8 @@ def cut(channel, quality):
	Set to true by passing "true" (case insensitive).
	Even if holes are allowed, a 406 may result if the resulting video would be empty.
	"""
	start = common.dateutil.parse_utc_only(request.args['start'])
	end = common.dateutil.parse_utc_only(request.args['end'])
	start = dateutil.parse_utc_only(request.args['start'])
	end = dateutil.parse_utc_only(request.args['end'])
	if end <= start:
		return "End must be after start", 400

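To make the double recording concrete for the cut handler above: after one request, the scraped /metrics output would contain series along these lines (label values, status, and counts are invented for illustration; histograms also expose _bucket and _sum series, omitted here):

	http_request_latency_all_count{endpoint="cut",method="GET",status="200"} 1
	http_request_latency_cut_count{channel="desertbus",quality="source",endpoint="cut",method="GET",status="200"} 1
	http_request_concurrency_all{endpoint="cut",method="GET"} 0
	http_request_concurrency_cut{channel="desertbus",quality="source",endpoint="cut",method="GET"} 0

The generic *_all families keep a fixed label set across all endpoints, while the per-endpoint families add the handler's kwargs (here channel and quality) as extra labels.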
@@ -12,7 +12,8 @@ import prometheus_client
import psycopg2
from psycopg2 import sql
from common import database, PromLogCountsHandler, install_stacksampler, request_stats, after_request
from common import database, PromLogCountsHandler, install_stacksampler
from common.flask_stats import request_stats, after_request
psycopg2.extras.register_uuid()
app = flask.Flask('thrimshim')
