Sheetsync: Split into the main loop logic + sheets-specific middleware

NOTE ON CONFLICTS

In master, we moved sheets.py to common, as it only contained a generic client.
In this branch, sheets.py also contains sheetsync-specific code.

Our resolution:
- Keep the generic version in common
- Keep the old version verbatim (including the now-redundant generic client) in sheetsync

We will move the sheetsync implementation to the generic client after the rebase is complete.
pull/401/head
Authored and committed by Mike Lang, 1 year ago
parent 0e5bf1a0fe
commit 72f7c59a77
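
For orientation before the diff: SheetSync keeps the generic sync loop and database handling, while everything sheet-specific (worksheet scheduling, row parsing, column layout, cell writes) moves behind a middleware object. A minimal sketch of the interface the main loop now relies on, reconstructed from the diff below (method bodies elided):

	class SheetsMiddleware(object):
		def pick_worksheets(self):
			"""Return the list of worksheets to check this pass, rotating
			inactive sheets in periodically to stay within API quota."""

		def get_rows(self, worksheet):
			"""Yield each row of worksheet as a parsed dict, allocating ids
			for new rows if configured to."""

		def write_value(self, worksheet, row, key, value):
			"""Write a single cell, addressed by row dict and column name."""

		def mark_modified(self, worksheet):
			"""Bump worksheet to the top of the most-recently-modified order."""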

sheetsync/main.py
@@ -2,7 +2,6 @@
 import json
 import logging
 import signal
-import uuid
 from collections import defaultdict

 import argh
@@ -18,7 +17,7 @@ import common
 import common.dateutil
 from common.database import DBManager, query, get_column_placeholder
-from common.sheets import Sheets
+from .sheets import SheetsClient, SheetsMiddleware


 sheets_synced = prom.Counter(
 	'sheets_synced',
@@ -70,61 +69,11 @@ class SheetSync(object):
 	# Time to wait after getting an error
 	ERROR_RETRY_INTERVAL = 10

-	# How many syncs of active sheets to do before checking inactive sheets.
-	# By checking inactive sheets less often, we stay within our API limits.
-	# For example, 4 syncs per inactive check * 5 seconds between syncs = 20s between inactive checks
-	SYNCS_PER_INACTIVE_CHECK = 4
-	# How many worksheets to keep "active" based on most recent modify time
-	ACTIVE_SHEET_COUNT = 2
-	# Expected quota usage per 100s =
-	#  (100 / RETRY_INTERVAL) * ACTIVE_SHEET_COUNT
-	#  + (100 / RETRY_INTERVAL / SYNCS_PER_INACTIVE_CHECK) * (len(worksheets) - ACTIVE_SHEET_COUNT)
-	# If playlist_worksheet is defined, add 1 to len(worksheets).
-	# For current values, this is 100/5 * 2 + 100/5/4 * 7 = 75
-	def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False):
+	def __init__(self, middleware, stop, dbmanager, edit_url):
+		self.middleware = middleware
 		self.stop = stop
 		self.dbmanager = dbmanager
-		self.sheets = sheets
-		self.sheet_id = sheet_id
-		# map {worksheet: last modify time}
-		self.worksheets = {w: 0 for w in worksheets}
 		self.edit_url = edit_url
-		self.bustime_start = bustime_start
-		self.allocate_ids = allocate_ids
-		# Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes.
-		# Hard-coded for now, future work: determine this from column headers in sheet
-		self.column_map = {
-			'event_start': 0,
-			'event_end': 1,
-			'category': 2,
-			'description': 3,
-			'submitter_winner': 4,
-			'poster_moment': 5,
-			'image_links': 6,
-			'marked_for_edit': 7,
-			'notes': 8,
-			'tags': 9,
-			'video_link': 11,
-			'state': 12,
-			'edit_link': 13,
-			'error': 14,
-			'id': 15,
-		}
-		# Maps column names to a function that parses that column's value.
-		# Functions take a single arg (the value to parse) and ValueError is
-		# interpreted as None.
-		# Columns missing from this map default to simply using the string value.
-		self.column_parsers = {
-			'event_start': lambda v: self.parse_bustime(v),
-			'event_end': lambda v: self.parse_bustime(v, preserve_dash=True),
-			'poster_moment': lambda v: v == '[\u2713]', # check mark
-			'image_links': lambda v: [link.strip() for link in v.split()] if v.strip() else [],
-			'tags': lambda v: [tag.strip() for tag in v.split(',') if tag.strip()],
-			'id': lambda v: uuid.UUID(v) if v.strip() else None,
-		}
 		# List of input columns
 		self.input_columns = [
 			'event_start',
@@ -144,24 +93,9 @@ class SheetSync(object):
 			'error',
 		]

-	def parse_bustime(self, value, preserve_dash=False):
-		"""Convert from HH:MM or HH:MM:SS format to datetime.
-		If preserve_dash=True and value is "--", returns "--"
-		as a sentinel value instead of None. "" will still result in None.
-		"""
-		if not value.strip():
-			return None
-		if value.strip() == "--":
-			return "--" if preserve_dash else None
-		bustime = common.parse_bustime(value)
-		return common.bustime_to_dt(self.bustime_start, bustime)
-
 	def run(self):
 		self.conn = self.dbmanager.get_conn()
-		# tracks when to do inactive checks
-		sync_count = 0
 		while not self.stop.is_set():
 			try:
@@ -170,46 +104,10 @@
 				# each row is more expensive than the cost of just grabbing the entire table
 				# and comparing locally.
 				events = self.get_events()
-				if sync_count % self.SYNCS_PER_INACTIVE_CHECK == 0:
-					# check all worksheets
-					worksheets = self.worksheets
-				else:
-					# only check most recently changed worksheets
-					worksheets = sorted(
-						self.worksheets.keys(), key=lambda k: self.worksheets[k], reverse=True,
-					)[:self.ACTIVE_SHEET_COUNT]
-				sync_count += 1
+				worksheets = self.middleware.pick_worksheets()
 				for worksheet in worksheets:
-					rows = self.sheets.get_rows(self.sheet_id, worksheet)
-					for row_index, row in enumerate(rows):
-						# Skip first row (ie. the column titles).
-						# Need to do it inside the loop and not eg. use rows[1:],
-						# because then row_index won't be correct.
-						if row_index == 0:
-							continue
-						row = self.parse_row(worksheet, row_index, row)
-						if row['id'] is None:
-							# If a row is all empty (including no id), ignore it.
-							# Ignore the tags column for this check since it is never non-empty due to implicit tags
-							# (and even if there's other tags, we don't care if there's nothing else in the row).
-							if not any(row[col] for col in self.input_columns if col != 'tags'):
-								continue
-							# If we can't allocate ids, warn and ignore.
-							if not self.allocate_ids:
-								logging.warning(f"Row {worksheet!r}:{row['index']} has no valid id, skipping")
-								continue
-							# Otherwise, allocate id for a new row.
-							row['id'] = uuid.uuid4()
-							logging.info(f"Allocating id for row {worksheet!r}:{row['index']} = {row['id']}")
-							self.sheets.write_value(
-								self.sheet_id, worksheet,
-								row["index"], self.column_map['id'],
-								str(row['id']),
-							)
+					for row in self.middleware.get_rows(worksheet):
 						self.sync_row(worksheet, row, events.get(row['id']))
 			except Exception as e:
@@ -255,39 +153,6 @@
 			event_counts.labels(*labels).set(count)
 		return by_id

-	def parse_row(self, worksheet, row_index, row):
-		"""Take a row as a sequence of columns, and return a dict {column: value}"""
-		row_dict = {'_parse_errors': []}
-		for column, index in self.column_map.items():
-			if index >= len(row):
-				# Sheets omits trailing columns if they're all empty, so substitute empty string
-				value = ''
-			else:
-				value = row[index]
-			if column in self.column_parsers:
-				try:
-					value = self.column_parsers[column](value)
-				except ValueError as e:
-					value = None
-					row_dict['_parse_errors'].append("Failed to parse column {}: {}".format(column, e))
-			row_dict[column] = value
-		# As a special case, add some implicit tags to the tags column.
-		# We prepend these to make it slightly more consistent for the editor,
-		# ie. it's always CATEGORY, DAY, POSTER_MOMENT, CUSTOM
-		row_dict['tags'] = (
-			[
-				row_dict['category'], # category name
-				worksheet, # sheet name
-			] + (['Poster Moment'] if row_dict['poster_moment'] else [])
-			+ row_dict['tags']
-		)
-		# As a special case, treat an end time of "--" as equal to the start time.
-		if row_dict["event_end"] == "--":
-			row_dict["event_end"] = row_dict["event_start"]
-		# Always include row index
-		row_dict["index"] = row_index
-		return row_dict
-
 	def sync_row(self, worksheet, row, event):
 		"""Take a row dict and an Event from the database (or None if id not found)
 		and take whatever action is required to sync them, ie. writing to the database or sheet."""
@@ -309,7 +174,7 @@
 			query(self.conn, built_query, sheet_name=worksheet, **row)
 			rows_found.labels(worksheet).inc()
 			rows_changed.labels('insert', worksheet).inc()
-			self.mark_modified(worksheet)
+			self.middleware.mark_modified(worksheet)
 			return

 		rows_found.labels(worksheet).inc()
@@ -341,7 +206,7 @@
 			))
 			query(self.conn, built_query, **row)
 			rows_changed.labels('input', worksheet).inc()
-			self.mark_modified(worksheet)
+			self.middleware.mark_modified(worksheet)

 		# Update sheet with any changed outputs
 		format_output = lambda v: '' if v is None else v # cast nulls to empty string
@@ -351,13 +216,12 @@
 				row['id'], ', '.join(changed)
 			))
 			for col in changed:
-				self.sheets.write_value(
-					self.sheet_id, worksheet,
-					row["index"], self.column_map[col],
-					format_output(getattr(event, col)),
+				self.middleware.write_value(
+					worksheet, row,
+					col, format_output(getattr(event, col)),
 				)
 			rows_changed.labels('output', worksheet).inc()
-			self.mark_modified(worksheet)
+			self.middleware.mark_modified(worksheet)

 		# Set edit link if marked for editing and start/end set.
 		# This prevents accidents / clicking the wrong row and provides
@@ -366,17 +230,11 @@
 		edit_link = self.edit_url.format(row['id']) if row['marked_for_edit'] == '[+] Marked' else ''
 		if row['edit_link'] != edit_link:
 			logging.info("Updating sheet row {} with edit link {}".format(row['id'], edit_link))
-			self.sheets.write_value(
-				self.sheet_id, worksheet,
-				row["index"], self.column_map['edit_link'],
-				edit_link,
+			self.middleware.write_value(
+				worksheet, row,
+				"edit_link", edit_link,
 			)
-			self.mark_modified(worksheet)
-
-	def mark_modified(self, worksheet):
-		"""Mark worksheet as having had a change made, bumping it to the top of
-		the most-recently-modified queue."""
-		self.worksheets[worksheet] = monotonic()
+			self.middleware.mark_modified(worksheet)


 class PlaylistSync:
@@ -521,17 +379,18 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh
 	sheets_creds = json.load(open(sheets_creds_file))

-	sheets = Sheets(
+	sheets_client = SheetsClient(
 		client_id=sheets_creds['client_id'],
 		client_secret=sheets_creds['client_secret'],
 		refresh_token=sheets_creds['refresh_token'],
 	)
+	sheets_middleware = SheetsMiddleware(sheets_client, sheet_id, worksheet_names, bustime_start, allocate_ids)

 	workers = [
-		SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids),
+		SheetSync(sheets_middleware, stop, dbmanager, edit_url),
 	]
 	if playlist_worksheet:
-		workers.append(PlaylistSync(stop, dbmanager, sheets, sheet_id, playlist_worksheet))
+		workers.append(PlaylistSync(stop, dbmanager, sheets_client, sheet_id, playlist_worksheet))

 	jobs = [gevent.spawn(worker.run) for worker in workers]
 	# Block until any one exits

sheetsync/sheets.py (new file)
@@ -0,0 +1,234 @@
import logging
import uuid

from monotonic import monotonic

import common
from common.googleapis import GoogleAPIClient


class SheetsClient(object):
	"""Manages Google Sheets API operations"""

	def __init__(self, client_id, client_secret, refresh_token):
		self.logger = logging.getLogger(type(self).__name__)
		self.client = GoogleAPIClient(client_id, client_secret, refresh_token)
	def get_rows(self, spreadsheet_id, sheet_name, range=None):
		"""Return a list of rows, where each row is a list of the values of each column.
		Range optionally restricts returned rows, and uses A1 format, eg. "A1:B5".
		"""
		if range:
			range = "'{}'!{}".format(sheet_name, range)
		else:
			range = "'{}'".format(sheet_name)
		resp = self.client.request('GET',
			'https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}'.format(
				spreadsheet_id, range,
			),
			metric_name='get_rows',
		)
		resp.raise_for_status()
		data = resp.json()
		return data['values']
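	# Note on the response shape: the values endpoint returns JSON like
	#   {"range": "...", "majorDimension": "ROWS", "values": [[...], ...]}
	# with trailing empty rows and columns omitted. If the range is entirely
	# empty the "values" key is omitted too, so this assumes every worksheet
	# has at least its header row.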
	def write_value(self, spreadsheet_id, sheet_name, row, column, value):
		"""Write value to the row and column (0-based) given."""
		range = "'{sheet}'!{col}{row}:{col}{row}".format(
			sheet = sheet_name,
			row = row + 1, # 1-indexed rows in range syntax
			col = self.index_to_column(column),
		)
		resp = self.client.request('PUT',
			'https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}'.format(
				spreadsheet_id, range,
			),
			params={
				"valueInputOption": "1", # RAW
			},
			json={
				"range": range,
				"values": [[value]],
			},
			metric_name='write_value',
		)
		resp.raise_for_status()
	def index_to_column(self, index):
		"""For a given 0-based column index, convert to a column description, eg. 0 -> A, 1 -> B, 26 -> AA."""
		# Column names are bijective base-26 (A = 1, ..., Z = 26, AA = 27, ...),
		# so convert our 0-based index to that 1-based scheme and peel off digits.
		digits = []
		index += 1
		while index:
			index, digit = divmod(index - 1, 26)
			digits.append(digit)
		# We now have the digits, but they're backwards.
		digits = digits[::-1]
		# Now convert the digits to letters
		digits = [chr(ord('A') + digit) for digit in digits]
		# Finally, convert to string
		return ''.join(digits)
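	# For example: 0 -> "A", 25 -> "Z", 26 -> "AA", 27 -> "AB";
	# the id column below (index 15) therefore lands in column "P".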

class SheetsMiddleware(object):
	# How many syncs of active sheets to do before checking inactive sheets.
	# By checking inactive sheets less often, we stay within our API limits.
	# For example, 4 syncs per inactive check * 5 seconds between syncs = 20s between inactive checks
	SYNCS_PER_INACTIVE_CHECK = 4
	# How many worksheets to keep "active" based on most recent modify time
	ACTIVE_SHEET_COUNT = 2
	# Expected quota usage per 100s =
	#  (100 / RETRY_INTERVAL) * ACTIVE_SHEET_COUNT
	#  + (100 / RETRY_INTERVAL / SYNCS_PER_INACTIVE_CHECK) * (len(worksheets) - ACTIVE_SHEET_COUNT)
	# If playlist_worksheet is defined, add 1 to len(worksheets).
	# For current values, this is 100/5 * 2 + 100/5/4 * 7 = 40 + 35 = 75

	def __init__(self, client, sheet_id, worksheets, bustime_start, allocate_ids=False):
		self.client = client
		self.sheet_id = sheet_id
		# map {worksheet: last modify time}
		self.worksheets = {w: 0 for w in worksheets}
		self.bustime_start = bustime_start
		self.allocate_ids = allocate_ids
		# Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes.
		# Hard-coded for now, future work: determine this from column headers in sheet
		self.column_map = {
			'event_start': 0,
			'event_end': 1,
			'category': 2,
			'description': 3,
			'submitter_winner': 4,
			'poster_moment': 5,
			'image_links': 6,
			'marked_for_edit': 7,
			'notes': 8,
			'tags': 9,
			'video_link': 11,
			'state': 12,
			'edit_link': 13,
			'error': 14,
			'id': 15,
		}
		# Maps column names to a function that parses that column's value.
		# Functions take a single arg (the value to parse) and ValueError is
		# interpreted as None.
		# Columns missing from this map default to simply using the string value.
		self.column_parsers = {
			'event_start': lambda v: self.parse_bustime(v),
			'event_end': lambda v: self.parse_bustime(v, preserve_dash=True),
			'poster_moment': lambda v: v == '[\u2713]', # check mark
			'image_links': lambda v: [link.strip() for link in v.split()] if v.strip() else [],
			'tags': lambda v: [tag.strip() for tag in v.split(',') if tag.strip()],
			'id': lambda v: uuid.UUID(v) if v.strip() else None,
		}
		# tracks when to do inactive checks
		self.sync_count = 0
	def parse_bustime(self, value, preserve_dash=False):
		"""Convert from HH:MM or HH:MM:SS format to datetime.
		If preserve_dash=True and value is "--", returns "--"
		as a sentinel value instead of None. "" will still result in None.
		"""
		if not value.strip():
			return None
		if value.strip() == "--":
			return "--" if preserve_dash else None
		bustime = common.parse_bustime(value)
		return common.bustime_to_dt(self.bustime_start, bustime)
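	# For example, assuming bustime is time elapsed since bustime_start:
	#   "1:23"    -> bustime_start + 1h23m
	#   "1:23:45" -> bustime_start + 1h23m45s
	#   "--"      -> "--" if preserve_dash else None
	#   ""        -> None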
	def pick_worksheets(self):
		"""Returns a list of worksheets to check, which may not be the same every time
		for quota limit reasons."""
		if self.sync_count % self.SYNCS_PER_INACTIVE_CHECK == 0:
			# check all worksheets
			worksheets = list(self.worksheets.keys())
		else:
			# only check most recently changed worksheets
			worksheets = sorted(
				self.worksheets.keys(), key=lambda k: self.worksheets[k], reverse=True,
			)[:self.ACTIVE_SHEET_COUNT]
		self.sync_count += 1
		return worksheets
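	# With the class defaults (SYNCS_PER_INACTIVE_CHECK = 4, ACTIVE_SHEET_COUNT = 2),
	# successive calls return: all worksheets, top 2, top 2, top 2, all worksheets, ...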
	def get_rows(self, worksheet):
		"""Fetch all rows of worksheet, parsed and yielded as one dict per row."""
		rows = self.client.get_rows(self.sheet_id, worksheet)
		for row_index, row in enumerate(rows):
			# Skip first row (ie. the column titles).
			# Need to do it inside the loop and not eg. use rows[1:],
			# because then row_index won't be correct.
			if row_index == 0:
				continue
			row = self.parse_row(worksheet, row_index, row)
			# Handle rows without an allocated id
			if row['id'] is None:
				# If a row is all empty (including no id), ignore it.
				# Check every parsed column except the row index, parse errors and tags;
				# tags is never non-empty due to implicit tags (and even if there's
				# other tags, we don't care if there's nothing else in the row).
				if not any(
					value for column, value in row.items()
					if column not in ('id', 'tags', 'index', '_parse_errors')
				):
					continue
				# If we can't allocate ids, warn and ignore.
				if not self.allocate_ids:
					logging.warning(f"Row {worksheet!r}:{row['index']} has no valid id, skipping")
					continue
				# Otherwise, allocate id for a new row.
				row['id'] = uuid.uuid4()
				logging.info(f"Allocating id for row {worksheet!r}:{row['index']} = {row['id']}")
				self.client.write_value(
					self.sheet_id, worksheet,
					row["index"], self.column_map['id'],
					str(row['id']),
				)
			yield row
	def parse_row(self, worksheet, row_index, row):
		"""Take a row as a sequence of columns, and return a dict {column: value}"""
		row_dict = {'_parse_errors': []}
		for column, index in self.column_map.items():
			if index >= len(row):
				# Sheets omits trailing columns if they're all empty, so substitute empty string
				value = ''
			else:
				value = row[index]
			if column in self.column_parsers:
				try:
					value = self.column_parsers[column](value)
				except ValueError as e:
					value = None
					row_dict['_parse_errors'].append("Failed to parse column {}: {}".format(column, e))
			row_dict[column] = value
		# As a special case, add some implicit tags to the tags column.
		# We prepend these to make it slightly more consistent for the editor,
		# ie. it's always CATEGORY, DAY, POSTER_MOMENT, CUSTOM
		row_dict['tags'] = (
			[
				row_dict['category'], # category name
				worksheet, # sheet name
			] + (['Poster Moment'] if row_dict['poster_moment'] else [])
			+ row_dict['tags']
		)
		# As a special case, treat an end time of "--" as equal to the start time.
		if row_dict["event_end"] == "--":
			row_dict["event_end"] = row_dict["event_start"]
		# Always include row index
		row_dict["index"] = row_index
		return row_dict
	def write_value(self, worksheet, row, key, value):
		"""Write key=value to the given row, as identified by worksheet + row dict."""
		return self.client.write_value(
			self.sheet_id,
			worksheet,
			row["index"],
			self.column_map[key],
			value,
		)

	def mark_modified(self, worksheet):
		"""Mark worksheet as having had a change made, bumping it to the top of
		the most-recently-modified queue."""
		self.worksheets[worksheet] = monotonic()
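
To sanity-check the quota comment in SheetsMiddleware, here is the arithmetic written out as a sketch (RETRY_INTERVAL and the 7 inactive worksheets are the values the comment's own example assumes, not configuration read from anywhere):

	RETRY_INTERVAL = 5             # seconds between syncs, from SheetSync
	SYNCS_PER_INACTIVE_CHECK = 4
	ACTIVE_SHEET_COUNT = 2
	inactive_count = 7             # len(worksheets) - ACTIVE_SHEET_COUNT, playlist sheet included

	syncs_per_100s = 100 / RETRY_INTERVAL  # 20 sync passes per 100s
	reads_per_100s = (
		# every pass reads each active sheet
		syncs_per_100s * ACTIVE_SHEET_COUNT
		# every SYNCS_PER_INACTIVE_CHECK-th pass also reads the inactive sheets
		+ (syncs_per_100s / SYNCS_PER_INACTIVE_CHECK) * inactive_count
	)
	assert reads_per_100s == 40 + 35 == 75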