mirror of https://github.com/ekimekim/wubloader
buscribe: Use wubloader's version of common instead of an old copy
None of the APIs it uses has changed, so no changes are required except for having the Dockerfiles take the full wubloader repo as build context.
pull/414/head
parent ea0e84f476
commit 961bc56fd4
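As a rough, hedged illustration of the build-context change described in the commit message (the actual build scripts and image names are not part of this commit, so the Dockerfile path and tag below are assumptions): the buscribe images are now built with the wubloader repo root as the Docker build context instead of the buscribe directory, along the lines of

	# hypothetical invocation, run from the wubloader repo root
	docker build -f buscribe/nginx/Dockerfile -t buscribe-web .
	# previously the buscribe directory itself served as the context, roughly:
	# docker build -f nginx/Dockerfile -t buscribe-web buscribe/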
@@ -1,124 +0,0 @@
"""A place for common utilities between wubloader components"""


import datetime
import errno
import os
import random

from .segments import get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, parse_segment_path, SegmentInfo
from .stats import timed, PromLogCountsHandler, install_stacksampler


def dt_to_bustime(start, dt):
	"""Convert a datetime to bus time. Bus time is seconds since the given start point."""
	return (dt - start).total_seconds()


def bustime_to_dt(start, bustime):
	"""Convert from bus time to a datetime"""
	return start + datetime.timedelta(seconds=bustime)


def parse_bustime(bustime):
	"""Convert from bus time human-readable string [-]HH:MM[:SS[.fff]]
	to float seconds since bustime 00:00. Inverse of format_bustime(),
	see it for detail."""
	if bustime.startswith('-'):
		# parse without the -, then negate it
		return -parse_bustime(bustime[1:])

	parts = bustime.strip().split(':')
	if len(parts) == 2:
		hours, mins = parts
		secs = 0
	elif len(parts) == 3:
		hours, mins, secs = parts
	else:
		raise ValueError("Invalid bustime: must be HH:MM[:SS]")
	hours = int(hours)
	mins = int(mins)
	secs = float(secs)
	return 3600 * hours + 60 * mins + secs


def format_bustime(bustime, round="millisecond"):
	"""Convert bustime to a human-readable string (-)HH:MM:SS.fff, with the
	ending cut off depending on the value of round:
		"millisecond": (default) Round to the nearest millisecond.
		"second": Round down to the current second.
		"minute": Round down to the current minute.
	Examples:
		00:00:00.000
		01:23:00
		110:50
		159:59:59.999
		-10:30:01.100
	Negative times are formatted as time-until-start, preceded by a minus
	sign.
	eg. "-1:20:00" indicates the run begins in 80 minutes.
	"""
	sign = ''
	if bustime < 0:
		sign = '-'
		bustime = -bustime
	total_mins, secs = divmod(bustime, 60)
	hours, mins = divmod(total_mins, 60)
	parts = [
		"{:02d}".format(int(hours)),
		"{:02d}".format(int(mins)),
	]
	if round == "minute":
		pass
	elif round == "second":
		parts.append("{:02d}".format(int(secs)))
	elif round == "millisecond":
		parts.append("{:06.3f}".format(secs))
	else:
		raise ValueError("Bad rounding value: {!r}".format(round))
	return sign + ":".join(parts)


def rename(old, new):
	"""Atomic rename that succeeds if the target already exists, since we're naming everything
	by hash anyway, so if the filepath already exists the file itself is already there.
	In this case, we delete the source file.
	"""
	try:
		os.rename(old, new)
	except OSError as e:
		if e.errno != errno.EEXIST:
			raise
		os.remove(old)


def ensure_directory(path):
	"""Create directory that contains path, as well as any parent directories,
	if they don't already exist."""
	dir_path = os.path.dirname(path)
	os.makedirs(dir_path, exist_ok=True)


def jitter(interval):
	"""Apply some 'jitter' to an interval. This is a random +/- 10% change in order to
	smooth out patterns and prevent everything from retrying at the same time.
	"""
	return interval * (0.9 + 0.2 * random.random())


def writeall(write, value):
	"""Helper for writing a complete string to a file-like object.
	Pass the write function and the value to write, and it will loop if needed to ensure
	all data is written.
	Works for both text and binary files, as long as you pass the right value type for
	the write function.
	"""
	while value:
		n = write(value)
		if n is None:
			# The write func doesn't return the amount written, assume it always writes everything
			break
		if n == 0:
			# This would cause an infinite loop...blow up instead so it's clear what the problem is
			raise Exception("Wrote 0 chars while calling {} with {}-char {}".format(write, len(value), type(value).__name__))
		# remove the first n chars and go again if we have anything left
		value = value[n:]
@@ -1,73 +0,0 @@
"""
Code shared between components that touch the database.
Note that this code requires psycopg2 and psycogreen, but the common module
as a whole does not, to avoid needing to install them for components that don't need it.
"""

from contextlib import contextmanager

import psycopg2
import psycopg2.extensions
import psycopg2.extras
from psycogreen.gevent import patch_psycopg


class DBManager(object):
	"""Patches psycopg2 before any connections are created. Stores connect info
	for easy creation of new connections, and sets some defaults before
	returning them.

	It has the ability to serve as a primitive connection pool, as getting a
	new conn will return existing conns it knows about first, but you
	should use a real conn pool for any non-trivial use.

	Returned conns are set to serializable isolation level, autocommit, and use
	NamedTupleCursor cursors."""
	def __init__(self, connect_timeout=30, **connect_kwargs):
		patch_psycopg()
		self.conns = []
		self.connect_timeout = connect_timeout
		self.connect_kwargs = connect_kwargs

	def put_conn(self, conn):
		self.conns.append(conn)

	def get_conn(self):
		if self.conns:
			return self.conns.pop(0)
		conn = psycopg2.connect(cursor_factory=psycopg2.extras.NamedTupleCursor,
			connect_timeout=self.connect_timeout, **self.connect_kwargs)
		# We use serializable because it means fewer issues to think about,
		# we don't care about the performance concerns and everything we do is easily retryable.
		# This shouldn't matter in practice anyway since everything we're doing is either read-only
		# searches or targeted single-row updates.
		conn.isolation_level = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
		conn.autocommit = True
		return conn


@contextmanager
def transaction(conn):
	"""Helper context manager that runs the code block as a single database transaction
	instead of in autocommit mode. The only difference between this and "with conn" is
	that we explicitly disable then re-enable autocommit."""
	old_autocommit = conn.autocommit
	conn.autocommit = False
	try:
		with conn:
			yield
	finally:
		conn.autocommit = old_autocommit


def query(conn, query, *args, **kwargs):
	"""Helper that takes a conn, creates a cursor and executes query against it,
	then returns the cursor.
	Variables may be given as positional or keyword args (but not both), corresponding
	to %s vs %(key)s placeholder forms."""
	if args and kwargs:
		raise TypeError("Cannot give both args and kwargs")
	cur = conn.cursor()
	cur.execute(query, args or kwargs or None)
	return cur
@@ -1,23 +0,0 @@
"""Wrapper code around dateutil to use it more sanely"""


# required so we are able to import dateutil despite this module also being called dateutil
from __future__ import absolute_import

import dateutil.parser
import dateutil.tz


def parse(timestamp):
	"""Parse given timestamp, convert to UTC, and return naive UTC datetime"""
	dt = dateutil.parser.parse(timestamp)
	if dt.tzinfo is not None:
		dt = dt.astimezone(dateutil.tz.tzutc()).replace(tzinfo=None)
	return dt


def parse_utc_only(timestamp):
	"""Parse given timestamp, but assume it's already in UTC and ignore other timezone info"""
	return dateutil.parser.parse(timestamp, ignoretz=True)
@@ -1,98 +0,0 @@
"""
Code shared between components to gather stats from flask methods.
Note that this code requires flask, but the common module as a whole does not,
to avoid needing to install it for components that don't need it.
"""

import functools

from flask import request
from flask import g as request_store
from monotonic import monotonic
import prometheus_client as prom


# Generic metrics that all http requests get logged to (see below for specific metrics per endpoint)

LATENCY_HELP = "Time taken to run the request handler and create a response"
# buckets: very long playlists / cutting can be quite slow,
# so we have a wider range of latencies than default, up to 10min.
LATENCY_BUCKETS = [.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600]
generic_latency = prom.Histogram(
	'http_request_latency_all', LATENCY_HELP,
	['endpoint', 'method', 'status'],
	buckets=LATENCY_BUCKETS,
)

CONCURRENT_HELP = 'Number of requests currently ongoing'
generic_concurrent = prom.Gauge(
	'http_request_concurrency_all', CONCURRENT_HELP,
	['endpoint', 'method'],
)


def request_stats(fn):
	"""Decorator that wraps a handler func to collect metrics.
	Adds handler func args as labels, along with 'endpoint' label using func's name,
	method and response status where applicable."""
	# We have to jump through some hoops here, because the prometheus client lib demands
	# we pre-define our label names, but we don't know the names of the handler kwargs
	# until the first time the function's called. So we delay defining the metrics until
	# first call.
	# In addition, it doesn't let us have different sets of labels with the same name.
	# So we record everything twice: Once under a generic name with only endpoint, method
	# and status, and once under a name specific to the endpoint with the full set of labels.
	metrics = {}
	endpoint = fn.__name__

	@functools.wraps(fn)
	def _stats(**kwargs):
		if not metrics:
			# first call, set up metrics
			labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
			labels = labels_no_status + ['status']
			metrics['latency'] = prom.Histogram(
				'http_request_latency_{}'.format(endpoint), LATENCY_HELP,
				labels, buckets=LATENCY_BUCKETS,
			)
			metrics['concurrent'] = prom.Gauge(
				'http_request_concurrency_{}'.format(endpoint), CONCURRENT_HELP,
				labels_no_status,
			)

		request_store.metrics = metrics
		request_store.endpoint = endpoint
		request_store.method = request.method
		request_store.labels = {k: str(v) for k, v in kwargs.items()}
		generic_concurrent.labels(endpoint=endpoint, method=request.method).inc()
		metrics['concurrent'].labels(endpoint=endpoint, method=request.method, **request_store.labels).inc()
		request_store.start_time = monotonic()
		return fn(**kwargs)

	return _stats


def after_request(response):
	"""Must be registered to run after requests. Finishes tracking the request
	and logs most of the metrics.
	We do it in this way, instead of inside the request_stats wrapper, because it lets flask
	normalize the handler result into a Response object.
	"""
	if 'metrics' not in request_store:
		return response # untracked handler

	end_time = monotonic()
	metrics = request_store.metrics
	endpoint = request_store.endpoint
	method = request_store.method
	labels = request_store.labels
	start_time = request_store.start_time

	generic_concurrent.labels(endpoint=endpoint, method=method).dec()
	metrics['concurrent'].labels(endpoint=endpoint, method=method, **labels).dec()

	status = str(response.status_code)
	generic_latency.labels(endpoint=endpoint, method=method, status=status).observe(end_time - start_time)
	metrics['latency'].labels(endpoint=endpoint, method=method, status=status, **labels).observe(end_time - start_time)

	return response
@@ -1,67 +0,0 @@
import time
import logging

import gevent

from .requests import InstrumentedSession

# Wraps all requests in some metric collection
requests = InstrumentedSession()


class GoogleAPIClient(object):
	"""Manages access to google apis and maintains an active access token.
	Make calls using client.request(), which is a wrapper for requests.request().
	"""

	ACCESS_TOKEN_ERROR_RETRY_INTERVAL = 10
	# Refresh token 10min before it expires (it normally lasts an hour)
	ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY = 600

	def __init__(self, client_id, client_secret, refresh_token):
		self.client_id = client_id
		self.client_secret = client_secret
		self.refresh_token = refresh_token

		self._first_get_access_token = gevent.spawn(self.get_access_token)

	@property
	def access_token(self):
		"""Blocks if access token unavailable yet"""
		self._first_get_access_token.join()
		return self._access_token

	def get_access_token(self):
		"""Authenticates against google's API and retrieves a token we will use in
		subsequent requests.
		This function gets called automatically when needed, there should be no need to call it
		yourself."""
		while True:
			try:
				start_time = time.time()
				resp = requests.post('https://www.googleapis.com/oauth2/v4/token', data={
					'client_id': self.client_id,
					'client_secret': self.client_secret,
					'refresh_token': self.refresh_token,
					'grant_type': 'refresh_token',
				}, metric_name='get_access_token')
				resp.raise_for_status()
				data = resp.json()
				self._access_token = data['access_token']
				expires_in = (start_time + data['expires_in']) - time.time()
				if expires_in < self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY:
					# note: no self.logger is defined on this class, so use the logging module directly
					logging.warning("Access token expires in {}s, less than normal leeway time of {}s".format(
						expires_in, self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY,
					))
				gevent.spawn_later(expires_in - self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, self.get_access_token)
			except Exception:
				logging.exception("Failed to fetch access token, retrying")
				gevent.sleep(self.ACCESS_TOKEN_ERROR_RETRY_INTERVAL)
			else:
				break

	def request(self, method, url, headers={}, **kwargs):
		# merge in auth header
		headers = dict(headers, Authorization='Bearer {}'.format(self.access_token))
		return requests.request(method, url, headers=headers, **kwargs)
@@ -1,55 +0,0 @@
"""Code for instrumenting requests calls. Requires requests, obviously."""

import urllib.parse

import requests.sessions
import prometheus_client as prom
from monotonic import monotonic

request_latency = prom.Histogram(
	'http_client_request_latency',
	'Time taken to make an outgoing HTTP request. '
	'Status = "error" is used if an error occurs. Measured as time from first byte sent to '
	'headers finished being parsed, ie. does not include reading a streaming response.',
	['name', 'method', 'domain', 'status'],
)

response_size = prom.Histogram(
	'http_client_response_size',
	"The content length of (non-streaming) responses to outgoing HTTP requests.",
	['name', 'method', 'domain', 'status'],
)

request_concurrency = prom.Gauge(
	'http_client_request_concurrency',
	"The number of outgoing HTTP requests currently ongoing",
	['name', 'method', 'domain'],
)


class InstrumentedSession(requests.sessions.Session):
	"""A requests Session that automatically records metrics on requests made.
	Users may optionally pass a 'metric_name' kwarg that will be included as the 'name' label.
	"""

	def request(self, method, url, *args, **kwargs):
		_, domain, _, _, _ = urllib.parse.urlsplit(url)
		name = kwargs.pop('metric_name', '')

		start = monotonic() # we only use our own measured latency if an error occurs
		try:
			with request_concurrency.labels(name, method, domain).track_inprogress():
				response = super().request(method, url, *args, **kwargs)
		except Exception:
			latency = monotonic() - start
			request_latency.labels(name, method, domain, "error").observe(latency)
			raise

		request_latency.labels(name, method, domain, response.status_code).observe(response.elapsed.total_seconds())
		try:
			content_length = int(response.headers['content-length'])
		except (KeyError, ValueError):
			pass # either not present or not valid
		else:
			response_size.labels(name, method, domain, response.status_code).observe(content_length)
		return response
@@ -1,513 +0,0 @@
"""A place for common utilities between wubloader components"""


import base64
import datetime
import errno
import itertools
import json
import logging
import os
import shutil
from collections import namedtuple
from contextlib import closing
from tempfile import TemporaryFile

import gevent
from gevent import subprocess

from .stats import timed


def unpadded_b64_decode(s):
	"""Decode base64-encoded string that has had its padding removed.
	Note it takes a unicode and returns a bytes."""
	# right-pad with '=' to multiple of 4
	s = s + '=' * (- len(s) % 4)
	return base64.b64decode(s.encode(), b"-_")


class SegmentInfo(
	namedtuple('SegmentInfoBase', [
		'path', 'channel', 'quality', 'start', 'duration', 'type', 'hash'
	])
):
	"""Info parsed from a segment path, including original path.
	Note that start time is a datetime and duration is a timedelta, and hash is a decoded binary string."""
	@property
	def end(self):
		return self.start + self.duration
	@property
	def is_partial(self):
		"""Note that suspect is considered partial"""
		return self.type != "full"


def parse_segment_timestamp(hour_str, min_str):
	"""This is faster than strptime, which dominates our segment processing time.
	It takes strictly formatted hour = "%Y-%m-%dT%H" and time = "%M:%S.%f"."""
	year = int(hour_str[0:4])
	month = int(hour_str[5:7])
	day = int(hour_str[8:10])
	hour = int(hour_str[11:13])
	min = int(min_str[0:2])
	sec = int(min_str[3:5])
	microsec_str = min_str[6:]
	microsec_str += '0' * (6 - len(microsec_str)) # right-pad zeros to 6 digits, eg. "123" -> "123000"
	microsec = int(microsec_str)
	return datetime.datetime(year, month, day, hour, min, sec, microsec)


def parse_segment_path(path):
	"""Parse segment path, returning a SegmentInfo. If path is only the trailing part,
	eg. just a filename, it will leave unknown fields as None."""
	parts = path.split('/')
	# left-pad parts with None up to 4 parts
	parts = [None] * (4 - len(parts)) + parts
	# pull info out of path parts
	channel, quality, hour, filename = parts[-4:]
	# split filename, which should be TIME-DURATION-TYPE-HASH.ts
	try:
		if not filename.endswith('.ts'):
			raise ValueError("Does not end in .ts")
		filename = filename[:-len('.ts')] # chop off .ts
		parts = filename.split('-', 3)
		if len(parts) != 4:
			raise ValueError("Not enough dashes in filename")
		time, duration, type, hash = parts
		if type not in ('full', 'suspect', 'partial', 'temp'):
			raise ValueError("Unknown type {!r}".format(type))
		hash = None if type == 'temp' else unpadded_b64_decode(hash)
		start = None if hour is None else parse_segment_timestamp(hour, time)
		return SegmentInfo(
			path = path,
			channel = channel,
			quality = quality,
			start = start,
			duration = datetime.timedelta(seconds=float(duration)),
			type = type,
			hash = hash,
		)
	except ValueError as e:
		# wrap error but preserve original traceback
		raise ValueError("Bad path {!r}: {}".format(path, e)).with_traceback(e.__traceback__)


class ContainsHoles(Exception):
	"""Raised by get_best_segments() when a hole is found and allow_holes is False"""


@timed(
	hours_path=lambda ret, hours_path, *args, **kwargs: hours_path,
	has_holes=lambda ret, *args, **kwargs: None in ret,
	normalize=lambda ret, *args, **kwargs: len([x for x in ret if x is not None]),
)
def get_best_segments(hours_path, start, end, allow_holes=True):
	"""Return a list of the best sequence of non-overlapping segments
	we have for a given time range. Hours path should be the directory containing hour directories.
	Time args start and end should be given as datetime objects.
	The first segment may start before the time range, and the last may end after it.
	The returned list contains items that are either:
		SegmentInfo: a segment
		None: represents a discontinuity between the previous segment and the next one.
	ie. as long as two segments appear next to each other, we guarantee there is no gap between
	them, the second one starts right as the first one finishes.
	Similarly, unless the first item is None, the first segment starts <= the start of the time
	range, and unless the last item is None, the last segment ends >= the end of the time range.
	Example:
		Suppose you ask for a time range from 10 to 60. We have 10-second segments covering
		the following times:
			5 to 15
			15 to 25
			30 to 40
			40 to 50
		Then the output would look like:
			segment from 5 to 15
			segment from 15 to 25
			None, as the previous segment ends 5sec before the next one begins
			segment from 30 to 40
			segment from 40 to 50
			None, as the previous segment ends 10sec before the requested end time of 60.
	Note that any is_partial=True segment will be followed by a None, since we can't guarantee
	it joins on to the next segment fully intact.

	If allow_holes is False, then we fail fast at the first discontinuity found
	and raise ContainsHoles. If ContainsHoles is not raised, the output is guaranteed to not contain
	any None items.
	"""
	# Note: The exact equality checks in this function are not vulnerable to floating point error,
	# but only because all input dates and durations are only precise to the millisecond, and
	# python's datetime types represent these as integer microseconds internally. So the parsing
	# to these types is exact, and all operations on them are exact, so all operations are exact.

	result = []

	for hour in hour_paths_for_range(hours_path, start, end):
		# Especially when processing multiple hours, this routine can take a significant amount
		# of time with no blocking. To ensure other stuff is still completed in a timely fashion,
		# we yield to let other things run.
		gevent.idle()

		# best_segments_by_start will give us the best available segment for each unique start time
		for segment in best_segments_by_start(hour):

			# special case: first segment
			if not result:
				# first segment is allowed to be before start as long as it includes it
				if segment.start <= start < segment.end:
					# segment covers start
					result.append(segment)
				elif start < segment.start < end:
					# segment is after start (but before end), so there was no segment that covers start
					# so we begin with a None
					if not allow_holes:
						raise ContainsHoles
					result.append(None)
					result.append(segment)
				else:
					# segment is before start, and doesn't cover start, or starts after end.
					# ignore and go to next.
					continue
			else:
				# normal case: check against previous segment end time
				prev_end = result[-1].end
				if segment.start < prev_end:
					# Overlap! This shouldn't happen, though it might be possible due to weirdness
					# if the stream drops then starts again quickly. We simply ignore the overlapping
					# segment and let the algorithm continue.
					logging.warning("Overlapping segments: {} overlaps end of {}".format(segment, result[-1]))
					continue
				if result[-1].is_partial or prev_end < segment.start:
					# there's a gap between prev end and this start, so add a None
					if not allow_holes:
						raise ContainsHoles
					result.append(None)
				result.append(segment)

			# check if we've reached the end
			if end <= segment.end:
				break

		# this is a weird little construct that says "if we broke from the inner loop,
		# then also break from the outer one. otherwise continue."
		else:
			continue
		break

	# check if we need a trailing None because last segment is partial or doesn't reach end,
	# or we found nothing at all
	if not result or result[-1].is_partial or result[-1].end < end:
		if not allow_holes:
			raise ContainsHoles
		result.append(None)

	return result


def hour_paths_for_range(hours_path, start, end):
	"""Generate a list of hour paths to check when looking for segments between start and end."""
	# truncate start and end to the hour
	def truncate(dt):
		return dt.replace(microsecond=0, second=0, minute=0)
	current = truncate(start)
	end = truncate(end)
	# Begin in the hour prior to start, as there may be a segment that starts in that hour
	# but contains the start time, eg. if the start time is 01:00:01 and there's a segment
	# at 00:59:59 which goes for 3 seconds.
	# Checking the entire hour when in most cases it won't be needed is wasteful, but it's also
	# pretty quick and the complexity of only checking this case when needed just isn't worth it.
	current -= datetime.timedelta(hours=1)
	while current <= end:
		yield os.path.join(hours_path, current.strftime("%Y-%m-%dT%H"))
		current += datetime.timedelta(hours=1)


def best_segments_by_start(hour):
	"""Within a given hour path, yield the "best" segment per unique segment start time.
	Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial.
	Note this means this function may perform os.stat()s.
	"""
	try:
		segment_paths = os.listdir(hour)
	except OSError as e:
		if e.errno != errno.ENOENT:
			raise
		# path does not exist, treat it as having no files
		return
	segment_paths.sort()
	# raise a warning for any files that don't parse as segments and ignore them
	parsed = []
	for name in segment_paths:
		try:
			parsed.append(parse_segment_path(os.path.join(hour, name)))
		except ValueError:
			logging.warning("Failed to parse segment {!r}".format(os.path.join(hour, name)), exc_info=True)

	for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start):
		# ignore temp segments as they might go away by the time we want to use them
		segments = [segment for segment in segments if segment.type != "temp"]
		if not segments:
			# all segments were temp, move on
			continue

		full_segments = [segment for segment in segments if not segment.is_partial]
		if full_segments:
			if len(full_segments) != 1:
				logging.info("Multiple versions of full segment at start_time {}: {}".format(
					start_time, ", ".join(map(str, segments))
				))
				# We've observed some cases where the same segment (with the same hash) will be reported
				# with different durations (generally at stream end). Prefer the longer duration (followed by largest size),
				# as this will ensure that if hashes are different we get the most data, and if they
				# are the same it should keep holes to a minimum.
				# If same duration and size, we have to pick one, so pick highest-sorting hash just so we're consistent.
				sizes = {segment: os.stat(segment.path).st_size for segment in segments}
				full_segments = [max(full_segments, key=lambda segment: (segment.duration, sizes[segment], segment.hash))]
			yield full_segments[0]
			continue
		# no full segments, fall back to measuring partials. Prefer suspect over partial.
		yield max(segments, key=lambda segment: (
			1 if segment.type == 'suspect' else 0,
			os.stat(segment.path).st_size,
		))


def streams_info(segment):
	"""Return ffprobe's info on streams as a list of dicts"""
	output = subprocess.check_output([
		'ffprobe',
		'-hide_banner', '-loglevel', 'fatal', # suppress noisy output
		'-of', 'json', '-show_streams', # get streams info as json
		segment.path,
	])
	# output here is a bytes, but json.loads will accept it
	return json.loads(output)['streams']


def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
	"""Return a Popen object which is ffmpeg cutting the given single segment.
	This is used when doing a fast cut.
	"""
	args = [
		'ffmpeg',
		'-hide_banner', '-loglevel', 'error', # suppress noisy output
		'-i', segment.path,
	]
	# output from ffprobe is generally already sorted but let's be paranoid,
	# because the order of map args matters.
	for stream in sorted(streams_info(segment), key=lambda stream: stream['index']):
		# map the same stream in the same position from input to output
		args += ['-map', '0:{}'.format(stream['index'])]
		if stream['codec_type'] in ('video', 'audio'):
			# for non-metadata streams, make sure we use the same codec (metadata streams
			# are a bit weirder, and ffmpeg will do the right thing anyway)
			args += ['-codec:{}'.format(stream['index']), stream['codec_name']]
	# now add trim args
	if cut_start:
		args += ['-ss', str(cut_start)]
	if cut_end:
		args += ['-to', str(cut_end)]
	# output to stdout as MPEG-TS
	args += ['-f', 'mpegts', '-']
	# run it
	logging.info("Running segment cut with args: {}".format(" ".join(args)))
	return subprocess.Popen(args, stdout=subprocess.PIPE)


def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args):
	"""Return a Popen object which is ffmpeg cutting from stdin.
	This is used when doing a full cut.
	If output_file is not subprocess.PIPE,
	uses explicit output file object instead of using a pipe,
	because some video formats require a seekable file.
	"""
	args = [
		'ffmpeg',
		'-hide_banner', '-loglevel', 'error', # suppress noisy output
		'-i', '-',
		'-ss', cut_start,
		'-t', duration,
	] + list(encode_args)
	if output_file is subprocess.PIPE:
		args.append('-') # output to stdout
	else:
		args += [
			# We want ffmpeg to write to our tempfile, which is its stdout.
			# However, it assumes that '-' means the output is not seekable.
			# We trick it into understanding that its stdout is seekable by
			# telling it to write to the fd via its /proc/self filename.
			'/proc/self/fd/1',
			# But of course, that file "already exists", so we need to give it
			# permission to "overwrite" it.
			'-y',
		]
	args = list(map(str, args))
	logging.info("Running full cut with args: {}".format(" ".join(args)))
	return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file)


def read_chunks(fileobj, chunk_size=16*1024):
	"""Read fileobj until EOF, yielding chunk_size sized chunks of data."""
	while True:
		chunk = fileobj.read(chunk_size)
		if not chunk:
			break
		yield chunk


@timed('cut', cut_type='rough', normalize=lambda _, segments, start, end: (end - start).total_seconds())
def rough_cut_segments(segments, start, end):
	"""Yields chunks of a MPEGTS video file covering at least the timestamp range,
	likely with a few extra seconds on either side.
	This method works by simply concatenating all the segments, without any re-encoding.
	"""
	for segment in segments:
		with open(segment.path, 'rb') as f:
			for chunk in read_chunks(f):
				yield chunk


@timed('cut', cut_type='fast', normalize=lambda _, segments, start, end: (end - start).total_seconds())
def fast_cut_segments(segments, start, end):
	"""Yields chunks of a MPEGTS video file covering the exact timestamp range.
	segments should be a list of segments as returned by get_best_segments().
	This method works by only cutting the first and last segments, and concatenating the rest.
	This only works if the same codec settings etc are used across all segments.
	This should almost always be true but may cause weird results if not.
	"""

	# how far into the first segment to begin (if no hole at start)
	cut_start = None
	if segments[0] is not None:
		cut_start = (start - segments[0].start).total_seconds()
		if cut_start < 0:
			raise ValueError("First segment doesn't begin until after cut start, but no leading hole indicated")

	# how far into the final segment to end (if no hole at end)
	cut_end = None
	if segments[-1] is not None:
		cut_end = (end - segments[-1].start).total_seconds()
		if cut_end < 0:
			raise ValueError("Last segment ends before cut end, but no trailing hole indicated")

	# Set first and last only if they actually need cutting.
	# Note this handles both the cut_start = None (no first segment to cut)
	# and cut_start = 0 (first segment already starts on time) cases.
	first = segments[0] if cut_start else None
	last = segments[-1] if cut_end else None

	for segment in segments:
		if segment is None:
			logging.debug("Skipping discontinuity while cutting")
			# TODO: If we want to be safe against the possibility of codecs changing,
			# we should check the streams_info() after each discontinuity.
			continue

		# note first and last might be the same segment.
		# note a segment will only match if cutting actually needs to be done
		# (ie. cut_start or cut_end is not 0)
		if segment in (first, last):
			proc = None
			try:
				proc = ffmpeg_cut_segment(
					segment,
					cut_start if segment == first else None,
					cut_end if segment == last else None,
				)
				with closing(proc.stdout):
					for chunk in read_chunks(proc.stdout):
						yield chunk
				proc.wait()
			except Exception as ex:
				# try to clean up proc, ignoring errors
				if proc is not None:
					try:
						proc.kill()
					except OSError:
						pass
				raise ex
			else:
				# check if ffmpeg had errors
				if proc.returncode != 0:
					raise Exception(
						"Error while streaming cut: ffmpeg exited {}".format(proc.returncode)
					)
		else:
			# no cutting needed, just serve the file
			with open(segment.path, 'rb') as f:
				for chunk in read_chunks(f):
					yield chunk


def feed_input(segments, pipe):
	"""Write each segment's data into the given pipe in order.
	This is used to provide input to ffmpeg in a full cut."""
	for segment in segments:
		with open(segment.path, 'rb') as f:
			try:
				shutil.copyfileobj(f, pipe)
			except OSError as e:
				# ignore EPIPE, as this just means the end cut meant we didn't need all of it
				if e.errno != errno.EPIPE:
					raise
	pipe.close()


@timed('cut',
	cut_type=lambda _, segments, start, end, encode_args, stream=False: ("full-streamed" if stream else "full-buffered"),
	normalize=lambda _, segments, start, end, *a, **k: (end - start).total_seconds(),
)
def full_cut_segments(segments, start, end, encode_args, stream=False):
	"""If stream=true, assume encode_args gives a streamable format,
	and begin returning output immediately instead of waiting for ffmpeg to finish
	and buffering to disk."""

	# Remove holes
	segments = [segment for segment in segments if segment is not None]

	# how far into the first segment to begin
	cut_start = max(0, (start - segments[0].start).total_seconds())
	# duration
	duration = (end - start).total_seconds()

	ffmpeg = None
	input_feeder = None
	try:

		if stream:
			# When streaming, we can just use a pipe
			tempfile = subprocess.PIPE
		else:
			# Some ffmpeg output formats require a seekable file.
			# For the same reason, it's not safe to begin uploading until ffmpeg
			# has finished. We create a temporary file for this.
			tempfile = TemporaryFile()

		ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, encode_args)
		input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin)

		# When streaming, we can return data as it is available
		if stream:
			for chunk in read_chunks(ffmpeg.stdout):
				yield chunk

		# check if any errors occurred in input writing, or if ffmpeg exited non-success.
		if ffmpeg.wait() != 0:
			raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode))
		input_feeder.get() # re-raise any errors from feed_input()

		# When not streaming, we can only return the data once ffmpeg has exited
		if not stream:
			for chunk in read_chunks(tempfile):
				yield chunk
	finally:
		# if something goes wrong, try to clean up ignoring errors
		if input_feeder is not None:
			input_feeder.kill()
		if ffmpeg is not None and ffmpeg.poll() is None:
			for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
				try:
					action()
				except (OSError, IOError):
					pass
@@ -1,257 +0,0 @@
import atexit
import functools
import logging
import os
import signal

import gevent.lock
from monotonic import monotonic
import prometheus_client as prom


# need to keep global track of what metrics we've registered
# because we're not allowed to re-register
metrics = {}


def timed(name=None,
	buckets=[10.**x for x in range(-9, 5)], normalized_buckets=None,
	normalize=None,
	**labels
):
	"""Decorator that instruments wrapped function to record real, user and system time
	as a prometheus histogram.

	Metrics are recorded as NAME_latency, NAME_cputime{type=user} and NAME_cputime{type=system}
	respectively. User and system time are process-wide (which means they'll be largely meaningless
	if you're using gevent and the wrapped function blocks) and do not include subprocesses.

	NAME defaults to the wrapped function's name.
	NAME must be unique OR have the exact same labels as other timed() calls with that name.

	Any labels passed in are included. Given label values may be callable, in which case
	they are passed the input and result from the wrapped function and should return a label value.
	Otherwise the given label value is used directly. All label values are automatically str()'d.

	In addition, the "error" label is automatically included, and set to "" if no exception
	occurs, or the name of the exception type if one does.

	The normalize argument, if given, causes the creation of a second set of metrics
	NAME_normalized_latency, etc. The normalize argument should be a callable which
	takes the input and result of the wrapped function and returns a normalization factor.
	All normalized metrics divide the observed times by this factor.
	The intent is to allow a function which is expected to take longer given a larger input
	to be timed on a per-input basis.
	As a special case, when normalize returns 0 or None, normalized metrics are not updated.

	The buckets kwarg is as per prometheus_client.Histogram. The default is a conservative
	but sparse range covering nanoseconds to hours.
	The normalized_buckets kwarg applies to the normalized metrics, and defaults to the same
	as buckets.

	All callables that take inputs and result take them as follows: The first arg is the result,
	followed by *args and **kwargs as per the function's inputs.
	If the wrapped function errored, result is None.
	To simplify error handling in these functions, any errors are taken to mean None,
	and None is interpreted as '' for label values.

	Contrived Example:
		@timed("scanner",
			# constant label
			foo="my example label",
			# label dependent on input
			all=lambda results, predicate, list, find_all=False: find_all,
			# label dependent on output
			found=lambda results, *a, **k: len(results) > 0,
			# normalized on input
			normalize=lambda results, predicate, list, **k: len(list),
		)
		def scanner(predicate, list, find_all=False):
			results = []
			for item in list:
				if predicate(item):
					results.append(item)
					if not find_all:
						break
			return results
	"""

	if normalized_buckets is None:
		normalized_buckets = buckets
	# convert constant (non-callable) values into callables for consistency
	labels = {
		# need to create then call a function to properly bind v as otherwise it will
		# always return the final label value.
		k: v if callable(v) else (lambda v: (lambda *a, **k: v))(v)
		for k, v in labels.items()
	}

	def _timed(fn):
		# can't safely assign to name inside closure, we use a new _name variable instead
		_name = fn.__name__ if name is None else name

		if _name in metrics:
			latency, cputime = metrics[_name]
		else:
			latency = prom.Histogram(
				"{}_latency".format(_name),
				"Wall clock time taken to execute {}".format(_name),
				list(labels.keys()) + ['error'],
				buckets=buckets,
			)
			cputime = prom.Histogram(
				"{}_cputime".format(_name),
				"Process-wide consumed CPU time during execution of {}".format(_name),
				list(labels.keys()) + ['error', 'type'],
				buckets=buckets,
			)
			metrics[_name] = latency, cputime
		if normalize:
			normname = '{} normalized'.format(_name)
			if normname in metrics:
				normal_latency, normal_cputime = metrics[normname]
			else:
				normal_latency = prom.Histogram(
					"{}_latency_normalized".format(_name),
					"Wall clock time taken to execute {} per unit of work".format(_name),
					list(labels.keys()) + ['error'],
					buckets=normalized_buckets,
				)
				normal_cputime = prom.Histogram(
					"{}_cputime_normalized".format(_name),
					"Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
					list(labels.keys()) + ['error', 'type'],
					buckets=normalized_buckets,
				)
				metrics[normname] = normal_latency, normal_cputime

		@functools.wraps(fn)
		def wrapper(*args, **kwargs):
			start_monotonic = monotonic()
			start_user, start_sys, _, _, _ = os.times()

			try:
				ret = fn(*args, **kwargs)
			except Exception as e:
				ret = None
				error = e
			else:
				error = None

			end_monotonic = monotonic()
			end_user, end_sys, _, _, _ = os.times()
			wall_time = end_monotonic - start_monotonic
			user_time = end_user - start_user
			sys_time = end_sys - start_sys

			label_values = {}
			for k, v in labels.items():
				try:
					value = v(ret, *args, **kwargs)
				except Exception:
					value = None
				label_values[k] = '' if value is None else str(value)
			label_values.update(error='' if error is None else type(error).__name__)

			latency.labels(**label_values).observe(wall_time)
			cputime.labels(type='user', **label_values).observe(user_time)
			cputime.labels(type='system', **label_values).observe(sys_time)
			if normalize:
				try:
					factor = normalize(ret, *args, **kwargs)
				except Exception:
					factor = None
				if factor is not None and factor > 0:
					normal_latency.labels(**label_values).observe(wall_time / factor)
					normal_cputime.labels(type='user', **label_values).observe(user_time / factor)
					normal_cputime.labels(type='system', **label_values).observe(sys_time / factor)

			if error is None:
				return ret
			raise error from None # re-raise error with original traceback

		return wrapper

	return _timed


log_count = prom.Counter("log_count", "Count of messages logged", ["level", "module", "function"])

class PromLogCountsHandler(logging.Handler):
	"""A logging handler that records a count of logs by level, module and function."""
	def emit(self, record):
		log_count.labels(record.levelname, record.module, record.funcName).inc()

	@classmethod
	def install(cls):
		root_logger = logging.getLogger()
		root_logger.addHandler(cls())


def install_stacksampler(interval=0.005):
	"""Samples the stack every INTERVAL seconds of user time.
	We could use user+sys time but that leads to interrupting syscalls,
	which may affect performance, and we care mostly about user time anyway.
	"""
	if os.environ.get('WUBLOADER_ENABLE_STACKSAMPLER', '').lower() != 'true':
		return

	logging.info("Installing stacksampler")

	# Note we only start each next timer once the previous timer signal has been processed.
	# There are two reasons for this:
	# 1. Avoid handling a signal while already handling a signal, however unlikely,
	#    as this could lead to a deadlock due to locking inside prometheus_client.
	# 2. Avoid biasing the results by effectively not including the time taken to do the actual
	#    stack sampling.

	flamegraph = prom.Counter(
		"flamegraph",
		"Approx time consumed by each unique stack trace seen by sampling the stack",
		["stack"]
	)
	# HACK: It's possible to deadlock if we handle a signal during a prometheus collect
	# operation that locks our flamegraph metric. We then try to take the lock when recording the
	# metric, but can't.
	# As a hacky work around, we replace the lock with a dummy lock that doesn't actually lock anything.
	# This is reasonably safe. We know that only one copy of sample() will ever run at once,
	# and nothing else but sample() and collect() will touch the metric, leaving two possibilities:
	# 1. Multiple collects happen at once: Safe. They only do read operations.
	# 2. A sample during a collect: Safe. The collect only does a copy inside the locked part,
	#    so it just means it'll either get a copy with the new label set, or without it.
	# This presumes the implementation doesn't change to make that different, however.
	flamegraph._lock = gevent.lock.DummySemaphore()
	# There is also a lock we need to bypass on the actual counter values themselves.
	# Since they get created dynamically, this means we need to replace the lock function
	# that is used to create them.
	# This unfortunately means we go without locking for all metrics, not just this one,
	# however this is safe because we are using gevent, not threading. The lock is only
	# used to make incrementing/decrementing the counter thread-safe, which is not a concern
	# under gevent since there are no switch points under the lock.
	import prometheus_client.values
	prometheus_client.values.Lock = gevent.lock.DummySemaphore

	def sample(signum, frame):
		stack = []
		while frame is not None:
			stack.append(frame)
			frame = frame.f_back
		# format each frame as FUNCTION(MODULE)
		stack = ";".join(
			"{}({})".format(frame.f_code.co_name, frame.f_globals.get('__name__'))
			for frame in stack[::-1]
		)
		# increase counter by interval, so final units are in seconds
		flamegraph.labels(stack).inc(interval)
		# schedule the next signal
		signal.setitimer(signal.ITIMER_VIRTUAL, interval)

	def cancel():
		signal.setitimer(signal.ITIMER_VIRTUAL, 0)
	atexit.register(cancel)

	signal.signal(signal.SIGVTALRM, sample)
	# deliver the first signal in INTERVAL seconds
	signal.setitimer(signal.ITIMER_VIRTUAL, interval)
@@ -1,12 +0,0 @@
from setuptools import setup, find_packages

setup(
	name = "wubloader-common",
	version = "0.0.0",
	packages = find_packages(),
	install_requires = [
		"gevent",
		"monotonic",
		"prometheus-client",
	],
)
@@ -1,5 +1,5 @@
 FROM nginx:latest
 
-COPY buscribe-web /usr/share/nginx/html/buscribe
-COPY professor /usr/share/nginx/html/professor
-COPY nginx/nginx.conf /etc/nginx/nginx.conf
+COPY buscribe/buscribe-web /usr/share/nginx/html/buscribe
+COPY buscribe/professor /usr/share/nginx/html/professor
+COPY buscribe/nginx/nginx.conf /etc/nginx/nginx.conf
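A note on the COPY changes above: Docker resolves COPY sources relative to the build context, so with the context now being the wubloader repo root rather than the buscribe directory, each source path gains a buscribe/ prefix while the destination paths inside the image stay the same.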