common: Split stats-related stuff into its own module

We still import them into __init__.py so they're accessible externally just the same
mike/common/split-up
Mike Lang 6 years ago
parent 84b611055b
commit 3edc27cfe6

@ -5,7 +5,6 @@
import base64
import datetime
import errno
import functools
import itertools
import logging
import os
@ -14,158 +13,8 @@ import sys
from collections import namedtuple
import dateutil.parser
import prometheus_client as prom
from monotonic import monotonic
def timed(name=None,
buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None,
normalize=None,
**labels
):
"""Decorator that instruments wrapped function to record real, user and system time
as a prometheus histogram.
Metrics are recorded as NAME_latency, NAME_cputime{type=user} and NAME_cputime{type=system}
respectively. User and system time are process-wide (which means they'll be largely meaningless
if you're using gevent and the wrapped function blocks) and do not include subprocesses.
NAME defaults to the wrapped function's name.
Any labels passed in are included. Given label values may be callable, in which case
they are passed the input and result from the wrapped function and should return a label value.
Otherwise the given label value is used directly. All label values are automatically str()'d.
In addition, the "error" label is automatically included, and set to "" if no exception
occurs, or the name of the exception type if one does.
The normalize argument, if given, causes the creation of a second set of metrics
NAME_normalized_latency, etc. The normalize argument should be a callable which
takes the input and result of the wrapped function and returns a normalization factor.
All normalized metrics divide the observed times by this factor.
The intent is to allow a function which is expected to take longer given a larger input
to be timed on a per-input basis.
As a special case, when normalize returns 0 or None, normalized metrics are not updated.
The buckets kwarg is as per prometheus_client.Histogram. The default is a conservative
but sparse range covering nanoseconds to hours.
The normalized_buckets kwarg applies to the normalized metrics, and defaults to the same
as buckets.
All callables that take inputs and result take them as follows: The first arg is the result,
followed by *args and **kwargs as per the function's inputs.
If the wrapped function errored, result is None.
To simplify error handling in these functions, any errors are taken to mean None,
and None is interpreted as '' for label values.
Contrived Example:
@timed("scanner",
# constant label
foo="my example label",
# label dependent on input
all=lambda results, predicate, list, find_all=False: find_all,
# label dependent on output
found=lambda results, *a, **k: len(found) > 0,
# normalized on input
normalize=lambda results, predicate, list, **k: len(list),
)
def scanner(predicate, list, find_all=False):
results = []
for item in list:
if predicate(item):
results.append(item)
if not find_all:
break
return results
"""
if normalized_buckets is None:
normalized_buckets = buckets
# convert constant (non-callable) values into callables for consistency
labels = {
# there's a pyflakes bug here suggesting that v is undefined, but it isn't
k: v if callable(v) else (lambda *a, **k: v)
for k, v in labels.items()
}
def _timed(fn):
# can't safely assign to name inside closure, we use a new _name variable instead
_name = fn.__name__ if name is None else name
latency = prom.Histogram(
"{}_latency".format(_name),
"Wall clock time taken to execute {}".format(_name),
labels.keys() + ['error'],
buckets=buckets,
)
cputime = prom.Histogram(
"{}_cputime".format(_name),
"Process-wide consumed CPU time during execution of {}".format(_name),
labels.keys() + ['error', 'type'],
buckets=buckets,
)
if normalize:
normal_latency = prom.Histogram(
"{}_latency_normalized".format(_name),
"Wall clock time taken to execute {} per unit of work".format(_name),
labels.keys() + ['error'],
buckets=normalized_buckets,
)
normal_cputime = prom.Histogram(
"{}_cputime_normalized".format(_name),
"Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
labels.keys() + ['error', 'type'],
buckets=normalized_buckets,
)
@functools.wraps(fn)
def wrapper(*args, **kwargs):
start_monotonic = monotonic()
start_user, start_sys, _, _, _ = os.times()
try:
ret = fn(*args, **kwargs)
except Exception:
ret = None
error_type, error, tb = sys.exc_info()
else:
error = None
end_monotonic = monotonic()
end_user, end_sys, _, _, _ = os.times()
wall_time = end_monotonic - start_monotonic
user_time = end_user - start_user
sys_time = end_sys - start_sys
label_values = {}
for k, v in labels.items():
try:
value = v(ret, *args, **kwargs)
except Exception:
value = None
label_values[k] = '' if value is None else str(value)
label_values.update(error='' if error is None else type(error).__name__)
latency.labels(**label_values).observe(wall_time)
cputime.labels(type='user', **label_values).observe(user_time)
cputime.labels(type='system', **label_values).observe(sys_time)
if normalize:
try:
factor = normalize(ret, *args, **kwargs)
except Exception:
factor = None
if factor is not None and factor > 0:
normal_latency.labels(**label_values).observe(wall_time / factor)
normal_cputime.labels(type='user', **label_values).observe(user_time / factor)
normal_cputime.labels(type='system', **label_values).observe(sys_time / factor)
if error is None:
return ret
raise error_type, error, tb # re-raise error with original traceback
return wrapper
return _timed
from .stats import timed, PromLogCountsHandler
def dt_to_bustime(start, dt):
@ -457,16 +306,3 @@ def encode_strings(o):
if isinstance(o, unicode):
return o.encode('utf-8')
return o
log_count = prom.Counter("log_count", "Count of messages logged", ["level", "module", "function"])
class PromLogCountsHandler(logging.Handler):
"""A logging handler that records a count of logs by level, module and function."""
def emit(self, record):
log_count.labels(record.levelname, record.module, record.funcName).inc()
@classmethod
def install(cls):
root_logger = logging.getLogger()
root_logger.addHandler(cls())

