Add some common database code

This code manages the database connections, setting their isolation level correctly
and ensuring the idempotent schema is applied before they're used.

Applying the schema on startup means we don't need to deal with the database's state,
setting it up before running, running migrations etc. However, it does put constraints on
the changes we can safely make.

Our use of seralizable isolation means that all transactions can be treated as fully
independent - the server must behave as though they'd been run seperately in some valid order.
This will give us the least surprising results when multiple connections try to modify the same
data, though we'll need to deal with occasional transaction commit failures due to conficts.
pull/45/head
Mike Lang 6 years ago committed by Christopher Usher
parent cea66a4bbf
commit dc2eb6ed74

@ -0,0 +1,124 @@
"""
Code shared between components that touch the database.
Note that this code requires psycopg2 and psycogreen, but the common module
as a whole does not to avoid needing to install them for components that don't need it.
"""
import psycopg2
import psycopg2.extensions
import psycopg2.extras
from psycogreen.gevent import patch_psycopg
# Schema is applied on startup and should be idemponent,
# and include any migrations potentially needed.
SCHEMA = """
-- Create type if it doesn't already exist
DO $$ BEGIN
CREATE TYPE event_state as ENUM (
'UNEDITED',
'EDITED',
'CLAIMED',
'FINALIZING',
'TRANSCODING',
'DONE'
);
EXCEPTION WHEN duplicate_object THEN
NULL;
END $$;
CREATE TABLE IF NOT EXISTS events (
id UUID PRIMARY KEY,
event_start TIMESTAMP,
event_end TIMESTAMP,
category TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
submitter_winner TEXT NOT NULL DEFAULT '',
poster_moment BOOLEAN NOT NULL DEFAULT FALSE,
notes TEXT NOT NULL DEFAULT '',
allow_holes BOOLEAN NOT NULL DEFAULT FALSE,
uploader_whitelist TEXT[],
upload_location TEXT NOT NULL DEFAULT '',
video_start TIMESTAMP,
video_end TIMESTAMP,
video_title TEXT,
video_description TEXT,
video_channel TEXT,
video_quality TEXT NOT NULL DEFAULT 'source',
state event_state NOT NULL DEFAULT 'UNEDITED',
uploader TEXT,
error TEXT,
video_link TEXT
);
-- Index on state, since that's almost always what we're querying on besides id
CREATE INDEX IF NOT EXISTS event_state ON events (state);
"""
class DBManager(object):
"""Patches psycopg2 before any connections are created, and applies the schema.
Stores connect info for easy creation of new connections, and sets some defaults before
returning them.
It has the ability to serve as a primitive connection pool, as getting a new conn will
return existing conns it knows about first, but this mainly just exists to re-use
the initial conn used to apply the schema, and you should use a real conn pool for
any non-trivial use.
Returned conns are set to seralizable isolation level, and use NamedTupleCursor cursors.
"""
def __init__(self, **connect_kwargs):
patch_psycopg()
self.conns = []
self.connect_kwargs = connect_kwargs
conn = self.get_conn()
with conn:
with conn.cursor() as cur:
cur.execute(SCHEMA)
self.put_conn(conn)
def put_conn(self, conn):
self.conns.append(conn)
def get_conn(self):
if self.conns:
return self.conns.pop(0)
conn = psycopg2.connect(cursor_factory=psycopg2.extras.NamedTupleCursor, **self.connect_kwargs)
# We use serializable because it means less issues to think about,
# we don't care about the performance concerns and everything we do is easily retryable.
# This shouldn't matter in practice anyway since everything we're doing is either read-only
# searches or targetted single-row updates.
conn.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
return conn
def retry_on_conflict(conn, func, *args, **kwargs):
"""Run func(conn, *args, **kwargs) in a transaction up to max_tries (given as kwarg) times
(at least once, default 5), retrying if it raises an error indicating a transaction conflict.
After max_tries, raises TransactionRollbackError.
"""
max_tries = kwargs.pop('max_tries', 5)
for _ in range(max_tries - 1):
try:
with conn:
return func(conn)
except psycopg2.extensions.TransactionRollbackError:
pass
with conn:
return func(conn)
def query(conn, query, *args, **kwargs):
"""Helper that takes a conn, creates a cursor and executes query against it,
then returns the cursor.
Variables may be given as positional or keyword args (but not both), corresponding
to %s vs %(key)s placeholder forms."""
if args and kwargs:
raise TypeError("Cannot give both args and kwargs")
cur = conn.cursor()
cur.execute(query, args or kwargs or None)
return cur
Loading…
Cancel
Save