From fdd245a6d934283d8c1d18394d6dc1a21af3d290 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 10 Jun 2019 04:01:17 -0700 Subject: [PATCH] cutter: Add lightweight youtube client Provides basic youtube api calls, and gets passed into both transcode checker and cutter. The official youtube client library is many orders of magnitude larger and more complicated, and can't actually do what we want (stream an upload of unknown size). --- cutter/cutter/main.py | 31 ++++++++--- cutter/cutter/youtube.py | 111 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 6 deletions(-) create mode 100644 cutter/cutter/youtube.py diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 83262c4..f3fe061 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -1,4 +1,5 @@ +import json import logging import signal @@ -6,10 +7,17 @@ import gevent.backdoor import gevent.event import prometheus_client as prom +import common + +from .youtube import Youtube + class Cutter(object): - def __init__(self, stop): - """Stop is an Event triggering graceful shutdown when set.""" + def __init__(self, youtube, stop): + """youtube is an authenticated and initialized youtube api client. + Stop is an Event triggering graceful shutdown when set. + """ + self.youtube = youtube self.stop = stop self.logger = logging.getLogger(type(self).__name__) @@ -20,10 +28,12 @@ class Cutter(object): class TranscodeChecker(object): - def __init__(self, stop): + def __init__(self, youtube, stop): """ + youtube is an authenticated and initialized youtube api client. Stop is an Event triggering graceful shutdown when set. """ + self.youtube = youtube self.stop = stop self.logger = logging.getLogger(type(self).__name__) @@ -32,7 +42,10 @@ class TranscodeChecker(object): pass -def main(metrics_port=8003, backdoor_port=0): +def main(youtube_creds_file, metrics_port=8003, backdoor_port=0): + """ + youtube_creds_file should be a json file containing keys 'client_id', 'client_secret' and 'refresh_token'. + """ common.PromLogCountsHandler.install() common.install_stacksampler() prom.start_http_server(metrics_port) @@ -48,8 +61,14 @@ def main(metrics_port=8003, backdoor_port=0): # We have two independent jobs to do - to perform cut jobs (cutter), # and to check the status of transcoding videos to see if they're done (transcode checker). # We want to error if either errors, and shut down if either exits. - cutter = Cutter(stop) - transcode_checker = TranscodeChecker(stop) + youtube_creds = json.load(open(youtube_creds_file)) + youtube = Youtube( + client_id=youtube_creds['client_id'], + client_secret=youtube_creds['client_secret'], + refresh_token=youtube_creds['refresh_token'], + ) + cutter = Cutter(youtube, stop) + transcode_checker = TranscodeChecker(youtube, stop) jobs = [ gevent.spawn(cutter.run), gevent.spawn(transcode_checker.run), diff --git a/cutter/cutter/youtube.py b/cutter/cutter/youtube.py new file mode 100644 index 0000000..278822c --- /dev/null +++ b/cutter/cutter/youtube.py @@ -0,0 +1,111 @@ + +import logging +import time + +import gevent +import requests + + +class Youtube(object): + """Manages access to youtube and maintains an active access token""" + + ACCESS_TOKEN_ERROR_RETRY_INTERVAL = 10 + # Refresh token 10min before it expires (it normally lasts an hour) + ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY = 600 + + def __init__(self, client_id, client_secret, refresh_token): + self.logger = logging.getLogger(type(self).__name__) + self.client_id = client_id + self.client_secret = client_secret + self.refresh_token = refresh_token + + self._first_get_access_token = gevent.spawn(self.get_access_token) + + @property + def access_token(self): + """Blocks if access token unavailable yet""" + self._first_get_access_token.join() + return self._access_token + + def get_access_token(self): + """Authenticates against the youtube API and retrieves a token we will use in + subsequent requests. + This function gets called automatically when needed, there should be no need to call it + yourself.""" + while True: + try: + start_time = time.time() + resp = requests.post('https://www.googleapis.com/oauth2/v4/token', data={ + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'refresh_token': self.refresh_token, + 'grant_type': 'refresh_token', + }) + resp.raise_for_status() + data = resp.json() + self._access_token = data['access_token'] + expires_in = (start_time + data['expires_in']) - time.time() + if expires_in < self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY: + self.logger.warning("Access token expires in {}s, less than normal leeway time of {}s".format( + expires_in, self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, + )) + gevent.spawn_later(expires_in - self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, self.get_access_token) + except Exception: + self.logger.exception("Failed to fetch access token, retrying") + self.wait(self.ACCESS_TOKEN_ERROR_RETRY_INTERVAL) + else: + break + + def auth_headers(self): + return {'Authorization': 'Bearer {}'.format(self.access_token)} + + def upload_video(self, title, description, tags, data, hidden=False): + """Data may be a string, file-like object or iterator. Returns id.""" + json = { + 'snippet': { + 'title': title, + 'description': description, + 'tags': tags, + }, + } + if hidden: + json['status'] = { + 'privacyStatus': 'unlisted', + } + resp = requests.post( + 'https://www.googleapis.com/upload/youtube/v3/videos', + headers=self.auth_headers(), + params={ + 'part': 'snippet,status' if hidden else 'snippet', + 'uploadType': 'resumable', + }, + json=json, + ) + resp.raise_for_status() + upload_url = resp.headers['Location'] + resp = requests.post(upload_url, headers=self.auth_headers(), data=data) + resp.raise_for_status() + return resp.json()['id'] + + def get_video_status(self, ids): + """For a list of video ids, returns a dict {id: upload status}. + A video is fully processed when upload status is 'processed'. + NOTE: Video ids may be missing from the result, this probably indicates + the video is errored. + """ + output = {} + # Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable. + for i in range(0, len(ids), 10): + group = ids[i:i+10] + resp = requests.get( + 'https://www.googleapis.com/youtube/v3/videos', + headers=self.auth_headers(), + params={ + 'part': 'id,status', + 'id': ','.join(group), + }, + ) + resp.raise_for_status() + for item in resp.json()['items']: + output[item['id']] = item['status']['uploadStatus'] + return output