diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 83262c4..f3fe061 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -1,4 +1,5 @@ +import json import logging import signal @@ -6,10 +7,17 @@ import gevent.backdoor import gevent.event import prometheus_client as prom +import common + +from .youtube import Youtube + class Cutter(object): - def __init__(self, stop): - """Stop is an Event triggering graceful shutdown when set.""" + def __init__(self, youtube, stop): + """youtube is an authenticated and initialized youtube api client. + Stop is an Event triggering graceful shutdown when set. + """ + self.youtube = youtube self.stop = stop self.logger = logging.getLogger(type(self).__name__) @@ -20,10 +28,12 @@ class Cutter(object): class TranscodeChecker(object): - def __init__(self, stop): + def __init__(self, youtube, stop): """ + youtube is an authenticated and initialized youtube api client. Stop is an Event triggering graceful shutdown when set. """ + self.youtube = youtube self.stop = stop self.logger = logging.getLogger(type(self).__name__) @@ -32,7 +42,10 @@ class TranscodeChecker(object): pass -def main(metrics_port=8003, backdoor_port=0): +def main(youtube_creds_file, metrics_port=8003, backdoor_port=0): + """ + youtube_creds_file should be a json file containing keys 'client_id', 'client_secret' and 'refresh_token'. + """ common.PromLogCountsHandler.install() common.install_stacksampler() prom.start_http_server(metrics_port) @@ -48,8 +61,14 @@ def main(metrics_port=8003, backdoor_port=0): # We have two independent jobs to do - to perform cut jobs (cutter), # and to check the status of transcoding videos to see if they're done (transcode checker). # We want to error if either errors, and shut down if either exits. 
- cutter = Cutter(stop) - transcode_checker = TranscodeChecker(stop) + with open(youtube_creds_file) as f: youtube_creds = json.load(f) + youtube = Youtube( + client_id=youtube_creds['client_id'], + client_secret=youtube_creds['client_secret'], + refresh_token=youtube_creds['refresh_token'], + ) + cutter = Cutter(youtube, stop) + transcode_checker = TranscodeChecker(youtube, stop) jobs = [ gevent.spawn(cutter.run), gevent.spawn(transcode_checker.run), diff --git a/cutter/cutter/youtube.py b/cutter/cutter/youtube.py new file mode 100644 index 0000000..278822c --- /dev/null +++ b/cutter/cutter/youtube.py @@ -0,0 +1,111 @@ + +import logging +import time + +import gevent +import requests + + +class Youtube(object): + """Manages access to youtube and maintains an active access token""" + + ACCESS_TOKEN_ERROR_RETRY_INTERVAL = 10 + # Refresh token 10min before it expires (it normally lasts an hour) + ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY = 600 + + def __init__(self, client_id, client_secret, refresh_token): + self.logger = logging.getLogger(type(self).__name__) + self.client_id = client_id + self.client_secret = client_secret + self.refresh_token = refresh_token + + self._first_get_access_token = gevent.spawn(self.get_access_token) + + @property + def access_token(self): + """Blocks if access token unavailable yet""" + self._first_get_access_token.join() + return self._access_token + + def get_access_token(self): + """Authenticates against the youtube API and retrieves a token we will use in + subsequent requests. 
+ This function gets called automatically when needed, there should be no need to call it + yourself.""" + while True: + try: + start_time = time.time() + resp = requests.post('https://www.googleapis.com/oauth2/v4/token', data={ + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'refresh_token': self.refresh_token, + 'grant_type': 'refresh_token', + }) + resp.raise_for_status() + data = resp.json() + self._access_token = data['access_token'] + expires_in = (start_time + data['expires_in']) - time.time() + if expires_in < self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY: + self.logger.warning("Access token expires in {}s, less than normal leeway time of {}s".format( + expires_in, self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, + )) + gevent.spawn_later(expires_in - self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, self.get_access_token) + except Exception: + self.logger.exception("Failed to fetch access token, retrying") + gevent.sleep(self.ACCESS_TOKEN_ERROR_RETRY_INTERVAL) + else: + break + + def auth_headers(self): + return {'Authorization': 'Bearer {}'.format(self.access_token)} + + def upload_video(self, title, description, tags, data, hidden=False): + """Data may be a string, file-like object or iterator. Returns id.""" + json = { + 'snippet': { + 'title': title, + 'description': description, + 'tags': tags, + }, + } + if hidden: + json['status'] = { + 'privacyStatus': 'unlisted', + } + resp = requests.post( + 'https://www.googleapis.com/upload/youtube/v3/videos', + headers=self.auth_headers(), + params={ + 'part': 'snippet,status' if hidden else 'snippet', + 'uploadType': 'resumable', + }, + json=json, + ) + resp.raise_for_status() + upload_url = resp.headers['Location'] + resp = requests.post(upload_url, headers=self.auth_headers(), data=data) + resp.raise_for_status() + return resp.json()['id'] + + def get_video_status(self, ids): + """For a list of video ids, returns a dict {id: upload status}. 
+ A video is fully processed when upload status is 'processed'. + NOTE: Video ids may be missing from the result, this probably indicates + the video is errored. + """ + output = {} + # Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable. + for i in range(0, len(ids), 10): + group = ids[i:i+10] + resp = requests.get( + 'https://www.googleapis.com/youtube/v3/videos', + headers=self.auth_headers(), + params={ + 'part': 'id,status', + 'id': ','.join(group), + }, + ) + resp.raise_for_status() + for item in resp.json()['items']: + output[item['id']] = item['status']['uploadStatus'] + return output