From 736040435c7c504da71591a8d16c000e52304e7c Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 23 Oct 2019 22:06:18 +1100 Subject: [PATCH 1/2] Refactor error handling in uploads Instead of handling each error condition seperately, we raise an UploadError which includes whether it's retryable. The advantage of this is that upload backends can also raise an UploadError to indicate two conditions it currently cannot: That an error is unretryable That an error is retryable, even if the row was already in finalizing Under this scheme, errors while cutting become unretryable UploadErrors, and unhandled exceptions in uploading become retryable UploadErrors if the row is not yet finalizing only. --- cutter/cutter/main.py | 159 ++++++++++++++----------------- cutter/cutter/upload_backends.py | 34 +++++++ 2 files changed, 107 insertions(+), 86 deletions(-) diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index dc8fa76..733b391 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -18,7 +18,7 @@ import common from common.database import DBManager, query from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles -from .upload_backends import Youtube, Local +from .upload_backends import Youtube, Local, UploadError videos_uploaded = prom.Counter( @@ -281,11 +281,6 @@ class Cutter(object): # a closure (as py2 lacks the nonlocal keyword). finalize_begun = [False] - # This dummy exception is used to pass control flow back out of upload_wrapper - # if we've already handled the error and do not need to do anything further. - class ErrorHandled(Exception): - pass - # This exception indicates a job we thought was ours somehow disappeared # while we were still trying to cut it. This most likely represents a logic error # or that our instance is in a bad state, and will be raised up to run() to terminate @@ -294,11 +289,8 @@ class Cutter(object): pass def set_row(**kwargs): - """Set columns on the row being cut. Returns True on success, - False if row could not be found. - Example: - if not set_row(state='UNEDITED', error=e): - + """Set columns on the row being cut. Raises JobConsistencyError on failure. + Example: set_row(state='UNEDITED', error=e) """ # construct an UPDATE query like "SET key1=%(key1)s, key2=%(key2)s, ..." built_query = sql.SQL(""" @@ -311,7 +303,10 @@ class Cutter(object): ) for key in kwargs )) result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs) - return result.rowcount == 1 + if result.rowcount != 1: + raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format( + job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items()) + )) def upload_wrapper(): # This generator wraps the cut_segments generator so we can @@ -323,61 +318,54 @@ class Cutter(object): yield chunk except Exception as ex: self.logger.exception("Error occurred while trying to cut job {}".format(format_job(job))) - # Assumed error is not retryable, set state back to UNEDITED and set error. - if not set_row(state='UNEDITED', error="Error while cutting: {}".format(ex), uploader=None): - self.logger.warning("Tried to roll back row {} to unedited but it was already cancelled.".format(job.id)) - upload_errors.labels(video_channel=job.video_channel, - video_quality=job.video_quality, - upload_location=job.upload_location, - final_state='UNEDITED').inc() - # Abort the cut without further error handling - raise ErrorHandled + # Assumed error is not retryable + raise UploadError("Error while cutting: {}".format(ex), retryable=False) # The data is now fully uploaded, but the request is not finalized. # We now set the DB state to finalized so we know about failures during this # critical section. self.logger.debug("Setting job to finalizing") - if not set_row(state='FINALIZING'): - # Abort the cut and crash the program, forcing a state resync - raise JobConsistencyError( - "No job with id {} and uploader {} when setting FINALIZING" - .format(job.id, self.name) - ) + set_row(state='FINALIZING') finalize_begun[0] = True - # Now we return from this generator, and any errors between now and returning - # from requests.post() are not recoverable. + # Now we return from this generator, and any unknown errors between now and returning + # from the upload backend are not recoverable. try: - video_id, video_link = upload_backend.upload_video( - title=( - "{} - {}".format(self.title_header, job.video_title) - if self.title_header else job.video_title - ), - description=( - "{}\n\n{}".format(job.video_description, self.description_footer) - if self.description_footer else job.video_description - ), - # Add category and sheet_name as tags - tags=self.tags + [job.category, job.sheet_name], - data=upload_wrapper(), - ) - except JobConsistencyError: - raise # this ensures it's not caught in the next except block - except ErrorHandled: - # we're aborting the cut, error handling has already happened - return - except Exception as ex: - self.refresh_conn() + # UploadErrors in the except block below should be caught + # the same as UploadErrors in the main try block, so we wrap + # a second try around the whole thing. + try: + video_id, video_link = upload_backend.upload_video( + title=( + "{} - {}".format(self.title_header, job.video_title) + if self.title_header else job.video_title + ), + description=( + "{}\n\n{}".format(job.video_description, self.description_footer) + if self.description_footer else job.video_description + ), + # Add category and sheet_name as tags + tags=self.tags + [job.category, job.sheet_name], + data=upload_wrapper(), + ) + except (JobConsistencyError, UploadError): + raise # this ensures these aren't not caught in the except Exception block + except Exception as ex: + self.refresh_conn() + + # for HTTPErrors, getting http response body is also useful + if isinstance(ex, requests.HTTPError): + ex = "{}: {}".format(ex, ex.response.content) - # for HTTPErrors, getting http response body is also useful - if isinstance(ex, requests.HTTPError): - ex = "{}: {}".format(ex, ex.response.content) + if not finalize_begun[0]: + # error before finalizing, assume it's a network issue / retryable. + self.logger.exception("Retryable error when uploading job {}".format(format_job(job))) + raise UploadError("Unhandled error in upload: {}".format(ex), retryable=True) - # if error during finalizing, set it in the database and leave it - # stuck in FINALIZING state for operator intervention. - if finalize_begun[0]: + # unknown error during finalizing, set it in the database and leave it + # stuck in FINALIZING state for operator intervention. self.logger.critical(( "Error occurred while finalizing upload of job {}. " "You will need to check the state of the video manually." @@ -388,43 +376,42 @@ class Cutter(object): "to EDITED and clear uploader. " "Error: {}" ).format(ex) - upload_errors.labels(video_channel=job.video_channel, - video_quality=job.video_quality, - upload_location=job.upload_location, - final_state='FINALIZING').inc() - if not set_row(error=error): - # Not only do we not know if it was uploaded, we also failed to set that in the database! - raise JobConsistencyError( - "No job with id {} and uploader {} when setting error while finalizing!" - .format(job.id, self.name) - ) - return - - # error before finalizing, assume it's a network issue / retryable. - # set back to EDITED but still set error - self.logger.exception("Retryable error when uploading job {}".format(format_job(job))) - upload_errors.labels(video_channel=job.video_channel, + upload_errors.labels( + video_channel=job.video_channel, video_quality=job.video_quality, upload_location=job.upload_location, - final_state='EDITED').inc() - if not set_row(state='EDITED', error="Retryable error while uploading: {}".format(ex), uploader=None): - raise JobConsistencyError( - "No job with id {} and uploader {} when setting error while rolling back for retryable error" - .format(job.id, self.name) - ) - # pause briefly so we don't immediately grab the same one again in a rapid retry loop - gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL) + final_state='FINALIZING', + ).inc() + set_row(error=error) + return + + except UploadError as ex: + # At this stage, we assume whatever raised UploadError has already + # logged about it. We're just setting the database as appropriate. + # If it's retryable, we clear uploader and set back to EDITED. + # If it isn't, we don't clear uploader (so we know where it failed) + # and we set it back to UNEDITED, waiting for an editor to manually retry. + if ex.retryable: + state = 'EDITED' + kwargs = {} + else: + state = 'UNEDITED' + kwargs = {'uploader': None} + upload_errors.labels( + video_channel=job.video_channel, + video_quality=job.video_quality, + upload_location=job.upload_location, + final_state=state, + ).inc() + set_row(state=state, error=str(ex), **kwargs) + if ex.retryable: + # pause briefly so we don't immediately grab the same one again in a rapid retry loop + gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL) return # Success! Set TRANSCODING or DONE and clear any previous error. success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE' - if not set_row(state=success_state, video_id=video_id, video_link=video_link, error=None): - # This will result in it being stuck in FINALIZING, and an operator will need to go - # confirm it was really uploaded. - raise JobConsistencyError( - "No job with id {} and uploader {} when setting to {}" - .format(job.id, self.name, success_state) - ) + set_row(state=success_state, video_id=video_id, video_link=video_link, error=None) self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link)) videos_uploaded.labels(video_channel=job.video_channel, diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py index 7d8e38d..3d29bf5 100644 --- a/cutter/cutter/upload_backends.py +++ b/cutter/cutter/upload_backends.py @@ -9,6 +9,40 @@ import uuid from common.googleapis import GoogleAPIClient +class UploadError(Exception): + """Upload backends should raise this error when uploading + and an expected failure occurs. + In particular, they should NOT raise this error if they're + unsure whether the video was uploaded or not. + They should also indicate if the error is retryable without + manual intervention. + Examples of retryable errors: + Authorization errors (likely a bad config - let another node get it) + Short-term rate limits (try again in a few seconds) + Examples of unretryable errors: + Bad Request (indicates logic bug, or that the video is unacceptable in some way) + Long-term rate limits (trying again quickly is counterproductive, wait for operator) + Examples of errors which should not be caught, allowing the FINALIZING logic + to determine if it's safe to retry: + 500s (We don't know if the request went through) + Network errors (same as above) + Unexpected exceptions (they might have occurred after the upload is finished) + + Raisers should log the underlying exception before raising, as this error + will not be further logged. + """ + def __init__(self, error, retryable=False): + """Error should be a string error message to put into the database.""" + self.error = error + self.retryable = retryable + + def __str__(self): + return "{} error while uploading: {}".format( + "Retryable" if self.retryable else "Non-retryable", + self.error, + ) + + class UploadBackend(object): """Represents a place a video can be uploaded, and maintains any state needed to perform uploads. From 596cd9264442212952a6140f0ced8ef4440cf1a8 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 23 Oct 2019 22:23:30 +1100 Subject: [PATCH 2/2] cutter: Add more specific error handling to upload backends For youtube, know that 4xx's are safe even if finalizing was set. For local, make all disk errors retryable since it doesn't matter. --- cutter/cutter/upload_backends.py | 36 +++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py index 3d29bf5..d9ab1b0 100644 --- a/cutter/cutter/upload_backends.py +++ b/cutter/cutter/upload_backends.py @@ -17,8 +17,8 @@ class UploadError(Exception): They should also indicate if the error is retryable without manual intervention. Examples of retryable errors: - Authorization errors (likely a bad config - let another node get it) Short-term rate limits (try again in a few seconds) + Upload backends which are fully idempotent Examples of unretryable errors: Bad Request (indicates logic bug, or that the video is unacceptable in some way) Long-term rate limits (trying again quickly is counterproductive, wait for operator) @@ -144,9 +144,16 @@ class Youtube(UploadBackend): }, json=json, ) - resp.raise_for_status() + if not resp.ok: + # Don't retry, because failed calls still count against our upload quota. + # The risk of repeated failed attempts blowing through our quota is too high. + raise UploadError("Youtube create video call failed with {resp.status_code}: {resp.content}".format(resp=resp)) upload_url = resp.headers['Location'] resp = self.client.request('POST', upload_url, data=data) + if 400 <= resp.status_code < 500: + # As above, don't retry. But with 4xx's we know the upload didn't go through. + # On a 5xx, we can't be sure (the server is in an unspecified state). + raise UploadError("Youtube video data upload failed with {status_code}: {resp.content}".format(resp=resp)) resp.raise_for_status() id = resp.json()['id'] return id, 'https://youtu.be/{}'.format(id) @@ -210,16 +217,21 @@ class Local(UploadBackend): ext = 'ts' if self.encoding_settings is None else 'mp4' filename = '{}-{}.{}'.format(safe_title, video_id, ext) filepath = os.path.join(self.path, filename) - if self.write_info: - with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f: - f.write(json.dumps({ - 'title': title, - 'description': description, - 'tags': tags, - }) + '\n') - with open(filepath, 'w') as f: - for chunk in data: - f.write(chunk) + try: + if self.write_info: + with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f: + f.write(json.dumps({ + 'title': title, + 'description': description, + 'tags': tags, + }) + '\n') + with open(filepath, 'w') as f: + for chunk in data: + f.write(chunk) + except (OSError, IOError) as e: + # Because duplicate videos don't actually matter with this backend, + # we consider all disk errors retryable. + raise UploadError("{} while writing local file: {}".format(type(e).__name__, e), retryable=True) if self.url_prefix is not None: url = self.url_prefix + filename else: