Merge pull request #54 from ekimekim/mike/sheet-sync/initial

sheet sync
Mike Lang 6 years ago committed by GitHub
commit 499e486b0b

@@ -8,7 +8,7 @@ set -eu
# Pass PUSH=true to also push the resulting images, or PUSH=latest to push them as :latest tag
# The different images we can build
-COMPONENTS=(downloader restreamer backfiller thrimshim cutter)
+COMPONENTS=(downloader restreamer backfiller thrimshim cutter sheetsync)
# Define push if not already defined
PUSH=${PUSH:-}

@@ -21,6 +21,28 @@ def bustime_to_dt(start, bustime):
return start + datetime.timedelta(seconds=bustime)
def parse_bustime(bustime):
"""Convert from bus time human-readable string [-]HH:MM[:SS[.fff]]
to float seconds since bustime 00:00. Inverse of format_bustime(),
see it for detail."""
if bustime.startswith('-'):
# parse without the -, then negate it
return -parse_bustime(bustime[1:])
parts = bustime.strip().split(':')
if len(parts) == 2:
hours, mins = parts
secs = 0
elif len(parts) == 3:
hours, mins, secs = parts
else:
raise ValueError("Invalid bustime: must be HH:MM[:SS]")
hours = int(hours)
mins = int(mins)
secs = float(secs)
return 3600 * hours + 60 * mins + secs
def format_bustime(bustime, round="millisecond"):
"""Convert bustime to a human-readable string (-)HH:MM:SS.fff, with the
ending cut off depending on the value of round:
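For reference, a quick sanity check of the new parse_bustime (a sketch, not part of the PR; values are illustrative):

# parse_bustime returns float seconds since bustime 00:00
assert parse_bustime("01:30") == 5400.0
assert parse_bustime("01:30:15.5") == 5415.5
assert parse_bustime("-00:10") == -600.0
# bustime_to_dt() above then turns such a value into a real timestamp:
# bustime_to_dt(start, parse_bustime("01:30")) == start + datetime.timedelta(hours=1.5)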

@@ -39,6 +39,7 @@ CREATE TABLE IF NOT EXISTS events (
description TEXT NOT NULL DEFAULT '',
submitter_winner TEXT NOT NULL DEFAULT '',
poster_moment BOOLEAN NOT NULL DEFAULT FALSE,
image_links TEXT[] NOT NULL DEFAULT '{}', -- default empty array
notes TEXT NOT NULL DEFAULT '',
allow_holes BOOLEAN NOT NULL DEFAULT FALSE,
uploader_whitelist TEXT[],

@@ -0,0 +1,62 @@
import logging
import time
import gevent
import requests
class GoogleAPIClient(object):
"""Manages access to google apis and maintains an active access token.
Make calls using client.request(), which is a wrapper for requests.request().
"""
ACCESS_TOKEN_ERROR_RETRY_INTERVAL = 10
# Refresh token 10min before it expires (it normally lasts an hour)
ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY = 600
def __init__(self, client_id, client_secret, refresh_token):
self.logger = logging.getLogger(type(self).__name__)
self.client_id = client_id
self.client_secret = client_secret
self.refresh_token = refresh_token
self._first_get_access_token = gevent.spawn(self.get_access_token)
@property
def access_token(self):
"""Blocks if access token unavailable yet"""
self._first_get_access_token.join()
return self._access_token
def get_access_token(self):
"""Authenticates against google's API and retrieves a token we will use in
subsequent requests.
This function gets called automatically when needed; there should be no need to call it
yourself."""
while True:
try:
start_time = time.time()
resp = requests.post('https://www.googleapis.com/oauth2/v4/token', data={
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': self.refresh_token,
'grant_type': 'refresh_token',
})
resp.raise_for_status()
data = resp.json()
self._access_token = data['access_token']
expires_in = (start_time + data['expires_in']) - time.time()
if expires_in < self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY:
self.logger.warning("Access token expires in {}s, less than normal leeway time of {}s".format(
expires_in, self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY,
))
gevent.spawn_later(expires_in - self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, self.get_access_token)
except Exception:
self.logger.exception("Failed to fetch access token, retrying")
gevent.sleep(self.ACCESS_TOKEN_ERROR_RETRY_INTERVAL)
else:
break
def request(self, method, url, headers={}, **kwargs):
# merge in auth header
headers = dict(headers, Authorization='Bearer {}'.format(self.access_token))
return requests.request(method, url, headers=headers, **kwargs)
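A rough usage sketch of the new client (not part of the PR; the creds path matches the docker-compose default below, the sheet id and worksheet name are placeholders):

import json
from common.googleapis import GoogleAPIClient

creds = json.load(open("./google_creds.json"))
client = GoogleAPIClient(creds['client_id'], creds['client_secret'], creds['refresh_token'])
# request() wraps requests.request(): it merges in the Authorization header,
# and the first call blocks until an access token has been fetched.
resp = client.request('GET',
	'https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}'.format(
		'your_sheet_id_here', "'Day 1'",
	),
)
resp.raise_for_status()
print(resp.json()['values'])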

@@ -1,63 +1,15 @@
import logging
-import time
-import gevent
-import requests
+from common.googleapis import GoogleAPIClient
class Youtube(object):
-"""Manages access to youtube and maintains an active access token"""
-ACCESS_TOKEN_ERROR_RETRY_INTERVAL = 10
-# Refresh token 10min before it expires (it normally lasts an hour)
-ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY = 600
+"""Manages youtube API operations"""
def __init__(self, client_id, client_secret, refresh_token):
self.logger = logging.getLogger(type(self).__name__)
-self.client_id = client_id
-self.client_secret = client_secret
-self.refresh_token = refresh_token
-self._first_get_access_token = gevent.spawn(self.get_access_token)
+self.client = GoogleAPIClient(client_id, client_secret, refresh_token)
-@property
-def access_token(self):
-"""Blocks if access token unavailable yet"""
-self._first_get_access_token.join()
-return self._access_token
-def get_access_token(self):
-"""Authenticates against the youtube API and retrieves a token we will use in
-subsequent requests.
-This function gets called automatically when needed, there should be no need to call it
-yourself."""
-while True:
-try:
-start_time = time.time()
-resp = requests.post('https://www.googleapis.com/oauth2/v4/token', data={
-'client_id': self.client_id,
-'client_secret': self.client_secret,
-'refresh_token': self.refresh_token,
-'grant_type': 'refresh_token',
-})
-resp.raise_for_status()
-data = resp.json()
-self._access_token = data['access_token']
-expires_in = (start_time + data['expires_in']) - time.time()
-if expires_in < self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY:
-self.logger.warning("Access token expires in {}s, less than normal leeway time of {}s".format(
-expires_in, self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY,
-))
-gevent.spawn_later(expires_in - self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, self.get_access_token)
-except Exception:
-self.logger.exception("Failed to fetch access token, retrying")
-self.wait(self.ACCESS_TOKEN_ERROR_RETRY_INTERVAL)
-else:
-break
-def auth_headers(self):
-return {'Authorization': 'Bearer {}'.format(self.access_token)}
def upload_video(self, title, description, tags, data, hidden=False):
"""Data may be a string, file-like object or iterator. Returns id."""
@@ -72,7 +24,7 @@ class Youtube(object):
json['status'] = {
'privacyStatus': 'unlisted',
}
-resp = requests.post(
+resp = self.client.request('POST',
'https://www.googleapis.com/upload/youtube/v3/videos',
-headers=self.auth_headers(),
params={
@@ -83,7 +35,7 @@
)
resp.raise_for_status()
upload_url = resp.headers['Location']
-resp = requests.post(upload_url, headers=self.auth_headers(), data=data)
+resp = self.client.request('POST', upload_url, data=data)
resp.raise_for_status()
return resp.json()['id']
@@ -97,7 +49,7 @@ class Youtube(object):
# Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable.
for i in range(0, len(ids), 10):
group = ids[i:i+10]
-resp = requests.get(
+resp = self.client.request('GET',
'https://www.googleapis.com/youtube/v3/videos',
-headers=self.auth_headers(),
params={
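With the refactor, the cutter hands the credentials to Youtube once and token refresh is handled by the shared client. A hypothetical upload (module path, file name and metadata are assumptions, not taken from the PR):

import json
from cutter.youtube import Youtube  # assumed module path

creds = json.load(open("./google_creds.json"))
youtube = Youtube(creds['client_id'], creds['client_secret'], creds['refresh_token'])
video_id = youtube.upload_video(
	title="DB2018 highlight",
	description="Cut by wubloader",
	tags=["desert bus"],
	data=open("cut.ts", "rb"),
	hidden=True,  # uploads as unlisted
)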

@@ -29,6 +29,7 @@
downloader: 8001,
backfiller: 8002,
cutter: 8003,
sheetsync: 8005,
},
// The local port within each container to bind the backdoor server on.
@@ -49,9 +50,19 @@
dbname: "wubloader",
},
-// Path to a JSON file containing youtube credentials as keys
+// Path to a JSON file containing google credentials as keys
// 'client_id', 'client_secret' and 'refresh_token'.
-youtube_creds:: "./youtube_creds.json",
+google_creds:: "./google_creds.json",
// The URL to write to the sheet for edit links, with {} being replaced by the id
edit_url:: "http://thrimbletrimmer.codegunner.com/{}",
// The timestamp corresponding to 00:00 in bustime
bustime_start:: "1970-01-01T00:00:00Z",
// The spreadsheet id and worksheet names for sheet sync to act on
sheet_id:: "your_id_here",
worksheets:: ["Day %d" % n for n in std.range(1, 7)],
// Now for the actual docker-compose config
@@ -120,18 +131,18 @@
cutter: {
image: "quay.io/ekimekim/wubloader-cutter:%s" % $.image_tag,
-// Args for the cutter: DB and youtube creds
+// Args for the cutter: DB and google creds
command: [
"--base-dir", "/mnt",
"--backdoor-port", std.toString($.backdoor_port),
$.db_connect,
-"/etc/wubloader-youtube-creds.json",
+"/etc/wubloader-google-creds.json",
],
volumes: [
// Mount the segments directory at /mnt
"%s:/mnt" % $.segments_path,
// Mount the creds file into /etc
-"%s:/etc/wubloader-youtube-creds.json" % $.youtube_creds,
+"%s:/etc/wubloader-google-creds.json" % $.google_creds,
],
// If the application crashes, restart it.
restart: "on-failure",
@@ -156,6 +167,28 @@
ports: ["%s:8004" % $.ports.thrimshim]
},
sheetsync: {
image: "quay.io/ekimekim/wubloader-sheetsync:%s" % $.image_tag,
// Args for the sheetsync
command: [
"--backdoor-port", std.toString($.backdoor_port),
$.db_connect,
"/etc/wubloader-google-creds.json",
$.edit_url,
$.bustime_start,
$.sheet_id,
] + $.worksheets,
volumes: [
// Mount the creds file into /etc
"%s:/etc/wubloader-google-creds.json" % $.google_creds,
],
// If the application crashes, restart it.
restart: "on-failure",
// Expose on the configured host port by mapping that port to the default
// port for sheetsync, which is 8005.
ports: ["%s:8005" % $.ports.sheetsync]
},
},
}

@@ -0,0 +1,16 @@
FROM alpine:3.7
# dependencies needed for compiling c extensions
# also busybox-extras for telnet for easier use of backdoor
# and postgresql-dev as a dependency of psycopg2.
# Add postgresql-client for easier debugging of DB issues.
RUN apk --update add py2-pip gcc python-dev musl-dev busybox-extras postgresql-dev postgresql-client
# Install common lib first as it changes less
COPY common /tmp/common
RUN pip install /tmp/common && rm -r /tmp/common
# Install actual application
COPY sheetsync /tmp/sheetsync
RUN pip install /tmp/sheetsync && rm -r /tmp/sheetsync
ENTRYPOINT ["python2", "-m", "sheetsync"]

@@ -0,0 +1,17 @@
from setuptools import setup, find_packages
setup(
name = "wubloader-cutter",
version = "0.0.0",
packages = find_packages(),
install_requires = [
"argh",
"gevent",
"prometheus-client",
"psycogreen",
"psycopg2",
"python-dateutil",
"requests",
"wubloader-common",
],
)

@@ -0,0 +1,16 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import os
import argh
from sheetsync.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level, format=LOG_FORMAT)
argh.dispatch_command(main)

@@ -0,0 +1,284 @@
import json
import logging
import signal
import uuid
import argh
import gevent.backdoor
import gevent.event
import prometheus_client as prom
from psycopg2 import sql
from psycopg2.extras import register_uuid
import common
import common.dateutil
from common.database import DBManager, query
from .sheets import Sheets
class SheetSync(object):
# Time between syncs
RETRY_INTERVAL = 5
# Time to wait after getting an error
ERROR_RETRY_INTERVAL = 10
def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False):
self.stop = stop
self.conn = dbmanager.get_conn()
self.sheets = sheets
self.sheet_id = sheet_id
self.worksheets = worksheets
self.edit_url = edit_url
self.bustime_start = bustime_start
self.allocate_ids = allocate_ids
# Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes.
# Hard-coded for now, future work: determine this from column headers in sheet
self.column_map = {
'event_start': 0,
'event_end': 1,
'category': 2,
'description': 3,
'submitter_winner': 4,
'poster_moment': 5,
'image_links': 6,
'marked_for_edit': 7,
'notes': 8,
'video_link': 10,
'state': 11,
'edit_link': 12,
'error': 13,
'id': 14,
}
# Maps column names to a function that parses that column's value.
# Functions take a single arg (the value to parse) and ValueError is
# interpreted as None.
# Columns missing from this map default to simply using the string value.
self.column_parsers = {
'event_start': self.parse_bustime,
'event_end': self.parse_bustime,
'poster_moment': lambda v: v == u'[\u2713]', # check mark
'image_links': lambda v: [link.strip() for link in v.split()] if v.strip() else [],
'id': lambda v: uuid.UUID(v) if v.strip() else None,
}
# List of input columns
self.input_columns = [
'event_start',
'event_end',
'category',
'description',
'submitter_winner',
'poster_moment',
'image_links',
'notes',
]
# List of output columns
self.output_columns = [
'video_link',
'state',
'error',
]
def parse_bustime(self, value):
"""Convert from HH:MM or HH:MM:SS format to datetime"""
bustime = common.parse_bustime(value)
return common.bustime_to_dt(self.bustime_start, bustime)
def wait(self, interval):
self.stop.wait(common.jitter(interval))
def run(self):
while not self.stop.is_set():
try:
# Since the full dataset is small, the cost of round tripping to the database to check
# each row is more expensive than the cost of just grabbing the entire table
# and comparing locally.
events = self.get_events()
for worksheet in self.worksheets:
rows = self.sheets.get_rows(self.sheet_id, worksheet)
for row_index, row in enumerate(rows):
# Skip first row (ie. the column titles).
# Need to do it inside the loop and not eg. use rows[1:],
# because then row_index won't be correct.
if row_index == 0:
continue
row = self.parse_row(row)
self.sync_row(worksheet, row_index, row, events.get(row['id']))
except Exception:
logging.exception("Failed to sync")
self.wait(self.ERROR_RETRY_INTERVAL)
else:
logging.info("Successful sync")
self.wait(self.RETRY_INTERVAL)
def get_events(self):
"""Return the entire events table as a map {id: event namedtuple}"""
result = query(self.conn, "SELECT * FROM events")
by_id = {}
for row in result.fetchall():
by_id[row.id] = row
return by_id
def parse_row(self, row):
"""Take a row as a sequence of columns, and return a dict {column: value}"""
row_dict = {}
for column, index in self.column_map.items():
if index >= len(row):
# Sheets omits trailing columns if they're all empty, so substitute empty string
value = ''
else:
value = row[index]
if column in self.column_parsers:
try:
value = self.column_parsers[column](value)
except ValueError:
value = None
row_dict[column] = value
return row_dict
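Purely illustrative, not from the PR: with the column_map and parsers above, a raw row from the Sheets API parses as sketched below (trailing empty cells may also simply be missing; parse_row pads them with ''):

raw = [
	"1:23:45", "1:24:10",            # event_start, event_end
	"Highlight", "Crab rave",        # category, description
	"someone", u"[\u2713]",          # submitter_winner, poster_moment (check mark -> True)
	"https://example.com/a.png",     # image_links
	"", "some notes", "",            # marked_for_edit, notes, (unmapped column 9)
	"", "", "", "", "",              # video_link, state, edit_link, error, id
]
# parse_row(raw) then gives roughly:
# {
#   'event_start': bustime_to_dt(bustime_start, 5025.0),   # a datetime
#   'poster_moment': True,
#   'image_links': ['https://example.com/a.png'],
#   'id': None,                                             # empty cell -> None
#   ...with the remaining columns kept as plain strings ('category': 'Highlight', etc.)
# }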
def sync_row(self, worksheet, row_index, row, event):
"""Take a row dict and an Event from the database (or None if id not found)
and take whatever action is required to sync them, ie. writing to the database or sheet."""
if event is None:
# No event currently in DB, if any field is non-empty, then create it.
# Otherwise ignore it.
if not any(row[col] for col in self.input_columns):
return
# Only generate an id when needed (ie. when the row doesn't already have one).
# Originally we would allocate ids on first sync, but this led to rate limiting issues.
if row['id'] is None:
if self.allocate_ids:
row['id'] = uuid.uuid4()
logging.info("Allocating id for row {!r}:{} = {}".format(worksheet, row_index, row['id']))
self.sheets.write_value(
self.sheet_id, worksheet,
row_index, self.column_map['id'],
str(row['id']),
)
else:
logging.warning("Row {!r}:{} has no valid id, skipping".format(worksheet, row_index))
return
logging.info("Inserting new event {}".format(row['id']))
# Insertion conflict just means that another sheet sync beat us to the insert.
# We can ignore it.
insert_cols = ['id'] + self.input_columns
built_query = sql.SQL("""
INSERT INTO events ({})
VALUES ({})
ON CONFLICT DO NOTHING
""").format(
sql.SQL(", ").join(sql.Identifier(col) for col in insert_cols),
sql.SQL(", ").join(sql.Placeholder(col) for col in insert_cols),
)
query(self.conn, built_query, **row)
return
# Update database with any changed inputs
changed = [col for col in self.input_columns if row[col] != getattr(event, col)]
if changed:
logging.info("Updating event {} with new value(s) for {}".format(
row['id'], ', '.join(changed)
))
built_query = sql.SQL("""
UPDATE events
SET {}
WHERE id = %(id)s
""").format(sql.SQL(", ").join(
sql.SQL("{} = {}").format(
sql.Identifier(col), sql.Placeholder(col)
) for col in changed
))
query(self.conn, built_query, **row)
# Update sheet with any changed outputs
format_output = lambda v: '' if v is None else v # cast nulls to empty string
changed = [col for col in self.output_columns if row[col] != format_output(getattr(event, col))]
if changed:
logging.info("Updating sheet row {} with new value(s) for {}".format(
row['id'], ', '.join(changed)
))
for col in changed:
self.sheets.write_value(
self.sheet_id, worksheet,
row_index, self.column_map[col],
format_output(getattr(event, col)),
)
# Set edit link if marked for editing and start/end set.
# This prevents accidents / clicking the wrong row and provides
# feedback that sheet sync is still working.
# Also clear it if it shouldn't be set.
edit_link = self.edit_url.format(row['id']) if row['marked_for_edit'] == '[+] Marked' else ''
if row['edit_link'] != edit_link:
logging.info("Updating sheet row {} with edit link {}".format(row['id'], edit_link))
self.sheets.write_value(
self.sheet_id, worksheet,
row_index, self.column_map['edit_link'],
edit_link,
)
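For clarity, a minimal standalone sketch of the psycopg2.sql composition used above (the column subset and DSN are placeholders):

import psycopg2
from psycopg2 import sql

cols = ['id', 'notes']
built_query = sql.SQL("INSERT INTO events ({}) VALUES ({}) ON CONFLICT DO NOTHING").format(
	sql.SQL(", ").join(sql.Identifier(col) for col in cols),
	sql.SQL(", ").join(sql.Placeholder(col) for col in cols),
)
conn = psycopg2.connect("postgresql://vst:password@localhost/wubloader")  # placeholder DSN
# Identifiers come out quoted and placeholders are named, so a row dict can be
# passed directly as the query parameters:
print(built_query.as_string(conn))
# INSERT INTO events ("id", "notes") VALUES (%(id)s, %(notes)s) ON CONFLICT DO NOTHING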
@argh.arg('dbconnect', help=
"dbconnect should be a postgres connection string, which is either a space-separated "
"list of key=value pairs, or a URI like:\n"
"\tpostgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE"
)
@argh.arg('sheets-creds-file', help=
"sheets_creds_file should be a json file containing keys "
"'client_id', 'client_secret' and 'refresh_token'."
)
@argh.arg('edit-url', help=
'edit_url should be a format string for edit links, with {} as a placeholder for id. '
'eg. "https://myeditor.example.com/edit/{}" will produce edit urls like '
'"https://myeditor.example.com/edit/da6cf4df-4871-4a9a-a660-0b1e1a6a9c10".'
)
@argh.arg('bustime_start', type=common.dateutil.parse, help=
"bustime_start is the timestamp which is bustime 00:00."
)
@argh.arg('worksheet-names', nargs='+', help=
"The names of the individual worksheets within the sheet to operate on."
)
@argh.arg('--allocate-ids', help=
"--allocate-ids means that it will give rows without ids an id. "
"Only one sheet sync should have --allocate-ids on for a given sheet at once!"
)
def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksheet_names, metrics_port=8005, backdoor_port=0, allocate_ids=False):
"""
Sheet sync constantly scans a Google Sheets sheet and a database, copying inputs from the sheet
to the DB and outputs from the DB to the sheet.
With the exception of id allocation, all operations are idempotent and multiple sheet syncs
may be run for redundancy.
"""
common.PromLogCountsHandler.install()
common.install_stacksampler()
prom.start_http_server(metrics_port)
register_uuid()
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
stop = gevent.event.Event()
gevent.signal(signal.SIGTERM, stop.set) # shut down on sigterm
logging.info("Starting up")
dbmanager = DBManager(dsn=dbconnect)
sheets_creds = json.load(open(sheets_creds_file))
sheets = Sheets(
client_id=sheets_creds['client_id'],
client_secret=sheets_creds['client_secret'],
refresh_token=sheets_creds['refresh_token'],
)
SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids).run()
logging.info("Gracefully stopped")

@@ -0,0 +1,64 @@
import logging
from common.googleapis import GoogleAPIClient
class Sheets(object):
"""Manages Google Sheets API operations"""
def __init__(self, client_id, client_secret, refresh_token):
self.logger = logging.getLogger(type(self).__name__)
self.client = GoogleAPIClient(client_id, client_secret, refresh_token)
def get_rows(self, spreadsheet_id, sheet_name, range=None):
"""Return a list of rows, where each row is a list of the values of each column.
Range optionally restricts returned rows, and uses A1 format, eg. "A1:B5".
"""
if range:
range = "'{}'!{}".format(sheet_name, range)
else:
range = "'{}'".format(sheet_name)
resp = self.client.request('GET',
'https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}'.format(
spreadsheet_id, range,
),
)
resp.raise_for_status()
data = resp.json()
return data['values']
def write_value(self, spreadsheet_id, sheet_name, row, column, value):
"""Write value to the row and column (0-based) given."""
range = "'{sheet}'!{col}{row}:{col}{row}".format(
sheet = sheet_name,
row = row + 1, # 1-indexed rows in range syntax
col = self.index_to_column(column),
)
resp = self.client.request('PUT',
'https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}'.format(
spreadsheet_id, range,
),
params={
"valueInputOption": "1", # RAW
},
json={
"range": range,
"values": [[value]],
},
)
resp.raise_for_status()
def index_to_column(self, index):
"""For a given column index, convert to a column description, eg. 0 -> A, 1 -> B, 26 -> AA."""
# This is equivalent to the 0-based index in base-26 (where A = 0, B = 1, ..., Z = 25)
digits = []
while index:
index, digit = divmod(index, 26)
digits.append(digit)
# We now have the digits, but they're backwards.
digits = digits[::-1]
# Now convert the digits to letters
digits = [chr(ord('A') + digit) for digit in digits]
# Finally, convert to string
return ''.join(digits)
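With the bijective base-26 handling above, a quick check (the dummy creds only kick off a background token fetch that fails and retries, which doesn't affect these asserts):

from sheetsync.sheets import Sheets

s = Sheets(client_id="dummy", client_secret="dummy", refresh_token="dummy")
assert s.index_to_column(0) == 'A'
assert s.index_to_column(14) == 'O'    # the 'id' column in sheetsync's column_map
assert s.index_to_column(25) == 'Z'
assert s.index_to_column(26) == 'AA'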