From 09cf238bb97de8ed6f71a312a7a416c07685ce22 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 6 Jan 2019 01:50:14 -0800 Subject: [PATCH] common: Create general function for timing things, and use it to time get_best_segments The function is quite customizable and therefore quite complex, but it allows us to easily annotate a function to be timed with labels based on input and output, as well as normalize results based on amount of work done to get a better picture of the actual amount of time taken per unit of work. This will help us monitor for performance issues. --- common/common.py | 157 +++++++++++++++++++++++++++++++++++++++++++++++ common/setup.py | 1 + 2 files changed, 158 insertions(+) diff --git a/common/common.py b/common/common.py index 451370e..7053bef 100644 --- a/common/common.py +++ b/common/common.py @@ -5,6 +5,7 @@ import base64 import datetime import errno +import functools import itertools import logging import os @@ -14,6 +15,157 @@ from collections import namedtuple import dateutil.parser import prometheus_client as prom +from monotonic import monotonic + + +def timed(name=None, + buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None, + normalize=None, + **labels +): + """Decorator that instruments wrapped function to record real, user and system time + as a prometheus histogram. + + Metrics are recorded as NAME_latency, NAME_cputime{type=user} and NAME_cputime{type=system} + respectively. User and system time are process-wide (which means they'll be largely meaningless + if you're using gevent and the wrapped function blocks) and do not include subprocesses. + + NAME defaults to the wrapped function's name. + + Any labels passed in are included. Given label values may be callable, in which case + they are passed the input and result from the wrapped function and should return a label value. + Otherwise the given label value is used directly. All label values are automatically str()'d. + + In addition, the "error" label is automatically included, and set to "" if no exception + occurs, or the name of the exception type if one does. + + The normalize argument, if given, causes the creation of a second set of metrics + NAME_normalized_latency, etc. The normalize argument should be a callable which + takes the input and result of the wrapped function and returns a normalization factor. + All normalized metrics divide the observed times by this factor. + The intent is to allow a function which is expected to take longer given a larger input + to be timed on a per-input basis. + As a special case, when normalize returns 0 or None, normalized metrics are not updated. + + The buckets kwarg is as per prometheus_client.Histogram. The default is a conservative + but sparse range covering nanoseconds to hours. + The normalized_buckets kwarg applies to the normalized metrics, and defaults to the same + as buckets. + + All callables that take inputs and result take them as follows: The first arg is the result, + followed by *args and **kwargs as per the function's inputs. + If the wrapped function errored, result is None. + To simplify error handling in these functions, any errors are taken to mean None, + and None is interpreted as '' for label values. + + Contrived Example: + @timed("scanner", + # constant label + foo="my example label", + # label dependent on input + all=lambda results, predicate, list, find_all=False: find_all, + # label dependent on output + found=lambda results, *a, **k: len(found) > 0, + # normalized on input + normalize=lambda results, predicate, list, **k: len(list), + ) + def scanner(predicate, list, find_all=False): + results = [] + for item in list: + if predicate(item): + results.append(item) + if not find_all: + break + return results + """ + + if normalized_buckets is None: + normalized_buckets = buckets + # convert constant (non-callable) values into callables for consistency + labels = { + # there's a pyflakes bug here suggesting that v is undefined, but it isn't + k: v if callable(v) else (lambda *a, **k: v) + for k, v in labels.items() + } + + def _timed(fn): + # can't safely assign to name inside closure, we use a new _name variable instead + _name = fn.__name__ if name is None else name + + latency = prom.Histogram( + "{}_latency".format(_name), + "Wall clock time taken to execute {}".format(_name), + labels.keys() + ['error'], + buckets=buckets, + ) + cputime = prom.Histogram( + "{}_cputime".format(_name), + "Process-wide consumed CPU time during execution of {}".format(_name), + labels.keys() + ['error', 'type'], + buckets=buckets, + ) + if normalize: + normal_latency = prom.Histogram( + "{}_latency_normalized".format(_name), + "Wall clock time taken to execute {} per unit of work".format(_name), + labels.keys() + ['error'], + buckets=normalized_buckets, + ) + normal_cputime = prom.Histogram( + "{}_cputime_normalized".format(_name), + "Process-wide consumed CPU time during execution of {} per unit of work".format(_name), + labels.keys() + ['error', 'type'], + buckets=normalized_buckets, + ) + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + start_monotonic = monotonic() + start_user, start_sys, _, _, _ = os.times() + + try: + ret = fn(*args, **kwargs) + except Exception: + ret = None + error_type, error, tb = sys.exc_info() + else: + error = None + + end_monotonic = monotonic() + end_user, end_sys, _, _, _ = os.times() + wall_time = end_monotonic - start_monotonic + user_time = end_user - start_user + sys_time = end_sys - start_sys + + label_values = {} + for k, v in labels.items(): + try: + value = v(ret, *args, **kwargs) + except Exception: + value = None + label_values[k] = '' if value is None else str(value) + label_values.update(error='' if error is None else type(error).__name__) + + latency.labels(**label_values).observe(wall_time) + cputime.labels(type='user', **label_values).observe(user_time) + cputime.labels(type='system', **label_values).observe(sys_time) + if normalize: + try: + factor = normalize(ret, *args, **kwargs) + except Exception: + factor = None + if factor is not None and factor > 0: + normal_latency.labels(**label_values).observe(wall_time / factor) + normal_cputime.labels(type='user', **label_values).observe(user_time / factor) + normal_cputime.labels(type='system', **label_values).observe(sys_time / factor) + + if error is None: + return ret + raise error_type, error, tb # re-raise error with original traceback + + return wrapper + + return _timed def dt_to_bustime(start, dt): @@ -116,6 +268,11 @@ def parse_segment_path(path): raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb +@timed( + hours_path=lambda ret, hours_path, start, end: hours_path, + has_holes=lambda ret, hours_path, start, end: None in ret, + normalize=lambda ret, hours_path, start, end: len([x for x in ret if x is not None]), +) def get_best_segments(hours_path, start, end): """Return a list of the best sequence of non-overlapping segments we have for a given time range. Hours path should be the directory containing hour directories. diff --git a/common/setup.py b/common/setup.py index 05618a7..18a094c 100644 --- a/common/setup.py +++ b/common/setup.py @@ -5,6 +5,7 @@ setup( version = "0.0.0", py_modules = ["common"], install_requires = [ + "monotonic", "prometheus-client", "python-dateutil", ],