From cf44d850ac7bdca6fc39f3d7fc414a89a2ed22d9 Mon Sep 17 00:00:00 2001 From: c-basalt <117849907+c-basalt@users.noreply.github.com> Date: Sun, 15 Dec 2024 12:14:22 -0500 Subject: [PATCH] fix --- yt_dlp/extractor/niconicochannelplus.py | 55 ++++++++++++++----------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/yt_dlp/extractor/niconicochannelplus.py b/yt_dlp/extractor/niconicochannelplus.py index 58860a001..4dae99a01 100644 --- a/yt_dlp/extractor/niconicochannelplus.py +++ b/yt_dlp/extractor/niconicochannelplus.py @@ -28,7 +28,6 @@ _SUITABLE_NICOCHANNEL_PLUS_DOMAINS = set() class NicoChannelCommonBaseIE(InfoExtractor): _SITE_SETTINGS = {} - _DOMAIN_SITE_ID = {} def _get_jwt_token(self, url): pass @@ -64,30 +63,19 @@ class NicoChannelCommonBaseIE(InfoExtractor): f'{settings["api_base_url"]}{path}', video_id, headers=headers, expected_status=403, **kwargs) if handle.status == 403: if not self._get_jwt_token(site_url): - self.raise_login_required(expected=True) + self.raise_login_required() raise ExtractorError('You may have no access to this video') return data - def _get_fanclub_site_id(self, url): - settings = self._get_settings(url) - if settings['platform_id'] == 'SHTA': - return str(settings['fanclub_site_id']) - else: - parsed = urllib.parse.urlparse(url) - # parsed.path starts with '/', so index 0 is empty string - domain_url = f'{parsed.scheme}://{parsed.netloc}/{parsed.path.split("/")[1].lower()}' - if domain_url not in self._DOMAIN_SITE_ID: - self._DOMAIN_SITE_ID[domain_url] = str(self._download_api_json( - url, '/content_providers/channel_domain', domain_url, - query={'current_site_domain': domain_url})['data']['content_providers']['id']) - return self._DOMAIN_SITE_ID[domain_url] - class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): _AUTH_SETTINGS = {} _AUTH_TOKENS = {} _netrc_domain: str + def supports_login(self): + return True + def _get_auth(self, url) -> dict: return self._AUTH_TOKENS.get(urljoin(url, '/'), {}) @@ -98,16 +86,15 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): def has_attempted_login(self): return getattr(self, '_netrc_domain', None) is not None - def _login_hint(self, **kwargs): + def _login_hint(self, *args, **kwargs): return super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None)) def _get_auth_settings(self, url): - fanclub_site_id = self._get_fanclub_site_id(url) + fanclub_site_id = self._get_settings(url)['fanclub_site_id'] if fanclub_site_id not in self._AUTH_SETTINGS: self._AUTH_SETTINGS[fanclub_site_id] = traverse_obj(self._download_api_json( url, f'/fanclub_sites/{fanclub_site_id}/login', fanclub_site_id, note='Downloading auth settings'), ('data', 'fanclub_site', { - 'auth0_app_client': ('auth0_app_client_id', {str}), 'auth0_web_client': ('auth0_web_client_id', {str}), 'auth0_domain': ('fanclub_group', 'auth0_domain', {str}), })) @@ -115,11 +102,18 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): def _get_jwt_token(self, url): if not self.has_attempted_login: - self._perform_sheeta_login(url) + try: + self._perform_sheeta_login(url) + except Exception as e: + self.report_warning(f'Failed to login, continuing without login: {e}') if access_token := self._get_auth(url).get('access_token'): if jwt_decode_hs256(access_token)['exp'] < time.time() + 15: - self._refresh_sheeta_token(url) + try: + self._refresh_sheeta_token(url) + except Exception as e: + self.report_warning(f'Failed to refresh token: {e}') + self._perform_sheeta_login(url) return self._get_auth(url)['access_token'] if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True): @@ -172,11 +166,11 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): 'auth0Client': self._auth0_client, } - webpage, handler = self._download_webpage_handle( + _, handler = self._download_webpage_handle( f'https://{auth_settings["auth0_domain"]}/authorize', 'preauth', query=preauth_query) _, handler = self._download_webpage_handle(handler.url, 'login', data=urlencode_postdata({ - **self._hidden_inputs(webpage), + 'state': parse_qs(handler.url)['state'][0], 'username': username, 'password': password, 'action': 'default', @@ -234,6 +228,7 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): class NiconicoChannelPlusBaseIE(NicoChannelAuthBaseIE): _CHANNEL_NAMES = {} _CHANNEL_AGE_LIMIT = {} + _DOMAIN_SITE_ID = {} def _get_channel_id(self, url): parsed = urllib.parse.urlparse(url) @@ -244,6 +239,20 @@ class NiconicoChannelPlusBaseIE(NicoChannelAuthBaseIE): else: return f'{parsed.hostname.replace(".", "_")}_{parsed.path.split("/")[1]}' + def _get_fanclub_site_id(self, url): + settings = self._get_settings(url) + if settings['platform_id'] == 'SHTA': + return str(settings['fanclub_site_id']) + else: + parsed = urllib.parse.urlparse(url) + # parsed.path starts with '/', so index 0 is empty string + domain_url = f'{parsed.scheme}://{parsed.netloc}/{parsed.path.split("/")[1].lower()}' + if domain_url not in self._DOMAIN_SITE_ID: + self._DOMAIN_SITE_ID[domain_url] = str(self._download_api_json( + url, '/content_providers/channel_domain', domain_url, + query={'current_site_domain': domain_url})['data']['content_providers']['id']) + return self._DOMAIN_SITE_ID[domain_url] + def _get_channel_url(self, url): parsed = urllib.parse.urlparse(url) if self._get_settings(url)['platform_id'] == 'SHTA':