Merge branch 'master' into chrusher-database-replication

pull/73/head
Christopher Usher 5 years ago committed by GitHub
commit fec8cff185
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -145,3 +145,6 @@ columns | type | role |
`error` | `TEXT` | state | A human-readable error message, set if a non-retryable error occurs. Its presence indicates operator intervention is required. Cleared on a re-edit if set.
`video_id` | `TEXT` | state | An id that can be used to refer to the video to check if transcoding is complete. Often the video_link can be generated from this, but not nessecarily.
`video_link` | `TEXT` | output | A link to the uploaded video. Only set when state is `TRANSCODING` or `DONE`.
`editor` | `TEXT` | state | Email address of the last editor; corresponds to an entry in the `editors` table. Only set when state is not `UNEDITED`.
`edit_time` | `TIMESTAMP` | state | Time of the last edit. Only set when state is not `UNEDITED`.
`upload_time` | `TIMESTAMP` | state | Time when video state is set to `DONE`. Only set when state is `DONE`.

@ -54,7 +54,11 @@ CREATE TABLE IF NOT EXISTS events (
uploader TEXT CHECK (state IN ('UNEDITED', 'EDITED', 'DONE') OR uploader IS NOT NULL),
error TEXT,
video_id TEXT,
video_link TEXT CHECK (state != 'DONE' OR video_link IS NOT NULL)
video_link TEXT CHECK (state != 'DONE' OR video_link IS NOT NULL),
editor TEXT,
edit_time TIMESTAMP CHECK (state = 'UNEDITED' OR editor IS NOT NULL),
upload_time TIMESTAMP CHECK (state != 'DONE' OR upload_time IS NOT NULL)
);
-- Index on state, since that's almost always what we're querying on besides id
@ -66,6 +70,11 @@ CREATE TABLE IF NOT EXISTS nodes (
backfill_from BOOLEAN NOT NULL DEFAULT TRUE
);
CREATE TABLE IF NOT EXISTS editors (
email TEXT PRIMARY KEY,
name TEXT NOT NULL
);
"""

@ -1,4 +1,5 @@
import datetime
import json
import logging
import os
@ -461,9 +462,9 @@ class TranscodeChecker(object):
def mark_done(self, ids):
result = query(self.conn, """
UPDATE events
SET state = 'DONE'
SET state = 'DONE', upload_time = %s
WHERE id = ANY (%s::uuid[]) AND state = 'TRANSCODING'
""", ids.keys())
""", datetime.datetime.utcnow(), ids.keys())
return result.rowcount

@ -61,6 +61,8 @@
peers:: [
],
authentication:: true, // set to false to disable auth in thrimshim
// Connection args for the database.
// If database is defined in this config, host and port should be postgres:5432.
db_args:: {
@ -186,7 +188,7 @@
command: [
"--backdoor-port", std.toString($.backdoor_port),
$.db_connect,
],
] + if $.authentication then [] else ["--no-authentication"],
// Mount the segments directory at /mnt
volumes: ["%s:/mnt" % $.segments_path],
// If the application crashes, restart it.

@ -8,8 +8,10 @@ setup(
"argh",
"flask",
"gevent",
"google-auth",
"psycogreen",
"psycopg2",
"requests",
"wubloader-common",
],
)

