diff --git a/playlist_manager/playlist_manager/main.py b/playlist_manager/playlist_manager/main.py index 5562489..2d8bdcb 100644 --- a/playlist_manager/playlist_manager/main.py +++ b/playlist_manager/playlist_manager/main.py @@ -1,4 +1,308 @@ +import logging +import json +import signal -def main(): - pass +import argh +import gevent +import gevent.event +import prometheus_client as prom + +import common +from common.database import DBManager, query +from common.googleapis import GoogleAPIClient + + +class PlaylistManager(object): + + def __init__(self, dbmanager, api_client, upload_locations, playlist_tags): + self.dbmanager = dbmanager + self.api = YoutubeAPI(api_client) + self.upload_locations = upload_locations + self.playlist_tags = playlist_tags + self.reset() + + def reset(self, playlist=None): + """Called to clear saved state and force a refresh after errors. + Either reset a specific playlist, or all if no arg given. + """ + if playlist is None: + # playlist_state represents our mirrored view of the list of items in each playlist. + # If a playlist is not present, it means we need to refresh our view of it. + # {playlist_id: [video_id]} + self.playlist_state = {} + else: + self.playlist_state.pop(playlist, None) + + def run_once(self): + """Do one check of all videos and playlists. + At a high level: + Fetch all eligible videos from the database + Group them into playlists depending on their tags + For each playlist, concurrently: + Compare this generated playlist to our local copy of the real thing + If they match, do nothing (don't even bother checking the real thing) + Check real thing for total length. If it matches, assume we're good. Otherwise refresh. + For each video to add: + Determine correct point in sort order + Add to playlist + Update local mirror with the action we just performed + """ + logging.info("Checking for new videos") + videos = self.get_videos() + logging.debug("Found {} eligible videos".format(len(videos))) + + # start all workers + workers = {} + for playlist, tags in self.playlist_tags.items(): + workers[playlist] = gevent.spawn(self.update_playlist, playlist, tags, videos) + + # check each one for success, reset on failure + for playlist, worker in workers.items(): + try: + worker.get() + except Exception: + logging.exception("Failed to update playlist {}".format(playlist)) + self.reset(playlist) + + def get_videos(self): + # Most of the time by getting then re-putting the conn, we'll just use the same + # one every time. But if there's an error we won't re-put it so we'll get a new one + # the next time. + conn = self.dbmanager.get_conn() + videos = query(conn, """ + SELECT video_id, tags, start_time + FROM events + WHERE state = 'DONE' AND upload_location IN %s + """, self.upload_locations) + self.dbmanager.put_conn(conn) + return {video.video_id: video for video in videos} + + def update_playlist(self, playlist, tags, videos): + # Filter the video list for videos with matching tags + matching = [ + video for video in videos.values() + if all(tag in video.tags for tag in tags) + ] + logging.debug("Found {} matching videos for playlist {}".format(len(matching), playlist)) + # If we have nothing to add, short circuit without doing any API calls to save quota. + if not set([v.video_id for v in matching]) - set(self.playlist_state.get(playlist, [])): + logging.debug("All videos already in playlist, nothing to do") + return + # Refresh our playlist state, if nessecary. + self.refresh_playlist(playlist) + # Get an updated list of new videos + new_videos = [ + video for video in matching + if video.video_id not in self.playlist_state[playlist] + ] + # It shouldn't matter, but just for clarity let's sort them by event order + new_videos.sort(key=lambda v: v.start_time) + # Insert each new video one at a time + logging.debug("Inserting new videos for playlist {}: {}".format(playlist, new_videos)) + for video in new_videos: + index = self.find_insert_index(videos, self.playlist_state[playlist], video) + self.insert_into_playlist(playlist, video.video_id, index) + + def refresh_playlist(self, playlist): + """Check playlist mirror is in a good state, and fetch it if it isn't. + We try to do this with only one page of list output, to save on quota. + If the total length does not match (or we don't have a copy at all), + then we do a full refresh. + """ + logging.debug("Fetching first page of playlist {}".format(playlist)) + query = self.api.list_playlist(playlist) + # See if we can avoid further page fetches. + if playlist not in self.playlist_state: + logging.info("Fetching playlist {} because we don't currently have it".format(playlist)) + elif query.is_complete: + logging.debug("First page of {} was entire playlist".format(playlist)) + elif len(self.playlist_state[playlist]) == query.total_size: + logging.debug("Skipping fetching of remainder of playlist {}, size matches".format(playlist)) + return + else: + logging.warning("Playlist {} has size mismatch ({} saved vs {} actual), refetching".format( + playlist, len(self.playlist_state[playlist]), query.total_size, + )) + # Fetch remaining pages, if any + query.fetch_all() + # Update saved copy with video ids + self.playlist_state[playlist] = [ + item['snippet']['resourceId'].get('videoId') # api implies it's possible that non-videos are added + for item in query.items + ] + + def find_insert_index(self, videos, playlist, new_video): + """Find the index at which to insert new_video into playlist such that + playlist remains sorted (it is assumed to already be sorted). + videos should be a mapping from video ids to video info. + """ + # To try to behave as best we can in the presence of mis-sorted playlists, + # we walk through the list linearly. We insert immediately before the first + # item that should be after us in sort order. + # Note that we treat unknown items (videos we don't know) as being before us + # in sort order, so that we always insert after them. + for n, video_id in enumerate(playlist): + if video_id not in videos: + # ignore unknowns + continue + video = videos[video_id] + # if this video is after new video, return this index + if new_video.start_time < video.start_time: + return n + # if we reach here, it means everything on the list was before the new video + # therefore insert at end + return len(playlist) + + def insert_into_playlist(self, playlist, video_id, index): + """Insert video into given playlist at given index. + Makes the API call then also updates our mirrored copy. + """ + logging.info("Inserting {} at index {} of {}".format(video_id, index, playlist)) + self.api.insert_into_playlist(playlist, video_id, index) + # Update our copy + self.playlist_state.setdefault(playlist, []).insert(index, video_id) + + +class YoutubeAPI(object): + def __init__(self, client): + self.client = client + + def insert_into_playlist(self, playlist, video_id, index): + json = { + "snippet": { + "playlistId": playlist, + "resourceId": { + "kind": "youtube#video", + "videoId": video_id, + }, + "position": index, + }, + } + resp = self.client.request("POST", "https://www.googleapis.com/youtube/v3/playlistItems", + params={"part": "snippet"}, + json=json, + metric_name="playlist_insert", + ) + if not resp.ok: + raise Exception("Failed to insert {video_id} at index {index} of {playlist} with {resp.status_code}: {resp.content}".format( + playlist=playlist, video_id=video_id, index=index, resp=resp, + )) + + def list_playlist(self, playlist): + """Fetches the first page of playlist contents and returns a ListQuery object. + You can use this object to look up info and optionally retrieve the whole playlist.""" + data = self._list_playlist(playlist) + return ListQuery(self, playlist, data) + + def _list_playlist(self, playlist, page_token=None): + """Internal method that actually does the list query. + Returns the full response json.""" + params = { + "part": "snippet", + "playlistId": playlist, + "maxResults": 50, + } + if page_token is not None: + params['pageToken'] = page_token + resp = self.client.request("GET", "https://www.googleapis.com/youtube/v3/playlistItems", + params=params, + metric_name="playlist_list", + ) + if not resp.ok: + raise Exception("Failed to list {playlist} (page_token={!r}) with {resp.status_code}: {resp.content}".format( + playlist=playlist, page_token=page_token, resp=resp, + )) + return resp.json() + + +class ListQuery(object): + """Represents a partially-fetched list query for all playlist items. + To save on quota, we avoid fetching the entire playlist until asked.""" + def __init__(self, api, playlist_id, data): + self.api = api + self.playlist_id = playlist_id + self.total_size = data['pageInfo']['totalResults'] + self.is_complete = self.total_size <= data['pageInfo']['resultsPerPage'] + self.items = data['items'] + self.page_token = data.get('nextPageToken') + + def fetch_all(self): + if self.is_complete: + return + page_token = self.page_token + while len(self.items) < self.total_size: + assert page_token is not None, "More items to fetch but no page token" + data = self.api._list_playlist(self.playlist_id, page_token) + self.items += data['items'] + page_token = data.get('nextPageToken') + # I'm just being paranoid here about an infinite loop blowing our quota, + # let's assert we always are making forward progress + assert data['items'], "Got no extra items from new page" + self.is_complete = True + + +def parse_playlist_arg(arg): + playlist, tags = arg.split('=', 1) + tags = tags.split(",") if tags else [] + return playlist, tags + + +@argh.arg("playlists", nargs="*", metavar="PLAYLIST={TAG,}", type=parse_playlist_arg, help= + "Each playlist arg specifies a youtube playlist ID, along with any number of tags. " + "Events will be added to the playlist if that event has all the tags. For example, " + "some_playlist_id=Day 1,Technical would populate that playlist with all Technical events " + "from Day 1. Note that having no tags (ie. 'id=') is allowed and all events will be added to it. " + "Note playlist ids must be unique (can't specify the same one twice)." +) +def main( + dbconnect, + creds_file, + playlists, + upload_location_allowlist="youtube", + interval=600, + metrics_port=8007, + backdoor_port=0, +): + """ + dbconnect should be a postgres connection string + + creds_file should contain youtube api creds + + upload_location_allowlist is a comma-seperated list of database upload locations to + consider as eligible to being added to playlists. For these locations, the database video id + must be a youtube video id. + + interval is how often to check for new videos, default every 10min. + """ + common.PromLogCountsHandler.install() + common.install_stacksampler() + prom.start_http_server(metrics_port) + + if backdoor_port: + gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() + + upload_locations = upload_location_allowlist.split(",") if upload_location_allowlist else [] + playlists = dict(playlists) + + stop = gevent.event.Event() + gevent.signal_handler(signal.SIGTERM, stop.set) # shut down on sigterm + + logging.info("Starting up") + + with open(creds_file) as f: + creds = json.load(f) + client = GoogleAPIClient(creds['client_id'], creds['client_secret'], creds['refresh_token']) + + dbmanager = DBManager(dsn=dbconnect) + manager = PlaylistManager(dbmanager, client, upload_locations, playlists) + + while not stop.is_set(): + try: + manager.run_once() + except Exception: + logging.exception("Failed to run playlist manager") + manager.reset() + stop.wait(interval) # wait for interval, or until stopping + + logging.info("Stopped")