From 3edc27cfe6debad5cfced636e8ec175b5896a741 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 7 Jan 2019 00:15:25 -0800 Subject: [PATCH] common: Split stats-related stuff into its own module We still import them into __init__.py so they're accessible externally just the same --- common/common/__init__.py | 166 +----------------------------------- common/common/stats.py | 174 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 165 deletions(-) create mode 100644 common/common/stats.py diff --git a/common/common/__init__.py b/common/common/__init__.py index 7053bef..90be4d9 100644 --- a/common/common/__init__.py +++ b/common/common/__init__.py @@ -5,7 +5,6 @@ import base64 import datetime import errno -import functools import itertools import logging import os @@ -14,158 +13,8 @@ import sys from collections import namedtuple import dateutil.parser -import prometheus_client as prom -from monotonic import monotonic - -def timed(name=None, - buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None, - normalize=None, - **labels -): - """Decorator that instruments wrapped function to record real, user and system time - as a prometheus histogram. - - Metrics are recorded as NAME_latency, NAME_cputime{type=user} and NAME_cputime{type=system} - respectively. User and system time are process-wide (which means they'll be largely meaningless - if you're using gevent and the wrapped function blocks) and do not include subprocesses. - - NAME defaults to the wrapped function's name. - - Any labels passed in are included. Given label values may be callable, in which case - they are passed the input and result from the wrapped function and should return a label value. - Otherwise the given label value is used directly. All label values are automatically str()'d. - - In addition, the "error" label is automatically included, and set to "" if no exception - occurs, or the name of the exception type if one does. - - The normalize argument, if given, causes the creation of a second set of metrics - NAME_normalized_latency, etc. The normalize argument should be a callable which - takes the input and result of the wrapped function and returns a normalization factor. - All normalized metrics divide the observed times by this factor. - The intent is to allow a function which is expected to take longer given a larger input - to be timed on a per-input basis. - As a special case, when normalize returns 0 or None, normalized metrics are not updated. - - The buckets kwarg is as per prometheus_client.Histogram. The default is a conservative - but sparse range covering nanoseconds to hours. - The normalized_buckets kwarg applies to the normalized metrics, and defaults to the same - as buckets. - - All callables that take inputs and result take them as follows: The first arg is the result, - followed by *args and **kwargs as per the function's inputs. - If the wrapped function errored, result is None. - To simplify error handling in these functions, any errors are taken to mean None, - and None is interpreted as '' for label values. - - Contrived Example: - @timed("scanner", - # constant label - foo="my example label", - # label dependent on input - all=lambda results, predicate, list, find_all=False: find_all, - # label dependent on output - found=lambda results, *a, **k: len(found) > 0, - # normalized on input - normalize=lambda results, predicate, list, **k: len(list), - ) - def scanner(predicate, list, find_all=False): - results = [] - for item in list: - if predicate(item): - results.append(item) - if not find_all: - break - return results - """ - - if normalized_buckets is None: - normalized_buckets = buckets - # convert constant (non-callable) values into callables for consistency - labels = { - # there's a pyflakes bug here suggesting that v is undefined, but it isn't - k: v if callable(v) else (lambda *a, **k: v) - for k, v in labels.items() - } - - def _timed(fn): - # can't safely assign to name inside closure, we use a new _name variable instead - _name = fn.__name__ if name is None else name - - latency = prom.Histogram( - "{}_latency".format(_name), - "Wall clock time taken to execute {}".format(_name), - labels.keys() + ['error'], - buckets=buckets, - ) - cputime = prom.Histogram( - "{}_cputime".format(_name), - "Process-wide consumed CPU time during execution of {}".format(_name), - labels.keys() + ['error', 'type'], - buckets=buckets, - ) - if normalize: - normal_latency = prom.Histogram( - "{}_latency_normalized".format(_name), - "Wall clock time taken to execute {} per unit of work".format(_name), - labels.keys() + ['error'], - buckets=normalized_buckets, - ) - normal_cputime = prom.Histogram( - "{}_cputime_normalized".format(_name), - "Process-wide consumed CPU time during execution of {} per unit of work".format(_name), - labels.keys() + ['error', 'type'], - buckets=normalized_buckets, - ) - - @functools.wraps(fn) - def wrapper(*args, **kwargs): - start_monotonic = monotonic() - start_user, start_sys, _, _, _ = os.times() - - try: - ret = fn(*args, **kwargs) - except Exception: - ret = None - error_type, error, tb = sys.exc_info() - else: - error = None - - end_monotonic = monotonic() - end_user, end_sys, _, _, _ = os.times() - wall_time = end_monotonic - start_monotonic - user_time = end_user - start_user - sys_time = end_sys - start_sys - - label_values = {} - for k, v in labels.items(): - try: - value = v(ret, *args, **kwargs) - except Exception: - value = None - label_values[k] = '' if value is None else str(value) - label_values.update(error='' if error is None else type(error).__name__) - - latency.labels(**label_values).observe(wall_time) - cputime.labels(type='user', **label_values).observe(user_time) - cputime.labels(type='system', **label_values).observe(sys_time) - if normalize: - try: - factor = normalize(ret, *args, **kwargs) - except Exception: - factor = None - if factor is not None and factor > 0: - normal_latency.labels(**label_values).observe(wall_time / factor) - normal_cputime.labels(type='user', **label_values).observe(user_time / factor) - normal_cputime.labels(type='system', **label_values).observe(sys_time / factor) - - if error is None: - return ret - raise error_type, error, tb # re-raise error with original traceback - - return wrapper - - return _timed +from .stats import timed, PromLogCountsHandler def dt_to_bustime(start, dt): @@ -457,16 +306,3 @@ def encode_strings(o): if isinstance(o, unicode): return o.encode('utf-8') return o - - -log_count = prom.Counter("log_count", "Count of messages logged", ["level", "module", "function"]) - -class PromLogCountsHandler(logging.Handler): - """A logging handler that records a count of logs by level, module and function.""" - def emit(self, record): - log_count.labels(record.levelname, record.module, record.funcName).inc() - - @classmethod - def install(cls): - root_logger = logging.getLogger() - root_logger.addHandler(cls()) diff --git a/common/common/stats.py b/common/common/stats.py new file mode 100644 index 0000000..cc2394b --- /dev/null +++ b/common/common/stats.py @@ -0,0 +1,174 @@ + +"""A place for common utilities between wubloader components""" + + +import functools +import logging +import os +import sys + +import prometheus_client as prom +from monotonic import monotonic + + +def timed(name=None, + buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None, + normalize=None, + **labels +): + """Decorator that instruments wrapped function to record real, user and system time + as a prometheus histogram. + + Metrics are recorded as NAME_latency, NAME_cputime{type=user} and NAME_cputime{type=system} + respectively. User and system time are process-wide (which means they'll be largely meaningless + if you're using gevent and the wrapped function blocks) and do not include subprocesses. + + NAME defaults to the wrapped function's name. + + Any labels passed in are included. Given label values may be callable, in which case + they are passed the input and result from the wrapped function and should return a label value. + Otherwise the given label value is used directly. All label values are automatically str()'d. + + In addition, the "error" label is automatically included, and set to "" if no exception + occurs, or the name of the exception type if one does. + + The normalize argument, if given, causes the creation of a second set of metrics + NAME_normalized_latency, etc. The normalize argument should be a callable which + takes the input and result of the wrapped function and returns a normalization factor. + All normalized metrics divide the observed times by this factor. + The intent is to allow a function which is expected to take longer given a larger input + to be timed on a per-input basis. + As a special case, when normalize returns 0 or None, normalized metrics are not updated. + + The buckets kwarg is as per prometheus_client.Histogram. The default is a conservative + but sparse range covering nanoseconds to hours. + The normalized_buckets kwarg applies to the normalized metrics, and defaults to the same + as buckets. + + All callables that take inputs and result take them as follows: The first arg is the result, + followed by *args and **kwargs as per the function's inputs. + If the wrapped function errored, result is None. + To simplify error handling in these functions, any errors are taken to mean None, + and None is interpreted as '' for label values. + + Contrived Example: + @timed("scanner", + # constant label + foo="my example label", + # label dependent on input + all=lambda results, predicate, list, find_all=False: find_all, + # label dependent on output + found=lambda results, *a, **k: len(found) > 0, + # normalized on input + normalize=lambda results, predicate, list, **k: len(list), + ) + def scanner(predicate, list, find_all=False): + results = [] + for item in list: + if predicate(item): + results.append(item) + if not find_all: + break + return results + """ + + if normalized_buckets is None: + normalized_buckets = buckets + # convert constant (non-callable) values into callables for consistency + labels = { + # there's a pyflakes bug here suggesting that v is undefined, but it isn't + k: v if callable(v) else (lambda *a, **k: v) + for k, v in labels.items() + } + + def _timed(fn): + # can't safely assign to name inside closure, we use a new _name variable instead + _name = fn.__name__ if name is None else name + + latency = prom.Histogram( + "{}_latency".format(_name), + "Wall clock time taken to execute {}".format(_name), + labels.keys() + ['error'], + buckets=buckets, + ) + cputime = prom.Histogram( + "{}_cputime".format(_name), + "Process-wide consumed CPU time during execution of {}".format(_name), + labels.keys() + ['error', 'type'], + buckets=buckets, + ) + if normalize: + normal_latency = prom.Histogram( + "{}_latency_normalized".format(_name), + "Wall clock time taken to execute {} per unit of work".format(_name), + labels.keys() + ['error'], + buckets=normalized_buckets, + ) + normal_cputime = prom.Histogram( + "{}_cputime_normalized".format(_name), + "Process-wide consumed CPU time during execution of {} per unit of work".format(_name), + labels.keys() + ['error', 'type'], + buckets=normalized_buckets, + ) + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + start_monotonic = monotonic() + start_user, start_sys, _, _, _ = os.times() + + try: + ret = fn(*args, **kwargs) + except Exception: + ret = None + error_type, error, tb = sys.exc_info() + else: + error = None + + end_monotonic = monotonic() + end_user, end_sys, _, _, _ = os.times() + wall_time = end_monotonic - start_monotonic + user_time = end_user - start_user + sys_time = end_sys - start_sys + + label_values = {} + for k, v in labels.items(): + try: + value = v(ret, *args, **kwargs) + except Exception: + value = None + label_values[k] = '' if value is None else str(value) + label_values.update(error='' if error is None else type(error).__name__) + + latency.labels(**label_values).observe(wall_time) + cputime.labels(type='user', **label_values).observe(user_time) + cputime.labels(type='system', **label_values).observe(sys_time) + if normalize: + try: + factor = normalize(ret, *args, **kwargs) + except Exception: + factor = None + if factor is not None and factor > 0: + normal_latency.labels(**label_values).observe(wall_time / factor) + normal_cputime.labels(type='user', **label_values).observe(user_time / factor) + normal_cputime.labels(type='system', **label_values).observe(sys_time / factor) + + if error is None: + return ret + raise error_type, error, tb # re-raise error with original traceback + + return wrapper + + return _timed + + +log_count = prom.Counter("log_count", "Count of messages logged", ["level", "module", "function"]) + +class PromLogCountsHandler(logging.Handler): + """A logging handler that records a count of logs by level, module and function.""" + def emit(self, record): + log_count.labels(record.levelname, record.module, record.funcName).inc() + + @classmethod + def install(cls): + root_logger = logging.getLogger() + root_logger.addHandler(cls())