Merge pull request #113 from ekimekim/mike/cutter/upload-errors

Refactor error handling in uploads
Mike Lang 5 years ago committed by GitHub
commit b2d3faeab2

@@ -18,7 +18,7 @@ import common
from common.database import DBManager, query
from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles
from .upload_backends import Youtube, Local
from .upload_backends import Youtube, Local, UploadError
videos_uploaded = prom.Counter(
@@ -281,11 +281,6 @@ class Cutter(object):
# a closure (as py2 lacks the nonlocal keyword).
finalize_begun = [False]
# This dummy exception is used to pass control flow back out of upload_wrapper
# if we've already handled the error and do not need to do anything further.
class ErrorHandled(Exception):
pass
# This exception indicates a job we thought was ours somehow disappeared
# while we were still trying to cut it. This most likely represents a logic error
# or that our instance is in a bad state, and will be raised up to run() to terminate
@@ -294,11 +289,8 @@ class Cutter(object):
pass
def set_row(**kwargs):
"""Set columns on the row being cut. Returns True on success,
False if row could not be found.
Example:
if not set_row(state='UNEDITED', error=e):
<handle row having gone missing>
"""Set columns on the row being cut. Raises JobConsistencyError on failure.
Example: set_row(state='UNEDITED', error=e)
"""
# construct an UPDATE query like "SET key1=%(key1)s, key2=%(key2)s, ..."
built_query = sql.SQL("""
@@ -311,7 +303,10 @@ class Cutter(object):
) for key in kwargs
))
result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs)
return result.rowcount == 1
if result.rowcount != 1:
raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format(
job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items())
))
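# Raising here, instead of returning False as before, gives every call site the
# JobConsistencyError behaviour described above: the error propagates out of the
# cut and up to run(), which terminates and forces a state resync.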
def upload_wrapper():
# This generator wraps the cut_segments generator so we can
@@ -323,61 +318,54 @@ class Cutter(object):
yield chunk
except Exception as ex:
self.logger.exception("Error occurred while trying to cut job {}".format(format_job(job)))
# Assumed error is not retryable, set state back to UNEDITED and set error.
if not set_row(state='UNEDITED', error="Error while cutting: {}".format(ex), uploader=None):
self.logger.warning("Tried to roll back row {} to unedited but it was already cancelled.".format(job.id))
upload_errors.labels(video_channel=job.video_channel,
video_quality=job.video_quality,
upload_location=job.upload_location,
final_state='UNEDITED').inc()
# Abort the cut without further error handling
raise ErrorHandled
# Assumed error is not retryable
raise UploadError("Error while cutting: {}".format(ex), retryable=False)
# The data is now fully uploaded, but the request is not finalized.
# We now set the DB state to finalized so we know about failures during this
# critical section.
self.logger.debug("Setting job to finalizing")
if not set_row(state='FINALIZING'):
# Abort the cut and crash the program, forcing a state resync
raise JobConsistencyError(
"No job with id {} and uploader {} when setting FINALIZING"
.format(job.id, self.name)
)
set_row(state='FINALIZING')
finalize_begun[0] = True
# Now we return from this generator, and any errors between now and returning
# from requests.post() are not recoverable.
# Now we return from this generator, and any unknown errors between now and returning
# from the upload backend are not recoverable.
try:
video_id, video_link = upload_backend.upload_video(
title=(
"{} - {}".format(self.title_header, job.video_title)
if self.title_header else job.video_title
),
description=(
"{}\n\n{}".format(job.video_description, self.description_footer)
if self.description_footer else job.video_description
),
# Add category and sheet_name as tags
tags=self.tags + [job.category, job.sheet_name],
data=upload_wrapper(),
)
except JobConsistencyError:
raise # this ensures it's not caught in the next except block
except ErrorHandled:
# we're aborting the cut, error handling has already happened
return
except Exception as ex:
self.refresh_conn()
# UploadErrors in the except block below should be caught
# the same as UploadErrors in the main try block, so we wrap
# a second try around the whole thing.
try:
video_id, video_link = upload_backend.upload_video(
title=(
"{} - {}".format(self.title_header, job.video_title)
if self.title_header else job.video_title
),
description=(
"{}\n\n{}".format(job.video_description, self.description_footer)
if self.description_footer else job.video_description
),
# Add category and sheet_name as tags
tags=self.tags + [job.category, job.sheet_name],
data=upload_wrapper(),
)
except (JobConsistencyError, UploadError):
raise # this ensures these aren't caught in the except Exception block
except Exception as ex:
self.refresh_conn()
# for HTTPErrors, getting http response body is also useful
if isinstance(ex, requests.HTTPError):
ex = "{}: {}".format(ex, ex.response.content)
# for HTTPErrors, getting http response body is also useful
if isinstance(ex, requests.HTTPError):
ex = "{}: {}".format(ex, ex.response.content)
if not finalize_begun[0]:
# error before finalizing, assume it's a network issue / retryable.
self.logger.exception("Retryable error when uploading job {}".format(format_job(job)))
raise UploadError("Unhandled error in upload: {}".format(ex), retryable=True)
# if error during finalizing, set it in the database and leave it
# stuck in FINALIZING state for operator intervention.
if finalize_begun[0]:
# unknown error during finalizing, set it in the database and leave it
# stuck in FINALIZING state for operator intervention.
self.logger.critical((
"Error occurred while finalizing upload of job {}. "
"You will need to check the state of the video manually."
@@ -388,43 +376,42 @@ class Cutter(object):
"to EDITED and clear uploader. "
"Error: {}"
).format(ex)
upload_errors.labels(video_channel=job.video_channel,
video_quality=job.video_quality,
upload_location=job.upload_location,
final_state='FINALIZING').inc()
if not set_row(error=error):
# Not only do we not know if it was uploaded, we also failed to set that in the database!
raise JobConsistencyError(
"No job with id {} and uploader {} when setting error while finalizing!"
.format(job.id, self.name)
)
return
# error before finalizing, assume it's a network issue / retryable.
# set back to EDITED but still set error
self.logger.exception("Retryable error when uploading job {}".format(format_job(job)))
upload_errors.labels(video_channel=job.video_channel,
upload_errors.labels(
video_channel=job.video_channel,
video_quality=job.video_quality,
upload_location=job.upload_location,
final_state='EDITED').inc()
if not set_row(state='EDITED', error="Retryable error while uploading: {}".format(ex), uploader=None):
raise JobConsistencyError(
"No job with id {} and uploader {} when setting error while rolling back for retryable error"
.format(job.id, self.name)
)
# pause briefly so we don't immediately grab the same one again in a rapid retry loop
gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL)
final_state='FINALIZING',
).inc()
set_row(error=error)
return
except UploadError as ex:
# At this stage, we assume whatever raised UploadError has already
# logged about it. We're just setting the database as appropriate.
# If it's retryable, we clear uploader and set back to EDITED.
# If it isn't, we don't clear uploader (so we know where it failed)
# and we set it back to UNEDITED, waiting for an editor to manually retry.
if ex.retryable:
state = 'EDITED'
kwargs = {'uploader': None}
else:
state = 'UNEDITED'
kwargs = {}
upload_errors.labels(
video_channel=job.video_channel,
video_quality=job.video_quality,
upload_location=job.upload_location,
final_state=state,
).inc()
set_row(state=state, error=str(ex), **kwargs)
if ex.retryable:
# pause briefly so we don't immediately grab the same one again in a rapid retry loop
gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL)
return
# Success! Set TRANSCODING or DONE and clear any previous error.
success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE'
if not set_row(state=success_state, video_id=video_id, video_link=video_link, error=None):
# This will result in it being stuck in FINALIZING, and an operator will need to go
# confirm it was really uploaded.
raise JobConsistencyError(
"No job with id {} and uploader {} when setting to {}"
.format(job.id, self.name, success_state)
)
set_row(state=success_state, video_id=video_id, video_link=video_link, error=None)
self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link))
videos_uploaded.labels(video_channel=job.video_channel,

@@ -9,6 +9,40 @@ import uuid
from common.googleapis import GoogleAPIClient
class UploadError(Exception):
"""Upload backends should raise this error when uploading
and an expected failure occurs.
In particular, they should NOT raise this error if they're
unsure whether the video was uploaded or not.
They should also indicate if the error is retryable without
manual intervention.
Examples of retryable errors:
Short-term rate limits (try again in a few seconds)
Upload backends which are fully idempotent
Examples of unretryable errors:
Bad Request (indicates logic bug, or that the video is unacceptable in some way)
Long-term rate limits (trying again quickly is counterproductive, wait for operator)
Examples of errors which should not be caught, allowing the FINALIZING logic
to determine if it's safe to retry:
500s (We don't know if the request went through)
Network errors (same as above)
Unexpected exceptions (they might have occurred after the upload is finished)
Raisers should log the underlying exception before raising, as this error
will not be further logged.
"""
def __init__(self, error, retryable=False):
"""Error should be a string error message to put into the database."""
self.error = error
self.retryable = retryable
def __str__(self):
return "{} error while uploading: {}".format(
"Retryable" if self.retryable else "Non-retryable",
self.error,
)
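# Illustrative sketch (not part of this change): how a hypothetical backend's
# upload_video might apply the classification above. The names `session` and
# `endpoint` are placeholders for illustration, not real wubloader attributes;
# `session` is assumed to be a requests.Session.
def _example_upload_video(session, endpoint, data):
	resp = session.post(endpoint, data=data)
	if resp.status_code == 429:
		# Short-term rate limit: safe to retry automatically after a brief wait.
		raise UploadError("Rate limited: {}".format(resp.content), retryable=True)
	if 400 <= resp.status_code < 500:
		# Definitely rejected; retrying won't help, wait for manual intervention.
		raise UploadError(
			"Upload rejected with {}: {}".format(resp.status_code, resp.content),
			retryable=False,
		)
	# 5xx and network errors are deliberately left to propagate unchanged,
	# so the caller's FINALIZING logic can decide whether a retry is safe.
	resp.raise_for_status()
	return resp.json()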
class UploadBackend(object):
"""Represents a place a video can be uploaded,
and maintains any state needed to perform uploads.
@@ -110,9 +144,16 @@ class Youtube(UploadBackend):
},
json=json,
)
resp.raise_for_status()
if not resp.ok:
# Don't retry, because failed calls still count against our upload quota.
# The risk of repeated failed attempts blowing through our quota is too high.
raise UploadError("Youtube create video call failed with {resp.status_code}: {resp.content}".format(resp=resp))
upload_url = resp.headers['Location']
resp = self.client.request('POST', upload_url, data=data)
if 400 <= resp.status_code < 500:
# As above, don't retry. But with 4xx's we know the upload didn't go through.
# On a 5xx, we can't be sure (the server is in an unspecified state).
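# Note: a 4xx here is surfaced as a non-retryable UploadError because we know
# nothing was published, whereas a 5xx falls through raise_for_status() below as
# a plain HTTPError, so the cutter's FINALIZING logic decides how to handle it.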
raise UploadError("Youtube video data upload failed with {status_code}: {resp.content}".format(resp=resp))
resp.raise_for_status()
id = resp.json()['id']
return id, 'https://youtu.be/{}'.format(id)
@@ -176,16 +217,21 @@ class Local(UploadBackend):
ext = 'ts' if self.encoding_settings is None else 'mp4'
filename = '{}-{}.{}'.format(safe_title, video_id, ext)
filepath = os.path.join(self.path, filename)
if self.write_info:
with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f:
f.write(json.dumps({
'title': title,
'description': description,
'tags': tags,
}) + '\n')
with open(filepath, 'w') as f:
for chunk in data:
f.write(chunk)
try:
if self.write_info:
with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f:
f.write(json.dumps({
'title': title,
'description': description,
'tags': tags,
}) + '\n')
with open(filepath, 'w') as f:
for chunk in data:
f.write(chunk)
except (OSError, IOError) as e:
# Because duplicate videos don't actually matter with this backend,
# we consider all disk errors retryable.
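# With retryable=True the cutter rolls the row back to EDITED and tries again
# after a short wait; any partially-written file left behind by the failed
# attempt is harmless, since duplicates don't matter here.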
raise UploadError("{} while writing local file: {}".format(type(e).__name__, e), retryable=True)
if self.url_prefix is not None:
url = self.url_prefix + filename
else:
