cutter: Add more metrics

pull/133/head
Mike Lang 5 years ago
parent 4f900c5925
commit f9b48bc70e

@ -17,22 +17,40 @@ from psycopg2 import sql
import common import common
from common.database import DBManager, query from common.database import DBManager, query
from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles
from common.stats import timed
from .upload_backends import Youtube, Local, UploadError from .upload_backends import Youtube, Local, UploadError
videos_uploaded = prom.Counter( videos_uploaded = prom.Counter(
'videos_uploaded', 'videos_uploaded',
'Number of videos successfully uploaded', 'Number of videos successfully uploaded',
['video_channel', 'video_quality', 'upload_location'] ['video_channel', 'video_quality', 'upload_location']
) )
upload_errors = prom.Counter( upload_errors = prom.Counter(
'upload_errors', 'upload_errors',
'Number of errors uploading a video', 'Number of errors uploading a video',
['video_channel', 'video_quality', 'upload_location', 'final_state'] ['video_channel', 'video_quality', 'upload_location', 'final_state']
) )
no_candidates = prom.Counter(
'no_candidates',
"Number of times we looked for candidate jobs but didn't find any",
)
videos_transcoding = prom.Gauge(
'videos_transcoding',
"Number of videos currently in transcoding",
['location'],
)
videos_marked_done = prom.Counter(
'videos_marked_done',
"Number of videos we have successfully marked as done",
['location'],
)
# A list of all the DB column names in CutJob # A list of all the DB column names in CutJob
CUT_JOB_PARAMS = [ CUT_JOB_PARAMS = [
"sheet_name", "sheet_name",
@ -156,7 +174,6 @@ class Cutter(object):
try: try:
segments = self.check_candidate(candidate) segments = self.check_candidate(candidate)
except ContainsHoles: except ContainsHoles:
# TODO metric
self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate))) self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate)))
set_error( set_error(
"Node {} does not have all the video needed to cut this row. " "Node {} does not have all the video needed to cut this row. "
@ -184,8 +201,10 @@ class Cutter(object):
return CutJob(segments=segments, **candidate._asdict()) return CutJob(segments=segments, **candidate._asdict())
# No candidates # No candidates
no_candidates.inc()
self.wait(self.NO_CANDIDATES_RETRY_INTERVAL) self.wait(self.NO_CANDIDATES_RETRY_INTERVAL)
@timed()
def list_candidates(self): def list_candidates(self):
"""Return a list of all available candidates that we might be able to cut.""" """Return a list of all available candidates that we might be able to cut."""
# We only accept candidates if they haven't excluded us by whitelist, # We only accept candidates if they haven't excluded us by whitelist,
@ -202,6 +221,7 @@ class Cutter(object):
result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys()) result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys())
return result.fetchall() return result.fetchall()
# No need to instrument this function, just use get_best_segments() stats
def check_candidate(self, candidate): def check_candidate(self, candidate):
return get_best_segments( return get_best_segments(
os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality), os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality),
@ -210,6 +230,10 @@ class Cutter(object):
allow_holes=candidate.allow_holes, allow_holes=candidate.allow_holes,
) )
@timed(
video_channel = lambda self, job: job.video_channel,
video_quality = lambda self, job: job.video_quality,
)
def claim_job(self, job): def claim_job(self, job):
"""Update event in DB to say we're working on it. """Update event in DB to say we're working on it.
If someone beat us to it, or it's changed, raise CandidateGone.""" If someone beat us to it, or it's changed, raise CandidateGone."""
@ -445,13 +469,14 @@ class TranscodeChecker(object):
FOUND_VIDEOS_RETRY_INTERVAL = 20 FOUND_VIDEOS_RETRY_INTERVAL = 20
ERROR_RETRY_INTERVAL = 20 ERROR_RETRY_INTERVAL = 20
def __init__(self, backend, dbmanager, stop): def __init__(self, location, backend, dbmanager, stop):
""" """
backend is an upload backend that supports transcoding backend is an upload backend that supports transcoding
and defines check_status(). and defines check_status().
Conn is a database connection. Conn is a database connection.
Stop is an Event triggering graceful shutdown when set. Stop is an Event triggering graceful shutdown when set.
""" """
self.location = location
self.backend = backend self.backend = backend
self.dbmanager = dbmanager self.dbmanager = dbmanager
self.stop = stop self.stop = stop
@ -469,11 +494,13 @@ class TranscodeChecker(object):
if not ids: if not ids:
self.wait(self.NO_VIDEOS_RETRY_INTERVAL) self.wait(self.NO_VIDEOS_RETRY_INTERVAL)
continue continue
videos_transcoding.labels(self.location).set(len(ids))
self.logger.info("Found {} videos in TRANSCODING".format(len(ids))) self.logger.info("Found {} videos in TRANSCODING".format(len(ids)))
ids = self.check_ids(ids) ids = self.check_ids(ids)
if ids: if ids:
self.logger.info("{} videos are done".format(len(ids))) self.logger.info("{} videos are done".format(len(ids)))
done = self.mark_done(ids) done = self.mark_done(ids)
videos_marked_done.labels(self.location).inc(done)
self.logger.info("Marked {} videos as done".format(done)) self.logger.info("Marked {} videos as done".format(done))
self.wait(self.FOUND_VIDEOS_RETRY_INTERVAL) self.wait(self.FOUND_VIDEOS_RETRY_INTERVAL)
except Exception: except Exception:
@ -585,7 +612,7 @@ def main(
config = json.loads(config) config = json.loads(config)
upload_locations = {} upload_locations = {}
needs_transcode_check = [] needs_transcode_check = {}
for location, backend_config in config.items(): for location, backend_config in config.items():
backend_type = backend_config.pop('type') backend_type = backend_config.pop('type')
no_transcode_check = backend_config.pop('no_transcode_check', False) no_transcode_check = backend_config.pop('no_transcode_check', False)
@ -604,12 +631,12 @@ def main(
raise ValueError("Unknown cut type: {!r}".format(cut_type)) raise ValueError("Unknown cut type: {!r}".format(cut_type))
upload_locations[location] = backend upload_locations[location] = backend
if backend.needs_transcode and not no_transcode_check: if backend.needs_transcode and not no_transcode_check:
needs_transcode_check.append(backend) needs_transcode_check[location] = backend
cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir, tags) cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir, tags)
transcode_checkers = [ transcode_checkers = [
TranscodeChecker(backend, dbmanager, stop) TranscodeChecker(location, backend, dbmanager, stop)
for backend in needs_transcode_check for location, backend in needs_transcode_check.items()
] ]
jobs = [gevent.spawn(cutter.run)] + [ jobs = [gevent.spawn(cutter.run)] + [
gevent.spawn(transcode_checker.run) gevent.spawn(transcode_checker.run)

Loading…
Cancel
Save