@ -18,7 +18,7 @@ import common
from common . database import DBManager , query
from common . database import DBManager , query
from common . segments import get_best_segments , fast_cut_segments , full_cut_segments , ContainsHoles
from common . segments import get_best_segments , fast_cut_segments , full_cut_segments , ContainsHoles
from . upload_backends import Youtube , Local
from . upload_backends import Youtube , Local , UploadError
videos_uploaded = prom . Counter (
videos_uploaded = prom . Counter (
@ -281,11 +281,6 @@ class Cutter(object):
# a closure (as py2 lacks the nonlocal keyword).
# a closure (as py2 lacks the nonlocal keyword).
finalize_begun = [ False ]
finalize_begun = [ False ]
# This dummy exception is used to pass control flow back out of upload_wrapper
# if we've already handled the error and do not need to do anything further.
class ErrorHandled ( Exception ) :
pass
# This exception indicates a job we thought was ours somehow disappeared
# This exception indicates a job we thought was ours somehow disappeared
# while we were still trying to cut it. This most likely represents a logic error
# while we were still trying to cut it. This most likely represents a logic error
# or that our instance is in a bad state, and will be raised up to run() to terminate
# or that our instance is in a bad state, and will be raised up to run() to terminate
@ -294,11 +289,8 @@ class Cutter(object):
pass
pass
def set_row ( * * kwargs ) :
def set_row ( * * kwargs ) :
""" Set columns on the row being cut. Returns True on success,
""" Set columns on the row being cut. Raises JobConsistencyError on failure.
False if row could not be found .
Example : set_row ( state = ' UNEDITED ' , error = e )
Example :
if not set_row ( state = ' UNEDITED ' , error = e ) :
< handle row having gone missing >
"""
"""
# construct an UPDATE query like "SET key1=%(key1)s, key2=%(key2)s, ..."
# construct an UPDATE query like "SET key1=%(key1)s, key2=%(key2)s, ..."
built_query = sql . SQL ( """
built_query = sql . SQL ( """
@ -311,7 +303,10 @@ class Cutter(object):
) for key in kwargs
) for key in kwargs
) )
) )
result = query ( self . conn , built_query , id = job . id , name = self . name , * * kwargs )
result = query ( self . conn , built_query , id = job . id , name = self . name , * * kwargs )
return result . rowcount == 1
if result . rowcount != 1 :
raise JobConsistencyError ( " No job with id {} and uploader {} when setting: {} " . format (
job . id , self . name , " , " . join ( " {} = {!r} " . format ( k , v ) for k , v in kwargs . items ( ) )
) )
def upload_wrapper ( ) :
def upload_wrapper ( ) :
# This generator wraps the cut_segments generator so we can
# This generator wraps the cut_segments generator so we can
@ -323,61 +318,54 @@ class Cutter(object):
yield chunk
yield chunk
except Exception as ex :
except Exception as ex :
self . logger . exception ( " Error occurred while trying to cut job {} " . format ( format_job ( job ) ) )
self . logger . exception ( " Error occurred while trying to cut job {} " . format ( format_job ( job ) ) )
# Assumed error is not retryable, set state back to UNEDITED and set error.
# Assumed error is not retryable
if not set_row ( state = ' UNEDITED ' , error = " Error while cutting: {} " . format ( ex ) , uploader = None ) :
raise UploadError ( " Error while cutting: {} " . format ( ex ) , retryable = False )
self . logger . warning ( " Tried to roll back row {} to unedited but it was already cancelled. " . format ( job . id ) )
upload_errors . labels ( video_channel = job . video_channel ,
video_quality = job . video_quality ,
upload_location = job . upload_location ,
final_state = ' UNEDITED ' ) . inc ( )
# Abort the cut without further error handling
raise ErrorHandled
# The data is now fully uploaded, but the request is not finalized.
# The data is now fully uploaded, but the request is not finalized.
# We now set the DB state to finalized so we know about failures during this
# We now set the DB state to finalized so we know about failures during this
# critical section.
# critical section.
self . logger . debug ( " Setting job to finalizing " )
self . logger . debug ( " Setting job to finalizing " )
if not set_row ( state = ' FINALIZING ' ) :
set_row ( state = ' FINALIZING ' )
# Abort the cut and crash the program, forcing a state resync
raise JobConsistencyError (
" No job with id {} and uploader {} when setting FINALIZING "
. format ( job . id , self . name )
)
finalize_begun [ 0 ] = True
finalize_begun [ 0 ] = True
# Now we return from this generator, and any errors between now and returning
# Now we return from this generator, and any unknown errors between now and returning
# from requests.post() are not recoverable.
# from the upload backend are not recoverable.
try :
try :
video_id , video_link = upload_backend . upload_video (
# UploadErrors in the except block below should be caught
title = (
# the same as UploadErrors in the main try block, so we wrap
" {} - {} " . format ( self . title_header , job . video_title )
# a second try around the whole thing.
if self . title_header else job . video_title
try :
) ,
video_id , video_link = upload_backend . upload_video (
description = (
title = (
" {} \n \n {} " . format ( job . video_description , self . description_footer )
" {} - {} " . format ( self . title_header , job . video_title )
if self . description_footer else job . video_description
if self . title_header else job . video_title
) ,
) ,
# Add category and sheet_name as tags
description = (
tags = self . tags + [ job . category , job . sheet_name ] ,
" {} \n \n {} " . format ( job . video_description , self . description_footer )
data = upload_wrapper ( ) ,
if self . description_footer else job . video_description
)
) ,
except JobConsistencyError :
# Add category and sheet_name as tags
raise # this ensures it's not caught in the next except block
tags = self . tags + [ job . category , job . sheet_name ] ,
except ErrorHandled :
data = upload_wrapper ( ) ,
# we're aborting the cut, error handling has already happened
)
return
except ( JobConsistencyError , UploadError ) :
except Exception as ex :
raise # this ensures these aren't not caught in the except Exception block
self . refresh_conn ( )
except Exception as ex :
self . refresh_conn ( )
# for HTTPErrors, getting http response body is also useful
if isinstance ( ex , requests . HTTPError ) :
ex = " {} : {} " . format ( ex , ex . response . content )
# for HTTPErrors, getting http response body is also useful
if not finalize_begun [ 0 ] :
if isinstance ( ex , requests . HTTPError ) :
# error before finalizing, assume it's a network issue / retryable.
ex = " {} : {} " . format ( ex , ex . response . content )
self . logger . exception ( " Retryable error when uploading job {} " . format ( format_job ( job ) ) )
raise UploadError ( " Unhandled error in upload: {} " . format ( ex ) , retryable = True )
# if error during finalizing, set it in the database and leave it
# unknown error during finalizing, set it in the database and leave it
# stuck in FINALIZING state for operator intervention.
# stuck in FINALIZING state for operator intervention.
if finalize_begun [ 0 ] :
self . logger . critical ( (
self . logger . critical ( (
" Error occurred while finalizing upload of job {} . "
" Error occurred while finalizing upload of job {} . "
" You will need to check the state of the video manually. "
" You will need to check the state of the video manually. "
@ -388,43 +376,42 @@ class Cutter(object):
" to EDITED and clear uploader. "
" to EDITED and clear uploader. "
" Error: {} "
" Error: {} "
) . format ( ex )
) . format ( ex )
upload_errors . labels ( video_channel = job . video_channel ,
upload_errors . labels (
video_quality = job . video_quality ,
video_channel = job . video_channel ,
upload_location = job . upload_location ,
final_state = ' FINALIZING ' ) . inc ( )
if not set_row ( error = error ) :
# Not only do we not know if it was uploaded, we also failed to set that in the database!
raise JobConsistencyError (
" No job with id {} and uploader {} when setting error while finalizing! "
. format ( job . id , self . name )
)
return
# error before finalizing, assume it's a network issue / retryable.
# set back to EDITED but still set error
self . logger . exception ( " Retryable error when uploading job {} " . format ( format_job ( job ) ) )
upload_errors . labels ( video_channel = job . video_channel ,
video_quality = job . video_quality ,
video_quality = job . video_quality ,
upload_location = job . upload_location ,
upload_location = job . upload_location ,
final_state = ' EDITED ' ) . inc ( )
final_state = ' FINALIZING ' ,
if not set_row ( state = ' EDITED ' , error = " Retryable error while uploading: {} " . format ( ex ) , uploader = None ) :
) . inc ( )
raise JobConsistencyError (
set_row ( error = error )
" No job with id {} and uploader {} when setting error while rolling back for retryable error "
return
. format ( job . id , self . name )
)
except UploadError as ex :
# pause briefly so we don't immediately grab the same one again in a rapid retry loop
# At this stage, we assume whatever raised UploadError has already
gevent . sleep ( self . RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL )
# logged about it. We're just setting the database as appropriate.
# If it's retryable, we clear uploader and set back to EDITED.
# If it isn't, we don't clear uploader (so we know where it failed)
# and we set it back to UNEDITED, waiting for an editor to manually retry.
if ex . retryable :
state = ' EDITED '
kwargs = { }
else :
state = ' UNEDITED '
kwargs = { ' uploader ' : None }
upload_errors . labels (
video_channel = job . video_channel ,
video_quality = job . video_quality ,
upload_location = job . upload_location ,
final_state = state ,
) . inc ( )
set_row ( state = state , error = str ( ex ) , * * kwargs )
if ex . retryable :
# pause briefly so we don't immediately grab the same one again in a rapid retry loop
gevent . sleep ( self . RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL )
return
return
# Success! Set TRANSCODING or DONE and clear any previous error.
# Success! Set TRANSCODING or DONE and clear any previous error.
success_state = ' TRANSCODING ' if upload_backend . needs_transcode else ' DONE '
success_state = ' TRANSCODING ' if upload_backend . needs_transcode else ' DONE '
if not set_row ( state = success_state , video_id = video_id , video_link = video_link , error = None ) :
set_row ( state = success_state , video_id = video_id , video_link = video_link , error = None )
# This will result in it being stuck in FINALIZING, and an operator will need to go
# confirm it was really uploaded.
raise JobConsistencyError (
" No job with id {} and uploader {} when setting to {} "
. format ( job . id , self . name , success_state )
)
self . logger . info ( " Successfully cut and uploaded job {} as {} " . format ( format_job ( job ) , video_link ) )
self . logger . info ( " Successfully cut and uploaded job {} as {} " . format ( format_job ( job ) , video_link ) )
videos_uploaded . labels ( video_channel = job . video_channel ,
videos_uploaded . labels ( video_channel = job . video_channel ,