diff --git a/common/common/database.py b/common/common/database.py new file mode 100644 index 0000000..018122c --- /dev/null +++ b/common/common/database.py @@ -0,0 +1,124 @@ + +""" +Code shared between components that touch the database. +Note that this code requires psycopg2 and psycogreen, but the common module +as a whole does not to avoid needing to install them for components that don't need it. +""" + +import psycopg2 +import psycopg2.extensions +import psycopg2.extras +from psycogreen.gevent import patch_psycopg + + +# Schema is applied on startup and should be idemponent, +# and include any migrations potentially needed. +SCHEMA = """ + +-- Create type if it doesn't already exist +DO $$ BEGIN + CREATE TYPE event_state as ENUM ( + 'UNEDITED', + 'EDITED', + 'CLAIMED', + 'FINALIZING', + 'TRANSCODING', + 'DONE' + ); +EXCEPTION WHEN duplicate_object THEN + NULL; +END $$; + +CREATE TABLE IF NOT EXISTS events ( + id UUID PRIMARY KEY, + event_start TIMESTAMP, + event_end TIMESTAMP, + category TEXT NOT NULL DEFAULT '', + description TEXT NOT NULL DEFAULT '', + submitter_winner TEXT NOT NULL DEFAULT '', + poster_moment BOOLEAN NOT NULL DEFAULT FALSE, + notes TEXT NOT NULL DEFAULT '', + allow_holes BOOLEAN NOT NULL DEFAULT FALSE, + uploader_whitelist TEXT[], + upload_location TEXT NOT NULL DEFAULT '', + video_start TIMESTAMP, + video_end TIMESTAMP, + video_title TEXT, + video_description TEXT, + video_channel TEXT, + video_quality TEXT NOT NULL DEFAULT 'source', + state event_state NOT NULL DEFAULT 'UNEDITED', + uploader TEXT, + error TEXT, + video_link TEXT +); + +-- Index on state, since that's almost always what we're querying on besides id +CREATE INDEX IF NOT EXISTS event_state ON events (state); + +""" + + +class DBManager(object): + """Patches psycopg2 before any connections are created, and applies the schema. + Stores connect info for easy creation of new connections, and sets some defaults before + returning them. + + It has the ability to serve as a primitive connection pool, as getting a new conn will + return existing conns it knows about first, but this mainly just exists to re-use + the initial conn used to apply the schema, and you should use a real conn pool for + any non-trivial use. + + Returned conns are set to seralizable isolation level, and use NamedTupleCursor cursors. + """ + def __init__(self, **connect_kwargs): + patch_psycopg() + self.conns = [] + self.connect_kwargs = connect_kwargs + conn = self.get_conn() + with conn: + with conn.cursor() as cur: + cur.execute(SCHEMA) + self.put_conn(conn) + + def put_conn(self, conn): + self.conns.append(conn) + + def get_conn(self): + if self.conns: + return self.conns.pop(0) + conn = psycopg2.connect(cursor_factory=psycopg2.extras.NamedTupleCursor, **self.connect_kwargs) + # We use serializable because it means less issues to think about, + # we don't care about the performance concerns and everything we do is easily retryable. + # This shouldn't matter in practice anyway since everything we're doing is either read-only + # searches or targetted single-row updates. + conn.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + return conn + + +def retry_on_conflict(conn, func, *args, **kwargs): + """Run func(conn, *args, **kwargs) in a transaction up to max_tries (given as kwarg) times + (at least once, default 5), retrying if it raises an error indicating a transaction conflict. + After max_tries, raises TransactionRollbackError. + """ + max_tries = kwargs.pop('max_tries', 5) + for _ in range(max_tries - 1): + try: + with conn: + return func(conn) + except psycopg2.extensions.TransactionRollbackError: + pass + with conn: + return func(conn) + + +def query(conn, query, *args, **kwargs): + """Helper that takes a conn, creates a cursor and executes query against it, + then returns the cursor. + Variables may be given as positional or keyword args (but not both), corresponding + to %s vs %(key)s placeholder forms.""" + if args and kwargs: + raise TypeError("Cannot give both args and kwargs") + cur = conn.cursor() + cur.execute(query, args or kwargs or None) + return cur