Add new state to postgres and implement cutter logic

pull/295/head
Mike Lang 3 years ago committed by Mike Lang
parent 1add3c5c22
commit c4a1d72240

@ -48,7 +48,12 @@ is not yet able to be played (or only at reduced resolution).
* `DONE`: An event whose video is ready for public consumption. As with `TRANSCODING`, if changes need * `DONE`: An event whose video is ready for public consumption. As with `TRANSCODING`, if changes need
to be made, an operator should manually delete or unlist the video then set the state back to be made, an operator should manually delete or unlist the video then set the state back
to `UNEDITED`. to `UNEDITED`, or modify the video if possible (see `MODIFIED`).
* `MODIFIED`: An event that was previously successfully uploaded, which has had some of its edit inputs
modified. Cutters will see this state and attempt to edit the video to match the new edit inputs,
though the possible edits depend on the upload backend. This only includes edits to metadata fields
like title, and should not require re-cutting the video. Once updated, the cutter returns the video to `DONE`.
The following transitions are possible: The following transitions are possible:
@ -85,15 +90,21 @@ and the upload location requires no further processing.
* `TRANSCODING -> DONE`: When any cutter detects that the upload location is finished * `TRANSCODING -> DONE`: When any cutter detects that the upload location is finished
transcoding the video, and it is ready for public consumption. transcoding the video, and it is ready for public consumption.
* `DONE -> MODIFIED`: When an operator modifies the edit inputs of an already-uploaded video.
* `MODIFIED -> DONE`: When a cutter successfully updates a modified video, or when
an operator cancels the modification (leaving the video in an indeterminate state,
which the operator is responsible for verifying).
This is summarised in the below graph: This is summarised in the below graph:
``` ```
retry retry ┌──────────┐
┌───────────────────────────────────────────────┐ ┌───────────────────────────────────────────────┐ │ MODIFIED │
│ │ │ │ └──────────┘
cancel │ │ cancel │ │ ∧ │
┌──────────────────────┼───────────────────┐ │ ┌──────────────────────┼───────────────────┐ │ modify │ │ updated
│ │ │ │
┌──────────┐ edit ┌────────┐ claim ┌─────────┐ pre-finalize ┌────────────┐ post-finalize ┌─────────────┐ when ready ┌──────┐ ┌──────────┐ edit ┌────────┐ claim ┌─────────┐ pre-finalize ┌────────────┐ post-finalize ┌─────────────┐ when ready ┌──────┐
│ │ ────────> │ │ ───────> │ │ ──────────────> │ │ ───────────────> │ TRANSCODING │ ────────────> │ DONE │ │ │ ────────> │ │ ───────> │ │ ──────────────> │ │ ───────────────> │ TRANSCODING │ ────────────> │ DONE │
│ │ │ │ │ │ │ │ └─────────────┘ └──────┘ │ │ │ │ │ │ │ │ └─────────────┘ └──────┘

