Fix issues with metrics gathering for cut functions

* Need to allow timed() to have multiple callers with same name
* "type" label is reserved, use "cut_type" instead
pull/139/head
Mike Lang 5 years ago
parent 58d09b419c
commit a7f5d1c545

@@ -333,7 +333,7 @@ def read_chunks(fileobj, chunk_size=16*1024):
 	yield chunk
-@timed('cut', type='rough', normalize=lambda _, segments, start, end: (end - start).total_seconds())
+@timed('cut', cut_type='rough', normalize=lambda _, segments, start, end: (end - start).total_seconds())
 def rough_cut_segments(segments, start, end):
 	"""Yields chunks of a MPEGTS video file covering at least the timestamp range,
 	likely with a few extra seconds on either side.
@@ -345,7 +345,7 @@ def rough_cut_segments(segments, start, end):
 	yield chunk
-@timed('cut', type='fast', normalize=lambda _, segments, start, end: (end - start).total_seconds())
+@timed('cut', cut_type='fast', normalize=lambda _, segments, start, end: (end - start).total_seconds())
 def fast_cut_segments(segments, start, end):
 	"""Yields chunks of a MPEGTS video file covering the exact timestamp range.
 	segments should be a list of segments as returned by get_best_segments().
@@ -433,7 +433,7 @@ def feed_input(segments, pipe):
 @timed('cut',
-	type=lambda _, segments, start, end, encode_args, stream=False: ("full-streamed" if stream else "full-buffered"),
+	cut_type=lambda _, segments, start, end, encode_args, stream=False: ("full-streamed" if stream else "full-buffered"),
 	normalize=lambda _, segments, start, end, *a, **k: (end - start).total_seconds(),
 )
 def full_cut_segments(segments, start, end, encode_args, stream=False):

@@ -11,6 +11,10 @@ from monotonic import monotonic
 import prometheus_client as prom
+# need to keep global track of what metrics we've registered
+# because we're not allowed to re-register
+metrics = {}
 def timed(name=None,
 	buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None,
@@ -25,6 +29,7 @@ def timed(name=None,
 	if you're using gevent and the wrapped function blocks) and do not include subprocesses.
 	NAME defaults to the wrapped function's name.
+	NAME must be unique OR have the exact same labels as other timed() calls with that name.
 	Any labels passed in are included. Given label values may be callable, in which case
 	they are passed the input and result from the wrapped function and should return a label value.
@@ -86,31 +91,40 @@ def timed(name=None,
 		# can't safely assign to name inside closure, we use a new _name variable instead
 		_name = fn.__name__ if name is None else name
-		latency = prom.Histogram(
-			"{}_latency".format(_name),
-			"Wall clock time taken to execute {}".format(_name),
-			labels.keys() + ['error'],
-			buckets=buckets,
-		)
-		cputime = prom.Histogram(
-			"{}_cputime".format(_name),
-			"Process-wide consumed CPU time during execution of {}".format(_name),
-			labels.keys() + ['error', 'type'],
-			buckets=buckets,
-		)
+		if name in metrics:
+			latency, cputime = metrics[name]
+		else:
+			latency = prom.Histogram(
+				"{}_latency".format(_name),
+				"Wall clock time taken to execute {}".format(_name),
+				labels.keys() + ['error'],
+				buckets=buckets,
+			)
+			cputime = prom.Histogram(
+				"{}_cputime".format(_name),
+				"Process-wide consumed CPU time during execution of {}".format(_name),
+				labels.keys() + ['error', 'type'],
+				buckets=buckets,
+			)
+			metrics[name] = latency, cputime
 		if normalize:
-			normal_latency = prom.Histogram(
-				"{}_latency_normalized".format(_name),
-				"Wall clock time taken to execute {} per unit of work".format(_name),
-				labels.keys() + ['error'],
-				buckets=normalized_buckets,
-			)
-			normal_cputime = prom.Histogram(
-				"{}_cputime_normalized".format(_name),
-				"Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
-				labels.keys() + ['error', 'type'],
-				buckets=normalized_buckets,
-			)
+			normname = '{} normalized'.format(name)
+			if normname in metrics:
+				normal_latency, normal_cputime = metrics[normname]
+			else:
+				normal_latency = prom.Histogram(
+					"{}_latency_normalized".format(_name),
+					"Wall clock time taken to execute {} per unit of work".format(_name),
+					labels.keys() + ['error'],
+					buckets=normalized_buckets,
+				)
+				normal_cputime = prom.Histogram(
+					"{}_cputime_normalized".format(_name),
+					"Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
+					labels.keys() + ['error', 'type'],
+					buckets=normalized_buckets,
+				)
+				metrics[normname] = normal_latency, normal_cputime
 		@functools.wraps(fn)
 		def wrapper(*args, **kwargs):

Loading…
Cancel
Save