From b4655f18c61fda2e29c758c5dfb6aaffbbf33a5c Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Sat, 29 Jun 2019 15:54:04 -0700
Subject: [PATCH 1/8] downloader: Track total duration of downloaded segments

---
 downloader/downloader/main.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 26adcd8..4a42d2b 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -29,6 +29,13 @@ segments_downloaded = prom.Counter(
 	["partial", "channel", "quality"],
 )
 
+segment_duration_downloaded = prom.Counter(
+	"segment_duration_downloaded",
+	"Total duration of all segments partially or fully downloaded. "
+	"Note partial segments still count the full duration.",
+	["partial", "channel", "quality"],
+)
+
 latest_segment = prom.Gauge(
 	"latest_segment",
 	"Timestamp of the time of the newest segment fully downloaded",
@@ -74,8 +81,6 @@ def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft
 		finished = True
 
 
-
-
 class StreamsManager(object):
 	"""Keeps track of what qualities are being downloaded and the workers doing so.
 	Re-fetches master playlist when needed and starts new stream workers.
@@ -542,12 +547,14 @@ class SegmentGetter(object):
 			self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
 			common.rename(temp_path, partial_path)
 			segments_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc()
+			segment_duration_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc(self.segment.duration)
 			raise ex_type, ex, tb
 		else:
 			full_path = self.make_path("full", hash)
 			self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
 			common.rename(temp_path, full_path)
 			segments_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc()
+			segment_duration_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc(self.segment.duration)
 			# Prom doesn't provide a way to compare value to gauge's existing value,
 			# we need to reach into internals
 			stat = latest_segment.labels(channel=self.channel, quality=self.quality)
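
An illustrative aside on the counter above (not part of the patch; metric and label values
here are made up): prometheus_client counters accept fractional increments, which is what
lets a duration in seconds accumulate the same way a plain event count does.

    import prometheus_client as prom

    # Same shape as segment_duration_downloaded, under an example name so it
    # cannot clash with the real metric.
    example_duration = prom.Counter(
        "example_segment_duration_downloaded",
        "Total duration of downloaded segments, in seconds.",
        ["partial", "channel", "quality"],
    )

    # A fully downloaded 2-second segment and a partial one both add the full
    # nominal duration, as the metric description notes.
    example_duration.labels(partial="False", channel="somechannel", quality="source").inc(2.0)
    example_duration.labels(partial="True", channel="somechannel", quality="source").inc(2.0)
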
From a2edb38a8595ec7d1551e951b43d95836a0e0ff8 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Sat, 29 Jun 2019 18:36:13 -0700
Subject: [PATCH 2/8] Add an InstrumentedSession wrapper that automatically gathers metrics on http calls

---
 common/common/requests.py | 51 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)
 create mode 100644 common/common/requests.py

diff --git a/common/common/requests.py b/common/common/requests.py
new file mode 100644
index 0000000..59896e5
--- /dev/null
+++ b/common/common/requests.py
@@ -0,0 +1,51 @@
+
+"""Code for instrumenting requests calls. Requires requests, obviously."""
+
+import urlparse
+
+import requests
+import prometheus_client as prom
+from monotonic import monotonic
+
+request_latency = prom.Histogram(
+	'http_client_request_latency',
+	'Time taken to make an outgoing HTTP request. '
+	'Status = "error" is used if an error occurs. Measured as time from first byte sent to '
+	'headers finished being parsed, ie. does not include reading a streaming response.',
+	['name', 'method', 'domain', 'status'],
+)
+
+response_size = prom.Histogram(
+	'http_client_response_size',
+	"The content length of (non-streaming) responses to outgoing HTTP requests.",
+	['name', 'method', 'domain', 'status'],
+)
+
+request_concurrency = prom.Gauge(
+	'http_client_request_concurrency',
+	"The number of outgoing HTTP requests currently ongoing",
+	['name', 'method', 'domain'],
+)
+
+class InstrumentedSession(requests.Session):
+	"""A requests Session that automatically records metrics on requests made.
+	Users may optionally pass a 'metric_name' kwarg that will be included as the 'name' label.
+	"""
+
+	def request(self, method, url, *args, **kwargs):
+		_, domain, _, _, _ = urlparse.urlsplit(url)
+		name = kwargs.pop('metric_name', '')
+
+		start = monotonic() # we only use our own measured latency if an error occurs
+		try:
+			with request_concurrency.labels(name, method, domain).track_inprogress():
+				response = super(InstrumentedSession, self).request(method, url, *args, **kwargs)
+		except Exception:
+			latency = monotonic() - start
+			request_latency.labels(name, method, domain, "error").observe(latency)
+			raise
+
+		request_latency.labels(name, method, domain, response.status_code).observe(response.elapsed.total_seconds())
+		if 'content-length' in response.headers:
+			response_size.labels(name, method, domain, response.status_code).observe(float(response.headers['content-length']))
+		return response
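
A rough usage sketch for the wrapper above (URL and metric name are placeholders): it
behaves exactly like a requests.Session, with metric_name as the only extra, optional
keyword argument.

    from common.requests import InstrumentedSession

    session = InstrumentedSession()
    # 'metric_name' is popped before the underlying requests call sees it, and ends up
    # as the 'name' label on the latency, size and concurrency metrics.
    resp = session.get('https://example.com/some/endpoint', timeout=5, metric_name='example_fetch')
    resp.raise_for_status()
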
From d63ae573b78f161f6b21c1390bbeb766f89610f5 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 31 Oct 2019 17:09:11 -0700
Subject: [PATCH 3/8] backfiller: Collect metrics on http calls

---
 backfiller/backfiller/main.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index be0a957..1a1f742 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -15,11 +15,14 @@ import argh
 import gevent.backdoor
 import gevent.pool
 import prometheus_client as prom
-import requests
 
 import common
 from common import dateutil
 from common import database
+from common.requests import InstrumentedSession
+
+# Wraps all requests in some metric collection
+requests = InstrumentedSession()
 
 segments_backfilled = prom.Counter(
 	'segments_backfilled',
@@ -103,7 +106,7 @@ def list_remote_hours(node, channel, quality, timeout=TIMEOUT):
 	"""Wrapper around a call to restreamer.list_hours."""
 	uri = '{}/files/{}/{}'.format(node, channel, quality)
 	logging.debug('Getting list of hours from {}'.format(uri))
-	resp = requests.get(uri, timeout=timeout)
+	resp = requests.get(uri, timeout=timeout, metric_name='list_remote_hours')
 	return common.encode_strings(resp.json())
 
 
@@ -111,7 +114,7 @@ def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT):
 	"""Wrapper around a call to restreamer.list_segments."""
 	uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour)
 	logging.debug('Getting list of segments from {}'.format(uri))
-	resp = requests.get(uri, timeout=timeout)
+	resp = requests.get(uri, timeout=timeout, metric_name='list_remote_segments')
 	return common.encode_strings(resp.json())
 
 
@@ -140,7 +143,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
 
 	try:
 		logging.debug('Fetching segment {} from {}'.format(path, node))
 		uri = '{}/segments/{}/{}/{}/{}'.format(node, channel, quality, hour, missing_segment)
-		resp = requests.get(uri, stream=True, timeout=timeout)
+		resp = requests.get(uri, stream=True, timeout=timeout, metric_name='get_remote_segment')
 		resp.raise_for_status()
 
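
The pattern used here, condensed (module names as in the patch, everything else
illustrative): binding an InstrumentedSession to the name "requests" means existing
call sites keep working unchanged and only gain the optional metric_name label.

    # Previously: import requests
    from common.requests import InstrumentedSession

    # Wraps all requests in some metric collection; call sites still read requests.get(...)
    requests = InstrumentedSession()

    def list_remote_hours(node, channel, quality, timeout=5):
        # Same call shape as before, plus a per-call-site metric name.
        uri = '{}/files/{}/{}'.format(node, channel, quality)
        resp = requests.get(uri, timeout=timeout, metric_name='list_remote_hours')
        return resp.json()
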
From 52e6c4ad41ab5ea6fd3331cd22e4efb49fda827f Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 31 Oct 2019 17:15:53 -0700
Subject: [PATCH 4/8] sheetsync, cutter: Collect metrics on http calls

In particular, to google apis.
---
 common/common/googleapis.py      | 8 ++++++--
 cutter/cutter/upload_backends.py | 4 +++-
 sheetsync/sheetsync/sheets.py    | 2 ++
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/common/common/googleapis.py b/common/common/googleapis.py
index b2b898d..51c59df 100644
--- a/common/common/googleapis.py
+++ b/common/common/googleapis.py
@@ -2,7 +2,11 @@
 import time
 
 import gevent
-import requests
+
+from .requests import InstrumentedSession
+
+# Wraps all requests in some metric collection
+requests = InstrumentedSession()
 
 
 class GoogleAPIClient(object):
@@ -40,7 +44,7 @@ class GoogleAPIClient(object):
 			'client_secret': self.client_secret,
 			'refresh_token': self.refresh_token,
 			'grant_type': 'refresh_token',
-		})
+		}, metric_name='get_access_token')
 		resp.raise_for_status()
 		data = resp.json()
 		self._access_token = data['access_token']
diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py
index d9ab1b0..734c4a6 100644
--- a/cutter/cutter/upload_backends.py
+++ b/cutter/cutter/upload_backends.py
@@ -143,13 +143,14 @@ class Youtube(UploadBackend):
 				'uploadType': 'resumable',
 			},
 			json=json,
+			metric_name='create_video',
 		)
 		if not resp.ok:
 			# Don't retry, because failed calls still count against our upload quota.
 			# The risk of repeated failed attempts blowing through our quota is too high.
 			raise UploadError("Youtube create video call failed with {resp.status_code}: {resp.content}".format(resp=resp))
 		upload_url = resp.headers['Location']
-		resp = self.client.request('POST', upload_url, data=data)
+		resp = self.client.request('POST', upload_url, data=data, metric_name='upload_video')
 		if 400 <= resp.status_code < 500:
 			# As above, don't retry. But with 4xx's we know the upload didn't go through.
 			# On a 5xx, we can't be sure (the server is in an unspecified state).
@@ -169,6 +170,7 @@ class Youtube(UploadBackend):
 				'part': 'id,status',
 				'id': ','.join(group),
 			},
+			metric_name='list_videos',
 		)
 		resp.raise_for_status()
 		for item in resp.json()['items']:
diff --git a/sheetsync/sheetsync/sheets.py b/sheetsync/sheetsync/sheets.py
index 63386b2..3294bf6 100644
--- a/sheetsync/sheetsync/sheets.py
+++ b/sheetsync/sheetsync/sheets.py
@@ -23,6 +23,7 @@ class Sheets(object):
 			'https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}'.format(
 				spreadsheet_id, range,
 			),
+			metric_name='get_rows',
 		)
 		resp.raise_for_status()
 		data = resp.json()
@@ -46,6 +47,7 @@ class Sheets(object):
 				"range": range,
 				"values": [[value]],
 			},
+			metric_name='write_value',
 		)
 		resp.raise_for_status()
 
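
These client metrics only appear if the process exposes them somewhere. The services
presumably already serve their own metrics endpoints, but for completeness a minimal
prometheus_client exposition looks like this (the port number is an arbitrary example):

    import prometheus_client as prom

    # Serves the default registry, including the http_client_* series above, on /metrics.
    prom.start_http_server(8001)
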
From 8ad61e98707118afbb31158f51d226b77e73dcc4 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 31 Oct 2019 17:23:47 -0700
Subject: [PATCH 5/8] downloader: Collect metrics on http calls

---
 downloader/downloader/main.py   |  7 ++++---
 downloader/downloader/twitch.py | 16 +++++++++++-----
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 4a42d2b..1f82aa6 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -21,6 +21,7 @@ from monotonic import monotonic
 import twitch
 import common
 import common.dateutil
+import common.requests
 
 
 segments_downloaded = prom.Counter(
@@ -285,7 +286,7 @@ class StreamWorker(object):
 		# with our connection pool.
 		# This worker's SegmentGetters will use its session by default for performance,
 		# but will fall back to a new one if something goes wrong.
-		self.session = requests.Session()
+		self.session = common.requests.InstrumentedSession()
 
 	def __repr__(self):
 		return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@@ -461,7 +462,7 @@ class SegmentGetter(object):
 				break
 			# Create a new session, so we don't reuse a connection from the old session
 			# which had an error / some other issue. This is mostly just out of paranoia.
-			self.session = common.requests.InstrumentedSession()
+			self.session = common.requests.InstrumentedSession()
 			# if retry not set, wait for FETCH_RETRY first
 			self.retry.wait(common.jitter(self.FETCH_RETRY))
 		self.logger.debug("Getter is done")
@@ -522,7 +523,7 @@ class SegmentGetter(object):
 		self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
 		with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
 			with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
-				resp = self.session.get(self.segment.uri, stream=True)
+				resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment')
 			# twitch returns 403 for expired segment urls, and 404 for very old urls where the original segment is gone.
 			# the latter can happen if we have a network issue that cuts us off from twitch for some time.
 			if resp.status_code in (403, 404):
diff --git a/downloader/downloader/twitch.py b/downloader/downloader/twitch.py
index 94b60da..7124823 100644
--- a/downloader/downloader/twitch.py
+++ b/downloader/downloader/twitch.py
@@ -2,16 +2,18 @@
 import logging
 import random
 
-import requests
-
 import hls_playlist
 
+from common.requests import InstrumentedSession
+
 
 logger = logging.getLogger(__name__)
 
 
-def get_master_playlist(channel, session=requests):
+def get_master_playlist(channel, session=None):
 	"""Get the master playlist for given channel from twitch"""
+	if session is None:
+		session = InstrumentedSession()
 	resp = session.get(
 		"https://api.twitch.tv/api/channels/{}/access_token.json".format(channel),
 		params={'as3': 't'},
@@ -18,7 +20,8 @@ def get_master_playlist(channel, session=requests):
 		headers={
 			'Accept': 'application/vnd.twitchtv.v3+json',
 			'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j',
 		},
+		metric_name='get_access_token',
 	)
 	resp.raise_for_status() # getting access token
 	token = resp.json()
@@ -42,6 +45,7 @@ def get_master_playlist(channel, session=requests):
 			# in flux. Better to just blend in with the crowd for now.
 			# "platform": "_"
 		},
+		metric_name='get_master_playlist',
 	)
 	resp.raise_for_status() # getting master playlist
 	playlist = hls_playlist.load(resp.text, base_uri=resp.url)
@@ -96,7 +100,9 @@ def get_media_playlist_uris(master_playlist, target_qualities):
 	return {name: variant.uri for name, variant in variants.items()}
 
 
-def get_media_playlist(uri, session=requests):
-	resp = session.get(uri)
+def get_media_playlist(uri, session=None):
+	if session is None:
+		session = InstrumentedSession()
+	resp = session.get(uri, metric_name='get_media_playlist')
 	resp.raise_for_status()
 	return hls_playlist.load(resp.text, base_uri=resp.url)
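
The twitch.py change above follows a small pattern worth calling out: default the session
argument to None and build an InstrumentedSession lazily, rather than defaulting to the
requests module. A condensed sketch (function and metric names here are illustrative):

    from common.requests import InstrumentedSession

    def get_playlist(uri, session=None):
        # A shared session can be passed in for connection pooling; otherwise fall
        # back to a private instrumented session.
        if session is None:
            session = InstrumentedSession()
        resp = session.get(uri, metric_name='get_playlist')
        resp.raise_for_status()
        return resp.text
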
From 4f900c5925476e5e56c123a7051e7c3afe0bf53e Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 31 Oct 2019 17:34:46 -0700
Subject: [PATCH 6/8] Collect metrics around cutting time

---
 common/common/segments.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/common/common/segments.py b/common/common/segments.py
index 31746ed..26c36b4 100644
--- a/common/common/segments.py
+++ b/common/common/segments.py
@@ -328,6 +328,7 @@ def read_chunks(fileobj, chunk_size=16*1024):
 		yield chunk
 
 
+@timed('cut', type='fast', normalize=lambda _, segments, start, end: (end - start).total_seconds())
 def fast_cut_segments(segments, start, end):
 	"""Yields chunks of a MPEGTS video file covering the exact timestamp range.
 	segments should be a list of segments as returned by get_best_segments().
@@ -400,6 +401,7 @@ def fast_cut_segments(segments, start, end):
 		yield chunk
 
 
+@timed('cut', type='full', normalize=lambda _, segments, start, end, encode_args: (end - start).total_seconds())
 def full_cut_segments(segments, start, end, encode_args):
 	# how far into the first segment to begin
 	cut_start = max(0, (start - segments[0].start).total_seconds())

From f9b48bc70eeb0683282d87942dd64e060f16fd4e Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 31 Oct 2019 17:51:42 -0700
Subject: [PATCH 7/8] cutter: Add more metrics

---
 cutter/cutter/main.py | 43 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 8 deletions(-)

diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py
index b399724..bbbe04d 100644
--- a/cutter/cutter/main.py
+++ b/cutter/cutter/main.py
@@ -17,22 +17,40 @@ from psycopg2 import sql
 import common
 from common.database import DBManager, query
 from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles
+from common.stats import timed
 
 from .upload_backends import Youtube, Local, UploadError
 
 
-videos_uploaded = prom.Counter(
+videos_uploaded = prom.Counter(
 	'videos_uploaded',
 	'Number of videos successfully uploaded',
 	['video_channel', 'video_quality', 'upload_location']
 )
 
-upload_errors = prom.Counter(
+upload_errors = prom.Counter(
 	'upload_errors',
 	'Number of errors uploading a video',
 	['video_channel', 'video_quality', 'upload_location', 'final_state']
 )
 
+no_candidates = prom.Counter(
+	'no_candidates',
+	"Number of times we looked for candidate jobs but didn't find any",
+)
+
+videos_transcoding = prom.Gauge(
+	'videos_transcoding',
+	"Number of videos currently in transcoding",
+	['location'],
+)
+
+videos_marked_done = prom.Counter(
+	'videos_marked_done',
+	"Number of videos we have successfully marked as done",
+	['location'],
+)
+
 # A list of all the DB column names in CutJob
 CUT_JOB_PARAMS = [
 	"sheet_name",
@@ -156,7 +174,6 @@ class Cutter(object):
 			try:
 				segments = self.check_candidate(candidate)
 			except ContainsHoles:
-				# TODO metric
 				self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate)))
 				set_error(
 					"Node {} does not have all the video needed to cut this row. "
@@ -184,8 +201,10 @@ class Cutter(object):
 			return CutJob(segments=segments, **candidate._asdict())
 
 		# No candidates
+		no_candidates.inc()
 		self.wait(self.NO_CANDIDATES_RETRY_INTERVAL)
 
+	@timed()
 	def list_candidates(self):
 		"""Return a list of all available candidates that we might be able to cut."""
 		# We only accept candidates if they haven't excluded us by whitelist,
@@ -202,6 +221,7 @@ class Cutter(object):
 		result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys())
 		return result.fetchall()
 
+	# No need to instrument this function, just use get_best_segments() stats
 	def check_candidate(self, candidate):
 		return get_best_segments(
 			os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality),
@@ -210,6 +230,10 @@ class Cutter(object):
 			allow_holes=candidate.allow_holes,
 		)
 
+	@timed(
+		video_channel = lambda self, job: job.video_channel,
+		video_quality = lambda self, job: job.video_quality,
+	)
 	def claim_job(self, job):
 		"""Update event in DB to say we're working on it.
 		If someone beat us to it, or it's changed, raise CandidateGone."""
@@ -445,13 +469,14 @@ class TranscodeChecker(object):
 	FOUND_VIDEOS_RETRY_INTERVAL = 20
 	ERROR_RETRY_INTERVAL = 20
 
-	def __init__(self, backend, dbmanager, stop):
+	def __init__(self, location, backend, dbmanager, stop):
 		"""
 		backend is an upload backend that supports transcoding
 		and defines check_status().
 		Conn is a database connection.
 		Stop is an Event triggering graceful shutdown when set.
 		"""
+		self.location = location
 		self.backend = backend
 		self.dbmanager = dbmanager
 		self.stop = stop
@@ -469,11 +494,13 @@ class TranscodeChecker(object):
 				if not ids:
 					self.wait(self.NO_VIDEOS_RETRY_INTERVAL)
 					continue
+				videos_transcoding.labels(self.location).set(len(ids))
 				self.logger.info("Found {} videos in TRANSCODING".format(len(ids)))
 				ids = self.check_ids(ids)
 				if ids:
 					self.logger.info("{} videos are done".format(len(ids)))
 					done = self.mark_done(ids)
+					videos_marked_done.labels(self.location).inc(done)
 					self.logger.info("Marked {} videos as done".format(done))
 				self.wait(self.FOUND_VIDEOS_RETRY_INTERVAL)
 			except Exception:
@@ -585,7 +612,7 @@ def main(
 	config = json.loads(config)
 
 	upload_locations = {}
-	needs_transcode_check = []
+	needs_transcode_check = {}
 	for location, backend_config in config.items():
 		backend_type = backend_config.pop('type')
 		no_transcode_check = backend_config.pop('no_transcode_check', False)
@@ -604,12 +631,12 @@ def main(
 			raise ValueError("Unknown cut type: {!r}".format(cut_type))
 		upload_locations[location] = backend
 		if backend.needs_transcode and not no_transcode_check:
-			needs_transcode_check.append(backend)
+			needs_transcode_check[location] = backend
 
 	cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir, tags)
 	transcode_checkers = [
-		TranscodeChecker(backend, dbmanager, stop)
-		for backend in needs_transcode_check
+		TranscodeChecker(location, backend, dbmanager, stop)
+		for location, backend in needs_transcode_check.items()
 	]
 	jobs = [gevent.spawn(cutter.run)] + [
 		gevent.spawn(transcode_checker.run)
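
For the gauge/counter pair added to TranscodeChecker, the split is: a Gauge for "how many
right now" (set on each pass) and a Counter for "how many ever" (only incremented). A toy
illustration with stand-in names, not the patch's own metrics:

    import prometheus_client as prom

    example_transcoding = prom.Gauge(
        'example_videos_transcoding', 'Videos currently in transcoding', ['location'])
    example_marked_done = prom.Counter(
        'example_videos_marked_done', 'Videos marked done so far', ['location'])

    # Each check pass overwrites the gauge with the current queue size...
    example_transcoding.labels('youtube').set(3)
    # ...while completed work only ever accumulates.
    example_marked_done.labels('youtube').inc(2)
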
From c740090c5352ad1a5d6f23b7de7c4789e0cd870f Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 31 Oct 2019 18:00:33 -0700
Subject: [PATCH 8/8] sheetsync: Add more metrics

---
 sheetsync/sheetsync/main.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py
index 27b612d..9e877a7 100644
--- a/sheetsync/sheetsync/main.py
+++ b/sheetsync/sheetsync/main.py
@@ -28,6 +28,18 @@ sync_errors = prom.Counter(
 	'Number of errors syncing sheets',
 )
 
+rows_found = prom.Counter(
+	'rows_found',
+	'Number of rows that sheetsync looked at with an id',
+	['worksheet'],
+)
+
+rows_changed = prom.Counter(
+	'rows_changed',
+	'Number of rows that needed changes applied, with type=insert, type=input or type=output',
+	['type', 'worksheet'],
+)
+
 
 class SheetSync(object):
 	# Time between syncs
@@ -202,8 +214,12 @@ class SheetSync(object):
 				sql.SQL(", ").join(sql.Placeholder(col) for col in insert_cols),
 			)
 			query(self.conn, built_query, sheet_name=worksheet, **row)
+			rows_found.labels(worksheet).inc()
+			rows_changed.labels('insert', worksheet).inc()
 			return
 
+		rows_found.labels(worksheet).inc()
+
 		# Update database with any changed inputs
 		changed = [col for col in self.input_columns if row[col] != getattr(event, col)]
 		if changed:
@@ -220,6 +236,7 @@ class SheetSync(object):
 				) for col in changed
 			))
 			query(self.conn, built_query, **row)
+			rows_changed.labels('input', worksheet).inc()
 
 		# Update sheet with any changed outputs
 		format_output = lambda v: '' if v is None else v # cast nulls to empty string
@@ -234,6 +251,7 @@ class SheetSync(object):
 				row_index, self.column_map[col],
 				format_output(getattr(event, col)),
 			)
+			rows_changed.labels('output', worksheet).inc()
 
 		# Set edit link if marked for editing and start/end set.
 		# This prevents accidents / clicking the wrong row and provides