From 467edf3d19e668fdad371e3758dab71a6adc5c25 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Fri, 5 Nov 2021 13:32:02 +1100
Subject: [PATCH] Read dynamic playlist manager config from sheet

The sheetsync loads playlist ids and tags into a new table `playlists`.
playlist manager reads this table and merges it with the playlists given
on the command line.
---
 docker-compose.jsonnet                    |  2 +
 playlist_manager/playlist_manager/main.py | 24 ++++++++++--
 postgres/setup.sh                         |  7 ++++
 sheetsync/sheetsync/main.py               | 46 ++++++++++++++++++++---
 4 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet
index c50d045..d966d01 100644
--- a/docker-compose.jsonnet
+++ b/docker-compose.jsonnet
@@ -153,6 +153,7 @@
 	// The spreadsheet id and worksheet names for sheet sync to act on
 	sheet_id:: "your_id_here",
 	worksheets:: ["Tech Test & Preshow"] + ["Day %d" % n for n in std.range(1, 8)],
+	playlist_worksheet:: "Tags",
 
 	// A map from youtube playlist IDs to a list of tags.
 	// Playlist manager will populate each playlist with all videos which have all those tags.
@@ -319,6 +320,7 @@
 			"--backdoor-port", std.toString($.backdoor_port),
 			"--allocate-ids",
 			$.db_connect,
+			"--playlist-worksheet", $.playlist_worksheet,
 			"/etc/wubloader-creds.json",
 			$.edit_url,
 			$.bustime_start,
diff --git a/playlist_manager/playlist_manager/main.py b/playlist_manager/playlist_manager/main.py
index 096c705..81e6c2c 100644
--- a/playlist_manager/playlist_manager/main.py
+++ b/playlist_manager/playlist_manager/main.py
@@ -20,7 +20,7 @@ class PlaylistManager(object):
 		self.dbmanager = dbmanager
 		self.api = YoutubeAPI(api_client)
 		self.upload_locations = upload_locations
-		self.playlist_tags = playlist_tags
+		self.static_playlist_tags = playlist_tags
 		self.reset()
 
 	def reset(self, playlist=None):
@@ -53,9 +53,13 @@ class PlaylistManager(object):
 		videos = self.get_videos()
 		logging.debug("Found {} eligible videos".format(len(videos)))
 
+		logging.info("Getting dynamic playlists")
+		playlist_tags = self.get_playlist_tags()
+		logging.debug("Found {} playlists".format(len(playlist_tags)))
+
 		# start all workers
 		workers = {}
-		for playlist, tags in self.playlist_tags.items():
+		for playlist, tags in playlist_tags.items():
 			workers[playlist] = gevent.spawn(self.update_playlist, playlist, tags, videos)
 
 		# check each one for success, reset on failure
@@ -79,6 +83,19 @@ class PlaylistManager(object):
 		self.dbmanager.put_conn(conn)
 		return {video.video_id: video for video in videos}
 
+	def get_playlist_tags(self):
+		conn = self.dbmanager.get_conn()
+		playlist_tags = {
+			row.playlist_id: [tag.lower() for tag in row.tags]
+			for row in query(conn, "SELECT playlist_id, tags FROM playlists")
+		}
+		self.dbmanager.put_conn(conn)
+		duplicates = set(playlist_tags) & set(self.static_playlist_tags)
+		if duplicates:
+			raise ValueError("Some playlists are listed in both static and dynamic playlist sources: {}".format(", ".join(duplicates)))
+		playlist_tags.update(self.static_playlist_tags)
+		return playlist_tags
+
 	def update_playlist(self, playlist, tags, videos):
 		# Filter the video list for videos with matching tags
 		matching = [
@@ -255,7 +272,8 @@ def parse_playlist_arg(arg):
 	"Events will be added to the playlist if that event has all the tags. For example, "
 	"some_playlist_id=Day 1,Technical would populate that playlist with all Technical events "
 	"from Day 1. Note that having no tags (ie. 'id=') is allowed and all events will be added to it. "
-	"Note playlist ids must be unique (can't specify the same one twice)."
+ "Note playlist ids must be unique (can't specify the same one twice). " + "These playlists will be added to ones listed in the database." ) def main( dbconnect, diff --git a/postgres/setup.sh b/postgres/setup.sh index 1820478..2f8277d 100644 --- a/postgres/setup.sh +++ b/postgres/setup.sh @@ -106,6 +106,13 @@ CREATE TABLE editors ( email TEXT PRIMARY KEY, name TEXT NOT NULL ); + +-- Playlists are communicated to playlist manager via this table. +-- Sheetsync will wipe and re-populate this table periodically to match what is in the sheet +CREATE TABLE playlists ( + tags TEXT[] NOT NULL, + playlist_id TEXT PRIMARY KEY, +); EOSQL if [ -a /mnt/wubloader/nodes.csv ]; then diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py index 42018a3..2717857 100644 --- a/sheetsync/sheetsync/main.py +++ b/sheetsync/sheetsync/main.py @@ -11,7 +11,7 @@ import gevent.event import prometheus_client as prom from monotonic import monotonic from psycopg2 import sql -from psycopg2.extras import register_uuid +from psycopg2.extras import register_uuid, execute_values from requests import HTTPError import common @@ -67,9 +67,10 @@ class SheetSync(object): # Expected quota usage per 100s = # (100 / RETRY_INTERVAL) * ACTIVE_SHEET_COUNT # + (100 / RETRY_INTERVAL / SYNCS_PER_INACTIVE_CHECK) * (len(worksheets) - ACTIVE_SHEET_COUNT) - # For current values, this is 100/5 * 2 + 100/5/4 * 6 = 70 + # If playlist_worksheet is defined, add 1 to len(worksheets). + # For current values, this is 100/5 * 2 + 100/5/4 * 7 = 75 - def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False): + def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False, playlist_worksheet=None): self.stop = stop self.dbmanager = dbmanager self.sheets = sheets @@ -79,6 +80,8 @@ class SheetSync(object): self.edit_url = edit_url self.bustime_start = bustime_start self.allocate_ids = allocate_ids + # The playlist worksheet is checked on the same cadence as inactive sheets + self.playlist_worksheet = playlist_worksheet # Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes. 
 		# Hard-coded for now, future work: determine this from column headers in sheet
 		self.column_map = {
@@ -159,11 +162,13 @@ class SheetSync(object):
 				if sync_count % self.SYNCS_PER_INACTIVE_CHECK == 0:
 					# check all worksheets
 					worksheets = self.worksheets
+					playlist_worksheet = self.playlist_worksheet
 				else:
 					# only check most recently changed worksheets
 					worksheets = sorted(
 						self.worksheets.keys(), key=lambda k: self.worksheets[k], reverse=True,
 					)[:self.ACTIVE_SHEET_COUNT]
+					playlist_worksheet = None
 
 				sync_count += 1
 				sync_start = monotonic()
@@ -178,6 +183,10 @@ class SheetSync(object):
 							continue
 						row = self.parse_row(worksheet, row)
 						self.sync_row(worksheet, row_index, row, events.get(row['id']))
+
+				if playlist_worksheet is not None:
+					rows = self.sheets.get_rows(self.sheet_id, playlist_worksheet)
+					self.sync_playlists(rows)
 			except Exception as e:
 				# for HTTPErrors, http response body includes the more detailed error
 				detail = ''
@@ -344,6 +353,30 @@ class SheetSync(object):
 		the most-recently-modified queue."""
 		self.worksheets[worksheet] = monotonic()
 
+	def sync_playlists(self, rows):
+		"""Parse rows with a valid playlist id and at least one tag,
+		overwriting the entire playlists table."""
+		playlists = []
+		for row in rows:
+			if len(row) != 3:
+				continue
+			tags, _, playlist_id = row
+			tags = self.column_parsers['tags'](tags)
+			if not tags:
+				continue
+			playlist_id = playlist_id.strip()
+			if len(playlist_id) != 34 or not playlist_id.startswith('PL'):
+				continue
+			playlists.append((tags, playlist_id))
+		# We want to wipe and replace all the current entries in the table.
+		# The easiest way to do this is a DELETE then an INSERT, all within a transaction.
+		# The "with" block will perform everything under it within a transaction, rolling back
+		# on error or committing on exit.
+		logging.info("Updating playlists table with {} playlists".format(len(playlists)))
+		with self.conn:
+			query(self.conn, "DELETE FROM playlists")
+			execute_values(self.conn.cursor(), "INSERT INTO playlists(tags, playlist_id) VALUES %s", playlists)
+
 
 @argh.arg('dbconnect', help=
 	"dbconnect should be a postgres connection string, which is either a space-separated "
@@ -369,7 +402,10 @@ class SheetSync(object):
 	"--allocate-ids means that it will give rows without ids an id. "
 	"Only one sheet sync should have --allocate-ids on for a given sheet at once!"
 )
-def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksheet_names, metrics_port=8005, backdoor_port=0, allocate_ids=False):
+@argh.arg('--playlist-worksheet', help=
+	"An optional additional worksheet name that holds playlist tag definitions",
+)
+def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksheet_names, metrics_port=8005, backdoor_port=0, allocate_ids=False, playlist_worksheet=None):
 	"""
 	Sheet sync constantly scans a Google Sheets sheet and a database,
 	copying inputs from the sheet to the DB and outputs from the DB to the sheet.
@@ -414,6 +450,6 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh
 		refresh_token=sheets_creds['refresh_token'],
 	)
 
-	SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids).run()
+	SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids, playlist_worksheet).run()
 
 	logging.info("Gracefully stopped")
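
A minimal sketch of how the new sync_playlists() treats rows from the playlist worksheet, assuming the "Tags" worksheet lays out each row as (tags, free-form notes, playlist id) and that the tags column parser simply splits on commas; the sample rows, the parse_tags stand-in, and the playlist id are invented for illustration and are not part of the patch:

	# Illustrative only: sample rows as sync_playlists() would receive them.
	# Rows without tags, or whose last cell is not a 34-character id starting
	# with "PL", are skipped; the rest are written to the playlists table.
	sample_rows = [
		["Day 1, Technical", "technical events from day 1", "PLabcdefghijklmnopqrstuvwxyz012345"],
		["", "skipped: no tags", "PLabcdefghijklmnopqrstuvwxyz012345"],
		["Day 2", "skipped: not a valid playlist id", "not-a-playlist"],
	]

	def parse_tags(value):
		# Stand-in for sheetsync's tags column parser: comma-separated, stripped.
		return [tag.strip() for tag in value.split(",") if tag.strip()]

	playlists = []
	for row in sample_rows:
		if len(row) != 3:
			continue
		tags, _, playlist_id = row
		tags = parse_tags(tags)
		if not tags:
			continue
		playlist_id = playlist_id.strip()
		if len(playlist_id) != 34 or not playlist_id.startswith("PL"):
			continue
		playlists.append((tags, playlist_id))

	print(playlists)  # only the first sample row passes both filters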