add refresh extractor arg

pull/11787/head
c-basalt 1 month ago
parent b605978fcf
commit 86f074bd43

@ -69,9 +69,13 @@ class NicoChannelCommonBaseIE(InfoExtractor):
class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
_NETRC_MACHINE = False
_AUTH_SETTINGS = {}
_AUTH_TOKENS = {}
_NETRC_MACHINE = False
_ARG_REFRESH_USED = False
_REFRESH_TIMEOUT_THRES = 15
_netrc_domain: str
def _get_auth(self, url) -> dict:
return self._AUTH_TOKENS.get(urljoin(url, '/'), {})
@ -79,12 +83,10 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
def _set_auth(self, url, auth):
self._AUTH_TOKENS[urljoin(url, '/')] = auth
@property
def has_attempted_login(self):
return getattr(self, '_netrc_domain', None) is not None
def _login_hint(self, *args, **kwargs):
return super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None))
return (super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None))
+ ', or --extractor-args "niconicochannelplus:jwt_token=xxx" or --extractor-args '
'"niconicochannelplus:refresh_token=xxx" to directly providing auth token')
def _get_auth_settings(self, url):
fanclub_site_id = self._get_settings(url)['fanclub_site_id']
@ -98,20 +100,27 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
return self._AUTH_SETTINGS[fanclub_site_id]
def _get_jwt_token(self, url):
if not self.has_attempted_login:
try:
self._perform_sheeta_login(url)
except Exception as e:
self.report_warning(f'Failed to login, continuing without login: {e}')
def _load_access_token():
if access_token := self._get_auth(url).get('access_token'):
if jwt_decode_hs256(access_token)['exp'] < time.time() + 15:
if time.time() < jwt_decode_hs256(access_token)['exp'] - self._REFRESH_TIMEOUT_THRES:
return access_token
def _try_then_load(func, error_msg, *args, **kwargs):
try:
self._refresh_sheeta_token(url)
func(*args, **kwargs)
return _load_access_token()
except Exception as e:
self.report_warning(f'Failed to refresh token: {e}')
self._perform_sheeta_login(url)
return self._get_auth(url)['access_token']
self.report_warning(f'{error_msg}: {e}')
if access_token := _load_access_token():
return access_token
if access_token := _try_then_load(self._refresh_sheeta_token, 'Failed to refresh token', url):
return access_token
if not self._has_attempted_login:
if access_token := _try_then_load(self._perform_sheeta_login, 'Failed to login', url):
return access_token
if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True):
jwt_token = jwt_args[0]
@ -130,6 +139,10 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
'version': '2.0.6',
}, separators=(',', ':')).encode()).decode()
@property
def _has_attempted_login(self):
return getattr(self, '_netrc_domain', None) is not None
def _perform_sheeta_login(self, url):
def _random_string():
return ''.join(random.choices('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_~.', k=43))
@ -196,9 +209,15 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
'refresh_token': data['refresh_token'],
})
def _load_args_refresh_token(self):
if self._ARG_REFRESH_USED:
return
if refresh_token_args := self._configuration_arg('refresh_token', ie_key='niconicochannelplus', casesense=True):
self._ARG_REFRESH_USED = True
return refresh_token_args[0]
def _refresh_sheeta_token(self, url):
auth = self._get_auth(url)
if not auth or not auth.get('refresh_token'):
if not (refresh_token := self._get_auth(url).get('refresh_token') or self._load_args_refresh_token()):
return
auth_settings = self._get_auth_settings(url)
@ -208,7 +227,7 @@ class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
'client_id': auth_settings['auth0_web_client'],
'redirect_uri': urljoin(url, '/login/login-redirect'),
'grant_type': 'refresh_token',
'refresh_token': auth['refresh_token'],
'refresh_token': refresh_token,
}), headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Auth0-Client': self._auth0_client,

Loading…
Cancel
Save