From 4172ad9e7fc4df8a64bd6be2562bde16c3a8bf3d Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 3 Feb 2019 06:17:27 -0800 Subject: [PATCH] Add some common database code This code manages the database connections, setting their isolation level correctly and ensuring the idempotent schema is applied before they're used. Applying the schema on startup means we don't need to deal with the database's state, setting it up before running, running migrations etc. However, it does put constraints on the changes we can safely make. Our use of seralizable isolation means that all transactions can be treated as fully independent - the server must behave as though they'd been run seperately in some valid order. This will give us the least surprising results when multiple connections try to modify the same data, though we'll need to deal with occasional transaction commit failures due to conficts. --- common/common/database.py | 124 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 common/common/database.py diff --git a/common/common/database.py b/common/common/database.py new file mode 100644 index 0000000..018122c --- /dev/null +++ b/common/common/database.py @@ -0,0 +1,124 @@ + +""" +Code shared between components that touch the database. +Note that this code requires psycopg2 and psycogreen, but the common module +as a whole does not to avoid needing to install them for components that don't need it. +""" + +import psycopg2 +import psycopg2.extensions +import psycopg2.extras +from psycogreen.gevent import patch_psycopg + + +# Schema is applied on startup and should be idemponent, +# and include any migrations potentially needed. +SCHEMA = """ + +-- Create type if it doesn't already exist +DO $$ BEGIN + CREATE TYPE event_state as ENUM ( + 'UNEDITED', + 'EDITED', + 'CLAIMED', + 'FINALIZING', + 'TRANSCODING', + 'DONE' + ); +EXCEPTION WHEN duplicate_object THEN + NULL; +END $$; + +CREATE TABLE IF NOT EXISTS events ( + id UUID PRIMARY KEY, + event_start TIMESTAMP, + event_end TIMESTAMP, + category TEXT NOT NULL DEFAULT '', + description TEXT NOT NULL DEFAULT '', + submitter_winner TEXT NOT NULL DEFAULT '', + poster_moment BOOLEAN NOT NULL DEFAULT FALSE, + notes TEXT NOT NULL DEFAULT '', + allow_holes BOOLEAN NOT NULL DEFAULT FALSE, + uploader_whitelist TEXT[], + upload_location TEXT NOT NULL DEFAULT '', + video_start TIMESTAMP, + video_end TIMESTAMP, + video_title TEXT, + video_description TEXT, + video_channel TEXT, + video_quality TEXT NOT NULL DEFAULT 'source', + state event_state NOT NULL DEFAULT 'UNEDITED', + uploader TEXT, + error TEXT, + video_link TEXT +); + +-- Index on state, since that's almost always what we're querying on besides id +CREATE INDEX IF NOT EXISTS event_state ON events (state); + +""" + + +class DBManager(object): + """Patches psycopg2 before any connections are created, and applies the schema. + Stores connect info for easy creation of new connections, and sets some defaults before + returning them. + + It has the ability to serve as a primitive connection pool, as getting a new conn will + return existing conns it knows about first, but this mainly just exists to re-use + the initial conn used to apply the schema, and you should use a real conn pool for + any non-trivial use. + + Returned conns are set to seralizable isolation level, and use NamedTupleCursor cursors. + """ + def __init__(self, **connect_kwargs): + patch_psycopg() + self.conns = [] + self.connect_kwargs = connect_kwargs + conn = self.get_conn() + with conn: + with conn.cursor() as cur: + cur.execute(SCHEMA) + self.put_conn(conn) + + def put_conn(self, conn): + self.conns.append(conn) + + def get_conn(self): + if self.conns: + return self.conns.pop(0) + conn = psycopg2.connect(cursor_factory=psycopg2.extras.NamedTupleCursor, **self.connect_kwargs) + # We use serializable because it means less issues to think about, + # we don't care about the performance concerns and everything we do is easily retryable. + # This shouldn't matter in practice anyway since everything we're doing is either read-only + # searches or targetted single-row updates. + conn.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + return conn + + +def retry_on_conflict(conn, func, *args, **kwargs): + """Run func(conn, *args, **kwargs) in a transaction up to max_tries (given as kwarg) times + (at least once, default 5), retrying if it raises an error indicating a transaction conflict. + After max_tries, raises TransactionRollbackError. + """ + max_tries = kwargs.pop('max_tries', 5) + for _ in range(max_tries - 1): + try: + with conn: + return func(conn) + except psycopg2.extensions.TransactionRollbackError: + pass + with conn: + return func(conn) + + +def query(conn, query, *args, **kwargs): + """Helper that takes a conn, creates a cursor and executes query against it, + then returns the cursor. + Variables may be given as positional or keyword args (but not both), corresponding + to %s vs %(key)s placeholder forms.""" + if args and kwargs: + raise TypeError("Cannot give both args and kwargs") + cur = conn.cursor() + cur.execute(query, args or kwargs or None) + return cur