@ -1,4 +1,5 @@
import datetime
from functools import wraps
import json
import logging
import signal
@ -17,6 +18,9 @@ import common
from common import database
from common.flask_stats import request_stats, after_request
import google.oauth2.id_token
import google.auth.transport.requests
psycopg2.extras.register_uuid()
app = flask.Flask('thrimshim')
app.after_request(after_request)
@ -38,6 +42,50 @@ def cors(app):
return handle
def authenticate(f):
"""Authenticate a token against the database.
Reference: https://developers.google.com/identity/sign-in/web/backend-auth"""
@wraps(f)
def auth_wrapper(*args, **kwargs):
if app.no_authentication:
return f(*args, editor='NOT_AUTH', **kwargs)
try:
userToken = flask.request.json['token']
except (KeyError, TypeError):
return 'User token required', 401
# check whether token is valid
try:
idinfo = google.oauth2.id_token.verify_oauth2_token(userToken, google.auth.transport.requests.Request(), None)
if idinfo['iss'] not in ['accounts.google.com', 'https://accounts.google.com']:
raise ValueError('Wrong issuer.')
except ValueError:
return 'Invalid token. Access denied.', 403
# check whether user is in the database
email = idinfo['email'].lower()
conn = app.db_manager.get_conn()
results = database.query(conn, """
SELECT email
FROM editors
WHERE lower(email) = %s""", email)
row = results.fetchone()
if row is None:
return 'Unknown user. Access denied.', 403
return f(*args, editor=email, **kwargs)
return auth_wrapper
@app.route('/thrimshim/auth-test', methods=['POST'])
@request_stats
@authenticate
def test(editor=None):
return json.dumps(editor)
@app.route('/metrics')
@request_stats
def metrics():
@ -66,16 +114,9 @@ def get_all_rows():
logging.info('All rows fetched')
return json.dumps(rows)
@app.route('/thrimshim/<uuid:ident>', methods=['GET', 'POST'])
@request_stats
def thrimshim(ident):
"""Comunicate between Thrimbletrimmer and the Wubloader database."""
if flask.request.method == 'POST':
row = flask.request.json
return update_row(ident, row)
else:
return get_row(ident)
@app.route('/thrimshim/<uuid:ident>', methods=['GET'])
@request_stats
def get_row(ident):
"""Gets the row from the database with id == ident."""
conn = app.db_manager.get_conn()
@ -99,7 +140,11 @@ def get_row(ident):
logging.info('Row {} fetched'.format(ident))
return json.dumps(response)
def update_row(ident, new_row):
@app.route('/thrimshim/<uuid:ident>', methods=['POST'])
@request_stats
@authenticate
def update_row(ident, editor=None):
new_row = flask.request.json
"""Updates row of database with id = ident with the edit columns in
new_row."""
@ -153,6 +198,8 @@ def update_row(ident, new_row):
return 'Invalid state {}'.format(new_row['state']), 400
new_row['uploader'] = None
new_row['error'] = None
new_row['editor'] = editor
new_row['edit_time'] = datetime.datetime.utcnow()
# actually update database
build_query = sql.SQL("""
@ -174,10 +221,11 @@ def update_row(ident, new_row):
@app.route('/thrimshim/manual-link/<uuid:ident>', methods=['POST'])
@request_stats
def manual_link(ident):
@authenticate
def manual_link(ident, editor=None):
"""Manually set a video_link if the state is 'UNEDITED' or 'DONE' and the
upload_location is 'manual'."""
link = flask.request.json
link = flask.request.json['link']
conn = app.db_manager.get_conn()
results = database.query(conn, """
SELECT id, state, upload_location
@ -188,25 +236,27 @@ def manual_link(ident):
return 'Row {} not found'.format(ident), 404
if old_row.state != 'UNEDITED' and not (old_row.state == 'DONE' and old_row.upload_location == 'manual'):
return 'Invalid state {} for manual video link'.format(old_row.state), 403
now = datetime.datetime.utcnow()
results = database.query(conn, """
UPDATE events
SET state='DONE', upload_location = 'manual', video_link = %s
SET state='DONE', upload_location = 'manual', video_link = %s,
editor = %s, edit_time = %s, upload_time = %s
WHERE id = %s AND (state = 'UNEDITED' OR (state = 'DONE' AND
upload_location = 'manual'))""", link, ident)
upload_location = 'manual'))""", link, editor, now, now, ident)
logging.info("Row {} video_link set to {}".format(ident, link))
return ''
@app.route('/thrimshim/reset/<uuid:ident>', methods=['POST'])
@request_stats
def reset_row(ident):
@authenticate
def reset_row(ident, editor=None):
"""Clear state and video_link columns and reset state to 'UNEDITED'."""
conn = app.db_manager.get_conn()
results = database.query(conn, """
UPDATE events
SET state='UNEDITED', error = NULL, video_id = NULL, video_link = NULL,
uploader = NULL
uploader = NULL, editor = NULL, edit_time = NULL, upload_time = NULL
WHERE id = %s""", ident)
if results.rowcount != 1:
return 'Row id = {} not found'.format(ident), 404
@ -218,10 +268,14 @@ def reset_row(ident):
@argh.arg('--port', help='Port server will listen on. Default is 8004.')
@argh.arg('connection-string', help='Postgres connection string, which is either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE')
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0):
@argh.arg('--no-authentication', help='Do not authenticate')
def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0,
no_authentication=False):
"""Thrimshim service."""
server = WSGIServer((host, port), cors(app))
app.no_authentication = no_authentication
stopping = gevent.event.Event()
def stop():
logging.info("Shutting down")
@ -232,7 +286,6 @@ def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0):
# and when not
else:
sys.exit()
gevent.signal(signal.SIGTERM, stop)
app.db_manager = None
@ -250,6 +303,8 @@ def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0):
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
logging.info("Starting up")
logging.info('Starting up')
if app.no_authentication:
logging.warning('Not authenticating POST requests')
server.serve_forever()
logging.info("Gracefully shut down")

Loading…
Cancel
Save