@ -0,0 +1,174 @@
"""A place for common utilities between wubloader components"""
import functools
import logging
import os
import sys
import prometheus_client as prom
from monotonic import monotonic
def timed(name=None,
buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None,
normalize=None,
**labels
):
"""Decorator that instruments wrapped function to record real, user and system time
as a prometheus histogram.
Metrics are recorded as NAME_latency, NAME_cputime{type=user} and NAME_cputime{type=system}
respectively. User and system time are process-wide (which means they'll be largely meaningless
if you're using gevent and the wrapped function blocks) and do not include subprocesses.
NAME defaults to the wrapped function's name.
Any labels passed in are included. Given label values may be callable, in which case
they are passed the input and result from the wrapped function and should return a label value.
Otherwise the given label value is used directly. All label values are automatically str()'d.
In addition, the "error" label is automatically included, and set to "" if no exception
occurs, or the name of the exception type if one does.
The normalize argument, if given, causes the creation of a second set of metrics
NAME_normalized_latency, etc. The normalize argument should be a callable which
takes the input and result of the wrapped function and returns a normalization factor.
All normalized metrics divide the observed times by this factor.
The intent is to allow a function which is expected to take longer given a larger input
to be timed on a per-input basis.
As a special case, when normalize returns 0 or None, normalized metrics are not updated.
The buckets kwarg is as per prometheus_client.Histogram. The default is a conservative
but sparse range covering nanoseconds to hours.
The normalized_buckets kwarg applies to the normalized metrics, and defaults to the same
as buckets.
All callables that take inputs and result take them as follows: The first arg is the result,
followed by *args and **kwargs as per the function's inputs.
If the wrapped function errored, result is None.
To simplify error handling in these functions, any errors are taken to mean None,
and None is interpreted as '' for label values.
Contrived Example:
@timed("scanner",
# constant label
foo="my example label",
# label dependent on input
all=lambda results, predicate, list, find_all=False: find_all,
# label dependent on output
found=lambda results, *a, **k: len(found) > 0,
# normalized on input
normalize=lambda results, predicate, list, **k: len(list),
)
def scanner(predicate, list, find_all=False):
results = []
for item in list:
if predicate(item):
results.append(item)
if not find_all:
break
return results
"""
if normalized_buckets is None:
normalized_buckets = buckets
# convert constant (non-callable) values into callables for consistency
labels = {
# there's a pyflakes bug here suggesting that v is undefined, but it isn't
k: v if callable(v) else (lambda *a, **k: v)
for k, v in labels.items()
}
def _timed(fn):
# can't safely assign to name inside closure, we use a new _name variable instead
_name = fn.__name__ if name is None else name
latency = prom.Histogram(
"{}_latency".format(_name),
"Wall clock time taken to execute {}".format(_name),
labels.keys() + ['error'],
buckets=buckets,
)
cputime = prom.Histogram(
"{}_cputime".format(_name),
"Process-wide consumed CPU time during execution of {}".format(_name),
labels.keys() + ['error', 'type'],
buckets=buckets,
)
if normalize:
normal_latency = prom.Histogram(
"{}_latency_normalized".format(_name),
"Wall clock time taken to execute {} per unit of work".format(_name),
labels.keys() + ['error'],
buckets=normalized_buckets,
)
normal_cputime = prom.Histogram(
"{}_cputime_normalized".format(_name),
"Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
labels.keys() + ['error', 'type'],
buckets=normalized_buckets,
)
@functools.wraps(fn)
def wrapper(*args, **kwargs):
start_monotonic = monotonic()
start_user, start_sys, _, _, _ = os.times()
try:
ret = fn(*args, **kwargs)
except Exception:
ret = None
error_type, error, tb = sys.exc_info()
else:
error = None
end_monotonic = monotonic()
end_user, end_sys, _, _, _ = os.times()
wall_time = end_monotonic - start_monotonic
user_time = end_user - start_user
sys_time = end_sys - start_sys
label_values = {}
for k, v in labels.items():
try:
value = v(ret, *args, **kwargs)
except Exception:
value = None
label_values[k] = '' if value is None else str(value)
label_values.update(error='' if error is None else type(error).__name__)
latency.labels(**label_values).observe(wall_time)
cputime.labels(type='user', **label_values).observe(user_time)
cputime.labels(type='system', **label_values).observe(sys_time)
if normalize:
try:
factor = normalize(ret, *args, **kwargs)
except Exception:
factor = None
if factor is not None and factor > 0:
normal_latency.labels(**label_values).observe(wall_time / factor)
normal_cputime.labels(type='user', **label_values).observe(user_time / factor)
normal_cputime.labels(type='system', **label_values).observe(sys_time / factor)
if error is None:
return ret
raise error_type, error, tb # re-raise error with original traceback
return wrapper
return _timed
log_count = prom.Counter("log_count", "Count of messages logged", ["level", "module", "function"])
class PromLogCountsHandler(logging.Handler):
"""A logging handler that records a count of logs by level, module and function."""
def emit(self, record):
log_count.labels(record.levelname, record.module, record.funcName).inc()
@classmethod
def install(cls):
root_logger = logging.getLogger()
root_logger.addHandler(cls())
Loading…
Cancel
Save