@ -518,7 +518,6 @@ class Cutter(object):
)) ))
class TranscodeChecker(object): class TranscodeChecker(object):
NO_VIDEOS_RETRY_INTERVAL = 5 # can be fast because it's just a DB lookup NO_VIDEOS_RETRY_INTERVAL = 5 # can be fast because it's just a DB lookup
FOUND_VIDEOS_RETRY_INTERVAL = 20 FOUND_VIDEOS_RETRY_INTERVAL = 20
@ -591,6 +590,95 @@ class TranscodeChecker(object):
return result.rowcount return result.rowcount
class VideoUpdater(object):
    """Pushes edits for events in the MODIFIED state to an upload backend.

    Each pass of run() fetches the MODIFIED rows that have no error, asks the
    backend to update each video's title, description and tags, then sets the
    row back to DONE provided it has not been changed again in the meantime.
    """

    # This is slow to reduce the chance of multiple cutters updating the same row.
    CHECK_INTERVAL = 10
    ERROR_RETRY_INTERVAL = 20

    def __init__(self, location, backend, dbmanager, stop):
        """
        location is the name of the upload location this updater was created for.
        backend is an upload backend that supports video updates.
        dbmanager is used to obtain database connections via get_conn().
        Stop is an Event triggering graceful shutdown when set.
        """
        self.location = location
        self.backend = backend
        self.dbmanager = dbmanager
        self.stop = stop
        self.logger = logging.getLogger(type(self).__name__)

    def wait(self, interval):
        """Wait for INTERVAL with jitter, unless we're stopping"""
        self.stop.wait(common.jitter(interval))

    def run(self):
        """Repeatedly find MODIFIED videos and update them, until stop is set.

        A failure to update a single video is recorded against that row via
        mark_errored() and does not interrupt the rest of the batch. Any other
        error is logged, the DB connection is replaced, and the loop retries
        after ERROR_RETRY_INTERVAL.
        """
        self.conn = self.dbmanager.get_conn()
        while not self.stop.is_set():
            try:
                videos = list(self.get_videos())
                self.logger.info("Found {} videos in MODIFIED".format(len(videos)))
                for event_id, video_id, title, description, tags in videos:
                    # Every update is a backend call, so don't start another one once
                    # shutdown has been requested. Rows we skip are left untouched in
                    # MODIFIED and will be seen again by a later pass.
                    if self.stop.is_set():
                        break
                    # NOTE: Since we aren't claiming videos, it's technically possible for this
                    # to happen:
                    # 1. we get MODIFIED video with title A
                    # 2. title is updated to B in database
                    # 3. someone else updates it to B in backend
                    # 4. we update it to A in backend
                    # 5. it appears to be successfully updated with B, but the title is actually A.
                    # This is unlikely and not a disaster, so we'll just live with it.
                    try:
                        self.backend.update_video(video_id, title, description, tags)
                    except Exception as ex:
                        self.logger.exception("Failed to update video")
                        self.mark_errored(event_id, "Failed to update video: {}".format(ex))
                        continue
                    marked = self.mark_done(event_id, video_id, title, description, tags)
                    if marked:
                        # The update is keyed on a single event id, so it should never
                        # touch more than one row.
                        assert marked == 1
                        self.logger.info("Updated video {}".format(event_id))
                    else:
                        self.logger.warning("Updated video {}, but row has changed since. Did someone else already update it?".format(event_id))
                self.wait(self.CHECK_INTERVAL)
            except Exception:
                self.logger.exception("Error in VideoUpdater")
                # To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
                # This is heavy-handed but simple and effective.
                self.conn = self.dbmanager.get_conn()
                self.wait(self.ERROR_RETRY_INTERVAL)

    def get_videos(self):
        """Return (id, video_id, video_title, video_description, video_tags) rows
        for every event that is MODIFIED and has no error set."""
        # To avoid exhausting API quota, errors aren't retryable.
        # We ignore any rows where error is not null.
        # NOTE(review): this query does not filter on self.location, so each
        # updater appears to see MODIFIED rows for every upload location.
        # Confirm whether it should be restricted to this updater's location.
        return query(self.conn, """
            SELECT id, video_id, video_title, video_description, video_tags
            FROM events
            WHERE state = 'MODIFIED' AND error IS NULL
        """)

    def mark_done(self, id, video_id, title, description, tags):
        """Set the event back to DONE and return the number of rows updated.

        We don't want to set to DONE if the video has been modified *again* since
        we saw it, so the update only matches if the row is still MODIFIED and
        every field we pushed to the backend still has the value we pushed.
        """
        args = dict(id=id, video_id=video_id, video_title=title, video_description=description, video_tags=tags)
        # One "<column> = <placeholder>" condition per field, joined with AND.
        conditions = sql.SQL(" AND ").join(
            sql.SQL("{} = {}").format(sql.Identifier(key), get_column_placeholder(key))
            for key in args
        )
        built_query = sql.SQL("""
            UPDATE events
            SET state = 'DONE'
            WHERE state = 'MODIFIED' AND {}
        """).format(conditions)
        return query(self.conn, built_query, **args).rowcount

    def mark_errored(self, id, error):
        """Record an error message against the event, unless it already has one."""
        # We don't overwrite any existing error, it is most likely from another attempt to update
        # anyway.
        query(self.conn, """
            UPDATE events
            SET error = %s
            WHERE id = %s and error IS NULL
        """, error, id)
