From 78a9a4e5254017db95af2acf69c6465284298336 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 1 Jan 2019 14:16:20 -0800
Subject: [PATCH 1/9] Set up a docker compose file to run all images

For ease-of-use, we use a jsonnet file to generate the yaml.
Jsonnet is a language for generating JSON documents.
In this case it's useful to us because it lets us have comments,
references to settings defined at the top, and some basic logic
like converting qualities from a list of strings to a comma-separated string.

To avoid requiring jsonnet to be installed, we use the official jsonnet
docker image in the generate script.
---
 .gitignore              |  2 +-
 README.md               |  4 +++
 docker-compose.jsonnet  | 60 +++++++++++++++++++++++++++++++++++++++++
 generate-docker-compose | 14 ++++++++++
 4 files changed, 79 insertions(+), 1 deletion(-)
 create mode 100644 docker-compose.jsonnet
 create mode 100755 generate-docker-compose

diff --git a/.gitignore b/.gitignore
index 5b6b072..1120be9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1 @@
-config.yaml
+docker-compose.yml
diff --git a/README.md b/README.md
index 6ffcffe..d8d5d15 100644
--- a/README.md
+++ b/README.md
@@ -18,3 +18,7 @@ but a brief overview of the components:
 
 All components are built as docker images. Components which access the disk
 expect a shared directory mounted at `/mnt`.
+
+A docker-compose file is provided to run all components. See `docker-compose.jsonnet`
+to set configuration options, then generate the compose file with `./generate-docker-compose`.
+Then run `docker-compose up`.
diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet
new file mode 100644
index 0000000..a58044a
--- /dev/null
+++ b/docker-compose.jsonnet
@@ -0,0 +1,60 @@
+// This is a jsonnet file, it generates a docker-compose.yml file.
+// To generate, run "make docker-compose.yml".
+
+{
+
+  // These are the important top-level settings.
+  // Change these to configure the services.
+
+  // Image tag (application version) to use.
+  // Note: "latest" is not recommended in production, as you can't be sure what version
+  // you're actually running, and must manually re-pull to get an updated copy.
+  image_tag:: "latest",
+
+  // Twitch channel to capture
+  channel:: "desertbus",
+
+  // Stream qualities to capture in addition to source.
+  qualities:: ["480p"],
+
+  // Local path to save segments to. Full path must already exist. Cannot contain ':'.
+  segments_path:: "/var/lib/wubloader/",
+
+  // The host's port to expose the restreamer on.
+  restreamer_port:: 8080,
+
+
+  // Now for the actual docker-compose config
+
+  // docker-compose version
+  version: "3",
+
+  services: {
+
+    downloader: {
+      image: "quay.io/ekimekim/wubloader-downloader:%s" % $.image_tag,
+      // Args for the downloader: set channel and qualities
+      command: [
+        $.channel,
+        "--qualities", std.join(",", $.qualities),
+      ],
+      // Mount the segments directory at /mnt
+      volumes: ["%s:/mnt" % $.segments_path],
+      // If the application crashes, restart it.
+      restart: "on-failure",
+    },
+
+    restreamer: {
+      image: "quay.io/ekimekim/wubloader-restreamer:%s" % $.image_tag,
+      // Mount the segments directory at /mnt
+      volumes: ["%s:/mnt" % $.segments_path],
+      // If the application crashes, restart it.
+      restart: "on-failure",
+      // Expose on the configured host port by mapping that port to the default
+      // port for restreamer, which is 8000.
+ ports: ["%s:8000" % $.restreamer_port], + }, + + }, + +} diff --git a/generate-docker-compose b/generate-docker-compose new file mode 100755 index 0000000..8b00bb9 --- /dev/null +++ b/generate-docker-compose @@ -0,0 +1,14 @@ +#!/bin/bash + +set -eu + +# We generate first, and capture the output, to avoid overwriting the file on error. +# To avoid jsonnet needing to exist locally, we run it in a container. +output=$(docker run -i sparkprime/jsonnet - < docker-compose.jsonnet) + +{ + echo "# DO NOT EDIT THIS FILE!" + echo "# This file is generated from docker-compose.jsonnet" + echo "# It can be generated by running ./generate-docker-compose" + echo "$output" +} > docker-compose.yml From 6815924097a38535175d49f5de11762d4676a147 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 1 Jan 2019 14:27:08 -0800 Subject: [PATCH 2/9] Fix some bugs and linter errors introduced by backfiller I ran `pyflakes` on the repo and found these bugs: ``` ./common/common.py:289: undefined name 'random' ./downloader/downloader/main.py:7: 'random' imported but unused ./backfiller/backfiller/main.py:150: undefined name 'variant' ./backfiller/backfiller/main.py:158: undefined name 'timedelta' ./backfiller/backfiller/main.py:171: undefined name 'sort' ./backfiller/backfiller/main.py:173: undefined name 'sort' ``` (ok, the "imported but unused" one isn't a bug, but the rest are) This fixes those, as well as a further issue I saw with sorting of hours. Iterables are not sortable. As an obvious example, what if your iterable was infinite? As a result, any attempt to sort an iterable that is not already a friendly type like a list or tuple will result in an error. We avoid this by coercing to list, fully realising the iterable and putting it into a form that python will let us sort. It also avoids the nasty side-effect of mutating the list that gets passed into us, which the caller may not expect. Consider this example: ``` >>> my_hours = ["one", "two", "three"] >>> print my_hours ["one", "two", "three"] >>> backfill_node(base_dir, node, stream, variants, hours=my_hours, order='forward') >>> print my_hours ["one", "three", "two"] ``` Also, one of the linter errors was non-trivial to fix - we were trying to get a list of hours (which is an api call for a particular variant), but at a time when we weren't dealing with a single variant. My solution was to get a list of hours for ALL variants, and take the union. 
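To make the intended contract concrete, here is a toy sketch of the coerce-then-sort
behaviour we want. The helper name and hour values are hypothetical, not the real
backfiller API:

```
def sorted_hours(hours):
	# Coerce to list first: this realises generators so they can be sorted,
	# and means we only ever mutate our own copy, never the caller's.
	hours = list(hours)
	hours.sort()
	return hours

my_hours = ["2019-01-01T02", "2019-01-01T00", "2019-01-01T01"]
print sorted_hours(my_hours)  # ['2019-01-01T00', '2019-01-01T01', '2019-01-01T02']
print my_hours                # unchanged: ['2019-01-01T02', '2019-01-01T00', '2019-01-01T01']
print sorted_hours(hour for hour in my_hours)  # generators are fine too
```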
--- backfiller/backfiller/main.py | 14 +++++++++----- common/common.py | 1 + downloader/downloader/main.py | 1 - 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index dadbef5..f4ecc85 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -147,15 +147,19 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, seconds to prioritise letting the downloader grab these segments.""" if hours is None: - hours = list_remote_hours(node, stream, variant) + # gather all available hours from all variants and take the union + hours = set().union(*[ + list_remote_hours(node, stream, variant) + for variant in variants + ]) elif is_iterable(hours): - pass # hours already in desired format + hours = list(hours) # coerce to list so it can be sorted else: n_hours = hours if n_hours < 1: raise ValueError('Number of hours has to be 1 or greater') now = datetime.datetime.utcnow() - hours = [(now - i * timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] + hours = [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)] if start is not None: hours = [hour for hour in hours if hour >= start] @@ -168,9 +172,9 @@ def backfill_node(base_dir, node, stream, variants, hours=None, start=None, if order == 'random': random.shuffle(hours) elif order == 'forward': - sort(hours) + hours.sort() elif order == 'reverse': - sort(hours, reverse=True) + hours.sort(reverse=True) for variant in variants: diff --git a/common/common.py b/common/common.py index acb6b08..ba8f48d 100644 --- a/common/common.py +++ b/common/common.py @@ -8,6 +8,7 @@ import errno import itertools import logging import os +import random import sys from collections import namedtuple diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index d06cc50..0a068a7 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -4,7 +4,6 @@ import errno import hashlib import logging import os -import random import signal import sys import uuid From 6bf709287a7212abda1234494f9e5f40c4d8b7cb Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 2 Jan 2019 06:36:27 -0800 Subject: [PATCH 3/9] cutter: Introduce an alternate cutting approach that is much faster This cutter works by only cutting the first and last segments to size, then concatting them with the other segments, so we only ever process a few seconds of video instead of the entire video duration. However, to make this work, care must be taken that the cut segments use the same codecs as the other segments. The reason it's experimental is that we are not yet confident in its ability to cut accurately and without sync issues. We have seen some minor issues when trying to play back the raw output files, but youtube's re-encoding has consistently smoothed out those issues and they seem to be highly player-specific. Vigorous testing is needed. Also note that both methods right now (cat then cut, and cut then cat) only work if all the segments are cattable, that is they all use the same codecs, have the same resolution, etc. If a stream were to change its encoding settings, and we were cutting over that change, both approaches would not work. We should add checks for that scenario (which can only happen over a stream drop), and if so fallback to a slow method using ffmpeg's concat filter, which will work even for disparate codecs, though reconciling mismatched resolutions or frame rates may require further work. 
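For reference, the concat-filter fallback mentioned above (not implemented in this patch)
would look roughly like the sketch below. The segment paths are hypothetical, and real code
would want to choose output codecs and bitrates deliberately rather than taking ffmpeg's
defaults, but it shows why this path is slow: everything gets re-encoded.

```
import subprocess

segments = ['/mnt/pre-change.ts', '/mnt/post-change.ts']  # hypothetical paths
inputs = []
for path in segments:
	inputs += ['-i', path]
# One video and one audio stream per input, concatenated and re-encoded,
# which is what lets this handle mismatched codecs (at a large CPU cost).
filter_spec = ''.join('[{0}:v][{0}:a]'.format(i) for i in range(len(segments)))
filter_spec += 'concat=n={}:v=1:a=1[v][a]'.format(len(segments))
subprocess.check_call(
	['ffmpeg'] + inputs + [
		'-filter_complex', filter_spec,
		'-map', '[v]', '-map', '[a]',
		'-f', 'mpegts', 'out.ts',
	]
)
```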
---
 restreamer/restreamer/main.py | 91 +++++++++++++++++++++++++++++++++++
 1 file changed, 91 insertions(+)

diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py
index 1e01d08..657a926 100644
--- a/restreamer/restreamer/main.py
+++ b/restreamer/restreamer/main.py
@@ -7,6 +7,7 @@ import logging
 import os
 import shutil
 import signal
+from contextlib import closing
 
 import dateutil.parser
 import gevent
@@ -198,6 +199,8 @@ def cut(stream, variant):
 	if any holes are detected, rather than producing a video with missing parts.
 	Set to true by passing "true" (case insensitive).
 	Even if holes are allowed, a 406 may result if the resulting video would be empty.
+	experimental: Optional, default false. If true, use the new, much faster, but experimental
+		method of cutting.
 	"""
 	start = dateutil.parser.parse(request.args['start'])
 	end = dateutil.parser.parse(request.args['end'])
@@ -231,6 +234,11 @@ def cut(stream, variant):
 	# finally, calculate actual output duration, which is what ffmpeg will use
 	duration = full_duration - cut_start - cut_end
 
+	# possibly defer to experimental version now that we've parsed our inputs.
+	# we'll clean up this whole flow later.
+	if request.args.get('experimental') == 'true':
+		return cut_experimental(segments, cut_start, cut_end)
+
 	def feed_input(pipe):
 		# pass each segment into ffmpeg's stdin in order, while outputing everything on stdout.
 		for segment in segments:
@@ -277,6 +285,89 @@ def cut(stream, variant):
 	return Response(_cut(), mimetype='video/MP2T')
 
 
+def cut_experimental(segments, cut_start, cut_end):
+	"""Experimental cutting method where we cut the first and last segments only,
+	then cat them all together."""
+	# Note: assumes codecs don't change from segment to segment.
+
+	def streams_info(segment):
+		"""Return ffprobe's info on streams as a list of dicts"""
+		output = subprocess.check_output(['ffprobe', '-of', 'json', '-show_streams', segment.path])
+		return json.loads(output)['streams']
+
+	def ffmpeg(segment, cut_start=None, cut_end=None):
+		"""Return a Popen object which is ffmpeg cutting the given segment"""
+		args = ['ffmpeg', '-i', segment.path]
+		# output from ffprobe is generally already sorted but let's be paranoid,
+		# because the order of map args matters.
+		for stream in sorted(streams_info(segment), key=lambda stream: stream['index']):
+			# map the same stream in the same position from input to output
+			args += ['-map', '0:{}'.format(stream['index'])]
+			if stream['codec_type'] in ('video', 'audio'):
+				# for non-metadata streams, make sure we use the same codec (metadata streams
+				# are a bit weirder, and ffmpeg will do the right thing anyway)
+				args += ['-codec:{}'.format(stream['index']), stream['codec_name']]
+		# now add trim args
+		if cut_start:
+			args += ['-ss', str(cut_start)]
+		if cut_end:
+			args += ['-to', str(cut_end)]
+		# output to stdout as MPEG-TS
+		args += ['-f', 'mpegts', '-']
+		# run it
+		logging.info("Running segment cut with args: {}".format(" ".join(args)))
+		return subprocess.Popen(args, stdout=subprocess.PIPE)
+
+	def chunks(fileobj, chunk_size=16*1024):
+		"""Read fileobj until EOF, yielding chunk_size sized chunks of data."""
+		while True:
+			chunk = fileobj.read(chunk_size)
+			if not chunk:
+				break
+			yield chunk
+
+	def _cut():
+		# set first and last only if they actually need cutting
+		first = segments[0] if cut_start else None
+		last = segments[-1] if cut_end else None
+		for segment in segments:
+			# note first and last might be the same segment.
+ # note a segment will only match if cutting actually needs to be done + # (ie. cut_start or cut_end is not 0) + if segment in (first, last): + proc = None + try: + proc = ffmpeg( + segment, + cut_start if segment == first else None, + cut_end if segment == last else None, + ) + with closing(proc.stdout): + for chunk in chunks(proc.stdout): + yield chunk + proc.wait() + except Exception: + # try to clean up proc, ignoring errors + try: + proc.kill() + except OSError: + pass + else: + # check if ffmpeg had errors + if proc.returncode != 0: + raise Exception( + "Error while streaming cut: ffmpeg exited {}".format(proc.returncode) + ) + else: + # no cutting needed, just serve the file + with open(segment.path) as f: + for chunk in chunks(f): + yield chunk + + return Response(_cut(), mimetype='video/MP2T') + + + def main(host='0.0.0.0', port=8000, base_dir='.'): app.static_folder = base_dir server = WSGIServer((host, port), cors(app)) From c8cc4a68a06423b8b3e21b7e6259236c1b11dd74 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 2 Jan 2019 07:13:27 -0800 Subject: [PATCH 4/9] cutter: Fix bugs that meant things wouldn't actually be cut The calculations were backwards, so instead of cutting a video by, say, 2 seconds, it would cut by -2 seconds, which was clamped to 0. So it would never actually cut, it would always use the closest segment. Also, once we were actually cutting, we hit an issue where ffmpeg would finish and close its input early, because we'd reached the end of the cut video, but not all input had been written yet. This resulted in an EPIPE error (write to closed pipe) in the input feeder. We now ignore that. --- restreamer/restreamer/main.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 657a926..7fba30b 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -226,11 +226,11 @@ def cut(stream, variant): return "We have no content available within the requested time range.", 406 # how far into the first segment to begin - cut_start = max(0, (segments[0].start - start).total_seconds()) + cut_start = max(0, (start - segments[0].start).total_seconds()) # calculate full uncut duration of content, ie. without holes. full_duration = sum(segment.duration.total_seconds() for segment in segments) # calculate how much of final segment should be cut off - cut_end = max(0, (end - segments[-1].end).total_seconds()) + cut_end = max(0, (segments[-1].end - end).total_seconds()) # finally, calculate actual output duration, which is what ffmpeg will use duration = full_duration - cut_start - cut_end @@ -243,7 +243,12 @@ def cut(stream, variant): # pass each segment into ffmpeg's stdin in order, while outputing everything on stdout. for segment in segments: with open(segment.path) as f: - shutil.copyfileobj(f, pipe) + try: + shutil.copyfileobj(f, pipe) + except OSError as e: + # ignore EPIPE, as this just means the end cut meant we didn't need all input + if e.errno != errno.EPIPE: + raise pipe.close() def _cut(): From 57e665df2e2a5663dc28d2113b3713b0cdeb35bb Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 3 Jan 2019 17:59:39 -0800 Subject: [PATCH 5/9] generate-docker-compose: Clean up the container afterwards I'll never understand why this isn't the default, docker. 
--- generate-docker-compose | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/generate-docker-compose b/generate-docker-compose index 8b00bb9..108d190 100755 --- a/generate-docker-compose +++ b/generate-docker-compose @@ -4,7 +4,7 @@ set -eu # We generate first, and capture the output, to avoid overwriting the file on error. # To avoid jsonnet needing to exist locally, we run it in a container. -output=$(docker run -i sparkprime/jsonnet - < docker-compose.jsonnet) +output=$(docker run --rm -i sparkprime/jsonnet - < docker-compose.jsonnet) { echo "# DO NOT EDIT THIS FILE!" From a628676e7433afaef70aec90f91e18e93ddb3334 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 3 Jan 2019 19:16:29 -0800 Subject: [PATCH 6/9] downloader: Log to subloggers instead of the root logger This gives us some context when logging, and is best practice. --- downloader/downloader/main.py | 78 ++++++++++++++++++--------------- downloader/downloader/twitch.py | 9 ++-- 2 files changed, 49 insertions(+), 38 deletions(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 0a068a7..8783eb0 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -90,6 +90,7 @@ class StreamsManager(object): def __init__(self, channel, base_dir, qualities): self.channel = channel + self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.base_dir = base_dir self.stream_workers = {name: [] for name in qualities + ["source"]} # {stream name: [workers]} self.latest_urls = {} # {stream name: (fetch time, url)} @@ -102,7 +103,7 @@ class StreamsManager(object): and any older workers are safe to stop.""" workers = self.stream_workers[worker.stream] if worker not in workers: - logging.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers)) + self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers)) return # stop everything older than given worker for old in workers[:workers.index(worker)]: @@ -116,12 +117,12 @@ class StreamsManager(object): """ workers = self.stream_workers[worker.stream] if worker not in workers: - logging.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers)) + self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers)) return if worker is not workers[-1]: - logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream)) + self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream)) return - logging.info("Starting new worker for {} by request of old worker".format(worker.stream)) + self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream)) self.wait_for_new_url(worker.stream, worker.url) self.start_worker(worker.stream) self.trigger_refresh() @@ -146,13 +147,13 @@ class StreamsManager(object): new_time, new_url = self.latest_urls[stream] if new_url != old_url: return - logging.info("Triggering master playlist refresh as we need a new url") + self.logger.info("Triggering master playlist refresh as we need a new url") self.trigger_refresh() self.latest_urls_changed.wait() def stop(self): """Shut down all workers and stop capturing stream.""" - logging.info("Stopping streams manager") + self.logger.info("Stopping") self.stopping.set() def start_worker(self, stream): @@ -160,7 +161,7 @@ class StreamsManager(object): # it's 
possible for fetch_latest to call us after we've started stopping, # in that case do nothing. if self.stopping.is_set(): - logging.info("Ignoring worker start as we are stopping") + self.logger.info("Ignoring worker start as we are stopping") return url_time, url = self.latest_urls[stream] worker = StreamWorker(self, stream, url, url_time) @@ -171,7 +172,7 @@ class StreamsManager(object): """Re-fetch master playlist and start new workers if needed""" try: # Fetch playlist. On soft timeout, retry. - logging.info("Fetching master playlist") + self.logger.info("Fetching master playlist") fetch_time = monotonic() with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh): master_playlist = twitch.get_master_playlist(self.channel) @@ -180,23 +181,23 @@ class StreamsManager(object): for stream, workers in self.stream_workers.items(): # warn and retry if the url is missing if stream not in new_urls: - logging.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream)) + self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream)) self.trigger_refresh() # is it newly found? if not workers and stream in self.latest_urls: - logging.info("Starting new worker for {} as none exist".format(stream)) + self.logger.info("Starting new worker for {} as none exist".format(stream)) self.start_worker(stream) latest_worker = workers[-1] # is the old worker too old? if latest_worker.age() > self.MAX_WORKER_AGE: - logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.)) + self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.)) self.start_worker(stream) except Exception as e: if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404: - logging.info("Stream is not up. Retrying.") + self.logger.info("Stream is not up. 
Retrying.") self.trigger_refresh() else: - logging.exception("Failed to fetch master playlist") + self.logger.exception("Failed to fetch master playlist") # don't retry on hard timeout as we already retried on soft timeout if not isinstance(e, TimedOutError): self.trigger_refresh() @@ -209,7 +210,7 @@ class StreamsManager(object): self.MAX_WORKER_AGE - workers[-1].age() for workers in self.stream_workers.values() if workers ] or [0])) - logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) + self.logger.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age)) # wait until refresh triggered, next max age reached, or we're stopping (whichever happens first) gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1) if not self.stopping.is_set(): @@ -217,7 +218,7 @@ class StreamsManager(object): gevent.spawn(self.fetch_latest) # wait min retry interval with jitter, unless we're stopping self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL)) - logging.info("Stopping workers") + self.logger.info("Stopping workers") for workers in self.stream_workers.values(): for worker in workers: worker.stop() @@ -248,6 +249,7 @@ class StreamWorker(object): def __init__(self, manager, stream, url, url_time): self.manager = manager + self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(stream, id(self))) self.stream = stream self.url = url self.url_time = url_time @@ -268,14 +270,14 @@ class StreamWorker(object): self.stopping.set() def run(self): - logging.info("Worker {} starting".format(self)) + self.logger.info("Worker starting") try: self._run() except Exception: - logging.exception("Worker {} failed".format(self)) + self.logger.exception("Worker failed") self.trigger_new_worker() else: - logging.info("Worker {} stopped".format(self)) + self.logger.info("Worker stopped") finally: for getter in self.getters.values(): getter.done.wait() @@ -293,18 +295,18 @@ class StreamWorker(object): first = True while not self.stopping.is_set(): - logging.debug("{} getting media playlist {}".format(self, self.url)) + self.logger.debug("Getting media playlist {}".format(self.url)) try: with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): playlist = twitch.get_media_playlist(self.url) except Exception as e: - logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True) + self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.trigger_new_worker() if first: - logging.warning("{} failed on first fetch, stopping".format(self)) + self.logger.warning("Failed on first fetch, stopping") self.stop() elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403: - logging.warning("{} failed with 403 Forbidden, stopping".format(self)) + self.logger.warning("Failed with 403 Forbidden, stopping") self.stop() self.wait(self.FETCH_RETRY_INTERVAL) continue @@ -320,7 +322,7 @@ class StreamWorker(object): if segment.uri not in self.getters: if date is None: raise ValueError("Cannot determine date of segment") - self.getters[segment.uri] = SegmentGetter(self.manager.base_dir, self.manager.channel, self.stream, segment, date) + self.getters[segment.uri] = SegmentGetter(self.logger, self.manager.base_dir, self.manager.channel, self.stream, segment, date) gevent.spawn(self.getters[segment.uri].run) if date is not None: date += datetime.timedelta(seconds=segment.duration) @@ -335,7 
+337,7 @@ class StreamWorker(object): # Stop if end-of-stream if playlist.is_endlist: - logging.info("{} stopping due to end-of-playlist".format(self)) + self.logger.info("Stopping due to end-of-playlist") # Trigger a new worker for when the stream comes back up. # In the short term this will cause some thrashing until the master playlist # starts returning 404, but it's the best way to avoid missing anything @@ -364,7 +366,8 @@ class SegmentGetter(object): FETCH_HEADERS_TIMEOUTS = 5, 30 FETCH_FULL_TIMEOUTS = 15, 240 - def __init__(self, base_dir, channel, stream, segment, date): + def __init__(self, parent_logger, base_dir, channel, stream, segment, date): + self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.base_dir = base_dir self.channel = channel self.stream = stream @@ -380,7 +383,7 @@ class SegmentGetter(object): try: self._run() except Exception: - logging.exception("Failure in SegmentGetter {}".format(self.segment)) + self.logger.exception("Unexpected exception while getting segment {}, retrying".format(self.segment)) gevent.sleep(common.jitter(self.UNEXPECTED_FAILURE_RETRY)) else: break @@ -388,6 +391,7 @@ class SegmentGetter(object): self.done.set() def _run(self): + self.logger.debug("Getter started") while not self.exists(): self.retry = gevent.event.Event() worker = gevent.spawn(self.get_segment) @@ -398,6 +402,7 @@ class SegmentGetter(object): break # if retry not set, wait for FETCH_RETRY first self.retry.wait(common.jitter(self.FETCH_RETRY)) + self.logger.debug("Getter is done") def make_path_prefix(self): """Generate leading part of filepath which doesn't change with the hash.""" @@ -437,8 +442,6 @@ class SegmentGetter(object): def get_segment(self): - # save current value of self.retry so we can't set any later instance - # after a retry for this round has already occurred. try: self._get_segment() except Exception: @@ -448,13 +451,16 @@ class SegmentGetter(object): return True def _get_segment(self): + # save current value of self.retry so we can't set any later instance + # after a retry for this round has already occurred. 
+ retry = self.retry temp_path = self.make_path("temp") hash = hashlib.sha256() file_created = False try: - logging.debug("Getting segment {}".format(self.segment)) - with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, self.retry.set): - with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, self.retry.set): + logging.debug("Downloading segment {} to {}".format(self.segment, temp_path)) + with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): + with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): resp = requests.get(self.segment.uri, stream=True) if resp.status_code == 403: logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment)) @@ -474,12 +480,14 @@ class SegmentGetter(object): # another exception in the interim ex_type, ex, tb = sys.exc_info() if file_created: - common.rename(temp_path, self.make_path("partial", hash)) + partial_path = self.make_path("partial", hash) + self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path)) + common.rename(temp_path, partial_path) raise ex_type, ex, tb else: - common.rename(temp_path, self.make_path("full", hash)) - - + full_path = self.make_path("full", hash) + self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) + common.rename(temp_path, full_path) def main(channel, base_dir=".", qualities=""): diff --git a/downloader/downloader/twitch.py b/downloader/downloader/twitch.py index 92358bb..ba37d6c 100644 --- a/downloader/downloader/twitch.py +++ b/downloader/downloader/twitch.py @@ -7,6 +7,9 @@ import requests import hls_playlist +logger = logging.getLogger(__name__) + + def get_master_playlist(channel): """Get the master playlist for given channel from twitch""" resp = requests.get( @@ -62,10 +65,10 @@ def get_media_playlist_uris(master_playlist, target_qualities): def variant_name(variant): names = set(media.name for media in variant.media if media.type == "VIDEO" and media.name) if not names: - logging.warning("Variant {} has no named video renditions, can't determine name".format(variant)) + logger.warning("Variant {} has no named video renditions, can't determine name".format(variant)) return None if len(names) > 1: - logging.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant)) + logger.warning("Variant {} has multiple possible names, picking one arbitrarily".format(variant)) return list(names)[0] if not master_playlist.playlists: @@ -73,7 +76,7 @@ def get_media_playlist_uris(master_playlist, target_qualities): for variant in master_playlist.playlists: if any(media.uri for media in variant.media): - logging.warning("Variant has a rendition with its own URI: {}".format(variant)) + logger.warning("Variant has a rendition with its own URI: {}".format(variant)) by_name = {variant_name(variant): variant for variant in master_playlist.playlists} From c0357680cffb7323fc05b6a50b6f4545220cd696 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 3 Jan 2019 19:18:43 -0800 Subject: [PATCH 7/9] downloader: Use caller's logger inside soft_hard_timeout --- downloader/downloader/main.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 8783eb0..0aac4d2 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -25,7 +25,7 @@ class TimedOutError(Exception): @contextmanager -def 
soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout): +def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft_timeout): """Context manager that wraps a piece of code in a pair of timeouts, a "soft" timeout and a "hard" one. If the block does not complete before the soft timeout, the given on_soft_timeout() function is called in a new greenlet. @@ -47,7 +47,7 @@ def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout if finished: # We finished before soft timeout was hit return - logging.warning("Hit soft timeout {}s while {}".format(soft_timeout, description)) + logger.warning("Hit soft timeout {}s while {}".format(soft_timeout, description)) on_soft_timeout() gevent.spawn_later(soft_timeout, dispatch_soft_timeout) error = TimedOutError("Timed out after {}s while {}".format(hard_timeout, description)) @@ -174,7 +174,7 @@ class StreamsManager(object): # Fetch playlist. On soft timeout, retry. self.logger.info("Fetching master playlist") fetch_time = monotonic() - with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh): + with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh): master_playlist = twitch.get_master_playlist(self.channel) new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys()) self.update_urls(fetch_time, new_urls) @@ -297,7 +297,7 @@ class StreamWorker(object): self.logger.debug("Getting media playlist {}".format(self.url)) try: - with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): + with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): playlist = twitch.get_media_playlist(self.url) except Exception as e: self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) @@ -459,8 +459,8 @@ class SegmentGetter(object): file_created = False try: logging.debug("Downloading segment {} to {}".format(self.segment, temp_path)) - with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): - with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): + with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): + with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): resp = requests.get(self.segment.uri, stream=True) if resp.status_code == 403: logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment)) From 17972b87aaa409cc9194318fc44fd059173dd2dc Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 3 Jan 2019 19:28:45 -0800 Subject: [PATCH 8/9] Allow setting of log level via WUBLOADER_LOG_LEVEL env var By using an env var, it is universal and happens prior to arg parsing, at the same point we do other logging setup. 
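In practice the pattern reduces to the sketch below. The invocation and logger name are
illustrative only (assuming the package is importable as `downloader`); the point is that
the variable is read from the environment before any argument parsing happens.

```
import logging
import os

# Level comes from the environment, defaulting to INFO.
# Typical invocation: WUBLOADER_LOG_LEVEL=DEBUG python -m downloader desertbus
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level)
logging.getLogger(__name__).debug("only emitted when WUBLOADER_LOG_LEVEL=DEBUG")
```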
--- downloader/downloader/__main__.py | 4 +++- restreamer/restreamer/__main__.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/downloader/downloader/__main__.py b/downloader/downloader/__main__.py index 45d813e..2f2fa3f 100644 --- a/downloader/downloader/__main__.py +++ b/downloader/downloader/__main__.py @@ -3,6 +3,7 @@ import gevent.monkey gevent.monkey.patch_all() import logging +import os import argh @@ -10,5 +11,6 @@ from downloader.main import main LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" -logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) argh.dispatch_command(main) diff --git a/restreamer/restreamer/__main__.py b/restreamer/restreamer/__main__.py index 24935eb..f0ad12b 100644 --- a/restreamer/restreamer/__main__.py +++ b/restreamer/restreamer/__main__.py @@ -3,6 +3,7 @@ import gevent.monkey gevent.monkey.patch_all() import logging +import os import argh @@ -10,5 +11,6 @@ from restreamer.main import main LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" -logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) argh.dispatch_command(main) From 7525b7c13570edd4f1caa42395fa96c811435529 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 3 Jan 2019 21:10:23 -0800 Subject: [PATCH 9/9] restreamer: Add basic prometheus stats to all endpoints I had to go to some effort to get nice labelling, which also meant none of the existing libs for this were any good, but this works well enough. Exposes the metrics on /metrics. --- restreamer/restreamer/main.py | 14 ++++++ restreamer/restreamer/stats.py | 80 ++++++++++++++++++++++++++++++++++ restreamer/setup.py | 1 + 3 files changed, 95 insertions(+) create mode 100644 restreamer/restreamer/stats.py diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 7fba30b..9b354ef 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -11,6 +11,7 @@ from contextlib import closing import dateutil.parser import gevent +import prometheus_client as prom from flask import Flask, url_for, request, abort, Response from gevent import subprocess from gevent.pywsgi import WSGIServer @@ -18,9 +19,11 @@ from gevent.pywsgi import WSGIServer from common import get_best_segments import generate_hls +from stats import stats, after_request app = Flask('restreamer', static_url_path='/segments') +app.after_request(after_request) """ @@ -80,6 +83,13 @@ def cors(app): return handle +@app.route('/metrics') +@stats +def metrics(): + """Return current metrics in prometheus metrics format""" + return prom.generate_latest() + + @app.route('/files//') @has_path_args def list_hours(stream, variant): @@ -95,6 +105,7 @@ def list_hours(stream, variant): @app.route('/files///') +@stats @has_path_args def list_segments(stream, variant, hour): """Returns a JSON list of segment files for a given stream, variant and hour. @@ -121,6 +132,7 @@ def time_range_for_variant(stream, variant): @app.route('/playlist/.m3u8') +@stats @has_path_args def generate_master_playlist(stream): """Returns a HLS master playlist for the given stream. 
@@ -150,6 +162,7 @@ def generate_master_playlist(stream): @app.route('/playlist//.m3u8') +@stats @has_path_args def generate_media_playlist(stream, variant): """Returns a HLS media playlist for the given stream and variant. @@ -189,6 +202,7 @@ def generate_media_playlist(stream, variant): @app.route('/cut//.ts') +@stats @has_path_args def cut(stream, variant): """Return a MPEGTS video file covering the exact timestamp range. diff --git a/restreamer/restreamer/stats.py b/restreamer/restreamer/stats.py new file mode 100644 index 0000000..b1e2a24 --- /dev/null +++ b/restreamer/restreamer/stats.py @@ -0,0 +1,80 @@ + +import functools + +import prometheus_client as prom +from flask import request +from flask import g as request_store +from monotonic import monotonic + + +def stats(fn): + """Decorator that wraps a handler func to collect metrics. + Adds handler func args as labels, along with 'endpoint' label using func's name, + method and response status where applicable.""" + # We have to jump through some hoops here, because the prometheus client lib demands + # we pre-define our label names, but we don't know the names of the handler kwargs + # until the first time the function's called. So we delay defining the metrics until + # first call. + metrics = {} + endpoint = fn.__name__ + + @functools.wraps(fn) + def _stats(**kwargs): + if not metrics: + # first call, set up metrics + labels_no_status = sorted(kwargs.keys()) + ['endpoint', 'method'] + labels = labels_no_status + ['status'] + metrics['latency'] = prom.Histogram( + 'http_request_latency', + 'Time taken to run the request handler and create a response', + labels, + # buckets: very long playlists / cutting can be quite slow, + # so we have a wider range of latencies than default, up to 10min. + buckets=[.001, .005, .01, .05, .1, .5, 1, 5, 10, 30, 60, 120, 300, 600], + ) + metrics['size'] = prom.Histogram( + 'http_response_size', + 'Size in bytes of response body for non-chunked responses', + labels, + # buckets: powers of 4 up to 1GiB (1, 4, 16, 64, 256, 1Ki, 4Ki, ...) + buckets=[4**i for i in range(16)], + ) + metrics['concurrent'] = prom.Gauge( + 'http_request_concurrency', + 'Number of requests currently ongoing', + labels_no_status, + ) + + request_store.metrics = metrics + request_store.labels = {k: str(v) for k, v in kwargs.items()} + request_store.labels.update(endpoint=endpoint, method=request.method) + metrics['concurrent'].labels(**request_store.labels).inc() + request_store.start_time = monotonic() + return fn(**kwargs) + + return _stats + + +def after_request(response): + """Must be registered to run after requests. Finishes tracking the request + and logs most of the metrics. + We do it in this way, instead of inside the stats wrapper, because it lets flask + normalize the handler result into a Response object. 
+ """ + if 'metrics' not in request_store: + return # untracked handler + + end_time = monotonic() + metrics = request_store.metrics + labels = request_store.labels + start_time = request_store.start_time + + metrics['concurrent'].labels(**labels).dec() + + labels['status'] = str(response.status_code) + metrics['latency'].labels(**labels).observe(end_time - start_time) + size = response.calculate_content_length() + if size is not None: + metrics['size'].labels(**labels).observe(size) + + return response diff --git a/restreamer/setup.py b/restreamer/setup.py index 999c956..7e1344e 100644 --- a/restreamer/setup.py +++ b/restreamer/setup.py @@ -8,6 +8,7 @@ setup( "argh", "flask", "gevent", + "prometheus-client", "wubloader-common", ], )
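
As a footnote on the label juggling above: prometheus_client requires the full set of label
names when a metric is constructed, which is why the wrapper defers creating its metrics until
the first call, once it has seen the handler's kwargs. A standalone sketch of that constraint
(the metric and label values here are illustrative only, not taken from the running service):

```
import prometheus_client as prom

# Label names are fixed when the metric is created...
latency = prom.Histogram(
	'example_request_latency',
	'Time taken to run a request handler',
	['endpoint', 'method', 'status'],
)
# ...and every observation must then supply exactly that set of labels.
latency.labels(endpoint='metrics', method='GET', status='200').observe(0.012)

print prom.generate_latest()
```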