diff --git a/DATABASE.md b/DATABASE.md index ac109cd..e689b83 100644 --- a/DATABASE.md +++ b/DATABASE.md @@ -130,7 +130,7 @@ columns | type | role | `description` | `TEXT NOT NULL DEFAULT ''` | sheet input | Event description. Provides the default title and description for editors, and displayed on the public sheet. `submitter_winner` | `TEXT NOT NULL DEFAULT ''` | sheet input | A column detailing challenge submitter, auction winner, or other "associated person" data. This shouldn't be relied on in any processing but should be displayed on the public sheet. `poster_moment` | `BOOLEAN NOT NULL DEFAULT FALSE` | sheet input | Whether or not the event was featured on the poster. Used for building the postermap and also displayed on the public sheet. -`image_links` | `TEXT[] NOT NULL` | sheet input | Any additional gif or image links associated with the event. Displayed on the public sheet. +`image_links` | `TEXT[] NOT NULL` | sheet input | Any additional gif or image links associated with the event. Displayed on the public sheet. `notes` | `TEXT NOT NULL DEFAULT ''` | sheet input | Private notes on this event, used eg. to leave messages or special instructions. Displayed to the editor during editing, but otherwise unused. `allow_holes` | `BOOLEAN NOT NULL DEFAULT FALSE` | edit input | If false, any missing segments encountered while cutting will cause the cut to fail. Setting this to true should be done by an operator to indicate that holes are expected in this range. It is also the operator's responsibility to ensure that all allowed cutters have all segments that they can get, since there is no guarentee that only the cutter with the least missing segments will get the cut job. `uploader_whitelist` | `TEXT[]` | edit input | List of uploaders which are allowed to cut this entry, or NULL to indicate no restriction. This is useful if you are allowing holes and the amount of missing data differs between nodes (this shouldn't happen - this would mean replication is also failing), or if an operator is investigating a problem with a specific node. diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 8973501..36c12bf 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -18,7 +18,7 @@ import common from common.database import DBManager, query from common.segments import get_best_segments, cut_segments, ContainsHoles -from .youtube import Youtube +from .upload_backends import Youtube, Local videos_uploaded = prom.Counter( @@ -72,15 +72,15 @@ class Cutter(object): ERROR_RETRY_INTERVAL = 5 RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5 - def __init__(self, youtube, dbmanager, stop, name, segments_path): - """youtube is an authenticated and initialized youtube api client. + def __init__(self, upload_locations, dbmanager, stop, name, segments_path): + """upload_locations is a map {location name: upload location backend} Conn is a database connection. Stop is an Event triggering graceful shutdown when set. Name is this uploader's unique name. Segments path is where to look for segments. """ self.name = name - self.youtube = youtube + self.upload_locations = upload_locations self.dbmanager = dbmanager self.stop = stop self.segments_path = segments_path @@ -186,15 +186,18 @@ class Cutter(object): def list_candidates(self): """Return a list of all available candidates that we might be able to cut.""" + # We only accept candidates if they haven't excluded us by whitelist, + # and we are capable of uploading to their desired upload location. built_query = sql.SQL(""" SELECT id, {} FROM events WHERE state = 'EDITED' AND (uploader_whitelist IS NULL OR %(name)s = ANY (uploader_whitelist)) + AND upload_location = ANY (%(upload_locations)s) """).format( sql.SQL(", ").join(sql.Identifier(key) for key in CUT_JOB_PARAMS) ) - result = query(self.conn, built_query, name=self.name) + result = query(self.conn, built_query, name=self.name, upload_locations=self.upload_locations.keys()) return result.fetchall() def check_candidate(self, candidate): @@ -257,9 +260,9 @@ class Cutter(object): at which point it will re-sync with DB as best it can. This situation almost certainly requires operator intervention. """ - # TODO handle multiple upload locations. Currently everything's hard-coded to youtube. - self.logger.info("Cutting and uploading job {}".format(format_job(job))) + upload_backend = self.upload_locations[job.upload_location] + self.logger.info("Cutting and uploading job {} to {}".format(format_job(job), upload_backend)) cut = cut_segments(job.segments, job.video_start, job.video_end) # This flag tracks whether we've told requests to finalize the upload, @@ -337,7 +340,7 @@ class Cutter(object): # from requests.post() are not recoverable. try: - video_id = self.youtube.upload_video( + video_id = upload_backend.upload_video( title=job.video_title, description=job.video_description, tags=[], # TODO @@ -365,7 +368,7 @@ class Cutter(object): ).format(format_job(job))) error = ( "An error occurred during FINALIZING, please determine if video was actually " - "uploaded or not and either move to TRANSCODING and populate video_id or rollback " + "uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback " "to EDITED and clear uploader. " "Error: {}" ).format(ex) @@ -397,14 +400,15 @@ class Cutter(object): gevent.sleep(self.RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL) return - # Success! Set TRANSCODING and clear any previous error. + # Success! Set TRANSCODING or DONE and clear any previous error. + success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE' link = "https://youtu.be/{}".format(video_id) - if not set_row(state='TRANSCODING', video_id=video_id, video_link=link, error=None): + if not set_row(state=success_state, video_id=video_id, video_link=link, error=None): # This will result in it being stuck in FINALIZING, and an operator will need to go # confirm it was really uploaded. raise JobConsistencyError( - "No job with id {} and uploader {} when setting to TRANSCODING" - .format(job.id, self.name) + "No job with id {} and uploader {} when setting to {}" + .format(job.id, self.name, success_state) ) self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), link)) @@ -432,7 +436,7 @@ class Cutter(object): WHERE state = 'FINALIZING' AND uploader = %(name)s AND error IS NULL """, name=self.name, error=( "Uploader died during FINALIZING, please determine if video was actually " - "uploaded or not and either move to TRANSCODING and populate video_id or rollback " + "uploaded or not and either move to TRANSCODING/DONE and populate video_id or rollback " "to EDITED and clear uploader." )) if result.rowcount > 0: @@ -447,13 +451,14 @@ class TranscodeChecker(object): FOUND_VIDEOS_RETRY_INTERVAL = 20 ERROR_RETRY_INTERVAL = 20 - def __init__(self, youtube, dbmanager, stop): + def __init__(self, backend, dbmanager, stop): """ - youtube is an authenticated and initialized youtube api client. + backend is an upload backend that supports transcoding + and defines check_status(). Conn is a database connection. Stop is an Event triggering graceful shutdown when set. """ - self.youtube = youtube + self.backend = backend self.dbmanager = dbmanager self.stop = stop self.logger = logging.getLogger(type(self).__name__) @@ -495,10 +500,10 @@ class TranscodeChecker(object): def check_ids(self, ids): # Future work: Set error in DB if video id is not present, # and/or try to get more info from yt about what's wrong. - statuses = self.youtube.get_video_status(ids.values()) + done = self.backend.check_status(ids.values()) return { id: video_id for id, video_id in ids.items() - if statuses.get(video_id) == 'processed' + if video_id in done } def mark_done(self, ids): @@ -510,12 +515,23 @@ class TranscodeChecker(object): return result.rowcount -def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=8003, backdoor_port=0): +def main(dbconnect, config, creds_file, name=None, base_dir=".", metrics_port=8003, backdoor_port=0): """dbconnect should be a postgres connection string, which is either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE - youtube_creds_file should be a json file containing keys 'client_id', 'client_secret' and 'refresh_token'. + config should be a json blob mapping upload location names to a config object + for that location. This config object should contain the keys: + type: + the name of the upload backend type + no_transcode_check: + bool. If true, won't check for when videos are done transcoding. + This is useful when multiple upload locations actually refer to the + same place just with different settings, and you only want one of them + to actually do the check. + along with any additional config options defined for that backend type. + + creds_file should contain any required credentials for the upload backends, as JSON. name defaults to hostname. """ @@ -547,25 +563,42 @@ def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=80 logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay)) stop.wait(delay) - youtube_creds = json.load(open(youtube_creds_file)) - youtube = Youtube( - client_id=youtube_creds['client_id'], - client_secret=youtube_creds['client_secret'], - refresh_token=youtube_creds['refresh_token'], - ) - cutter = Cutter(youtube, dbmanager, stop, name, base_dir) - transcode_checker = TranscodeChecker(youtube, dbmanager, stop) - jobs = [ - gevent.spawn(cutter.run), - gevent.spawn(transcode_checker.run), + with open(creds_file) as f: + credentials = json.load(f) + + config = json.loads(config) + upload_locations = {} + needs_transcode_check = [] + for location, backend_config in config.items(): + backend_type = backend_config.pop('type') + no_transcode_check = backend_config.pop('no_transcode_check', False) + if type == 'youtube': + backend_type = Youtube + elif type == 'local': + backend_type = Local + else: + raise ValueError("Unknown upload backend type: {!r}".format(type)) + backend = backend_type(credentials, **backend_config) + upload_locations[location] = backend + if backend.needs_transcode and not no_transcode_check: + needs_transcode_check.append(backend) + + cutter = Cutter(upload_locations, dbmanager, stop, name, base_dir) + transcode_checkers = [ + TranscodeChecker(backend, dbmanager, stop) + for backend in needs_transcode_check + ] + jobs = [gevent.spawn(cutter.run)] + [ + gevent.spawn(transcode_checker.run) + for transcode_checker in transcode_checkers ] - # Block until either exits + # Block until any one exits gevent.wait(jobs, count=1) - # Stop the other if it isn't stopping already + # Stop the others if they aren't stopping already stop.set() - # Block until both have exited + # Block until all have exited gevent.wait(jobs) - # Call get() for each to re-raise if either errored + # Call get() for each one to re-raise if any errored for job in jobs: job.get() diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py new file mode 100644 index 0000000..8c11b6c --- /dev/null +++ b/cutter/cutter/upload_backends.py @@ -0,0 +1,165 @@ + +import errno +import json +import logging +import os +import re +import uuid + +from common.googleapis import GoogleAPIClient + + +class UploadBackend(object): + """Represents a place a video can be uploaded, + and maintains any state needed to perform uploads. + + Config args for the backend are passed into __init__ as kwargs, + along with credentials as the first arg. + + Should have a method upload_video(title, description, tags, data). + Title, description and tags may have backend-specific meaning. + Tags is a list of string. + Data is an iterator of strings. + It should return (video_id, video_link). + + If the video must undergo additional processing before it's available + (ie. it should go into the TRANSCODING state), then the backend should + define the 'needs_transcode' attribute as True. + If it does, it should also have a method check_status(ids) which takes a + list of video ids and returns a list of the ones who have finished processing. + + The upload backend also determines the encoding settings for the cutting + process, this is given as a list of ffmpeg args + under the 'encoding_settings' attribute. + """ + + needs_transcode = False + + # reasonable default if settings don't otherwise matter + encoding_settings = [] # TODO + + def upload_video(self, title, description, tags, data): + raise NotImplementedError + + def check_status(self, ids): + raise NotImplementedError + + +class Youtube(UploadBackend): + """Represents a youtube channel to upload to, and settings for doing so. + Config args besides credentials: + hidden: + If false, video is public. If true, video is unlisted. Default false. + """ + + needs_transcode = True + encoding_settings = [] # TODO youtube's recommended settings + + def __init__(self, credentials, hidden=False): + self.logger = logging.getLogger(type(self).__name__) + self.client = GoogleAPIClient( + credentials['client_id'], + credentials['client_secret'], + credentials['refresh_token'], + ) + self.hidden = hidden + + def upload_video(self, title, description, tags, data): + json = { + 'snippet': { + 'title': title, + 'description': description, + 'tags': tags, + }, + } + if self.hidden: + json['status'] = { + 'privacyStatus': 'unlisted', + } + resp = self.client.request('POST', + 'https://www.googleapis.com/upload/youtube/v3/videos', + params={ + 'part': 'snippet,status' if self.hidden else 'snippet', + 'uploadType': 'resumable', + }, + json=json, + ) + resp.raise_for_status() + upload_url = resp.headers['Location'] + resp = self.client.request('POST', upload_url, data=data) + resp.raise_for_status() + id = resp.json()['id'] + return id, 'https://youtu.be/{}'.format(id) + + def check_status(self, ids): + output = [] + # Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable. + for i in range(0, len(ids), 10): + group = ids[i:i+10] + resp = self.client.request('GET', + 'https://www.googleapis.com/youtube/v3/videos', + params={ + 'part': 'id,status', + 'id': ','.join(group), + }, + ) + resp.raise_for_status() + for item in resp.json()['items']: + if item['status']['uploadStatus'] == 'processed': + output.append(item['id']) + return output + + +class Local(UploadBackend): + """An "upload" backend that just saves the file to local disk. + Needs no credentials. Config args: + path: + Where to save the file. + url_prefix: + The leading part of the URL to return. + The filename will be appended to this to form the full URL. + So for example, if you set "http://example.com/videos/", + then a returned video URL might look like: + "http://example.com/videos/my-example-video-1ffd816b-6496-45d4-b8f5-5eb06ee532f9.ts" + If not given, returns a file:// url with the full path. + write_info: + If true, writes a json file alongside the video file containing + the video title, description and tags. + This is intended primarily for testing purposes. + Saves files under their title, plus a random video id to avoid conflicts. + Ignores description and tags. + """ + + def __init__(self, credentials, path, url_prefix=None, write_info=False): + self.path = path + self.url_prefix = url_prefix + self.write_info = write_info + # make path if it doesn't already exist + try: + os.makedirs(self.path) + except OSError as e: + if e.errno != errno.EEXIST: + raise + # ignore already-exists errors + + def upload_video(self, title, description, tags, data): + video_id = uuid.uuid4() + # make title safe by removing offending characters, replacing with '-' + title = re.sub('[^A-Za-z0-9_]', '-', title) + filename = '{}-{}.ts'.format(title, video_id) # TODO with re-encoding, this ext must change + filepath = os.path.join(self.path, filename) + if self.write_info: + with open(os.path.join(self.path, '{}-{}.json'.format(title, video_id))) as f: + f.write(json.dumps({ + 'title': title, + 'description': description, + 'tags': tags, + }) + '\n') + with open(filepath, 'w') as f: + for chunk in data: + f.write(chunk) + if self.url_prefix is not None: + url = self.url_prefix + filename + else: + url = 'file://{}'.format(filepath) + return video_id, url diff --git a/cutter/cutter/youtube.py b/cutter/cutter/youtube.py deleted file mode 100644 index 0bbe336..0000000 --- a/cutter/cutter/youtube.py +++ /dev/null @@ -1,61 +0,0 @@ - -import logging - -from common.googleapis import GoogleAPIClient - - -class Youtube(object): - """Manages youtube API operations""" - - def __init__(self, client_id, client_secret, refresh_token): - self.logger = logging.getLogger(type(self).__name__) - self.client = GoogleAPIClient(client_id, client_secret, refresh_token) - - def upload_video(self, title, description, tags, data, hidden=False): - """Data may be a string, file-like object or iterator. Returns id.""" - json = { - 'snippet': { - 'title': title, - 'description': description, - 'tags': tags, - }, - } - if hidden: - json['status'] = { - 'privacyStatus': 'unlisted', - } - resp = self.client.request('POST', - 'https://www.googleapis.com/upload/youtube/v3/videos', - params={ - 'part': 'snippet,status' if hidden else 'snippet', - 'uploadType': 'resumable', - }, - json=json, - ) - resp.raise_for_status() - upload_url = resp.headers['Location'] - resp = self.client.request('POST', upload_url, data=data) - resp.raise_for_status() - return resp.json()['id'] - - def get_video_status(self, ids): - """For a list of video ids, returns a dict {id: upload status}. - A video is fully processed when upload status is 'processed'. - NOTE: Video ids may be missing from the result, this probably indicates - the video is errored. - """ - output = {} - # Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable. - for i in range(0, len(ids), 10): - group = ids[i:i+10] - resp = self.client.request('GET', - 'https://www.googleapis.com/youtube/v3/videos', - params={ - 'part': 'id,status', - 'id': ','.join(group), - }, - ) - resp.raise_for_status() - for item in resp.json()['items']: - output[item['id']] = item['status']['uploadStatus'] - return output diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet index d671177..c5a96b7 100644 --- a/docker-compose.jsonnet +++ b/docker-compose.jsonnet @@ -85,9 +85,20 @@ db_replication_password:: "standby", // don't use default in production. Must not contain ' or \ as these are not escaped. db_standby:: false, // set to true to have this database replicate another server - // Path to a JSON file containing google credentials as keys + // Path to a JSON file containing google credentials for cutter as keys // 'client_id', 'client_secret' and 'refresh_token'. - google_creds:: "./google_creds.json", + cutter_creds_file:: "./google_creds.json", + + // Config for cutter upload locations. See cutter docs for full detail. + cutter_config:: { + desertbus: {type: "youtube"}, + unlisted: {type: "youtube", hidden: true, no_transcode_check: true}, + }, + + // Path to a JSON file containing google credentials for sheetsync as keys + // 'client_id', 'client_secret' and 'refresh_token'. + // May be the same as cutter_creds_file. + sheetsync_creds_file:: "./google_creds.json", // The URL to write to the sheet for edit links, with {} being replaced by the id edit_url:: "http://thrimbletrimmer.codegunner.com/?id={}", @@ -168,18 +179,20 @@ [if $.enabled.cutter then "cutter"]: { image: "quay.io/ekimekim/wubloader-cutter:%s" % $.image_tag, - // Args for the cutter: DB and google creds + // Args for the cutter: DB and creds command: [ "--base-dir", "/mnt", "--backdoor-port", std.toString($.backdoor_port), $.db_connect, - "/etc/wubloader-google-creds.json", + std.manifestJson($.cutter_config), + "/etc/wubloader-creds.json", ], volumes: [ // Mount the segments directory at /mnt "%s:/mnt" % $.segments_path, + ] + [ // Mount the creds file into /etc - "%s:/etc/wubloader-google-creds.json" % $.google_creds, + "%s:/etc/wubloader-creds.json" % $.cutter_creds_file, ], // If the application crashes, restart it. restart: "on-failure", @@ -212,14 +225,14 @@ command: [ "--backdoor-port", std.toString($.backdoor_port), $.db_connect, - "/etc/wubloader-google-creds.json", + "/etc/wubloader-creds.json", $.edit_url, $.bustime_start, $.sheet_id, ] + $.worksheets, volumes: [ // Mount the creds file into /etc - "%s:/etc/wubloader-google-creds.json" % $.google_creds, + "%s:/etc/wubloader-creds.json" % $.sheetsync_creds_file, ], // If the application crashes, restart it. restart: "on-failure",