From bb4cc8c6682862f3a24f801ae1bdb760c6a776cd Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Fri, 16 Aug 2024 01:35:50 +1000
Subject: [PATCH] sheetsync: Replace old special-case PlaylistSync with SheetSync subclass

---
 sheetsync/sheetsync/main.py | 93 ++++++++-------------------------------
 1 file changed, 19 insertions(+), 74 deletions(-)

diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py
index 31b559a..628363d 100644
--- a/sheetsync/sheetsync/main.py
+++ b/sheetsync/sheetsync/main.py
@@ -10,7 +10,6 @@ import gevent.event
 import prometheus_client as prom
 from monotonic import monotonic
 from psycopg2 import sql
-from psycopg2.extras import execute_values
 from requests import HTTPError
 
 import common
@@ -18,7 +17,7 @@ import common.dateutil
 from common.database import DBManager, query, get_column_placeholder
 from common.sheets import Sheets as SheetsClient
 
-from .sheets import SheetsEventsMiddleware
+from .sheets import SheetsEventsMiddleware, SheetsPlaylistsMiddleware
 from .streamlog import StreamLogClient, StreamLogMiddleware
 
 sheets_synced = prom.Counter(
@@ -301,80 +300,20 @@ class EventsSync(SheetSync):
 		super().sync_row(sheet_row, db_row)
 
 
-class PlaylistSync:
-	# Time between syncs
-	RETRY_INTERVAL = 20
-
-	# Time to wait after getting an error
-	ERROR_RETRY_INTERVAL = 20
+class PlaylistsSync(SheetSync):
 
-	def __init__(self, stop, dbmanager, sheets, sheet_id, worksheet):
-		self.stop = stop
-		self.dbmanager = dbmanager
-		self.sheets = sheets
-		self.sheet_id = sheet_id
-		self.worksheet = worksheet
-
-	def run(self):
-		self.conn = self.dbmanager.get_conn()
+	# Slower poll rate than events to avoid using large amounts of quota
+	retry_interval = 20
+	error_retry_interval = 20
 
-		while not self.stop.is_set():
-			try:
-				sync_start = monotonic()
-				rows = self.sheets.get_rows(self.sheet_id, self.worksheet)
-				self.sync_playlists(rows)
-			except Exception as e:
-				# for HTTPErrors, http response body includes the more detailed error
-				detail = ''
-				if isinstance(e, HTTPError):
-					detail = ": {}".format(e.response.content)
-				logging.exception("Failed to sync{}".format(detail))
-				sync_errors.labels("playlists").inc()
-				# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
-				# This is heavy-handed but simple and effective.
-				# If we can't re-connect, the program will crash from here,
-				# then restart and wait until it can connect again.
-				self.conn = self.dbmanager.get_conn()
-				wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL)
-			else:
-				logging.info("Successful sync of playlists")
-				sheets_synced.labels("playlists").inc()
-				sheet_sync_duration.labels("playlists").observe(monotonic() - sync_start)
-				wait(self.stop, sync_start, self.RETRY_INTERVAL)
-
-	def sync_playlists(self, rows):
-		"""Parse rows with a valid playlist id and at least one tag,
-		overwriting the entire playlists table"""
-		playlists = []
-		for row in rows:
-			if len(row) == 5:
-				tags, _, name, playlist_id, show_in_description = row
-			elif len(row) == 4:
-				tags, _, name, playlist_id = row
-				show_in_description = ""
-			else:
-				continue
-			tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
-			if not tags:
-				continue
-			# special-case for the "all everything" list,
-			# we don't want "no tags" to mean "all videos" so we need a sentinel value.
-			if tags == [""]:
-				tags = []
-			playlist_id = playlist_id.strip()
-			if len(playlist_id) != 34 or not playlist_id.startswith('PL'):
-				continue
-			show_in_description = show_in_description == "[✓]"
-			playlists.append((playlist_id, tags, name, show_in_description))
-		# We want to wipe and replace all the current entries in the table.
-		# The easiest way to do this is a DELETE then an INSERT, all within a transaction.
-		# The "with" block will perform everything under it within a transaction, rolling back
-		# on error or committing on exit.
-		logging.info("Updating playlists table with {} playlists".format(len(playlists)))
-		with self.conn:
-			query(self.conn, "DELETE FROM playlists")
-			execute_values(self.conn.cursor(), "INSERT INTO playlists(playlist_id, tags, name, show_in_description) VALUES %s", playlists)
+	table = "playlists"
+	input_columns = {
+		"playlist_id",
+		"tags",
+		"name",
+		"show_in_description",
+	}
 
 
 @argh.arg('dbconnect', help=
@@ -460,8 +399,14 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
 					config.get("allocate_ids", False),
 				)
 				if "playlist_worksheet" in config:
+					middleware = SheetsPlaylistsMiddleware(
+						client,
+						config["sheet_id"],
+						[config["playlist_worksheet"]],
+						config.get("allocate_ids", False),
+					)
 					workers.append(
-						PlaylistSync(stop, dbmanager, client, config["sheet_id"], config["playlist_worksheet"])
+						PlaylistsSync("playlists", middleware, stop, dbmanager, config.get("reverse_sync", False))
 					)
 			elif config["type"] == "streamlog":
 				auth_token = open(config["creds"]).read().strip()
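Note (illustration, not part of the patch to apply): a minimal, self-contained sketch of the declarative pattern this change adopts, assuming a hypothetical BaseSheetSync base class and FakeMiddleware stand-in. The real SheetSync base class and SheetsPlaylistsMiddleware live outside this diff and may differ; the point shown is only that the base class owns the poll/sync machinery while a subclass such as PlaylistsSync just declares its table, input columns and poll rates.

# Illustrative sketch only -- not the wubloader implementation.
# BaseSheetSync, FakeMiddleware and run_once are hypothetical stand-ins for
# the real SheetSync base class and SheetsPlaylistsMiddleware, which are not
# shown in this diff.

class FakeMiddleware:
	"""Stand-in for a sheets middleware: returns parsed rows as dicts."""
	def get_rows(self):
		return [{
			"playlist_id": "PL" + "x" * 32,
			"tags": ["day 1"],
			"name": "Day 1 VODs",
			"show_in_description": True,
		}]


class BaseSheetSync:
	# Subclasses override these class attributes instead of writing their own loop.
	retry_interval = 5
	error_retry_interval = 5
	table = None
	input_columns = set()

	def __init__(self, name, middleware):
		self.name = name
		self.middleware = middleware

	def run_once(self):
		# A real implementation would reconcile rows against the database and
		# retry on errors using the intervals above; here we only show which
		# columns would be written to which table.
		for row in self.middleware.get_rows():
			values = {col: row[col] for col in self.input_columns}
			print("{}: would write to {}: {}".format(self.name, self.table, values))


class PlaylistsSync(BaseSheetSync):
	# Slower poll rate than events to avoid using large amounts of quota
	retry_interval = 20
	error_retry_interval = 20

	table = "playlists"
	input_columns = {"playlist_id", "tags", "name", "show_in_description"}


if __name__ == "__main__":
	PlaylistsSync("playlists", FakeMiddleware()).run_once()

Running the sketch just prints the one row it would write to the playlists table; the actual database sync, retry loop and row parsing are assumed to be handled by the shared base class and middleware, which is why the patch can delete the hand-written run()/sync_playlists() code.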