cutter: Implement actual cut methods

Each method is fairly complicated, but is self-contained and can be examined independently.

cut_job in particular contains several extra helpers and directs control flow
via some iterators. This is unfortunately necessary due to the requests interface.
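
For context on that last point: requests will stream a request body from any iterator passed as data=, so the only hook between "all the data has been sent" and "the response has been received" is the tail end of the iterator itself. A minimal sketch of the resulting pattern (illustrative only - produce_chunks, on_body_complete and upload_url are placeholder names, not code from this commit):

	import requests

	def body():
		for chunk in produce_chunks():  # stand-in for the cut-segment generator
			yield chunk
		# Anything here runs after the last chunk is sent but before requests
		# finalizes the request - the only place to mark the critical section.
		on_body_complete()

	requests.post(upload_url, data=body())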
Mike Lang authored 6 years ago, committed by Mike Lang
parent ae809c696c
commit 5b2a1ef6b7

@@ -1,6 +1,8 @@
import json
import logging
import os
import random
import signal
import socket

from collections import namedtuple
@@ -8,9 +10,12 @@ from collections import namedtuple
import gevent.backdoor
import gevent.event
import prometheus_client as prom
import requests
from psycopg2 import sql

import common
from common.database import DBManager, query
from common.segments import get_best_segments, cut_segments, ContainsHoles

from .youtube import Youtube
@@ -50,6 +55,10 @@ class CandidateGone(Exception):
class Cutter(object):
	NO_CANDIDATES_RETRY_INTERVAL = 1
	ERROR_RETRY_INTERVAL = 5
	RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5

	def __init__(self, youtube, conn, stop, name, segments_path):
		"""youtube is an authenticated and initialized youtube api client.
		Conn is a database connection.
@@ -86,25 +95,294 @@ class Cutter(object):
		(or for which allow_holes is true), returning a CutJob.
		Polls until one is available.
		"""
		while not self.stop.is_set():
			try:
				candidates = self.list_candidates()
			except Exception:
				self.logger.exception("Error while listing candidates")
				self.wait(self.ERROR_RETRY_INTERVAL)
				continue
			if candidates:
				self.logger.info("Found {} job candidates".format(len(candidates)))
			# Shuffle the list so that (most of the time) we don't try to claim the same one as other nodes
			random.shuffle(candidates)
			for candidate in candidates:
				try:
					segments = self.check_candidate(candidate)
				except ContainsHoles:
					# TODO metric
					self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate)))
					continue # bad candidate, let someone else take it or just try again later
				except Exception as e:
					# Unknown error. This is either a problem with us, or a problem with the candidate
					# (or most likely a problem with us that is only triggered by this candidate).
					# In this case we would rather stay running so other jobs can continue to work if possible.
					# But to give at least some feedback, we set the error message on the job
					# if it isn't already set.
					self.logger.exception("Failed to check candidate {}, setting error on row".format(format_job(candidate)))
					try:
						# Since this error message is just for humans, we don't go to great
						# lengths to prevent it being put on the row if the row has changed.
						# We just check its state is still EDITED.
						# Any successful claim will clear its error.
						result = query(self.conn, """
							UPDATE events
							SET error = %(error)s
							WHERE id = %(id)s AND state = 'EDITED' AND error IS NULL
						""", id=candidate.id, error='{}: Error while checking candidate: {}'.format(self.name, e))
					except Exception:
						self.logger.exception("Failed to set error for candidate {}, ignoring".format(format_job(candidate)))
					else:
						if result.rowcount > 0:
							assert result.rowcount == 1
							self.logger.info("Set error for candidate {}".format(format_job(candidate)))
					self.wait(self.ERROR_RETRY_INTERVAL)
					continue
				if all(segment is None for segment in segments):
					self.logger.info("Ignoring candidate {} as we have no segments".format(format_job(candidate)))
					continue
				return CutJob(segments=segments, **candidate._asdict())
			# No candidates were claimable; wait and poll again.
			self.wait(self.NO_CANDIDATES_RETRY_INTERVAL)

	def list_candidates(self):
		"""Return a list of all available candidates that we might be able to cut."""
		built_query = sql.SQL("""
			SELECT id, {}
			FROM events
			WHERE state = 'EDITED'
			AND (uploader_whitelist IS NULL OR %(name)s = ANY (uploader_whitelist))
		""").format(
			sql.SQL(", ").join(sql.Identifier(key) for key in CUT_JOB_PARAMS)
		)
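		# For illustration only: if CUT_JOB_PARAMS were just ("video_start", "video_end")
		# (it actually holds the full set of cut parameters), built_query would render roughly as:
		#   SELECT id, "video_start", "video_end"
		#   FROM events
		#   WHERE state = 'EDITED'
		#   AND (uploader_whitelist IS NULL OR %(name)s = ANY (uploader_whitelist))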
		result = query(self.conn, built_query, name=self.name)
		return result.fetchall()

	def check_candidate(self, candidate):
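		"""Look up the best available segments covering the candidate's cut range,
		raising ContainsHoles if coverage is incomplete and allow_holes is not set."""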
		return get_best_segments(
			os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality),
			candidate.video_start,
			candidate.video_end,
			allow_holes=candidate.allow_holes,
		)

	def claim_job(self, job):
		"""Update event in DB to say we're working on it.
		If someone beat us to it, or it's changed, raise CandidateGone."""
		# We need to verify all relevant cut params are unchanged, in case they
		# were updated between verifying the candidate and now.
		built_query = sql.SQL("""
			UPDATE events
			SET state = 'CLAIMED', uploader = %(name)s, error = NULL
			WHERE id = %(id)s
			AND state = 'EDITED'
			AND {}
		""").format(
			# A built AND over all CUT_JOB_PARAMS to check key = %(key)s.
			# Note the use of IS NOT DISTINCT FROM: key = %(key)s evaluates to NULL
			# (treated as false) when both sides are NULL, so a plain = would never match NULL params.
			sql.SQL(' AND ').join(
				sql.SQL("{} IS NOT DISTINCT FROM {}").format(sql.Identifier(key), sql.Placeholder(key))
				for key in CUT_JOB_PARAMS
			)
		)
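		# For illustration only: with params ("video_start", "video_end"), the {} above
		# would render as:
		#   "video_start" IS NOT DISTINCT FROM %(video_start)s
		#   AND "video_end" IS NOT DISTINCT FROM %(video_end)s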
		try:
			result = query(self.conn, built_query, name=self.name, **job._asdict())
		except Exception:
			# Rather than retry on failure here, just assume someone else claimed it in the meantime
			self.logger.exception("Error while claiming job {}, aborting claim".format(format_job(job)))
			self.wait(self.ERROR_RETRY_INTERVAL)
			raise CandidateGone
		if result.rowcount == 0:
			self.logger.info("Failed to claim job {}".format(format_job(job)))
			raise CandidateGone
		self.logger.info("Claimed job {}".format(format_job(job)))
		assert result.rowcount == 1

	def cut_job(self, job):
		"""Perform the actual cut and upload, taking the job through FINALIZING and into
		TRANSCODING or DONE.
		Handles various error conditions:
		* Errors while cutting: Assumed to be non-retryable until cut parameters are changed
		  by an operator. Sets error and rolls back to UNEDITED.
		* Request error before request body closed: Assumed to be a transient network failure,
		  immediately retryable. Sets error and rolls back to EDITED.
		* Request error after request body closed: It's unknown whether the request went through.
		  Sets error and remains in FINALIZING. Operator intervention is required.
		* Row has changed (no longer claimed by us) before request body closed:
		  Assumed an operator has made changes and changed state back. Abort cutting without error.
		* Row has changed (no longer claimed by us) after request body closed:
		  Request has already gone through, but we failed to update the database with this state.
		  Causes a program crash (JobConsistencyError) and restart,
		  at which point it will re-sync with the DB as best it can.
		  This situation almost certainly requires operator intervention.
		"""
		# TODO handle multiple upload locations. Currently everything's hard-coded to youtube.
		self.logger.info("Cutting and uploading job {}".format(format_job(job)))
		cut = cut_segments(job.segments, job.video_start, job.video_end)

		# This flag tracks whether we've told requests to finalize the upload,
		# and serves to detect whether errors from the request call are recoverable.
		# Wrapping it in a one-element list is a hack that lets us modify it from within
		# a closure (as py2 lacks the nonlocal keyword).
		finalize_begun = [False]
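		# (Under py3 this could instead be a plain bool declared nonlocal in upload_wrapper.)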

		# This dummy exception is used to pass control flow back out of upload_wrapper
		# if we've already handled the error and do not need to do anything further.
		class ErrorHandled(Exception):
			pass

		# This exception indicates a job we thought was ours somehow disappeared
		# while we were still trying to cut it. This most likely represents a logic error
		# or that our instance is in a bad state, and will be raised up to run() to terminate
		# the cutter entirely.
		class JobConsistencyError(Exception):
			pass
		def set_row(**kwargs):
			"""Set columns on the row being cut. Returns True on success,
			False if row could not be found.
			Example:
				if not set_row(state='UNEDITED', error=e):
					<handle row having gone missing>
			"""
			# construct an UPDATE query like "SET key1=%(key1)s, key2=%(key2)s, ..."
			built_query = sql.SQL("""
				UPDATE events
				SET {}
				WHERE id = %(id)s AND uploader = %(name)s
			""").format(sql.SQL(", ").join(
				sql.SQL("{} = {}").format(
					sql.Identifier(key), sql.Placeholder(key),
				) for key in kwargs
			))
			result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs)
			return result.rowcount == 1
		def upload_wrapper():
			# This generator wraps the cut_segments generator so we can
			# do things in between the data being finished and finalizing the request.
			# This is also where we do the main error handling.
			try:
				for chunk in cut:
					yield chunk
			except Exception as ex:
				self.logger.exception("Error occurred while trying to cut job {}".format(format_job(job)))
				# Assumed error is not retryable, set state back to UNEDITED and set error.
				if not set_row(state='UNEDITED', error="Error while cutting: {}".format(ex), uploader=None):
					self.logger.warning("Tried to roll back row {} to unedited but it was already cancelled.".format(job.id))
				# Abort the cut without further error handling
				raise ErrorHandled

			# The data is now fully uploaded, but the request is not finalized.
			# We now set the DB state to FINALIZING so we know about failures during this
			# critical section.
			self.logger.debug("Setting job to finalizing")
			if not set_row(state='FINALIZING'):
				# Abort the cut and crash the program, forcing a state resync
				raise JobConsistencyError(
					"No job with id {} and uploader {} when setting FINALIZING"
					.format(job.id, self.name)
				)
			finalize_begun[0] = True

			# Now we return from this generator, and any errors between now and returning
			# from requests.post() are not recoverable.
		try:
			video_id = self.youtube.upload_video(
				title=job.video_title,
				description=job.video_description,
				tags=[], # TODO
				data=upload_wrapper(),
				hidden=True, # TODO remove when not testing
			)
		except JobConsistencyError:
			raise # this ensures it's not caught in the next except block
		except ErrorHandled:
			# we're aborting the cut, error handling has already happened
			return
		except Exception as ex:
			# for HTTPErrors, getting the http response body is also useful
			if isinstance(ex, requests.HTTPError):
				ex = "{}: {}".format(ex, ex.response.content)

			# if error during finalizing, set it in the database and leave it
			# stuck in FINALIZING state for operator intervention.
			if finalize_begun[0]:
				self.logger.critical((
					"Error occurred while finalizing upload of job {}. "
					"You will need to check the state of the video manually."
				).format(format_job(job)))
				error = (
					"An error occurred during FINALIZING, please determine if video was actually "
					"uploaded or not and either move to TRANSCODING and populate video_id or rollback "
					"to EDITED and clear uploader. "
					"Error: {}"
				).format(ex)
				if not set_row(error=error):
					# Not only do we not know if it was uploaded, we also failed to set that in the database!
					raise JobConsistencyError(
						"No job with id {} and uploader {} when setting error while finalizing!"
						.format(job.id, self.name)
					)
				return

			# error before finalizing, assume it's a network issue / retryable.
			# set back to EDITED but still set error
			self.logger.exception("Retryable error when uploading job {}".format(format_job(job)))
			if not set_row(state='EDITED', error="Retryable error while uploading: {}".format(ex), uploader=None):
				raise JobConsistencyError(
					"No job with id {} and uploader {} when setting error while rolling back for retryable error"
					.format(job.id, self.name)
				)
			# pause briefly so we don't immediately grab the same one again in a rapid retry loop
			gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL)
			return

		# Success! Set TRANSCODING and clear any previous error.
		link = "https://youtu.be/{}".format(video_id)
		if not set_row(state='TRANSCODING', video_id=video_id, video_link=link, error=None):
			# This will result in it being stuck in FINALIZING, and an operator will need to go
			# confirm it was really uploaded.
			raise JobConsistencyError(
				"No job with id {} and uploader {} when setting to TRANSCODING"
				.format(job.id, self.name)
			)

		self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), link))

	def rollback_all_owned(self):
		"""Roll back any in-progress jobs that claim to be owned by us,
		to recover from an unclean shutdown."""
		result = query(self.conn, """
			UPDATE events
			SET state = 'EDITED', uploader = NULL
			WHERE state = 'CLAIMED' AND uploader = %(name)s
		""", name=self.name)
		if result.rowcount > 0:
			self.logger.warning("Rolled back {} CLAIMED rows for {} - unclean shutdown?".format(
				result.rowcount, self.name,
			))

		# Also mark any rows in FINALIZING owned by us as errored, these require manual intervention
		result = query(self.conn, """
			UPDATE events
			SET error = %(error)s
			WHERE state = 'FINALIZING' AND uploader = %(name)s AND error IS NULL
		""", name=self.name, error=(
			"Uploader died during FINALIZING, please determine if video was actually "
			"uploaded or not and either move to TRANSCODING and populate video_id or rollback "
			"to EDITED and clear uploader."
		))
		if result.rowcount > 0:
			self.logger.error("Found {} FINALIZING rows for {}, marked as errored".format(
				result.rowcount, self.name,
			))


class TranscodeChecker(object):
