sheetsync: Change how options are specified to allow multiple backends / syncs

pull/401/head
Mike Lang 1 year ago committed by Mike Lang
parent 74869de89d
commit 986a1db964

@ -434,16 +434,18 @@
// Args for the sheetsync // Args for the sheetsync
command: [ command: [
"--backdoor-port", std.toString($.backdoor_port), "--backdoor-port", std.toString($.backdoor_port),
"--allocate-ids",
$.db_connect, $.db_connect,
"--playlist-worksheet", $.playlist_worksheet, std.manifestJson({
"/etc/wubloader-creds.json", type: "sheets",
$.edit_url, creds: "/etc/wubloader-creds.json",
$.bustime_start, sheet_id: $.sheet_id,
$.sheet_id, worksheets: $.worksheets,
] allocate_ids: true,
+ $.worksheets edit_url: $.edit_url,
+ (if $.archive_worksheet != null then ["--archive-worksheet", $.archive_worksheet] else []), bustime_start: $.bustime_start,
playlist_worksheet: $.playlist_worksheet,
}),
],
volumes: [ volumes: [
// Mount the creds file into /etc // Mount the creds file into /etc
"%s:/etc/wubloader-creds.json" % $.sheetsync_creds_file, "%s:/etc/wubloader-creds.json" % $.sheetsync_creds_file,

@ -18,6 +18,7 @@ import common.dateutil
from common.database import DBManager, query, get_column_placeholder from common.database import DBManager, query, get_column_placeholder
from .sheets import SheetsClient, SheetsMiddleware from .sheets import SheetsClient, SheetsMiddleware
from .streamlog import StreamLogClient, StreamLogMiddleware
sheets_synced = prom.Counter( sheets_synced = prom.Counter(
'sheets_synced', 'sheets_synced',
@ -331,32 +332,31 @@ class PlaylistSync:
"list of key=value pairs, or a URI like:\n" "list of key=value pairs, or a URI like:\n"
"\tpostgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE" "\tpostgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE"
) )
@argh.arg('sheets-creds-file', help= @argh.arg('sync-configs',
"sheets_creds_file should be a json file containing keys " metavar="SYNC-CONFIG",
"'client_id', 'client_secret' and 'refresh_token'." nargs="+",
type=json.loads,
help="\n".join([
'A JSON object describing a sync operation to perform. One of:',
' type: "sheets"',
' creds: path to credentials JSON file containing "client_id", "client_secret" and "refresh_token"',
' sheet_id: The id of the Google Sheet to use',
' worksheets: List of worksheets within that sheet to sync',
' edit_url: a format string for edit links, with {} as a placeholder for id',
' bustime_start: Timestamp string at which bustime is 00:00',
' allocate_ids: Boolean, optional. When true, will give rows without ids an id.',
' Only one sheetsync acting on the same sheet should have this enabled.',
' reverse_sync: Boolean, optional. When true, enables an alternate mode',
' where all data is synced from the database to the sheet',
' playlist_worksheet: An optional additional worksheet name that holds playlist tag definitions',
'or:',
' type: streamlog',
' creds: path to a file containing the auth key',
' url: The URL of the streamlog server',
' event_name: The name of the streamlog event to sync',
]),
) )
@argh.arg('edit-url', help= def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
'edit_url should be a format string for edit links, with {} as a placeholder for id. '
'eg. "https://myeditor.example.com/edit/{}" will produce edit urls like '
'"https://myeditor.example.com/edit/da6cf4df-4871-4a9a-a660-0b1e1a6a9c10".'
)
@argh.arg('bustime_start', type=common.dateutil.parse, help=
"bustime_start is the timestamp which is bustime 00:00."
)
@argh.arg('worksheet-names', nargs='+', help=
"The names of the individual worksheets within the sheet to operate on."
)
@argh.arg('--allocate-ids', help=
"--allocate-ids means that it will give rows without ids an id. "
"Only one sheet sync should have --allocate-ids on for a given sheet at once!"
)
@argh.arg('--playlist-worksheet', help=
"An optional additional worksheet name that holds playlist tag definitions",
)
@argh.arg('--reverse-sync', help=
"Enables an alternate mode where all data is synced from the database to the sheet",
)
def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksheet_names, metrics_port=8005, backdoor_port=0, allocate_ids=False, playlist_worksheet=None, reverse_sync=False):
""" """
Sheet sync constantly scans a Google Sheets sheet and a database, copying inputs from the sheet Sheet sync constantly scans a Google Sheets sheet and a database, copying inputs from the sheet
to the DB and outputs from the DB to the sheet. to the DB and outputs from the DB to the sheet.
@ -391,20 +391,41 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh
dbmanager.put_conn(conn) dbmanager.put_conn(conn)
break break
sheets_creds = json.load(open(sheets_creds_file)) workers = []
sheets_client = SheetsClient( for config in sync_configs:
client_id=sheets_creds['client_id'], if config["type"] == "sheets":
client_secret=sheets_creds['client_secret'], creds = json.load(open(config["creds"]))
refresh_token=sheets_creds['refresh_token'], client = SheetsClient(
client_id=creds['client_id'],
client_secret=creds['client_secret'],
refresh_token=creds['refresh_token'],
)
middleware = SheetsMiddleware(
client,
config["sheet_id"],
config["worksheets"],
common.dateutil.parse(config["bustime_start"]),
config["edit_url"],
config["allocate_ids"],
)
if "playlist_worksheet" in config:
workers.append(
PlaylistSync(stop, dbmanager, client, config["sheet_id"], config["playlist_worksheet"])
)
elif config["type"] == "streamlog":
auth_token = open(config["creds"]).read().strip()
client = StreamLogClient(
config["url"],
config["event_name"],
auth_token,
)
middleware = StreamLogMiddleware(client)
else:
raise ValueError("Unknown type {!r}".format(config["type"]))
workers.append(
SheetSync(middleware, stop, dbmanager, config["reverse_sync"]),
) )
sheets_middleware = SheetsMiddleware(sheets_client, sheet_id, worksheet_names, bustime_start, edit_url, allocate_ids)
workers = [
SheetSync(sheets_middleware, stop, dbmanager, reverse_sync),
]
if playlist_worksheet:
workers.append(PlaylistSync(stop, dbmanager, sheets_client, sheet_id, playlist_worksheet))
jobs = [gevent.spawn(worker.run) for worker in workers] jobs = [gevent.spawn(worker.run) for worker in workers]
# Block until any one exits # Block until any one exits

Loading…
Cancel
Save