From 010a938835830b85a2e39869bd7ca4239e4a9b45 Mon Sep 17 00:00:00 2001 From: c-basalt <117849907+c-basalt@users.noreply.github.com> Date: Sun, 15 Dec 2024 11:20:45 -0500 Subject: [PATCH] SHTA auth --- yt_dlp/extractor/niconicochannelplus.py | 178 ++++++++++++++++++++++-- 1 file changed, 165 insertions(+), 13 deletions(-) diff --git a/yt_dlp/extractor/niconicochannelplus.py b/yt_dlp/extractor/niconicochannelplus.py index 9bffb18bc0..58860a0016 100644 --- a/yt_dlp/extractor/niconicochannelplus.py +++ b/yt_dlp/extractor/niconicochannelplus.py @@ -1,5 +1,8 @@ +import base64 import functools +import hashlib import json +import random import re import time import urllib.parse @@ -16,17 +19,19 @@ from ..utils import ( traverse_obj, unified_timestamp, url_or_none, + urlencode_postdata, urljoin, ) _SUITABLE_NICOCHANNEL_PLUS_DOMAINS = set() -class NiconicoChannelPlusBaseIE(InfoExtractor): +class NicoChannelCommonBaseIE(InfoExtractor): _SITE_SETTINGS = {} _DOMAIN_SITE_ID = {} - _CHANNEL_NAMES = {} - _CHANNEL_AGE_LIMIT = {} + + def _get_jwt_token(self, url): + pass def _get_settings(self, url, video_id=None): base_url = urljoin(url, '/') @@ -52,20 +57,14 @@ class NiconicoChannelPlusBaseIE(InfoExtractor): 'fc_use_device': 'null', **headers, } - if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True): - jwt_token = jwt_args[0] - try: - if time.time() > jwt_decode_hs256(jwt_token)['exp']: - self.report_warning('JWT token is expired. Access to video may be denied.') - except Exception: - self.report_warning('Possibly invalid JWT token is provided. Access to video may be denied.') + if jwt_token := self._get_jwt_token(site_url): headers['Authorization'] = f'Bearer {jwt_token}' + data, handle = self._download_json_handle( f'{settings["api_base_url"]}{path}', video_id, headers=headers, expected_status=403, **kwargs) if handle.status == 403: - if not self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True): - raise ExtractorError('Login is required. Use --extractor-args "niconicochannelplus:jwt_token=xxx"' - 'to provide account credentials', expected=True) + if not self._get_jwt_token(site_url): + self.raise_login_required(expected=True) raise ExtractorError('You may have no access to this video') return data @@ -83,6 +82,159 @@ class NiconicoChannelPlusBaseIE(InfoExtractor): query={'current_site_domain': domain_url})['data']['content_providers']['id']) return self._DOMAIN_SITE_ID[domain_url] + +class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): + _AUTH_SETTINGS = {} + _AUTH_TOKENS = {} + _netrc_domain: str + + def _get_auth(self, url) -> dict: + return self._AUTH_TOKENS.get(urljoin(url, '/'), {}) + + def _set_auth(self, url, auth): + self._AUTH_TOKENS[urljoin(url, '/')] = auth + + @property + def has_attempted_login(self): + return getattr(self, '_netrc_domain', None) is not None + + def _login_hint(self, **kwargs): + return super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None)) + + def _get_auth_settings(self, url): + fanclub_site_id = self._get_fanclub_site_id(url) + if fanclub_site_id not in self._AUTH_SETTINGS: + self._AUTH_SETTINGS[fanclub_site_id] = traverse_obj(self._download_api_json( + url, f'/fanclub_sites/{fanclub_site_id}/login', fanclub_site_id, + note='Downloading auth settings'), ('data', 'fanclub_site', { + 'auth0_app_client': ('auth0_app_client_id', {str}), + 'auth0_web_client': ('auth0_web_client_id', {str}), + 'auth0_domain': ('fanclub_group', 'auth0_domain', {str}), + })) + return self._AUTH_SETTINGS[fanclub_site_id] + + def _get_jwt_token(self, url): + if not self.has_attempted_login: + self._perform_sheeta_login(url) + + if access_token := self._get_auth(url).get('access_token'): + if jwt_decode_hs256(access_token)['exp'] < time.time() + 15: + self._refresh_sheeta_token(url) + return self._get_auth(url)['access_token'] + + if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True): + jwt_token = jwt_args[0] + try: + if time.time() < jwt_decode_hs256(jwt_token)['exp']: + return jwt_token + else: + self.report_warning('JWT token expired, continuing without login.') + except Exception: + self.report_warning('Invalid JWT token, continuing without login.') + + @property + def _auth0_client(self): + return base64.b64encode(json.dumps({ # index.js: btoa(JSON.stringify(s || Aq)) + 'name': 'auth0-spa-js', # index.js: Aq = ... + 'version': '2.0.6', + }, separators=(',', ':')).encode()).decode() + + def _perform_sheeta_login(self, url): + def _random_string(): + return ''.join(random.choices('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_~.', k=43)) + + self._netrc_domain = urllib.parse.urlparse(url).netloc + username, password = self._get_login_info(netrc_machine=self._netrc_domain) + if not username or not password: + return + + auth_settings = self._get_auth_settings(url) + site_settings = self._get_settings(url) + + code_verifier = _random_string() + code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode().rstrip('=') + + preauth_query = { + 'client_id': auth_settings['auth0_web_client'], + 'scope': 'openid profile email offline_access', + 'redirect_uri': urljoin(url, '/login/login-redirect'), + 'audience': urllib.parse.urlparse(site_settings['api_base_url']).hostname, + 'ext-group_id': site_settings['fanclub_group_id'], + 'ext-platform_id': site_settings['platform_id'], + 'ext-terms': urljoin(url, '/terms__content_type___nfc_terms_of_services'), + 'prompt': 'login', + 'response_type': 'code', + 'response_mode': 'query', + 'state': base64.b64encode(_random_string().encode()).decode(), + 'nonce': base64.b64encode(_random_string().encode()).decode(), + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', + 'auth0Client': self._auth0_client, + } + + webpage, handler = self._download_webpage_handle( + f'https://{auth_settings["auth0_domain"]}/authorize', 'preauth', query=preauth_query) + + _, handler = self._download_webpage_handle(handler.url, 'login', data=urlencode_postdata({ + **self._hidden_inputs(webpage), + 'username': username, + 'password': password, + 'action': 'default', + }), headers={ + 'Content-Type': 'application/x-www-form-urlencoded', + 'Origin': urljoin(handler.url, '/').rstrip('/'), + 'Referer': handler.url, + }, expected_status=404) + + data = self._download_json( + f'https://{auth_settings["auth0_domain"]}/oauth/token', 'login-token', + data=urlencode_postdata({ + 'client_id': auth_settings['auth0_web_client'], + 'code_verifier': code_verifier, + 'grant_type': 'authorization_code', + 'code': parse_qs(handler.url)['code'][0], + 'redirect_uri': urljoin(url, '/login/login-redirect'), + }), headers={ + 'Content-Type': 'application/x-www-form-urlencoded', + 'Origin': urljoin(handler.url, '/').rstrip('/'), + 'Referer': handler.url, + }) + + self._set_auth(url, { + 'access_token': data['access_token'], + 'refresh_token': data['refresh_token'], + }) + + def _refresh_sheeta_token(self, url): + auth = self._get_auth(url) + if not auth or not auth.get('refresh_token'): + return + + auth_settings = self._get_auth_settings(url) + data = self._download_json( + f'https://{auth_settings["auth0_domain"]}/oauth/token', 'refresh', + data=urlencode_postdata({ + 'client_id': auth_settings['auth0_web_client'], + 'redirect_uri': urljoin(url, '/login/login-redirect'), + 'grant_type': 'refresh_token', + 'refresh_token': auth['refresh_token'], + }), headers={ + 'Content-Type': 'application/x-www-form-urlencoded', + 'Auth0-Client': self._auth0_client, + 'Origin': urljoin(url, '/').rstrip('/'), + 'Referer': urljoin(url, '/'), + }, note='Refreshing auth token') + + self._set_auth(url, { + 'access_token': data['access_token'], + 'refresh_token': data['refresh_token'], + }) + + +class NiconicoChannelPlusBaseIE(NicoChannelAuthBaseIE): + _CHANNEL_NAMES = {} + _CHANNEL_AGE_LIMIT = {} + def _get_channel_id(self, url): parsed = urllib.parse.urlparse(url) if self._get_settings(url)['platform_id'] == 'SHTA':