mirror of https://github.com/yt-dlp/yt-dlp
[networking] Rewrite architecture (#2861)
New networking interface consists of a `RequestDirector` that directs each `Request` to appropriate `RequestHandler` and returns the `Response` or raises `RequestError`. The handlers define adapters to transform its internal Request/Response/Errors to our interfaces. User-facing changes: - Fix issues with per request proxies on redirects for urllib - Support for `ALL_PROXY` environment variable for proxy setting - Support for `socks5h` proxy - Closes https://github.com/yt-dlp/yt-dlp/issues/6325, https://github.com/ytdl-org/youtube-dl/issues/22618, https://github.com/ytdl-org/youtube-dl/pull/28093 - Raise error when using `https` proxy instead of silently converting it to `http` Authored by: coletdjnzpull/5138/head
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,239 @@
#!/usr/bin/env python3
# Allow direct execution
import os
import sys
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import io
import platform
import random
import ssl
import urllib.error
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.dependencies import certifi
from yt_dlp.networking import Response
from yt_dlp.networking._helper import (
from yt_dlp.networking.exceptions import (
from yt_dlp.socks import ProxyType
from yt_dlp.utils.networking import HTTPHeaderDict
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
class TestNetworkingUtils:
def test_select_proxy(self):
proxies = {
'all': 'socks5://example.com',
'http': 'http://example.com:1080',
'no': 'bypass.example.com,yt-dl.org'
assert select_proxy('https://example.com', proxies) == proxies['all']
assert select_proxy('http://example.com', proxies) == proxies['http']
assert select_proxy('http://bypass.example.com', proxies) is None
assert select_proxy('https://yt-dl.org', proxies) is None
@pytest.mark.parametrize('socks_proxy,expected', [
('socks5h://example.com', {
'proxytype': ProxyType.SOCKS5,
'addr': 'example.com',
'port': 1080,
'rdns': True,
'username': None,
'password': None
('socks5://user:@example.com:5555', {
'proxytype': ProxyType.SOCKS5,
'addr': 'example.com',
'port': 5555,
'rdns': False,
'username': 'user',
'password': ''
('socks4://u%40ser:pa%20ss@', {
'proxytype': ProxyType.SOCKS4,
'addr': '',
'port': 1080,
'rdns': False,
'username': 'u@ser',
'password': 'pa ss'
('socks4a://:pa%20ss@', {
'proxytype': ProxyType.SOCKS4A,
'addr': '',
'port': 1080,
'rdns': True,
'username': '',
'password': 'pa ss'
def test_make_socks_proxy_opts(self, socks_proxy, expected):
assert make_socks_proxy_opts(socks_proxy) == expected
def test_make_socks_proxy_unknown(self):
with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
@pytest.mark.skipif(not certifi, reason='certifi is not installed')
def test_load_certifi(self):
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_load_certs(context, use_certifi=True)
assert context.get_ca_certs() == context2.get_ca_certs()
# Test load normal certs
# XXX: could there be a case where system certs are the same as certifi?
context3 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_load_certs(context3, use_certifi=False)
assert context3.get_ca_certs() != context.get_ca_certs()
@pytest.mark.parametrize('method,status,expected', [
('GET', 303, 'GET'),
('HEAD', 303, 'HEAD'),
('PUT', 303, 'GET'),
('POST', 301, 'GET'),
('HEAD', 301, 'HEAD'),
('POST', 302, 'GET'),
('HEAD', 302, 'HEAD'),
('PUT', 302, 'PUT'),
('POST', 308, 'POST'),
('POST', 307, 'POST'),
('HEAD', 308, 'HEAD'),
('HEAD', 307, 'HEAD'),
def test_get_redirect_method(self, method, status, expected):
assert get_redirect_method(method, status) == expected
@pytest.mark.parametrize('headers,supported_encodings,expected', [
({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
headers = HTTPHeaderDict(headers)
add_accept_encoding_header(headers, supported_encodings)
assert headers == HTTPHeaderDict(expected)
class TestInstanceStoreMixin:
class FakeInstanceStoreMixin(InstanceStoreMixin):
def _create_instance(self, **kwargs):
return random.randint(0, 1000000)
def _close_instance(self, instance):
def test_mixin(self):
mixin = self.FakeInstanceStoreMixin()
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
cookiejar = YoutubeDLCookieJar()
assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
# Different order
assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
m = mixin._get_instance(t=1234)
assert mixin._get_instance(t=1234) == m
assert mixin._get_instance(t=1234) != m
class TestNetworkingExceptions:
def create_response(status):
return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
@pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
def test_http_error(self, http_error_class):
response = self.create_response(403)
error = http_error_class(response)
assert error.status == 403
assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
assert error.reason == response.reason
assert error.response is response
data = error.response.read()
assert data == b'test'
assert repr(error) == '<HTTPError 403: Forbidden>'
@pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
def test_redirect_http_error(self, http_error_class):
response = self.create_response(301)
error = http_error_class(response, redirect_loop=True)
assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
assert error.reason == 'Moved Permanently'
def test_compat_http_error(self):
response = self.create_response(403)
error = _CompatHTTPError(HTTPError(response))
assert isinstance(error, HTTPError)
assert isinstance(error, urllib.error.HTTPError)
assert error.code == 403
assert error.getcode() == 403
assert error.hdrs is error.response.headers
assert error.info() is error.response.headers
assert error.headers is error.response.headers
assert error.filename == error.response.url
assert error.url == error.response.url
assert error.geturl() == error.response.url
# Passthrough file operations
assert error.read() == b'test'
assert not error.closed
# Technically Response operations are also passed through, which should not be used.
assert error.get_header('test') == 'test'
platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
def test_compat_http_error_autoclose(self):
# Compat HTTPError should not autoclose response
response = self.create_response(403)
assert not response.closed
def test_incomplete_read_error(self):
error = IncompleteRead(b'test', 3, cause='test')
assert isinstance(error, IncompleteRead)
assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
assert str(error) == error.msg == '4 bytes read, 3 more expected'
assert error.partial == b'test'
assert error.expected == 3
assert error.cause == 'test'
error = IncompleteRead(b'aaa')
assert repr(error) == '<IncompleteRead: 3 bytes read>'
assert str(error) == '3 bytes read'
@ -0,0 +1,13 @@
# flake8: noqa: 401
from .common import (
# isort: split
# TODO: all request handlers should be safely imported
from . import _urllib
@ -0,0 +1,522 @@
from __future__ import annotations
import abc
import copy
import enum
import functools
import io
import typing
import urllib.parse
import urllib.request
import urllib.response
from collections.abc import Iterable, Mapping
from email.message import Message
from http import HTTPStatus
from http.cookiejar import CookieJar
from ._helper import make_ssl_context, wrap_request_errors
from .exceptions import (
from ..utils import (
from ..utils.networking import HTTPHeaderDict
if typing.TYPE_CHECKING:
RequestData = bytes | Iterable[bytes] | typing.IO | None
class RequestDirector:
"""RequestDirector class
Helper class that, when given a request, forward it to a RequestHandler that supports it.
@param logger: Logger instance.
@param verbose: Print debug request information to stdout.
def __init__(self, logger, verbose=False):
self.handlers: dict[str, RequestHandler] = {}
self.logger = logger # TODO(Grub4k): default logger
self.verbose = verbose
def close(self):
for handler in self.handlers.values():
def add_handler(self, handler: RequestHandler):
"""Add a handler. If a handler of the same RH_KEY exists, it will overwrite it"""
assert isinstance(handler, RequestHandler), 'handler must be a RequestHandler'
self.handlers[handler.RH_KEY] = handler
def _print_verbose(self, msg):
if self.verbose:
self.logger.stdout(f'director: {msg}')
def send(self, request: Request) -> Response:
Passes a request onto a suitable RequestHandler
if not self.handlers:
raise RequestError('No request handlers configured')
assert isinstance(request, Request)
unexpected_errors = []
unsupported_errors = []
# TODO (future): add a per-request preference system
for handler in reversed(list(self.handlers.values())):
self._print_verbose(f'Checking if "{handler.RH_NAME}" supports this request.')
except UnsupportedRequest as e:
f'"{handler.RH_NAME}" cannot handle this request (reason: {error_to_str(e)})')
self._print_verbose(f'Sending request via "{handler.RH_NAME}"')
response = handler.send(request)
except RequestError:
except Exception as e:
f'[{handler.RH_NAME}] Unexpected error: {error_to_str(e)}{bug_reports_message()}',
assert isinstance(response, Response)
return response
raise NoSupportingHandlers(unsupported_errors, unexpected_errors)
def register(handler):
"""Register a RequestHandler class"""
assert issubclass(handler, RequestHandler), f'{handler} must be a subclass of RequestHandler'
assert handler.RH_KEY not in _REQUEST_HANDLERS, f'RequestHandler {handler.RH_KEY} already registered'
_REQUEST_HANDLERS[handler.RH_KEY] = handler
return handler
class Features(enum.Enum):
ALL_PROXY = enum.auto()
NO_PROXY = enum.auto()
class RequestHandler(abc.ABC):
"""Request Handler class
Request handlers are class that, given a Request,
process the request from start to finish and return a Response.
Concrete subclasses need to redefine the _send(request) method,
which handles the underlying request logic and returns a Response.
RH_NAME class variable may contain a display name for the RequestHandler.
By default, this is generated from the class name.
The concrete request handler MUST have "RH" as the suffix in the class name.
All exceptions raised by a RequestHandler should be an instance of RequestError.
Any other exception raised will be treated as a handler issue.
If a Request is not supported by the handler, an UnsupportedRequest
should be raised with a reason.
By default, some checks are done on the request in _validate() based on the following class variables:
- `_SUPPORTED_URL_SCHEMES`: a tuple of supported url schemes.
Any Request with an url scheme not in this list will raise an UnsupportedRequest.
- `_SUPPORTED_PROXY_SCHEMES`: a tuple of support proxy url schemes. Any Request that contains
a proxy url with an url scheme not in this list will raise an UnsupportedRequest.
- `_SUPPORTED_FEATURES`: a tuple of supported features, as defined in Features enum.
The above may be set to None to disable the checks.
@param logger: logger instance
@param headers: HTTP Headers to include when sending requests.
@param cookiejar: Cookiejar to use for requests.
@param timeout: Socket timeout to use when sending requests.
@param proxies: Proxies to use for sending requests.
@param source_address: Client-side IP address to bind to for requests.
@param verbose: Print debug request and traffic information to stdout.
@param prefer_system_certs: Whether to prefer system certificates over other means (e.g. certifi).
@param client_cert: SSL client certificate configuration.
dict with {client_certificate, client_certificate_key, client_certificate_password}
@param verify: Verify SSL certificates
@param legacy_ssl_support: Enable legacy SSL options such as legacy server connect and older cipher support.
Some configuration options may be available for individual Requests too. In this case,
either the Request configuration option takes precedence or they are merged.
Requests may have additional optional parameters defined as extensions.
RequestHandler subclasses may choose to support custom extensions.
The following extensions are defined for RequestHandler:
- `cookiejar`: Cookiejar to use for this request
- `timeout`: socket timeout to use for this request
Apart from the url protocol, proxies dict may contain the following keys:
- `all`: proxy to use for all protocols. Used as a fallback if no proxy is set for a specific protocol.
- `no`: comma seperated list of hostnames (optionally with port) to not use a proxy for.
Note: a RequestHandler may not support these, as defined in `_SUPPORTED_FEATURES`.
def __init__(
self, *,
logger, # TODO(Grub4k): default logger
headers: HTTPHeaderDict = None,
cookiejar: CookieJar = None,
timeout: float | int | None = None,
proxies: dict = None,
source_address: str = None,
verbose: bool = False,
prefer_system_certs: bool = False,
client_cert: dict[str, str | None] = None,
verify: bool = True,
legacy_ssl_support: bool = False,
self._logger = logger
self.headers = headers or {}
self.cookiejar = cookiejar if cookiejar is not None else CookieJar()
self.timeout = float(timeout or 20)
self.proxies = proxies or {}
self.source_address = source_address
self.verbose = verbose
self.prefer_system_certs = prefer_system_certs
self._client_cert = client_cert or {}
self.verify = verify
self.legacy_ssl_support = legacy_ssl_support
def _make_sslcontext(self):
return make_ssl_context(
use_certifi=not self.prefer_system_certs,
def _merge_headers(self, request_headers):
return HTTPHeaderDict(self.headers, request_headers)
def _check_url_scheme(self, request: Request):
scheme = urllib.parse.urlparse(request.url).scheme.lower()
if self._SUPPORTED_URL_SCHEMES is not None and scheme not in self._SUPPORTED_URL_SCHEMES:
raise UnsupportedRequest(f'Unsupported url scheme: "{scheme}"')
return scheme # for further processing
def _check_proxies(self, proxies):
for proxy_key, proxy_url in proxies.items():
if proxy_url is None:
if proxy_key == 'no':
if self._SUPPORTED_FEATURES is not None and Features.NO_PROXY not in self._SUPPORTED_FEATURES:
raise UnsupportedRequest('"no" proxy is not supported')
if (
proxy_key == 'all'
and self._SUPPORTED_FEATURES is not None
and Features.ALL_PROXY not in self._SUPPORTED_FEATURES
raise UnsupportedRequest('"all" proxy is not supported')
# Unlikely this handler will use this proxy, so ignore.
# This is to allow a case where a proxy may be set for a protocol
# for one handler in which such protocol (and proxy) is not supported by another handler.
if self._SUPPORTED_URL_SCHEMES is not None and proxy_key not in (*self._SUPPORTED_URL_SCHEMES, 'all'):
# Skip proxy scheme checks
# Scheme-less proxies are not supported
if urllib.request._parse_proxy(proxy_url)[0] is None:
raise UnsupportedRequest(f'Proxy "{proxy_url}" missing scheme')
scheme = urllib.parse.urlparse(proxy_url).scheme.lower()
if scheme not in self._SUPPORTED_PROXY_SCHEMES:
raise UnsupportedRequest(f'Unsupported proxy type: "{scheme}"')
def _check_cookiejar_extension(self, extensions):
if not extensions.get('cookiejar'):
if not isinstance(extensions['cookiejar'], CookieJar):
raise UnsupportedRequest('cookiejar is not a CookieJar')
def _check_timeout_extension(self, extensions):
if extensions.get('timeout') is None:
if not isinstance(extensions['timeout'], (float, int)):
raise UnsupportedRequest('timeout is not a float or int')
def _check_extensions(self, extensions):
def _validate(self, request):
self._check_proxies(request.proxies or self.proxies)
def validate(self, request: Request):
if not isinstance(request, Request):
raise TypeError('Expected an instance of Request')
def send(self, request: Request) -> Response:
if not isinstance(request, Request):
raise TypeError('Expected an instance of Request')
return self._send(request)
def _send(self, request: Request):
"""Handle a request from start to finish. Redefine in subclasses."""
def close(self):
def RH_NAME(cls):
return cls.__name__[:-2]
def RH_KEY(cls):
assert cls.__name__.endswith('RH'), 'RequestHandler class names must end with "RH"'
return cls.__name__[:-2]
def __enter__(self):
return self
def __exit__(self, *args):
class Request:
Represents a request to be made.
Partially backwards-compatible with urllib.request.Request.
@param url: url to send. Will be sanitized.
@param data: payload data to send. Must be bytes, iterable of bytes, a file-like object or None
@param headers: headers to send.
@param proxies: proxy dict mapping of proto:proxy to use for the request and any redirects.
@param query: URL query parameters to update the url with.
@param method: HTTP method to use. If no method specified, will use POST if payload data is present else GET
@param extensions: Dictionary of Request extensions to add, as supported by handlers.
def __init__(
url: str,
data: RequestData = None,
headers: typing.Mapping = None,
proxies: dict = None,
query: dict = None,
method: str = None,
extensions: dict = None
self._headers = HTTPHeaderDict()
self._data = None
if query:
url = update_url_query(url, query)
self.url = url
self.method = method
if headers:
self.headers = headers
self.data = data # note: must be done after setting headers
self.proxies = proxies or {}
self.extensions = extensions or {}
def url(self):
return self._url
def url(self, url):
if not isinstance(url, str):
raise TypeError('url must be a string')
elif url.startswith('//'):
url = 'http:' + url
self._url = escape_url(url)
def method(self):
return self._method or ('POST' if self.data is not None else 'GET')
def method(self, method):
if method is None:
self._method = None
elif isinstance(method, str):
self._method = method.upper()
raise TypeError('method must be a string')
def data(self):
return self._data
def data(self, data: RequestData):
# Try catch some common mistakes
if data is not None and (
not isinstance(data, (bytes, io.IOBase, Iterable)) or isinstance(data, (str, Mapping))
raise TypeError('data must be bytes, iterable of bytes, or a file-like object')
if data == self._data and self._data is None:
self.headers.pop('Content-Length', None)
# https://docs.python.org/3/library/urllib.request.html#urllib.request.Request.data
if data != self._data:
if self._data is not None:
self.headers.pop('Content-Length', None)
self._data = data
if self._data is None:
self.headers.pop('Content-Type', None)
if 'Content-Type' not in self.headers and self._data is not None:
self.headers['Content-Type'] = 'application/x-www-form-urlencoded'
def headers(self) -> HTTPHeaderDict:
return self._headers
def headers(self, new_headers: Mapping):
"""Replaces headers of the request. If not a CaseInsensitiveDict, it will be converted to one."""
if isinstance(new_headers, HTTPHeaderDict):
self._headers = new_headers
elif isinstance(new_headers, Mapping):
self._headers = HTTPHeaderDict(new_headers)
raise TypeError('headers must be a mapping')
def update(self, url=None, data=None, headers=None, query=None):
self.data = data or self.data
self.headers.update(headers or {})
self.url = update_url_query(url or self.url, query or {})
def copy(self):
return self.__class__(
HEADRequest = functools.partial(Request, method='HEAD')
PUTRequest = functools.partial(Request, method='PUT')
class Response(io.IOBase):
Base class for HTTP response adapters.
By default, it provides a basic wrapper for a file-like response object.
Interface partially backwards-compatible with addinfourl and http.client.HTTPResponse.
@param fp: Original, file-like, response.
@param url: URL that this is a response of.
@param headers: response headers.
@param status: Response HTTP status code. Default is 200 OK.
@param reason: HTTP status reason. Will use built-in reasons based on status code if not provided.
def __init__(
fp: typing.IO,
url: str,
headers: Mapping[str, str],
status: int = 200,
reason: str = None):
self.fp = fp
self.headers = Message()
for name, value in headers.items():
self.headers.add_header(name, value)
self.status = status
self.url = url
self.reason = reason or HTTPStatus(status).phrase
except ValueError:
self.reason = None
def readable(self):
return self.fp.readable()
def read(self, amt: int = None) -> bytes:
# Expected errors raised here should be of type RequestError or subclasses.
# Subclasses should redefine this method with more precise error handling.
return self.fp.read(amt)
except Exception as e:
raise TransportError(cause=e) from e
def close(self):
return super().close()
def get_header(self, name, default=None):
"""Get header for name.
If there are multiple matching headers, return all seperated by comma."""
headers = self.headers.get_all(name)
if not headers:
return default
if name.title() == 'Set-Cookie':
# Special case, only get the first one
# https://www.rfc-editor.org/rfc/rfc9110.html#section-5.3-4.1
return headers[0]
return ', '.join(headers)
# The following methods are for compatability reasons and are deprecated
def code(self):
return self.status
def getcode(self):
return self.status
def geturl(self):
return self.url
def info(self):
return self.headers
def getheader(self, name, default=None):
return self.get_header(name, default)
@ -1,9 +1,197 @@
import http.client
import socket
import ssl
from __future__ import annotations
import typing
import urllib.error
network_exceptions = [urllib.error.URLError, http.client.HTTPException, socket.error]
if hasattr(ssl, 'CertificateError'):
network_exceptions = tuple(network_exceptions)
from ..utils import YoutubeDLError
if typing.TYPE_CHECKING:
from .common import RequestHandler, Response
class RequestError(YoutubeDLError):
def __init__(
msg: str | None = None,
cause: Exception | str | None = None,
handler: RequestHandler = None
self.handler = handler
self.cause = cause
if not msg and cause:
msg = str(cause)
class UnsupportedRequest(RequestError):
"""raised when a handler cannot handle a request"""
class NoSupportingHandlers(RequestError):
"""raised when no handlers can support a request for various reasons"""
def __init__(self, unsupported_errors: list[UnsupportedRequest], unexpected_errors: list[Exception]):
self.unsupported_errors = unsupported_errors or []
self.unexpected_errors = unexpected_errors or []
# Print a quick summary of the errors
err_handler_map = {}
for err in unsupported_errors:
err_handler_map.setdefault(err.msg, []).append(err.handler.RH_NAME)
reason_str = ', '.join([f'{msg} ({", ".join(handlers)})' for msg, handlers in err_handler_map.items()])
if unexpected_errors:
reason_str = ' + '.join(filter(None, [reason_str, f'{len(unexpected_errors)} unexpected error(s)']))
err_str = 'Unable to handle request'
if reason_str:
err_str += f': {reason_str}'
class TransportError(RequestError):
"""Network related errors"""
class HTTPError(RequestError):
def __init__(self, response: Response, redirect_loop=False):
self.response = response
self.status = response.status
self.reason = response.reason
self.redirect_loop = redirect_loop
msg = f'HTTP Error {response.status}: {response.reason}'
if redirect_loop:
msg += ' (redirect loop detected)'
def close(self):
def __repr__(self):
return f'<HTTPError {self.status}: {self.reason}>'
class IncompleteRead(TransportError):
def __init__(self, partial, expected=None, **kwargs):
self.partial = partial
self.expected = expected
msg = f'{len(partial)} bytes read'
if expected is not None:
msg += f', {expected} more expected'
super().__init__(msg=msg, **kwargs)
def __repr__(self):
return f'<IncompleteRead: {self.msg}>'
class SSLError(TransportError):
class CertificateVerifyError(SSLError):
"""Raised when certificate validated has failed"""
class ProxyError(TransportError):
class _CompatHTTPError(urllib.error.HTTPError, HTTPError):
Provides backwards compatibility with urllib.error.HTTPError.
Do not use this class directly, use HTTPError instead.
def __init__(self, http_error: HTTPError):
self._closer.file = None # Disable auto close
self._http_error = http_error
HTTPError.__init__(self, http_error.response, redirect_loop=http_error.redirect_loop)
def status(self):
return self._http_error.status
def status(self, value):
def reason(self):
return self._http_error.reason
def reason(self, value):
def headers(self):
return self._http_error.response.headers
def headers(self, value):
def info(self):
return self.response.headers
def getcode(self):
return self.status
def geturl(self):
return self.response.url
def code(self):
return self.status
def code(self, value):
def url(self):
return self.response.url
def url(self, value):
def hdrs(self):
return self.response.headers
def hdrs(self, value):
def filename(self):
return self.response.url
def filename(self, value):
def __getattr__(self, name):
return super().__getattr__(name)
def __str__(self):
return str(self._http_error)
def __repr__(self):
return repr(self._http_error)
network_exceptions = (HTTPError, TransportError)
Reference in New Issue