|
|
|
@ -23,17 +23,17 @@ class PlaylistManager(object):
|
|
|
|
|
self.static_playlist_tags = playlist_tags
|
|
|
|
|
self.reset()
|
|
|
|
|
|
|
|
|
|
def reset(self, playlist=None):
|
|
|
|
|
def reset(self, playlist_id=None):
|
|
|
|
|
"""Called to clear saved state and force a refresh after errors.
|
|
|
|
|
Either reset a specific playlist, or all if no arg given.
|
|
|
|
|
"""
|
|
|
|
|
if playlist is None:
|
|
|
|
|
if playlist_id is None:
|
|
|
|
|
# playlist_state represents our mirrored view of the list of items in each playlist.
|
|
|
|
|
# If a playlist is not present, it means we need to refresh our view of it.
|
|
|
|
|
# {playlist_id: [video_id]}
|
|
|
|
|
self.playlist_state = {}
|
|
|
|
|
else:
|
|
|
|
|
self.playlist_state.pop(playlist, None)
|
|
|
|
|
self.playlist_state.pop(playlist_id, None)
|
|
|
|
|
|
|
|
|
|
def run_once(self):
|
|
|
|
|
"""Do one check of all videos and playlists.
|
|
|
|
@ -59,16 +59,16 @@ class PlaylistManager(object):
|
|
|
|
|
|
|
|
|
|
# start all workers
|
|
|
|
|
workers = {}
|
|
|
|
|
for playlist, tags in playlist_tags.items():
|
|
|
|
|
workers[playlist] = gevent.spawn(self.update_playlist, playlist, tags, videos)
|
|
|
|
|
for playlist_id, tags in playlist_tags.items():
|
|
|
|
|
workers[playlist_id] = gevent.spawn(self.update_playlist, playlist_id, tags, videos)
|
|
|
|
|
|
|
|
|
|
# check each one for success, reset on failure
|
|
|
|
|
for playlist, worker in workers.items():
|
|
|
|
|
for playlist_id, worker in workers.items():
|
|
|
|
|
try:
|
|
|
|
|
worker.get()
|
|
|
|
|
except Exception:
|
|
|
|
|
logging.exception("Failed to update playlist {}".format(playlist))
|
|
|
|
|
self.reset(playlist)
|
|
|
|
|
logging.exception("Failed to update playlist {}".format(playlist_id))
|
|
|
|
|
self.reset(playlist_id)
|
|
|
|
|
|
|
|
|
|
def get_videos(self):
|
|
|
|
|
# Most of the time by getting then re-putting the conn, we'll just use the same
|
|
|
|
@ -96,58 +96,58 @@ class PlaylistManager(object):
|
|
|
|
|
playlist_tags.update(self.static_playlist_tags)
|
|
|
|
|
return playlist_tags
|
|
|
|
|
|
|
|
|
|
def update_playlist(self, playlist, tags, videos):
|
|
|
|
|
def update_playlist(self, playlist_id, tags, videos):
|
|
|
|
|
# Filter the video list for videos with matching tags
|
|
|
|
|
matching = [
|
|
|
|
|
video for video in videos.values()
|
|
|
|
|
if all(tag in [t.lower() for t in video.tags] for tag in tags)
|
|
|
|
|
]
|
|
|
|
|
logging.debug("Found {} matching videos for playlist {}".format(len(matching), playlist))
|
|
|
|
|
logging.debug("Found {} matching videos for playlist {}".format(len(matching), playlist_id))
|
|
|
|
|
# If we have nothing to add, short circuit without doing any API calls to save quota.
|
|
|
|
|
|
|
|
|
|
matching_video_ids = {video.video_id for video in matching}
|
|
|
|
|
playlist_video_ids = set(self.playlist_state.get(playlist, []))
|
|
|
|
|
playlist_video_ids = set(self.playlist_state.get(playlist_id, []))
|
|
|
|
|
if not (matching_video_ids - playlist_video_ids):
|
|
|
|
|
logging.debug("All videos already in playlist, nothing to do")
|
|
|
|
|
return
|
|
|
|
|
# Refresh our playlist state, if necessary.
|
|
|
|
|
self.refresh_playlist(playlist)
|
|
|
|
|
self.refresh_playlist(playlist_id)
|
|
|
|
|
# Get an updated list of new videos
|
|
|
|
|
matching_video_ids = {video.video_id for video in matching}
|
|
|
|
|
playlist_video_ids = set(self.playlist_state.get(playlist, []))
|
|
|
|
|
playlist_video_ids = set(self.playlist_state.get(playlist_id, []))
|
|
|
|
|
# It shouldn't matter, but just for clarity let's sort them by event order
|
|
|
|
|
new_videos = sorted(matching_video_ids - playlist_video_ids, key=lambda v: v.start_time)
|
|
|
|
|
|
|
|
|
|
# Insert each new video one at a time
|
|
|
|
|
logging.debug("Inserting new videos for playlist {}: {}".format(playlist, new_videos))
|
|
|
|
|
logging.debug("Inserting new videos for playlist {}: {}".format(playlist_id, new_videos))
|
|
|
|
|
for video in new_videos:
|
|
|
|
|
index = self.find_insert_index(videos, self.playlist_state[playlist], video)
|
|
|
|
|
self.insert_into_playlist(playlist, video.video_id, index)
|
|
|
|
|
index = self.find_insert_index(videos, self.playlist_state[playlist_id], video)
|
|
|
|
|
self.insert_into_playlist(playlist_id, video.video_id, index)
|
|
|
|
|
|
|
|
|
|
def refresh_playlist(self, playlist):
|
|
|
|
|
def refresh_playlist(self, playlist_id):
|
|
|
|
|
"""Check playlist mirror is in a good state, and fetch it if it isn't.
|
|
|
|
|
We try to do this with only one page of list output, to save on quota.
|
|
|
|
|
If the total length does not match (or we don't have a copy at all),
|
|
|
|
|
then we do a full refresh.
|
|
|
|
|
"""
|
|
|
|
|
logging.debug("Fetching first page of playlist {}".format(playlist))
|
|
|
|
|
query = self.api.list_playlist(playlist)
|
|
|
|
|
logging.debug("Fetching first page of playlist {}".format(playlist_id))
|
|
|
|
|
query = self.api.list_playlist(playlist_id)
|
|
|
|
|
# See if we can avoid further page fetches.
|
|
|
|
|
if playlist not in self.playlist_state:
|
|
|
|
|
logging.info("Fetching playlist {} because we don't currently have it".format(playlist))
|
|
|
|
|
if playlist_id not in self.playlist_state:
|
|
|
|
|
logging.info("Fetching playlist {} because we don't currently have it".format(playlist_id))
|
|
|
|
|
elif query.is_complete:
|
|
|
|
|
logging.debug("First page of {} was entire playlist".format(playlist))
|
|
|
|
|
elif len(self.playlist_state[playlist]) == query.total_size:
|
|
|
|
|
logging.debug("Skipping fetching of remainder of playlist {}, size matches".format(playlist))
|
|
|
|
|
logging.debug("First page of {} was entire playlist".format(playlist_id))
|
|
|
|
|
elif len(self.playlist_state[playlist_id]) == query.total_size:
|
|
|
|
|
logging.debug("Skipping fetching of remainder of playlist {}, size matches".format(playlist_id))
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
logging.warning("Playlist {} has size mismatch ({} saved vs {} actual), refetching".format(
|
|
|
|
|
playlist, len(self.playlist_state[playlist]), query.total_size,
|
|
|
|
|
playlist_id, len(self.playlist_state[playlist_id]), query.total_size,
|
|
|
|
|
))
|
|
|
|
|
# Fetch remaining pages, if any
|
|
|
|
|
query.fetch_all()
|
|
|
|
|
# Update saved copy with video ids
|
|
|
|
|
self.playlist_state[playlist] = [
|
|
|
|
|
self.playlist_state[playlist_id] = [
|
|
|
|
|
item['snippet']['resourceId'].get('videoId') # api implies it's possible that non-videos are added
|
|
|
|
|
for item in query.items
|
|
|
|
|
]
|
|
|
|
@ -174,14 +174,14 @@ class PlaylistManager(object):
|
|
|
|
|
# therefore insert at end
|
|
|
|
|
return len(playlist)
|
|
|
|
|
|
|
|
|
|
def insert_into_playlist(self, playlist, video_id, index):
|
|
|
|
|
def insert_into_playlist(self, playlist_id, video_id, index):
|
|
|
|
|
"""Insert video into given playlist at given index.
|
|
|
|
|
Makes the API call then also updates our mirrored copy.
|
|
|
|
|
"""
|
|
|
|
|
logging.info("Inserting {} at index {} of {}".format(video_id, index, playlist))
|
|
|
|
|
self.api.insert_into_playlist(playlist, video_id, index)
|
|
|
|
|
logging.info("Inserting {} at index {} of {}".format(video_id, index, playlist_id))
|
|
|
|
|
self.api.insert_into_playlist(playlist_id, video_id, index)
|
|
|
|
|
# Update our copy
|
|
|
|
|
self.playlist_state.setdefault(playlist, []).insert(index, video_id)
|
|
|
|
|
self.playlist_state.setdefault(playlist_id, []).insert(index, video_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class YoutubeAPI(object):
|
|
|
|
@ -191,10 +191,10 @@ class YoutubeAPI(object):
|
|
|
|
|
# We could maybe have a per-video lock but this is easier.
|
|
|
|
|
self.insert_lock = gevent.lock.RLock()
|
|
|
|
|
|
|
|
|
|
def insert_into_playlist(self, playlist, video_id, index):
|
|
|
|
|
def insert_into_playlist(self, playlist_id, video_id, index):
|
|
|
|
|
json = {
|
|
|
|
|
"snippet": {
|
|
|
|
|
"playlistId": playlist,
|
|
|
|
|
"playlistId": playlist_id,
|
|
|
|
|
"resourceId": {
|
|
|
|
|
"kind": "youtube#video",
|
|
|
|
|
"videoId": video_id,
|
|
|
|
@ -210,21 +210,21 @@ class YoutubeAPI(object):
|
|
|
|
|
)
|
|
|
|
|
if not resp.ok:
|
|
|
|
|
raise Exception("Failed to insert {video_id} at index {index} of {playlist} with {resp.status_code}: {resp.content}".format(
|
|
|
|
|
playlist=playlist, video_id=video_id, index=index, resp=resp,
|
|
|
|
|
playlist=playlist_id, video_id=video_id, index=index, resp=resp,
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
def list_playlist(self, playlist):
|
|
|
|
|
def list_playlist(self, playlist_id):
|
|
|
|
|
"""Fetches the first page of playlist contents and returns a ListQuery object.
|
|
|
|
|
You can use this object to look up info and optionally retrieve the whole playlist."""
|
|
|
|
|
data = self._list_playlist(playlist)
|
|
|
|
|
return ListQuery(self, playlist, data)
|
|
|
|
|
data = self._list_playlist(playlist_id)
|
|
|
|
|
return ListQuery(self, playlist_id, data)
|
|
|
|
|
|
|
|
|
|
def _list_playlist(self, playlist, page_token=None):
|
|
|
|
|
def _list_playlist(self, playlist_id, page_token=None):
|
|
|
|
|
"""Internal method that actually does the list query.
|
|
|
|
|
Returns the full response json."""
|
|
|
|
|
params = {
|
|
|
|
|
"part": "snippet",
|
|
|
|
|
"playlistId": playlist,
|
|
|
|
|
"playlistId": playlist_id,
|
|
|
|
|
"maxResults": 50,
|
|
|
|
|
}
|
|
|
|
|
if page_token is not None:
|
|
|
|
@ -235,7 +235,7 @@ class YoutubeAPI(object):
|
|
|
|
|
)
|
|
|
|
|
if not resp.ok:
|
|
|
|
|
raise Exception("Failed to list {playlist} (page_token={page_token!r}) with {resp.status_code}: {resp.content}".format(
|
|
|
|
|
playlist=playlist, page_token=page_token, resp=resp,
|
|
|
|
|
playlist=playlist_id, page_token=page_token, resp=resp,
|
|
|
|
|
))
|
|
|
|
|
return resp.json()
|
|
|
|
|
|
|
|
|
|