|
|
@ -2,6 +2,7 @@
|
|
|
|
|
|
|
|
|
|
|
|
# Allow direct execution
|
|
|
|
# Allow direct execution
|
|
|
|
import os
|
|
|
|
import os
|
|
|
|
|
|
|
|
import re
|
|
|
|
import sys
|
|
|
|
import sys
|
|
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
import pytest
|
|
|
@ -36,7 +37,7 @@ from test.helper import (
|
|
|
|
verify_address_availability,
|
|
|
|
verify_address_availability,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
from yt_dlp.cookies import YoutubeDLCookieJar
|
|
|
|
from yt_dlp.cookies import YoutubeDLCookieJar
|
|
|
|
from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
|
|
|
|
from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3, zstandard
|
|
|
|
from yt_dlp.networking import (
|
|
|
|
from yt_dlp.networking import (
|
|
|
|
HEADRequest,
|
|
|
|
HEADRequest,
|
|
|
|
PUTRequest,
|
|
|
|
PUTRequest,
|
|
|
@ -62,7 +63,7 @@ from yt_dlp.networking.impersonate import (
|
|
|
|
ImpersonateTarget,
|
|
|
|
ImpersonateTarget,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
from yt_dlp.utils import YoutubeDLError
|
|
|
|
from yt_dlp.utils import YoutubeDLError
|
|
|
|
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
|
|
|
from yt_dlp.utils._utils import _YDLLogger as FakeLogger, int_or_none
|
|
|
|
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
|
|
|
|
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
|
|
|
|
|
|
|
|
|
|
|
|
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
@ -217,6 +218,7 @@ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
|
|
|
self.end_headers()
|
|
|
|
self.end_headers()
|
|
|
|
elif self.path == '/content-encoding':
|
|
|
|
elif self.path == '/content-encoding':
|
|
|
|
encodings = self.headers.get('ytdl-encoding', '')
|
|
|
|
encodings = self.headers.get('ytdl-encoding', '')
|
|
|
|
|
|
|
|
content_encoding_header = self.headers.get('ytdl-encoding-header', encodings)
|
|
|
|
payload = b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
payload = b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
for encoding in filter(None, (e.strip() for e in encodings.split(','))):
|
|
|
|
for encoding in filter(None, (e.strip() for e in encodings.split(','))):
|
|
|
|
if encoding == 'br' and brotli:
|
|
|
|
if encoding == 'br' and brotli:
|
|
|
@ -228,6 +230,8 @@ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
|
|
|
payload = buf.getvalue()
|
|
|
|
payload = buf.getvalue()
|
|
|
|
elif encoding == 'deflate':
|
|
|
|
elif encoding == 'deflate':
|
|
|
|
payload = zlib.compress(payload)
|
|
|
|
payload = zlib.compress(payload)
|
|
|
|
|
|
|
|
elif encoding == 'zstd':
|
|
|
|
|
|
|
|
payload = zstandard.compress(payload)
|
|
|
|
elif encoding == 'unsupported':
|
|
|
|
elif encoding == 'unsupported':
|
|
|
|
payload = b'raw'
|
|
|
|
payload = b'raw'
|
|
|
|
break
|
|
|
|
break
|
|
|
@ -235,7 +239,7 @@ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
|
|
|
self._status(415)
|
|
|
|
self._status(415)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
self.send_response(200)
|
|
|
|
self.send_response(200)
|
|
|
|
self.send_header('Content-Encoding', encodings)
|
|
|
|
self.send_header('Content-Encoding', content_encoding_header)
|
|
|
|
self.send_header('Content-Length', str(len(payload)))
|
|
|
|
self.send_header('Content-Length', str(len(payload)))
|
|
|
|
self.end_headers()
|
|
|
|
self.end_headers()
|
|
|
|
self.wfile.write(payload)
|
|
|
|
self.wfile.write(payload)
|
|
|
@ -622,7 +626,7 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
|
|
|
|
assert data == '<html><video src="/vid.mp4" /></html>'
|
|
|
|
assert data == '<html><video src="/vid.mp4" /></html>'
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
|
|
|
|
@pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
|
|
|
|
@pytest.mark.skipif(not brotli, reason='brotli support is not installed')
|
|
|
|
@pytest.mark.skipif(not brotli, reason='brotli not available')
|
|
|
|
def test_brotli(self, handler):
|
|
|
|
def test_brotli(self, handler):
|
|
|
|
with handler() as rh:
|
|
|
|
with handler() as rh:
|
|
|
|
res = validate_and_send(
|
|
|
|
res = validate_and_send(
|
|
|
@ -632,6 +636,52 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
|
|
|
|
assert res.headers.get('Content-Encoding') == 'br'
|
|
|
|
assert res.headers.get('Content-Encoding') == 'br'
|
|
|
|
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not brotli, reason='brotli not available')
|
|
|
|
|
|
|
|
def test_brotli_error(self, handler):
|
|
|
|
|
|
|
|
with handler() as rh:
|
|
|
|
|
|
|
|
with pytest.raises(TransportError):
|
|
|
|
|
|
|
|
# depending on implementation, error may be raised at request time or read time
|
|
|
|
|
|
|
|
res = validate_and_send(
|
|
|
|
|
|
|
|
rh, Request(
|
|
|
|
|
|
|
|
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
|
|
|
|
|
headers={'ytdl-encoding': 'deflate', 'ytdl-encoding-header': 'br'}))
|
|
|
|
|
|
|
|
res.read()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: implement centralised version parser
|
|
|
|
|
|
|
|
@pytest.mark.skip_handler_if(
|
|
|
|
|
|
|
|
'CurlCFFI',
|
|
|
|
|
|
|
|
lambda _: tuple(map(int, re.split(r'\D+', curl_cffi.__version__)[:3])) < (0, 7, 0),
|
|
|
|
|
|
|
|
'zstd not supported by curl_cffi < 0.7.0')
|
|
|
|
|
|
|
|
@pytest.mark.skip_handler_if(
|
|
|
|
|
|
|
|
'Requests',
|
|
|
|
|
|
|
|
lambda _: tuple(int_or_none(x, default=0) for x in urllib3.__version__.split('.')) < (2, 0, 0),
|
|
|
|
|
|
|
|
'zstd not supported by urllib3 < 2.0.0')
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not zstandard, reason='zstandard not available')
|
|
|
|
|
|
|
|
def test_zstd(self, handler):
|
|
|
|
|
|
|
|
with handler() as rh:
|
|
|
|
|
|
|
|
res = validate_and_send(
|
|
|
|
|
|
|
|
rh, Request(
|
|
|
|
|
|
|
|
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
|
|
|
|
|
headers={'ytdl-encoding': 'zstd'}))
|
|
|
|
|
|
|
|
assert res.headers.get('Content-Encoding') == 'zstd'
|
|
|
|
|
|
|
|
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: implement centralised version parser
|
|
|
|
|
|
|
|
@pytest.mark.skip_handler_if(
|
|
|
|
|
|
|
|
'Requests',
|
|
|
|
|
|
|
|
lambda _: tuple(int_or_none(x, default=0) for x in urllib3.__version__.split('.')) < (2, 0, 0),
|
|
|
|
|
|
|
|
'zstd not supported by urllib3 < 2.0.0')
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not zstandard, reason='zstandard not available')
|
|
|
|
|
|
|
|
def test_zstd_error(self, handler):
|
|
|
|
|
|
|
|
with handler() as rh:
|
|
|
|
|
|
|
|
with pytest.raises(TransportError):
|
|
|
|
|
|
|
|
# depending on implementation, error may be raised at request time or read time
|
|
|
|
|
|
|
|
res = validate_and_send(
|
|
|
|
|
|
|
|
rh, Request(
|
|
|
|
|
|
|
|
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
|
|
|
|
|
headers={'ytdl-encoding': 'unsupported', 'ytdl-encoding-header': 'zstd'}))
|
|
|
|
|
|
|
|
res.read()
|
|
|
|
|
|
|
|
|
|
|
|
def test_deflate(self, handler):
|
|
|
|
def test_deflate(self, handler):
|
|
|
|
with handler() as rh:
|
|
|
|
with handler() as rh:
|
|
|
|
res = validate_and_send(
|
|
|
|
res = validate_and_send(
|
|
|
@ -641,6 +691,16 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
|
|
|
|
assert res.headers.get('Content-Encoding') == 'deflate'
|
|
|
|
assert res.headers.get('Content-Encoding') == 'deflate'
|
|
|
|
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_deflate_error(self, handler):
|
|
|
|
|
|
|
|
with handler() as rh:
|
|
|
|
|
|
|
|
with pytest.raises(TransportError):
|
|
|
|
|
|
|
|
# depending on implementation, error may be raised at request time or read time
|
|
|
|
|
|
|
|
res = validate_and_send(
|
|
|
|
|
|
|
|
rh, Request(
|
|
|
|
|
|
|
|
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
|
|
|
|
|
headers={'ytdl-encoding': 'gzip', 'ytdl-encoding-header': 'deflate'}))
|
|
|
|
|
|
|
|
res.read()
|
|
|
|
|
|
|
|
|
|
|
|
def test_gzip(self, handler):
|
|
|
|
def test_gzip(self, handler):
|
|
|
|
with handler() as rh:
|
|
|
|
with handler() as rh:
|
|
|
|
res = validate_and_send(
|
|
|
|
res = validate_and_send(
|
|
|
@ -650,6 +710,16 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
|
|
|
|
assert res.headers.get('Content-Encoding') == 'gzip'
|
|
|
|
assert res.headers.get('Content-Encoding') == 'gzip'
|
|
|
|
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_gzip_error(self, handler):
|
|
|
|
|
|
|
|
with handler() as rh:
|
|
|
|
|
|
|
|
with pytest.raises(TransportError):
|
|
|
|
|
|
|
|
# depending on implementation, error may be raised at request time or read time
|
|
|
|
|
|
|
|
res = validate_and_send(
|
|
|
|
|
|
|
|
rh, Request(
|
|
|
|
|
|
|
|
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
|
|
|
|
|
headers={'ytdl-encoding': 'unsupported', 'ytdl-encoding-header': 'gzip'}))
|
|
|
|
|
|
|
|
res.read()
|
|
|
|
|
|
|
|
|
|
|
|
def test_multiple_encodings(self, handler):
|
|
|
|
def test_multiple_encodings(self, handler):
|
|
|
|
with handler() as rh:
|
|
|
|
with handler() as rh:
|
|
|
|
for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
|
|
|
|
for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
|
|
|
|