@ -59,7 +59,7 @@ class Cutter(object):
ERROR_RETRY_INTERVAL = 5
ERROR_RETRY_INTERVAL = 5
RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5
RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5
def __init__ ( self , youtube , conn , stop , name , segments_path ) :
def __init__ ( self , youtube , dbmanager , stop , name , segments_path ) :
""" youtube is an authenticated and initialized youtube api client.
""" youtube is an authenticated and initialized youtube api client.
Conn is a database connection .
Conn is a database connection .
Stop is an Event triggering graceful shutdown when set .
Stop is an Event triggering graceful shutdown when set .
@ -68,10 +68,11 @@ class Cutter(object):
"""
"""
self . name = name
self . name = name
self . youtube = youtube
self . youtube = youtube
self . conn = conn
self . dbmanager = dbmanager
self . stop = stop
self . stop = stop
self . segments_path = segments_path
self . segments_path = segments_path
self . logger = logging . getLogger ( type ( self ) . __name__ )
self . logger = logging . getLogger ( type ( self ) . __name__ )
self . refresh_conn ( )
def wait ( self , interval ) :
def wait ( self , interval ) :
""" Wait for INTERVAL with jitter, unless we ' re stopping """
""" Wait for INTERVAL with jitter, unless we ' re stopping """
@ -90,6 +91,11 @@ class Cutter(object):
continue
continue
self . cut_job ( job )
self . cut_job ( job )
def refresh_conn ( self ) :
""" After errors, we reconnect in case the error was connection-related. """
self . logger . debug ( " Reconnecting to DB " )
self . conn = self . dbmanager . get_conn ( )
def find_candidate ( self ) :
def find_candidate ( self ) :
""" List EDITED events and find one at random which we have all segments for
""" List EDITED events and find one at random which we have all segments for
( or for which allow_holes is true ) , returning a CutJob .
( or for which allow_holes is true ) , returning a CutJob .
@ -100,6 +106,7 @@ class Cutter(object):
candidates = self . list_candidates ( )
candidates = self . list_candidates ( )
except Exception :
except Exception :
self . logger . exception ( " Error while listing candidates " )
self . logger . exception ( " Error while listing candidates " )
self . refresh_conn ( )
self . wait ( self . ERROR_RETRY_INTERVAL )
self . wait ( self . ERROR_RETRY_INTERVAL )
continue
continue
if candidates :
if candidates :
@ -132,6 +139,7 @@ class Cutter(object):
""" , id=candidate.id, error= ' {} : Error while checking candidate: {} ' .format(self.name, e))
""" , id=candidate.id, error= ' {} : Error while checking candidate: {} ' .format(self.name, e))
except Exception :
except Exception :
self . logger . exception ( " Failed to set error for candidate {} , ignoring " . format ( format_job ( candidate ) ) )
self . logger . exception ( " Failed to set error for candidate {} , ignoring " . format ( format_job ( candidate ) ) )
self . refresh_conn ( )
else :
else :
if result . rowcount > 0 :
if result . rowcount > 0 :
assert result . rowcount == 1
assert result . rowcount == 1
@ -190,6 +198,7 @@ class Cutter(object):
except Exception :
except Exception :
# Rather than retry on failure here, just assume someone else claimed it in the meantime
# Rather than retry on failure here, just assume someone else claimed it in the meantime
self . logger . exception ( " Error while claiming job {} , aborting claim " . format ( format_job ( job ) ) )
self . logger . exception ( " Error while claiming job {} , aborting claim " . format ( format_job ( job ) ) )
self . refresh_conn ( )
self . wait ( self . ERROR_RETRY_INTERVAL )
self . wait ( self . ERROR_RETRY_INTERVAL )
raise CandidateGone
raise CandidateGone
if result . rowcount == 0 :
if result . rowcount == 0 :
@ -306,6 +315,8 @@ class Cutter(object):
# we're aborting the cut, error handling has already happened
# we're aborting the cut, error handling has already happened
return
return
except Exception as ex :
except Exception as ex :
self . refresh_conn ( )
# for HTTPErrors, getting http response body is also useful
# for HTTPErrors, getting http response body is also useful
if isinstance ( ex , requests . HTTPError ) :
if isinstance ( ex , requests . HTTPError ) :
ex = " {} : {} " . format ( ex , ex . response . content )
ex = " {} : {} " . format ( ex , ex . response . content )
@ -487,8 +498,8 @@ def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=80
client_secret = youtube_creds [ ' client_secret ' ] ,
client_secret = youtube_creds [ ' client_secret ' ] ,
refresh_token = youtube_creds [ ' refresh_token ' ] ,
refresh_token = youtube_creds [ ' refresh_token ' ] ,
)
)
cutter = Cutter ( youtube , dbmanager . get_conn ( ) , stop , name , base_dir )
cutter = Cutter ( youtube , dbmanager , stop , name , base_dir )
transcode_checker = TranscodeChecker ( youtube , dbmanager . get_conn ( ) , stop )
transcode_checker = TranscodeChecker ( youtube , dbmanager , stop )
jobs = [
jobs = [
gevent . spawn ( cutter . run ) ,
gevent . spawn ( cutter . run ) ,
gevent . spawn ( transcode_checker . run ) ,
gevent . spawn ( transcode_checker . run ) ,