diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index dc8fa76..733b391 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -18,7 +18,7 @@ import common from common.database import DBManager, query from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles -from .upload_backends import Youtube, Local +from .upload_backends import Youtube, Local, UploadError videos_uploaded = prom.Counter( @@ -281,11 +281,6 @@ class Cutter(object): # a closure (as py2 lacks the nonlocal keyword). finalize_begun = [False] - # This dummy exception is used to pass control flow back out of upload_wrapper - # if we've already handled the error and do not need to do anything further. - class ErrorHandled(Exception): - pass - # This exception indicates a job we thought was ours somehow disappeared # while we were still trying to cut it. This most likely represents a logic error # or that our instance is in a bad state, and will be raised up to run() to terminate @@ -294,11 +289,8 @@ class Cutter(object): pass def set_row(**kwargs): - """Set columns on the row being cut. Returns True on success, - False if row could not be found. - Example: - if not set_row(state='UNEDITED', error=e): - + """Set columns on the row being cut. Raises JobConsistencyError on failure. + Example: set_row(state='UNEDITED', error=e) """ # construct an UPDATE query like "SET key1=%(key1)s, key2=%(key2)s, ..." 
built_query = sql.SQL(""" @@ -311,7 +303,10 @@ class Cutter(object): ) for key in kwargs )) result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs) - return result.rowcount == 1 + if result.rowcount != 1: + raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format( + job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items()) + )) def upload_wrapper(): # This generator wraps the cut_segments generator so we can @@ -323,61 +318,54 @@ class Cutter(object): yield chunk except Exception as ex: self.logger.exception("Error occurred while trying to cut job {}".format(format_job(job))) - # Assumed error is not retryable, set state back to UNEDITED and set error. - if not set_row(state='UNEDITED', error="Error while cutting: {}".format(ex), uploader=None): - self.logger.warning("Tried to roll back row {} to unedited but it was already cancelled.".format(job.id)) - upload_errors.labels(video_channel=job.video_channel, - video_quality=job.video_quality, - upload_location=job.upload_location, - final_state='UNEDITED').inc() - # Abort the cut without further error handling - raise ErrorHandled + # Assumed error is not retryable + raise UploadError("Error while cutting: {}".format(ex), retryable=False) # The data is now fully uploaded, but the request is not finalized. # We now set the DB state to finalized so we know about failures during this # critical section. self.logger.debug("Setting job to finalizing") - if not set_row(state='FINALIZING'): - # Abort the cut and crash the program, forcing a state resync - raise JobConsistencyError( - "No job with id {} and uploader {} when setting FINALIZING" - .format(job.id, self.name) - ) + set_row(state='FINALIZING') finalize_begun[0] = True - # Now we return from this generator, and any errors between now and returning - # from requests.post() are not recoverable. 
+ # Now we return from this generator, and any unknown errors between now and returning + # from the upload backend are not recoverable. try: - video_id, video_link = upload_backend.upload_video( - title=( - "{} - {}".format(self.title_header, job.video_title) - if self.title_header else job.video_title - ), - description=( - "{}\n\n{}".format(job.video_description, self.description_footer) - if self.description_footer else job.video_description - ), - # Add category and sheet_name as tags - tags=self.tags + [job.category, job.sheet_name], - data=upload_wrapper(), - ) - except JobConsistencyError: - raise # this ensures it's not caught in the next except block - except ErrorHandled: - # we're aborting the cut, error handling has already happened - return - except Exception as ex: - self.refresh_conn() + # UploadErrors in the except block below should be caught + # the same as UploadErrors in the main try block, so we wrap + # a second try around the whole thing. + try: + video_id, video_link = upload_backend.upload_video( + title=( + "{} - {}".format(self.title_header, job.video_title) + if self.title_header else job.video_title + ), + description=( + "{}\n\n{}".format(job.video_description, self.description_footer) + if self.description_footer else job.video_description + ), + # Add category and sheet_name as tags + tags=self.tags + [job.category, job.sheet_name], + data=upload_wrapper(), + ) + except (JobConsistencyError, UploadError): + raise # this ensures these aren't caught in the except Exception block + except Exception as ex: + self.refresh_conn() + + # for HTTPErrors, getting http response body is also useful + if isinstance(ex, requests.HTTPError): + ex = "{}: {}".format(ex, ex.response.content) - # for HTTPErrors, getting http response body is also useful - if isinstance(ex, requests.HTTPError): - ex = "{}: {}".format(ex, ex.response.content) + if not finalize_begun[0]: + # error before finalizing, assume it's a network issue / retryable. 
+ self.logger.exception("Retryable error when uploading job {}".format(format_job(job))) + raise UploadError("Unhandled error in upload: {}".format(ex), retryable=True) - # if error during finalizing, set it in the database and leave it - # stuck in FINALIZING state for operator intervention. - if finalize_begun[0]: + # unknown error during finalizing, set it in the database and leave it + # stuck in FINALIZING state for operator intervention. self.logger.critical(( "Error occurred while finalizing upload of job {}. " "You will need to check the state of the video manually." @@ -388,43 +376,42 @@ class Cutter(object): "to EDITED and clear uploader. " "Error: {}" ).format(ex) - upload_errors.labels(video_channel=job.video_channel, - video_quality=job.video_quality, - upload_location=job.upload_location, - final_state='FINALIZING').inc() - if not set_row(error=error): - # Not only do we not know if it was uploaded, we also failed to set that in the database! - raise JobConsistencyError( - "No job with id {} and uploader {} when setting error while finalizing!" - .format(job.id, self.name) - ) - return - - # error before finalizing, assume it's a network issue / retryable. 
- # set back to EDITED but still set error - self.logger.exception("Retryable error when uploading job {}".format(format_job(job))) - upload_errors.labels(video_channel=job.video_channel, + upload_errors.labels( + video_channel=job.video_channel, video_quality=job.video_quality, upload_location=job.upload_location, - final_state='EDITED').inc() - if not set_row(state='EDITED', error="Retryable error while uploading: {}".format(ex), uploader=None): - raise JobConsistencyError( - "No job with id {} and uploader {} when setting error while rolling back for retryable error" - .format(job.id, self.name) - ) - # pause briefly so we don't immediately grab the same one again in a rapid retry loop - gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL) + final_state='FINALIZING', + ).inc() + set_row(error=error) + return + + except UploadError as ex: + # At this stage, we assume whatever raised UploadError has already + # logged about it. We're just setting the database as appropriate. + # If it's retryable, we clear uploader and set back to EDITED. + # If it isn't, we don't clear uploader (so we know where it failed) + # and we set it back to UNEDITED, waiting for an editor to manually retry. + if ex.retryable: + state = 'EDITED' + kwargs = {'uploader': None} + else: + state = 'UNEDITED' + kwargs = {} + upload_errors.labels( + video_channel=job.video_channel, + video_quality=job.video_quality, + upload_location=job.upload_location, + final_state=state, + ).inc() + set_row(state=state, error=str(ex), **kwargs) + if ex.retryable: + # pause briefly so we don't immediately grab the same one again in a rapid retry loop + gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL) return # Success! Set TRANSCODING or DONE and clear any previous error. 
success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE' - if not set_row(state=success_state, video_id=video_id, video_link=video_link, error=None): - # This will result in it being stuck in FINALIZING, and an operator will need to go - # confirm it was really uploaded. - raise JobConsistencyError( - "No job with id {} and uploader {} when setting to {}" - .format(job.id, self.name, success_state) - ) + set_row(state=success_state, video_id=video_id, video_link=video_link, error=None) self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link)) videos_uploaded.labels(video_channel=job.video_channel, diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py index 7d8e38d..d9ab1b0 100644 --- a/cutter/cutter/upload_backends.py +++ b/cutter/cutter/upload_backends.py @@ -9,6 +9,40 @@ import uuid from common.googleapis import GoogleAPIClient +class UploadError(Exception): + """Upload backends should raise this error when uploading + and an expected failure occurs. + In particular, they should NOT raise this error if they're + unsure whether the video was uploaded or not. + They should also indicate if the error is retryable without + manual intervention. + Examples of retryable errors: + Short-term rate limits (try again in a few seconds) + Upload backends which are fully idempotent + Examples of unretryable errors: + Bad Request (indicates logic bug, or that the video is unacceptable in some way) + Long-term rate limits (trying again quickly is counterproductive, wait for operator) + Examples of errors which should not be caught, allowing the FINALIZING logic + to determine if it's safe to retry: + 500s (We don't know if the request went through) + Network errors (same as above) + Unexpected exceptions (they might have occurred after the upload is finished) + + Raisers should log the underlying exception before raising, as this error + will not be further logged. 
+ """ + def __init__(self, error, retryable=False): + """Error should be a string error message to put into the database.""" + self.error = error + self.retryable = retryable + + def __str__(self): + return "{} error while uploading: {}".format( + "Retryable" if self.retryable else "Non-retryable", + self.error, + ) + + class UploadBackend(object): """Represents a place a video can be uploaded, and maintains any state needed to perform uploads. @@ -110,9 +144,16 @@ class Youtube(UploadBackend): }, json=json, ) - resp.raise_for_status() + if not resp.ok: + # Don't retry, because failed calls still count against our upload quota. + # The risk of repeated failed attempts blowing through our quota is too high. + raise UploadError("Youtube create video call failed with {resp.status_code}: {resp.content}".format(resp=resp)) upload_url = resp.headers['Location'] resp = self.client.request('POST', upload_url, data=data) + if 400 <= resp.status_code < 500: + # As above, don't retry. But with 4xx's we know the upload didn't go through. + # On a 5xx, we can't be sure (the server is in an unspecified state). 
+ raise UploadError("Youtube video data upload failed with {resp.status_code}: {resp.content}".format(resp=resp)) resp.raise_for_status() id = resp.json()['id'] return id, 'https://youtu.be/{}'.format(id) @@ -176,16 +217,21 @@ class Local(UploadBackend): ext = 'ts' if self.encoding_settings is None else 'mp4' filename = '{}-{}.{}'.format(safe_title, video_id, ext) filepath = os.path.join(self.path, filename) - if self.write_info: - with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f: - f.write(json.dumps({ - 'title': title, - 'description': description, - 'tags': tags, - }) + '\n') - with open(filepath, 'w') as f: - for chunk in data: - f.write(chunk) + try: + if self.write_info: + with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f: + f.write(json.dumps({ + 'title': title, + 'description': description, + 'tags': tags, + }) + '\n') + with open(filepath, 'w') as f: + for chunk in data: + f.write(chunk) + except (OSError, IOError) as e: + # Because duplicate videos don't actually matter with this backend, + # we consider all disk errors retryable. + raise UploadError("{} while writing local file: {}".format(type(e).__name__, e), retryable=True) if self.url_prefix is not None: url = self.url_prefix + filename else: