[utils] Add parse_qs, update_url

[skip ci]
pull/31716/head
dirkf 2 years ago
parent 249f2b6316
commit 90c9f789d9

@ -42,6 +42,7 @@ from .compat import (
compat_HTMLParser, compat_HTMLParser,
compat_HTTPError, compat_HTTPError,
compat_basestring, compat_basestring,
compat_casefold,
compat_chr, compat_chr,
compat_collections_abc, compat_collections_abc,
compat_cookiejar, compat_cookiejar,
@ -54,18 +55,18 @@ from .compat import (
compat_integer_types, compat_integer_types,
compat_kwargs, compat_kwargs,
compat_os_name, compat_os_name,
compat_parse_qs, compat_re_Match,
compat_shlex_quote, compat_shlex_quote,
compat_str, compat_str,
compat_struct_pack, compat_struct_pack,
compat_struct_unpack, compat_struct_unpack,
compat_urllib_error, compat_urllib_error,
compat_urllib_parse, compat_urllib_parse,
compat_urllib_parse_parse_qs as compat_parse_qs,
compat_urllib_parse_urlencode, compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse, compat_urllib_parse_urlparse,
compat_urllib_parse_unquote_plus, compat_urllib_parse_unquote_plus,
compat_urllib_request, compat_urllib_request,
compat_urlparse,
compat_xpath, compat_xpath,
) )
@ -80,12 +81,12 @@ def register_socks_protocols():
# In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904 # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
# URLs with protocols not in urlparse.uses_netloc are not handled correctly # URLs with protocols not in urlparse.uses_netloc are not handled correctly
for scheme in ('socks', 'socks4', 'socks4a', 'socks5'): for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
if scheme not in compat_urlparse.uses_netloc: if scheme not in compat_urllib_parse.uses_netloc:
compat_urlparse.uses_netloc.append(scheme) compat_urllib_parse.uses_netloc.append(scheme)
# This is not clearly defined otherwise # Unfavoured alias
compiled_regex_type = type(re.compile('')) compiled_regex_type = compat_re_Match
def random_user_agent(): def random_user_agent():
@ -2725,7 +2726,7 @@ def make_socks_conn_class(base_class, socks_proxy):
assert issubclass(base_class, ( assert issubclass(base_class, (
compat_http_client.HTTPConnection, compat_http_client.HTTPSConnection)) compat_http_client.HTTPConnection, compat_http_client.HTTPSConnection))
url_components = compat_urlparse.urlparse(socks_proxy) url_components = compat_urllib_parse.urlparse(socks_proxy)
if url_components.scheme.lower() == 'socks5': if url_components.scheme.lower() == 'socks5':
socks_type = ProxyType.SOCKS5 socks_type = ProxyType.SOCKS5
elif url_components.scheme.lower() in ('socks', 'socks4'): elif url_components.scheme.lower() in ('socks', 'socks4'):
@ -3673,7 +3674,7 @@ def remove_quotes(s):
def url_basename(url): def url_basename(url):
path = compat_urlparse.urlparse(url).path path = compat_urllib_parse.urlparse(url).path
return path.strip('/').split('/')[-1] return path.strip('/').split('/')[-1]
@ -3693,7 +3694,7 @@ def urljoin(base, path):
if not isinstance(base, compat_str) or not re.match( if not isinstance(base, compat_str) or not re.match(
r'^(?:https?:)?//', base): r'^(?:https?:)?//', base):
return None return None
return compat_urlparse.urljoin(base, path) return compat_urllib_parse.urljoin(base, path)
class HEADRequest(compat_urllib_request.Request): class HEADRequest(compat_urllib_request.Request):
@ -4091,6 +4092,10 @@ def escape_url(url):
).geturl() ).geturl()
def parse_qs(url):
return compat_parse_qs(compat_urllib_parse.urlparse(url).query)
def read_batch_urls(batch_fd): def read_batch_urls(batch_fd):
def fixup(url): def fixup(url):
if not isinstance(url, compat_str): if not isinstance(url, compat_str):
@ -4111,25 +4116,28 @@ def urlencode_postdata(*args, **kargs):
return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii') return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii')
def update_url_query(url, query):
if not query:
return url
parsed_url = compat_urlparse.urlparse(url)
qs = compat_parse_qs(parsed_url.query)
qs.update(query)
return compat_urlparse.urlunparse(parsed_url._replace(
query=compat_urllib_parse_urlencode(qs, True)))
def update_url(url, **kwargs): def update_url(url, **kwargs):
"""Replace URL components specified by kwargs """Replace URL components specified by kwargs
url: compat_str or parsed URL tuple url: compat_str or parsed URL tuple
returns: compat_str""" if query_update is in kwargs, update query with
its value instead of replacing (overrides any `query`)
returns: compat_str
"""
if not kwargs: if not kwargs:
return compat_urlparse.urlunparse(url) if isinstance(url, tuple) else url return compat_urllib_parse.urlunparse(url) if isinstance(url, tuple) else url
if not isinstance(url, tuple): if not isinstance(url, tuple):
url = compat_urlparse.urlparse(url) url = compat_urllib_parse.urlparse(url)
return compat_urlparse.urlunparse(url._replace(**kwargs)) query = kwargs.pop('query_update', None)
if query:
qs = compat_parse_qs(url.query)
qs.update(query)
kwargs['query'] = compat_urllib_parse_urlencode(qs, True)
kwargs = compat_kwargs(kwargs)
return compat_urllib_parse.urlunparse(url._replace(**kwargs))
def update_url_query(url, query):
return update_url(url, query_update=query)
def update_Request(req, url=None, data=None, headers={}, query={}): def update_Request(req, url=None, data=None, headers={}, query={}):
@ -5597,7 +5605,7 @@ class PerRequestProxyHandler(compat_urllib_request.ProxyHandler):
if proxy == '__noproxy__': if proxy == '__noproxy__':
return None # No Proxy return None # No Proxy
if compat_urlparse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'): if compat_urllib_parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
req.add_header('Ytdl-socks-proxy', proxy) req.add_header('Ytdl-socks-proxy', proxy)
# youtube-dl's http/https handlers do wrapping the socket with socks # youtube-dl's http/https handlers do wrapping the socket with socks
return None return None
@ -6035,14 +6043,6 @@ def traverse_obj(obj, *paths, **kwargs):
str = compat_str str = compat_str
is_sequence = lambda x: isinstance(x, compat_collections_abc.Sequence) and not isinstance(x, (str, bytes)) is_sequence = lambda x: isinstance(x, compat_collections_abc.Sequence) and not isinstance(x, (str, bytes))
# stand-in until compat_re_Match is added
compat_re_Match = type(re.match('a', 'a'))
# stand-in until casefold.py is added
try:
''.casefold()
compat_casefold = lambda s: s.casefold()
except AttributeError:
compat_casefold = lambda s: s.lower()
casefold = lambda k: compat_casefold(k) if isinstance(k, str) else k casefold = lambda k: compat_casefold(k) if isinstance(k, str) else k
if isinstance(expected_type, type): if isinstance(expected_type, type):

Loading…
Cancel
Save