mirror of https://github.com/ekimekim/wubloader
Implement playlist manager
parent 9562032c24
commit 9413b99b91

import logging
import json
import signal

import argh
import gevent
import gevent.backdoor
import gevent.event
import prometheus_client as prom

import common
from common.database import DBManager, query
from common.googleapis import GoogleAPIClient


class PlaylistManager(object):

	def __init__(self, dbmanager, api_client, upload_locations, playlist_tags):
		self.dbmanager = dbmanager
		self.api = YoutubeAPI(api_client)
		self.upload_locations = upload_locations
		self.playlist_tags = playlist_tags
		self.reset()

	def reset(self, playlist=None):
		"""Called to clear saved state and force a refresh after errors.
		Either reset a specific playlist, or all if no arg given.
		"""
		if playlist is None:
			# playlist_state represents our mirrored view of the list of items in each playlist.
			# If a playlist is not present, it means we need to refresh our view of it.
			# {playlist_id: [video_id]}
			self.playlist_state = {}
		else:
			self.playlist_state.pop(playlist, None)

	def run_once(self):
		"""Do one check of all videos and playlists.
		At a high level:
			Fetch all eligible videos from the database
			Group them into playlists depending on their tags
			For each playlist, concurrently:
				Compare this generated playlist to our local copy of the real thing
				If they match, do nothing (don't even bother checking the real thing)
				Check the real thing for total length. If it matches, assume we're good. Otherwise refresh.
				For each video to add:
					Determine the correct point in sort order
					Add to playlist
					Update local mirror with the action we just performed
		"""
		logging.info("Checking for new videos")
		videos = self.get_videos()
		logging.debug("Found {} eligible videos".format(len(videos)))

		# start all workers
		workers = {}
		for playlist, tags in self.playlist_tags.items():
			workers[playlist] = gevent.spawn(self.update_playlist, playlist, tags, videos)

		# check each one for success, reset on failure
		for playlist, worker in workers.items():
			try:
				worker.get()
			except Exception:
				logging.exception("Failed to update playlist {}".format(playlist))
				self.reset(playlist)

	def get_videos(self):
		# Most of the time, by getting then re-putting the conn, we'll just use the same
		# one every time. But if there's an error we won't re-put it, so we'll get a new one
		# the next time.
		conn = self.dbmanager.get_conn()
		# Note the tuple(): psycopg2 renders a tuple as the parenthesized value list
		# that IN expects, whereas a list would be rendered as an ARRAY.
		videos = query(conn, """
			SELECT video_id, tags, start_time
			FROM events
			WHERE state = 'DONE' AND upload_location IN %s
		""", tuple(self.upload_locations))
		self.dbmanager.put_conn(conn)
		return {video.video_id: video for video in videos}

	def update_playlist(self, playlist, tags, videos):
		# Filter the video list for videos with matching tags
		matching = [
			video for video in videos.values()
			if all(tag in video.tags for tag in tags)
		]
		logging.debug("Found {} matching videos for playlist {}".format(len(matching), playlist))
		# If we have nothing to add, short-circuit without doing any API calls to save quota.
		if not set(v.video_id for v in matching) - set(self.playlist_state.get(playlist, [])):
			logging.debug("All videos already in playlist, nothing to do")
			return
		# Refresh our playlist state, if necessary.
		self.refresh_playlist(playlist)
		# Get an updated list of new videos
		new_videos = [
			video for video in matching
			if video.video_id not in self.playlist_state[playlist]
		]
		# It shouldn't matter, but just for clarity let's sort them by event order
		new_videos.sort(key=lambda v: v.start_time)
		# Insert each new video one at a time
		logging.debug("Inserting new videos for playlist {}: {}".format(playlist, new_videos))
		for video in new_videos:
			index = self.find_insert_index(videos, self.playlist_state[playlist], video)
			self.insert_into_playlist(playlist, video.video_id, index)

	def refresh_playlist(self, playlist):
		"""Check the playlist mirror is in a good state, and fetch it if it isn't.
		We try to do this with only one page of list output, to save on quota.
		If the total length does not match (or we don't have a copy at all),
		then we do a full refresh.
		"""
		logging.debug("Fetching first page of playlist {}".format(playlist))
		list_query = self.api.list_playlist(playlist)
		# See if we can avoid further page fetches.
		if playlist not in self.playlist_state:
			logging.info("Fetching playlist {} because we don't currently have it".format(playlist))
		elif list_query.is_complete:
			logging.debug("First page of {} was the entire playlist".format(playlist))
		elif len(self.playlist_state[playlist]) == list_query.total_size:
			logging.debug("Skipping fetch of remainder of playlist {}, size matches".format(playlist))
			return
		else:
			logging.warning("Playlist {} has a size mismatch ({} saved vs {} actual), refetching".format(
				playlist, len(self.playlist_state[playlist]), list_query.total_size,
			))
		# Fetch remaining pages, if any
		list_query.fetch_all()
		# Update saved copy with video ids
		self.playlist_state[playlist] = [
			item['snippet']['resourceId'].get('videoId')  # api implies it's possible that non-videos are added
			for item in list_query.items
		]

	def find_insert_index(self, videos, playlist, new_video):
		"""Find the index at which to insert new_video into playlist such that
		playlist remains sorted (it is assumed to already be sorted).
		videos should be a mapping from video ids to video info.
		"""
		# To try to behave as well as we can in the presence of mis-sorted playlists,
		# we walk through the list linearly. We insert immediately before the first
		# item that should be after us in sort order.
		# Note that we treat unknown items (videos we don't know) as being before us
		# in sort order, so that we always insert after them.
		for n, video_id in enumerate(playlist):
			if video_id not in videos:
				# ignore unknowns
				continue
			video = videos[video_id]
			# if this video is after the new video, return this index
			if new_video.start_time < video.start_time:
				return n
		# If we reach here, everything on the list was before the new video,
		# so insert at the end.
		return len(playlist)
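
	# A worked example of the rule above, with hypothetical ids and start times:
	# given videos = {"a": <start_time 1>, "b": <start_time 3>} and a current
	# playlist ["a", "x", "b"] where "x" is a video we don't know about, a new
	# video with start_time 2 walks past "a" (1 < 2), skips the unknown "x",
	# then stops at "b" (2 < 3) and returns index 2, landing immediately
	# before "b" and after the unknown item.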

	def insert_into_playlist(self, playlist, video_id, index):
		"""Insert video into the given playlist at the given index.
		Makes the API call, then also updates our mirrored copy.
		"""
		logging.info("Inserting {} at index {} of {}".format(video_id, index, playlist))
		self.api.insert_into_playlist(playlist, video_id, index)
		# Update our copy
		self.playlist_state.setdefault(playlist, []).insert(index, video_id)


class YoutubeAPI(object):
	def __init__(self, client):
		self.client = client

	def insert_into_playlist(self, playlist, video_id, index):
		# "body" rather than "json", to avoid shadowing the json module
		body = {
			"snippet": {
				"playlistId": playlist,
				"resourceId": {
					"kind": "youtube#video",
					"videoId": video_id,
				},
				"position": index,
			},
		}
		resp = self.client.request("POST", "https://www.googleapis.com/youtube/v3/playlistItems",
			params={"part": "snippet"},
			json=body,
			metric_name="playlist_insert",
		)
		if not resp.ok:
			raise Exception("Failed to insert {video_id} at index {index} of {playlist} with {resp.status_code}: {resp.content}".format(
				playlist=playlist, video_id=video_id, index=index, resp=resp,
			))
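
	# Quota note (based on the published costs for the YouTube Data API v3,
	# not anything this code enforces): a playlistItems.insert call like the
	# one above costs on the order of 50 quota units, while a
	# playlistItems.list page costs 1, which is why callers try hard to avoid
	# redundant calls of either kind.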

	def list_playlist(self, playlist):
		"""Fetches the first page of playlist contents and returns a ListQuery object.
		You can use this object to look up info and optionally retrieve the whole playlist."""
		data = self._list_playlist(playlist)
		return ListQuery(self, playlist, data)

	def _list_playlist(self, playlist, page_token=None):
		"""Internal method that actually does the list query.
		Returns the full response json."""
		params = {
			"part": "snippet",
			"playlistId": playlist,
			"maxResults": 50,
		}
		if page_token is not None:
			params['pageToken'] = page_token
		resp = self.client.request("GET", "https://www.googleapis.com/youtube/v3/playlistItems",
			params=params,
			metric_name="playlist_list",
		)
		if not resp.ok:
			raise Exception("Failed to list {playlist} (page_token={page_token!r}) with {resp.status_code}: {resp.content}".format(
				playlist=playlist, page_token=page_token, resp=resp,
			))
		return resp.json()


class ListQuery(object):
	"""Represents a partially-fetched list query for all playlist items.
	To save on quota, we avoid fetching the entire playlist until asked."""
	def __init__(self, api, playlist_id, data):
		self.api = api
		self.playlist_id = playlist_id
		self.total_size = data['pageInfo']['totalResults']
		self.is_complete = self.total_size <= data['pageInfo']['resultsPerPage']
		self.items = data['items']
		self.page_token = data.get('nextPageToken')

	def fetch_all(self):
		if self.is_complete:
			return
		page_token = self.page_token
		while len(self.items) < self.total_size:
			assert page_token is not None, "More items to fetch but no page token"
			data = self.api._list_playlist(self.playlist_id, page_token)
			self.items += data['items']
			page_token = data.get('nextPageToken')
			# I'm just being paranoid here about an infinite loop blowing our quota,
			# so let's assert we're always making forward progress.
			assert data['items'], "Got no extra items from new page"
		self.is_complete = True
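
# A usage sketch for the two classes above (assuming an already-authenticated
# GoogleAPIClient; "PL..." stands in for a real playlist id). list_playlist()
# costs one API call, and fetch_all() only pays for the remaining pages when
# the playlist didn't fit in the first one:
#
#	api = YoutubeAPI(client)
#	list_query = api.list_playlist("PL...")
#	if not list_query.is_complete:
#		list_query.fetch_all()
#	video_ids = [item['snippet']['resourceId'].get('videoId') for item in list_query.items]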


def parse_playlist_arg(arg):
	playlist, tags = arg.split('=', 1)
	tags = tags.split(",") if tags else []
	return playlist, tags
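
# For example (with a hypothetical playlist id):
#	parse_playlist_arg("PLxyz=Day 1,Technical") == ("PLxyz", ["Day 1", "Technical"])
#	parse_playlist_arg("PLxyz=") == ("PLxyz", [])
# The second form is the match-everything case described in the help text below.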


@argh.arg("playlists", nargs="*", metavar="PLAYLIST={TAG,}", type=parse_playlist_arg, help=
	"Each playlist arg specifies a youtube playlist ID, along with any number of tags. "
	"Events will be added to the playlist if that event has all the tags. For example, "
	"some_playlist_id=Day 1,Technical would populate that playlist with all Technical events "
	"from Day 1. Note that having no tags (ie. 'id=') is allowed, and all events will be added to it. "
	"Note playlist ids must be unique (you can't specify the same one twice)."
)
def main(
	dbconnect,
	creds_file,
	playlists,
	upload_location_allowlist="youtube",
	interval=600,
	metrics_port=8007,
	backdoor_port=0,
):
	"""
	dbconnect should be a postgres connection string.

	creds_file should contain youtube api creds.

	upload_location_allowlist is a comma-separated list of database upload locations to
	consider as eligible for being added to playlists. For these locations, the database
	video id must be a youtube video id.

	interval is how often to check for new videos, default every 10min.
	"""
	common.PromLogCountsHandler.install()
	common.install_stacksampler()
	prom.start_http_server(metrics_port)

	if backdoor_port:
		gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()

	upload_locations = upload_location_allowlist.split(",") if upload_location_allowlist else []
	playlists = dict(playlists)

	stop = gevent.event.Event()
	gevent.signal_handler(signal.SIGTERM, stop.set)  # shut down on sigterm

	logging.info("Starting up")

	with open(creds_file) as f:
		creds = json.load(f)
	client = GoogleAPIClient(creds['client_id'], creds['client_secret'], creds['refresh_token'])

	dbmanager = DBManager(dsn=dbconnect)
	manager = PlaylistManager(dbmanager, client, upload_locations, playlists)

	while not stop.is_set():
		try:
			manager.run_once()
		except Exception:
			logging.exception("Failed to run playlist manager")
			manager.reset()
		stop.wait(interval)  # wait for interval, or until stopping

	logging.info("Stopped")
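
# A sketch of how this might be invoked; the module name, connection string,
# and playlist ids are assumptions for illustration, following argh's usual
# dispatch pattern (this file itself has no __main__ guard):
#
#	python -m playlist_manager 'postgres://user:password@localhost/wubloader' creds.json \
#		'PLxyz=Day 1,Technical' 'PLabc=' --interval 300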