Merge pull request #133 from ekimekim/mike/more-metrics

Add lots of metrics
Mike Lang, committed by GitHub
commit 981b89551e

@@ -15,11 +15,14 @@ import argh
 import gevent.backdoor
 import gevent.pool
 import prometheus_client as prom
-import requests

 import common
 from common import dateutil
 from common import database
+from common.requests import InstrumentedSession
+
+# Wraps all requests in some metric collection
+requests = InstrumentedSession()

 segments_backfilled = prom.Counter(
     'segments_backfilled',
@@ -103,7 +106,7 @@ def list_remote_hours(node, channel, quality, timeout=TIMEOUT):
     """Wrapper around a call to restreamer.list_hours."""
     uri = '{}/files/{}/{}'.format(node, channel, quality)
     logging.debug('Getting list of hours from {}'.format(uri))
-    resp = requests.get(uri, timeout=timeout)
+    resp = requests.get(uri, timeout=timeout, metric_name='list_remote_hours')
     return common.encode_strings(resp.json())
@@ -111,7 +114,7 @@ def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT):
     """Wrapper around a call to restreamer.list_segments."""
     uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour)
     logging.debug('Getting list of segments from {}'.format(uri))
-    resp = requests.get(uri, timeout=timeout)
+    resp = requests.get(uri, timeout=timeout, metric_name='list_remote_segments')
     return common.encode_strings(resp.json())
@@ -140,7 +143,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
     try:
         logging.debug('Fetching segment {} from {}'.format(path, node))
         uri = '{}/segments/{}/{}/{}/{}'.format(node, channel, quality, hour, missing_segment)
-        resp = requests.get(uri, stream=True, timeout=timeout)
+        resp = requests.get(uri, stream=True, timeout=timeout, metric_name='get_remote_segment')
         resp.raise_for_status()

@@ -2,7 +2,11 @@
 import time

 import gevent
-import requests
+
+from .requests import InstrumentedSession
+
+# Wraps all requests in some metric collection
+requests = InstrumentedSession()

 class GoogleAPIClient(object):
@@ -40,7 +44,7 @@ class GoogleAPIClient(object):
             'client_secret': self.client_secret,
             'refresh_token': self.refresh_token,
             'grant_type': 'refresh_token',
-        })
+        }, metric_name='get_access_token')
         resp.raise_for_status()
         data = resp.json()
         self._access_token = data['access_token']

@@ -0,0 +1,51 @@
+"""Code for instrumenting requests calls. Requires requests, obviously."""
+
+import urlparse
+
+import requests
+import prometheus_client as prom
+from monotonic import monotonic
+
+
+request_latency = prom.Histogram(
+    'http_client_request_latency',
+    'Time taken to make an outgoing HTTP request. '
+    'Status = "error" is used if an error occurs. Measured as time from first byte sent to '
+    'headers finished being parsed, ie. does not include reading a streaming response.',
+    ['name', 'method', 'domain', 'status'],
+)
+
+response_size = prom.Histogram(
+    'http_client_response_size',
+    "The content length of (non-streaming) responses to outgoing HTTP requests.",
+    ['name', 'method', 'domain', 'status'],
+)
+
+request_concurrency = prom.Gauge(
+    'http_client_request_concurrency',
+    "The number of outgoing HTTP requests currently ongoing",
+    ['name', 'method', 'domain'],
+)
+
+
+class InstrumentedSession(requests.Session):
+    """A requests Session that automatically records metrics on requests made.
+    Users may optionally pass a 'metric_name' kwarg that will be included as the 'name' label.
+    """
+
+    def request(self, method, url, *args, **kwargs):
+        _, domain, _, _, _ = urlparse.urlsplit(url)
+        name = kwargs.pop('metric_name', '')
+        start = monotonic()  # we only use our own measured latency if an error occurs
+        try:
+            with request_concurrency.labels(name, method, domain).track_inprogress():
+                response = super(InstrumentedSession, self).request(method, url, *args, **kwargs)
+        except Exception:
+            latency = monotonic() - start
+            request_latency.labels(name, method, domain, "error").observe(latency)
+            raise
+        request_latency.labels(name, method, domain, response.status_code).observe(response.elapsed.total_seconds())
+        if 'content-length' in response.headers:
+            response_size.labels(name, method, domain, response.status_code).observe(float(response.headers['content-length']))
+        return response
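(Not part of the diff: a minimal usage sketch of InstrumentedSession, with an illustrative URL and metric name, showing how the optional metric_name kwarg becomes the 'name' label on the metrics above.)

    from common.requests import InstrumentedSession

    # Drop-in replacement for a requests.Session; 'metric_name' is popped from the
    # kwargs before the request is sent, so the underlying HTTP call is unchanged.
    requests = InstrumentedSession()
    resp = requests.get('https://example.com/files/somechannel/source', timeout=5,
        metric_name='list_remote_hours')
    resp.raise_for_status()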

@@ -328,6 +328,7 @@ def read_chunks(fileobj, chunk_size=16*1024):
         yield chunk


+@timed('cut', type='fast', normalize=lambda _, segments, start, end: (end - start).total_seconds())
 def fast_cut_segments(segments, start, end):
     """Yields chunks of a MPEGTS video file covering the exact timestamp range.
     segments should be a list of segments as returned by get_best_segments().
@@ -400,6 +401,7 @@ def fast_cut_segments(segments, start, end):
             yield chunk


+@timed('cut', type='full', normalize=lambda _, segments, start, end, encode_args: (end - start).total_seconds())
 def full_cut_segments(segments, start, end, encode_args):
     # how far into the first segment to begin
     cut_start = max(0, (start - segments[0].start).total_seconds())
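(Not part of the diff: @timed comes from common.stats, which this page does not show. The sketch below is a hypothetical stand-in, only meant to illustrate what a decorator of this shape does with the normalize callable: record raw latency plus latency per unit of work, here per second of video cut. The real implementation, including how it handles generator functions like fast_cut_segments, may differ.)

    import functools
    import prometheus_client as prom
    from monotonic import monotonic

    # Hypothetical metric names, for illustration only.
    cut_latency = prom.Histogram('cut_latency', 'Time spent cutting', ['type'])
    cut_normalized = prom.Histogram('cut_normalized_latency', 'Cutting time per second of output video', ['type'])

    def timed_sketch(type, normalize):
        def decorator(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                start = monotonic()
                ret = fn(*args, **kwargs)
                latency = monotonic() - start
                cut_latency.labels(type).observe(latency)
                # normalize() maps the return value and call args to a unit of work,
                # e.g. the requested cut length (end - start) in seconds.
                cut_normalized.labels(type).observe(latency / normalize(ret, *args, **kwargs))
                return ret
            return wrapper
        return decorator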

@@ -17,6 +17,7 @@ from psycopg2 import sql
 import common
 from common.database import DBManager, query
 from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles
+from common.stats import timed

 from .upload_backends import Youtube, Local, UploadError
@@ -33,6 +34,23 @@ upload_errors = prom.Counter(
     ['video_channel', 'video_quality', 'upload_location', 'final_state']
 )

+no_candidates = prom.Counter(
+    'no_candidates',
+    "Number of times we looked for candidate jobs but didn't find any",
+)
+
+videos_transcoding = prom.Gauge(
+    'videos_transcoding',
+    "Number of videos currently in transcoding",
+    ['location'],
+)
+
+videos_marked_done = prom.Counter(
+    'videos_marked_done',
+    "Number of videos we have successfully marked as done",
+    ['location'],
+)
+
 # A list of all the DB column names in CutJob
 CUT_JOB_PARAMS = [
     "sheet_name",
@@ -156,7 +174,6 @@ class Cutter(object):
         try:
             segments = self.check_candidate(candidate)
         except ContainsHoles:
-            # TODO metric
             self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate)))
             set_error(
                 "Node {} does not have all the video needed to cut this row. "
@@ -184,8 +201,10 @@ class Cutter(object):
                 return CutJob(segments=segments, **candidate._asdict())

             # No candidates
+            no_candidates.inc()
             self.wait(self.NO_CANDIDATES_RETRY_INTERVAL)

+    @timed()
     def list_candidates(self):
         """Return a list of all available candidates that we might be able to cut."""
         # We only accept candidates if they haven't excluded us by whitelist,
@@ -202,6 +221,7 @@ class Cutter(object):
         result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys())
         return result.fetchall()

+    # No need to instrument this function, just use get_best_segments() stats
     def check_candidate(self, candidate):
         return get_best_segments(
             os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality),
@@ -210,6 +230,10 @@ class Cutter(object):
             allow_holes=candidate.allow_holes,
         )

+    @timed(
+        video_channel = lambda self, job: job.video_channel,
+        video_quality = lambda self, job: job.video_quality,
+    )
     def claim_job(self, job):
         """Update event in DB to say we're working on it.
         If someone beat us to it, or it's changed, raise CandidateGone."""
@@ -445,13 +469,14 @@ class TranscodeChecker(object):
     FOUND_VIDEOS_RETRY_INTERVAL = 20
     ERROR_RETRY_INTERVAL = 20

-    def __init__(self, backend, dbmanager, stop):
+    def __init__(self, location, backend, dbmanager, stop):
         """
         backend is an upload backend that supports transcoding
         and defines check_status().
         Conn is a database connection.
         Stop is an Event triggering graceful shutdown when set.
         """
+        self.location = location
         self.backend = backend
         self.dbmanager = dbmanager
         self.stop = stop
@@ -469,11 +494,13 @@ class TranscodeChecker(object):
                 if not ids:
                     self.wait(self.NO_VIDEOS_RETRY_INTERVAL)
                     continue
+                videos_transcoding.labels(self.location).set(len(ids))
                 self.logger.info("Found {} videos in TRANSCODING".format(len(ids)))
                 ids = self.check_ids(ids)
                 if ids:
                     self.logger.info("{} videos are done".format(len(ids)))
                     done = self.mark_done(ids)
+                    videos_marked_done.labels(self.location).inc(done)
                     self.logger.info("Marked {} videos as done".format(done))
                 self.wait(self.FOUND_VIDEOS_RETRY_INTERVAL)
             except Exception:
@@ -585,7 +612,7 @@ def main(
     config = json.loads(config)

     upload_locations = {}
-    needs_transcode_check = []
+    needs_transcode_check = {}
     for location, backend_config in config.items():
         backend_type = backend_config.pop('type')
         no_transcode_check = backend_config.pop('no_transcode_check', False)
@@ -604,12 +631,12 @@
             raise ValueError("Unknown cut type: {!r}".format(cut_type))
         upload_locations[location] = backend
         if backend.needs_transcode and not no_transcode_check:
-            needs_transcode_check.append(backend)
+            needs_transcode_check[location] = backend

     cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir, tags)
     transcode_checkers = [
-        TranscodeChecker(backend, dbmanager, stop)
-        for backend in needs_transcode_check
+        TranscodeChecker(location, backend, dbmanager, stop)
+        for location, backend in needs_transcode_check.items()
     ]
     jobs = [gevent.spawn(cutter.run)] + [
         gevent.spawn(transcode_checker.run)

@@ -143,13 +143,14 @@ class Youtube(UploadBackend):
                 'uploadType': 'resumable',
             },
             json=json,
+            metric_name='create_video',
         )
         if not resp.ok:
             # Don't retry, because failed calls still count against our upload quota.
             # The risk of repeated failed attempts blowing through our quota is too high.
             raise UploadError("Youtube create video call failed with {resp.status_code}: {resp.content}".format(resp=resp))
         upload_url = resp.headers['Location']
-        resp = self.client.request('POST', upload_url, data=data)
+        resp = self.client.request('POST', upload_url, data=data, metric_name='upload_video')
         if 400 <= resp.status_code < 500:
             # As above, don't retry. But with 4xx's we know the upload didn't go through.
             # On a 5xx, we can't be sure (the server is in an unspecified state).
@@ -169,6 +170,7 @@ class Youtube(UploadBackend):
                 'part': 'id,status',
                 'id': ','.join(group),
             },
+            metric_name='list_videos',
         )
         resp.raise_for_status()
         for item in resp.json()['items']:

@@ -21,6 +21,7 @@ from monotonic import monotonic
 import twitch

 import common
 import common.dateutil
+import common.requests

 segments_downloaded = prom.Counter(
@@ -29,6 +30,13 @@ segments_downloaded = prom.Counter(
     ["partial", "channel", "quality"],
 )

+segment_duration_downloaded = prom.Counter(
+    "segment_duration_downloaded",
+    "Total duration of all segments partially or fully downloaded. "
+    "Note partial segments still count the full duration.",
+    ["partial", "channel", "quality"],
+)
+
 latest_segment = prom.Gauge(
     "latest_segment",
     "Timestamp of the time of the newest segment fully downloaded",
@@ -74,8 +82,6 @@ def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft
         finished = True

-
-
 class StreamsManager(object):
     """Keeps track of what qualities are being downloaded and the workers doing so.
     Re-fetches master playlist when needed and starts new stream workers.
@@ -280,7 +286,7 @@ class StreamWorker(object):
         # with our connection pool.
         # This worker's SegmentGetters will use its session by default for performance,
         # but will fall back to a new one if something goes wrong.
-        self.session = requests.Session()
+        self.session = common.requests.InstrumentedSession()

     def __repr__(self):
         return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@@ -456,7 +462,7 @@ class SegmentGetter(object):
                 break
             # Create a new session, so we don't reuse a connection from the old session
             # which had an error / some other issue. This is mostly just out of paranoia.
-            self.session = requests.Session()
+            self.session = common.requests.InstrumentedSession()
             # if retry not set, wait for FETCH_RETRY first
             self.retry.wait(common.jitter(self.FETCH_RETRY))
         self.logger.debug("Getter is done")
@@ -517,7 +523,7 @@ class SegmentGetter(object):
         self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
         with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
             with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
-                resp = self.session.get(self.segment.uri, stream=True)
+                resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment')
             # twitch returns 403 for expired segment urls, and 404 for very old urls where the original segment is gone.
             # the latter can happen if we have a network issue that cuts us off from twitch for some time.
             if resp.status_code in (403, 404):
@@ -542,12 +548,14 @@ class SegmentGetter(object):
             self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
             common.rename(temp_path, partial_path)
             segments_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc()
+            segment_duration_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc(self.segment.duration)
             raise ex_type, ex, tb
         else:
             full_path = self.make_path("full", hash)
             self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
             common.rename(temp_path, full_path)
             segments_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc()
+            segment_duration_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc(self.segment.duration)
             # Prom doesn't provide a way to compare value to gauge's existing value,
             # we need to reach into internals
             stat = latest_segment.labels(channel=self.channel, quality=self.quality)
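(Not part of the diff: the "reach into internals" mentioned above sets the gauge to the max of its current and new value. Roughly, assuming the segment's start time is available as a unix timestamp, the pattern looks like this sketch; _value is a prometheus_client implementation detail.)

    # Gauge has no compare-and-set, so read the labelled child's internal value.
    stat = latest_segment.labels(channel=self.channel, quality=self.quality)
    segment_timestamp = 1234567890.0  # stand-in for this segment's start time as a unix timestamp
    stat._value.set(max(stat._value.get(), segment_timestamp))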

@@ -2,16 +2,18 @@
 import logging
 import random

-import requests
 import hls_playlist

+from common.requests import InstrumentedSession
+
 logger = logging.getLogger(__name__)


-def get_master_playlist(channel, session=requests):
+def get_master_playlist(channel, session=None):
     """Get the master playlist for given channel from twitch"""
+    if session is None:
+        session = InstrumentedSession()
     resp = session.get(
         "https://api.twitch.tv/api/channels/{}/access_token.json".format(channel),
         params={'as3': 't'},
@@ -19,6 +21,7 @@ def get_master_playlist(channel, session=requests):
             'Accept': 'application/vnd.twitchtv.v3+json',
             'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j',
         },
+        metric_name='get_access_token',
     )
     resp.raise_for_status() # getting access token
     token = resp.json()
@@ -42,6 +45,7 @@ def get_master_playlist(channel, session=requests):
             # in flux. Better to just blend in with the crowd for now.
             # "platform": "_"
         },
+        metric_name='get_master_playlist',
     )
     resp.raise_for_status() # getting master playlist
     playlist = hls_playlist.load(resp.text, base_uri=resp.url)
@@ -96,7 +100,9 @@ def get_media_playlist_uris(master_playlist, target_qualities):
     return {name: variant.uri for name, variant in variants.items()}


-def get_media_playlist(uri, session=requests):
-    resp = session.get(uri)
+def get_media_playlist(uri, session=None):
+    if session is None:
+        session = InstrumentedSession()
+    resp = session.get(uri, metric_name='get_media_playlist')
     resp.raise_for_status()
     return hls_playlist.load(resp.text, base_uri=resp.url)

@@ -28,6 +28,18 @@ sync_errors = prom.Counter(
     'Number of errors syncing sheets',
 )

+rows_found = prom.Counter(
+    'rows_found',
+    'Number of rows that sheetsync looked at with an id',
+    ['worksheet'],
+)
+
+rows_changed = prom.Counter(
+    'rows_changed',
+    'Number of rows that needed changes applied, with type=insert, type=input or type=output',
+    ['type', 'worksheet'],
+)
+
 class SheetSync(object):

     # Time between syncs
@@ -202,8 +214,12 @@ class SheetSync(object):
                 sql.SQL(", ").join(sql.Placeholder(col) for col in insert_cols),
             )
             query(self.conn, built_query, sheet_name=worksheet, **row)
+            rows_found.labels(worksheet).inc()
+            rows_changed.labels('insert', worksheet).inc()
             return

+        rows_found.labels(worksheet).inc()
+
         # Update database with any changed inputs
         changed = [col for col in self.input_columns if row[col] != getattr(event, col)]
         if changed:
@@ -220,6 +236,7 @@ class SheetSync(object):
                 ) for col in changed
             ))
             query(self.conn, built_query, **row)
+            rows_changed.labels('input', worksheet).inc()

         # Update sheet with any changed outputs
         format_output = lambda v: '' if v is None else v # cast nulls to empty string
@@ -234,6 +251,7 @@ class SheetSync(object):
                 row_index, self.column_map[col],
                 format_output(getattr(event, col)),
             )
+            rows_changed.labels('output', worksheet).inc()

         # Set edit link if marked for editing and start/end set.
         # This prevents accidents / clicking the wrong row and provides

@@ -23,6 +23,7 @@ class Sheets(object):
             'https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}'.format(
                 spreadsheet_id, range,
             ),
+            metric_name='get_rows',
         )
         resp.raise_for_status()
         data = resp.json()
@@ -46,6 +47,7 @@ class Sheets(object):
                 "range": range,
                 "values": [[value]],
             },
+            metric_name='write_value',
         )
         resp.raise_for_status()