def main( def main(
dbconnect, dbconnect,
config, config,
@ -668,9 +756,11 @@ def main(
config = json.loads(config) config = json.loads(config)
upload_locations = {} upload_locations = {}
needs_transcode_check = {} needs_transcode_check = {}
needs_updater = {}
for location, backend_config in config.items(): for location, backend_config in config.items():
backend_type = backend_config.pop('type') backend_type = backend_config.pop('type')
no_transcode_check = backend_config.pop('no_transcode_check', False) no_transcode_check = backend_config.pop('no_transcode_check', False)
no_updater = backend_config.pop('no_updater', False)
cut_type = backend_config.pop('cut_type', 'full') cut_type = backend_config.pop('cut_type', 'full')
if backend_type == 'youtube': if backend_type == 'youtube':
backend_type = Youtube backend_type = Youtube
@ -687,15 +777,24 @@ def main(
upload_locations[location] = backend upload_locations[location] = backend
if backend.needs_transcode and not no_transcode_check: if backend.needs_transcode and not no_transcode_check:
needs_transcode_check[location] = backend needs_transcode_check[location] = backend
if not no_updater:
needs_updater[location] = backend
cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir, tags) cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir, tags)
transcode_checkers = [ transcode_checkers = [
TranscodeChecker(location, backend, dbmanager, stop) TranscodeChecker(location, backend, dbmanager, stop)
for location, backend in needs_transcode_check.items() for location, backend in needs_transcode_check.items()
] ]
updaters = [
VideoUpdater(location, backend, dbmanager, stop)
for location, backend in needs_updater.items()
]
jobs = [gevent.spawn(cutter.run)] + [ jobs = [gevent.spawn(cutter.run)] + [
gevent.spawn(transcode_checker.run) gevent.spawn(transcode_checker.run)
for transcode_checker in transcode_checkers for transcode_checker in transcode_checkers
] + [
gevent.spawn(updater.run)
for updater in updaters
] ]
# Block until any one exits # Block until any one exits
gevent.wait(jobs, count=1) gevent.wait(jobs, count=1)

@ -63,6 +63,11 @@ class UploadBackend(object):
If it does, it should also have a method check_status(ids) which takes a If it does, it should also have a method check_status(ids) which takes a
list of video ids and returns a list of the ones who have finished processing. list of video ids and returns a list of the ones who have finished processing.
If updating existing videos is supported, the backend should also define a method
update_video(video_id, title, description, tags).
Fields which cannot be updated may be ignored.
Must not change the video id or link. Returns nothing.
The upload backend also determines the encoding settings for the cutting The upload backend also determines the encoding settings for the cutting
process, this is given as a list of ffmpeg args process, this is given as a list of ffmpeg args
under the 'encoding_settings' attribute. under the 'encoding_settings' attribute.
@ -86,6 +91,9 @@ class UploadBackend(object):
def check_status(self, ids): def check_status(self, ids):
raise NotImplementedError raise NotImplementedError
def update_video(self, video_id, title, description, tags):
    """Update an already-uploaded video's title, description and tags.

    Only backends that support updating existing videos define this.
    Fields which cannot be updated may be ignored. Implementations must not
    change the video id or link, and return nothing.
    """
    raise NotImplementedError
class Youtube(UploadBackend): class Youtube(UploadBackend):
"""Represents a youtube channel to upload to, and settings for doing so. """Represents a youtube channel to upload to, and settings for doing so.
@ -200,6 +208,47 @@ class Youtube(UploadBackend):
output.append(item['id']) output.append(item['id'])
return output return output
def update_video(self, video_id, title, description, tags):
    """Set the title, description and tags of an existing video.

    Fetches the video's current snippet, overlays the given values, and PUTs
    the merged snippet back. If the merged snippet equals the current one,
    the PUT is skipped. Raises if the video cannot be found or if either
    request fails. Returns nothing.
    """
    videos_url = 'https://www.googleapis.com/youtube/v3/videos'

    # Any values we don't give will be deleted on PUT, so we need to first
    # get all the existing values then merge in our updates.
    lookup = self.client.request('GET',
        videos_url,
        params={
            'part': 'id,snippet',
            'id': video_id,
        },
        metric_name='get_video',
    )
    lookup.raise_for_status()
    items = lookup.json()['items']
    if not items:
        raise Exception("Could not find video {}".format(video_id))
    assert len(items) == 1

    current = items[0]['snippet']
    # Build the merged snippet as a new dict so the fetched one stays intact for comparison.
    desired = dict(current, title=title, description=description, tags=tags)

    # Since we're fetching this data anyway, we can save some quota by avoiding repeated work.
    # We could still race and do the same update twice, but that's fine.
    if desired == current:
        self.logger.info("Skipping update for video {}: No changes".format(video_id))
        return

    update = self.client.request('PUT',
        videos_url,
        params={
            'part': 'id,snippet',
        },
        json={
            'id': video_id,
            'snippet': desired,
        },
        metric_name='update_video',
    )
    update.raise_for_status()
class Local(UploadBackend): class Local(UploadBackend):
"""An "upload" backend that just saves the file to local disk. """An "upload" backend that just saves the file to local disk.
@ -260,3 +309,14 @@ class Local(UploadBackend):
else: else:
url = 'file://{}'.format(filepath) url = 'file://{}'.format(filepath)
return video_id, url return video_id, url
def update_video(self, video_id, title, description, tags):
    """Write the video's title, description and tags to a JSON info file.

    Does nothing unless write_info is enabled. The file is written under
    self.path and named after the sanitised title and the video id.
    Returns nothing.
    """
    if not self.write_info:
        return
    # NOTE(review): the file name is derived from the title passed in here, so a
    # changed title presumably results in a differently-named file. Confirm
    # whether any info file written under the previous title should be removed.
    safe_title = re.sub('[^A-Za-z0-9_]', '-', title)
    info_path = os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id))
    info = {
        'title': title,
        'description': description,
        'tags': tags,
    }
    with open(info_path, 'w') as f:
        common.writeall(f.write, json.dumps(info) + '\n')

