[youtube] Improve signature caching

and refactor related functions
pull/4700/head
pukkandan 2 years ago
parent 2f1a299c50
commit 580ce00782
No known key found for this signature in database
GPG Key ID: 7EEE9E1E817D0A39

@ -2512,20 +2512,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
assert os.path.basename(func_id) == func_id assert os.path.basename(func_id) == func_id
self.write_debug(f'Extracting signature function {func_id}') self.write_debug(f'Extracting signature function {func_id}')
cache_spec = self.cache.load('youtube-sigfuncs', func_id) cache_spec, code = self.cache.load('youtube-sigfuncs', func_id), None
if cache_spec is not None:
return lambda s: ''.join(s[i] for i in cache_spec)
code = self._load_player(video_id, player_url) if not cache_spec:
code = self._load_player(video_id, player_url)
if code: if code:
res = self._parse_sig_js(code) res = self._parse_sig_js(code)
test_string = ''.join(map(chr, range(len(example_sig)))) test_string = ''.join(map(chr, range(len(example_sig))))
cache_res = res(test_string) cache_spec = [ord(c) for c in res(test_string)]
cache_spec = [ord(c) for c in cache_res]
self.cache.store('youtube-sigfuncs', func_id, cache_spec) self.cache.store('youtube-sigfuncs', func_id, cache_spec)
return res
return lambda s: ''.join(s[i] for i in cache_spec)
def _print_sig_code(self, func, example_sig): def _print_sig_code(self, func, example_sig):
if not self.get_param('youtube_print_sig_code'): if not self.get_param('youtube_print_sig_code'):
@ -2593,18 +2590,29 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
initial_function = jsi.extract_function(funcname) initial_function = jsi.extract_function(funcname)
return lambda s: initial_function([s]) return lambda s: initial_function([s])
def _cached(self, func, *cache_id):
    """Return a wrapper around *func* that memoises its outcome.

    The remaining positional arguments form the key (a tuple) under which
    the outcome is kept in ``self._player_cache``.

    On the first call of the wrapper with a key that is not yet cached,
    *func* is invoked with the wrapper's arguments and the outcome is
    stored: either the return value, the ``ExtractorError`` it raised, or
    any other ``Exception`` wrapped in a new ``ExtractorError`` carrying
    the formatted traceback. Later calls ignore their arguments and replay
    the stored outcome, returning the value or re-raising the stored error.
    """
    def inner(*args, **kwargs):
        if cache_id not in self._player_cache:
            try:
                outcome = func(*args, **kwargs)
            except ExtractorError as err:
                # Already the extractor's own error type: keep it as-is.
                outcome = err
            except Exception as err:
                # Anything else is wrapped, keeping the traceback text.
                outcome = ExtractorError(traceback.format_exc(), cause=err)
            self._player_cache[cache_id] = outcome

        cached = self._player_cache[cache_id]
        if not isinstance(cached, Exception):
            return cached
        # A stored failure is replayed on every call.
        raise cached

    return inner
def _decrypt_signature(self, s, video_id, player_url): def _decrypt_signature(self, s, video_id, player_url):
"""Turn the encrypted s field into a working signature""" """Turn the encrypted s field into a working signature"""
try: extract_sig = self._cached(
player_id = (player_url, self._signature_cache_id(s)) self._extract_signature_function, 'sig', player_url, self._signature_cache_id(s))
if player_id not in self._player_cache: func = extract_sig(video_id, player_url, s)
func = self._extract_signature_function(video_id, player_url, s) self._print_sig_code(func, s)
self._player_cache[player_id] = func return func(s)
func = self._player_cache[player_id]
self._print_sig_code(func, s)
return func(s)
except Exception as e:
raise ExtractorError(traceback.format_exc(), cause=e, video_id=video_id)
def _decrypt_nsig(self, s, video_id, player_url): def _decrypt_nsig(self, s, video_id, player_url):
"""Turn the encrypted n field into a working signature""" """Turn the encrypted n field into a working signature"""
@ -2612,54 +2620,47 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
raise ExtractorError('Cannot decrypt nsig without player_url') raise ExtractorError('Cannot decrypt nsig without player_url')
player_url = urljoin('https://www.youtube.com', player_url) player_url = urljoin('https://www.youtube.com', player_url)
sig_id = ('nsig_value', s) jsi, player_id, func_code = self._extract_n_function_code(video_id, player_url)
if sig_id in self._player_cache: if self.get_param('youtube_print_sig_code'):
return self._player_cache[sig_id] self.to_screen(f'Extracted nsig function from {player_id}:\n{func_code[1]}\n')
try:
player_id = ('nsig', player_url)
if player_id not in self._player_cache:
self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
func = self._player_cache[player_id]
self._player_cache[sig_id] = func(s)
self.write_debug(f'Decrypted nsig {s} => {self._player_cache[sig_id]}')
return self._player_cache[sig_id]
except Exception as e:
raise ExtractorError(traceback.format_exc(), cause=e, video_id=video_id)
def _extract_n_function_name(self, jscode):
    """Return the name of the player's "n" function found in *jscode*.

    The first regex captures an identifier and, optionally, a numeric
    index that follows it in square brackets. Without an index the
    identifier itself is the answer. With an index, the identifier names a
    JS array declared as ``var <name> = [...];``. That array literal is
    converted with ``js_to_json``, parsed as JSON, and the element at the
    index is returned.
    """
    name, index = self._search_regex(
        r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]+)(?:\[(?P<idx>\d+)\])?\([a-zA-Z0-9]\)',
        jscode, 'Initial JS player n function name', group=('nfunc', 'idx'))
    if index:
        # Indirect reference: resolve the array the identifier points to.
        array_literal = self._search_regex(
            rf'var {re.escape(name)}\s*=\s*(\[.+?\]);', jscode,
            f'Initial JS player n function list ({name}.{index})')
        candidates = json.loads(js_to_json(array_literal))
        return candidates[int(index)]
    return name
