diff --git a/.gitignore b/.gitignore
index 5b6b072..1120be9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1 @@
-config.yaml
+docker-compose.yml
diff --git a/README.md b/README.md
index 6ffcffe..d8d5d15 100644
--- a/README.md
+++ b/README.md
@@ -18,3 +18,7 @@ but a brief overview of the components:
 
 All components are built as docker images.
 Components which access the disk expect a shared directory mounted at `/mnt`.
+
+A docker-compose file is provided to run all components. Edit `docker-compose.jsonnet`
+to set configuration options, generate the compose file with `./generate-docker-compose`,
+then run `docker-compose up`.
diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet
new file mode 100644
index 0000000..a58044a
--- /dev/null
+++ b/docker-compose.jsonnet
@@ -0,0 +1,60 @@
+// This is a jsonnet file, it generates a docker-compose.yml file.
+// To generate, run ./generate-docker-compose.
+
+{
+
+	// These are the important top-level settings.
+	// Change these to configure the services.
+
+	// Image tag (application version) to use.
+	// Note: "latest" is not recommended in production, as you can't be sure what version
+	// you're actually running, and must manually re-pull to get an updated copy.
+	image_tag:: "latest",
+
+	// Twitch channel to capture
+	channel:: "desertbus",
+
+	// Stream qualities to capture in addition to source.
+	qualities:: ["480p"],
+
+	// Local path to save segments to. Full path must already exist. Cannot contain ':'.
+	segments_path:: "/var/lib/wubloader/",
+
+	// The host's port to expose the restreamer on.
+	restreamer_port:: 8080,
+
+
+	// Now for the actual docker-compose config
+
+	// docker-compose version
+	version: "3",
+
+	services: {
+
+		downloader: {
+			image: "quay.io/ekimekim/wubloader-downloader:%s" % $.image_tag,
+			// Args for the downloader: set channel and qualities
+			command: [
+				$.channel,
+				"--qualities", std.join(",", $.qualities),
+			],
+			// Mount the segments directory at /mnt
+			volumes: ["%s:/mnt" % $.segments_path],
+			// If the application crashes, restart it.
+			restart: "on-failure",
+		},
+
+		restreamer: {
+			image: "quay.io/ekimekim/wubloader-restreamer:%s" % $.image_tag,
+			// Mount the segments directory at /mnt
+			volumes: ["%s:/mnt" % $.segments_path],
+			// If the application crashes, restart it.
+			restart: "on-failure",
+			// Expose on the configured host port by mapping that port to the default
+			// port for restreamer, which is 8000.
+			ports: ["%s:8000" % $.restreamer_port],
+		},
+
+	},
+
+}
+ ports: ["%s:8000" % $.restreamer_port], + }, + + }, + +} diff --git a/downloader/downloader/__main__.py b/downloader/downloader/__main__.py index 45d813e..2f2fa3f 100644 --- a/downloader/downloader/__main__.py +++ b/downloader/downloader/__main__.py @@ -3,6 +3,7 @@ import gevent.monkey gevent.monkey.patch_all() import logging +import os import argh @@ -10,5 +11,6 @@ from downloader.main import main LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" -logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) argh.dispatch_command(main) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 0a068a7..0aac4d2 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -25,7 +25,7 @@ class TimedOutError(Exception): @contextmanager -def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout): +def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft_timeout): """Context manager that wraps a piece of code in a pair of timeouts, a "soft" timeout and a "hard" one. If the block does not complete before the soft timeout, the given on_soft_timeout() function is called in a new greenlet. @@ -47,7 +47,7 @@ def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout if finished: # We finished before soft timeout was hit return - logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description)) + logger.warning("Hit soft timeout {}s while {}".format(soft_timeout, description)) on_soft_timeout() gevent.spawn_later(soft_timeout, dispatch_soft_timeout) error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description)) @@ -90,6 +90,7 @@ class StreamsManager(object): def __init__(self, channel, base_dir, qualities): self.channel = channel + self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.base_dir = base_dir self.stream_workers = {name: [] for name in qualities + ["source"]} # {stream name: [workers]} self.latest_urls = {} # {stream name: (fetch time, url)} @@ -102,7 +103,7 @@ class StreamsManager(object): and any older workers are safe to stop.""" workers = self.stream_workers[worker.stream] if worker not in workers: - logging.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers)) + self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers)) return # stop everything older than given worker for old in workers[:workers.index(worker)]: @@ -116,12 +117,12 @@ class StreamsManager(object): """ workers = self.stream_workers[worker.stream] if worker not in workers: - logging.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers)) + self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers)) return if worker is not workers[-1]: - logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream)) + self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream)) return - logging.info("Starting new worker for {} by request of old worker".format(worker.stream)) + self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream)) self.wait_for_new_url(worker.stream, worker.url) 
@@ -90,6 +90,7 @@ class StreamsManager(object):
 
 	def __init__(self, channel, base_dir, qualities):
 		self.channel = channel
+		self.logger = logging.getLogger("StreamsManager({})".format(channel))
 		self.base_dir = base_dir
 		self.stream_workers = {name: [] for name in qualities + ["source"]} # {stream name: [workers]}
 		self.latest_urls = {} # {stream name: (fetch time, url)}
@@ -102,7 +103,7 @@ class StreamsManager(object):
 		and any older workers are safe to stop."""
 		workers = self.stream_workers[worker.stream]
 		if worker not in workers:
-			logging.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
+			self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
 			return
 		# stop everything older than given worker
 		for old in workers[:workers.index(worker)]:
@@ -116,12 +117,12 @@ class StreamsManager(object):
 		"""
 		workers = self.stream_workers[worker.stream]
 		if worker not in workers:
-			logging.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
+			self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
 			return
 		if worker is not workers[-1]:
-			logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
+			self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
 			return
-		logging.info("Starting new worker for {} by request of old worker".format(worker.stream))
+		self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream))
 		self.wait_for_new_url(worker.stream, worker.url)
 		self.start_worker(worker.stream)
 		self.trigger_refresh()
@@ -146,13 +147,13 @@ class StreamsManager(object):
 			new_time, new_url = self.latest_urls[stream]
 			if new_url != old_url:
 				return
-			logging.info("Triggering master playlist refresh as we need a new url")
+			self.logger.info("Triggering master playlist refresh as we need a new url")
 			self.trigger_refresh()
 			self.latest_urls_changed.wait()
 
 	def stop(self):
 		"""Shut down all workers and stop capturing stream."""
-		logging.info("Stopping streams manager")
+		self.logger.info("Stopping")
 		self.stopping.set()
 
 	def start_worker(self, stream):
@@ -160,7 +161,7 @@ class StreamsManager(object):
 		# it's possible for fetch_latest to call us after we've started stopping,
 		# in that case do nothing.
 		if self.stopping.is_set():
-			logging.info("Ignoring worker start as we are stopping")
+			self.logger.info("Ignoring worker start as we are stopping")
 			return
 		url_time, url = self.latest_urls[stream]
 		worker = StreamWorker(self, stream, url, url_time)
@@ -171,32 +172,32 @@ class StreamsManager(object):
 		"""Re-fetch master playlist and start new workers if needed"""
 		try:
 			# Fetch playlist. On soft timeout, retry.
-			logging.info("Fetching master playlist")
+			self.logger.info("Fetching master playlist")
 			fetch_time = monotonic()
-			with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
+			with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
 				master_playlist = twitch.get_master_playlist(self.channel)
 			new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys())
 			self.update_urls(fetch_time, new_urls)
 			for stream, workers in self.stream_workers.items():
 				# warn and retry if the url is missing
 				if stream not in new_urls:
-					logging.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
+					self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
 					self.trigger_refresh()
 				# is it newly found?
 				if not workers and stream in self.latest_urls:
-					logging.info("Starting new worker for {} as none exist".format(stream))
+					self.logger.info("Starting new worker for {} as none exist".format(stream))
 					self.start_worker(stream)
 				latest_worker = workers[-1]
 				# is the old worker too old?
 				if latest_worker.age() > self.MAX_WORKER_AGE:
-					logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
+					self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
 					self.start_worker(stream)
 		except Exception as e:
 			if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
-				logging.info("Stream is not up. Retrying.")
+				self.logger.info("Stream is not up. Retrying.")
 				self.trigger_refresh()
 			else:
-				logging.exception("Failed to fetch master playlist")
+				self.logger.exception("Failed to fetch master playlist")
 				# don't retry on hard timeout as we already retried on soft timeout
 				if not isinstance(e, TimedOutError):
 					self.trigger_refresh()
Retrying.") self.trigger_refresh() else: - logging.exception("Failed to fetch master playlist") + self.logger.exception("Failed to fetch master playlist") # don't retry on hard timeout as we already retried on soft timeout if not isinstance(e, TimedOutError): self.trigger_refresh() @@ -209,7 +210,7 @@ class StreamsManager(object): self.MAX_WORKER_AGE - workers[-1].age() for workers in self.stream_workers.values() if workers ] or [0])) - logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) + self.logger.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) # wait until refresh triggered, next max age reached, or we're stopping (whichever happens first) gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1) if not self.stopping.is_set(): @@ -217,7 +218,7 @@ class StreamsManager(object): gevent.spawn(self.fetch_latest) # wait min retry interval with jitter, unless we're stopping self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL)) - logging.info("Stopping workers") + self.logger.info("Stopping workers") for workers in self.stream_workers.values(): for worker in workers: worker.stop() @@ -248,6 +249,7 @@ class StreamWorker(object): def __init__(self, manager, stream, url, url_time): self.manager = manager + self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(stream, id(self))) self.stream = stream self.url = url self.url_time = url_time @@ -268,14 +270,14 @@ class StreamWorker(object): self.stopping.set() def run(self): - logging.info("Worker {} starting".format(self)) + self.logger.info("Worker starting") try: self._run() except Exception: - logging.exception("Worker {} failed".format(self)) + self.logger.exception("Worker failed") self.trigger_new_worker() else: - logging.info("Worker {} stopped".format(self)) + self.logger.info("Worker stopped") finally: for getter in self.getters.values(): getter.done.wait() @@ -293,18 +295,18 @@ class StreamWorker(object): first = True while not self.stopping.is_set(): - logging.debug("{} getting media playlist {}".format(self, self.url)) + self.logger.debug("Getting media playlist {}".format(self.url)) try: - with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): + with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): playlist = twitch.get_media_playlist(self.url) except Exception as e: - logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True) + self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.trigger_new_worker() if first: - logging.warning("{} failed on first fetch, stopping".format(self)) + self.logger.warning("Failed on first fetch, stopping") self.stop() elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403: - logging.warning("{} failed with 403 Forbidden, stopping".format(self)) + self.logger.warning("Failed with 403 Forbidden, stopping") self.stop() self.wait(self.FETCH_RETRY_INTERVAL) continue @@ -320,7 +322,7 @@ class StreamWorker(object): if segment.uri not in self.getters: if date is None: raise ValueError("Cannot determine date of segment") - self.getters[segment.uri] = SegmentGetter(self.manager.base_dir, self.manager.channel, self.stream, segment, date) + self.getters[segment.uri] = SegmentGetter(self.logger, self.manager.base_dir, self.manager.channel, self.stream, segment, date) 
@@ -268,14 +270,14 @@ class StreamWorker(object):
 		self.stopping.set()
 
 	def run(self):
-		logging.info("Worker {} starting".format(self))
+		self.logger.info("Worker starting")
 		try:
 			self._run()
 		except Exception:
-			logging.exception("Worker {} failed".format(self))
+			self.logger.exception("Worker failed")
 			self.trigger_new_worker()
 		else:
-			logging.info("Worker {} stopped".format(self))
+			self.logger.info("Worker stopped")
 		finally:
 			for getter in self.getters.values():
 				getter.done.wait()
@@ -293,18 +295,18 @@ class StreamWorker(object):
 		first = True
 		while not self.stopping.is_set():
 
-			logging.debug("{} getting media playlist {}".format(self, self.url))
+			self.logger.debug("Getting media playlist {}".format(self.url))
 			try:
-				with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
+				with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
 					playlist = twitch.get_media_playlist(self.url)
 			except Exception as e:
-				logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True)
+				self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
 				self.trigger_new_worker()
 				if first:
-					logging.warning("{} failed on first fetch, stopping".format(self))
+					self.logger.warning("Failed on first fetch, stopping")
 					self.stop()
 				elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403:
-					logging.warning("{} failed with 403 Forbidden, stopping".format(self))
+					self.logger.warning("Failed with 403 Forbidden, stopping")
 					self.stop()
 				self.wait(self.FETCH_RETRY_INTERVAL)
 				continue
@@ -320,7 +322,7 @@ class StreamWorker(object):
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
-					self.getters[segment.uri] = SegmentGetter(self.manager.base_dir, self.manager.channel, self.stream, segment, date)
+					self.getters[segment.uri] = SegmentGetter(self.logger, self.manager.base_dir, self.manager.channel, self.stream, segment, date)
 					gevent.spawn(self.getters[segment.uri].run)
 				if date is not None:
 					date += datetime.timedelta(seconds=segment.duration)
@@ -335,7 +337,7 @@ class StreamWorker(object):
 
 			# Stop if end-of-stream
 			if playlist.is_endlist:
-				logging.info("{} stopping due to end-of-playlist".format(self))
+				self.logger.info("Stopping due to end-of-playlist")
 				# Trigger a new worker for when the stream comes back up.
 				# In the short term this will cause some thrashing until the master playlist
 				# starts returning 404, but it's the best way to avoid missing anything
@@ -364,7 +366,8 @@ class SegmentGetter(object):
 	FETCH_HEADERS_TIMEOUTS = 5, 30
 	FETCH_FULL_TIMEOUTS = 15, 240
 
-	def __init__(self, base_dir, channel, stream, segment, date):
+	def __init__(self, parent_logger, base_dir, channel, stream, segment, date):
+		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.stream = stream
@@ -380,7 +383,7 @@ class SegmentGetter(object):
 			try:
 				self._run()
 			except Exception:
-				logging.exception("Failure in SegmentGetter {}".format(self.segment))
+				self.logger.exception("Unexpected exception while getting segment {}, retrying".format(self.segment))
 				gevent.sleep(common.jitter(self.UNEXPECTED_FAILURE_RETRY))
 			else:
 				break
@@ -388,6 +391,7 @@ class SegmentGetter(object):
 		self.done.set()
 
 	def _run(self):
+		self.logger.debug("Getter started")
 		while not self.exists():
 			self.retry = gevent.event.Event()
 			worker = gevent.spawn(self.get_segment)
@@ -398,6 +402,7 @@ class SegmentGetter(object):
 				break
 			# if retry not set, wait for FETCH_RETRY first
 			self.retry.wait(common.jitter(self.FETCH_RETRY))
+		self.logger.debug("Getter is done")
 
 	def make_path_prefix(self):
 		"""Generate leading part of filepath which doesn't change with the hash."""
@@ -437,8 +442,6 @@ class SegmentGetter(object):
 
 	def get_segment(self):
-		# save current value of self.retry so we can't set any later instance
-		# after a retry for this round has already occurred.
 		try:
 			self._get_segment()
 		except Exception:
@@ -448,13 +451,16 @@ class SegmentGetter(object):
 		return True
 
 	def _get_segment(self):
+		# save current value of self.retry so we can't set any later instance
+		# after a retry for this round has already occurred.
+		retry = self.retry
 		temp_path = self.make_path("temp")
 		hash = hashlib.sha256()
 		file_created = False
 		try:
-			logging.debug("Getting segment {}".format(self.segment))
-			with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, self.retry.set):
-				with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, self.retry.set):
+			self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
+			with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
+				with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
 					resp = requests.get(self.segment.uri, stream=True)
 			if resp.status_code == 403:
 				logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
@@ -474,12 +480,14 @@ class SegmentGetter(object):
 			# another exception in the interim
 			ex_type, ex, tb = sys.exc_info()
 			if file_created:
-				common.rename(temp_path, self.make_path("partial", hash))
+				partial_path = self.make_path("partial", hash)
+				self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
+				common.rename(temp_path, partial_path)
 			raise ex_type, ex, tb
 		else:
-			common.rename(temp_path, self.make_path("full", hash))
-
-
+			full_path = self.make_path("full", hash)
+			self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
+			common.rename(temp_path, full_path)
 
 
 def main(channel, base_dir=".", qualities=""):
diff --git a/downloader/downloader/twitch.py b/downloader/downloader/twitch.py
index 92358bb..ba37d6c 100644
--- a/downloader/downloader/twitch.py
+++ b/downloader/downloader/twitch.py
@@ -7,6 +7,9 @@ import requests
 
 import hls_playlist
 
+logger = logging.getLogger(__name__)
+
+
 def get_master_playlist(channel):
 	"""Get the master playlist for given channel from twitch"""
 	resp = requests.get(
@@ -62,10 +65,10 @@ def get_media_playlist_uris(master_playlist, target_qualities):
 	def variant_name(variant):
 		names = set(media.name for media in variant.media if media.type == "VIDEO" and media.name)
 		if not names:
-			logging.warning("Variant {} has no named video renditions, can't determine name".format(variant))
+			logger.warning("Variant {} has no named video renditions, can't determine name".format(variant))
 			return None
 		if len(names) > 1:
-			logging.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant))
+			logger.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant))
 		return list(names)[0]
 
 	if not master_playlist.playlists:
@@ -73,7 +76,7 @@ def get_media_playlist_uris(master_playlist, target_qualities):
 
 	for variant in master_playlist.playlists:
 		if any(media.uri for media in variant.media):
-			logging.warning("Variant has a rendition with its own URI: {}".format(variant))
+			logger.warning("Variant has a rendition with its own URI: {}".format(variant))
 
 	by_name = {variant_name(variant): variant for variant in master_playlist.playlists}
diff --git a/generate-docker-compose b/generate-docker-compose
new file mode 100755
index 0000000..108d190
--- /dev/null
+++ b/generate-docker-compose
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+set -eu
+
+# We generate first, and capture the output, to avoid overwriting the file on error.
+# To avoid jsonnet needing to exist locally, we run it in a container.
+output=$(docker run --rm -i sparkprime/jsonnet - < docker-compose.jsonnet)
+
+{
+	echo "# DO NOT EDIT THIS FILE!"
+ echo "# This file is generated from docker-compose.jsonnet" + echo "# It can be generated by running ./generate-docker-compose" + echo "$output" +} > docker-compose.yml diff --git a/restreamer/restreamer/__main__.py b/restreamer/restreamer/__main__.py index 24935eb..f0ad12b 100644 --- a/restreamer/restreamer/__main__.py +++ b/restreamer/restreamer/__main__.py @@ -3,6 +3,7 @@ import gevent.monkey gevent.monkey.patch_all() import logging +import os import argh @@ -10,5 +11,6 @@ from restreamer.main import main LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" -logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) argh.dispatch_command(main) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 1e01d08..9b354ef 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -7,9 +7,11 @@ import logging import os import shutil import signal +from contextlib import closing import dateutil.parser import gevent +import prometheus_client as prom from flask import Flask, url_for, request, abort, Response from gevent import subprocess from gevent.pywsgi import WSGIServer @@ -17,9 +19,11 @@ from gevent.pywsgi import WSGIServer from common import get_best_segments import generate_hls +from stats import stats, after_request app = Flask('restreamer', static_url_path='/segments') +app.after_request(after_request) """ @@ -79,6 +83,13 @@ def cors(app): return handle +@app.route('/metrics') +@stats +def metrics(): + """Return current metrics in prometheus metrics format""" + return prom.generate_latest() + + @app.route('/files//') @has_path_args def list_hours(stream, variant): @@ -94,6 +105,7 @@ def list_hours(stream, variant): @app.route('/files///') +@stats @has_path_args def list_segments(stream, variant, hour): """Returns a JSON list of segment files for a given stream, variant and hour. @@ -120,6 +132,7 @@ def time_range_for_variant(stream, variant): @app.route('/playlist/.m3u8') +@stats @has_path_args def generate_master_playlist(stream): """Returns a HLS master playlist for the given stream. @@ -149,6 +162,7 @@ def generate_master_playlist(stream): @app.route('/playlist//.m3u8') +@stats @has_path_args def generate_media_playlist(stream, variant): """Returns a HLS media playlist for the given stream and variant. @@ -188,6 +202,7 @@ def generate_media_playlist(stream, variant): @app.route('/cut//.ts') +@stats @has_path_args def cut(stream, variant): """Return a MPEGTS video file covering the exact timestamp range. @@ -198,6 +213,8 @@ def cut(stream, variant): if any holes are detected, rather than producing a video with missing parts. Set to true by passing "true" (case insensitive). Even if holes are allowed, a 406 may result if the resulting video would be empty. + experimental: Optional, default false. If true, use the new, much faster, but experimental + method of cutting. """ start = dateutil.parser.parse(request.args['start']) end = dateutil.parser.parse(request.args['end']) @@ -223,19 +240,29 @@ def cut(stream, variant): return "We have no content available within the requested time range.", 406 # how far into the first segment to begin - cut_start = max(0, (segments[0].start - start).total_seconds()) + cut_start = max(0, (start - segments[0].start).total_seconds()) # calculate full uncut duration of content, ie. without holes. 
 	full_duration = sum(segment.duration.total_seconds() for segment in segments)
 	# calculate how much of final segment should be cut off
-	cut_end = max(0, (end - segments[-1].end).total_seconds())
+	cut_end = max(0, (segments[-1].end - end).total_seconds())
 	# finally, calculate actual output duration, which is what ffmpeg will use
 	duration = full_duration - cut_start - cut_end
 
+	# possibly defer to experimental version now that we've parsed our inputs.
+	# we'll clean up this whole flow later.
+	if request.args.get('experimental') == 'true':
+		return cut_experimental(segments, cut_start, cut_end)
+
 	def feed_input(pipe):
 		# pass each segment into ffmpeg's stdin in order, while outputing everything on stdout.
 		for segment in segments:
 			with open(segment.path) as f:
-				shutil.copyfileobj(f, pipe)
+				try:
+					shutil.copyfileobj(f, pipe)
+				except OSError as e:
+					# ignore EPIPE, as this just means the end cut meant we didn't need all input
+					if e.errno != errno.EPIPE:
+						raise
 		pipe.close()
 
 	def _cut():
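The flipped subtractions above are the substantive fix in this hunk: cut_start is how far past the first segment's start the requested start falls, and cut_end is how far before the last segment's end the requested end falls. The old expressions had the operands reversed, so max(0, ...) clamped both to zero and no trimming ever happened. A worked check with hypothetical times:

    from datetime import datetime, timedelta

    # a single 16s segment starting on the hour (hypothetical values)
    segment_start = datetime(2019, 1, 1, 10, 0, 0)
    segment_end = segment_start + timedelta(seconds=16)

    # request covers 10:00:05 to 10:00:12, entirely inside the segment
    start = datetime(2019, 1, 1, 10, 0, 5)
    end = datetime(2019, 1, 1, 10, 0, 12)

    cut_start = max(0, (start - segment_start).total_seconds())  # 5.0: skip the first 5s
    cut_end = max(0, (segment_end - end).total_seconds())        # 4.0: drop the last 4s
    duration = 16 - cut_start - cut_end                          # 7.0, matching the 7s requested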
@@ -277,6 +304,89 @@ def cut(stream, variant):
 	return Response(_cut(), mimetype='video/MP2T')
 
 
+def cut_experimental(segments, cut_start, cut_end):
+	"""Experimental cutting method where we cut the first and last segments only,
+	then cat them all together."""
+	# Note: assumes codecs don't change from segment to segment.
+
+	def streams_info(segment):
+		"""Return ffprobe's info on streams as a list of dicts"""
+		output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path])
+		return json.loads(output)['streams']
+
+	def ffmpeg(segment, cut_start=None, cut_end=None):
+		"""Return a Popen object which is ffmpeg cutting the given segment"""
+		args = ['ffmpeg', '-i', segment.path]
+		# output from ffprobe is generally already sorted but let's be paranoid,
+		# because the order of map args matters.
+		for stream in sorted(streams_info(segment), key=lambda stream: stream['index']):
+			# map the same stream in the same position from input to output
+			args += ['-map', '0:{}'.format(stream['index'])]
+			if stream['codec_type'] in ('video', 'audio'):
+				# for non-metadata streams, make sure we use the same codec (metadata streams
+				# are a bit weirder, and ffmpeg will do the right thing anyway)
+				args += ['-codec:{}'.format(stream['index']), stream['codec_name']]
+		# now add trim args
+		if cut_start:
+			args += ['-ss', str(cut_start)]
+		if cut_end:
+			args += ['-to', str(cut_end)]
+		# output to stdout as MPEG-TS
+		args += ['-f', 'mpegts', '-']
+		# run it
+		logging.info("Running segment cut with args: {}".format(" ".join(args)))
+		return subprocess.Popen(args, stdout=subprocess.PIPE)
+
+	def chunks(fileobj, chunk_size=16*1024):
+		"""Read fileobj until EOF, yielding chunk_size sized chunks of data."""
+		while True:
+			chunk = fileobj.read(chunk_size)
+			if not chunk:
+				break
+			yield chunk
+
+	def _cut():
+		# set first and last only if they actually need cutting
+		first = segments[0] if cut_start else None
+		last = segments[-1] if cut_end else None
+		for segment in segments:
+			# note first and last might be the same segment.
+			# note a segment will only match if cutting actually needs to be done
+			# (ie. cut_start or cut_end is not 0)
+			if segment in (first, last):
+				proc = None
+				try:
+					proc = ffmpeg(
+						segment,
+						cut_start if segment == first else None,
+						cut_end if segment == last else None,
+					)
+					with closing(proc.stdout):
+						for chunk in chunks(proc.stdout):
+							yield chunk
+					proc.wait()
+				except Exception:
+					# try to clean up proc, ignoring errors
+					try:
+						proc.kill()
+					except OSError:
+						pass
+					raise
+				else:
+					# check if ffmpeg had errors
+					if proc.returncode != 0:
+						raise Exception(
+							"Error while streaming cut: ffmpeg exited {}".format(proc.returncode)
+						)
+			else:
+				# no cutting needed, just serve the file
+				with open(segment.path) as f:
+					for chunk in chunks(f):
+						yield chunk
+
+	return Response(_cut(), mimetype='video/MP2T')
+
+
 def main(host='0.0.0.0', port=8000, base_dir='.'):
 	app.static_folder = base_dir
 	server = WSGIServer((host, port), cors(app))
diff --git a/restreamer/restreamer/stats.py b/restreamer/restreamer/stats.py
new file mode 100644
index 0000000..b1e2a24
--- /dev/null
+++ b/restreamer/restreamer/stats.py
@@ -0,0 +1,80 @@
+
+import functools
+
+import prometheus_client as prom
+from flask import request
+from flask import g as request_store
+from monotonic import monotonic
+
+
+def stats(fn):
+	"""Decorator that wraps a handler func to collect metrics.
+	Adds handler func args as labels, along with 'endpoint' label using func's name,
+	method and response status where applicable."""
+	# We have to jump through some hoops here, because the prometheus client lib demands
+	# we pre-define our label names, but we don't know the names of the handler kwargs
+	# until the first time the function's called. So we delay defining the metrics until
+	# first call.
+	metrics = {}
+	endpoint = fn.__name__
+
+	@functools.wraps(fn)
+	def _stats(**kwargs):
+		if not metrics:
+			# first call, set up metrics
+			labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method']
+			labels = labels_no_status + ['status']
+			metrics['latency'] = prom.Histogram(
+				'http_request_latency',
+				'Time taken to run the request handler and create a response',
+				labels,
+				# buckets: very long playlists / cutting can be quite slow,
+				# so we have a wider range of latencies than default, up to 10min.
+				buckets=[.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600],
+			)
+			metrics['size'] = prom.Histogram(
+				'http_response_size',
+				'Size in bytes of response body for non-chunked responses',
+				labels,
+				# buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...)
+				buckets=[4**i for i in range(16)],
+			)
+			metrics['concurrent'] = prom.Gauge(
+				'http_request_concurrency',
+				'Number of requests currently ongoing',
+				labels_no_status,
+			)
+
+		request_store.metrics = metrics
+		request_store.labels = {k: str(v) for k, v in kwargs.items()}
+		request_store.labels.update(endpoint=endpoint, method=request.method)
+		metrics['concurrent'].labels(**request_store.labels).inc()
+		request_store.start_time = monotonic()
+		return fn(**kwargs)
+
+	return _stats
+
+
+def after_request(response):
+	"""Must be registered to run after requests. Finishes tracking the request
+	and logs most of the metrics.
+	We do it in this way, instead of inside the stats wrapper, because it lets flask
+	normalize the handler result into a Response object.
+	"""
+	if 'metrics' not in request_store:
+		return response # untracked handler
+
+	end_time = monotonic()
+	metrics = request_store.metrics
+	labels = request_store.labels
+	start_time = request_store.start_time
+
+	metrics['concurrent'].labels(**labels).dec()
+
+	labels['status'] = str(response.status_code)
+	metrics['latency'].labels(**labels).observe(end_time - start_time)
+	size = response.calculate_content_length()
+	if size is not None:
+		metrics['size'].labels(**labels).observe(size)
+
+	return response
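A minimal sketch of how the stats decorator and after_request hook fit together — the handler below is hypothetical, and the real registrations are in restreamer/main.py above. The decorator sits between app.route and the handler so it sees the resolved path kwargs:

    import prometheus_client as prom
    from flask import Flask

    from stats import stats, after_request

    app = Flask('example')
    app.after_request(after_request)

    @app.route('/files/<stream>/<variant>')
    @stats
    def list_hours(stream, variant):
        # 'stream' and 'variant' become label values, 'endpoint' is "list_hours",
        # 'method' comes from the request, and 'status' is added by after_request
        return "..."

    @app.route('/metrics')
    @stats
    def metrics():
        # scrape endpoint exposing http_request_latency and friends
        return prom.generate_latest()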
+ """ + if 'metrics' not in request_store: + return # untracked handler + + end_time = monotonic() + metrics = request_store.metrics + labels = request_store.labels + start_time = request_store.start_time + + metrics['concurrent'].labels(**labels).dec() + + labels['status'] = str(response.status_code) + metrics['latency'].labels(**labels).observe(end_time - start_time) + size = response.calculate_content_length() + if size is not None: + metrics['size'].labels(**labels).observe(size) + + return response diff --git a/restreamer/setup.py b/restreamer/setup.py index 999c956..7e1344e 100644 --- a/restreamer/setup.py +++ b/restreamer/setup.py @@ -8,6 +8,7 @@ setup( "argh", "flask", "gevent", + "prometheus-client", "wubloader-common", ], )