|
|
@ -226,7 +226,7 @@ class Cutter(object):
|
|
|
|
""").format(
|
|
|
|
""").format(
|
|
|
|
sql.SQL(", ").join(sql.Identifier(key) for key in CUT_JOB_PARAMS)
|
|
|
|
sql.SQL(", ").join(sql.Identifier(key) for key in CUT_JOB_PARAMS)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys())
|
|
|
|
result = query(self.conn, built_query, name=self.name, upload_locations=list(self.upload_locations.keys()))
|
|
|
|
return result.fetchall()
|
|
|
|
return result.fetchall()
|
|
|
|
|
|
|
|
|
|
|
|
# No need to instrument this function, just use get_best_segments() stats
|
|
|
|
# No need to instrument this function, just use get_best_segments() stats
|
|
|
@ -313,9 +313,7 @@ class Cutter(object):
|
|
|
|
|
|
|
|
|
|
|
|
# This flag tracks whether we've told requests to finalize the upload,
|
|
|
|
# This flag tracks whether we've told requests to finalize the upload,
|
|
|
|
# and serves to detect whether errors from the request call are recoverable.
|
|
|
|
# and serves to detect whether errors from the request call are recoverable.
|
|
|
|
# Wrapping it in a one-element list is a hack that lets us modify it from within
|
|
|
|
finalize_begun = False
|
|
|
|
# a closure (as py2 lacks the nonlocal keyword).
|
|
|
|
|
|
|
|
finalize_begun = [False]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# This exception indicates a job we thought was ours somehow disappeared
|
|
|
|
# This exception indicates a job we thought was ours somehow disappeared
|
|
|
|
# while we were still trying to cut it. This most likely represents a logic error
|
|
|
|
# while we were still trying to cut it. This most likely represents a logic error
|
|
|
@ -342,7 +340,7 @@ class Cutter(object):
|
|
|
|
if result.rowcount != 1:
|
|
|
|
if result.rowcount != 1:
|
|
|
|
# If we hadn't yet set finalizing, then this means an operator cancelled the job
|
|
|
|
# If we hadn't yet set finalizing, then this means an operator cancelled the job
|
|
|
|
# while we were cutting it. This isn't a problem.
|
|
|
|
# while we were cutting it. This isn't a problem.
|
|
|
|
if not finalize_begun[0]:
|
|
|
|
if not finalize_begun:
|
|
|
|
raise JobCancelled()
|
|
|
|
raise JobCancelled()
|
|
|
|
raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format(
|
|
|
|
raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format(
|
|
|
|
job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items())
|
|
|
|
job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items())
|
|
|
@ -353,6 +351,11 @@ class Cutter(object):
|
|
|
|
# do things in between the data being finished and finalizing the request.
|
|
|
|
# do things in between the data being finished and finalizing the request.
|
|
|
|
# This is also where we do the main error handling.
|
|
|
|
# This is also where we do the main error handling.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Tell python to use the finalize_begun variable from the enclosing scope,
|
|
|
|
|
|
|
|
# instead of creating a new (shadowing) variable which is the default when
|
|
|
|
|
|
|
|
# you do "variable = value".
|
|
|
|
|
|
|
|
nonlocal finalize_begun
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
for chunk in cut:
|
|
|
|
for chunk in cut:
|
|
|
|
yield chunk
|
|
|
|
yield chunk
|
|
|
@ -367,7 +370,7 @@ class Cutter(object):
|
|
|
|
|
|
|
|
|
|
|
|
self.logger.debug("Setting job to finalizing")
|
|
|
|
self.logger.debug("Setting job to finalizing")
|
|
|
|
set_row(state='FINALIZING')
|
|
|
|
set_row(state='FINALIZING')
|
|
|
|
finalize_begun[0] = True
|
|
|
|
finalize_begun = True
|
|
|
|
|
|
|
|
|
|
|
|
# Now we return from this generator, and any unknown errors between now and returning
|
|
|
|
# Now we return from this generator, and any unknown errors between now and returning
|
|
|
|
# from the upload backend are not recoverable.
|
|
|
|
# from the upload backend are not recoverable.
|
|
|
@ -393,7 +396,7 @@ class Cutter(object):
|
|
|
|
if isinstance(ex, requests.HTTPError):
|
|
|
|
if isinstance(ex, requests.HTTPError):
|
|
|
|
ex = "{}: {}".format(ex, ex.response.content)
|
|
|
|
ex = "{}: {}".format(ex, ex.response.content)
|
|
|
|
|
|
|
|
|
|
|
|
if not finalize_begun[0]:
|
|
|
|
if not finalize_begun:
|
|
|
|
# error before finalizing, assume it's a network issue / retryable.
|
|
|
|
# error before finalizing, assume it's a network issue / retryable.
|
|
|
|
self.logger.exception("Retryable error when uploading job {}".format(format_job(job)))
|
|
|
|
self.logger.exception("Retryable error when uploading job {}".format(format_job(job)))
|
|
|
|
raise UploadError("Unhandled error in upload: {}".format(ex), retryable=True)
|
|
|
|
raise UploadError("Unhandled error in upload: {}".format(ex), retryable=True)
|
|
|
@ -552,7 +555,7 @@ class TranscodeChecker(object):
|
|
|
|
UPDATE events
|
|
|
|
UPDATE events
|
|
|
|
SET state = 'DONE', upload_time = %s
|
|
|
|
SET state = 'DONE', upload_time = %s
|
|
|
|
WHERE id = ANY (%s::uuid[]) AND state = 'TRANSCODING'
|
|
|
|
WHERE id = ANY (%s::uuid[]) AND state = 'TRANSCODING'
|
|
|
|
""", datetime.datetime.utcnow(), ids.keys())
|
|
|
|
""", datetime.datetime.utcnow(), list(ids.keys()))
|
|
|
|
return result.rowcount
|
|
|
|
return result.rowcount
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|