|
|
|
@ -1,5 +1,8 @@
|
|
|
|
|
import base64
|
|
|
|
|
import functools
|
|
|
|
|
import hashlib
|
|
|
|
|
import json
|
|
|
|
|
import random
|
|
|
|
|
import re
|
|
|
|
|
import time
|
|
|
|
|
import urllib.parse
|
|
|
|
@ -16,17 +19,19 @@ from ..utils import (
|
|
|
|
|
traverse_obj,
|
|
|
|
|
unified_timestamp,
|
|
|
|
|
url_or_none,
|
|
|
|
|
urlencode_postdata,
|
|
|
|
|
urljoin,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
_SUITABLE_NICOCHANNEL_PLUS_DOMAINS = set()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NiconicoChannelPlusBaseIE(InfoExtractor):
|
|
|
|
|
class NicoChannelCommonBaseIE(InfoExtractor):
|
|
|
|
|
_SITE_SETTINGS = {}
|
|
|
|
|
_DOMAIN_SITE_ID = {}
|
|
|
|
|
_CHANNEL_NAMES = {}
|
|
|
|
|
_CHANNEL_AGE_LIMIT = {}
|
|
|
|
|
|
|
|
|
|
def _get_jwt_token(self, url):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def _get_settings(self, url, video_id=None):
|
|
|
|
|
base_url = urljoin(url, '/')
|
|
|
|
@ -52,20 +57,14 @@ class NiconicoChannelPlusBaseIE(InfoExtractor):
|
|
|
|
|
'fc_use_device': 'null',
|
|
|
|
|
**headers,
|
|
|
|
|
}
|
|
|
|
|
if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True):
|
|
|
|
|
jwt_token = jwt_args[0]
|
|
|
|
|
try:
|
|
|
|
|
if time.time() > jwt_decode_hs256(jwt_token)['exp']:
|
|
|
|
|
self.report_warning('JWT token is expired. Access to video may be denied.')
|
|
|
|
|
except Exception:
|
|
|
|
|
self.report_warning('Possibly invalid JWT token is provided. Access to video may be denied.')
|
|
|
|
|
if jwt_token := self._get_jwt_token(site_url):
|
|
|
|
|
headers['Authorization'] = f'Bearer {jwt_token}'
|
|
|
|
|
|
|
|
|
|
data, handle = self._download_json_handle(
|
|
|
|
|
f'{settings["api_base_url"]}{path}', video_id, headers=headers, expected_status=403, **kwargs)
|
|
|
|
|
if handle.status == 403:
|
|
|
|
|
if not self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True):
|
|
|
|
|
raise ExtractorError('Login is required. Use --extractor-args "niconicochannelplus:jwt_token=xxx"'
|
|
|
|
|
'to provide account credentials', expected=True)
|
|
|
|
|
if not self._get_jwt_token(site_url):
|
|
|
|
|
self.raise_login_required(expected=True)
|
|
|
|
|
raise ExtractorError('You may have no access to this video')
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
@ -83,6 +82,159 @@ class NiconicoChannelPlusBaseIE(InfoExtractor):
|
|
|
|
|
query={'current_site_domain': domain_url})['data']['content_providers']['id'])
|
|
|
|
|
return self._DOMAIN_SITE_ID[domain_url]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
|
|
|
|
|
_AUTH_SETTINGS = {}
|
|
|
|
|
_AUTH_TOKENS = {}
|
|
|
|
|
_netrc_domain: str
|
|
|
|
|
|
|
|
|
|
def _get_auth(self, url) -> dict:
|
|
|
|
|
return self._AUTH_TOKENS.get(urljoin(url, '/'), {})
|
|
|
|
|
|
|
|
|
|
def _set_auth(self, url, auth):
|
|
|
|
|
self._AUTH_TOKENS[urljoin(url, '/')] = auth
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def has_attempted_login(self):
|
|
|
|
|
return getattr(self, '_netrc_domain', None) is not None
|
|
|
|
|
|
|
|
|
|
def _login_hint(self, **kwargs):
|
|
|
|
|
return super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None))
|
|
|
|
|
|
|
|
|
|
def _get_auth_settings(self, url):
|
|
|
|
|
fanclub_site_id = self._get_fanclub_site_id(url)
|
|
|
|
|
if fanclub_site_id not in self._AUTH_SETTINGS:
|
|
|
|
|
self._AUTH_SETTINGS[fanclub_site_id] = traverse_obj(self._download_api_json(
|
|
|
|
|
url, f'/fanclub_sites/{fanclub_site_id}/login', fanclub_site_id,
|
|
|
|
|
note='Downloading auth settings'), ('data', 'fanclub_site', {
|
|
|
|
|
'auth0_app_client': ('auth0_app_client_id', {str}),
|
|
|
|
|
'auth0_web_client': ('auth0_web_client_id', {str}),
|
|
|
|
|
'auth0_domain': ('fanclub_group', 'auth0_domain', {str}),
|
|
|
|
|
}))
|
|
|
|
|
return self._AUTH_SETTINGS[fanclub_site_id]
|
|
|
|
|
|
|
|
|
|
def _get_jwt_token(self, url):
|
|
|
|
|
if not self.has_attempted_login:
|
|
|
|
|
self._perform_sheeta_login(url)
|
|
|
|
|
|
|
|
|
|
if access_token := self._get_auth(url).get('access_token'):
|
|
|
|
|
if jwt_decode_hs256(access_token)['exp'] < time.time() + 15:
|
|
|
|
|
self._refresh_sheeta_token(url)
|
|
|
|
|
return self._get_auth(url)['access_token']
|
|
|
|
|
|
|
|
|
|
if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True):
|
|
|
|
|
jwt_token = jwt_args[0]
|
|
|
|
|
try:
|
|
|
|
|
if time.time() < jwt_decode_hs256(jwt_token)['exp']:
|
|
|
|
|
return jwt_token
|
|
|
|
|
else:
|
|
|
|
|
self.report_warning('JWT token expired, continuing without login.')
|
|
|
|
|
except Exception:
|
|
|
|
|
self.report_warning('Invalid JWT token, continuing without login.')
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def _auth0_client(self):
|
|
|
|
|
return base64.b64encode(json.dumps({ # index.js: btoa(JSON.stringify(s || Aq))
|
|
|
|
|
'name': 'auth0-spa-js', # index.js: Aq = ...
|
|
|
|
|
'version': '2.0.6',
|
|
|
|
|
}, separators=(',', ':')).encode()).decode()
|
|
|
|
|
|
|
|
|
|
def _perform_sheeta_login(self, url):
|
|
|
|
|
def _random_string():
|
|
|
|
|
return ''.join(random.choices('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_~.', k=43))
|
|
|
|
|
|
|
|
|
|
self._netrc_domain = urllib.parse.urlparse(url).netloc
|
|
|
|
|
username, password = self._get_login_info(netrc_machine=self._netrc_domain)
|
|
|
|
|
if not username or not password:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
auth_settings = self._get_auth_settings(url)
|
|
|
|
|
site_settings = self._get_settings(url)
|
|
|
|
|
|
|
|
|
|
code_verifier = _random_string()
|
|
|
|
|
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode().rstrip('=')
|
|
|
|
|
|
|
|
|
|
preauth_query = {
|
|
|
|
|
'client_id': auth_settings['auth0_web_client'],
|
|
|
|
|
'scope': 'openid profile email offline_access',
|
|
|
|
|
'redirect_uri': urljoin(url, '/login/login-redirect'),
|
|
|
|
|
'audience': urllib.parse.urlparse(site_settings['api_base_url']).hostname,
|
|
|
|
|
'ext-group_id': site_settings['fanclub_group_id'],
|
|
|
|
|
'ext-platform_id': site_settings['platform_id'],
|
|
|
|
|
'ext-terms': urljoin(url, '/terms__content_type___nfc_terms_of_services'),
|
|
|
|
|
'prompt': 'login',
|
|
|
|
|
'response_type': 'code',
|
|
|
|
|
'response_mode': 'query',
|
|
|
|
|
'state': base64.b64encode(_random_string().encode()).decode(),
|
|
|
|
|
'nonce': base64.b64encode(_random_string().encode()).decode(),
|
|
|
|
|
'code_challenge': code_challenge,
|
|
|
|
|
'code_challenge_method': 'S256',
|
|
|
|
|
'auth0Client': self._auth0_client,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
webpage, handler = self._download_webpage_handle(
|
|
|
|
|
f'https://{auth_settings["auth0_domain"]}/authorize', 'preauth', query=preauth_query)
|
|
|
|
|
|
|
|
|
|
_, handler = self._download_webpage_handle(handler.url, 'login', data=urlencode_postdata({
|
|
|
|
|
**self._hidden_inputs(webpage),
|
|
|
|
|
'username': username,
|
|
|
|
|
'password': password,
|
|
|
|
|
'action': 'default',
|
|
|
|
|
}), headers={
|
|
|
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
|
|
|
'Origin': urljoin(handler.url, '/').rstrip('/'),
|
|
|
|
|
'Referer': handler.url,
|
|
|
|
|
}, expected_status=404)
|
|
|
|
|
|
|
|
|
|
data = self._download_json(
|
|
|
|
|
f'https://{auth_settings["auth0_domain"]}/oauth/token', 'login-token',
|
|
|
|
|
data=urlencode_postdata({
|
|
|
|
|
'client_id': auth_settings['auth0_web_client'],
|
|
|
|
|
'code_verifier': code_verifier,
|
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
|
'code': parse_qs(handler.url)['code'][0],
|
|
|
|
|
'redirect_uri': urljoin(url, '/login/login-redirect'),
|
|
|
|
|
}), headers={
|
|
|
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
|
|
|
'Origin': urljoin(handler.url, '/').rstrip('/'),
|
|
|
|
|
'Referer': handler.url,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
self._set_auth(url, {
|
|
|
|
|
'access_token': data['access_token'],
|
|
|
|
|
'refresh_token': data['refresh_token'],
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
def _refresh_sheeta_token(self, url):
|
|
|
|
|
auth = self._get_auth(url)
|
|
|
|
|
if not auth or not auth.get('refresh_token'):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
auth_settings = self._get_auth_settings(url)
|
|
|
|
|
data = self._download_json(
|
|
|
|
|
f'https://{auth_settings["auth0_domain"]}/oauth/token', 'refresh',
|
|
|
|
|
data=urlencode_postdata({
|
|
|
|
|
'client_id': auth_settings['auth0_web_client'],
|
|
|
|
|
'redirect_uri': urljoin(url, '/login/login-redirect'),
|
|
|
|
|
'grant_type': 'refresh_token',
|
|
|
|
|
'refresh_token': auth['refresh_token'],
|
|
|
|
|
}), headers={
|
|
|
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
|
|
|
'Auth0-Client': self._auth0_client,
|
|
|
|
|
'Origin': urljoin(url, '/').rstrip('/'),
|
|
|
|
|
'Referer': urljoin(url, '/'),
|
|
|
|
|
}, note='Refreshing auth token')
|
|
|
|
|
|
|
|
|
|
self._set_auth(url, {
|
|
|
|
|
'access_token': data['access_token'],
|
|
|
|
|
'refresh_token': data['refresh_token'],
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NiconicoChannelPlusBaseIE(NicoChannelAuthBaseIE):
|
|
|
|
|
_CHANNEL_NAMES = {}
|
|
|
|
|
_CHANNEL_AGE_LIMIT = {}
|
|
|
|
|
|
|
|
|
|
def _get_channel_id(self, url):
|
|
|
|
|
parsed = urllib.parse.urlparse(url)
|
|
|
|
|
if self._get_settings(url)['platform_id'] == 'SHTA':
|
|
|
|
|