[utils] Allow partial application for more functions (#11391)

Also adds the `trim_str` traversal helper

Authored by: bashonly, Grub4K

Co-authored-by: Simon Sawicki <contact@grub4k.xyz>
pull/11418/head
bashonly 2 months ago committed by GitHub
parent 76802f4613
commit b6dc2c49e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -12,9 +12,10 @@ from yt_dlp.utils import (
str_or_none, str_or_none,
) )
from yt_dlp.utils.traversal import ( from yt_dlp.utils.traversal import (
traverse_obj,
require, require,
subs_list_to_dict, subs_list_to_dict,
traverse_obj,
trim_str,
) )
_TEST_DATA = { _TEST_DATA = {
@ -495,6 +496,20 @@ class TestTraversalHelpers:
{'url': 'https://example.com/subs/en2', 'ext': 'ext'}, {'url': 'https://example.com/subs/en2', 'ext': 'ext'},
]}, '`quality` key should sort subtitle list accordingly' ]}, '`quality` key should sort subtitle list accordingly'
def test_trim_str(self):
with pytest.raises(TypeError):
trim_str('positional')
assert callable(trim_str(start='a'))
assert trim_str(start='ab')('abc') == 'c'
assert trim_str(end='bc')('abc') == 'a'
assert trim_str(start='a', end='c')('abc') == 'b'
assert trim_str(start='ab', end='c')('abc') == ''
assert trim_str(start='a', end='bc')('abc') == ''
assert trim_str(start='ab', end='bc')('abc') == ''
assert trim_str(start='abc', end='abc')('abc') == ''
assert trim_str(start='', end='')('abc') == 'abc'
class TestDictGet: class TestDictGet:
def test_dict_get(self): def test_dict_get(self):

