Merge pull request #100 from ekimekim/mike/cutter/multiple-locations

cutter: Allow multiple upload locations
pull/102/head
Mike Lang 5 years ago committed by GitHub
commit 1159a518f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,7 +18,7 @@ import common
from common.database import DBManager, query from common.database import DBManager, query
from common.segments import get_best_segments, cut_segments, ContainsHoles from common.segments import get_best_segments, cut_segments, ContainsHoles
from .youtube import Youtube from .upload_backends import Youtube, Local
videos_uploaded = prom.Counter( videos_uploaded = prom.Counter(
@ -72,15 +72,15 @@ class Cutter(object):
ERROR_RETRY_INTERVAL = 5 ERROR_RETRY_INTERVAL = 5
RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5 RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5
def __init__(self, youtube, dbmanager, stop, name, segments_path): def __init__(self, upload_locations, dbmanager, stop, name, segments_path):
"""youtube is an authenticated and initialized youtube api client. """upload_locations is a map {location name: upload location backend}
Conn is a database connection. Conn is a database connection.
Stop is an Event triggering graceful shutdown when set. Stop is an Event triggering graceful shutdown when set.
Name is this uploader's unique name. Name is this uploader's unique name.
Segments path is where to look for segments. Segments path is where to look for segments.
""" """
self.name = name self.name = name
self.youtube = youtube self.upload_locations = upload_locations
self.dbmanager = dbmanager self.dbmanager = dbmanager
self.stop = stop self.stop = stop
self.segments_path = segments_path self.segments_path = segments_path
@ -186,15 +186,18 @@ class Cutter(object):
def list_candidates(self): def list_candidates(self):
"""Return a list of all available candidates that we might be able to cut.""" """Return a list of all available candidates that we might be able to cut."""
# We only accept candidates if they haven't excluded us by whitelist,
# and we are capable of uploading to their desired upload location.
built_query = sql.SQL(""" built_query = sql.SQL("""
SELECT id, {} SELECT id, {}
FROM events FROM events
WHERE state = 'EDITED' WHERE state = 'EDITED'
AND (uploader_whitelist IS NULL OR %(name)s = ANY (uploader_whitelist)) AND (uploader_whitelist IS NULL OR %(name)s = ANY (uploader_whitelist))
AND upload_location = ANY (%(upload_locations)s)
""").format( """).format(
sql.SQL(", ").join(sql.Identifier(key) for key in CUT_JOB_PARAMS) sql.SQL(", ").join(sql.Identifier(key) for key in CUT_JOB_PARAMS)
) )
result = query(self.conn, built_query, name=self.name) result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys())
return result.fetchall() return result.fetchall()
def check_candidate(self, candidate): def check_candidate(self, candidate):
@ -257,9 +260,9 @@ class Cutter(object):
at which point it will re-sync with DB as best it can. at which point it will re-sync with DB as best it can.
This situation almost certainly requires operator intervention. This situation almost certainly requires operator intervention.
""" """
# TODO handle multiple upload locations. Currently everything's hard-coded to youtube.
self.logger.info("Cutting and uploading job {}".format(format_job(job))) upload_backend = self.upload_locations[job.upload_location]
self.logger.info("Cutting and uploading job {} to {}".format(format_job(job), upload_backend))
cut = cut_segments(job.segments, job.video_start, job.video_end) cut = cut_segments(job.segments, job.video_start, job.video_end)
# This flag tracks whether we've told requests to finalize the upload, # This flag tracks whether we've told requests to finalize the upload,
@ -337,7 +340,7 @@ class Cutter(object):
# from requests.post() are not recoverable. # from requests.post() are not recoverable.
try: try:
video_id = self.youtube.upload_video( video_id = upload_backend.upload_video(
title=job.video_title, title=job.video_title,
description=job.video_description, description=job.video_description,
tags=[], # TODO tags=[], # TODO
@ -365,7 +368,7 @@ class Cutter(object):
).format(format_job(job))) ).format(format_job(job)))
error = ( error = (
"An error occurred during FINALIZING, please determine if video was actually " "An error occurred during FINALIZING, please determine if video was actually "
"uploaded or not and either move to TRANSCODING and populate video_id or rollback " "uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback "
"to EDITED and clear uploader. " "to EDITED and clear uploader. "
"Error: {}" "Error: {}"
).format(ex) ).format(ex)
@ -397,14 +400,15 @@ class Cutter(object):
gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL) gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL)
return return
# Success! Set TRANSCODING and clear any previous error. # Success! Set TRANSCODING or DONE and clear any previous error.
success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE'
link = "https://youtu.be/{}".format(video_id) link = "https://youtu.be/{}".format(video_id)
if not set_row(state='TRANSCODING', video_id=video_id, video_link=link, error=None): if not set_row(state=success_state, video_id=video_id, video_link=link, error=None):
# This will result in it being stuck in FINALIZING, and an operator will need to go # This will result in it being stuck in FINALIZING, and an operator will need to go
# confirm it was really uploaded. # confirm it was really uploaded.
raise JobConsistencyError( raise JobConsistencyError(
"No job with id {} and uploader {} when setting to TRANSCODING" "No job with id {} and uploader {} when setting to {}"
.format(job.id, self.name) .format(job.id, self.name, success_state)
) )
self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), link)) self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), link))
@ -432,7 +436,7 @@ class Cutter(object):
WHERE state = 'FINALIZING' AND uploader = %(name)s AND error IS NULL WHERE state = 'FINALIZING' AND uploader = %(name)s AND error IS NULL
""", name=self.name, error=( """, name=self.name, error=(
"Uploader died during FINALIZING, please determine if video was actually " "Uploader died during FINALIZING, please determine if video was actually "
"uploaded or not and either move to TRANSCODING and populate video_id or rollback " "uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback "
"to EDITED and clear uploader." "to EDITED and clear uploader."
)) ))
if result.rowcount > 0: if result.rowcount > 0:
@ -447,13 +451,14 @@ class TranscodeChecker(object):
FOUND_VIDEOS_RETRY_INTERVAL = 20 FOUND_VIDEOS_RETRY_INTERVAL = 20
ERROR_RETRY_INTERVAL = 20 ERROR_RETRY_INTERVAL = 20
def __init__(self, youtube, dbmanager, stop): def __init__(self, backend, dbmanager, stop):
""" """
youtube is an authenticated and initialized youtube api client. backend is an upload backend that supports transcoding
and defines check_status().
Conn is a database connection. Conn is a database connection.
Stop is an Event triggering graceful shutdown when set. Stop is an Event triggering graceful shutdown when set.
""" """
self.youtube = youtube self.backend = backend
self.dbmanager = dbmanager self.dbmanager = dbmanager
self.stop = stop self.stop = stop
self.logger = logging.getLogger(type(self).__name__) self.logger = logging.getLogger(type(self).__name__)
@ -495,10 +500,10 @@ class TranscodeChecker(object):
def check_ids(self, ids): def check_ids(self, ids):
# Future work: Set error in DB if video id is not present, # Future work: Set error in DB if video id is not present,
# and/or try to get more info from yt about what's wrong. # and/or try to get more info from yt about what's wrong.
statuses = self.youtube.get_video_status(ids.values()) done = self.backend.check_status(ids.values())
return { return {
id: video_id for id, video_id in ids.items() id: video_id for id, video_id in ids.items()
if statuses.get(video_id) == 'processed' if video_id in done
} }
def mark_done(self, ids): def mark_done(self, ids):
@ -510,12 +515,23 @@ class TranscodeChecker(object):
return result.rowcount return result.rowcount
def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=8003, backdoor_port=0): def main(dbconnect, config, creds_file, name=None, base_dir=".", metrics_port=8003, backdoor_port=0):
"""dbconnect should be a postgres connection string, which is either a space-separated """dbconnect should be a postgres connection string, which is either a space-separated
list of key=value pairs, or a URI like: list of key=value pairs, or a URI like:
postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE
youtube_creds_file should be a json file containing keys 'client_id', 'client_secret' and 'refresh_token'. config should be a json blob mapping upload location names to a config object
for that location. This config object should contain the keys:
type:
the name of the upload backend type
no_transcode_check:
bool. If true, won't check for when videos are done transcoding.
This is useful when multiple upload locations actually refer to the
same place just with different settings, and you only want one of them
to actually do the check.
along with any additional config options defined for that backend type.
creds_file should contain any required credentials for the upload backends, as JSON.
name defaults to hostname. name defaults to hostname.
""" """
@ -547,25 +563,42 @@ def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=80
logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay)) logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
stop.wait(delay) stop.wait(delay)
youtube_creds = json.load(open(youtube_creds_file)) with open(creds_file) as f:
youtube = Youtube( credentials = json.load(f)
client_id=youtube_creds['client_id'],
client_secret=youtube_creds['client_secret'], config = json.loads(config)
refresh_token=youtube_creds['refresh_token'], upload_locations = {}
) needs_transcode_check = []
cutter = Cutter(youtube, dbmanager, stop, name, base_dir) for location, backend_config in config.items():
transcode_checker = TranscodeChecker(youtube, dbmanager, stop) backend_type = backend_config.pop('type')
jobs = [ no_transcode_check = backend_config.pop('no_transcode_check', False)
gevent.spawn(cutter.run), if type == 'youtube':
gevent.spawn(transcode_checker.run), backend_type = Youtube
elif type == 'local':
backend_type = Local
else:
raise ValueError("Unknown upload backend type: {!r}".format(type))
backend = backend_type(credentials, **backend_config)
upload_locations[location] = backend
if backend.needs_transcode and not no_transcode_check:
needs_transcode_check.append(backend)
cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir)
transcode_checkers = [
TranscodeChecker(backend, dbmanager, stop)
for backend in needs_transcode_check
]
jobs = [gevent.spawn(cutter.run)] + [
gevent.spawn(transcode_checker.run)
for transcode_checker in transcode_checkers
] ]
# Block until either exits # Block until any one exits
gevent.wait(jobs, count=1) gevent.wait(jobs, count=1)
# Stop the other if it isn't stopping already # Stop the others if they aren't stopping already
stop.set() stop.set()
# Block until both have exited # Block until all have exited
gevent.wait(jobs) gevent.wait(jobs)
# Call get() for each to re-raise if either errored # Call get() for each one to re-raise if any errored
for job in jobs: for job in jobs:
job.get() job.get()

