sheetsync fixes

typos and omissions
pull/401/head
Mike Lang authored 3 months ago, committed by Mike Lang
parent d5f73c226c
commit 367e6a7a7a

@@ -159,7 +159,7 @@ class SheetSync(object):
 				| self.output_columns
 				| self.metrics_columns
 			),
-			sql.Identifier("table"),
+			sql.Identifier(self.table),
 		)
 		result = query(self.conn, built_query)
 		by_id = {}
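
Note: sql.Identifier quotes whatever literal string it is given, so the old code always targeted a table literally named "table". A quick sketch of the difference using psycopg2's sql module (assuming an open connection conn):

from psycopg2 import sql

# The old code quoted the literal word "table" as the identifier:
sql.Identifier("table").as_string(conn)   # -> '"table"'
# The fix interpolates the sync's configured table name instead,
# e.g. when self.table == "events":
sql.Identifier("events").as_string(conn)  # -> '"events"'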
@@ -207,7 +207,7 @@ class SheetSync(object):
 			self.logger.info("Skipping db row {} without any matching sheet row".format(db_row.id))
 			return
 
 		self.logger.info("Adding new row {}".format(db_row.id))
-		sheet_row = self.middleware.create_row(db_row.sheet_name, db_row.id)
+		sheet_row = self.middleware.create_row(db_row)
 		worksheet = sheet_row["sheet_name"]
 		rows_found.labels(self.name, worksheet).inc()
@@ -222,12 +222,11 @@ class SheetSync(object):
 			UPDATE {}
 			SET {}
 			WHERE id = %(id)s
-		""").format(sql.SQL(", ").join(
-			[sql.Identifer(self.table)] +
-			[sql.SQL("{} = {}").format(
+		""").format(sql.Identifier(self.table), sql.SQL(", ").join([
+			sql.SQL("{} = {}").format(
 				sql.Identifier(col), get_column_placeholder(col)
-			) for col in changed]
-		))
+			) for col in changed
+		]))
 		query(self.conn, built_query, **sheet_row)
 		rows_changed.labels(self.name, 'input', worksheet).inc()
 		self.middleware.mark_modified(sheet_row)
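
The old code both misspelled sql.Identifier and placed the table name inside the SET-clause join; after the fix the table identifier fills the first {} and the join fills the second. A minimal sketch of what the corrected format() call composes (the changed-column list and the get_column_placeholder helper below are stand-ins; the helper is assumed to emit %(column)s-style placeholders):

from psycopg2 import sql

def get_column_placeholder(col):
	# Stand-in for sheetsync's helper; assumed to produce a named placeholder.
	return sql.Placeholder(col)

changed = ["event_start", "category"]  # hypothetical set of changed columns
built_query = sql.SQL("""
	UPDATE {}
	SET {}
	WHERE id = %(id)s
""").format(sql.Identifier("events"), sql.SQL(", ").join([
	sql.SQL("{} = {}").format(
		sql.Identifier(col), get_column_placeholder(col)
	) for col in changed
]))
# Roughly: UPDATE "events" SET "event_start" = %(event_start)s, "category" = %(category)s WHERE id = %(id)s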
@@ -250,7 +249,7 @@ class SheetSync(object):
 class EventsSync(SheetSync):
 	table = "events"
 	input_columns = {
-		'sheet_name'
+		'sheet_name',
 		'event_start',
 		'event_end',
 		'category',
@@ -411,19 +410,22 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
 
 	workers = []
 	for config in sync_configs:
+		reverse_sync = config.get("reverse_sync", False)
 		if config["backend"] == "sheets":
+			allocate_ids = config.get("allocate_ids", False)
+			if allocate_ids and reverse_sync:
+				raise ValueError("Cannot combine allocate_ids and reverse_sync")
 			creds = json.load(open(config["creds"]))
 			client = SheetsClient(
 				client_id=creds['client_id'],
 				client_secret=creds['client_secret'],
 				refresh_token=creds['refresh_token'],
 			)
-			allocate_ids = config.get("allocate_ids", False)
-			if config["type"] in ("sheets", "archive"):
+			if config["type"] in ("events", "archive"):
 				middleware_cls = {
-					"sheets": SheetsEventsMiddleware,
+					"events": SheetsEventsMiddleware,
 					"archive": SheetsArchiveMiddleware,
-				}
+				}[config["type"]]
 				middleware = middleware_cls(
 					client,
 					config["sheet_id"],
@@ -436,7 +438,7 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
 				middleware = SheetsPlaylistsMiddleware(
 					client,
 					config["sheet_id"],
-					[config["playlist_worksheet"]],
+					config["worksheets"],
 					config.get("allocate_ids", False),
 				)
 			else:
@@ -464,7 +466,6 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
 			"playlists": PlaylistsSync,
 			"archive": ArchiveSync,
 		}[config["type"]]
-		reverse_sync = config.get("reverse_sync", False)
 		sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync)
 		workers.append(sync)
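
With these changes, reverse_sync is read once per config entry (for every backend) and rejected when combined with allocate_ids, the events/archive middleware lookup keys off config["type"] ("events"/"archive" rather than "sheets"), and playlists configs take a worksheets list instead of a single playlist_worksheet. A hypothetical config entry using only the keys this diff touches (the real schema may have more fields) might look like:

sync_config = {
	"name": "example",          # hypothetical values throughout
	"backend": "sheets",
	"type": "playlists",
	"creds": "/path/to/creds.json",
	"sheet_id": "SHEET_ID",
	"worksheets": ["Tags"],     # replaces the old playlist_worksheet key
	"allocate_ids": True,
	"reverse_sync": False,      # may not be combined with allocate_ids
}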

@@ -37,7 +37,7 @@ class Middleware:
 		Intended as a way to keep track of recently-changed rows for quota optimization."""
 		pass
 
-	def create_row(self, worksheet, id):
-		"""Create a new row with given id in the given worksheet and return it.
+	def create_row(self, row):
+		"""Create a new blank row with id and worksheet determined from the given db row.
 		Only used for reverse sync."""
 		raise NotImplementedError

@@ -86,6 +86,8 @@ class SheetsMiddleware(Middleware):
 			if row_index < self.header_rows:
 				continue
 			row = self.parse_row(worksheet, row_index, row)
+			if row is None:
+				continue
 
 			# Handle rows without an allocated id
 			if row['id'] is None:
@@ -117,7 +119,8 @@ class SheetsMiddleware(Middleware):
 		)
 
 	def parse_row(self, worksheet, row_index, row):
-		"""Take a row as a sequence of columns, and return a dict {column: value}"""
+		"""Take a row as a sequence of columns, and return a dict {column: value}.
+		May return None to skip the row (used by subclasses)."""
 		row_dict = {
 			"sheet_name": worksheet,
 			"index": row_index,
@@ -154,7 +157,9 @@ class SheetsMiddleware(Middleware):
 		the most-recently-modified queue."""
 		self.worksheets[row["sheet_name"]] = monotonic()
 
-	def create_row(self, worksheet, id):
+	def _create_row(self, worksheet, id):
+		"""Because the way we get the worksheet name differs for events vs playlists,
+		we have the common code here and defer extracting the worksheet and id to per-type implementations"""
 		unassigned_rows = self.unassigned_rows.get(worksheet, [])
 		if not unassigned_rows:
 			raise Exception(f"Worksheet {worksheet} has no available space to create a new row in, or it wasn't fetched")
@@ -185,6 +190,8 @@ def check_playlist(playlist_id):
 
 
 class SheetsPlaylistsMiddleware(SheetsMiddleware):
+	header_rows = 2
+
 	column_map = {
 		"tags": 0,
 		"description": 1,
@@ -206,6 +213,10 @@ class SheetsPlaylistsMiddleware(SheetsMiddleware):
 		),
 		"playlist_id": check_playlist,
 		"show_in_description": PARSE_CHECKMARK,
+		"first_event_id": EMPTY_IS_NONE,
+		"last_event_id": EMPTY_IS_NONE,
+		"error": EMPTY_IS_NONE,
+		"id": EMPTY_IS_NONE,
 	}
 
 	column_encode = {
column_encode = { column_encode = {
@ -217,13 +228,27 @@ class SheetsPlaylistsMiddleware(SheetsMiddleware):
"show_in_description": ENCODE_CHECKMARK, "show_in_description": ENCODE_CHECKMARK,
} }
def parse_row(self, worksheet, row_index, row):
row = super().parse_row(worksheet, row_index, row)
if row["id"] == "<ignore>":
# Special case, row is marked to be ignored
return None
return row
def row_was_expected(self, db_row, worksheets): def row_was_expected(self, db_row, worksheets):
# Database does not record a worksheet for playlists, we assume there's only one # Database does not record a worksheet for playlists, we assume there's only one
# sheet and so it should always be there. # sheet and so it should always be there.
return True return True
def row_is_non_empty(self, row): def row_is_non_empty(self, row):
return row["tags"] is not None return row["tags"] is not None or any(
row[key] for key in ("description", "name", "playlist_id")
)
def create_row(self, row):
# Always create in the first worksheet. We should only have one anyway.
worksheet = list(self.worksheets.keys())[0]
return self._create_row(worksheet, row.id)
class SheetsEventsMiddleware(SheetsMiddleware): class SheetsEventsMiddleware(SheetsMiddleware):
@@ -321,20 +346,32 @@ class SheetsEventsMiddleware(SheetsMiddleware):
 		# Also clear it if it shouldn't be set.
 		# We do this here instead of in sync_row() because it's Sheets-specific logic
 		# that doesn't depend on the DB event in any way.
-		edit_link = self.edit_url.format(row['id']) if self.show_edit_url(row) else ''
-		if row['edit_link'] != edit_link:
-			logging.info("Updating sheet row {} with edit link {}".format(row['id'], edit_link))
-			self.write_value(row, "edit_link", edit_link)
-			self.mark_modified(row)
+		edit_link = self.edit_url.format(row_dict['id']) if self.show_edit_url(row_dict) else ''
+		if row_dict['edit_link'] != edit_link:
+			logging.info("Updating sheet row {} with edit link {}".format(row_dict['id'], edit_link))
+			self.write_value(row_dict, "edit_link", edit_link)
+			self.mark_modified(row_dict)
 
 		return row_dict
 
 	def show_edit_url(self, row):
 		return row['marked_for_edit'] == '[+] Marked'
 
+	def write_value(self, row, key, value):
+		# Undo the implicitly added tags
+		if key == "tags":
+			value = value[2:]
+			if row["poster_moment"]:
+				value = value[1:]
+		return super().write_value(row, key, value)
+
+	def create_row(self, row):
+		return self._create_row(row.sheet_name, row.id)
+
 
 class SheetsArchiveMiddleware(SheetsEventsMiddleware):
 	# Archive sheet is similar to events sheet but is missing some columns.
+	header_rows = 3
 	column_map = {
 		'event_start': 0,
 		'event_end': 1,
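
One non-obvious piece above is the new write_value override: parse_row for events appears to prepend implicit tags to the tag list (two always, one more for poster moments), so write_value strips them again before the value is written back to the cell. A hypothetical round trip, assuming that prefix layout:

row = {"poster_moment": True}              # hypothetical parsed row
sheet_tags = ["desert bus", "highlight"]   # what was actually typed in the sheet
value = ["Day 1", "Games", "Poster Moment"] + sheet_tags  # assumed implicit prefix

# The override's logic: drop the two implicit tags, plus one more for a poster moment.
value = value[2:]
if row["poster_moment"]:
	value = value[1:]
assert value == sheet_tags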
