mirror of https://github.com/ekimekim/wubloader
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
311 lines
11 KiB
Python
311 lines
11 KiB
Python
|
|
import logging
|
|
import json
|
|
import signal
|
|
|
|
import argh
|
|
import gevent
|
|
import gevent.backdoor
|
|
import gevent.event
|
|
import prometheus_client as prom
|
|
|
|
import common
|
|
from common.database import DBManager, query
|
|
from common.googleapis import GoogleAPIClient
|
|
|
|
|
|
class PlaylistManager(object):
|
|
|
|
def __init__(self, dbmanager, api_client, upload_locations, playlist_tags):
|
|
self.dbmanager = dbmanager
|
|
self.api = YoutubeAPI(api_client)
|
|
self.upload_locations = upload_locations
|
|
self.playlist_tags = playlist_tags
|
|
self.reset()
|
|
|
|
def reset(self, playlist=None):
|
|
"""Called to clear saved state and force a refresh after errors.
|
|
Either reset a specific playlist, or all if no arg given.
|
|
"""
|
|
if playlist is None:
|
|
# playlist_state represents our mirrored view of the list of items in each playlist.
|
|
# If a playlist is not present, it means we need to refresh our view of it.
|
|
# {playlist_id: [video_id]}
|
|
self.playlist_state = {}
|
|
else:
|
|
self.playlist_state.pop(playlist, None)
|
|
|
|
def run_once(self):
|
|
"""Do one check of all videos and playlists.
|
|
At a high level:
|
|
Fetch all eligible videos from the database
|
|
Group them into playlists depending on their tags
|
|
For each playlist, concurrently:
|
|
Compare this generated playlist to our local copy of the real thing
|
|
If they match, do nothing (don't even bother checking the real thing)
|
|
Check real thing for total length. If it matches, assume we're good. Otherwise refresh.
|
|
For each video to add:
|
|
Determine correct point in sort order
|
|
Add to playlist
|
|
Update local mirror with the action we just performed
|
|
"""
|
|
logging.info("Checking for new videos")
|
|
videos = self.get_videos()
|
|
logging.debug("Found {} eligible videos".format(len(videos)))
|
|
|
|
# start all workers
|
|
workers = {}
|
|
for playlist, tags in self.playlist_tags.items():
|
|
workers[playlist] = gevent.spawn(self.update_playlist, playlist, tags, videos)
|
|
|
|
# check each one for success, reset on failure
|
|
for playlist, worker in workers.items():
|
|
try:
|
|
worker.get()
|
|
except Exception:
|
|
logging.exception("Failed to update playlist {}".format(playlist))
|
|
self.reset(playlist)
|
|
|
|
def get_videos(self):
|
|
# Most of the time by getting then re-putting the conn, we'll just use the same
|
|
# one every time. But if there's an error we won't re-put it so we'll get a new one
|
|
# the next time.
|
|
conn = self.dbmanager.get_conn()
|
|
videos = query(conn, """
|
|
SELECT video_id, tags, COALESCE(video_start, event_start) AS start_time
|
|
FROM events
|
|
WHERE state = 'DONE' AND upload_location = ANY (%s)
|
|
""", self.upload_locations)
|
|
self.dbmanager.put_conn(conn)
|
|
return {video.video_id: video for video in videos}
|
|
|
|
def update_playlist(self, playlist, tags, videos):
|
|
# Filter the video list for videos with matching tags
|
|
matching = [
|
|
video for video in videos.values()
|
|
if all(tag in [t.lower() for t in video.tags] for tag in tags)
|
|
]
|
|
logging.debug("Found {} matching videos for playlist {}".format(len(matching), playlist))
|
|
# If we have nothing to add, short circuit without doing any API calls to save quota.
|
|
if not set([v.video_id for v in matching]) - set(self.playlist_state.get(playlist, [])):
|
|
logging.debug("All videos already in playlist, nothing to do")
|
|
return
|
|
# Refresh our playlist state, if nessecary.
|
|
self.refresh_playlist(playlist)
|
|
# Get an updated list of new videos
|
|
new_videos = [
|
|
video for video in matching
|
|
if video.video_id not in self.playlist_state[playlist]
|
|
]
|
|
# It shouldn't matter, but just for clarity let's sort them by event order
|
|
new_videos.sort(key=lambda v: v.start_time)
|
|
# Insert each new video one at a time
|
|
logging.debug("Inserting new videos for playlist {}: {}".format(playlist, new_videos))
|
|
for video in new_videos:
|
|
index = self.find_insert_index(videos, self.playlist_state[playlist], video)
|
|
self.insert_into_playlist(playlist, video.video_id, index)
|
|
|
|
def refresh_playlist(self, playlist):
|
|
"""Check playlist mirror is in a good state, and fetch it if it isn't.
|
|
We try to do this with only one page of list output, to save on quota.
|
|
If the total length does not match (or we don't have a copy at all),
|
|
then we do a full refresh.
|
|
"""
|
|
logging.debug("Fetching first page of playlist {}".format(playlist))
|
|
query = self.api.list_playlist(playlist)
|
|
# See if we can avoid further page fetches.
|
|
if playlist not in self.playlist_state:
|
|
logging.info("Fetching playlist {} because we don't currently have it".format(playlist))
|
|
elif query.is_complete:
|
|
logging.debug("First page of {} was entire playlist".format(playlist))
|
|
elif len(self.playlist_state[playlist]) == query.total_size:
|
|
logging.debug("Skipping fetching of remainder of playlist {}, size matches".format(playlist))
|
|
return
|
|
else:
|
|
logging.warning("Playlist {} has size mismatch ({} saved vs {} actual), refetching".format(
|
|
playlist, len(self.playlist_state[playlist]), query.total_size,
|
|
))
|
|
# Fetch remaining pages, if any
|
|
query.fetch_all()
|
|
# Update saved copy with video ids
|
|
self.playlist_state[playlist] = [
|
|
item['snippet']['resourceId'].get('videoId') # api implies it's possible that non-videos are added
|
|
for item in query.items
|
|
]
|
|
|
|
def find_insert_index(self, videos, playlist, new_video):
|
|
"""Find the index at which to insert new_video into playlist such that
|
|
playlist remains sorted (it is assumed to already be sorted).
|
|
videos should be a mapping from video ids to video info.
|
|
"""
|
|
# To try to behave as best we can in the presence of mis-sorted playlists,
|
|
# we walk through the list linearly. We insert immediately before the first
|
|
# item that should be after us in sort order.
|
|
# Note that we treat unknown items (videos we don't know) as being before us
|
|
# in sort order, so that we always insert after them.
|
|
for n, video_id in enumerate(playlist):
|
|
if video_id not in videos:
|
|
# ignore unknowns
|
|
continue
|
|
video = videos[video_id]
|
|
# if this video is after new video, return this index
|
|
if new_video.start_time < video.start_time:
|
|
return n
|
|
# if we reach here, it means everything on the list was before the new video
|
|
# therefore insert at end
|
|
return len(playlist)
|
|
|
|
def insert_into_playlist(self, playlist, video_id, index):
|
|
"""Insert video into given playlist at given index.
|
|
Makes the API call then also updates our mirrored copy.
|
|
"""
|
|
logging.info("Inserting {} at index {} of {}".format(video_id, index, playlist))
|
|
self.api.insert_into_playlist(playlist, video_id, index)
|
|
# Update our copy
|
|
self.playlist_state.setdefault(playlist, []).insert(index, video_id)
|
|
|
|
|
|
class YoutubeAPI(object):
|
|
def __init__(self, client):
|
|
self.client = client
|
|
|
|
def insert_into_playlist(self, playlist, video_id, index):
|
|
json = {
|
|
"snippet": {
|
|
"playlistId": playlist,
|
|
"resourceId": {
|
|
"kind": "youtube#video",
|
|
"videoId": video_id,
|
|
},
|
|
"position": index,
|
|
},
|
|
}
|
|
resp = self.client.request("POST", "https://www.googleapis.com/youtube/v3/playlistItems",
|
|
params={"part": "snippet"},
|
|
json=json,
|
|
metric_name="playlist_insert",
|
|
)
|
|
if not resp.ok:
|
|
raise Exception("Failed to insert {video_id} at index {index} of {playlist} with {resp.status_code}: {resp.content}".format(
|
|
playlist=playlist, video_id=video_id, index=index, resp=resp,
|
|
))
|
|
|
|
def list_playlist(self, playlist):
|
|
"""Fetches the first page of playlist contents and returns a ListQuery object.
|
|
You can use this object to look up info and optionally retrieve the whole playlist."""
|
|
data = self._list_playlist(playlist)
|
|
return ListQuery(self, playlist, data)
|
|
|
|
def _list_playlist(self, playlist, page_token=None):
|
|
"""Internal method that actually does the list query.
|
|
Returns the full response json."""
|
|
params = {
|
|
"part": "snippet",
|
|
"playlistId": playlist,
|
|
"maxResults": 50,
|
|
}
|
|
if page_token is not None:
|
|
params['pageToken'] = page_token
|
|
resp = self.client.request("GET", "https://www.googleapis.com/youtube/v3/playlistItems",
|
|
params=params,
|
|
metric_name="playlist_list",
|
|
)
|
|
if not resp.ok:
|
|
raise Exception("Failed to list {playlist} (page_token={!r}) with {resp.status_code}: {resp.content}".format(
|
|
playlist=playlist, page_token=page_token, resp=resp,
|
|
))
|
|
return resp.json()
|
|
|
|
|
|
class ListQuery(object):
|
|
"""Represents a partially-fetched list query for all playlist items.
|
|
To save on quota, we avoid fetching the entire playlist until asked."""
|
|
def __init__(self, api, playlist_id, data):
|
|
self.api = api
|
|
self.playlist_id = playlist_id
|
|
self.total_size = data['pageInfo']['totalResults']
|
|
self.is_complete = self.total_size <= data['pageInfo']['resultsPerPage']
|
|
self.items = data['items']
|
|
self.page_token = data.get('nextPageToken')
|
|
|
|
def fetch_all(self):
|
|
if self.is_complete:
|
|
return
|
|
page_token = self.page_token
|
|
while len(self.items) < self.total_size:
|
|
assert page_token is not None, "More items to fetch but no page token"
|
|
data = self.api._list_playlist(self.playlist_id, page_token)
|
|
self.items += data['items']
|
|
page_token = data.get('nextPageToken')
|
|
# I'm just being paranoid here about an infinite loop blowing our quota,
|
|
# let's assert we always are making forward progress
|
|
assert data['items'], "Got no extra items from new page"
|
|
self.is_complete = True
|
|
|
|
|
|
def parse_playlist_arg(arg):
|
|
playlist, tags = arg.split('=', 1)
|
|
tags = tags.split(",") if tags else []
|
|
tags = [tag.lower() for tag in tags]
|
|
return playlist, tags
|
|
|
|
|
|
@argh.arg("playlists", nargs="*", metavar="PLAYLIST={TAG,}", type=parse_playlist_arg, help=
|
|
"Each playlist arg specifies a youtube playlist ID, along with any number of tags. "
|
|
"Events will be added to the playlist if that event has all the tags. For example, "
|
|
"some_playlist_id=Day 1,Technical would populate that playlist with all Technical events "
|
|
"from Day 1. Note that having no tags (ie. 'id=') is allowed and all events will be added to it. "
|
|
"Note playlist ids must be unique (can't specify the same one twice)."
|
|
)
|
|
def main(
|
|
dbconnect,
|
|
creds_file,
|
|
playlists,
|
|
upload_location_allowlist="youtube",
|
|
interval=600,
|
|
metrics_port=8007,
|
|
backdoor_port=0,
|
|
):
|
|
"""
|
|
dbconnect should be a postgres connection string
|
|
|
|
creds_file should contain youtube api creds
|
|
|
|
upload_location_allowlist is a comma-seperated list of database upload locations to
|
|
consider as eligible to being added to playlists. For these locations, the database video id
|
|
must be a youtube video id.
|
|
|
|
interval is how often to check for new videos, default every 10min.
|
|
"""
|
|
common.PromLogCountsHandler.install()
|
|
common.install_stacksampler()
|
|
prom.start_http_server(metrics_port)
|
|
|
|
if backdoor_port:
|
|
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
|
|
|
|
upload_locations = upload_location_allowlist.split(",") if upload_location_allowlist else []
|
|
playlists = dict(playlists)
|
|
|
|
stop = gevent.event.Event()
|
|
gevent.signal_handler(signal.SIGTERM, stop.set) # shut down on sigterm
|
|
|
|
logging.info("Starting up")
|
|
|
|
with open(creds_file) as f:
|
|
creds = json.load(f)
|
|
client = GoogleAPIClient(creds['client_id'], creds['client_secret'], creds['refresh_token'])
|
|
|
|
dbmanager = DBManager(dsn=dbconnect)
|
|
manager = PlaylistManager(dbmanager, client, upload_locations, playlists)
|
|
|
|
while not stop.is_set():
|
|
try:
|
|
manager.run_once()
|
|
except Exception:
|
|
logging.exception("Failed to run playlist manager")
|
|
manager.reset()
|
|
stop.wait(interval) # wait for interval, or until stopping
|
|
|
|
logging.info("Stopped")
|