@ -0,0 +1,165 @@
import errno
import json
import logging
import os
import re
import uuid
from common.googleapis import GoogleAPIClient
class UploadBackend(object):
"""Represents a place a video can be uploaded,
and maintains any state needed to perform uploads.
Config args for the backend are passed into __init__ as kwargs,
along with credentials as the first arg.
Should have a method upload_video(title, description, tags, data).
Title, description and tags may have backend-specific meaning.
Tags is a list of string.
Data is an iterator of strings.
It should return (video_id, video_link).
If the video must undergo additional processing before it's available
(ie. it should go into the TRANSCODING state), then the backend should
define the 'needs_transcode' attribute as True.
If it does, it should also have a method check_status(ids) which takes a
list of video ids and returns a list of the ones who have finished processing.
The upload backend also determines the encoding settings for the cutting
process, this is given as a list of ffmpeg args
under the 'encoding_settings' attribute.
"""
needs_transcode = False
# reasonable default if settings don't otherwise matter
encoding_settings = [] # TODO
def upload_video(self, title, description, tags, data):
raise NotImplementedError
def check_status(self, ids):
raise NotImplementedError
class Youtube(UploadBackend):
"""Represents a youtube channel to upload to, and settings for doing so.
Config args besides credentials:
hidden:
If false, video is public. If true, video is unlisted. Default false.
"""
needs_transcode = True
encoding_settings = [] # TODO youtube's recommended settings
def __init__(self, credentials, hidden=False):
self.logger = logging.getLogger(type(self).__name__)
self.client = GoogleAPIClient(
credentials['client_id'],
credentials['client_secret'],
credentials['refresh_token'],
)
self.hidden = hidden
def upload_video(self, title, description, tags, data):
json = {
'snippet': {
'title': title,
'description': description,
'tags': tags,
},
}
if self.hidden:
json['status'] = {
'privacyStatus': 'unlisted',
}
resp = self.client.request('POST',
'https://www.googleapis.com/upload/youtube/v3/videos',
params={
'part': 'snippet,status' if self.hidden else 'snippet',
'uploadType': 'resumable',
},
json=json,
)
resp.raise_for_status()
upload_url = resp.headers['Location']
resp = self.client.request('POST', upload_url, data=data)
resp.raise_for_status()
id = resp.json()['id']
return id, 'https://youtu.be/{}'.format(id)
def check_status(self, ids):
output = []
# Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable.
for i in range(0, len(ids), 10):
group = ids[i:i+10]
resp = self.client.request('GET',
'https://www.googleapis.com/youtube/v3/videos',
params={
'part': 'id,status',
'id': ','.join(group),
},
)
resp.raise_for_status()
for item in resp.json()['items']:
if item['status']['uploadStatus'] == 'processed':
output.append(item['id'])
return output
class Local(UploadBackend):
"""An "upload" backend that just saves the file to local disk.
Needs no credentials. Config args:
path:
Where to save the file.
url_prefix:
The leading part of the URL to return.
The filename will be appended to this to form the full URL.
So for example, if you set "http://example.com/videos/",
then a returned video URL might look like:
"http://example.com/videos/my-example-video-1ffd816b-6496-45d4-b8f5-5eb06ee532f9.ts"
If not given, returns a file:// url with the full path.
write_info:
If true, writes a json file alongside the video file containing
the video title, description and tags.
This is intended primarily for testing purposes.
Saves files under their title, plus a random video id to avoid conflicts.
Ignores description and tags.
"""
def __init__(self, credentials, path, url_prefix=None, write_info=False):
self.path = path
self.url_prefix = url_prefix
self.write_info = write_info
# make path if it doesn't already exist
try:
os.makedirs(self.path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
# ignore already-exists errors
def upload_video(self, title, description, tags, data):
video_id = uuid.uuid4()
# make title safe by removing offending characters, replacing with '-'
title = re.sub('[^A-Za-z0-9_]', '-', title)
filename = '{}-{}.ts'.format(title, video_id) # TODO with re-encoding, this ext must change
filepath = os.path.join(self.path, filename)
if self.write_info:
with open(os.path.join(self.path, '{}-{}.json'.format(title, video_id))) as f:
f.write(json.dumps({
'title': title,
'description': description,
'tags': tags,
}) + '\n')
with open(filepath, 'w') as f:
for chunk in data:
f.write(chunk)
if self.url_prefix is not None:
url = self.url_prefix + filename
else:
url = 'file://{}'.format(filepath)
return video_id, url

@ -1,61 +0,0 @@
import logging
from common.googleapis import GoogleAPIClient
class Youtube(object):
"""Manages youtube API operations"""
def __init__(self, client_id, client_secret, refresh_token):
self.logger = logging.getLogger(type(self).__name__)
self.client = GoogleAPIClient(client_id, client_secret, refresh_token)
def upload_video(self, title, description, tags, data, hidden=False):
"""Data may be a string, file-like object or iterator. Returns id."""
json = {
'snippet': {
'title': title,
'description': description,
'tags': tags,
},
}
if hidden:
json['status'] = {
'privacyStatus': 'unlisted',
}
resp = self.client.request('POST',
'https://www.googleapis.com/upload/youtube/v3/videos',
params={
'part': 'snippet,status' if hidden else 'snippet',
'uploadType': 'resumable',
},
json=json,
)
resp.raise_for_status()
upload_url = resp.headers['Location']
resp = self.client.request('POST', upload_url, data=data)
resp.raise_for_status()
return resp.json()['id']
def get_video_status(self, ids):
"""For a list of video ids, returns a dict {id: upload status}.
A video is fully processed when upload status is 'processed'.
NOTE: Video ids may be missing from the result, this probably indicates
the video is errored.
"""
output = {}
# Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable.
for i in range(0, len(ids), 10):
group = ids[i:i+10]
resp = self.client.request('GET',
'https://www.googleapis.com/youtube/v3/videos',
params={
'part': 'id,status',
'id': ','.join(group),
},
)
resp.raise_for_status()
for item in resp.json()['items']:
output[item['id']] = item['status']['uploadStatus']
return output

@ -85,9 +85,20 @@
db_replication_password:: "standby", // don't use default in production. Must not contain ' or \ as these are not escaped. db_replication_password:: "standby", // don't use default in production. Must not contain ' or \ as these are not escaped.
db_standby:: false, // set to true to have this database replicate another server db_standby:: false, // set to true to have this database replicate another server
// Path to a JSON file containing google credentials as keys // Path to a JSON file containing google credentials for cutter as keys
// 'client_id', 'client_secret' and 'refresh_token'. // 'client_id', 'client_secret' and 'refresh_token'.
google_creds:: "./google_creds.json", cutter_creds_file:: "./google_creds.json",
// Config for cutter upload locations. See cutter docs for full detail.
cutter_config:: {
desertbus: {type: "youtube"},
unlisted: {type: "youtube", hidden: true, no_transcode_check: true},
},
// Path to a JSON file containing google credentials for sheetsync as keys
// 'client_id', 'client_secret' and 'refresh_token'.
// May be the same as cutter_creds_file.
sheetsync_creds_file:: "./google_creds.json",
// The URL to write to the sheet for edit links, with {} being replaced by the id // The URL to write to the sheet for edit links, with {} being replaced by the id
edit_url:: "http://thrimbletrimmer.codegunner.com/?id={}", edit_url:: "http://thrimbletrimmer.codegunner.com/?id={}",
@ -168,18 +179,20 @@
[if $.enabled.cutter then "cutter"]: { [if $.enabled.cutter then "cutter"]: {
image: "quay.io/ekimekim/wubloader-cutter:%s" % $.image_tag, image: "quay.io/ekimekim/wubloader-cutter:%s" % $.image_tag,
// Args for the cutter: DB and google creds // Args for the cutter: DB and creds
command: [ command: [
"--base-dir", "/mnt", "--base-dir", "/mnt",
"--backdoor-port", std.toString($.backdoor_port), "--backdoor-port", std.toString($.backdoor_port),
$.db_connect, $.db_connect,
"/etc/wubloader-google-creds.json", std.manifestJson($.cutter_config),
"/etc/wubloader-creds.json",
], ],
volumes: [ volumes: [
// Mount the segments directory at /mnt // Mount the segments directory at /mnt
"%s:/mnt" % $.segments_path, "%s:/mnt" % $.segments_path,
] + [
// Mount the creds file into /etc // Mount the creds file into /etc
"%s:/etc/wubloader-google-creds.json" % $.google_creds, "%s:/etc/wubloader-creds.json" % $.cutter_creds_file,
], ],
// If the application crashes, restart it. // If the application crashes, restart it.
restart: "on-failure", restart: "on-failure",
@ -212,14 +225,14 @@
command: [ command: [
"--backdoor-port", std.toString($.backdoor_port), "--backdoor-port", std.toString($.backdoor_port),
$.db_connect, $.db_connect,
"/etc/wubloader-google-creds.json", "/etc/wubloader-creds.json",
$.edit_url, $.edit_url,
$.bustime_start, $.bustime_start,
$.sheet_id, $.sheet_id,
] + $.worksheets, ] + $.worksheets,
volumes: [ volumes: [
// Mount the creds file into /etc // Mount the creds file into /etc
"%s:/etc/wubloader-google-creds.json" % $.google_creds, "%s:/etc/wubloader-creds.json" % $.sheetsync_creds_file,
], ],
// If the application crashes, restart it. // If the application crashes, restart it.
restart: "on-failure", restart: "on-failure",

Loading…
Cancel
Save