mirror of https://github.com/yt-dlp/yt-dlp
Merge branch 'yt-dlp:master' into pr/6498
commit
194bc49c55
@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import http.cookiejar
|
||||
|
||||
from test.helper import FakeYDL
|
||||
from yt_dlp.downloader.external import (
|
||||
Aria2cFD,
|
||||
AxelFD,
|
||||
CurlFD,
|
||||
FFmpegFD,
|
||||
HttpieFD,
|
||||
WgetFD,
|
||||
)
|
||||
|
||||
TEST_COOKIE = {
|
||||
'version': 0,
|
||||
'name': 'test',
|
||||
'value': 'ytdlp',
|
||||
'port': None,
|
||||
'port_specified': False,
|
||||
'domain': '.example.com',
|
||||
'domain_specified': True,
|
||||
'domain_initial_dot': False,
|
||||
'path': '/',
|
||||
'path_specified': True,
|
||||
'secure': False,
|
||||
'expires': None,
|
||||
'discard': False,
|
||||
'comment': None,
|
||||
'comment_url': None,
|
||||
'rest': {},
|
||||
}
|
||||
|
||||
TEST_INFO = {'url': 'http://www.example.com/'}
|
||||
|
||||
|
||||
class TestHttpieFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = HttpieFD(ydl, {})
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['http', '--download', '--output', 'test', 'http://www.example.com/'])
|
||||
|
||||
# Test cookie header is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['http', '--download', '--output', 'test', 'http://www.example.com/', 'Cookie:test=ytdlp'])
|
||||
|
||||
|
||||
class TestAxelFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = AxelFD(ydl, {})
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['axel', '-o', 'test', '--', 'http://www.example.com/'])
|
||||
|
||||
# Test cookie header is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['axel', '-o', 'test', '-H', 'Cookie: test=ytdlp', '--max-redirect=0', '--', 'http://www.example.com/'])
|
||||
|
||||
|
||||
class TestWgetFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = WgetFD(ydl, {})
|
||||
self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
|
||||
# Test cookiejar tempfile arg is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
|
||||
|
||||
|
||||
class TestCurlFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = CurlFD(ydl, {})
|
||||
self.assertNotIn('--cookie', downloader._make_cmd('test', TEST_INFO))
|
||||
# Test cookie header is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertIn('--cookie', downloader._make_cmd('test', TEST_INFO))
|
||||
self.assertIn('test=ytdlp', downloader._make_cmd('test', TEST_INFO))
|
||||
|
||||
|
||||
class TestAria2cFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = Aria2cFD(ydl, {})
|
||||
downloader._make_cmd('test', TEST_INFO)
|
||||
self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
|
||||
|
||||
# Test cookiejar tempfile arg is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
cmd = downloader._make_cmd('test', TEST_INFO)
|
||||
self.assertIn(f'--load-cookies={downloader._cookies_tempfile}', cmd)
|
||||
|
||||
|
||||
@unittest.skipUnless(FFmpegFD.available(), 'ffmpeg not found')
|
||||
class TestFFmpegFD(unittest.TestCase):
|
||||
_args = []
|
||||
|
||||
def _test_cmd(self, args):
|
||||
self._args = args
|
||||
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = FFmpegFD(ydl, {})
|
||||
downloader._debug_cmd = self._test_cmd
|
||||
|
||||
downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
|
||||
self.assertEqual(self._args, [
|
||||
'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/',
|
||||
'-c', 'copy', '-f', 'mp4', 'file:test'])
|
||||
|
||||
# Test cookies arg is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
|
||||
self.assertEqual(self._args, [
|
||||
'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n',
|
||||
'-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])
|
||||
|
||||
# Test with non-url input (ffmpeg reads from stdin '-' for websockets)
|
||||
downloader._call_downloader('test', {'url': 'x', 'ext': 'mp4'})
|
||||
self.assertEqual(self._args, [
|
||||
'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', 'mp4', 'file:test'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -1,500 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import gzip
|
||||
import http.cookiejar
|
||||
import http.server
|
||||
import io
|
||||
import pathlib
|
||||
import ssl
|
||||
import tempfile
|
||||
import threading
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
import zlib
|
||||
|
||||
from test.helper import http_server_port
|
||||
from yt_dlp import YoutubeDL
|
||||
from yt_dlp.dependencies import brotli
|
||||
from yt_dlp.utils import sanitized_Request, urlencode_postdata
|
||||
|
||||
from .helper import FakeYDL
|
||||
|
||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
protocol_version = 'HTTP/1.1'
|
||||
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
def _headers(self):
|
||||
payload = str(self.headers).encode('utf-8')
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
|
||||
def _redirect(self):
|
||||
self.send_response(int(self.path[len('/redirect_'):]))
|
||||
self.send_header('Location', '/method')
|
||||
self.send_header('Content-Length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def _method(self, method, payload=None):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Length', str(len(payload or '')))
|
||||
self.send_header('Method', method)
|
||||
self.end_headers()
|
||||
if payload:
|
||||
self.wfile.write(payload)
|
||||
|
||||
def _status(self, status):
|
||||
payload = f'<html>{status} NOT FOUND</html>'.encode()
|
||||
self.send_response(int(status))
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
|
||||
def _read_data(self):
|
||||
if 'Content-Length' in self.headers:
|
||||
return self.rfile.read(int(self.headers['Content-Length']))
|
||||
|
||||
def do_POST(self):
|
||||
data = self._read_data()
|
||||
if self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('POST', data)
|
||||
elif self.path.startswith('/headers'):
|
||||
self._headers()
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def do_HEAD(self):
|
||||
if self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('HEAD')
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def do_PUT(self):
|
||||
data = self._read_data()
|
||||
if self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('PUT', data)
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def do_GET(self):
|
||||
if self.path == '/video.html':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload))) # required for persistent connections
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path == '/vid.mp4':
|
||||
payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'video/mp4')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path == '/%E4%B8%AD%E6%96%87.html':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path == '/%c7%9f':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('GET')
|
||||
elif self.path.startswith('/headers'):
|
||||
self._headers()
|
||||
elif self.path == '/trailing_garbage':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Encoding', 'gzip')
|
||||
buf = io.BytesIO()
|
||||
with gzip.GzipFile(fileobj=buf, mode='wb') as f:
|
||||
f.write(payload)
|
||||
compressed = buf.getvalue() + b'trailing garbage'
|
||||
self.send_header('Content-Length', str(len(compressed)))
|
||||
self.end_headers()
|
||||
self.wfile.write(compressed)
|
||||
elif self.path == '/302-non-ascii-redirect':
|
||||
new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
|
||||
self.send_response(301)
|
||||
self.send_header('Location', new_url)
|
||||
self.send_header('Content-Length', '0')
|
||||
self.end_headers()
|
||||
elif self.path == '/content-encoding':
|
||||
encodings = self.headers.get('ytdl-encoding', '')
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
for encoding in filter(None, (e.strip() for e in encodings.split(','))):
|
||||
if encoding == 'br' and brotli:
|
||||
payload = brotli.compress(payload)
|
||||
elif encoding == 'gzip':
|
||||
buf = io.BytesIO()
|
||||
with gzip.GzipFile(fileobj=buf, mode='wb') as f:
|
||||
f.write(payload)
|
||||
payload = buf.getvalue()
|
||||
elif encoding == 'deflate':
|
||||
payload = zlib.compress(payload)
|
||||
elif encoding == 'unsupported':
|
||||
payload = b'raw'
|
||||
break
|
||||
else:
|
||||
self._status(415)
|
||||
return
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Encoding', encodings)
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def send_header(self, keyword, value):
|
||||
"""
|
||||
Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
|
||||
This is against what is defined in RFC 3986, however we need to test we support this
|
||||
since some sites incorrectly do this.
|
||||
"""
|
||||
if keyword.lower() == 'connection':
|
||||
return super().send_header(keyword, value)
|
||||
|
||||
if not hasattr(self, '_headers_buffer'):
|
||||
self._headers_buffer = []
|
||||
|
||||
self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
|
||||
|
||||
|
||||
class FakeLogger:
|
||||
def debug(self, msg):
|
||||
pass
|
||||
|
||||
def warning(self, msg):
|
||||
pass
|
||||
|
||||
def error(self, msg):
|
||||
pass
|
||||
|
||||
|
||||
class TestHTTP(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# HTTP server
|
||||
self.http_httpd = http.server.ThreadingHTTPServer(
|
||||
('127.0.0.1', 0), HTTPTestRequestHandler)
|
||||
self.http_port = http_server_port(self.http_httpd)
|
||||
self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever)
|
||||
# FIXME: we should probably stop the http server thread after each test
|
||||
# See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
|
||||
self.http_server_thread.daemon = True
|
||||
self.http_server_thread.start()
|
||||
|
||||
# HTTPS server
|
||||
certfn = os.path.join(TEST_DIR, 'testcert.pem')
|
||||
self.https_httpd = http.server.ThreadingHTTPServer(
|
||||
('127.0.0.1', 0), HTTPTestRequestHandler)
|
||||
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
sslctx.load_cert_chain(certfn, None)
|
||||
self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
|
||||
self.https_port = http_server_port(self.https_httpd)
|
||||
self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever)
|
||||
self.https_server_thread.daemon = True
|
||||
self.https_server_thread.start()
|
||||
|
||||
def test_nocheckcertificate(self):
|
||||
with FakeYDL({'logger': FakeLogger()}) as ydl:
|
||||
with self.assertRaises(urllib.error.URLError):
|
||||
ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
|
||||
|
||||
with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
|
||||
r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
|
||||
self.assertEqual(r.status, 200)
|
||||
r.close()
|
||||
|
||||
def test_percent_encode(self):
|
||||
with FakeYDL() as ydl:
|
||||
# Unicode characters should be encoded with uppercase percent-encoding
|
||||
res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
|
||||
self.assertEqual(res.status, 200)
|
||||
res.close()
|
||||
# don't normalize existing percent encodings
|
||||
res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
|
||||
self.assertEqual(res.status, 200)
|
||||
res.close()
|
||||
|
||||
def test_unicode_path_redirection(self):
|
||||
with FakeYDL() as ydl:
|
||||
r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
|
||||
self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
|
||||
r.close()
|
||||
|
||||
def test_redirect(self):
|
||||
with FakeYDL() as ydl:
|
||||
def do_req(redirect_status, method):
|
||||
data = b'testdata' if method in ('POST', 'PUT') else None
|
||||
res = ydl.urlopen(sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
|
||||
return res.read().decode('utf-8'), res.headers.get('method', '')
|
||||
|
||||
# A 303 must either use GET or HEAD for subsequent request
|
||||
self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
|
||||
self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))
|
||||
|
||||
self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))
|
||||
|
||||
# 301 and 302 turn POST only into a GET
|
||||
self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
|
||||
self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
|
||||
self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
|
||||
self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))
|
||||
|
||||
self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
|
||||
self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))
|
||||
|
||||
# 307 and 308 should not change method
|
||||
for m in ('POST', 'PUT'):
|
||||
self.assertEqual(do_req(307, m), ('testdata', m))
|
||||
self.assertEqual(do_req(308, m), ('testdata', m))
|
||||
|
||||
self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
|
||||
self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))
|
||||
|
||||
# These should not redirect and instead raise an HTTPError
|
||||
for code in (300, 304, 305, 306):
|
||||
with self.assertRaises(urllib.error.HTTPError):
|
||||
do_req(code, 'GET')
|
||||
|
||||
def test_content_type(self):
|
||||
# https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
|
||||
with FakeYDL({'nocheckcertificate': True}) as ydl:
|
||||
# method should be auto-detected as POST
|
||||
r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))
|
||||
|
||||
headers = ydl.urlopen(r).read().decode('utf-8')
|
||||
self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
|
||||
|
||||
# test http
|
||||
r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
|
||||
headers = ydl.urlopen(r).read().decode('utf-8')
|
||||
self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
|
||||
|
||||
def test_cookiejar(self):
|
||||
with FakeYDL() as ydl:
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
|
||||
0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
|
||||
False, '/headers', True, False, None, False, None, None, {}))
|
||||
data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
|
||||
self.assertIn(b'Cookie: test=ytdlp', data)
|
||||
|
||||
def test_no_compression_compat_header(self):
|
||||
with FakeYDL() as ydl:
|
||||
data = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/headers',
|
||||
headers={'Youtubedl-no-compression': True})).read()
|
||||
self.assertIn(b'Accept-Encoding: identity', data)
|
||||
self.assertNotIn(b'youtubedl-no-compression', data.lower())
|
||||
|
||||
def test_gzip_trailing_garbage(self):
|
||||
# https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
|
||||
# https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
|
||||
with FakeYDL() as ydl:
|
||||
data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
|
||||
self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
@unittest.skipUnless(brotli, 'brotli support is not installed')
|
||||
def test_brotli(self):
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'br'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'br')
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_deflate(self):
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'deflate'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_gzip(self):
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'gzip'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_multiple_encodings(self):
|
||||
# https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
|
||||
with FakeYDL() as ydl:
|
||||
for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': pair}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), pair)
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_unsupported_encoding(self):
|
||||
# it should return the raw content
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'unsupported'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
|
||||
self.assertEqual(res.read(), b'raw')
|
||||
|
||||
|
||||
class TestClientCert(unittest.TestCase):
|
||||
def setUp(self):
|
||||
certfn = os.path.join(TEST_DIR, 'testcert.pem')
|
||||
self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
|
||||
cacertfn = os.path.join(self.certdir, 'ca.crt')
|
||||
self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
|
||||
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
sslctx.verify_mode = ssl.CERT_REQUIRED
|
||||
sslctx.load_verify_locations(cafile=cacertfn)
|
||||
sslctx.load_cert_chain(certfn, None)
|
||||
self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
|
||||
self.port = http_server_port(self.httpd)
|
||||
self.server_thread = threading.Thread(target=self.httpd.serve_forever)
|
||||
self.server_thread.daemon = True
|
||||
self.server_thread.start()
|
||||
|
||||
def _run_test(self, **params):
|
||||
ydl = YoutubeDL({
|
||||
'logger': FakeLogger(),
|
||||
# Disable client-side validation of unacceptable self-signed testcert.pem
|
||||
# The test is of a check on the server side, so unaffected
|
||||
'nocheckcertificate': True,
|
||||
**params,
|
||||
})
|
||||
r = ydl.extract_info(f'https://127.0.0.1:{self.port}/video.html')
|
||||
self.assertEqual(r['url'], f'https://127.0.0.1:{self.port}/vid.mp4')
|
||||
|
||||
def test_certificate_combined_nopass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt'))
|
||||
|
||||
def test_certificate_nocombined_nopass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
|
||||
client_certificate_key=os.path.join(self.certdir, 'client.key'))
|
||||
|
||||
def test_certificate_combined_pass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
|
||||
client_certificate_password='foobar')
|
||||
|
||||
def test_certificate_nocombined_pass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
|
||||
client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'),
|
||||
client_certificate_password='foobar')
|
||||
|
||||
|
||||
def _build_proxy_handler(name):
|
||||
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
proxy_name = name
|
||||
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
def do_GET(self):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.end_headers()
|
||||
self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
|
||||
return HTTPTestRequestHandler
|
||||
|
||||
|
||||
class TestProxy(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.proxy = http.server.HTTPServer(
|
||||
('127.0.0.1', 0), _build_proxy_handler('normal'))
|
||||
self.port = http_server_port(self.proxy)
|
||||
self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
|
||||
self.proxy_thread.daemon = True
|
||||
self.proxy_thread.start()
|
||||
|
||||
self.geo_proxy = http.server.HTTPServer(
|
||||
('127.0.0.1', 0), _build_proxy_handler('geo'))
|
||||
self.geo_port = http_server_port(self.geo_proxy)
|
||||
self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
|
||||
self.geo_proxy_thread.daemon = True
|
||||
self.geo_proxy_thread.start()
|
||||
|
||||
def test_proxy(self):
|
||||
geo_proxy = f'127.0.0.1:{self.geo_port}'
|
||||
ydl = YoutubeDL({
|
||||
'proxy': f'127.0.0.1:{self.port}',
|
||||
'geo_verification_proxy': geo_proxy,
|
||||
})
|
||||
url = 'http://foo.com/bar'
|
||||
response = ydl.urlopen(url).read().decode()
|
||||
self.assertEqual(response, f'normal: {url}')
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header('Ytdl-request-proxy', geo_proxy)
|
||||
response = ydl.urlopen(req).read().decode()
|
||||
self.assertEqual(response, f'geo: {url}')
|
||||
|
||||
def test_proxy_with_idn(self):
|
||||
ydl = YoutubeDL({
|
||||
'proxy': f'127.0.0.1:{self.port}',
|
||||
})
|
||||
url = 'http://中文.tw/'
|
||||
response = ydl.urlopen(url).read().decode()
|
||||
# b'xn--fiq228c' is '中文'.encode('idna')
|
||||
self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
|
||||
|
||||
|
||||
class TestFileURL(unittest.TestCase):
|
||||
# See https://github.com/ytdl-org/youtube-dl/issues/8227
|
||||
def test_file_urls(self):
|
||||
tf = tempfile.NamedTemporaryFile(delete=False)
|
||||
tf.write(b'foobar')
|
||||
tf.close()
|
||||
url = pathlib.Path(tf.name).as_uri()
|
||||
with FakeYDL() as ydl:
|
||||
self.assertRaisesRegex(
|
||||
urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
|
||||
with FakeYDL({'enable_file_urls': True}) as ydl:
|
||||
res = ydl.urlopen(url)
|
||||
self.assertEqual(res.read(), b'foobar')
|
||||
res.close()
|
||||
os.unlink(tf.name)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,279 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import platform
|
||||
import random
|
||||
import ssl
|
||||
import urllib.error
|
||||
import warnings
|
||||
|
||||
from yt_dlp.cookies import YoutubeDLCookieJar
|
||||
from yt_dlp.dependencies import certifi
|
||||
from yt_dlp.networking import Response
|
||||
from yt_dlp.networking._helper import (
|
||||
InstanceStoreMixin,
|
||||
add_accept_encoding_header,
|
||||
get_redirect_method,
|
||||
make_socks_proxy_opts,
|
||||
select_proxy,
|
||||
ssl_load_certs,
|
||||
)
|
||||
from yt_dlp.networking.exceptions import (
|
||||
HTTPError,
|
||||
IncompleteRead,
|
||||
_CompatHTTPError,
|
||||
)
|
||||
from yt_dlp.socks import ProxyType
|
||||
from yt_dlp.utils.networking import HTTPHeaderDict
|
||||
|
||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
class TestNetworkingUtils:
|
||||
|
||||
def test_select_proxy(self):
|
||||
proxies = {
|
||||
'all': 'socks5://example.com',
|
||||
'http': 'http://example.com:1080',
|
||||
'no': 'bypass.example.com,yt-dl.org'
|
||||
}
|
||||
|
||||
assert select_proxy('https://example.com', proxies) == proxies['all']
|
||||
assert select_proxy('http://example.com', proxies) == proxies['http']
|
||||
assert select_proxy('http://bypass.example.com', proxies) is None
|
||||
assert select_proxy('https://yt-dl.org', proxies) is None
|
||||
|
||||
@pytest.mark.parametrize('socks_proxy,expected', [
|
||||
('socks5h://example.com', {
|
||||
'proxytype': ProxyType.SOCKS5,
|
||||
'addr': 'example.com',
|
||||
'port': 1080,
|
||||
'rdns': True,
|
||||
'username': None,
|
||||
'password': None
|
||||
}),
|
||||
('socks5://user:@example.com:5555', {
|
||||
'proxytype': ProxyType.SOCKS5,
|
||||
'addr': 'example.com',
|
||||
'port': 5555,
|
||||
'rdns': False,
|
||||
'username': 'user',
|
||||
'password': ''
|
||||
}),
|
||||
('socks4://u%40ser:pa%20ss@127.0.0.1:1080', {
|
||||
'proxytype': ProxyType.SOCKS4,
|
||||
'addr': '127.0.0.1',
|
||||
'port': 1080,
|
||||
'rdns': False,
|
||||
'username': 'u@ser',
|
||||
'password': 'pa ss'
|
||||
}),
|
||||
('socks4a://:pa%20ss@127.0.0.1', {
|
||||
'proxytype': ProxyType.SOCKS4A,
|
||||
'addr': '127.0.0.1',
|
||||
'port': 1080,
|
||||
'rdns': True,
|
||||
'username': '',
|
||||
'password': 'pa ss'
|
||||
})
|
||||
])
|
||||
def test_make_socks_proxy_opts(self, socks_proxy, expected):
|
||||
assert make_socks_proxy_opts(socks_proxy) == expected
|
||||
|
||||
def test_make_socks_proxy_unknown(self):
|
||||
with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
|
||||
make_socks_proxy_opts('socks://127.0.0.1')
|
||||
|
||||
@pytest.mark.skipif(not certifi, reason='certifi is not installed')
|
||||
def test_load_certifi(self):
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ssl_load_certs(context, use_certifi=True)
|
||||
context2.load_verify_locations(cafile=certifi.where())
|
||||
assert context.get_ca_certs() == context2.get_ca_certs()
|
||||
|
||||
# Test load normal certs
|
||||
# XXX: could there be a case where system certs are the same as certifi?
|
||||
context3 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ssl_load_certs(context3, use_certifi=False)
|
||||
assert context3.get_ca_certs() != context.get_ca_certs()
|
||||
|
||||
@pytest.mark.parametrize('method,status,expected', [
|
||||
('GET', 303, 'GET'),
|
||||
('HEAD', 303, 'HEAD'),
|
||||
('PUT', 303, 'GET'),
|
||||
('POST', 301, 'GET'),
|
||||
('HEAD', 301, 'HEAD'),
|
||||
('POST', 302, 'GET'),
|
||||
('HEAD', 302, 'HEAD'),
|
||||
('PUT', 302, 'PUT'),
|
||||
('POST', 308, 'POST'),
|
||||
('POST', 307, 'POST'),
|
||||
('HEAD', 308, 'HEAD'),
|
||||
('HEAD', 307, 'HEAD'),
|
||||
])
|
||||
def test_get_redirect_method(self, method, status, expected):
|
||||
assert get_redirect_method(method, status) == expected
|
||||
|
||||
@pytest.mark.parametrize('headers,supported_encodings,expected', [
|
||||
({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
|
||||
({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
|
||||
({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
|
||||
])
|
||||
def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
|
||||
headers = HTTPHeaderDict(headers)
|
||||
add_accept_encoding_header(headers, supported_encodings)
|
||||
assert headers == HTTPHeaderDict(expected)
|
||||
|
||||
|
||||
class TestInstanceStoreMixin:
|
||||
|
||||
class FakeInstanceStoreMixin(InstanceStoreMixin):
|
||||
def _create_instance(self, **kwargs):
|
||||
return random.randint(0, 1000000)
|
||||
|
||||
def _close_instance(self, instance):
|
||||
pass
|
||||
|
||||
def test_mixin(self):
|
||||
mixin = self.FakeInstanceStoreMixin()
|
||||
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
|
||||
|
||||
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
|
||||
|
||||
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
|
||||
|
||||
assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
|
||||
|
||||
assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
|
||||
|
||||
cookiejar = YoutubeDLCookieJar()
|
||||
assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
|
||||
|
||||
assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
|
||||
|
||||
# Different order
|
||||
assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
|
||||
|
||||
m = mixin._get_instance(t=1234)
|
||||
assert mixin._get_instance(t=1234) == m
|
||||
mixin._clear_instances()
|
||||
assert mixin._get_instance(t=1234) != m
|
||||
|
||||
|
||||
class TestNetworkingExceptions:
|
||||
|
||||
@staticmethod
|
||||
def create_response(status):
|
||||
return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
|
||||
|
||||
@pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
|
||||
def test_http_error(self, http_error_class):
|
||||
|
||||
response = self.create_response(403)
|
||||
error = http_error_class(response)
|
||||
|
||||
assert error.status == 403
|
||||
assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
|
||||
assert error.reason == response.reason
|
||||
assert error.response is response
|
||||
|
||||
data = error.response.read()
|
||||
assert data == b'test'
|
||||
assert repr(error) == '<HTTPError 403: Forbidden>'
|
||||
|
||||
@pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
|
||||
def test_redirect_http_error(self, http_error_class):
|
||||
response = self.create_response(301)
|
||||
error = http_error_class(response, redirect_loop=True)
|
||||
assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
|
||||
assert error.reason == 'Moved Permanently'
|
||||
|
||||
def test_compat_http_error(self):
|
||||
response = self.create_response(403)
|
||||
error = _CompatHTTPError(HTTPError(response))
|
||||
assert isinstance(error, HTTPError)
|
||||
assert isinstance(error, urllib.error.HTTPError)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def raises_deprecation_warning():
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter('always')
|
||||
yield
|
||||
|
||||
if len(w) == 0:
|
||||
pytest.fail('Did not raise DeprecationWarning')
|
||||
if len(w) > 1:
|
||||
pytest.fail(f'Raised multiple warnings: {w}')
|
||||
|
||||
if not issubclass(w[-1].category, DeprecationWarning):
|
||||
pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}')
|
||||
w.clear()
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.code == 403
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.getcode() == 403
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.hdrs is error.response.headers
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.info() is error.response.headers
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.headers is error.response.headers
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.filename == error.response.url
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.url == error.response.url
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.geturl() == error.response.url
|
||||
|
||||
# Passthrough file operations
|
||||
with raises_deprecation_warning():
|
||||
assert error.read() == b'test'
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert not error.closed
|
||||
|
||||
with raises_deprecation_warning():
|
||||
# Technically Response operations are also passed through, which should not be used.
|
||||
assert error.get_header('test') == 'test'
|
||||
|
||||
# Should not raise a warning
|
||||
error.close()
|
||||
|
||||
@pytest.mark.skipif(
|
||||
platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
|
||||
def test_compat_http_error_autoclose(self):
|
||||
# Compat HTTPError should not autoclose response
|
||||
response = self.create_response(403)
|
||||
_CompatHTTPError(HTTPError(response))
|
||||
assert not response.closed
|
||||
|
||||
def test_incomplete_read_error(self):
|
||||
error = IncompleteRead(b'test', 3, cause='test')
|
||||
assert isinstance(error, IncompleteRead)
|
||||
assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
|
||||
assert str(error) == error.msg == '4 bytes read, 3 more expected'
|
||||
assert error.partial == b'test'
|
||||
assert error.expected == 3
|
||||
assert error.cause == 'test'
|
||||
|
||||
error = IncompleteRead(b'aaa')
|
||||
assert repr(error) == '<IncompleteRead: 3 bytes read>'
|
||||
assert str(error) == '3 bytes read'
|
@ -0,0 +1,13 @@
|
||||
# flake8: noqa: F405
|
||||
from types import * # noqa: F403
|
||||
|
||||
from .compat_utils import passthrough_module
|
||||
|
||||
passthrough_module(__name__, 'types')
|
||||
del passthrough_module
|
||||
|
||||
try:
|
||||
# NB: pypy has builtin NoneType, so checking NameError won't work
|
||||
from types import NoneType # >= 3.10
|
||||
except ImportError:
|
||||
NoneType = type(None)
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue