refactor: extract common `_is_jwt_token_expired` to `InfoExtractor`

pull/12630/head
Michaël De Boey 5 months ago
parent e67d786c7c
commit 0242090a86
No known key found for this signature in database

@ -11,7 +11,6 @@ from ..utils import (
float_or_none, float_or_none,
int_or_none, int_or_none,
js_to_json, js_to_json,
jwt_decode_hs256,
mimetype2ext, mimetype2ext,
orderedSet, orderedSet,
parse_age_limit, parse_age_limit,
@ -620,9 +619,6 @@ class CBCGemIE(CBCGemBaseIE):
'https://services.radio-canada.ca/ott/catalog/v1/gem/settings', None, 'https://services.radio-canada.ca/ott/catalog/v1/gem/settings', None,
'Downloading site settings', query={'device': 'web'})['identityManagement']['ropc'] 'Downloading site settings', query={'device': 'web'})['identityManagement']['ropc']
def _is_jwt_expired(self, token):
return jwt_decode_hs256(token)['exp'] - time.time() < 300
def _call_oauth_api(self, oauth_data, note='Refreshing access token'): def _call_oauth_api(self, oauth_data, note='Refreshing access token'):
response = self._download_json( response = self._download_json(
self._ropc_settings['url'], None, note, data=urlencode_postdata({ self._ropc_settings['url'], None, note, data=urlencode_postdata({
@ -657,7 +653,7 @@ class CBCGemIE(CBCGemBaseIE):
raise raise
def _fetch_access_token(self): def _fetch_access_token(self):
if self._is_jwt_expired(self._access_token): if self._is_jwt_token_expired(self._access_token):
try: try:
self._call_oauth_api({ self._call_oauth_api({
'grant_type': 'refresh_token', 'grant_type': 'refresh_token',
@ -675,7 +671,7 @@ class CBCGemIE(CBCGemBaseIE):
if not self._get_login_info()[0]: if not self._get_login_info()[0]:
return None return None
if not self._claims_token or self._is_jwt_expired(self._claims_token): if not self._claims_token or self._is_jwt_token_expired(self._claims_token):
self._claims_token = self._download_json( self._claims_token = self._download_json(
'https://services.radio-canada.ca/ott/subscription/v2/gem/Subscriber/profile', 'https://services.radio-canada.ca/ott/subscription/v2/gem/Subscriber/profile',
None, 'Downloading claims token', query={'device': 'web'}, None, 'Downloading claims token', query={'device': 'web'},

@ -69,6 +69,7 @@ from ..utils import (
int_or_none, int_or_none,
join_nonempty, join_nonempty,
js_to_json, js_to_json,
jwt_decode_hs256,
mimetype2ext, mimetype2ext,
netrc_from_content, netrc_from_content,
orderedSet, orderedSet,
@ -994,6 +995,10 @@ class InfoExtractor:
return encoding return encoding
@staticmethod
def _is_jwt_token_expired(token):
return jwt_decode_hs256(token)['exp'] - time.time() < 300
def __check_blocked(self, content): def __check_blocked(self, content):
first_block = content[:512] first_block = content[:512]
if ('<title>Access to this site is blocked</title>' in content if ('<title>Access to this site is blocked</title>' in content

@ -1,5 +1,3 @@
import time
from .common import InfoExtractor from .common import InfoExtractor
from ..networking.exceptions import HTTPError from ..networking.exceptions import HTTPError
from ..utils import ( from ..utils import (
@ -84,16 +82,14 @@ class DigitalConcertHallIE(InfoExtractor):
'User-Agent': _USER_AGENT, 'User-Agent': _USER_AGENT,
} }
_access_token = None _access_token = None
_access_token_expiry = 0
_refresh_token = None _refresh_token = None
@property @property
def _access_token_is_expired(self): def _access_token_is_expired(self):
return self._access_token_expiry - 30 <= int(time.time()) return self._is_jwt_token_expired(self._access_token)
def _set_access_token(self, value): def _set_access_token(self, value):
self._access_token = value self._access_token = value
self._access_token_expiry = traverse_obj(value, ({jwt_decode_hs256}, 'exp', {int})) or 0
def _cache_tokens(self, /): def _cache_tokens(self, /):
self.cache.store(self._NETRC_MACHINE, 'tokens', { self.cache.store(self._NETRC_MACHINE, 'tokens', {

@ -1,7 +1,6 @@
import functools import functools
import hashlib import hashlib
import json import json
import time
import urllib.parse import urllib.parse
from .common import InfoExtractor from .common import InfoExtractor
@ -9,11 +8,9 @@ from ..utils import (
ExtractorError, ExtractorError,
OnDemandPagedList, OnDemandPagedList,
int_or_none, int_or_none,
jwt_decode_hs256,
mimetype2ext, mimetype2ext,
qualities, qualities,
traverse_obj, traverse_obj,
try_call,
unified_timestamp, unified_timestamp,
) )
@ -25,7 +22,7 @@ class IwaraBaseIE(InfoExtractor):
def _is_token_expired(self, token, token_type): def _is_token_expired(self, token, token_type):
# User token TTL == ~3 weeks, Media token TTL == ~1 hour # User token TTL == ~3 weeks, Media token TTL == ~1 hour
if (try_call(lambda: jwt_decode_hs256(token)['exp']) or 0) <= int(time.time() - 120): if self._is_jwt_token_expired(token):
self.to_screen(f'{token_type} token has expired') self.to_screen(f'{token_type} token has expired')
return True return True

@ -4,7 +4,6 @@ import json
import random import random
import re import re
import string import string
import time
from .common import InfoExtractor from .common import InfoExtractor
from ..utils import ( from ..utils import (
@ -106,11 +105,8 @@ class JioCinemaBaseIE(InfoExtractor):
'os': ('os', {str}), 'os': ('os', {str}),
})}, data=data) })}, data=data)
def _is_token_expired(self, token):
return (try_call(lambda: jwt_decode_hs256(token)['exp']) or 0) <= int(time.time() - 180)
def _perform_login(self, username, password): def _perform_login(self, username, password):
if self._ACCESS_TOKEN and not self._is_token_expired(self._ACCESS_TOKEN): if self._ACCESS_TOKEN and not self._is_jwt_token_expired(self._ACCESS_TOKEN):
return return
UUID_RE = r'[\da-f]{8}-(?:[\da-f]{4}-){3}[\da-f]{12}' UUID_RE = r'[\da-f]{8}-(?:[\da-f]{4}-){3}[\da-f]{12}'
@ -185,7 +181,7 @@ class JioCinemaBaseIE(InfoExtractor):
if JioCinemaBaseIE._REFRESH_TOKEN: if JioCinemaBaseIE._REFRESH_TOKEN:
self._cache_token('access') self._cache_token('access')
self.to_screen(f'Logging in as device ID "{JioCinemaBaseIE._DEVICE_ID}"') self.to_screen(f'Logging in as device ID "{JioCinemaBaseIE._DEVICE_ID}"')
if self._is_token_expired(JioCinemaBaseIE._ACCESS_TOKEN): if self._is_jwt_token_expired(JioCinemaBaseIE._ACCESS_TOKEN):
self._refresh_token() self._refresh_token()
@ -250,9 +246,9 @@ class JioCinemaIE(JioCinemaBaseIE):
def _real_extract(self, url): def _real_extract(self, url):
video_id = self._match_id(url) video_id = self._match_id(url)
if not self._ACCESS_TOKEN and self._is_token_expired(self._GUEST_TOKEN): if not self._ACCESS_TOKEN and self._is_jwt_token_expired(self._GUEST_TOKEN):
self._fetch_guest_token() self._fetch_guest_token()
elif self._ACCESS_TOKEN and self._is_token_expired(self._ACCESS_TOKEN): elif self._ACCESS_TOKEN and self._is_jwt_token_expired(self._ACCESS_TOKEN):
self._refresh_token() self._refresh_token()
playback = self._call_api( playback = self._call_api(

@ -1,6 +1,5 @@
import json import json
import re import re
import time
import uuid import uuid
from .common import InfoExtractor from .common import InfoExtractor
@ -10,7 +9,6 @@ from ..utils import (
determine_ext, determine_ext,
int_or_none, int_or_none,
join_nonempty, join_nonempty,
jwt_decode_hs256,
parse_duration, parse_duration,
parse_iso8601, parse_iso8601,
try_get, try_get,
@ -350,11 +348,10 @@ mutation initPlaybackSession(
_device_id = None _device_id = None
_session_id = None _session_id = None
_access_token = None _access_token = None
_token_expiry = 0
@property @property
def _api_headers(self): def _api_headers(self):
if (self._token_expiry - 120) <= time.time(): if self._is_jwt_token_expired(self._access_token):
self.write_debug('Access token has expired; re-logging in') self.write_debug('Access token has expired; re-logging in')
self._perform_login(*self._get_login_info()) self._perform_login(*self._get_login_info())
return {'Authorization': f'Bearer {self._access_token}'} return {'Authorization': f'Bearer {self._access_token}'}
@ -392,7 +389,6 @@ mutation initPlaybackSession(
raise ExtractorError('Invalid username or password', expected=True) raise ExtractorError('Invalid username or password', expected=True)
raise raise
self._token_expiry = traverse_obj(self._access_token, ({jwt_decode_hs256}, 'exp', {int})) or 0
self._set_device_id(username) self._set_device_id(username)
self._session_id = self._call_api({ self._session_id = self._call_api({

@ -1,11 +1,9 @@
import json import json
import time
from .common import InfoExtractor from .common import InfoExtractor
from ..utils import ( from ..utils import (
ExtractorError, ExtractorError,
int_or_none, int_or_none,
jwt_decode_hs256,
str_or_none, str_or_none,
traverse_obj, traverse_obj,
try_call, try_call,
@ -116,7 +114,7 @@ class QDanceIE(InfoExtractor):
self.raise_login_required() self.raise_login_required()
def _get_auth(self): def _get_auth(self):
if (try_call(lambda: jwt_decode_hs256(self._access_token)['exp']) or 0) <= int(time.time() - 120): if self._is_jwt_token_expired(self._access_token):
if not self._refresh_token: if not self._refresh_token:
raise ExtractorError( raise ExtractorError(
'Cannot refresh access token, login with yt-dlp or refresh cookies in browser') 'Cannot refresh access token, login with yt-dlp or refresh cookies in browser')

@ -1,5 +1,3 @@
import time
from .wrestleuniverse import WrestleUniverseBaseIE from .wrestleuniverse import WrestleUniverseBaseIE
from ..utils import ( from ..utils import (
int_or_none, int_or_none,
@ -22,7 +20,7 @@ class StacommuBaseIE(WrestleUniverseBaseIE):
@WrestleUniverseBaseIE._TOKEN.getter @WrestleUniverseBaseIE._TOKEN.getter
def _TOKEN(self): def _TOKEN(self):
if self._REAL_TOKEN and self._TOKEN_EXPIRY <= int(time.time()): if self._REAL_TOKEN and self._is_jwt_token_expired(self._REAL_TOKEN):
self._refresh_token() self._refresh_token()
return self._REAL_TOKEN return self._REAL_TOKEN

@ -13,7 +13,6 @@ from ..utils import (
get_element_by_class, get_element_by_class,
get_element_html_by_class, get_element_html_by_class,
int_or_none, int_or_none,
jwt_decode_hs256,
jwt_encode_hs256, jwt_encode_hs256,
make_archive_id, make_archive_id,
merge_dicts, merge_dicts,
@ -316,10 +315,6 @@ class VrtNUIE(VRTBaseIE):
# Refresh token cookie is scoped to /vrtmax/sso, others are scoped to / # Refresh token cookie is scoped to /vrtmax/sso, others are scoped to /
return try_call(lambda: self._get_cookies('https://www.vrt.be/vrtmax/sso')[cookie_name].value) return try_call(lambda: self._get_cookies('https://www.vrt.be/vrtmax/sso')[cookie_name].value)
@staticmethod
def _is_jwt_token_expired(token):
return jwt_decode_hs256(token)['exp'] - time.time() < 300
def _perform_login(self, username, password): def _perform_login(self, username, password):
refresh_token = self._get_vrt_cookie(self._REFRESH_TOKEN_COOKIE_NAME) refresh_token = self._get_vrt_cookie(self._REFRESH_TOKEN_COOKIE_NAME)
if refresh_token and not self._is_jwt_token_expired(refresh_token): if refresh_token and not self._is_jwt_token_expired(refresh_token):

@ -1,7 +1,6 @@
import base64 import base64
import binascii import binascii
import json import json
import time
import uuid import uuid
from .common import InfoExtractor from .common import InfoExtractor
@ -9,7 +8,6 @@ from ..dependencies import Cryptodome
from ..utils import ( from ..utils import (
ExtractorError, ExtractorError,
int_or_none, int_or_none,
jwt_decode_hs256,
traverse_obj, traverse_obj,
try_call, try_call,
url_basename, url_basename,
@ -25,7 +23,6 @@ class WrestleUniverseBaseIE(InfoExtractor):
_API_HOST = 'api.wrestle-universe.com' _API_HOST = 'api.wrestle-universe.com'
_API_PATH = None _API_PATH = None
_REAL_TOKEN = None _REAL_TOKEN = None
_TOKEN_EXPIRY = None
_REFRESH_TOKEN = None _REFRESH_TOKEN = None
_DEVICE_ID = None _DEVICE_ID = None
_LOGIN_QUERY = {'key': 'AIzaSyCaRPBsDQYVDUWWBXjsTrHESi2r_F3RAdA'} _LOGIN_QUERY = {'key': 'AIzaSyCaRPBsDQYVDUWWBXjsTrHESi2r_F3RAdA'}
@ -40,13 +37,13 @@ class WrestleUniverseBaseIE(InfoExtractor):
@property @property
def _TOKEN(self): def _TOKEN(self):
if not self._REAL_TOKEN or not self._TOKEN_EXPIRY: if not self._REAL_TOKEN:
token = try_call(lambda: self._get_cookies('https://www.wrestle-universe.com/')['token'].value) token = try_call(lambda: self._get_cookies('https://www.wrestle-universe.com/')['token'].value)
if not token and not self._REFRESH_TOKEN: if not token and not self._REFRESH_TOKEN:
self.raise_login_required() self.raise_login_required()
self._TOKEN = token self._TOKEN = token
if not self._REAL_TOKEN or self._TOKEN_EXPIRY <= int(time.time()): if not self._REAL_TOKEN or self._is_jwt_token_expired(self._REAL_TOKEN):
if not self._REFRESH_TOKEN: if not self._REFRESH_TOKEN:
raise ExtractorError( raise ExtractorError(
'Expired token. Refresh your cookies in browser and try again', expected=True) 'Expired token. Refresh your cookies in browser and try again', expected=True)
@ -58,11 +55,6 @@ class WrestleUniverseBaseIE(InfoExtractor):
def _TOKEN(self, value): def _TOKEN(self, value):
self._REAL_TOKEN = value self._REAL_TOKEN = value
expiry = traverse_obj(value, ({jwt_decode_hs256}, 'exp', {int_or_none}))
if not expiry:
raise ExtractorError('There was a problem with the auth token')
self._TOKEN_EXPIRY = expiry
def _perform_login(self, username, password): def _perform_login(self, username, password):
login = self._download_json( login = self._download_json(
'https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword', None, 'https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword', None,

@ -1,5 +1,4 @@
import json import json
import time
import uuid import uuid
from .common import InfoExtractor from .common import InfoExtractor
@ -124,10 +123,9 @@ class Zee5IE(InfoExtractor):
else: else:
raise ExtractorError(self._LOGIN_HINT, expected=True) raise ExtractorError(self._LOGIN_HINT, expected=True)
token = jwt_decode_hs256(self._USER_TOKEN) if self._is_jwt_token_expired(self._USER_TOKEN):
if token.get('exp', 0) <= int(time.time()):
raise ExtractorError('User token has expired', expected=True) raise ExtractorError('User token has expired', expected=True)
self._USER_COUNTRY = token.get('current_country') self._USER_COUNTRY = jwt_decode_hs256(self._USER_TOKEN).get('current_country')
def _real_extract(self, url): def _real_extract(self, url):
video_id, display_id = self._match_valid_url(url).group('id', 'display_id') video_id, display_id = self._match_valid_url(url).group('id', 'display_id')

Loading…
Cancel
Save