@ -4,6 +4,7 @@
import os import os
import sys import sys
import unittest import unittest
import unittest.mock
import warnings import warnings
import datetime as dt import datetime as dt
@ -71,6 +72,7 @@ from yt_dlp.utils import (
intlist_to_bytes, intlist_to_bytes,
iri_to_uri, iri_to_uri,
is_html, is_html,
join_nonempty,
js_to_json, js_to_json,
limit_length, limit_length,
locked_file, locked_file,
@ -343,11 +345,13 @@ class TestUtil(unittest.TestCase):
self.assertEqual(remove_start(None, 'A - '), None) self.assertEqual(remove_start(None, 'A - '), None)
self.assertEqual(remove_start('A - B', 'A - '), 'B') self.assertEqual(remove_start('A - B', 'A - '), 'B')
self.assertEqual(remove_start('B - A', 'A - '), 'B - A') self.assertEqual(remove_start('B - A', 'A - '), 'B - A')
self.assertEqual(remove_start('non-empty', ''), 'non-empty')
def test_remove_end(self): def test_remove_end(self):
self.assertEqual(remove_end(None, ' - B'), None) self.assertEqual(remove_end(None, ' - B'), None)
self.assertEqual(remove_end('A - B', ' - B'), 'A') self.assertEqual(remove_end('A - B', ' - B'), 'A')
self.assertEqual(remove_end('B - A', ' - B'), 'B - A') self.assertEqual(remove_end('B - A', ' - B'), 'B - A')
self.assertEqual(remove_end('non-empty', ''), 'non-empty')
def test_remove_quotes(self): def test_remove_quotes(self):
self.assertEqual(remove_quotes(None), None) self.assertEqual(remove_quotes(None), None)
@ -2148,6 +2152,16 @@ Line 1
assert run_shell(args) == expected assert run_shell(args) == expected
assert run_shell(shell_quote(args, shell=True)) == expected assert run_shell(shell_quote(args, shell=True)) == expected
def test_partial_application(self):
assert callable(int_or_none(scale=10)), 'missing positional parameter should apply partially'
assert int_or_none(10, scale=0.1) == 100, 'positionally passed argument should call function'
assert int_or_none(v=10) == 10, 'keyword passed positional should call function'
assert int_or_none(scale=0.1)(10) == 100, 'call after partial applicatino should call the function'
assert callable(join_nonempty(delim=', ')), 'varargs positional should apply partially'
assert callable(join_nonempty()), 'varargs positional should apply partially'
assert join_nonempty(None, delim=', ') == '', 'passed varargs should call the function'
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

@ -212,6 +212,23 @@ def write_json_file(obj, fn):
raise raise
def partial_application(func):
sig = inspect.signature(func)
required_args = [
param.name for param in sig.parameters.values()
if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.VAR_POSITIONAL)
if param.default is inspect.Parameter.empty
]
@functools.wraps(func)
def wrapped(*args, **kwargs):
if set(required_args[len(args):]).difference(kwargs):
return functools.partial(func, *args, **kwargs)
return func(*args, **kwargs)
return wrapped
def find_xpath_attr(node, xpath, key, val=None): def find_xpath_attr(node, xpath, key, val=None):
""" Find the xpath xpath[@key=val] """ """ Find the xpath xpath[@key=val] """
assert re.match(r'^[a-zA-Z_-]+$', key) assert re.match(r'^[a-zA-Z_-]+$', key)
@ -1192,6 +1209,7 @@ def extract_timezone(date_str, default=None):
return timezone, date_str return timezone, date_str
@partial_application
def parse_iso8601(date_str, delimiter='T', timezone=None): def parse_iso8601(date_str, delimiter='T', timezone=None):
""" Return a UNIX timestamp from the given date """ """ Return a UNIX timestamp from the given date """
@ -1269,6 +1287,7 @@ def unified_timestamp(date_str, day_first=True):
return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds() return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
@partial_application
def determine_ext(url, default_ext='unknown_video'): def determine_ext(url, default_ext='unknown_video'):
if url is None or '.' not in url: if url is None or '.' not in url:
return default_ext return default_ext
@ -1944,7 +1963,7 @@ def remove_start(s, start):
def remove_end(s, end): def remove_end(s, end):
return s[:-len(end)] if s is not None and s.endswith(end) else s return s[:-len(end)] if s is not None and end and s.endswith(end) else s
def remove_quotes(s): def remove_quotes(s):
@ -1973,6 +1992,7 @@ def base_url(url):
return re.match(r'https?://[^?#]+/', url).group() return re.match(r'https?://[^?#]+/', url).group()
@partial_application
def urljoin(base, path): def urljoin(base, path):
if isinstance(path, bytes): if isinstance(path, bytes):
path = path.decode() path = path.decode()
@ -1988,21 +2008,6 @@ def urljoin(base, path):
return urllib.parse.urljoin(base, path) return urllib.parse.urljoin(base, path)
def partial_application(func):
sig = inspect.signature(func)
@functools.wraps(func)
def wrapped(*args, **kwargs):
try:
sig.bind(*args, **kwargs)
except TypeError:
return functools.partial(func, *args, **kwargs)
else:
return func(*args, **kwargs)
return wrapped
@partial_application @partial_application
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1, base=None): def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1, base=None):
if get_attr and v is not None: if get_attr and v is not None:
@ -2583,6 +2588,7 @@ def urlencode_postdata(*args, **kargs):
return urllib.parse.urlencode(*args, **kargs).encode('ascii') return urllib.parse.urlencode(*args, **kargs).encode('ascii')
@partial_application
def update_url(url, *, query_update=None, **kwargs): def update_url(url, *, query_update=None, **kwargs):
"""Replace URL components specified by kwargs """Replace URL components specified by kwargs
@param url str or parse url tuple @param url str or parse url tuple
@ -2603,6 +2609,7 @@ def update_url(url, *, query_update=None, **kwargs):
return urllib.parse.urlunparse(url._replace(**kwargs)) return urllib.parse.urlunparse(url._replace(**kwargs))
@partial_application
def update_url_query(url, query): def update_url_query(url, query):
return update_url(url, query_update=query) return update_url(url, query_update=query)
@ -2924,6 +2931,7 @@ def error_to_str(err):
return f'{type(err).__name__}: {err}' return f'{type(err).__name__}: {err}'
@partial_application
def mimetype2ext(mt, default=NO_DEFAULT): def mimetype2ext(mt, default=NO_DEFAULT):
if not isinstance(mt, str): if not isinstance(mt, str):
if default is not NO_DEFAULT: if default is not NO_DEFAULT:
@ -4664,6 +4672,7 @@ def to_high_limit_path(path):
return path return path
@partial_application
def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY): def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY):
val = traversal.traverse_obj(obj, *variadic(field)) val = traversal.traverse_obj(obj, *variadic(field))
if not val if ignore is NO_DEFAULT else val in variadic(ignore): if not val if ignore is NO_DEFAULT else val in variadic(ignore):
@ -4828,6 +4837,7 @@ def number_of_digits(number):
return len('%d' % number) return len('%d' % number)
@partial_application
def join_nonempty(*values, delim='-', from_dict=None): def join_nonempty(*values, delim='-', from_dict=None):
if from_dict is not None: if from_dict is not None:
values = (traversal.traverse_obj(from_dict, variadic(v)) for v in values) values = (traversal.traverse_obj(from_dict, variadic(v)) for v in values)
@ -5278,6 +5288,7 @@ class RetryManager:
time.sleep(delay) time.sleep(delay)
@partial_application
def make_archive_id(ie, video_id): def make_archive_id(ie, video_id):
ie_key = ie if isinstance(ie, str) else ie.ie_key() ie_key = ie if isinstance(ie, str) else ie.ie_key()
return f'{ie_key.lower()} {video_id}' return f'{ie_key.lower()} {video_id}'

@ -435,6 +435,20 @@ def find_elements(*, tag=None, cls=None, attr=None, value=None, html=False):
return functools.partial(func, cls) return functools.partial(func, cls)
def trim_str(*, start=None, end=None):
def trim(s):
if s is None:
return None
start_idx = 0
if start and s.startswith(start):
start_idx = len(start)
if end and s.endswith(end):
return s[start_idx:-len(end)]
return s[start_idx:]
return trim
def get_first(obj, *paths, **kwargs): def get_first(obj, *paths, **kwargs):
return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False) return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False)

Loading…
Cancel
Save