@ -43,7 +43,8 @@ CREATE TYPE event_state as ENUM (
'CLAIMED', 'CLAIMED',
'FINALIZING', 'FINALIZING',
'TRANSCODING', 'TRANSCODING',
'DONE' 'DONE',
'MODIFIED'
); );
CREATE TYPE video_range as ( CREATE TYPE video_range as (
@ -89,8 +90,8 @@ CREATE TABLE events (
video_link TEXT CHECK (state != 'DONE' OR video_link IS NOT NULL), video_link TEXT CHECK (state != 'DONE' OR video_link IS NOT NULL),
editor TEXT, editor TEXT,
edit_time TIMESTAMP CHECK (state = 'UNEDITED' OR editor IS NOT NULL), edit_time TIMESTAMP CHECK (state = 'UNEDITED' OR editor IS NOT NULL),
upload_time TIMESTAMP CHECK (state != 'DONE' OR upload_time IS NOT NULL) upload_time TIMESTAMP CHECK (state != 'DONE' OR upload_time IS NOT NULL),
last_modified TIMESTAMP CHECK (state != 'MODIFIED' OR last_modified IS NOT NULL)
); );
-- Index on state, since that's almost always what we're querying on besides id -- Index on state, since that's almost always what we're querying on besides id

Loading…
Cancel
Save