cutter: Add lightweight youtube client

Provides basic youtube api calls, and gets passed into both transcode checker and cutter.

The official youtube client library is many orders of magnitude larger and more complicated,
and can't actually do what we want (stream an upload of unknown size).
pull/47/head
Mike Lang 6 years ago committed by Mike Lang
parent dfc64481a6
commit fdd245a6d9

@ -1,4 +1,5 @@
import json
import logging
import signal
@ -6,10 +7,17 @@ import gevent.backdoor
import gevent.event
import prometheus_client as prom
import common
from .youtube import Youtube
class Cutter(object):
def __init__(self, stop):
"""Stop is an Event triggering graceful shutdown when set."""
def __init__(self, youtube, stop):
"""youtube is an authenticated and initialized youtube api client.
Stop is an Event triggering graceful shutdown when set.
"""
self.youtube = youtube
self.stop = stop
self.logger = logging.getLogger(type(self).__name__)
@ -20,10 +28,12 @@ class Cutter(object):
class TranscodeChecker(object):
def __init__(self, stop):
def __init__(self, youtube, stop):
"""
youtube is an authenticated and initialized youtube api client.
Stop is an Event triggering graceful shutdown when set.
"""
self.youtube = youtube
self.stop = stop
self.logger = logging.getLogger(type(self).__name__)
@ -32,7 +42,10 @@ class TranscodeChecker(object):
pass
def main(metrics_port=8003, backdoor_port=0):
def main(youtube_creds_file, metrics_port=8003, backdoor_port=0):
"""
youtube_creds_file should be a json file containing keys 'client_id', 'client_secret' and 'refresh_token'.
"""
common.PromLogCountsHandler.install()
common.install_stacksampler()
prom.start_http_server(metrics_port)
@ -48,8 +61,14 @@ def main(metrics_port=8003, backdoor_port=0):
# We have two independent jobs to do - to perform cut jobs (cutter),
# and to check the status of transcoding videos to see if they're done (transcode checker).
# We want to error if either errors, and shut down if either exits.
cutter = Cutter(stop)
transcode_checker = TranscodeChecker(stop)
youtube_creds = json.load(open(youtube_creds_file))
youtube = Youtube(
client_id=youtube_creds['client_id'],
client_secret=youtube_creds['client_secret'],
refresh_token=youtube_creds['refresh_token'],
)
cutter = Cutter(youtube, stop)
transcode_checker = TranscodeChecker(youtube, stop)
jobs = [
gevent.spawn(cutter.run),
gevent.spawn(transcode_checker.run),

@ -0,0 +1,111 @@
import logging
import time
import gevent
import requests
class Youtube(object):
"""Manages access to youtube and maintains an active access token"""
ACCESS_TOKEN_ERROR_RETRY_INTERVAL = 10
# Refresh token 10min before it expires (it normally lasts an hour)
ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY = 600
def __init__(self, client_id, client_secret, refresh_token):
self.logger = logging.getLogger(type(self).__name__)
self.client_id = client_id
self.client_secret = client_secret
self.refresh_token = refresh_token
self._first_get_access_token = gevent.spawn(self.get_access_token)
@property
def access_token(self):
"""Blocks if access token unavailable yet"""
self._first_get_access_token.join()
return self._access_token
def get_access_token(self):
"""Authenticates against the youtube API and retrieves a token we will use in
subsequent requests.
This function gets called automatically when needed, there should be no need to call it
yourself."""
while True:
try:
start_time = time.time()
resp = requests.post('https://www.googleapis.com/oauth2/v4/token', data={
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': self.refresh_token,
'grant_type': 'refresh_token',
})
resp.raise_for_status()
data = resp.json()
self._access_token = data['access_token']
expires_in = (start_time + data['expires_in']) - time.time()
if expires_in < self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY:
self.logger.warning("Access token expires in {}s, less than normal leeway time of {}s".format(
expires_in, self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY,
))
gevent.spawn_later(expires_in - self.ACCESS_TOKEN_REFRESH_TIME_BEFORE_EXPIRY, self.get_access_token)
except Exception:
self.logger.exception("Failed to fetch access token, retrying")
self.wait(self.ACCESS_TOKEN_ERROR_RETRY_INTERVAL)
else:
break
def auth_headers(self):
return {'Authorization': 'Bearer {}'.format(self.access_token)}
def upload_video(self, title, description, tags, data, hidden=False):
"""Data may be a string, file-like object or iterator. Returns id."""
json = {
'snippet': {
'title': title,
'description': description,
'tags': tags,
},
}
if hidden:
json['status'] = {
'privacyStatus': 'unlisted',
}
resp = requests.post(
'https://www.googleapis.com/upload/youtube/v3/videos',
headers=self.auth_headers(),
params={
'part': 'snippet,status' if hidden else 'snippet',
'uploadType': 'resumable',
},
json=json,
)
resp.raise_for_status()
upload_url = resp.headers['Location']
resp = requests.post(upload_url, headers=self.auth_headers(), data=data)
resp.raise_for_status()
return resp.json()['id']
def get_video_status(self, ids):
"""For a list of video ids, returns a dict {id: upload status}.
A video is fully processed when upload status is 'processed'.
NOTE: Video ids may be missing from the result, this probably indicates
the video is errored.
"""
output = {}
# Break up into groups of 10 videos. I'm not sure what the limit is so this is reasonable.
for i in range(0, len(ids), 10):
group = ids[i:i+10]
resp = requests.get(
'https://www.googleapis.com/youtube/v3/videos',
headers=self.auth_headers(),
params={
'part': 'id,status',
'id': ','.join(group),
},
)
resp.raise_for_status()
for item in resp.json()['items']:
output[item['id']] = item['status']['uploadStatus']
return output
Loading…
Cancel
Save