From d3e21ae9b00f991784f3e9c1bf82f34ba49e31a1 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 12 Sep 2022 17:18:22 +1000 Subject: [PATCH] Implement thumbnails in cutter --- cutter/cutter/main.py | 242 ++++++++++++++++++++++++------- cutter/cutter/upload_backends.py | 15 ++ 2 files changed, 206 insertions(+), 51 deletions(-) diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 3968de0..162840b 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -1,5 +1,6 @@ import datetime +import hashlib import json import logging import os @@ -16,7 +17,8 @@ from psycopg2 import sql import common from common.database import DBManager, query, get_column_placeholder -from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles +from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, extract_frame, ContainsHoles +from common.images import compose_thumbnail_template from common.stats import timed from .upload_backends import Youtube, Local, UploadError @@ -66,11 +68,17 @@ CUT_JOB_PARAMS = [ "video_tags", "video_channel", "video_quality", + "thumbnail_mode", + "thumbnail_time", + "thumbnail_template", + "thumbnail_image", ] CutJob = namedtuple('CutJob', [ "id", # for each range, the list of segments as returned by get_best_segments() "segment_ranges", + # if any, the segments we need to create a thumbnail from + "thumbnail_segments", # params which map directly from DB columns ] + CUT_JOB_PARAMS) @@ -198,7 +206,7 @@ class Cutter(object): self.logger.info("Set error for candidate {}".format(format_job(candidate))) try: - segment_ranges = self.check_candidate(candidate) + segment_ranges, thumbnail_segments = self.check_candidate(candidate) except ContainsHoles: self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate))) set_error( @@ -221,7 +229,7 @@ class Cutter(object): self.wait(self.ERROR_RETRY_INTERVAL) continue - return CutJob(segment_ranges=segment_ranges, **candidate._asdict()) + return CutJob(segment_ranges=segment_ranges, thumbnail_segments=thumbnail_segments, **candidate._asdict()) # No candidates no_candidates.inc() @@ -264,7 +272,13 @@ class Cutter(object): if segments == [None]: raise ContainsHoles segment_ranges.append(segments) - return segment_ranges + # Also check the thumbnail time if we need to generate it + thumbnail_segments = None + if candidate.thumbnail_mode in ('BARE', 'TEMPLATE') and candidate.thumbnail_image is None: + thumbnail_segments = get_best_segments(hours_path, candidate.thumbnail_time, candidate.thumbnail_time) + if thumbnail_segments == [None]: + raise ContainsHoles + return segment_ranges, thumbnail_segments @timed( video_channel = lambda ret, self, job: job.video_channel, @@ -308,12 +322,19 @@ class Cutter(object): TRANSCODING or DONE. Handles various error conditions: + * Errors generating thumbnail: Assumed to be non-retryable until thumbnail parameters + are changed by operator. Sets error and rolls back to UNEDITED. * Errors while cutting: Assumed to be non-retryable until cut parameters are changed by operator. Sets error and rolls back to UNEDITED. * Request error before request body closed: Assumed to be a transient network failure, immediately retryable. Sets error and rolls back to EDITED. * Request error after request body closed: It's unknown whether the request went through. Sets error and remains in FINALIZING. Operator intervention is required. + * Request error when setting thumbnail: Assumed to be a transient network failure, + but the video was already uploaded. So we can't set back to EDITED. Instead, we set error + error and set state to MODIFIED, indicating the video's state and the database are out + of sync. Since MODIFIED rows with an error are not processed, an operator will need to + dismiss the error before continuing. * Row has changed (no longer claimed by us) before request body closed: Assumed an operator has made changes and changed state back. Abort cutting without error. * Row has changed (no longer claimed by us) after request body closed: @@ -326,9 +347,12 @@ class Cutter(object): upload_backend = self.upload_locations[job.upload_location] self.logger.info("Cutting and uploading job {} to {}".format(format_job(job), upload_backend)) - # This flag tracks whether we've told requests to finalize the upload, + # This flag tracks the state of the upload request: + # * 'not finished' + # * 'finishing' + # * 'finished' # and serves to detect whether errors from the request call are recoverable. - finalize_begun = False + upload_finished = 'not finished' # This exception indicates a job we thought was ours somehow disappeared # while we were still trying to cut it. This most likely represents a logic error @@ -353,9 +377,9 @@ class Cutter(object): )) result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs) if result.rowcount != 1: - # If we hadn't yet set finalizing, then this means an operator cancelled the job + # If we hadn't yet finished the upload, then this means an operator cancelled the job # while we were cutting it. This isn't a problem. - if not finalize_begun: + if upload_finished == 'not finished': raise JobCancelled() raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format( job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items()) @@ -366,10 +390,10 @@ class Cutter(object): # do things in between the data being finished and finalizing the request. # This is also where we do the main error handling. - # Tell python to use the finalize_begun variable from the enclosing scope, + # Tell python to use the upload_finished variable from the enclosing scope, # instead of creating a new (shadowing) variable which is the default when # you do "variable = value". - nonlocal finalize_begun + nonlocal upload_finished try: if upload_backend.encoding_settings is None: @@ -403,12 +427,44 @@ class Cutter(object): self.logger.debug("Setting job to finalizing") set_row(state='FINALIZING') - finalize_begun = True + upload_finished = 'finishing' # Now we return from this generator, and any unknown errors between now and returning # from the upload backend are not recoverable. + def generate_thumbnail(): + # no need to generate if it already exists, or no thumbnail is desired + if job.thumbnail_mode == 'NONE': + return None + if job.thumbnail_image is not None: + return job.thumbnail_image + + frame = extract_frame(job.thumbnail_segments, job.thumbnail_time) + # collect chunks into one bytestring as we need to use it multiple times + frame = b''.join(frame) + + if job.thumbnail_mode == 'BARE': + image_data = frame + elif job.thumbnail_mode == 'TEMPLATE': + image_data = compose_thumbnail_template(self.segments_path, job.thumbnail_template, frame) + else: + # shouldn't be able to happen given database constraints + assert False, "Bad thumbnail mode: {}".format(job.thumbnail_mode) + + # Save what we've generated to the database now, easier than doing it later + # and might save some effort if we need to retry. + set_row(thumbnail_image=image_data) + return image_data + try: + # Get thumbnail image, generating it if needed + try: + thumbnail = generate_thumbnail() + except Exception as ex: + self.logger.exception("Error occurred while trying to generate thumbnail for job {}".format(format_job(job))) + # Assumed error is not retryable + raise UploadError("Error while generating thumbnail: {}".format(ex), retryable=False) + # UploadErrors in the except block below should be caught # the same as UploadErrors in the main try block, so we wrap # a second try around the whole thing. @@ -421,6 +477,9 @@ class Cutter(object): public=job.public, data=upload_wrapper(), ) + upload_finished = 'finished' + if thumbnail is not None: + upload_backend.set_thumbnail(video_id, thumbnail) except (JobConsistencyError, JobCancelled, UploadError): raise # this ensures these aren't not caught in the except Exception block except Exception as ex: @@ -430,31 +489,53 @@ class Cutter(object): if isinstance(ex, requests.HTTPError): ex = "{}: {}".format(ex, ex.response.content) - if not finalize_begun: + if upload_finished == 'not finished': # error before finalizing, assume it's a network issue / retryable. self.logger.exception("Retryable error when uploading job {}".format(format_job(job))) raise UploadError("Unhandled error in upload: {}".format(ex), retryable=True) - # unknown error during finalizing, set it in the database and leave it - # stuck in FINALIZING state for operator intervention. - self.logger.critical(( - "Error occurred while finalizing upload of job {}. " - "You will need to check the state of the video manually." - ).format(format_job(job)), exc_info=True) - error = ( - "An error occurred during FINALIZING, please determine if video was actually " - "uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback " - "to EDITED and clear uploader. " - "Error: {}" - ).format(ex) - upload_errors.labels( - video_channel=job.video_channel, - video_quality=job.video_quality, - upload_location=job.upload_location, - final_state='FINALIZING', - ).inc() - set_row(error=error) - return + elif upload_finished == 'finished': + # error after finalizing, ie. during thumbnail upload. + # put the video in MODIFIED to indicate it's out of sync, but set error + # so an operator will check what happened before correcting it. + self.logger.exception("Error setting thumbnail in job {}".format(format_job(job))) + upload_errors.labels( + video_channel=job.video_channel, + video_quality=job.video_quality, + upload_location=job.upload_location, + final_state='MODIFIED', + ).inc() + set_row( + state='MODIFIED', + last_modified=datetime.datetime.utcnow(), + error="Error setting thumbnail: {}".format(ex), + ) + return + + elif upload_finished == 'finishing': + # unknown error during finalizing, set it in the database and leave it + # stuck in FINALIZING state for operator intervention. + self.logger.critical(( + "Error occurred while finalizing upload of job {}. " + "You will need to check the state of the video manually." + ).format(format_job(job)), exc_info=True) + error = ( + "An error occurred during FINALIZING, please determine if video was actually " + "uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback " + "to EDITED and clear uploader. " + "Error: {}" + ).format(ex) + upload_errors.labels( + video_channel=job.video_channel, + video_quality=job.video_quality, + upload_location=job.upload_location, + final_state='FINALIZING', + ).inc() + set_row(error=error) + return + + else: + assert False, "Bad upload_finished value: {!r}".format(upload_finished) except UploadError as ex: # At this stage, we assume whatever raised UploadError has already @@ -482,9 +563,14 @@ class Cutter(object): return # Success! Set TRANSCODING or DONE and clear any previous error. + # Also set thumbnail_last_written if we wrote a thumbnail. success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE' - maybe_upload_time = {"upload_time": datetime.datetime.utcnow()} if success_state == 'DONE' else {} - set_row(state=success_state, video_id=video_id, video_link=video_link, error=None, **maybe_upload_time) + kwargs = {} + if success_state == 'DONE': + kwargs["upload_time"] = datetime.datetime.utcnow() + if thumbnail is not None: + kwargs["thumbnail_last_written"] = hashlib.sha256(thumbnail).digest() + set_row(state=success_state, video_id=video_id, video_link=video_link, error=None, **kwargs) self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link)) videos_uploaded.labels(video_channel=job.video_channel, @@ -592,16 +678,35 @@ class TranscodeChecker(object): return result.rowcount +UPDATE_JOB_PARAMS = [ + "video_id", + "video_channel", + "video_quality", + "video_title", + "video_description", + "video_tags", + "public", + "thumbnail_mode", + "thumbnail_time", + "thumbnail_template", + "thumbnail_image", + "thumbnail_last_written", +] +UpdateJob = namedtuple('UpdateJob', [ + "id", +] + UPDATE_JOB_PARAMS) + class VideoUpdater(object): CHECK_INTERVAL = 10 # this is slow to reduce the chance of multiple cutters updating the same row ERROR_RETRY_INTERVAL = 20 - def __init__(self, location, backend, dbmanager, stop): + def __init__(self, location, segments_path, backend, dbmanager, stop): """ backend is an upload backend that supports video updates. Stop is an Event triggering graceful shutdown when set. """ self.location = location + self.segments_path = segments_path self.backend = backend self.dbmanager = dbmanager self.stop = stop @@ -615,9 +720,9 @@ class VideoUpdater(object): self.conn = self.dbmanager.get_conn() while not self.stop.is_set(): try: - videos = list(self.get_videos()) + videos = self.get_videos() self.logger.info("Found {} videos in MODIFIED".format(len(videos))) - for id, video_id, title, description, tags, public in videos: + for job in videos: # NOTE: Since we aren't claiming videos, it's technically possible for this # to happen: # 1. we get MODIFIED video with title A @@ -626,18 +731,45 @@ class VideoUpdater(object): # 4. we update it to A in backend # 5. it appears to be successfully updated with B, but the title is actually A. # This is unlikely and not a disaster, so we'll just live with it. + + updates = {} try: - self.backend.update_video(video_id, title, description, tags, public) + # Update video metadata + self.backend.update_video(job.video_id, job.video_title, job.video_description, job.video_tags, job.public) + + # Update thumbnail if needed. This might fail if we don't have the right segments, + # but that should be very rare and can be dealt with out of band. + if job.thumbnail_mode != 'NONE': + thumbnail_image = job.thumbnail_image + if thumbnail_image is None: + self.logger.info("Regenerating thumbnail for {}".format(job.id)) + hours_path = os.path.join(self.segments_path, job.video_channel, job.video_quality) + segments = get_best_segments(hours_path, job.thumbnail_time, job.thumbnail_time) + frame = extract_frame(segments, job.thumbnail_time) + frame = b''.join(frame) + if job.thumbnail_mode == 'BARE': + thumbnail_image = frame + elif job.thumbnail_mode == 'TEMPLATE': + thumbnail_image = compose_thumbnail_template(self.segments_path, job.thumbnail_template, frame) + else: + assert False, "Bad thumbnail mode: {}".format(job.thumbnail_mode) + updates['thumbnail_image'] = thumbnail_image + new_hash = hashlib.sha256(thumbnail_image).digest() + if job.thumbnail_last_written != new_hash: + self.logger.info("Setting thumbnail for {}".format(job.id)) + self.backend.set_thumbnail(job.video_id, job.thumbnail_image) + updates['thumbnail_last_written'] = new_hash except Exception as ex: self.logger.exception("Failed to update video") - self.mark_errored(id, "Failed to update video: {}".format(ex)) + self.mark_errored(job.id, "Failed to update video: {}".format(ex)) continue - marked = self.mark_done(id, video_id, title, description, tags, public) + + marked = self.mark_done(job, updates) if marked: assert marked == 1 - self.logger.info("Updated video {}".format(id)) + self.logger.info("Updated video {}".format(job.id)) else: - self.logger.warning("Updated video {}, but row has changed since. Did someone else already update it?".format(id)) + self.logger.warning("Updated video {}, but row has changed since. Did someone else already update it?".format(job.id)) self.wait(self.CHECK_INTERVAL) except Exception: self.logger.exception("Error in VideoUpdater") @@ -649,27 +781,35 @@ class VideoUpdater(object): def get_videos(self): # To avoid exhausting API quota, errors aren't retryable. # We ignore any rows where error is not null. - return query(self.conn, """ - SELECT id, video_id, video_title, video_description, video_tags, public + built_query = sql.SQL(""" + SELECT id, {} FROM events WHERE state = 'MODIFIED' AND error IS NULL - """) + """).format( + sql.SQL(", ").join(sql.Identifier(key) for key in UPDATE_JOB_PARAMS) + ) + return [UpdateJob(**row) for row in query(self.conn, built_query)] - def mark_done(self, id, video_id, title, description, tags, public): + def mark_done(self, job, updates): """We don't want to set to DONE if the video has been modified *again* since we saw it.""" - args = dict(id=id, video_id=video_id, video_title=title, video_description=description, video_tags=tags, public=public) built_query = sql.SQL(""" UPDATE events - SET state = 'DONE' + SET state = 'DONE', {} WHERE state = 'MODIFIED' AND {} """).format( + sql.SQL(", ").join( + sql.SQL("{} = {}").format( + sql.Identifier(key), get_column_placeholder("new_{}".format(key)), + ) for key in updates + ), sql.SQL(" AND ").join( sql.SQL("{} = {}").format(sql.Identifier(key), get_column_placeholder(key)) - for key in args + for key in UPDATE_JOB_PARAMS ) ) - return query(self.conn, built_query, **args).rowcount + updates = {"new_{}".format(key): value for key, value in updates.items()} + return query(self.conn, built_query, **job, **updates).rowcount def mark_errored(self, id, error): # We don't overwrite any existing error, it is most likely from another attempt to update @@ -788,7 +928,7 @@ def main( for location, backend in needs_transcode_check.items() ] updaters = [ - VideoUpdater(location, backend, dbmanager, stop) + VideoUpdater(location, base_dir, backend, dbmanager, stop) for location, backend in needs_updater.items() ] jobs = [gevent.spawn(cutter.run)] + [ diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py index 60d9303..b4b2968 100644 --- a/cutter/cutter/upload_backends.py +++ b/cutter/cutter/upload_backends.py @@ -69,6 +69,10 @@ class UploadBackend(object): Fields which cannot be updated may be ignored. Must not change the video id or link. Returns nothing. + If uploading thumbnails for a video is supported, the backend should define a method + set_thumbnail(video_id, thumbnail) where thumbnail is a bytestring containing image data. + Returns nothing. + The upload backend also determines the encoding settings for the cutting process, this is given as a list of ffmpeg args under the 'encoding_settings' attribute. @@ -95,6 +99,9 @@ class UploadBackend(object): def update_video(self, video_id, title, description, tags, public): raise NotImplementedError + def set_thumbnail(self, video_id, thumbnail): + raise NotImplementedError + class Youtube(UploadBackend): """Represents a youtube channel to upload to, and settings for doing so. @@ -249,6 +256,14 @@ class Youtube(UploadBackend): ) resp.raise_for_status() + def set_thumbnail(self, video_id, thumbnail): + resp = self.client.request('POST', + 'https://www.googleapis.com/youtube/v3/thumbnails/set', + params={'videoId': video_id}, + body=thumbnail, + ) + resp.raise_for_status() + class Local(UploadBackend): """An "upload" backend that just saves the file to local disk.