From 74869de89d863387ff19a9ef58839dca07da56ec Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 3 Oct 2023 01:02:11 +1100 Subject: [PATCH] Implement reverse sync mode This is a mode where all data flows one-way from the database to the sheet. It is intended to be used to populate an empty sheet from database events, possibly sourced from somewhere else. To make this work, a few changes were required: * Track which ids we've seen so we know what events were not matched with a row * Allow `row` to be None in sync_rows * When it is, call the middleware to create a new row with a new id * In sheets, this is implemented by tracking the last empty rows we saw, and claiming them as needed. --- sheetsync/sheetsync/main.py | 50 ++++++++++++++++++++++++++------ sheetsync/sheetsync/sheets.py | 29 ++++++++++++++---- sheetsync/sheetsync/streamlog.py | 3 ++ 3 files changed, 67 insertions(+), 15 deletions(-) diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py index d3942d4..d72f393 100644 --- a/sheetsync/sheetsync/main.py +++ b/sheetsync/sheetsync/main.py @@ -69,10 +69,11 @@ class SheetSync(object): # Time to wait after getting an error ERROR_RETRY_INTERVAL = 10 - def __init__(self, middleware, stop, dbmanager): + def __init__(self, middleware, stop, dbmanager, reverse_sync=False): self.middleware = middleware self.stop = stop self.dbmanager = dbmanager + self.create_missing_ids = False # List of input columns self.input_columns = [ 'event_start', @@ -91,6 +92,14 @@ class SheetSync(object): 'state', 'error', ] + if reverse_sync: + # Reverse Sync refers to copying all event data from the database into the sheet, + # instead of it (mostly) being the other way. In particular: + # - All columns become output columns + # - We are allowed to create new sheet rows for database events if they don't exist. 
+ self.create_missing_ids = True + self.output_columns += self.input_columns + self.input_columns = [] def run(self): self.conn = self.dbmanager.get_conn() @@ -103,10 +112,18 @@ class SheetSync(object): # each row is more expensive than the cost of just grabbing the entire table # and comparing locally. events = self.get_events() + seen = set() for row in self.middleware.get_rows(): + if row['id'] in seen: + logging.error("Duplicate id {}, skipping".format(row['id'])) + continue + seen.add(row['id']) self.sync_row(row, events.get(row['id'])) + for event in [e for id, e in events.items() if id not in seen]: + self.sync_row(None, event) + except Exception as e: # for HTTPErrors, http response body includes the more detailed error detail = '' @@ -151,11 +168,14 @@ class SheetSync(object): return by_id def sync_row(self, row, event): - """Take a row dict and an Event from the database (or None if id not found) - and take whatever action is required to sync them, ie. writing to the database or sheet.""" - worksheet = row["sheet_name"] + """Take a row dict (or None) and an Event from the database (or None) + and take whatever action is required to sync them, ie. writing to the database or sheet. + At least one of row and event must be non-None. + """ if event is None: + assert row + worksheet = row["sheet_name"] # No event currently in DB, create it. logging.info("Inserting new event {}".format(row['id'])) # Insertion conflict just means that another sheet sync beat us to the insert. 
@@ -175,6 +195,15 @@ class SheetSync(object): self.middleware.mark_modified(worksheet) return + if row is None: + assert event + if not self.create_missing_ids: + logging.info("Skipping event {} without any matching row".format(event.id)) + return + logging.info("Adding new row {}".format(event.id)) + row = self.middleware.create_row(event.sheet_name, event.id) + + worksheet = row["sheet_name"] rows_found.labels(worksheet).inc() # If no database error, but we have parse errors, indicate they should be displayed. @@ -188,7 +217,7 @@ class SheetSync(object): event = event._replace(state='UNLISTED') # Update database with any changed inputs - changed = [col for col in self.input_columns if row[col] != getattr(event, col)] + changed = [col for col in self.input_columns if row.get(col) != getattr(event, col)] if changed: logging.info("Updating event {} with new value(s) for {}".format( row['id'], ', '.join(changed) @@ -208,7 +237,7 @@ class SheetSync(object): # Update sheet with any changed outputs format_output = lambda v: '' if v is None else v # cast nulls to empty string - changed = [col for col in self.output_columns if row[col] != format_output(getattr(event, col))] + changed = [col for col in self.output_columns if row.get(col) != format_output(getattr(event, col))] if changed: logging.info("Updating sheet row {} with new value(s) for {}".format( row['id'], ', '.join(changed) @@ -324,12 +353,15 @@ class PlaylistSync: @argh.arg('--playlist-worksheet', help= "An optional additional worksheet name that holds playlist tag definitions", ) -def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksheet_names, metrics_port=8005, backdoor_port=0, allocate_ids=False, playlist_worksheet=None): +@argh.arg('--reverse-sync', help= + "Enables an alternate mode where all data is synced from the database to the sheet", +) +def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksheet_names, metrics_port=8005, 
backdoor_port=0, allocate_ids=False, playlist_worksheet=None, reverse_sync=False): """ Sheet sync constantly scans a Google Sheets sheet and a database, copying inputs from the sheet to the DB and outputs from the DB to the sheet. - With the exception of id allocation, all operations are idempotent and multiple sheet syncs + With the exception of id allocation or reverse sync mode, all operations are idempotent and multiple sheet syncs may be run for redundancy. """ common.PromLogCountsHandler.install() @@ -369,7 +401,7 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh sheets_middleware = SheetsMiddleware(sheets_client, sheet_id, worksheet_names, bustime_start, edit_url, allocate_ids) workers = [ - SheetSync(sheets_middleware, stop, dbmanager), + SheetSync(sheets_middleware, stop, dbmanager, reverse_sync), ] if playlist_worksheet: workers.append(PlaylistSync(stop, dbmanager, sheets_client, sheet_id, playlist_worksheet)) diff --git a/sheetsync/sheetsync/sheets.py b/sheetsync/sheetsync/sheets.py index edd9b68..3cd5889 100644 --- a/sheetsync/sheetsync/sheets.py +++ b/sheetsync/sheetsync/sheets.py @@ -155,6 +155,8 @@ class SheetsMiddleware(): def get_rows(self): """Fetch all rows of worksheet, parsed into a list of dicts.""" + # Clear previously seen unassigned rows + self.unassigned_rows = {} for worksheet in self.pick_worksheets(): rows = self.sheets.get_rows(self.sheet_id, worksheet) for row_index, row in enumerate(rows): @@ -167,10 +169,11 @@ class SheetsMiddleware(): # Handle rows without an allocated id if row['id'] is None: - # If a row is all empty (including no id), ignore it. + # If a row is all empty (including no id), ignore it and mark it down for possible use in create_row(). # Ignore the tags column for this check since it is never non-empty due to implicit tags # (and even if there's other tags, we don't care if there's nothing else in the row). 
if not any(row[col] for col in self.input_columns if col != 'tags'): + self.unassigned_rows.setdefault(worksheet, []).append(row["index"]) continue # If we can't allocate ids, warn and ignore. if not self.allocate_ids: @@ -179,11 +182,7 @@ class SheetsMiddleware(): # Otherwise, allocate id for a new row. row['id'] = str(uuid.uuid4()) logging.info(f"Allocating id for row {worksheet!r}:{row['index']} = {row['id']}") - self.sheets.write_value( - self.sheet_id, worksheet, - row["index"], self.column_map['id'], - str(row['id']), - ) + self.write_id(row) # Set edit link if marked for editing and start/end set. # This prevents accidents / clicking the wrong row and provides @@ -199,6 +198,13 @@ class SheetsMiddleware(): yield row + def write_id(self, row): + self.sheets.write_value( + self.sheet_id, row["sheet_name"], + row["index"], self.column_map['id'], + str(row['id']), + ) + def parse_row(self, worksheet, row_index, row): """Take a row as a sequence of columns, and return a dict {column: value}""" row_dict = {'_parse_errors': []} @@ -247,3 +253,14 @@ class SheetsMiddleware(): """Mark row as having had a change made, bumping its worksheet to the top of the most-recently-modified queue.""" self.worksheets[row["sheet_name"]] = monotonic() + + def create_row(self, worksheet, id): + index = self.unassigned_rows[worksheet].pop(0) + row = { + "sheet_name": worksheet, + "id": id, + "index": index, + } + logging.info(f"Assigning existing id {row['id']} to empty row {worksheet!r}:{row['index']}") + self.write_id(row) + return row diff --git a/sheetsync/sheetsync/streamlog.py b/sheetsync/sheetsync/streamlog.py index e82afef..ee5e338 100644 --- a/sheetsync/sheetsync/streamlog.py +++ b/sheetsync/sheetsync/streamlog.py @@ -104,3 +104,6 @@ class StreamLogMiddleware: def mark_modified(self, row): pass # not a concept we have + + def create_row(self, worksheet, id): + raise NotImplementedError