From f9b48bc70eeb0683282d87942dd64e060f16fd4e Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 31 Oct 2019 17:51:42 -0700 Subject: [PATCH] cutter: Add more metrics --- cutter/cutter/main.py | 43 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index b399724..bbbe04d 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -17,22 +17,40 @@ from psycopg2 import sql import common from common.database import DBManager, query from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles +from common.stats import timed from .upload_backends import Youtube, Local, UploadError -videos_uploaded = prom.Counter( +videos_uploaded = prom.Counter( 'videos_uploaded', 'Number of videos successfully uploaded', ['video_channel', 'video_quality', 'upload_location'] ) -upload_errors = prom.Counter( +upload_errors = prom.Counter( 'upload_errors', 'Number of errors uploading a video', ['video_channel', 'video_quality', 'upload_location', 'final_state'] ) +no_candidates = prom.Counter( + 'no_candidates', + "Number of times we looked for candidate jobs but didn't find any", +) + +videos_transcoding = prom.Gauge( + 'videos_transcoding', + "Number of videos currently in transcoding", + ['location'], +) + +videos_marked_done = prom.Counter( + 'videos_marked_done', + "Number of videos we have successfully marked as done", + ['location'], +) + # A list of all the DB column names in CutJob CUT_JOB_PARAMS = [ "sheet_name", @@ -156,7 +174,6 @@ class Cutter(object): try: segments = self.check_candidate(candidate) except ContainsHoles: - # TODO metric self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate))) set_error( "Node {} does not have all the video needed to cut this row. " @@ -184,8 +201,10 @@ class Cutter(object): return CutJob(segments=segments, **candidate._asdict()) # No candidates + no_candidates.inc() self.wait(self.NO_CANDIDATES_RETRY_INTERVAL) + @timed() def list_candidates(self): """Return a list of all available candidates that we might be able to cut.""" # We only accept candidates if they haven't excluded us by whitelist, @@ -202,6 +221,7 @@ class Cutter(object): result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys()) return result.fetchall() + # No need to instrument this function, just use get_best_segments() stats def check_candidate(self, candidate): return get_best_segments( os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality), @@ -210,6 +230,10 @@ class Cutter(object): allow_holes=candidate.allow_holes, ) + @timed( + video_channel = lambda self, job: job.video_channel, + video_quality = lambda self, job: job.video_quality, + ) def claim_job(self, job): """Update event in DB to say we're working on it. If someone beat us to it, or it's changed, raise CandidateGone.""" @@ -445,13 +469,14 @@ class TranscodeChecker(object): FOUND_VIDEOS_RETRY_INTERVAL = 20 ERROR_RETRY_INTERVAL = 20 - def __init__(self, backend, dbmanager, stop): + def __init__(self, location, backend, dbmanager, stop): """ backend is an upload backend that supports transcoding and defines check_status(). Conn is a database connection. Stop is an Event triggering graceful shutdown when set. """ + self.location = location self.backend = backend self.dbmanager = dbmanager self.stop = stop @@ -469,11 +494,13 @@ class TranscodeChecker(object): if not ids: self.wait(self.NO_VIDEOS_RETRY_INTERVAL) continue + videos_transcoding.labels(self.location).set(len(ids)) self.logger.info("Found {} videos in TRANSCODING".format(len(ids))) ids = self.check_ids(ids) if ids: self.logger.info("{} videos are done".format(len(ids))) done = self.mark_done(ids) + videos_marked_done.labels(self.location).inc(done) self.logger.info("Marked {} videos as done".format(done)) self.wait(self.FOUND_VIDEOS_RETRY_INTERVAL) except Exception: @@ -585,7 +612,7 @@ def main( config = json.loads(config) upload_locations = {} - needs_transcode_check = [] + needs_transcode_check = {} for location, backend_config in config.items(): backend_type = backend_config.pop('type') no_transcode_check = backend_config.pop('no_transcode_check', False) @@ -604,12 +631,12 @@ def main( raise ValueError("Unknown cut type: {!r}".format(cut_type)) upload_locations[location] = backend if backend.needs_transcode and not no_transcode_check: - needs_transcode_check.append(backend) + needs_transcode_check[location] = backend cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir, tags) transcode_checkers = [ - TranscodeChecker(backend, dbmanager, stop) - for backend in needs_transcode_check + TranscodeChecker(location, backend, dbmanager, stop) + for location, backend in needs_transcode_check.items() ] jobs = [gevent.spawn(cutter.run)] + [ gevent.spawn(transcode_checker.run)