From 86f074bd434bd03f22a862790543ef1f50499a6d Mon Sep 17 00:00:00 2001 From: c-basalt <117849907+c-basalt@users.noreply.github.com> Date: Mon, 16 Dec 2024 03:32:24 -0500 Subject: [PATCH] add refresh extractor arg --- yt_dlp/extractor/niconicochannelplus.py | 59 ++++++++++++++++--------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/yt_dlp/extractor/niconicochannelplus.py b/yt_dlp/extractor/niconicochannelplus.py index 5a4ed57a1..e2624b76a 100644 --- a/yt_dlp/extractor/niconicochannelplus.py +++ b/yt_dlp/extractor/niconicochannelplus.py @@ -69,9 +69,13 @@ class NicoChannelCommonBaseIE(InfoExtractor): class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): + _NETRC_MACHINE = False + _AUTH_SETTINGS = {} _AUTH_TOKENS = {} - _NETRC_MACHINE = False + _ARG_REFRESH_USED = False + _REFRESH_TIMEOUT_THRES = 15 + _netrc_domain: str def _get_auth(self, url) -> dict: return self._AUTH_TOKENS.get(urljoin(url, '/'), {}) @@ -79,12 +83,10 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): def _set_auth(self, url, auth): self._AUTH_TOKENS[urljoin(url, '/')] = auth - @property - def has_attempted_login(self): - return getattr(self, '_netrc_domain', None) is not None - def _login_hint(self, *args, **kwargs): - return super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None)) + return (super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None)) + + ', or --extractor-args "niconicochannelplus:jwt_token=xxx" or --extractor-args ' + '"niconicochannelplus:refresh_token=xxx" to directly providing auth token') def _get_auth_settings(self, url): fanclub_site_id = self._get_settings(url)['fanclub_site_id'] @@ -98,20 +100,27 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): return self._AUTH_SETTINGS[fanclub_site_id] def _get_jwt_token(self, url): - if not self.has_attempted_login: + def _load_access_token(): + if access_token := self._get_auth(url).get('access_token'): + if time.time() < jwt_decode_hs256(access_token)['exp'] - self._REFRESH_TIMEOUT_THRES: + return access_token + + def _try_then_load(func, error_msg, *args, **kwargs): try: - self._perform_sheeta_login(url) + func(*args, **kwargs) + return _load_access_token() except Exception as e: - self.report_warning(f'Failed to login, continuing without login: {e}') + self.report_warning(f'{error_msg}: {e}') - if access_token := self._get_auth(url).get('access_token'): - if jwt_decode_hs256(access_token)['exp'] < time.time() + 15: - try: - self._refresh_sheeta_token(url) - except Exception as e: - self.report_warning(f'Failed to refresh token: {e}') - self._perform_sheeta_login(url) - return self._get_auth(url)['access_token'] + if access_token := _load_access_token(): + return access_token + + if access_token := _try_then_load(self._refresh_sheeta_token, 'Failed to refresh token', url): + return access_token + + if not self._has_attempted_login: + if access_token := _try_then_load(self._perform_sheeta_login, 'Failed to login', url): + return access_token if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True): jwt_token = jwt_args[0] @@ -130,6 +139,10 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): 'version': '2.0.6', }, separators=(',', ':')).encode()).decode() + @property + def _has_attempted_login(self): + return getattr(self, '_netrc_domain', None) is not None + def _perform_sheeta_login(self, url): def _random_string(): return ''.join(random.choices('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_~.', k=43)) @@ -196,9 +209,15 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): 'refresh_token': data['refresh_token'], }) + def _load_args_refresh_token(self): + if self._ARG_REFRESH_USED: + return + if refresh_token_args := self._configuration_arg('refresh_token', ie_key='niconicochannelplus', casesense=True): + self._ARG_REFRESH_USED = True + return refresh_token_args[0] + def _refresh_sheeta_token(self, url): - auth = self._get_auth(url) - if not auth or not auth.get('refresh_token'): + if not (refresh_token := self._get_auth(url).get('refresh_token') or self._load_args_refresh_token()): return auth_settings = self._get_auth_settings(url) @@ -208,7 +227,7 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE): 'client_id': auth_settings['auth0_web_client'], 'redirect_uri': urljoin(url, '/login/login-redirect'), 'grant_type': 'refresh_token', - 'refresh_token': auth['refresh_token'], + 'refresh_token': refresh_token, }), headers={ 'Content-Type': 'application/x-www-form-urlencoded', 'Auth0-Client': self._auth0_client,