|
|
|
@ -18,7 +18,7 @@ from common.database import DBManager, query, get_column_placeholder
|
|
|
|
|
from common.sheets import Sheets as SheetsClient
|
|
|
|
|
|
|
|
|
|
from .sheets import SheetsEventsMiddleware, SheetsPlaylistsMiddleware
|
|
|
|
|
from .streamlog import StreamLogClient, StreamLogMiddleware
|
|
|
|
|
from .streamlog import StreamLogClient, StreamLogEventsMiddleware, StreamLogPlaylistsMiddleware
|
|
|
|
|
|
|
|
|
|
sheets_synced = prom.Counter(
|
|
|
|
|
'sheets_synced',
|
|
|
|
@ -325,25 +325,30 @@ class PlaylistsSync(SheetSync):
|
|
|
|
|
metavar="SYNC-CONFIG",
|
|
|
|
|
nargs="+",
|
|
|
|
|
type=json.loads,
|
|
|
|
|
help="\n".join([
|
|
|
|
|
'A JSON object describing a sync operation to perform. One of:',
|
|
|
|
|
' type: "sheets"',
|
|
|
|
|
' creds: path to credentials JSON file containing "client_id", "client_secret" and "refresh_token"',
|
|
|
|
|
' sheet_id: The id of the Google Sheet to use',
|
|
|
|
|
' worksheets: List of worksheets within that sheet to sync',
|
|
|
|
|
' edit_url: a format string for edit links, with {} as a placeholder for id',
|
|
|
|
|
' bustime_start: Timestamp string at which bustime is 00:00',
|
|
|
|
|
' allocate_ids: Boolean, optional. When true, will give rows without ids an id.',
|
|
|
|
|
' Only one sheetsync acting on the same sheet should have this enabled.',
|
|
|
|
|
' reverse_sync: Boolean, optional. When true, enables an alternate mode',
|
|
|
|
|
' where all data is synced from the database to the sheet',
|
|
|
|
|
' playlist_worksheet: An optional additional worksheet name that holds playlist tag definitions',
|
|
|
|
|
'or:',
|
|
|
|
|
' type: streamlog',
|
|
|
|
|
' creds: path to a file containing the auth key',
|
|
|
|
|
' url: The URL of the streamlog server',
|
|
|
|
|
' event_id: The id of the streamlog event to sync',
|
|
|
|
|
]),
|
|
|
|
|
help="""
|
|
|
|
|
A JSON object describing a sync operation to perform.
|
|
|
|
|
Always present:
|
|
|
|
|
name: A human identifier for this sync operation
|
|
|
|
|
backend: The data source. One of "sheets" or "streamlog"
|
|
|
|
|
type: What kind of data is being synced. One of "events" or "playlists"
|
|
|
|
|
When backend is "sheets":
|
|
|
|
|
creds: path to credentials JSON file containing "client_id", "client_secret" and "refresh_token"
|
|
|
|
|
sheet_id: The id of the Google Sheet to use
|
|
|
|
|
worksheets: List of worksheets within that sheet to sync
|
|
|
|
|
allocate_ids: Boolean, optional. When true, will give rows without ids an id.
|
|
|
|
|
Only one sheetsync acting on the same sheet should have this enabled.
|
|
|
|
|
reverse_sync: Boolean, optional. When true, enables an alternate mode
|
|
|
|
|
where all data is synced from the database to the sheet.
|
|
|
|
|
Only one sheetsync acting on the same sheet should have this enabled.
|
|
|
|
|
When type is "events":
|
|
|
|
|
edit_url: a format string for edit links, with {} as a placeholder for id
|
|
|
|
|
bustime_start: Timestamp string at which bustime is 00:00
|
|
|
|
|
When backend is "streamlog":
|
|
|
|
|
type: streamlog
|
|
|
|
|
creds: path to a file containing the auth key
|
|
|
|
|
url: The URL of the streamlog server
|
|
|
|
|
event_id: The id of the streamlog event to sync
|
|
|
|
|
""",
|
|
|
|
|
)
|
|
|
|
|
def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
|
|
|
|
|
"""
|
|
|
|
@ -383,44 +388,55 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
|
|
|
|
|
workers = []
|
|
|
|
|
|
|
|
|
|
for config in sync_configs:
|
|
|
|
|
if config["type"] == "sheets":
|
|
|
|
|
if config["backend"] == "sheets":
|
|
|
|
|
creds = json.load(open(config["creds"]))
|
|
|
|
|
client = SheetsClient(
|
|
|
|
|
client_id=creds['client_id'],
|
|
|
|
|
client_secret=creds['client_secret'],
|
|
|
|
|
refresh_token=creds['refresh_token'],
|
|
|
|
|
)
|
|
|
|
|
middleware = SheetsEventsMiddleware(
|
|
|
|
|
client,
|
|
|
|
|
config["sheet_id"],
|
|
|
|
|
config["worksheets"],
|
|
|
|
|
common.dateutil.parse(config["bustime_start"]),
|
|
|
|
|
config["edit_url"],
|
|
|
|
|
config.get("allocate_ids", False),
|
|
|
|
|
)
|
|
|
|
|
if "playlist_worksheet" in config:
|
|
|
|
|
allocate_ids = config.get("allocate_ids", False)
|
|
|
|
|
if config["type"] == "sheets":
|
|
|
|
|
middleware = SheetsEventsMiddleware(
|
|
|
|
|
client,
|
|
|
|
|
config["sheet_id"],
|
|
|
|
|
config["worksheets"],
|
|
|
|
|
common.dateutil.parse(config["bustime_start"]),
|
|
|
|
|
config["edit_url"],
|
|
|
|
|
allocate_ids,
|
|
|
|
|
)
|
|
|
|
|
elif config["type"] == "playlists":
|
|
|
|
|
middleware = SheetsPlaylistsMiddleware(
|
|
|
|
|
client,
|
|
|
|
|
config["sheet_id"],
|
|
|
|
|
[config["playlist_worksheet"]],
|
|
|
|
|
config.get("allocate_ids", False),
|
|
|
|
|
)
|
|
|
|
|
workers.append(
|
|
|
|
|
PlaylistsSync("playlists", middleware, stop, dbmanager, config.get("reverse_sync", False))
|
|
|
|
|
)
|
|
|
|
|
elif config["type"] == "streamlog":
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError("Unknown type {!r}".format(config["type"]))
|
|
|
|
|
elif config["backend"] == "streamlog":
|
|
|
|
|
auth_token = open(config["creds"]).read().strip()
|
|
|
|
|
client = StreamLogClient(
|
|
|
|
|
config["url"],
|
|
|
|
|
config["event_id"],
|
|
|
|
|
auth_token,
|
|
|
|
|
)
|
|
|
|
|
middleware = StreamLogMiddleware(client)
|
|
|
|
|
if config["type"] == "events":
|
|
|
|
|
middleware = StreamLogEventsMiddleware(client)
|
|
|
|
|
elif config["type"] == "playlists":
|
|
|
|
|
middleware = StreamLogPlaylistsMiddleware(client)
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError("Unknown type {!r}".format(config["type"]))
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError("Unknown type {!r}".format(config["type"]))
|
|
|
|
|
workers.append(
|
|
|
|
|
EventsSync(config["type"], middleware, stop, dbmanager, config.get("reverse_sync", False)),
|
|
|
|
|
)
|
|
|
|
|
raise ValueError("Unknown backend {!r}".format(config["backend"]))
|
|
|
|
|
|
|
|
|
|
sync_class = {
|
|
|
|
|
"events": EventsSync,
|
|
|
|
|
"playlists": PlaylistsSync,
|
|
|
|
|
}[config["type"]]
|
|
|
|
|
reverse_sync = config.get("reverse_sync", False)
|
|
|
|
|
sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync)
|
|
|
|
|
workers.append(sync)
|
|
|
|
|
|
|
|
|
|
jobs = [gevent.spawn(worker.run) for worker in workers]
|
|
|
|
|
# Block until any one exits
|
|
|
|
|