Merge branch 'master' into chrusher/backfiller

Christopher Usher committed 7 years ago (via GitHub)
commit 34372aa7dd

.gitignore vendored

@ -1 +1 @@
config.yaml
docker-compose.yml

@ -18,3 +18,7 @@ but a brief overview of the components:
All components are built as docker images.
Components which access the disk expect a shared directory mounted at `/mnt`.
A docker-compose file is provided to run all components. See `docker-compose.jsonnet`
to set configuration options, then generate the compose file with `./generate-docker-compose`.
Then run `docker-compose up`.

@ -0,0 +1,60 @@
// This is a jsonnet file, it generates a docker-compose.yml file.
// To generate, run "make docker-compose.yml".
{
// These are the important top-level settings.
// Change these to configure the services.
// Image tag (application version) to use.
// Note: "latest" is not recommended in production, as you can't be sure what version
// you're actually running, and must manually re-pull to get an updated copy.
image_tag:: "latest",
// Twitch channel to capture
channel:: "desertbus",
// Stream qualities to capture in addition to source.
qualities:: ["480p"],
// Local path to save segments to. Full path must already exist. Cannot contain ':'.
segments_path:: "/var/lib/wubloader/",
// The host's port to expose the restreamer on.
restreamer_port:: 8080,
// Now for the actual docker-compose config
// docker-compose version
version: "3",
services: {
downloader: {
image: "quay.io/ekimekim/wubloader-downloader:%s" % $.image_tag,
// Args for the downloader: set channel and qualities
command: [
$.channel,
"--qualities", std.join(",", $.qualities),
],
// Mount the segments directory at /mnt
volumes: ["%s:/mnt" % $.segments_path],
// If the application crashes, restart it.
restart: "on-failure",
},
restreamer: {
image: "quay.io/ekimekim/wubloader-restreamer:%s" % $.image_tag,
// Mount the segments directory at /mnt
volumes: ["%s:/mnt" % $.segments_path],
// If the application crashes, restart it.
restart: "on-failure",
// Expose on the configured host port by mapping that port to the default
// port for restreamer, which is 8000.
ports: ["%s:8000" % $.restreamer_port],
},
},
}

@ -3,6 +3,7 @@ import gevent.monkey
gevent.monkey.patch_all()
import logging
import os
import argh
@ -10,5 +11,6 @@ from downloader.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level, format=LOG_FORMAT)
argh.dispatch_command(main)

@ -25,7 +25,7 @@ class TimedOutError(Exception):
@contextmanager
def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout):
def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft_timeout):
"""Context manager that wraps a piece of code in a pair of timeouts,
a "soft" timeout and a "hard" one. If the block does not complete before
the soft timeout, the given on_soft_timeout() function is called in a new greenlet.
@ -47,7 +47,7 @@ def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout
if finished:
# We finished before soft timeout was hit
return
logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description))
logger.warning("Hit soft timeout {}s while {}".format(soft_timeout, description))
on_soft_timeout()
gevent.spawn_later(soft_timeout, dispatch_soft_timeout)
error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description))
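As a quick illustration of the new calling convention (the logger is now passed explicitly), here is a minimal usage sketch; the logger name, timeouts and fetch function are invented for the example, not taken from the repo:

import logging

logger = logging.getLogger("example")

def on_soft_timeout():
    # called in a new greenlet if the block is still running after 5s
    logger.warning("fetch is slow, kicking off a retry")

# (5, 30) means: warn and call on_soft_timeout() after 5s, raise TimedOutError after 30s
with soft_hard_timeout(logger, "fetching example playlist", (5, 30), on_soft_timeout):
    playlist = fetch_example_playlist()  # hypothetical slow network call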
@ -90,6 +90,7 @@ class StreamsManager(object):
def __init__(self, channel, base_dir, qualities):
self.channel = channel
self.logger = logging.getLogger("StreamsManager({})".format(channel))
self.base_dir = base_dir
self.stream_workers = {name: [] for name in qualities + ["source"]} # {stream name: [workers]}
self.latest_urls = {} # {stream name: (fetch time, url)}
@ -102,7 +103,7 @@ class StreamsManager(object):
and any older workers are safe to stop."""
workers = self.stream_workers[worker.stream]
if worker not in workers:
logging.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
return
# stop everything older than given worker
for old in workers[:workers.index(worker)]:
@ -116,12 +117,12 @@ class StreamsManager(object):
"""
workers = self.stream_workers[worker.stream]
if worker not in workers:
logging.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
return
if worker is not workers[-1]:
logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
return
logging.info("Starting new worker for {} by request of old worker".format(worker.stream))
self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream))
self.wait_for_new_url(worker.stream, worker.url)
self.start_worker(worker.stream)
self.trigger_refresh()
@ -146,13 +147,13 @@ class StreamsManager(object):
new_time, new_url = self.latest_urls[stream]
if new_url != old_url:
return
logging.info("Triggering master playlist refresh as we need a new url")
self.logger.info("Triggering master playlist refresh as we need a new url")
self.trigger_refresh()
self.latest_urls_changed.wait()
def stop(self):
"""Shut down all workers and stop capturing stream."""
logging.info("Stopping streams manager")
self.logger.info("Stopping")
self.stopping.set()
def start_worker(self, stream):
@ -160,7 +161,7 @@ class StreamsManager(object):
# it's possible for fetch_latest to call us after we've started stopping,
# in that case do nothing.
if self.stopping.is_set():
logging.info("Ignoring worker start as we are stopping")
self.logger.info("Ignoring worker start as we are stopping")
return
url_time, url = self.latest_urls[stream]
worker = StreamWorker(self, stream, url, url_time)
@ -171,32 +172,32 @@ class StreamsManager(object):
"""Re-fetch master playlist and start new workers if needed"""
try:
# Fetch playlist. On soft timeout, retry.
logging.info("Fetching master playlist")
self.logger.info("Fetching master playlist")
fetch_time = monotonic()
with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
master_playlist = twitch.get_master_playlist(self.channel)
new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys())
self.update_urls(fetch_time, new_urls)
for stream, workers in self.stream_workers.items():
# warn and retry if the url is missing
if stream not in new_urls:
logging.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
self.trigger_refresh()
# is it newly found?
if not workers and stream in self.latest_urls:
logging.info("Starting new worker for {} as none exist".format(stream))
self.logger.info("Starting new worker for {} as none exist".format(stream))
self.start_worker(stream)
latest_worker = workers[-1]
# is the old worker too old?
if latest_worker.age() > self.MAX_WORKER_AGE:
logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
self.start_worker(stream)
except Exception as e:
if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
logging.info("Stream is not up. Retrying.")
self.logger.info("Stream is not up. Retrying.")
self.trigger_refresh()
else:
logging.exception("Failed to fetch master playlist")
self.logger.exception("Failed to fetch master playlist")
# don't retry on hard timeout as we already retried on soft timeout
if not isinstance(e, TimedOutError):
self.trigger_refresh()
@ -209,7 +210,7 @@ class StreamsManager(object):
self.MAX_WORKER_AGE - workers[-1].age()
for workers in self.stream_workers.values() if workers
] or [0]))
logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
self.logger.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
# wait until refresh triggered, next max age reached, or we're stopping (whichever happens first)
gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1)
if not self.stopping.is_set():
@ -217,7 +218,7 @@ class StreamsManager(object):
gevent.spawn(self.fetch_latest)
# wait min retry interval with jitter, unless we're stopping
self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL))
logging.info("Stopping workers")
self.logger.info("Stopping workers")
for workers in self.stream_workers.values():
for worker in workers:
worker.stop()
@ -248,6 +249,7 @@ class StreamWorker(object):
def __init__(self, manager, stream, url, url_time):
self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(stream, id(self)))
self.stream = stream
self.url = url
self.url_time = url_time
@ -268,14 +270,14 @@ class StreamWorker(object):
self.stopping.set()
def run(self):
logging.info("Worker {} starting".format(self))
self.logger.info("Worker starting")
try:
self._run()
except Exception:
logging.exception("Worker {} failed".format(self))
self.logger.exception("Worker failed")
self.trigger_new_worker()
else:
logging.info("Worker {} stopped".format(self))
self.logger.info("Worker stopped")
finally:
for getter in self.getters.values():
getter.done.wait()
@ -293,18 +295,18 @@ class StreamWorker(object):
first = True
while not self.stopping.is_set():
logging.debug("{} getting media playlist {}".format(self, self.url))
self.logger.debug("Getting media playlist {}".format(self.url))
try:
with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
playlist = twitch.get_media_playlist(self.url)
except Exception as e:
logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True)
self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
self.trigger_new_worker()
if first:
logging.warning("{} failed on first fetch, stopping".format(self))
self.logger.warning("Failed on first fetch, stopping")
self.stop()
elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403:
logging.warning("{} failed with 403 Forbidden, stopping".format(self))
self.logger.warning("Failed with 403 Forbidden, stopping")
self.stop()
self.wait(self.FETCH_RETRY_INTERVAL)
continue
@ -320,7 +322,7 @@ class StreamWorker(object):
if segment.uri not in self.getters:
if date is None:
raise ValueError("Cannot determine date of segment")
self.getters[segment.uri] = SegmentGetter(self.manager.base_dir, self.manager.channel, self.stream, segment, date)
self.getters[segment.uri] = SegmentGetter(self.logger, self.manager.base_dir, self.manager.channel, self.stream, segment, date)
gevent.spawn(self.getters[segment.uri].run)
if date is not None:
date += datetime.timedelta(seconds=segment.duration)
@ -335,7 +337,7 @@ class StreamWorker(object):
# Stop if end-of-stream
if playlist.is_endlist:
logging.info("{} stopping due to end-of-playlist".format(self))
self.logger.info("Stopping due to end-of-playlist")
# Trigger a new worker for when the stream comes back up.
# In the short term this will cause some thrashing until the master playlist
# starts returning 404, but it's the best way to avoid missing anything
@ -364,7 +366,8 @@ class SegmentGetter(object):
FETCH_HEADERS_TIMEOUTS = 5, 30
FETCH_FULL_TIMEOUTS = 15, 240
def __init__(self, base_dir, channel, stream, segment, date):
def __init__(self, parent_logger, base_dir, channel, stream, segment, date):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir
self.channel = channel
self.stream = stream
@ -380,7 +383,7 @@ class SegmentGetter(object):
try:
self._run()
except Exception:
logging.exception("Failure in SegmentGetter {}".format(self.segment))
self.logger.exception("Unexpected exception while getting segment {}, retrying".format(self.segment))
gevent.sleep(common.jitter(self.UNEXPECTED_FAILURE_RETRY))
else:
break
@ -388,6 +391,7 @@ class SegmentGetter(object):
self.done.set()
def _run(self):
self.logger.debug("Getter started")
while not self.exists():
self.retry = gevent.event.Event()
worker = gevent.spawn(self.get_segment)
@ -398,6 +402,7 @@ class SegmentGetter(object):
break
# if retry not set, wait for FETCH_RETRY first
self.retry.wait(common.jitter(self.FETCH_RETRY))
self.logger.debug("Getter is done")
def make_path_prefix(self):
"""Generate leading part of filepath which doesn't change with the hash."""
@ -437,8 +442,6 @@ class SegmentGetter(object):
def get_segment(self):
# save current value of self.retry so we can't set any later instance
# after a retry for this round has already occurred.
try:
self._get_segment()
except Exception:
@ -448,13 +451,16 @@ class SegmentGetter(object):
return True
def _get_segment(self):
# save current value of self.retry so we can't set any later instance
# after a retry for this round has already occurred.
retry = self.retry
temp_path = self.make_path("temp")
hash = hashlib.sha256()
file_created = False
try:
logging.debug("Getting segment {}".format(self.segment))
with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, self.retry.set):
with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, self.retry.set):
logging.debug("Downloading segment {} to {}".format(self.segment, temp_path))
with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
resp = requests.get(self.segment.uri, stream=True)
if resp.status_code == 403:
logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
@ -474,12 +480,14 @@ class SegmentGetter(object):
# another exception in the interim
ex_type, ex, tb = sys.exc_info()
if file_created:
common.rename(temp_path, self.make_path("partial", hash))
partial_path = self.make_path("partial", hash)
self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
common.rename(temp_path, partial_path)
raise ex_type, ex, tb
else:
common.rename(temp_path, self.make_path("full", hash))
full_path = self.make_path("full", hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path)
def main(channel, base_dir=".", qualities=""):

@ -7,6 +7,9 @@ import requests
import hls_playlist
logger = logging.getLogger(__name__)
def get_master_playlist(channel):
"""Get the master playlist for given channel from twitch"""
resp = requests.get(
@ -62,10 +65,10 @@ def get_media_playlist_uris(master_playlist, target_qualities):
def variant_name(variant):
names = set(media.name for media in variant.media if media.type == "VIDEO" and media.name)
if not names:
logging.warning("Variant {} has no named video renditions, can't determine name".format(variant))
logger.warning("Variant {} has no named video renditions, can't determine name".format(variant))
return None
if len(names) > 1:
logging.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant))
logger.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant))
return list(names)[0]
if not master_playlist.playlists:
@ -73,7 +76,7 @@ def get_media_playlist_uris(master_playlist, target_qualities):
for variant in master_playlist.playlists:
if any(media.uri for media in variant.media):
logging.warning("Variant has a rendition with its own URI: {}".format(variant))
logger.warning("Variant has a rendition with its own URI: {}".format(variant))
by_name = {variant_name(variant): variant for variant in master_playlist.playlists}

@ -0,0 +1,14 @@
#!/bin/bash
set -eu
# We generate first, and capture the output, to avoid overwriting the file on error.
# To avoid jsonnet needing to exist locally, we run it in a container.
output=$(docker run --rm -i sparkprime/jsonnet - < docker-compose.jsonnet)
{
echo "# DO NOT EDIT THIS FILE!"
echo "# This file is generated from docker-compose.jsonnet"
echo "# It can be generated by running ./generate-docker-compose"
echo "$output"
} > docker-compose.yml

@ -3,6 +3,7 @@ import gevent.monkey
gevent.monkey.patch_all()
import logging
import os
import argh
@ -10,5 +11,6 @@ from restreamer.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level, format=LOG_FORMAT)
argh.dispatch_command(main)

@ -7,9 +7,11 @@ import logging
import os
import shutil
import signal
from contextlib import closing
import dateutil.parser
import gevent
import prometheus_client as prom
from flask import Flask, url_for, request, abort, Response
from gevent import subprocess
from gevent.pywsgi import WSGIServer
@ -17,9 +19,11 @@ from gevent.pywsgi import WSGIServer
from common import get_best_segments
import generate_hls
from stats import stats, after_request
app = Flask('restreamer', static_url_path='/segments')
app.after_request(after_request)
"""
@ -79,6 +83,13 @@ def cors(app):
return handle
@app.route('/metrics')
@stats
def metrics():
"""Return current metrics in prometheus metrics format"""
return prom.generate_latest()
@app.route('/files/<stream>/<variant>')
@has_path_args
def list_hours(stream, variant):
@ -94,6 +105,7 @@ def list_hours(stream, variant):
@app.route('/files/<stream>/<variant>/<hour>')
@stats
@has_path_args
def list_segments(stream, variant, hour):
"""Returns a JSON list of segment files for a given stream, variant and hour.
@ -120,6 +132,7 @@ def time_range_for_variant(stream, variant):
@app.route('/playlist/<stream>.m3u8')
@stats
@has_path_args
def generate_master_playlist(stream):
"""Returns a HLS master playlist for the given stream.
@ -149,6 +162,7 @@ def generate_master_playlist(stream):
@app.route('/playlist/<stream>/<variant>.m3u8')
@stats
@has_path_args
def generate_media_playlist(stream, variant):
"""Returns a HLS media playlist for the given stream and variant.
@ -188,6 +202,7 @@ def generate_media_playlist(stream, variant):
@app.route('/cut/<stream>/<variant>.ts')
@stats
@has_path_args
def cut(stream, variant):
"""Return a MPEGTS video file covering the exact timestamp range.
@ -198,6 +213,8 @@ def cut(stream, variant):
if any holes are detected, rather than producing a video with missing parts.
Set to true by passing "true" (case insensitive).
Even if holes are allowed, a 406 may result if the resulting video would be empty.
experimental: Optional, default false. If true, use the new, much faster, but experimental
method of cutting.
"""
start = dateutil.parser.parse(request.args['start'])
end = dateutil.parser.parse(request.args['end'])
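A hedged example of calling this endpoint with the parameters described above; the host, port, stream, variant and timestamps are placeholders, not values from this repo:

import requests

resp = requests.get(
    "http://localhost:8080/cut/desertbus/source.ts",
    params={
        "start": "2019-01-01T10:00:00",
        "end": "2019-01-01T10:05:00",
        "experimental": "true",  # opt in to the faster cutting path
    },
    stream=True,
)
resp.raise_for_status()
with open("cut.ts", "wb") as f:
    for chunk in resp.iter_content(chunk_size=64 * 1024):
        f.write(chunk)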
@ -223,19 +240,29 @@ def cut(stream, variant):
return "We have no content available within the requested time range.", 406
# how far into the first segment to begin
cut_start = max(0, (segments[0].start - start).total_seconds())
cut_start = max(0, (start - segments[0].start).total_seconds())
# calculate full uncut duration of content, ie. without holes.
full_duration = sum(segment.duration.total_seconds() for segment in segments)
# calculate how much of final segment should be cut off
cut_end = max(0, (end - segments[-1].end).total_seconds())
cut_end = max(0, (segments[-1].end - end).total_seconds())
# finally, calculate actual output duration, which is what ffmpeg will use
duration = full_duration - cut_start - cut_end
# possibly defer to experimental version now that we've parsed our inputs.
# we'll clean up this whole flow later.
if request.args.get('experimental') == 'true':
return cut_experimental(segments, cut_start, cut_end)
def feed_input(pipe):
# pass each segment into ffmpeg's stdin in order, while outputting everything on stdout.
for segment in segments:
with open(segment.path) as f:
shutil.copyfileobj(f, pipe)
try:
shutil.copyfileobj(f, pipe)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all input
if e.errno != errno.EPIPE:
raise
pipe.close()
def _cut():
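To make the sign fix above concrete, a small worked example with invented timestamps:

from datetime import datetime

seg_first_start = datetime(2019, 1, 1, 10, 0, 0)  # first matched segment starts here
seg_last_end = datetime(2019, 1, 1, 10, 5, 6)     # last matched segment ends here
start = datetime(2019, 1, 1, 10, 0, 2)            # requested cut start
end = datetime(2019, 1, 1, 10, 5, 0)              # requested cut end

cut_start = max(0, (start - seg_first_start).total_seconds())  # 2.0s trimmed from the front
cut_end = max(0, (seg_last_end - end).total_seconds())         # 6.0s trimmed from the back
# The old, reversed subtractions gave 0 for both, so the output was never trimmed.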
@ -277,6 +304,89 @@ def cut(stream, variant):
return Response(_cut(), mimetype='video/MP2T')
def cut_experimental(segments, cut_start, cut_end):
"""Experimental cutting method where we cut the first and last segments only,
then cat them all together."""
# Note: assumes codecs don't change from segment to segment.
def streams_info(segment):
"""Return ffprobe's info on streams as a list of dicts"""
output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path])
return json.loads(output)['streams']
def ffmpeg(segment, cut_start=None, cut_end=None):
"""Return a Popen object which is ffmpeg cutting the given segment"""
args = ['ffmpeg', '-i', segment.path]
# output from ffprobe is generally already sorted but let's be paranoid,
# because the order of map args matters.
for stream in sorted(streams_info(segment), key=lambda stream: stream['index']):
# map the same stream in the same position from input to output
args += ['-map', '0:{}'.format(stream['index'])]
if stream['codec_type'] in ('video', 'audio'):
# for non-metadata streams, make sure we use the same codec (metadata streams
# are a bit weirder, and ffmpeg will do the right thing anyway)
args += ['-codec:{}'.format(stream['index']), stream['codec_name']]
# now add trim args
if cut_start:
args += ['-ss', str(cut_start)]
if cut_end:
args += ['-to', str(cut_end)]
# output to stdout as MPEG-TS
args += ['-f', 'mpegts', '-']
# run it
logging.info("Running segment cut with args: {}".format(" ".join(args)))
return subprocess.Popen(args, stdout=subprocess.PIPE)
def chunks(fileobj, chunk_size=16*1024):
"""Read fileobj until EOF, yielding chunk_size sized chunks of data."""
while True:
chunk = fileobj.read(chunk_size)
if not chunk:
break
yield chunk
def _cut():
# set first and last only if they actually need cutting
first = segments[0] if cut_start else None
last = segments[-1] if cut_end else None
for segment in segments:
# note first and last might be the same segment.
# note a segment will only match if cutting actually needs to be done
# (ie. cut_start or cut_end is not 0)
if segment in (first, last):
proc = None
try:
proc = ffmpeg(
segment,
cut_start if segment == first else None,
cut_end if segment == last else None,
)
with closing(proc.stdout):
for chunk in chunks(proc.stdout):
yield chunk
proc.wait()
except Exception:
# try to clean up proc, ignoring errors
try:
proc.kill()
except OSError:
pass
else:
# check if ffmpeg had errors
if proc.returncode != 0:
raise Exception(
"Error while streaming cut: ffmpeg exited {}".format(proc.returncode)
)
else:
# no cutting needed, just serve the file
with open(segment.path) as f:
for chunk in chunks(f):
yield chunk
return Response(_cut(), mimetype='video/MP2T')
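For a rough idea of what the ffmpeg() helper above builds, assuming a hypothetical first segment with one h264 video stream (index 0), one aac audio stream (index 1), and a 2.5s front trim (the path is invented):

# argv as assembled by ffmpeg() under those assumptions
args = [
    'ffmpeg', '-i', '/mnt/example/segment.ts',
    '-map', '0:0', '-codec:0', 'h264',
    '-map', '0:1', '-codec:1', 'aac',
    '-ss', '2.5',
    '-f', 'mpegts', '-',
]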
def main(host='0.0.0.0', port=8000, base_dir='.'):
app.static_folder = base_dir
server = WSGIServer((host, port), cors(app))

@ -0,0 +1,80 @@
import functools
import prometheus_client as prom
from flask import request
from flask import g as request_store
from monotonic import monotonic
def stats(fn):
"""Decorator that wraps a handler func to collect metrics.
Adds handler func args as labels, along with 'endpoint' label using func's name,
method and response status where applicable."""
# We have to jump through some hoops here, because the prometheus client lib demands
# we pre-define our label names, but we don't know the names of the handler kwargs
# until the first time the function's called. So we delay defining the metrics until
# first call.
metrics = {}
endpoint = fn.__name__
@functools.wraps(fn)
def _stats(**kwargs):
if not metrics:
# first call, set up metrics
labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
labels = labels_no_status + ['status']
metrics['latency'] = prom.Histogram(
'http_request_latency',
'Time taken to run the request handler and create a response',
labels,
# buckets: very long playlists / cutting can be quite slow,
# so we have a wider range of latencies than default, up to 10min.
buckets=[.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600],
)
metrics['size'] = prom.Histogram(
'http_response_size',
'Size in bytes of response body for non-chunked responses',
labels,
# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
buckets=[4**i for i in range(16)],
)
metrics['concurrent'] = prom.Gauge(
'http_request_concurrency',
'Number of requests currently ongoing',
labels_no_status,
)
request_store.metrics = metrics
request_store.labels = {k: str(v) for k, v in kwargs.items()}
request_store.labels.update(endpoint=endpoint, method=request.method)
metrics['concurrent'].labels(**request_store.labels).inc()
request_store.start_time = monotonic()
return fn(**kwargs)
return _stats
def after_request(response):
"""Must be registered to run after requests. Finishes tracking the request
and logs most of the metrics.
We do it in this way, instead of inside the stats wrapper, because it lets flask
normalize the handler result into a Response object.
"""
if 'metrics' not in request_store:
return # untracked handler
end_time = monotonic()
metrics = request_store.metrics
labels = request_store.labels
start_time = request_store.start_time
metrics['concurrent'].labels(**labels).dec()
labels['status'] = str(response.status_code)
metrics['latency'].labels(**labels).observe(end_time - start_time)
size = response.calculate_content_length()
if size is not None:
metrics['size'].labels(**labels).observe(size)
return response
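As an illustration of the label scheme above, a request handled by list_segments(stream, variant, hour) would be tracked with something like the following; the values are invented:

# Illustrative label set only; values are made up.
labels = {
    'stream': 'desertbus',
    'variant': 'source',
    'hour': '2019-01-01T10',
    'endpoint': 'list_segments',
    'method': 'GET',
}
# http_request_concurrency is labelled with the above (no status);
# after_request then adds the response status for the latency and size metrics:
labels['status'] = '200'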

@ -8,6 +8,7 @@ setup(
"argh",
"flask",
"gevent",
"prometheus-client",
"wubloader-common",
],
)
