Implement thumbnails in cutter

pull/302/head
Mike Lang 2 years ago committed by Mike Lang
parent ce1f50db06
commit d3e21ae9b0

@ -1,5 +1,6 @@
import datetime
import hashlib
import json
import logging
import os
@ -16,7 +17,8 @@ from psycopg2 import sql
import common
from common.database import DBManager, query, get_column_placeholder
from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles
from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, extract_frame, ContainsHoles
from common.images import compose_thumbnail_template
from common.stats import timed
from .upload_backends import Youtube, Local, UploadError
@ -66,11 +68,17 @@ CUT_JOB_PARAMS = [
"video_tags",
"video_channel",
"video_quality",
"thumbnail_mode",
"thumbnail_time",
"thumbnail_template",
"thumbnail_image",
]
CutJob = namedtuple('CutJob', [
"id",
# for each range, the list of segments as returned by get_best_segments()
"segment_ranges",
# if any, the segments we need to create a thumbnail from
"thumbnail_segments",
# params which map directly from DB columns
] + CUT_JOB_PARAMS)
@ -198,7 +206,7 @@ class Cutter(object):
self.logger.info("Set error for candidate {}".format(format_job(candidate)))
try:
segment_ranges = self.check_candidate(candidate)
segment_ranges, thumbnail_segments = self.check_candidate(candidate)
except ContainsHoles:
self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate)))
set_error(
@ -221,7 +229,7 @@ class Cutter(object):
self.wait(self.ERROR_RETRY_INTERVAL)
continue
return CutJob(segment_ranges=segment_ranges, **candidate._asdict())
return CutJob(segment_ranges=segment_ranges, thumbnail_segments=thumbnail_segments, **candidate._asdict())
# No candidates
no_candidates.inc()
@ -264,7 +272,13 @@ class Cutter(object):
if segments == [None]:
raise ContainsHoles
segment_ranges.append(segments)
return segment_ranges
# Also check the thumbnail time if we need to generate it
thumbnail_segments = None
if candidate.thumbnail_mode in ('BARE', 'TEMPLATE') and candidate.thumbnail_image is None:
thumbnail_segments = get_best_segments(hours_path, candidate.thumbnail_time, candidate.thumbnail_time)
if thumbnail_segments == [None]:
raise ContainsHoles
return segment_ranges, thumbnail_segments
@timed(
video_channel = lambda ret, self, job: job.video_channel,
@ -308,12 +322,19 @@ class Cutter(object):
TRANSCODING or DONE.
Handles various error conditions:
* Errors generating thumbnail: Assumed to be non-retryable until thumbnail parameters
are changed by operator. Sets error and rolls back to UNEDITED.
* Errors while cutting: Assumed to be non-retryable until cut parameters are changed
by operator. Sets error and rolls back to UNEDITED.
* Request error before request body closed: Assumed to be a transient network failure,
immediately retryable. Sets error and rolls back to EDITED.
* Request error after request body closed: It's unknown whether the request went through.
Sets error and remains in FINALIZING. Operator intervention is required.
* Request error when setting thumbnail: Assumed to be a transient network failure,
but the video was already uploaded. So we can't set back to EDITED. Instead, we set error
error and set state to MODIFIED, indicating the video's state and the database are out
of sync. Since MODIFIED rows with an error are not processed, an operator will need to
dismiss the error before continuing.
* Row has changed (no longer claimed by us) before request body closed:
Assumed an operator has made changes and changed state back. Abort cutting without error.
* Row has changed (no longer claimed by us) after request body closed:
@ -326,9 +347,12 @@ class Cutter(object):
upload_backend = self.upload_locations[job.upload_location]
self.logger.info("Cutting and uploading job {} to {}".format(format_job(job), upload_backend))
# This flag tracks whether we've told requests to finalize the upload,
# This flag tracks the state of the upload request:
# * 'not finished'
# * 'finishing'
# * 'finished'
# and serves to detect whether errors from the request call are recoverable.
finalize_begun = False
upload_finished = 'not finished'
# This exception indicates a job we thought was ours somehow disappeared
# while we were still trying to cut it. This most likely represents a logic error
@ -353,9 +377,9 @@ class Cutter(object):
))
result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs)
if result.rowcount != 1:
# If we hadn't yet set finalizing, then this means an operator cancelled the job
# If we hadn't yet finished the upload, then this means an operator cancelled the job
# while we were cutting it. This isn't a problem.
if not finalize_begun:
if upload_finished == 'not finished':
raise JobCancelled()
raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format(
job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items())
@ -366,10 +390,10 @@ class Cutter(object):
# do things in between the data being finished and finalizing the request.
# This is also where we do the main error handling.
# Tell python to use the finalize_begun variable from the enclosing scope,
# Tell python to use the upload_finished variable from the enclosing scope,
# instead of creating a new (shadowing) variable which is the default when
# you do "variable = value".
nonlocal finalize_begun
nonlocal upload_finished
try:
if upload_backend.encoding_settings is None:
@ -403,12 +427,44 @@ class Cutter(object):
self.logger.debug("Setting job to finalizing")
set_row(state='FINALIZING')
finalize_begun = True
upload_finished = 'finishing'
# Now we return from this generator, and any unknown errors between now and returning
# from the upload backend are not recoverable.
def generate_thumbnail():
# no need to generate if it already exists, or no thumbnail is desired
if job.thumbnail_mode == 'NONE':
return None
if job.thumbnail_image is not None:
return job.thumbnail_image
frame = extract_frame(job.thumbnail_segments, job.thumbnail_time)
# collect chunks into one bytestring as we need to use it multiple times
frame = b''.join(frame)
if job.thumbnail_mode == 'BARE':
image_data = frame
elif job.thumbnail_mode == 'TEMPLATE':
image_data = compose_thumbnail_template(self.segments_path, job.thumbnail_template, frame)
else:
# shouldn't be able to happen given database constraints
assert False, "Bad thumbnail mode: {}".format(job.thumbnail_mode)
# Save what we've generated to the database now, easier than doing it later
# and might save some effort if we need to retry.
set_row(thumbnail_image=image_data)
return image_data
try:
# Get thumbnail image, generating it if needed
try:
thumbnail = generate_thumbnail()
except Exception as ex:
self.logger.exception("Error occurred while trying to generate thumbnail for job {}".format(format_job(job)))
# Assumed error is not retryable
raise UploadError("Error while generating thumbnail: {}".format(ex), retryable=False)
# UploadErrors in the except block below should be caught
# the same as UploadErrors in the main try block, so we wrap
# a second try around the whole thing.
@ -421,6 +477,9 @@ class Cutter(object):
public=job.public,
data=upload_wrapper(),
)
upload_finished = 'finished'
if thumbnail is not None:
upload_backend.set_thumbnail(video_id, thumbnail)
except (JobConsistencyError, JobCancelled, UploadError):
raise # this ensures these aren't not caught in the except Exception block
except Exception as ex:
@ -430,31 +489,53 @@ class Cutter(object):
if isinstance(ex, requests.HTTPError):
ex = "{}: {}".format(ex, ex.response.content)
if not finalize_begun:
if upload_finished == 'not finished':
# error before finalizing, assume it's a network issue / retryable.
self.logger.exception("Retryable error when uploading job {}".format(format_job(job)))
raise UploadError("Unhandled error in upload: {}".format(ex), retryable=True)
# unknown error during finalizing, set it in the database and leave it
# stuck in FINALIZING state for operator intervention.
self.logger.critical((
"Error occurred while finalizing upload of job {}. "
"You will need to check the state of the video manually."
).format(format_job(job)), exc_info=True)
error = (
"An error occurred during FINALIZING, please determine if video was actually "
"uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback "
"to EDITED and clear uploader. "
"Error: {}"
).format(ex)
upload_errors.labels(
video_channel=job.video_channel,
video_quality=job.video_quality,
upload_location=job.upload_location,
final_state='FINALIZING',
).inc()
set_row(error=error)
return
elif upload_finished == 'finished':
# error after finalizing, ie. during thumbnail upload.
# put the video in MODIFIED to indicate it's out of sync, but set error
# so an operator will check what happened before correcting it.
self.logger.exception("Error setting thumbnail in job {}".format(format_job(job)))
upload_errors.labels(
video_channel=job.video_channel,
video_quality=job.video_quality,
upload_location=job.upload_location,
final_state='MODIFIED',
).inc()
set_row(
state='MODIFIED',
last_modified=datetime.datetime.utcnow(),
error="Error setting thumbnail: {}".format(ex),
)
return
elif upload_finished == 'finishing':
# unknown error during finalizing, set it in the database and leave it
# stuck in FINALIZING state for operator intervention.
self.logger.critical((
"Error occurred while finalizing upload of job {}. "
"You will need to check the state of the video manually."
).format(format_job(job)), exc_info=True)
error = (
"An error occurred during FINALIZING, please determine if video was actually "
"uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback "
"to EDITED and clear uploader. "
"Error: {}"
).format(ex)
upload_errors.labels(
video_channel=job.video_channel,
video_quality=job.video_quality,
upload_location=job.upload_location,
final_state='FINALIZING',
).inc()
set_row(error=error)
return
else:
assert False, "Bad upload_finished value: {!r}".format(upload_finished)
except UploadError as ex:
# At this stage, we assume whatever raised UploadError has already
@ -482,9 +563,14 @@ class Cutter(object):
return
# Success! Set TRANSCODING or DONE and clear any previous error.
# Also set thumbnail_last_written if we wrote a thumbnail.
success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE'
maybe_upload_time = {"upload_time": datetime.datetime.utcnow()} if success_state == 'DONE' else {}
set_row(state=success_state, video_id=video_id, video_link=video_link, error=None, **maybe_upload_time)
kwargs = {}
if success_state == 'DONE':
kwargs["upload_time"] = datetime.datetime.utcnow()
if thumbnail is not None:
kwargs["thumbnail_last_written"] = hashlib.sha256(thumbnail).digest()
set_row(state=success_state, video_id=video_id, video_link=video_link, error=None, **kwargs)
self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link))
videos_uploaded.labels(video_channel=job.video_channel,
@ -592,16 +678,35 @@ class TranscodeChecker(object):
return result.rowcount
UPDATE_JOB_PARAMS = [
"video_id",
"video_channel",
"video_quality",
"video_title",
"video_description",
"video_tags",
"public",
"thumbnail_mode",
"thumbnail_time",
"thumbnail_template",
"thumbnail_image",
"thumbnail_last_written",
]
UpdateJob = namedtuple('UpdateJob', [
"id",
] + UPDATE_JOB_PARAMS)
class VideoUpdater(object):
CHECK_INTERVAL = 10 # this is slow to reduce the chance of multiple cutters updating the same row
ERROR_RETRY_INTERVAL = 20
def __init__(self, location, backend, dbmanager, stop):
def __init__(self, location, segments_path, backend, dbmanager, stop):
"""
backend is an upload backend that supports video updates.
Stop is an Event triggering graceful shutdown when set.
"""
self.location = location
self.segments_path = segments_path
self.backend = backend
self.dbmanager = dbmanager
self.stop = stop
@ -615,9 +720,9 @@ class VideoUpdater(object):
self.conn = self.dbmanager.get_conn()
while not self.stop.is_set():
try:
videos = list(self.get_videos())
videos = self.get_videos()
self.logger.info("Found {} videos in MODIFIED".format(len(videos)))
for id, video_id, title, description, tags, public in videos:
for job in videos:
# NOTE: Since we aren't claiming videos, it's technically possible for this
# to happen:
# 1. we get MODIFIED video with title A
@ -626,18 +731,45 @@ class VideoUpdater(object):
# 4. we update it to A in backend
# 5. it appears to be successfully updated with B, but the title is actually A.
# This is unlikely and not a disaster, so we'll just live with it.
updates = {}
try:
self.backend.update_video(video_id, title, description, tags, public)
# Update video metadata
self.backend.update_video(job.video_id, job.video_title, job.video_description, job.video_tags, job.public)
# Update thumbnail if needed. This might fail if we don't have the right segments,
# but that should be very rare and can be dealt with out of band.
if job.thumbnail_mode != 'NONE':
thumbnail_image = job.thumbnail_image
if thumbnail_image is None:
self.logger.info("Regenerating thumbnail for {}".format(job.id))
hours_path = os.path.join(self.segments_path, job.video_channel, job.video_quality)
segments = get_best_segments(hours_path, job.thumbnail_time, job.thumbnail_time)
frame = extract_frame(segments, job.thumbnail_time)
frame = b''.join(frame)
if job.thumbnail_mode == 'BARE':
thumbnail_image = frame
elif job.thumbnail_mode == 'TEMPLATE':
thumbnail_image = compose_thumbnail_template(self.segments_path, job.thumbnail_template, frame)
else:
assert False, "Bad thumbnail mode: {}".format(job.thumbnail_mode)
updates['thumbnail_image'] = thumbnail_image
new_hash = hashlib.sha256(thumbnail_image).digest()
if job.thumbnail_last_written != new_hash:
self.logger.info("Setting thumbnail for {}".format(job.id))
self.backend.set_thumbnail(job.video_id, job.thumbnail_image)
updates['thumbnail_last_written'] = new_hash
except Exception as ex:
self.logger.exception("Failed to update video")
self.mark_errored(id, "Failed to update video: {}".format(ex))
self.mark_errored(job.id, "Failed to update video: {}".format(ex))
continue
marked = self.mark_done(id, video_id, title, description, tags, public)
marked = self.mark_done(job, updates)
if marked:
assert marked == 1
self.logger.info("Updated video {}".format(id))
self.logger.info("Updated video {}".format(job.id))
else:
self.logger.warning("Updated video {}, but row has changed since. Did someone else already update it?".format(id))
self.logger.warning("Updated video {}, but row has changed since. Did someone else already update it?".format(job.id))
self.wait(self.CHECK_INTERVAL)
except Exception:
self.logger.exception("Error in VideoUpdater")
@ -649,27 +781,35 @@ class VideoUpdater(object):
def get_videos(self):
# To avoid exhausting API quota, errors aren't retryable.
# We ignore any rows where error is not null.
return query(self.conn, """
SELECT id, video_id, video_title, video_description, video_tags, public
built_query = sql.SQL("""
SELECT id, {}
FROM events
WHERE state = 'MODIFIED' AND error IS NULL
""")
""").format(
sql.SQL(", ").join(sql.Identifier(key) for key in UPDATE_JOB_PARAMS)
)
return [UpdateJob(**row) for row in query(self.conn, built_query)]
def mark_done(self, id, video_id, title, description, tags, public):
def mark_done(self, job, updates):
"""We don't want to set to DONE if the video has been modified *again* since
we saw it."""
args = dict(id=id, video_id=video_id, video_title=title, video_description=description, video_tags=tags, public=public)
built_query = sql.SQL("""
UPDATE events
SET state = 'DONE'
SET state = 'DONE', {}
WHERE state = 'MODIFIED' AND {}
""").format(
sql.SQL(", ").join(
sql.SQL("{} = {}").format(
sql.Identifier(key), get_column_placeholder("new_{}".format(key)),
) for key in updates
),
sql.SQL(" AND ").join(
sql.SQL("{} = {}").format(sql.Identifier(key), get_column_placeholder(key))
for key in args
for key in UPDATE_JOB_PARAMS
)
)
return query(self.conn, built_query, **args).rowcount
updates = {"new_{}".format(key): value for key, value in updates.items()}
return query(self.conn, built_query, **job, **updates).rowcount
def mark_errored(self, id, error):
# We don't overwrite any existing error, it is most likely from another attempt to update
@ -788,7 +928,7 @@ def main(
for location, backend in needs_transcode_check.items()
]
updaters = [
VideoUpdater(location, backend, dbmanager, stop)
VideoUpdater(location, base_dir, backend, dbmanager, stop)
for location, backend in needs_updater.items()
]
jobs = [gevent.spawn(cutter.run)] + [

@ -69,6 +69,10 @@ class UploadBackend(object):
Fields which cannot be updated may be ignored.
Must not change the video id or link. Returns nothing.
If uploading thumbnails for a video is supported, the backend should define a method
set_thumbnail(video_id, thumbnail) where thumbnail is a bytestring containing image data.
Returns nothing.
The upload backend also determines the encoding settings for the cutting
process, this is given as a list of ffmpeg args
under the 'encoding_settings' attribute.
@ -95,6 +99,9 @@ class UploadBackend(object):
def update_video(self, video_id, title, description, tags, public):
raise NotImplementedError
def set_thumbnail(self, video_id, thumbnail):
raise NotImplementedError
class Youtube(UploadBackend):
"""Represents a youtube channel to upload to, and settings for doing so.
@ -249,6 +256,14 @@ class Youtube(UploadBackend):
)
resp.raise_for_status()
def set_thumbnail(self, video_id, thumbnail):
resp = self.client.request('POST',
'https://www.googleapis.com/youtube/v3/thumbnails/set',
params={'videoId': video_id},
body=thumbnail,
)
resp.raise_for_status()
class Local(UploadBackend):
"""An "upload" backend that just saves the file to local disk.

Loading…
Cancel
Save