From a5745f7bbd911c504dbc189cf5152e7ac3fdb053 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 3 Oct 2023 02:10:27 +1100
Subject: [PATCH] sheetsync: Change how options are specified to allow multiple backends / syncs

---
 docker-compose.jsonnet      | 21 +++++----
 sheetsync/sheetsync/main.py | 97 ++++++++++++++++++++++---------------
 2 files changed, 71 insertions(+), 47 deletions(-)

diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet
index 1177bc4..e9947cc 100644
--- a/docker-compose.jsonnet
+++ b/docker-compose.jsonnet
@@ -434,16 +434,19 @@
             // Args for the sheetsync command:
             [
                 "--backdoor-port", std.toString($.backdoor_port),
-                "--allocate-ids", $.db_connect,
-                "--playlist-worksheet", $.playlist_worksheet,
-                "/etc/wubloader-creds.json",
-                $.edit_url,
-                $.bustime_start,
-                $.sheet_id,
-            ]
-            + $.worksheets
-            + (if $.archive_worksheet != null then ["--archive-worksheet", $.archive_worksheet] else []),
+                $.db_connect,
+                std.manifestJson({
+                    type: "sheets",
+                    creds: "/etc/wubloader-creds.json",
+                    sheet_id: $.sheet_id,
+                    worksheets: $.worksheets,
+                    allocate_ids: true,
+                    edit_url: $.edit_url,
+                    bustime_start: $.bustime_start,
+                    playlist_worksheet: $.playlist_worksheet,
+                }),
+            ],
             volumes: [
                 // Mount the creds file into /etc
                 "%s:/etc/wubloader-creds.json" % $.sheetsync_creds_file,
diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py
index d72f393..76bd00f 100644
--- a/sheetsync/sheetsync/main.py
+++ b/sheetsync/sheetsync/main.py
@@ -18,6 +18,7 @@ import common.dateutil
 from common.database import DBManager, query, get_column_placeholder
 
 from .sheets import SheetsClient, SheetsMiddleware
+from .streamlog import StreamLogClient, StreamLogMiddleware
 
 sheets_synced = prom.Counter(
 	'sheets_synced',
@@ -331,32 +332,31 @@ class PlaylistSync:
 	"list of key=value pairs, or a URI like:\n"
 	"\tpostgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE"
 )
-@argh.arg('sheets-creds-file', help=
-	"sheets_creds_file should be a json file containing keys "
-	"'client_id', 'client_secret' and 'refresh_token'."
+@argh.arg('sync-configs',
+	metavar="SYNC-CONFIG",
+	nargs="+",
+	type=json.loads,
+	help="\n".join([
+		'A JSON object describing a sync operation to perform. One of:',
+		'  type: "sheets"',
+		'  creds: path to credentials JSON file containing "client_id", "client_secret" and "refresh_token"',
+		'  sheet_id: The id of the Google Sheet to use',
+		'  worksheets: List of worksheets within that sheet to sync',
+		'  edit_url: a format string for edit links, with {} as a placeholder for id',
+		'  bustime_start: Timestamp string at which bustime is 00:00',
+		'  allocate_ids: Boolean, optional. When true, rows without ids will be given an id.',
+		'    Only one sheetsync acting on the same sheet should have this enabled.',
+		'  reverse_sync: Boolean, optional. When true, enables an alternate mode',
+		'    where all data is synced from the database to the sheet',
+		'  playlist_worksheet: An optional additional worksheet name that holds playlist tag definitions',
+		'or:',
+		'  type: "streamlog"',
+		'  creds: path to a file containing the auth key',
+		'  url: The URL of the streamlog server',
+		'  event_name: The name of the streamlog event to sync',
+	]),
 )
-@argh.arg('edit-url', help=
-	'edit_url should be a format string for edit links, with {} as a placeholder for id. '
-	'eg. "https://myeditor.example.com/edit/{}" will produce edit urls like '
-	'"https://myeditor.example.com/edit/da6cf4df-4871-4a9a-a660-0b1e1a6a9c10".'
-)
-@argh.arg('bustime_start', type=common.dateutil.parse, help=
-	"bustime_start is the timestamp which is bustime 00:00."
-)
-@argh.arg('worksheet-names', nargs='+', help=
-	"The names of the individual worksheets within the sheet to operate on."
-)
-@argh.arg('--allocate-ids', help=
-	"--allocate-ids means that it will give rows without ids an id. "
-	"Only one sheet sync should have --allocate-ids on for a given sheet at once!"
-)
-@argh.arg('--playlist-worksheet', help=
-	"An optional additional worksheet name that holds playlist tag definitions",
-)
-@argh.arg('--reverse-sync', help=
-	"Enables an alternate mode where all data is synced from the database to the sheet",
-)
-def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksheet_names, metrics_port=8005, backdoor_port=0, allocate_ids=False, playlist_worksheet=None, reverse_sync=False):
+def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
 	"""
 	Sheet sync constantly scans a Google Sheets sheet and a database,
 	copying inputs from the sheet to the DB and outputs from the DB to the sheet.
@@ -391,20 +391,41 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh
 			dbmanager.put_conn(conn)
 			break
 
-	sheets_creds = json.load(open(sheets_creds_file))
-
-	sheets_client = SheetsClient(
-		client_id=sheets_creds['client_id'],
-		client_secret=sheets_creds['client_secret'],
-		refresh_token=sheets_creds['refresh_token'],
-	)
-	sheets_middleware = SheetsMiddleware(sheets_client, sheet_id, worksheet_names, bustime_start, edit_url, allocate_ids)
+	workers = []
 
-	workers = [
-		SheetSync(sheets_middleware, stop, dbmanager, reverse_sync),
-	]
-	if playlist_worksheet:
-		workers.append(PlaylistSync(stop, dbmanager, sheets_client, sheet_id, playlist_worksheet))
+	for config in sync_configs:
+		if config["type"] == "sheets":
+			creds = json.load(open(config["creds"]))
+			client = SheetsClient(
+				client_id=creds['client_id'],
+				client_secret=creds['client_secret'],
+				refresh_token=creds['refresh_token'],
+			)
+			middleware = SheetsMiddleware(
+				client,
+				config["sheet_id"],
+				config["worksheets"],
+				common.dateutil.parse(config["bustime_start"]),
+				config["edit_url"],
+				config.get("allocate_ids", False),
+			)
+			if "playlist_worksheet" in config:
+				workers.append(
+					PlaylistSync(stop, dbmanager, client, config["sheet_id"], config["playlist_worksheet"])
+				)
+		elif config["type"] == "streamlog":
+			auth_token = open(config["creds"]).read().strip()
+			client = StreamLogClient(
+				config["url"],
+				config["event_name"],
+				auth_token,
+			)
+			middleware = StreamLogMiddleware(client)
+		else:
+			raise ValueError("Unknown type {!r}".format(config["type"]))
+		workers.append(
+			SheetSync(middleware, stop, dbmanager, config.get("reverse_sync", False)),
+		)
 
 	jobs = [gevent.spawn(worker.run) for worker in workers]
 	# Block until any one exits
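
For illustration, these are the two shapes of sync-config object the new CLI accepts, one JSON object per positional argument (each is parsed with json.loads; with the docker-compose change above, the "sheets" object is what std.manifestJson produces). All concrete values below are placeholders, not taken from any real deployment:

    {
        "type": "sheets",
        "creds": "/etc/wubloader-creds.json",
        "sheet_id": "EXAMPLE-SHEET-ID",
        "worksheets": ["Day 1", "Day 2"],
        "edit_url": "https://example.com/edit/{}",
        "bustime_start": "1970-01-01T00:00:00",
        "allocate_ids": true,
        "playlist_worksheet": "Playlists"
    }

    {
        "type": "streamlog",
        "creds": "/etc/streamlog-auth-key",
        "url": "https://streamlog.example.com",
        "event_name": "example-event"
    }

Passing several such objects to a single invocation creates one SheetSync worker per config (plus a PlaylistSync worker when playlist_worksheet is given), which is what allows multiple backends / syncs from one sheetsync process.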