def _extract_n_function(self, video_id, player_url): extract_nsig = self._cached(self._extract_n_function_from_code, 'nsig func', player_url)
ret = extract_nsig(jsi, func_code)(s)
self.write_debug(f'Decrypted nsig {s} => {ret}')
return ret
def _extract_n_function_code(self, video_id, player_url):
player_id = self._extract_player_info(player_url) player_id = self._extract_player_info(player_url)
func_code = self.cache.load('youtube-nsig', player_id) func_code = self.cache.load('youtube-nsig', player_id)
jscode = func_code or self._load_player(video_id, player_url)
jsi = JSInterpreter(jscode)
if func_code: if func_code:
jsi = JSInterpreter(func_code) return jsi, player_id, func_code
else:
jscode = self._load_player(video_id, player_url)
funcname = self._extract_n_function_name(jscode)
jsi = JSInterpreter(jscode)
func_code = jsi.extract_function_code(funcname)
self.cache.store('youtube-nsig', player_id, func_code)
if self.get_param('youtube_print_sig_code'): funcname, idx = self._search_regex(
self.to_screen(f'Extracted nsig function from {player_id}:\n{func_code[1]}\n') r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]+)(?:\[(?P<idx>\d+)\])?\([a-zA-Z0-9]\)',
jscode, 'Initial JS player n function name', group=('nfunc', 'idx'))
if idx:
funcname = json.loads(js_to_json(self._search_regex(
rf'var {re.escape(funcname)}\s*=\s*(\[.+?\]);', jscode,
f'Initial JS player n function list ({funcname}.{idx})')))[int(idx)]
func_code = jsi.extract_function_code(funcname)
self.cache.store('youtube-nsig', player_id, func_code)
return jsi, player_id, func_code
def _extract_n_function_from_code(self, jsi, func_code):
func = jsi.extract_function_from_code(*func_code) func = jsi.extract_function_from_code(*func_code)
def inner(s): def extract_nsig(s):
ret = func([s]) ret = func([s])
if ret.startswith('enhanced_except_'): if ret.startswith('enhanced_except_'):
raise ExtractorError('Signature function returned an exception') raise ExtractorError('Signature function returned an exception')
return ret return ret
return inner
return extract_nsig
def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False): def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False):
""" """
@ -3225,7 +3226,8 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self._decrypt_signature(encrypted_sig, video_id, player_url) self._decrypt_signature(encrypted_sig, video_id, player_url)
) )
except ExtractorError as e: except ExtractorError as e:
self.report_warning('Signature extraction failed: Some formats may be missing', only_once=True) self.report_warning('Signature extraction failed: Some formats may be missing',
video_id=video_id, only_once=True)
self.write_debug(e, only_once=True) self.write_debug(e, only_once=True)
continue continue
@ -3233,12 +3235,14 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
throttled = False throttled = False
if query.get('n'): if query.get('n'):
try: try:
decrypt_nsig = self._cached(self._decrypt_nsig, 'nsig', query['n'][0])
fmt_url = update_url_query(fmt_url, { fmt_url = update_url_query(fmt_url, {
'n': self._decrypt_nsig(query['n'][0], video_id, player_url)}) 'n': decrypt_nsig(query['n'][0], video_id, player_url)
})
except ExtractorError as e: except ExtractorError as e:
self.report_warning( self.report_warning(
'nsig extraction failed: You may experience throttling for some formats\n' 'nsig extraction failed: You may experience throttling for some formats\n'
f'n = {query["n"][0]} ; player = {player_url}', only_once=True) f'n = {query["n"][0]} ; player = {player_url}', video_id=video_id, only_once=True)
self.write_debug(e, only_once=True) self.write_debug(e, only_once=True)
throttled = True throttled = True

Loading…
Cancel
Save