@@ -5,6 +5,7 @@
import base64
import datetime
import errno
import functools
import itertools
import logging
import os
@@ -14,6 +15,157 @@ from collections import namedtuple
import dateutil.parser
import prometheus_client as prom
from monotonic import monotonic


def timed(name=None,
        buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None,
        normalize=None,
        **labels
):
""" Decorator that instruments wrapped function to record real, user and system time
as a prometheus histogram .
Metrics are recorded as NAME_latency , NAME_cputime { type = user } and NAME_cputime { type = system }
respectively . User and system time are process - wide ( which means they ' ll be largely meaningless
if you ' re using gevent and the wrapped function blocks) and do not include subprocesses.
NAME defaults to the wrapped function ' s name.
Any labels passed in are included . Given label values may be callable , in which case
they are passed the input and result from the wrapped function and should return a label value .
Otherwise the given label value is used directly . All label values are automatically str ( ) ' d.
In addition , the " error " label is automatically included , and set to " " if no exception
occurs , or the name of the exception type if one does .
The normalize argument , if given , causes the creation of a second set of metrics
NAME_normalized_latency , etc . The normalize argument should be a callable which
takes the input and result of the wrapped function and returns a normalization factor .
All normalized metrics divide the observed times by this factor .
The intent is to allow a function which is expected to take longer given a larger input
to be timed on a per - input basis .
As a special case , when normalize returns 0 or None , normalized metrics are not updated .
The buckets kwarg is as per prometheus_client . Histogram . The default is a conservative
but sparse range covering nanoseconds to hours .
The normalized_buckets kwarg applies to the normalized metrics , and defaults to the same
as buckets .
All callables that take inputs and result take them as follows : The first arg is the result ,
followed by * args and * * kwargs as per the function ' s inputs.
If the wrapped function errored , result is None .
To simplify error handling in these functions , any errors are taken to mean None ,
and None is interpreted as ' ' for label values .

    Contrived example:

        @timed("scanner",
            # constant label
            foo="my example label",
            # label dependent on input
            all=lambda results, predicate, list, find_all=False: find_all,
            # label dependent on output
            found=lambda results, *a, **k: len(results) > 0,
            # normalized on input
            normalize=lambda results, predicate, list, **k: len(list),
        )
        def scanner(predicate, list, find_all=False):
            results = []
            for item in list:
                if predicate(item):
                    results.append(item)
                    if not find_all:
                        break
            return results
    """
    if normalized_buckets is None:
        normalized_buckets = buckets

    # Convert constant (non-callable) values into callables for consistency.
    # Note the value is bound via an immediately-invoked factory rather than
    # captured directly: a closure over the loop variable v would be late-binding,
    # leaving every constant label with the value from the final iteration.
    labels = {
        k: v if callable(v) else (lambda value: lambda *a, **k: value)(v)
        for k, v in labels.items()
    }

    def _timed(fn):
        # can't safely assign to name inside the closure, so we use a new _name variable instead
        _name = fn.__name__ if name is None else name

        latency = prom.Histogram(
            "{}_latency".format(_name),
            "Wall clock time taken to execute {}".format(_name),
            labels.keys() + ['error'],
            buckets=buckets,
        )
        cputime = prom.Histogram(
            "{}_cputime".format(_name),
            "Process-wide consumed CPU time during execution of {}".format(_name),
            labels.keys() + ['error', 'type'],
            buckets=buckets,
        )
        if normalize:
            normal_latency = prom.Histogram(
                "{}_latency_normalized".format(_name),
                "Wall clock time taken to execute {} per unit of work".format(_name),
                labels.keys() + ['error'],
                buckets=normalized_buckets,
            )
            normal_cputime = prom.Histogram(
                "{}_cputime_normalized".format(_name),
                "Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
                labels.keys() + ['error', 'type'],
                buckets=normalized_buckets,
            )
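
        # note: prometheus_client registers each metric globally when the Histogram is
        # constructed, so decorating two functions that resolve to the same NAME (or
        # reusing a name) raises a ValueError for the duplicated timeseries.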
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # record wall clock time via monotonic(), and process-wide user/system cpu
            # time via os.times(), which returns
            # (user, system, children_user, children_system, elapsed)
            start_monotonic = monotonic()
            start_user, start_sys, _, _, _ = os.times()

            try:
                ret = fn(*args, **kwargs)
            except Exception:
                ret = None
                error_type, error, tb = sys.exc_info()
            else:
                error = None

            end_monotonic = monotonic()
            end_user, end_sys, _, _, _ = os.times()
            wall_time = end_monotonic - start_monotonic
            user_time = end_user - start_user
            sys_time = end_sys - start_sys

            # evaluate label callables, treating any error as None and None as ''
            label_values = {}
            for k, v in labels.items():
                try:
                    value = v(ret, *args, **kwargs)
                except Exception:
                    value = None
                label_values[k] = '' if value is None else str(value)
            label_values.update(error='' if error is None else type(error).__name__)

            latency.labels(**label_values).observe(wall_time)
            cputime.labels(type='user', **label_values).observe(user_time)
            cputime.labels(type='system', **label_values).observe(sys_time)
            if normalize:
                try:
                    factor = normalize(ret, *args, **kwargs)
                except Exception:
                    factor = None
                if factor is not None and factor > 0:
                    normal_latency.labels(**label_values).observe(wall_time / factor)
                    normal_cputime.labels(type='user', **label_values).observe(user_time / factor)
                    normal_cputime.labels(type='system', **label_values).observe(sys_time / factor)

            if error is None:
                return ret
            raise error_type, error, tb  # re-raise error with original traceback

        return wrapper

    return _timed
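
# A minimal usage sketch for the decorator above (the function and label names
# here are hypothetical, chosen purely for illustration):
#
#     @timed('handle_request',
#         # label derived from the wrapped function's input
#         method=lambda ret, request: request.method,
#         # time per byte of response; skipped when there is no response
#         normalize=lambda ret, request: len(ret) if ret else None,
#     )
#     def handle_request(request):
#         ...
#
# This records handle_request_latency{method,error} and
# handle_request_cputime{method,error,type} histograms, plus the _normalized
# variants dividing each observation by the response length.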


def dt_to_bustime(start, dt):
@@ -116,6 +268,11 @@ def parse_segment_path(path):
        raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb


@timed(
    hours_path=lambda ret, hours_path, start, end: hours_path,
    has_holes=lambda ret, hours_path, start, end: None in ret,
    normalize=lambda ret, hours_path, start, end: len([x for x in ret if x is not None]),
)
def get_best_segments(hours_path, start, end):
    """Return a list of the best sequence of non-overlapping segments
    we have for a given time range. Hours path should be the directory containing hour directories.