Merging Thrimbletrimmer into Wubloader with latest changes.

pull/75/head
mg 5 years ago
commit 6edd224395

@@ -32,9 +32,9 @@ Alternatively if you have `git` installed you can clone the git repository:

You can edit the `docker-compose.jsonnet` file to set the configuration options. Important options include:

-* `channel`, the Twitch channel to capture from
-* `segments_path`, the local path to save segments to
-* `db_args`, the arguments for connecting to the wubloader database
* `channel`, the Twitch channel to capture from.
* `segments_path`, the local path to save segments to.
* `db_args`, the arguments for connecting to the wubloader database. You will likely need to update the `user`, `password` and `host` to match the database node that you are connecting to.
* `ports`, the ports to expose each service on. Only the `nginx` port (default on port 80) needs to be externally accessible for a non-database node as all the other services are routed through `nginx`.

To generate the `docker-compose.yml` file used by `docker-compose`, run `generate-docker-compose`
@@ -43,7 +43,7 @@ To generate the `docker-compose.yml` file used by `docker-compose`, run `generate-docker-compose`

After making any changes to `docker-compose.jsonnet`, you will need to rerun `generate-docker-compose`.

-By default the `downloader`, `restreamer`, `backfiller`, `cutter`, `thrimshim` and `nginx` services of the wubloader will be run. To change which services are run, edit the `enabled` object in `docker-compose.jsonnet`. A complete wubloader setup also requires exactly one `database` service (though having a backup database is a good idea) and exactly one `sheetsync` service. TODO: explain how to setup database
By default the `downloader`, `restreamer`, `backfiller`, `cutter`, `thrimshim` and `nginx` services of the wubloader will be run. To change which services are run, edit the `enabled` object in `docker-compose.jsonnet`. A complete wubloader setup also requires exactly one `database` service (though having a backup database is a good idea) and exactly one `sheetsync` service.
## Running the wubloader

@@ -55,4 +55,47 @@ To stop the wubloader and clean up, simply run

`docker-compose down`

-To backfill from a node, the other nodes need to know about it. The best way to do this is to add the node to the database's nodes table.
## Database setup
When setting up a database node, a number of database-specific options can be set.

* `database_path`, the local path to save the database to. If this directory is empty, the database setup scripts will be run to create a new database. Otherwise, the database container will load the database stored in this folder.
* `db_args.user`, `db_args.password`, the username and password for the database user that the rest of the wubloader will connect to.
* `db_super_user`, `db_super_password`, the username and password for the database superuser that is only accessible from the local machine.
* `db_replication_user`, `db_replication_password`, the username and password for the database user other nodes can connect as to replicate the database. If `db_replication_user` is an empty string, remote replication will be disabled.
* `db_standby`, if true, this database node will replicate the database node given by `db_args.host`.

It is recommended that the passwords be changed from the defaults in production.

A database node needs to expose its database on a port. By default this is `5432`, but the port exposed to the outside can be changed in the `ports` object.
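Once the port is exposed, a quick way to confirm that the node accepts remote connections is with `psql` (the hostname here is a placeholder; the `vst` user is the example default from `db_args`, and `psql` will prompt for its password):

```
psql "postgresql://vst@db.example.com:5432/wubloader" -c "SELECT 1;"
```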
The `events` table will be automatically populated by the `sheetsync`. If creating a new database, the startup script will attempt to populate the `nodes` and `editors` tables from the `nodes.csv` and `editors.csv` files in the `segments_path` directory. The expected format for these files is:
```
nodes.csv
name,url,backfill_from
example,http://example.com,TRUE
```
```
editors.csv
name,email
example,example@gmail.com
```
Alternatively, nodes can be added manually to the database's `nodes` table:
`wubloader=> INSERT INTO nodes (name, url) VALUES ('example_name', 'http://example.com');`
and editors to the database's `editors` table:
`wubloader=> INSERT INTO editors (name, email) VALUES ('example', 'example@gmail.com');`
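To get an interactive `wubloader=>` prompt for statements like these, you can exec into the database container (the container name below assumes the default docker-compose naming, and the `vst` user assumes the example config):

```
docker exec -it wubloader_postgres_1 psql -U vst -d wubloader
```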
### Promoting the standby server

To promote the standby server to primary, touch the trigger file in the docker container:
`docker exec wubloader_postgres_1 touch /tmp/touch_to_promote_to_master`
Be careful to prevent the original primary from restarting as another primary.
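One way to do that, assuming the old primary is still managed by docker, is to stop its container and disable its restart policy before repurposing it as a standby:

```
docker update --restart=no wubloader_postgres_1
docker stop wubloader_postgres_1
```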

@@ -8,7 +8,7 @@ set -eu

# Pass PUSH=true to also push the resulting images, or PUSH=latest to push them as :latest tag

# The different images we can build
-COMPONENTS=(downloader restreamer backfiller thrimshim cutter sheetsync nginx)
COMPONENTS=(downloader restreamer backfiller thrimshim cutter sheetsync nginx postgres)

# Define push if not already defined
PUSH=${PUSH:-}

@@ -13,90 +13,24 @@ import psycopg2.extras

from psycogreen.gevent import patch_psycopg

-# Schema is applied on startup and should be idempotent,
-# and include any migrations potentially needed.
-SCHEMA = """
-
--- Create type if it doesn't already exist
-DO $$ BEGIN
-	CREATE TYPE event_state as ENUM (
-		'UNEDITED',
-		'EDITED',
-		'CLAIMED',
-		'FINALIZING',
-		'TRANSCODING',
-		'DONE'
-	);
-EXCEPTION WHEN duplicate_object THEN
-	NULL;
-END $$;
-
-CREATE TABLE IF NOT EXISTS events (
-	id UUID PRIMARY KEY,
-	event_start TIMESTAMP,
-	event_end TIMESTAMP,
-	category TEXT NOT NULL DEFAULT '',
-	description TEXT NOT NULL DEFAULT '',
-	submitter_winner TEXT NOT NULL DEFAULT '',
-	poster_moment BOOLEAN NOT NULL DEFAULT FALSE,
-	image_links TEXT[] NOT NULL DEFAULT '{}', -- default empty array
-	notes TEXT NOT NULL DEFAULT '',
-	allow_holes BOOLEAN NOT NULL DEFAULT FALSE,
-	uploader_whitelist TEXT[],
-	upload_location TEXT CHECK (state = 'UNEDITED' OR upload_location IS NOT NULL),
-	video_start TIMESTAMP CHECK (state IN ('UNEDITED', 'DONE') OR video_start IS NOT NULL),
-	video_end TIMESTAMP CHECK (state IN ('UNEDITED', 'DONE') OR video_end IS NOT NULL),
-	video_title TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_title IS NOT NULL),
-	video_description TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_description IS NOT NULL),
-	video_channel TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_channel IS NOT NULL),
-	video_quality TEXT NOT NULL DEFAULT 'source',
-	state event_state NOT NULL DEFAULT 'UNEDITED',
-	uploader TEXT CHECK (state IN ('UNEDITED', 'EDITED', 'DONE') OR uploader IS NOT NULL),
-	error TEXT,
-	video_id TEXT,
-	video_link TEXT CHECK (state != 'DONE' OR video_link IS NOT NULL),
-	editor TEXT CHECK (state = 'UNEDITED' OR editor IS NOT NULL),
-	edit_time TIMESTAMP CHECK (state = 'UNEDITED' OR editor IS NOT NULL),
-	upload_time TIMESTAMP CHECK (state != 'DONE' OR upload_time IS NOT NULL)
-);
-
--- Index on state, since that's almost always what we're querying on besides id
-CREATE INDEX IF NOT EXISTS event_state ON events (state);
-
-CREATE TABLE IF NOT EXISTS nodes (
-	name TEXT PRIMARY KEY,
-	url TEXT NOT NULL,
-	backfill_from BOOLEAN NOT NULL DEFAULT TRUE
-);
-
-CREATE TABLE IF NOT EXISTS editors (
-	email TEXT PRIMARY KEY,
-	name TEXT NOT NULL
-);
-"""

class DBManager(object):
-	"""Patches psycopg2 before any connections are created, and applies the schema.
-	Stores connect info for easy creation of new connections, and sets some defaults before
-	returning them.
-	It has the ability to serve as a primitive connection pool, as getting a new conn will
-	return existing conns it knows about first, but this mainly just exists to re-use
-	the initial conn used to apply the schema, and you should use a real conn pool for
-	any non-trivial use.
-	Returned conns are set to serializable isolation level, autocommit, and use NamedTupleCursor cursors.
-	"""
	"""Patches psycopg2 before any connections are created. Stores connect info
	for easy creation of new connections, and sets some defaults before
	returning them.
	It has the ability to serve as a primitive connection pool, as getting a
	new conn will return existing conns it knows about first, but this mainly
	just exists to re-use the initial conn used to test the connection, and you
	should use a real conn pool for any non-trivial use.
	Returned conns are set to serializable isolation level, autocommit, and use
	NamedTupleCursor cursors."""
	def __init__(self, **connect_kwargs):
		patch_psycopg()
		self.conns = []
		self.connect_kwargs = connect_kwargs
		# get a connection to test whether the connection is working
		conn = self.get_conn()
-		with transaction(conn):
-			query(conn, SCHEMA)
		self.put_conn(conn)

	def put_conn(self, conn):

@@ -495,7 +495,16 @@ def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=80

	# We have two independent jobs to do - to perform cut jobs (cutter),
	# and to check the status of transcoding videos to see if they're done (transcode checker).
	# We want to error if either errors, and shut down if either exits.
-	dbmanager = DBManager(dsn=dbconnect)
	dbmanager = None
	stopping = gevent.event.Event()
	while dbmanager is None:
		try:
			dbmanager = DBManager(dsn=dbconnect)
		except Exception:
			delay = common.jitter(10)
			logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
			stopping.wait(delay)
	youtube_creds = json.load(open(youtube_creds_file))
	youtube = Youtube(
		client_id=youtube_creds['client_id'],

@@ -0,0 +1,16 @@
#! /bin/bash
NAME=${1:-postgres}
BUCKET=${2:-wubloader-db}
CONTAINER=$(docker ps --format "{{.Names}}" | grep "$NAME")
if [ -z "$CONTAINER" ]; then
echo "Container not found"
exit 1
fi
if [ "$(wc -l <<<"$CONTAINER")" -ne 1 ]; then
echo "Multiple containers found"
exit 1
fi
FILE="wubloader-$(date +%Y-%m-%dT%H:%M:%S).sql"
echo "Dumping $CONTAINER to $FILE"
docker exec "$CONTAINER" pg_dump wubloader -U postgres | aws s3 cp - "s3://$BUCKET/$FILE"
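For reference, a typical invocation of this backup script (the filename `postgres_backup.sh` is assumed; the arguments default to `postgres` and `wubloader-db`, and the AWS CLI must be configured with credentials that can write to the bucket):

```
# dump the container whose name matches "postgres" to the default bucket
./postgres_backup.sh
# or specify the container-name filter and bucket explicitly
./postgres_backup.sh postgres my-wubloader-backups
```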

@@ -0,0 +1,28 @@
#! /bin/bash
if [ -z "$1" ]; then
echo "No SQL script file"
echo "USAGE: $0 SQL_SCRIPT"
exit 1
fi
NAME=${2:-postgres}
CONTAINER=$(docker ps --format "{{.Names}}" | grep "$NAME")
if [ -z "$CONTAINER" ]; then
echo "Container not found"
exit 1
fi
if [ "$(wc -l <<<"$CONTAINER")" -ne 1 ]; then
echo "Multiple containers found"
exit 1
fi
docker cp "./$1" "$CONTAINER:/tmp.sql"
# this will fail if there are active sessions by users other than 'postgres'
# make sure all wubloader components are disconnected
docker exec "$CONTAINER" dropdb wubloader -U postgres --if-exists
docker exec "$CONTAINER" createdb wubloader -U postgres
# this assumes that the vst role already exists in the database cluster
docker exec "$CONTAINER" psql -d wubloader -f /tmp.sql -U postgres
docker exec "$CONTAINER" rm /tmp.sql

@@ -7,7 +7,7 @@

// Change these to configure the services.

// Image tag (application version) to use.
-// Note: "latest" is not reccomended in production, as you can't be sure what version
// Note: "latest" is not recommended in production, as you can't be sure what version
// you're actually running, and must manually re-pull to get an updated copy.
image_tag:: "latest",
@@ -25,7 +25,7 @@
},

// Twitch channel to capture
-channel:: "rpglimitbreak",
channel:: "desertbus",

// Stream qualities to capture
qualities:: ["source", "480p"],
@@ -34,7 +34,11 @@
// On OSX you need to change this to /private/var/lib/wubloader
segments_path:: "/var/lib/wubloader/",

-// Local path to save database to. Full path must already exist. Cannot contain ':'.
// Local path to save database to. Full path must already exist. Cannot
// contain ':'. If this directory is non-empty, the database will start with
// the database in this directory and not run the setup scripts to create a
// new database.
// On OSX you need to change this to /private/var/lib/wubloader_postgres/
database_path:: "/var/lib/wubloader_postgres/",

// The host's port to expose each service on.
@@ -54,21 +58,30 @@
// You can exec into the container and telnet to this port to get a python shell.
backdoor_port:: 1234,

-// Other nodes to backfill from. You should not include the local node.
// Other nodes to always backfill from. You should not include the local node.
// If you are using the database to find peers, you should leave this empty.
peers:: [
-	"http://wubloader.codegunner.com/"
],

authentication:: true, // set to false to disable auth in thrimshim

// Connection args for the database.
// If database is defined in this config, host and port should be postgres:5432.
db_args:: {
-	user: "postgres",
-	password: "postgres",
	user: "vst",
	password: "dbfh2019", // don't use default in production. Must not contain ' or \ as these are not escaped.
	host: "postgres",
	port: 5432,
	dbname: "wubloader",
},

// Other database arguments
db_super_user:: "postgres", // only accessible from localhost
db_super_password:: "postgres", // Must not contain ' or \ as these are not escaped.
db_replication_user:: "replicate", // if empty, don't allow replication
db_replication_password:: "standby", // don't use default in production. Must not contain ' or \ as these are not escaped.
db_standby:: false, // set to true to have this database replicate another server

// Path to a JSON file containing google credentials as keys
// 'client_id', 'client_secret' and 'refresh_token'.
google_creds:: "./google_creds.json",
@@ -138,7 +151,7 @@
"--qualities", std.join(",", $.qualities),
"--static-nodes", std.join(",", $.peers),
"--backdoor-port", std.toString($.backdoor_port),
"--node-database", $.db_connect,
],
// Mount the segments directory at /mnt
volumes: ["%s:/mnt" % $.segments_path],
@@ -177,8 +190,7 @@
command: [
	"--backdoor-port", std.toString($.backdoor_port),
	$.db_connect,
-	// "--no-authentication", //uncomment to run thrimshim without authentication
-],
] + if $.authentication then [] else ["--no-authentication"],
// Mount the segments directory at /mnt
volumes: ["%s:/mnt" % $.segments_path],
// If the application crashes, restart it.
@@ -234,14 +246,22 @@
},

[if $.enabled.postgres then "postgres"]: {
-	image: "postgres:latest",
	image: "quay.io/ekimekim/wubloader-postgres:%s" % $.image_tag,
	restart: "on-failure",
	[if "postgres" in $.ports then "ports"]: ["%s:5432" % $.ports.postgres],
	environment: {
-		POSTGRES_USER: $.db_args.user,
-		POSTGRES_PASSWORD: $.db_args.password,
		POSTGRES_USER: $.db_super_user,
		POSTGRES_PASSWORD: $.db_super_password,
		POSTGRES_DB: $.db_args.dbname,
		PGDATA: "/mnt/database",
		WUBLOADER_USER: $.db_args.user,
		WUBLOADER_PASSWORD: $.db_args.password,
		REPLICATION_USER: $.db_replication_user,
		REPLICATION_PASSWORD: $.db_replication_password,
		MASTER_NODE: $.db_args.host,
	},
	volumes: ["%s:/mnt/database" % $.database_path, "%s:/mnt/wubloader" % $.segments_path],
	[if $.db_standby then "command"]: ["/standby_setup.sh"],
},
},
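The `db_args` above are what the services flatten into the `$.db_connect` string. To check the credentials from the host (assuming the postgres port is published as 5432 and the example user and password are unchanged):

```
psql "postgresql://vst:dbfh2019@localhost:5432/wubloader" -c "\dt"
```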

@@ -0,0 +1,5 @@
FROM postgres:latest
COPY postgres/setup.sh /docker-entrypoint-initdb.d/setup.sh
RUN chmod 0666 /docker-entrypoint-initdb.d/setup.sh
COPY postgres/standby_setup.sh /standby_setup.sh
RUN chmod 0700 /standby_setup.sh
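Since the `COPY` paths are relative to the repository root, the image has to be built with the root as context; a manual build might look like this (the Dockerfile location under `postgres/` is an assumption):

```
docker build -f postgres/Dockerfile -t quay.io/ekimekim/wubloader-postgres:latest .
```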

@@ -0,0 +1,108 @@
#! /bin/bash
set -e
# only allow the $WUBLOADER_USER to connect remotely rather than all users
sed -i "/host all all all/d" "$PGDATA/pg_hba.conf"
echo "host all $WUBLOADER_USER all md5" >> "$PGDATA/pg_hba.conf"
echo "Creating $WUBLOADER_USER"
psql -v ON_ERROR_STOP=1 -U $POSTGRES_USER <<-EOSQL
CREATE USER $WUBLOADER_USER LOGIN PASSWORD '$WUBLOADER_PASSWORD';
EOSQL
if [ -n "$REPLICATION_USER" ]; then
echo "Creating $REPLICATION_USER"
# allow the $REPLICATION user to replicate remotely
echo "host replication $REPLICATION_USER all md5" >> "$PGDATA/pg_hba.conf"
psql -v ON_ERROR_STOP=1 -U $POSTGRES_USER <<-EOSQL
CREATE USER $REPLICATION_USER LOGIN REPLICATION PASSWORD '$REPLICATION_PASSWORD';
EOSQL
cat >> ${PGDATA}/postgresql.conf <<-EOF
wal_level = replica
archive_mode = on
archive_command = 'cd .'
max_wal_senders = 8
wal_keep_segments = 8
EOF
fi
echo "Applying schema for $POSTGRES_DB"
psql -v ON_ERROR_STOP=1 -U $WUBLOADER_USER -d $POSTGRES_DB <<-EOSQL
	CREATE TYPE event_state as ENUM (
		'UNEDITED',
		'EDITED',
		'CLAIMED',
		'FINALIZING',
		'TRANSCODING',
		'DONE'
	);

	CREATE TABLE events (
		id UUID PRIMARY KEY,
		event_start TIMESTAMP,
		event_end TIMESTAMP,
		category TEXT NOT NULL DEFAULT '',
		description TEXT NOT NULL DEFAULT '',
		submitter_winner TEXT NOT NULL DEFAULT '',
		poster_moment BOOLEAN NOT NULL DEFAULT FALSE,
		image_links TEXT[] NOT NULL DEFAULT '{}', -- default empty array
		notes TEXT NOT NULL DEFAULT '',
		allow_holes BOOLEAN NOT NULL DEFAULT FALSE,
		uploader_whitelist TEXT[],
		upload_location TEXT CHECK (state = 'UNEDITED' OR upload_location IS NOT NULL),
		video_start TIMESTAMP CHECK (state IN ('UNEDITED', 'DONE') OR video_start IS NOT NULL),
		video_end TIMESTAMP CHECK (state IN ('UNEDITED', 'DONE') OR video_end IS NOT NULL),
		video_title TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_title IS NOT NULL),
		video_description TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_description IS NOT NULL),
		video_channel TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_channel IS NOT NULL),
		video_quality TEXT NOT NULL DEFAULT 'source',
		state event_state NOT NULL DEFAULT 'UNEDITED',
		uploader TEXT CHECK (state IN ('UNEDITED', 'EDITED', 'DONE') OR uploader IS NOT NULL),
		error TEXT,
		video_id TEXT,
		video_link TEXT CHECK (state != 'DONE' OR video_link IS NOT NULL),
		editor TEXT,
		edit_time TIMESTAMP CHECK (state = 'UNEDITED' OR edit_time IS NOT NULL),
		upload_time TIMESTAMP CHECK (state != 'DONE' OR upload_time IS NOT NULL)
	);

	-- Index on state, since that's almost always what we're querying on besides id
	CREATE INDEX event_state ON events (state);

	CREATE TABLE nodes (
		name TEXT PRIMARY KEY,
		url TEXT NOT NULL,
		backfill_from BOOLEAN NOT NULL DEFAULT TRUE
	);

	CREATE TABLE editors (
		email TEXT PRIMARY KEY,
		name TEXT NOT NULL
	);
EOSQL
if [ -a /mnt/wubloader/nodes.csv ]; then
echo "Loading nodes from nodes.csv"
psql -v ON_ERROR_STOP=1 -U $POSTGRES_USER -d $POSTGRES_DB <<-EOF
COPY nodes FROM '/mnt/wubloader/nodes.csv' DELIMITER ',' CSV HEADER;
EOF
fi
if [ -a /mnt/wubloader/editors.csv ]; then
echo "Loading editors from editors.csv"
psql -v ON_ERROR_STOP=1 -U $POSTGRES_USER -d $POSTGRES_DB <<-EOF
COPY editors FROM '/mnt/wubloader/editors.csv' DELIMITER ',' CSV HEADER;
EOF
fi
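After first startup you can verify the script ran by listing the roles and tables it creates (the container name assumes the default docker-compose naming):

```
docker exec wubloader_postgres_1 psql -U postgres -d wubloader -c "\du"
docker exec wubloader_postgres_1 psql -U vst -d wubloader -c "\dt"
```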

@@ -0,0 +1,24 @@
#! /bin/bash
set -e
# if postgres database does not exist in $PGDATA
if [ ! -s "$PGDATA/PG_VERSION" ]; then
# get a binary backup of the database on $MASTER_NODE
pg_basebackup -d "host=$MASTER_NODE password=$REPLICATION_PASSWORD port=5432 user=$REPLICATION_USER" -D ${PGDATA} -vP
cat > ${PGDATA}/recovery.conf <<-EOF
standby_mode = on
primary_conninfo = 'host=$MASTER_NODE password=$REPLICATION_PASSWORD port=5432 user=$REPLICATION_USER'
# touch this file to promote this node to master
trigger_file = '/tmp/touch_to_promote_to_master'
EOF
chown postgres. ${PGDATA} -R
chmod 700 ${PGDATA} -R
fi
# start postgres
gosu postgres postgres
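To confirm replication is healthy, `pg_is_in_recovery()` should return true on the standby and false on the primary, and the primary's `pg_stat_replication` view should list the standby (container name assumed):

```
docker exec wubloader_postgres_1 psql -U postgres -c "SELECT pg_is_in_recovery();"
docker exec wubloader_postgres_1 psql -U postgres -c "SELECT client_addr, state FROM pg_stat_replication;"
```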

@@ -280,7 +280,15 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh

	logging.info("Starting up")

-	dbmanager = DBManager(dsn=dbconnect)
	dbmanager = None
	while dbmanager is None:
		try:
			dbmanager = DBManager(dsn=dbconnect)
		except Exception:
			delay = common.jitter(10)
			logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
			stop.wait(delay)
	sheets_creds = json.load(open(sheets_creds_file))

	sheets = Sheets(

@@ -3,6 +3,7 @@ from functools import wraps
import json
import logging
import signal
import sys

import argh
import flask

@@ -13,11 +14,12 @@ import prometheus_client
import psycopg2
from psycopg2 import sql

-from common import database, PromLogCountsHandler, install_stacksampler
import common
from common import database
from common.flask_stats import request_stats, after_request
-from google.oauth2 import id_token
-from google.auth.transport import requests
import google.oauth2.id_token
import google.auth.transport.requests

psycopg2.extras.register_uuid()
app = flask.Flask('thrimshim')
@@ -46,46 +48,43 @@ def authenticate(f):
	Reference: https://developers.google.com/identity/sign-in/web/backend-auth"""
	@wraps(f)
-	def decorated_function(*args, **kwargs):
-		if flask.request.method == 'POST':
-			if app.no_authentication:
-				return f(*args, editor='NOT_AUTH', **kwargs)
-
-			userToken = flask.request.json['token']
-			# check whether token is valid
-			try:
-				idinfo = id_token.verify_oauth2_token(userToken, requests.Request(), None)
-				if idinfo['iss'] not in ['accounts.google.com', 'https://accounts.google.com']:
-					raise ValueError('Wrong issuer.')
-			except ValueError:
-				return 'Invalid token. Access denied.', 403
-
-			# check whether user is in the database
-			email = idinfo['email']
-			conn = app.db_manager.get_conn()
-			results = database.query(conn, """
-				SELECT email
-				FROM editors
-				WHERE email = %s""", email)
-			row = results.fetchone()
-			if row is None:
-				return 'Unknown user. Access denied.', 403
-
-			return f(*args, editor=email, **kwargs)
-		else:
-			return f(*args, **kwargs)
-	return decorated_function
	def auth_wrapper(*args, **kwargs):
		if app.no_authentication:
			return f(*args, editor='NOT_AUTH', **kwargs)

		try:
			userToken = flask.request.json['token']
		except (KeyError, TypeError):
			return 'User token required', 401
		# check whether token is valid
		try:
			idinfo = google.oauth2.id_token.verify_oauth2_token(userToken, google.auth.transport.requests.Request(), None)
			if idinfo['iss'] not in ['accounts.google.com', 'https://accounts.google.com']:
				raise ValueError('Wrong issuer.')
		except ValueError:
			return 'Invalid token. Access denied.', 403

		# check whether user is in the database
		email = idinfo['email'].lower()
		conn = app.db_manager.get_conn()
		results = database.query(conn, """
			SELECT email
			FROM editors
			WHERE lower(email) = %s""", email)
		row = results.fetchone()
		if row is None:
			return 'Unknown user. Access denied.', 403

		return f(*args, editor=email, **kwargs)
	return auth_wrapper
-@app.route('/thrimshim/auth-test', methods=['GET', 'POST'])
@app.route('/thrimshim/auth-test', methods=['POST'])
@request_stats
@authenticate
def test(editor=None):
-	if flask.request.method == 'POST':
-		return json.dumps(editor)
-	else:
-		return "Hello World!"
	return json.dumps(editor)

@app.route('/metrics')
@request_stats
@@ -116,17 +115,9 @@ def get_all_rows():
	logging.info('All rows fetched')
	return json.dumps(rows)

-@app.route('/thrimshim/<uuid:ident>', methods=['GET', 'POST'])
-@request_stats
-@authenticate
-def thrimshim(ident, editor=None):
-	"""Comunicate between Thrimbletrimmer and the Wubloader database."""
-	if flask.request.method == 'POST':
-		row = flask.request.json
-		return update_row(ident, row, editor)
-	else:
-		return get_row(ident)
@app.route('/thrimshim/<uuid:ident>', methods=['GET'])
@request_stats
def get_row(ident):
	"""Gets the row from the database with id == ident."""
	conn = app.db_manager.get_conn()
@@ -150,7 +141,11 @@ def get_row(ident):
	logging.info('Row {} fetched'.format(ident))
	return json.dumps(response)

-def update_row(ident, new_row, editor):
@app.route('/thrimshim/<uuid:ident>', methods=['POST'])
@request_stats
@authenticate
def update_row(ident, editor=None):
	"""Updates row of database with id = ident with the edit columns in
	new_row."""
	new_row = flask.request.json
@@ -231,7 +226,7 @@ def update_row(ident, new_row, editor):
def manual_link(ident, editor=None):
	"""Manually set a video_link if the state is 'UNEDITED' or 'DONE' and the
	upload_location is 'manual'."""
-	link = flask.request.json
	link = flask.request.json['link']
	conn = app.db_manager.get_conn()
	results = database.query(conn, """
		SELECT id, state, upload_location
@@ -246,7 +241,7 @@ def manual_link(ident, editor=None):
	results = database.query(conn, """
		UPDATE events
		SET state='DONE', upload_location = 'manual', video_link = %s,
			editor = %s, edit_time = %s, upload_time = %s
		WHERE id = %s AND (state = 'UNEDITED' OR (state = 'DONE' AND
			upload_location = 'manual'))""", link, editor, now, now, ident)
	logging.info("Row {} video_link set to {}".format(ident, link))
@@ -279,16 +274,32 @@ def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0,
	no_authentication=False):
	"""Thrimshim service."""
	server = WSGIServer((host, port), cors(app))

-	app.db_manager = database.DBManager(dsn=connection_string)
	app.no_authentication = no_authentication

	stopping = gevent.event.Event()
	def stop():
-		logging.info('Shutting down')
-		server.stop()
		logging.info("Shutting down")
		stopping.set()
		# handle when the server is running
		if hasattr(server, 'socket'):
			server.stop()
		# and when not
		else:
			sys.exit()
	gevent.signal(signal.SIGTERM, stop)

-	PromLogCountsHandler.install()
-	install_stacksampler()
	app.db_manager = None
	while app.db_manager is None and not stopping.is_set():
		try:
			app.db_manager = database.DBManager(dsn=connection_string)
		except Exception:
			delay = common.jitter(10)
			logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
			stopping.wait(delay)
	common.PromLogCountsHandler.install()
	common.install_stacksampler()

	if backdoor_port:
		gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
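A quick smoke test of the now POST-only `auth-test` endpoint (host and port assumed from the defaults above; the token value is a placeholder for a real Google ID token, and with `--no-authentication` any body is accepted):

```
curl -X POST http://localhost:8004/thrimshim/auth-test \
  -H "Content-Type: application/json" \
  -d '{"token": "PLACEHOLDER_GOOGLE_ID_TOKEN"}'
```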
