diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py
index 7232c49..446e218 100644
--- a/cutter/cutter/main.py
+++ b/cutter/cutter/main.py
@@ -1,6 +1,8 @@
 import json
 import logging
+import os
+import random
 import signal
 import socket
 
 from collections import namedtuple
@@ -8,9 +10,12 @@ from collections import namedtuple
 import gevent.backdoor
 import gevent.event
 import prometheus_client as prom
+import requests
+from psycopg2 import sql
 
 import common
 from common.database import DBManager, query
+from common.segments import get_best_segments, cut_segments, ContainsHoles
 
 from .youtube import Youtube
 
@@ -50,6 +55,10 @@ class CandidateGone(Exception):
 
 
 class Cutter(object):
+    NO_CANDIDATES_RETRY_INTERVAL = 1
+    ERROR_RETRY_INTERVAL = 5
+    RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5
+
     def __init__(self, youtube, conn, stop, name, segments_path):
         """youtube is an authenticated and initialized youtube api client.
         Conn is a database connection.
@@ -86,25 +95,294 @@ class Cutter(object):
         (or for which allow_holes is true), returning a CutJob.
         Polls until one is available.
         """
-        raise NotImplementedError
+        while not self.stop.is_set():
+            try:
+                candidates = self.list_candidates()
+            except Exception:
+                self.logger.exception("Error while listing candidates")
+                self.wait(self.ERROR_RETRY_INTERVAL)
+                continue
+            if candidates:
+                self.logger.info("Found {} job candidates".format(len(candidates)))
+            # Shuffle the list so that (most of the time) we don't try to claim the same one as other nodes
+            random.shuffle(candidates)
+            for candidate in candidates:
+                try:
+                    segments = self.check_candidate(candidate)
+                except ContainsHoles:
+                    # TODO metric
+                    self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate)))
+                    continue # bad candidate, let someone else take it or just try again later
+                except Exception as e:
+                    # Unknown error. This is either a problem with us, or a problem with the candidate
+                    # (or most likely a problem with us that is only triggered by this candidate).
+                    # In this case we would rather stay running so other jobs can continue to work if possible.
+                    # But to give at least some feedback, we set the error message on the job
+                    # if it isn't already set.
+                    self.logger.exception("Failed to check candidate {}, setting error on row".format(format_job(candidate)))
+                    try:
+                        # Since this error message is just for humans, we don't go to great
+                        # lengths to prevent it being put on the row if the row has changed.
+                        # We just check its state is still EDITED.
+                        # Any successful claim will clear its error.
+                        result = query(self.conn, """
+                            UPDATE events
+                            SET error = %(error)s
+                            WHERE id = %(id)s AND state = 'EDITED' AND error IS NULL
+                        """, id=candidate.id, error='{}: Error while checking candidate: {}'.format(self.name, e))
+                    except Exception:
+                        self.logger.exception("Failed to set error for candidate {}, ignoring".format(format_job(candidate)))
+                    else:
+                        if result.rowcount > 0:
+                            assert result.rowcount == 1
+                            self.logger.info("Set error for candidate {}".format(format_job(candidate)))
+                    self.wait(self.ERROR_RETRY_INTERVAL)
+                    continue
+                if all(segment is None for segment in segments):
+                    self.logger.info("Ignoring candidate {} as we have no segments".format(format_job(candidate)))
+                    continue
+                return CutJob(segments=segments, **candidate._asdict())
+            # No candidates
+            self.wait(self.NO_CANDIDATES_RETRY_INTERVAL)
+
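+    # The queries in list_candidates and claim_job below are composed with psycopg2.sql
+    # rather than string formatting, so that column names from CUT_JOB_PARAMS are safely
+    # quoted as identifiers. Illustratively, if CUT_JOB_PARAMS began
+    # ("video_channel", "video_quality", ...), the SELECT below would render as:
+    #   SELECT id, "video_channel", "video_quality", ... FROM events ...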
+    def list_candidates(self):
+        """Return a list of all available candidates that we might be able to cut."""
+        built_query = sql.SQL("""
+            SELECT id, {}
+            FROM events
+            WHERE state = 'EDITED'
+            AND (uploader_whitelist IS NULL OR %(name)s = ANY (uploader_whitelist))
+        """).format(
+            sql.SQL(", ").join(sql.Identifier(key) for key in CUT_JOB_PARAMS)
+        )
+        result = query(self.conn, built_query, name=self.name)
+        return result.fetchall()
+
+    def check_candidate(self, candidate):
+        return get_best_segments(
+            os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality),
+            candidate.video_start,
+            candidate.video_end,
+            allow_holes=candidate.allow_holes,
+        )
 
     def claim_job(self, job):
         """Update event in DB to say we're working on it.
         If someone beat us to it, or it's changed, raise CandidateGone."""
         # We need to verify all relevant cut params are unchanged, in case they
         # were updated between verifying the candidate and now.
-        raise NotImplementedError
+        built_query = sql.SQL("""
+            UPDATE events
+            SET state = 'CLAIMED', uploader = %(name)s, error = NULL
+            WHERE id = %(id)s
+            AND state = 'EDITED'
+            AND {}
+        """).format(
+            # A built AND of "key IS NOT DISTINCT FROM %(key)s" over all CUT_JOB_PARAMS.
+            # We use IS NOT DISTINCT FROM instead of = because key = NULL evaluates
+            # to NULL (not TRUE) even when key is NULL.
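+            # For example, (NULL = NULL) is NULL, so a plain equality check would
+            # never match a row where both the column and the parameter are NULL,
+            # whereas (NULL IS NOT DISTINCT FROM NULL) is TRUE.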
+            sql.SQL(' AND ').join(
+                sql.SQL("{} IS NOT DISTINCT FROM {}").format(sql.Identifier(key), sql.Placeholder(key))
+                for key in CUT_JOB_PARAMS
+            )
+        )
+        try:
+            result = query(self.conn, built_query, name=self.name, **job._asdict())
+        except Exception:
+            # Rather than retry on failure here, just assume someone else claimed it in the meantime
+            self.logger.exception("Error while claiming job {}, aborting claim".format(format_job(job)))
+            self.wait(self.ERROR_RETRY_INTERVAL)
+            raise CandidateGone
+        if result.rowcount == 0:
+            self.logger.info("Failed to claim job {}".format(format_job(job)))
+            raise CandidateGone
+        self.logger.info("Claimed job {}".format(format_job(job)))
+        assert result.rowcount == 1
 
     def cut_job(self, job):
         """Perform the actual cut and upload, taking the job through FINALIZING
         and into TRANSCODING or DONE.
+
+        Handles various error conditions:
+        * Errors while cutting: Assumed to be non-retryable until cut parameters are changed
+          by an operator. Sets error and rolls back to UNEDITED.
+        * Request error before the request body is closed: Assumed to be a transient network
+          failure, immediately retryable. Sets error and rolls back to EDITED.
+        * Request error after the request body is closed: It's unknown whether the request
+          went through. Sets error and remains in FINALIZING. Operator intervention is required.
+        * Row has changed (no longer claimed by us) before the request body is closed:
+          Assumed an operator has made changes and changed state back. Aborts cutting without error.
+        * Row has changed (no longer claimed by us) after the request body is closed:
+          The request has already gone through, but we failed to update the database with this state.
+          Causes a program crash (JobConsistencyError) and restart,
+          at which point it will re-sync with the DB as best it can.
+          This situation almost certainly requires operator intervention.
         """
-        raise NotImplementedError
+        # TODO handle multiple upload locations. Currently everything's hard-coded to youtube.
+
+        self.logger.info("Cutting and uploading job {}".format(format_job(job)))
+        cut = cut_segments(job.segments, job.video_start, job.video_end)
+
+        # This flag tracks whether we've told requests to finalize the upload,
+        # and serves to detect whether errors from the request call are recoverable.
+        # Wrapping it in a one-element list is a hack that lets us modify it from within
+        # a closure (as py2 lacks the nonlocal keyword).
+        finalize_begun = [False]
+
+        # This dummy exception is used to pass control flow back out of upload_wrapper
+        # if we've already handled the error and do not need to do anything further.
+        class ErrorHandled(Exception):
+            pass
+
+        # This exception indicates a job we thought was ours somehow disappeared
+        # while we were still trying to cut it. This most likely represents a logic error
+        # or that our instance is in a bad state, and it will be raised up to run() to terminate
+        # the cutter entirely.
+        class JobConsistencyError(Exception):
+            pass
+
+        def set_row(**kwargs):
+            """Set columns on the row being cut. Returns True on success,
+            False if the row could not be found.
+            Example:
+                if not set_row(state='UNEDITED', error=e):
+                    # row was claimed by someone else or deleted, handle it
+            """
+            # Construct an UPDATE query like "SET key1=%(key1)s, key2=%(key2)s, ..."
+            built_query = sql.SQL("""
+                UPDATE events
+                SET {}
+                WHERE id = %(id)s AND uploader = %(name)s
+            """).format(sql.SQL(", ").join(
+                sql.SQL("{} = {}").format(
+                    sql.Identifier(key), sql.Placeholder(key),
+                ) for key in kwargs
+            ))
+            result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs)
+            return result.rowcount == 1
+
+        def upload_wrapper():
+            # This generator wraps the cut_segments generator so we can
+            # do things in between the data being finished and finalizing the request.
+            # This is also where we do the main error handling.
+
+            try:
+                for chunk in cut:
+                    yield chunk
+            except Exception as ex:
+                self.logger.exception("Error occurred while trying to cut job {}".format(format_job(job)))
+                # Assumed error is not retryable, set state back to UNEDITED and set error.
+                if not set_row(state='UNEDITED', error="Error while cutting: {}".format(ex), uploader=None):
+                    self.logger.warning("Tried to roll back row {} to unedited but it was already cancelled.".format(job.id))
+                # Abort the cut without further error handling
+                raise ErrorHandled
+
+            # The data is now fully uploaded, but the request is not finalized.
+            # We now set the DB state to FINALIZING so we know about failures during this
+            # critical section.
+
+            self.logger.debug("Setting job to finalizing")
+            if not set_row(state='FINALIZING'):
+                # Abort the cut and crash the program, forcing a state resync
+                raise JobConsistencyError(
+                    "No job with id {} and uploader {} when setting FINALIZING"
+                    .format(job.id, self.name)
+                )
+            finalize_begun[0] = True
+
+            # Now we return from this generator, and any errors between now and returning
+            # from requests.post() are not recoverable.
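+
+        # upload_video is handed the wrapped generator as the request body; requests
+        # streams a generator body using chunked transfer encoding, so the cut is
+        # uploaded as it is produced instead of being buffered in memory first.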
+        try:
+            video_id = self.youtube.upload_video(
+                title=job.video_title,
+                description=job.video_description,
+                tags=[], # TODO
+                data=upload_wrapper(),
+                hidden=True, # TODO remove when not testing
+            )
+        except JobConsistencyError:
+            raise # this ensures it's not caught in the next except block
+        except ErrorHandled:
+            # we're aborting the cut, error handling has already happened
+            return
+        except Exception as ex:
+            # for HTTPErrors, getting the http response body is also useful
+            if isinstance(ex, requests.HTTPError):
+                ex = "{}: {}".format(ex, ex.response.content)
+
+            # if error during finalizing, set it in the database and leave it
+            # stuck in FINALIZING state for operator intervention.
+            if finalize_begun[0]:
+                self.logger.critical((
+                    "Error occurred while finalizing upload of job {}. "
+                    "You will need to check the state of the video manually."
+                ).format(format_job(job)))
+                error = (
+                    "An error occurred during FINALIZING, please determine if video was actually "
+                    "uploaded or not and either move to TRANSCODING and populate video_id or rollback "
+                    "to EDITED and clear uploader. "
+                    "Error: {}"
+                ).format(ex)
+                if not set_row(error=error):
+                    # Not only do we not know if it was uploaded, we also failed to set that in the database!
+                    raise JobConsistencyError(
+                        "No job with id {} and uploader {} when setting error while finalizing!"
+                        .format(job.id, self.name)
+                    )
+                return
+
+            # error before finalizing, assume it's a network issue / retryable.
+            # set back to EDITED but still set error
+            self.logger.exception("Retryable error when uploading job {}".format(format_job(job)))
+            if not set_row(state='EDITED', error="Retryable error while uploading: {}".format(ex), uploader=None):
+                raise JobConsistencyError(
+                    "No job with id {} and uploader {} when setting error while rolling back for retryable error"
+                    .format(job.id, self.name)
+                )
+            # pause briefly so we don't immediately grab the same one again in a rapid retry loop
+            gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL)
+            return
+
+        # Success! Set TRANSCODING and clear any previous error.
+        link = "https://youtu.be/{}".format(video_id)
+        if not set_row(state='TRANSCODING', video_id=video_id, video_link=link, error=None):
+            # This will result in it being stuck in FINALIZING, and an operator will need to go
+            # confirm it was really uploaded.
+            raise JobConsistencyError(
+                "No job with id {} and uploader {} when setting to TRANSCODING"
+                .format(job.id, self.name)
+            )
+
+        self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), link))
 
     def rollback_all_owned(self):
         """Roll back any in-progress jobs that claim to be owned by us,
         to recover from an unclean shutdown."""
-        raise NotImplementedError
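+        # Rolling CLAIMED rows back to EDITED is assumed safe here: a cutter that died
+        # while a job was still CLAIMED never finalized its upload, so another cutter
+        # can pick the job up and redo the work from scratch.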
+        result = query(self.conn, """
+            UPDATE events
+            SET state = 'EDITED', uploader = NULL
+            WHERE state = 'CLAIMED' AND uploader = %(name)s
+        """, name=self.name)
+        if result.rowcount > 0:
+            self.logger.warning("Rolled back {} CLAIMED rows for {} - unclean shutdown?".format(
+                result.rowcount, self.name,
+            ))
+
+        # Also mark any rows in FINALIZING owned by us as errored; these require manual intervention
+        result = query(self.conn, """
+            UPDATE events
+            SET error = %(error)s
+            WHERE state = 'FINALIZING' AND uploader = %(name)s AND error IS NULL
+        """, name=self.name, error=(
+            "Uploader died during FINALIZING, please determine if video was actually "
+            "uploaded or not and either move to TRANSCODING and populate video_id or rollback "
+            "to EDITED and clear uploader."
+        ))
+        if result.rowcount > 0:
+            self.logger.error("Found {} FINALIZING rows for {}, marked as errored".format(
+                result.rowcount, self.name,
+            ))
+
 
 class TranscodeChecker(object):