From 96f87aaa3b34d80bc72097a7475d8093849091fc Mon Sep 17 00:00:00 2001 From: df Date: Tue, 2 Nov 2021 11:18:39 +0000 Subject: [PATCH] Back-port JS interpreter upgrade from yt-dlp PR #1437 --- test/test_jsinterp.py | 51 +++++ youtube_dl/compat.py | 5 + youtube_dl/jsinterp.py | 504 ++++++++++++++++++++++++++++++++--------- 3 files changed, 453 insertions(+), 107 deletions(-) diff --git a/test/test_jsinterp.py b/test/test_jsinterp.py index c24b8ca74..4d05ea610 100644 --- a/test/test_jsinterp.py +++ b/test/test_jsinterp.py @@ -112,6 +112,57 @@ class TestJSInterpreter(unittest.TestCase): ''') self.assertEqual(jsi.call_function('z'), 5) + def test_for_loop(self): + # function x() { a=0; for (i=0; i-10; i++) {a++} a } + jsi = JSInterpreter(''' + function x() { a=0; for (i=0; i-10; i = i + 1) {a++} a } + ''') + self.assertEqual(jsi.call_function('x'), 10) + + def test_switch(self): + jsi = JSInterpreter(''' + function x(f) { switch(f){ + case 1:f+=1; + case 2:f+=2; + case 3:f+=3;break; + case 4:f+=4; + default:f=0; + } return f } + ''') + self.assertEqual(jsi.call_function('x', 1), 7) + self.assertEqual(jsi.call_function('x', 3), 6) + self.assertEqual(jsi.call_function('x', 5), 0) + + def test_try(self): + jsi = JSInterpreter(''' + function x() { try{return 10} catch(e){return 5} } + ''') + self.assertEqual(jsi.call_function('x'), 10) + + def test_for_loop_continue(self): + jsi = JSInterpreter(''' + function x() { a=0; for (i=0; i-10; i++) { continue; a++ } a } + ''') + self.assertEqual(jsi.call_function('x'), 0) + + def test_for_loop_break(self): + jsi = JSInterpreter(''' + function x() { a=0; for (i=0; i-10; i++) { break; a++ } a } + ''') + self.assertEqual(jsi.call_function('x'), 0) + + def test_literal_list(self): + jsi = JSInterpreter(''' + function x() { [1, 2, "asdf", [5, 6, 7]][3] } + ''') + self.assertEqual(jsi.call_function('x'), [5, 6, 7]) + + def test_comma(self): + jsi = JSInterpreter(''' + function x() { a=5; a -= 1, a+=3; return a } + ''') + self.assertEqual(jsi.call_function('x'), 7) + if __name__ == '__main__': unittest.main() diff --git a/youtube_dl/compat.py b/youtube_dl/compat.py index 29e0d3a02..2004a405a 100644 --- a/youtube_dl/compat.py +++ b/youtube_dl/compat.py @@ -21,6 +21,10 @@ import subprocess import sys import xml.etree.ElementTree +try: + import collections.abc as compat_collections_abc +except ImportError: + import collections as compat_collections_abc try: import urllib.request as compat_urllib_request @@ -3025,6 +3029,7 @@ __all__ = [ 'compat_b64decode', 'compat_basestring', 'compat_chr', + 'compat_collections_abc', 'compat_cookiejar', 'compat_cookiejar_Cookie', 'compat_cookies', diff --git a/youtube_dl/jsinterp.py b/youtube_dl/jsinterp.py index 7bda59610..061e92c2a 100644 --- a/youtube_dl/jsinterp.py +++ b/youtube_dl/jsinterp.py @@ -8,6 +8,15 @@ from .utils import ( ExtractorError, remove_quotes, ) +from .compat import ( + compat_collections_abc +) +MutableMapping = compat_collections_abc.MutableMapping + + +class Nonlocal: + pass + _OPERATORS = [ ('|', operator.or_), @@ -22,11 +31,55 @@ _OPERATORS = [ ('*', operator.mul), ] _ASSIGN_OPERATORS = [(op + '=', opfunc) for op, opfunc in _OPERATORS] -_ASSIGN_OPERATORS.append(('=', lambda cur, right: right)) +_ASSIGN_OPERATORS.append(('=', (lambda cur, right: right))) _NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*' +class JS_Break(ExtractorError): + def __init__(self): + ExtractorError.__init__(self, 'Invalid break') + + +class JS_Continue(ExtractorError): + def __init__(self): + ExtractorError.__init__(self, 'Invalid continue') + + +class LocalNameSpace(MutableMapping): + def __init__(self, *stack): + self.stack = tuple(stack) + + def __getitem__(self, key): + for scope in self.stack: + if key in scope: + return scope[key] + raise KeyError(key) + + def __setitem__(self, key, value): + for scope in self.stack: + if key in scope: + scope[key] = value + break + else: + self.stack[0][key] = value + return value + + def __delitem__(self, key): + raise NotImplementedError('Deleting is not supported') + + def __iter__(self): + for scope in self.stack: + for scope_item in iter(scope): + yield scope_item + + def __len__(self, key): + return len(iter(self)) + + def __repr__(self): + return 'LocalNameSpace%s' % (self.stack, ) + + class JSInterpreter(object): def __init__(self, code, objects=None): if objects is None: @@ -34,11 +87,58 @@ class JSInterpreter(object): self.code = code self._functions = {} self._objects = objects + self.__named_object_counter = 0 + + def _named_object(self, namespace, obj): + self.__named_object_counter += 1 + name = '__youtube_dl_jsinterp_obj%s' % (self.__named_object_counter, ) + namespace[name] = obj + return name + + @staticmethod + def _separate(expr, delim=',', max_split=None): + if not expr: + return + parens = {'(': 0, '{': 0, '[': 0, ']': 0, '}': 0, ')': 0} + start, splits, pos, max_pos = 0, 0, 0, len(delim) - 1 + for idx, char in enumerate(expr): + if char in parens: + parens[char] += 1 + is_in_parens = (parens['['] - parens[']'] + or parens['('] - parens[')'] + or parens['{'] - parens['}']) + if char == delim[pos] and not is_in_parens: + if pos == max_pos: + pos = 0 + yield expr[start: idx - max_pos] + start = idx + 1 + splits += 1 + if max_split and splits >= max_split: + break + else: + pos += 1 + else: + pos = 0 + yield expr[start:] + + @staticmethod + def _separate_at_paren(expr, delim): + separated = list(JSInterpreter._separate(expr, delim, 1)) + if len(separated) < 2: + raise ExtractorError('No terminating paren {0} in {1}'.format(delim, expr)) + return separated[0][1:].strip(), separated[1].strip() def interpret_statement(self, stmt, local_vars, allow_recursion=100): if allow_recursion < 0: raise ExtractorError('Recursion limit reached') + sub_statements = list(self._separate(stmt, ';')) + stmt = (sub_statements or ['']).pop() + for sub_stmt in sub_statements: + ret, should_abort = self.interpret_statement(sub_stmt, local_vars, allow_recursion - 1) + if should_abort: + return ret + should_abort = False stmt = stmt.lstrip() stmt_m = re.match(r'var\s', stmt) @@ -61,25 +161,119 @@ class JSInterpreter(object): if expr == '': # Empty expression return None + if expr.startswith('{'): + inner, outer = self._separate_at_paren(expr, '}') + inner, should_abort = self.interpret_statement(inner, local_vars, allow_recursion - 1) + if not outer or should_abort: + return inner + else: + expr = json.dumps(inner) + outer + if expr.startswith('('): - parens_count = 0 - for m in re.finditer(r'[()]', expr): - if m.group(0) == '(': - parens_count += 1 + inner, outer = self._separate_at_paren(expr, ')') + inner = self.interpret_expression(inner, local_vars, allow_recursion) + if not outer: + return inner + else: + expr = json.dumps(inner) + outer + + if expr.startswith('['): + inner, outer = self._separate_at_paren(expr, ']') + name = self._named_object(local_vars, [ + self.interpret_expression(item, local_vars, allow_recursion) + for item in self._separate(inner)]) + expr = name + outer + + m = re.match(r'try\s*', expr) + if m: + if expr[m.end()] == '{': + try_expr, expr = self._separate_at_paren(expr[m.end():], '}') + else: + try_expr, expr = expr[m.end() - 1:], '' + ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion - 1) + if should_abort: + return ret + return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0] + + m = re.match(r'(?:(?Pcatch)|(?Pfor)|(?Pswitch))\s*\(', expr) + md = m.groupdict() if m else {} + if md.get('catch'): + # We ignore the catch block + _, expr = self._separate_at_paren(expr, '}') + return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0] + + elif md.get('for'): + def raise_constructor_error(c): + raise ExtractorError( + 'Premature return in the initialization of a for loop in {0!r}'.format(c)) + + constructor, remaining = self._separate_at_paren(expr[m.end() - 1:], ')') + if remaining.startswith('{'): + body, expr = self._separate_at_paren(remaining, '}') + else: + m = re.match(r'switch\s*\(', remaining) # FIXME + if m: + switch_val, remaining = self._separate_at_paren(remaining[m.end() - 1:], ')') + body, expr = self._separate_at_paren(remaining, '}') + body = 'switch(%s){%s}' % (switch_val, body) else: - parens_count -= 1 - if parens_count == 0: - sub_expr = expr[1:m.start()] - sub_result = self.interpret_expression( - sub_expr, local_vars, allow_recursion) - remaining_expr = expr[m.end():].strip() - if not remaining_expr: - return sub_result - else: - expr = json.dumps(sub_result) + remaining_expr + body, expr = remaining, '' + start, cndn, increment = self._separate(constructor, ';') + if self.interpret_statement(start, local_vars, allow_recursion - 1)[1]: + raise_constructor_error(constructor) + while True: + if not self.interpret_expression(cndn, local_vars, allow_recursion): + break + try: + ret, should_abort = self.interpret_statement(body, local_vars, allow_recursion - 1) + if should_abort: + return ret + except JS_Break: + break + except JS_Continue: + pass + if self.interpret_statement(increment, local_vars, allow_recursion - 1)[1]: + raise_constructor_error(constructor) + return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0] + + elif md.get('switch'): + switch_val, remaining = self._separate_at_paren(expr[m.end() - 1:], ')') + switch_val = self.interpret_expression(switch_val, local_vars, allow_recursion) + body, expr = self._separate_at_paren(remaining, '}') + body, default = body.split('default:') if 'default:' in body else (body, None) + items = body.split('case ')[1:] + if default: + items.append('default:%s' % (default, )) + matched = False + for item in items: + case, stmt = [i.strip() for i in self._separate(item, ':', 1)] + matched = matched or case == 'default' or switch_val == self.interpret_expression(case, local_vars, allow_recursion) + if matched: + try: + ret, should_abort = self.interpret_statement(stmt, local_vars, allow_recursion - 1) + if should_abort: + return ret + except JS_Break: break - else: - raise ExtractorError('Premature end of parens in %r' % expr) + return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0] + + # Comma separated statements + sub_expressions = list(self._separate(expr)) + expr = sub_expressions.pop().strip() if sub_expressions else '' + for sub_expr in sub_expressions: + self.interpret_expression(sub_expr, local_vars, allow_recursion) + + for m in re.finditer(r'''(?x) + (?P\+\+|--)(?P%(_NAME_RE)s)| + (?P%(_NAME_RE)s)(?P\+\+|--)''' % globals(), expr): + var = m.group('var1') or m.group('var2') + start, end = m.span() + sign = m.group('pre_sign') or m.group('post_sign') + ret = local_vars[var] + local_vars[var] += 1 if sign[0] == '+' else -1 + if m.group('pre_sign'): + ret = local_vars[var] + expr = expr[:start] + json.dumps(ret) + expr[end:] for op, opfunc in _ASSIGN_OPERATORS: m = re.match(r'''(?x) @@ -88,14 +282,13 @@ class JSInterpreter(object): (?P.*)$''' % (_NAME_RE, re.escape(op)), expr) if not m: continue - right_val = self.interpret_expression( - m.group('expr'), local_vars, allow_recursion - 1) + right_val = self.interpret_expression(m.group('expr'), local_vars, allow_recursion) if m.groupdict().get('index'): lvar = local_vars[m.group('out')] - idx = self.interpret_expression( - m.group('index'), local_vars, allow_recursion) - assert isinstance(idx, int) + idx = self.interpret_expression(m.group('index'), local_vars, allow_recursion) + if not isinstance(idx, int): + raise ExtractorError('List indices must be integers: %s' % (idx, )) cur = lvar[idx] val = opfunc(cur, right_val) lvar[idx] = val @@ -109,8 +302,13 @@ class JSInterpreter(object): if expr.isdigit(): return int(expr) + if expr == 'break': + raise JS_Break() + elif expr == 'continue': + raise JS_Continue() + var_m = re.match( - r'(?!if|return|true|false)(?P%s)$' % _NAME_RE, + r'(?!if|return|true|false|null)(?P%s)$' % _NAME_RE, expr) if var_m: return local_vars[var_m.group('name')] @@ -124,91 +322,161 @@ class JSInterpreter(object): r'(?P%s)\[(?P.+)\]$' % _NAME_RE, expr) if m: val = local_vars[m.group('in')] - idx = self.interpret_expression( - m.group('idx'), local_vars, allow_recursion - 1) + idx = self.interpret_expression(m.group('idx'), local_vars, allow_recursion) return val[idx] + def raise_expr_error(where, op, exp): + raise ExtractorError('Premature {0} return of {1} in {2!r}'.format(where, op, exp)) + + for op, opfunc in _OPERATORS: + separated = list(self._separate(expr, op)) + if len(separated) < 2: + continue + right_val = separated.pop() + left_val = op.join(separated) + left_val, should_abort = self.interpret_statement( + left_val, local_vars, allow_recursion - 1) + if should_abort: + raise_expr_error('left-side', op, expr) + right_val, should_abort = self.interpret_statement( + right_val, local_vars, allow_recursion - 1) + if should_abort: + raise_expr_error('right-side', op, expr) + return opfunc(left_val or 0, right_val) + m = re.match( - r'(?P%s)(?:\.(?P[^(]+)|\[(?P[^]]+)\])\s*(?:\(+(?P[^()]*)\))?$' % _NAME_RE, + r'(?P%s)(?:\.(?P[^(]+)|\[(?P[^]]+)\])\s*' % _NAME_RE, expr) if m: variable = m.group('var') - member = remove_quotes(m.group('member') or m.group('member2')) - arg_str = m.group('args') + nl = Nonlocal() - if variable in local_vars: - obj = local_vars[variable] - else: - if variable not in self._objects: - self._objects[variable] = self.extract_object(variable) - obj = self._objects[variable] - - if arg_str is None: - # Member access - if member == 'length': - return len(obj) - return obj[member] - - assert expr.endswith(')') - # Function call - if arg_str == '': - argvals = tuple() + nl.member = remove_quotes(m.group('member') or m.group('member2')) + arg_str = expr[m.end():] + if arg_str.startswith('('): + arg_str, remaining = self._separate_at_paren(arg_str, ')') else: - argvals = tuple([ + arg_str, remaining = None, arg_str + + def assertion(cndn, msg): + """ assert, but without risk of getting optimized out """ + if not cndn: + raise ExtractorError('{0} {1}: {2}'.format(nl.member, msg, expr)) + + def eval_method(): + # nonlocal member + member = nl.member + if variable == 'String': + obj = str + elif variable in local_vars: + obj = local_vars[variable] + else: + if variable not in self._objects: + self._objects[variable] = self.extract_object(variable) + obj = self._objects[variable] + + if arg_str is None: + # Member access + if member == 'length': + return len(obj) + return obj[member] + + # Function call + argvals = [ self.interpret_expression(v, local_vars, allow_recursion) - for v in arg_str.split(',')]) - - if member == 'split': - assert argvals == ('',) - return list(obj) - if member == 'join': - assert len(argvals) == 1 - return argvals[0].join(obj) - if member == 'reverse': - assert len(argvals) == 0 - obj.reverse() - return obj - if member == 'slice': - assert len(argvals) == 1 - return obj[argvals[0]:] - if member == 'splice': - assert isinstance(obj, list) - index, howMany = argvals - res = [] - for i in range(index, min(index + howMany, len(obj))): - res.append(obj.pop(index)) - return res - - return obj[member](argvals) - - for op, opfunc in _OPERATORS: - m = re.match(r'(?P.+?)%s(?P.+)' % re.escape(op), expr) - if not m: - continue - x, abort = self.interpret_statement( - m.group('x'), local_vars, allow_recursion - 1) - if abort: - raise ExtractorError( - 'Premature left-side return of %s in %r' % (op, expr)) - y, abort = self.interpret_statement( - m.group('y'), local_vars, allow_recursion - 1) - if abort: - raise ExtractorError( - 'Premature right-side return of %s in %r' % (op, expr)) - return opfunc(x, y) + for v in self._separate(arg_str)] + + if obj == str: + if member == 'fromCharCode': + assertion(argvals, 'takes one or more arguments') + return ''.join(map(chr, argvals)) + raise ExtractorError('Unsupported string method %s' % (member, )) + + if member == 'split': + assertion(argvals, 'takes one or more arguments') + assertion(argvals == [''], 'with arguments is not implemented') + return list(obj) + elif member == 'join': + assertion(isinstance(obj, list), 'must be applied on a list') + assertion(len(argvals) == 1, 'takes exactly one argument') + return argvals[0].join(obj) + elif member == 'reverse': + assertion(not argvals, 'does not take any arguments') + obj.reverse() + return obj + elif member == 'slice': + assertion(isinstance(obj, list), 'must be applied on a list') + assertion(len(argvals) == 1, 'takes exactly one argument') + return obj[argvals[0]:] + elif member == 'splice': + assertion(isinstance(obj, list), 'must be applied on a list') + assertion(argvals, 'takes one or more arguments') + index, howMany = (argvals + [len(obj)])[:2] + if index < 0: + index += len(obj) + add_items = argvals[2:] + res = [] + for i in range(index, min(index + howMany, len(obj))): + res.append(obj.pop(index)) + for i, item in enumerate(add_items): + obj.insert(index + i, item) + return res + elif member == 'unshift': + assertion(isinstance(obj, list), 'must be applied on a list') + assertion(argvals, 'takes one or more arguments') + for item in reversed(argvals): + obj.insert(0, item) + return obj + elif member == 'pop': + assertion(isinstance(obj, list), 'must be applied on a list') + assertion(not argvals, 'does not take any arguments') + if not obj: + return + return obj.pop() + elif member == 'push': + assertion(argvals, 'takes one or more arguments') + obj.extend(argvals) + return obj + elif member == 'forEach': + assertion(argvals, 'takes one or more arguments') + assertion(len(argvals) <= 2, 'takes at-most 2 arguments') + f, this = (argvals + [''])[:2] + return [f((item, idx, obj), this=this) for idx, item in enumerate(obj)] + elif member == 'indexOf': + assertion(argvals, 'takes one or more arguments') + assertion(len(argvals) <= 2, 'takes at-most 2 arguments') + idx, start = (argvals + [0])[:2] + try: + return obj.index(idx, start) + except ValueError: + return -1 + + if isinstance(obj, list): + member = int(member) + nl.member = member + return obj[member](argvals) + + if remaining: + return self.interpret_expression( + self._named_object(local_vars, eval_method()) + remaining, + local_vars, allow_recursion) + else: + return eval_method() - m = re.match( - r'^(?P%s)\((?P[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr) + m = re.match(r'^(?P%s)\((?P[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr) if m: fname = m.group('func') argvals = tuple([ int(v) if v.isdigit() else local_vars[v] - for v in m.group('args').split(',')]) if len(m.group('args')) > 0 else tuple() - if fname not in self._functions: + for v in self._separate(m.group('args'))]) + if fname in local_vars: + return local_vars[fname](argvals) + elif fname not in self._functions: self._functions[fname] = self.extract_function(fname) return self._functions[fname](argvals) - raise ExtractorError('Unsupported JS expression %r' % expr) + if expr: + raise ExtractorError('Unsupported JS expression %r' % expr) def extract_object(self, objname): _FUNC_NAME_RE = r'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')''' @@ -233,30 +501,52 @@ class JSInterpreter(object): return obj - def extract_function(self, funcname): + def extract_function_code(self, funcname): + """ @returns argnames, code """ func_m = re.search( r'''(?x) - (?:function\s+%s|[{;,]\s*%s\s*=\s*function|var\s+%s\s*=\s*function)\s* + (?:function\s+%(f_n)s|[{;,]\s*%(f_n)s\s*=\s*function|var\s+%(f_n)s\s*=\s*function)\s* \((?P[^)]*)\)\s* - \{(?P[^}]+)\}''' % ( - re.escape(funcname), re.escape(funcname), re.escape(funcname)), + (?P\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % {'f_n': re.escape(funcname), }, self.code) + code, _ = self._separate_at_paren(func_m.group('code'), '}') # refine the match if func_m is None: raise ExtractorError('Could not find JS function %r' % funcname) - argnames = func_m.group('args').split(',') + return func_m.group('args').split(','), code - return self.build_function(argnames, func_m.group('code')) + def extract_function(self, funcname): + return self.extract_function_from_code(*self.extract_function_code(funcname)) + + def extract_function_from_code(self, argnames, code, *global_stack): + local_vars = {} + while True: + mobj = re.search(r'function\((?P[^)]*)\)\s*{', code) + if mobj is None: + break + start, body_start = mobj.span() + body, remaining = self._separate_at_paren(code[body_start - 1:], '}') + name = self._named_object( + local_vars, + self.extract_function_from_code( + [str.strip(x) for x in mobj.group('args').split(',')], + body, local_vars, *global_stack)) + code = code[:start] + name + remaining + return self.build_function(argnames, code, local_vars, *global_stack) def call_function(self, funcname, *args): - f = self.extract_function(funcname) - return f(args) - - def build_function(self, argnames, code): - def resf(args): - local_vars = dict(zip(argnames, args)) - for stmt in code.split(';'): - res, abort = self.interpret_statement(stmt, local_vars) - if abort: + return self.extract_function(funcname)(args) + + def build_function(self, argnames, code, *global_stack): + global_stack = list(global_stack) or [{}] + local_vars = global_stack.pop(0) + + def resf(args, **kwargs): + local_vars.update(dict(zip(argnames, args))) + local_vars.update(kwargs) + var_stack = LocalNameSpace(local_vars, *global_stack) + for stmt in self._separate(code.replace('\n', ''), ';'): + ret, should_abort = self.interpret_statement(stmt, var_stack) + if should_abort: break - return res + return ret return resf