[core] Update redirect handling from yt-dlp

* Thx coletdjnz: https://github.com/yt-dlp/yt-dlp/pull/7094
* add test that redirected `POST` loses its `Content-Type`
pull/32015/merge
dirkf 2 years ago
parent 648dc5304c
commit 46fde7caee

@ -8,33 +8,160 @@ import sys
import unittest import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import gzip
import io
import ssl
import tempfile
import threading
import zlib
# avoid deprecated alias assertRaisesRegexp
if hasattr(unittest.TestCase, 'assertRaisesRegex'):
unittest.TestCase.assertRaisesRegexp = unittest.TestCase.assertRaisesRegex
try:
import brotli
except ImportError:
brotli = None
try:
from urllib.request import pathname2url
except ImportError:
from urllib import pathname2url
from youtube_dl.compat import (
compat_http_cookiejar_Cookie,
compat_http_server,
compat_str as str,
compat_urllib_error,
compat_urllib_HTTPError,
compat_urllib_parse,
compat_urllib_request,
)
from youtube_dl.utils import (
sanitized_Request,
urlencode_postdata,
)
from test.helper import ( from test.helper import (
FakeYDL,
FakeLogger, FakeLogger,
http_server_port, http_server_port,
) )
from youtube_dl import YoutubeDL from youtube_dl import YoutubeDL
from youtube_dl.compat import compat_http_server, compat_urllib_request
import ssl
import threading
TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DIR = os.path.dirname(os.path.abspath(__file__))
class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
# work-around old/new -style class inheritance
def super(self, meth_name, *args, **kwargs):
from types import MethodType
try:
super()
fn = lambda s, m, *a, **k: getattr(super(), m)(*a, **k)
except TypeError:
fn = lambda s, m, *a, **k: getattr(compat_http_server.BaseHTTPRequestHandler, m)(s, *a, **k)
self.super = MethodType(fn, self)
return self.super(meth_name, *args, **kwargs)
def log_message(self, format, *args): def log_message(self, format, *args):
pass pass
def do_GET(self): def _headers(self):
if self.path == '/video.html': payload = str(self.headers).encode('utf-8')
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(payload)))
self.end_headers() self.end_headers()
self.wfile.write(b'<html><video src="/vid.mp4" /></html>') self.wfile.write(payload)
elif self.path == '/vid.mp4':
def _redirect(self):
self.send_response(int(self.path[len('/redirect_'):]))
self.send_header('Location', '/method')
self.send_header('Content-Length', '0')
self.end_headers()
def _method(self, method, payload=None):
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'video/mp4') self.send_header('Content-Length', str(len(payload or '')))
self.send_header('Method', method)
self.end_headers()
if payload:
self.wfile.write(payload)
def _status(self, status):
payload = '<html>{0} NOT FOUND</html>'.format(status).encode('utf-8')
self.send_response(int(status))
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.send_header('Content-Length', str(len(payload)))
self.end_headers() self.end_headers()
self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') self.wfile.write(payload)
def _read_data(self):
if 'Content-Length' in self.headers:
return self.rfile.read(int(self.headers['Content-Length']))
def _test_url(self, path, host='127.0.0.1', scheme='http', port=None):
return '{0}://{1}:{2}/{3}'.format(
scheme, host,
port if port is not None
else http_server_port(self.server), path)
def do_POST(self):
data = self._read_data()
if self.path.startswith('/redirect_'):
self._redirect()
elif self.path.startswith('/method'):
self._method('POST', data)
elif self.path.startswith('/headers'):
self._headers()
else:
self._status(404)
def do_HEAD(self):
if self.path.startswith('/redirect_'):
self._redirect()
elif self.path.startswith('/method'):
self._method('HEAD')
else:
self._status(404)
def do_PUT(self):
data = self._read_data()
if self.path.startswith('/redirect_'):
self._redirect()
elif self.path.startswith('/method'):
self._method('PUT', data)
else:
self._status(404)
def do_GET(self):
def respond(payload=b'<html><video src="/vid.mp4" /></html>',
payload_type='text/html; charset=utf-8',
payload_encoding=None,
resp_code=200):
self.send_response(resp_code)
self.send_header('Content-Type', payload_type)
if payload_encoding:
self.send_header('Content-Encoding', payload_encoding)
self.send_header('Content-Length', str(len(payload))) # required for persistent connections
self.end_headers()
self.wfile.write(payload)
def gzip_compress(p):
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode='wb') as f:
f.write(p)
return buf.getvalue()
if self.path == '/video.html':
respond()
elif self.path == '/vid.mp4':
respond(b'\x00\x00\x00\x00\x20\x66\x74[video]', 'video/mp4')
elif self.path == '/302': elif self.path == '/302':
if sys.version_info[0] == 3: if sys.version_info[0] == 3:
# XXX: Python 3 http server does not allow non-ASCII header values # XXX: Python 3 http server does not allow non-ASCII header values
@ -42,60 +169,284 @@ class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
return return
new_url = 'http://127.0.0.1:%d/中文.html' % http_server_port(self.server) new_url = self._test_url('中文.html')
self.send_response(302) self.send_response(302)
self.send_header(b'Location', new_url.encode('utf-8')) self.send_header(b'Location', new_url.encode('utf-8'))
self.end_headers() self.end_headers()
elif self.path == '/%E4%B8%AD%E6%96%87.html': elif self.path == '/%E4%B8%AD%E6%96%87.html':
self.send_response(200) respond()
self.send_header('Content-Type', 'text/html; charset=utf-8') elif self.path == '/%c7%9f':
respond()
elif self.path.startswith('/redirect_'):
self._redirect()
elif self.path.startswith('/method'):
self._method('GET')
elif self.path.startswith('/headers'):
self._headers()
elif self.path == '/trailing_garbage':
payload = b'<html><video src="/vid.mp4" /></html>'
compressed = gzip_compress(payload) + b'trailing garbage'
respond(compressed, payload_encoding='gzip')
elif self.path == '/302-non-ascii-redirect':
new_url = self._test_url('中文.html')
# actually respond with permanent redirect
self.send_response(301)
self.send_header('Location', new_url)
self.send_header('Content-Length', '0')
self.end_headers() self.end_headers()
self.wfile.write(b'<html><video src="/vid.mp4" /></html>') elif self.path == '/content-encoding':
encodings = self.headers.get('ytdl-encoding', '')
payload = b'<html><video src="/vid.mp4" /></html>'
for encoding in filter(None, (e.strip() for e in encodings.split(','))):
if encoding == 'br' and brotli:
payload = brotli.compress(payload)
elif encoding == 'gzip':
payload = gzip_compress(payload)
elif encoding == 'deflate':
payload = zlib.compress(payload)
elif encoding == 'unsupported':
payload = b'raw'
break
else: else:
assert False self._status(415)
return
respond(payload, payload_encoding=encodings)
else:
self._status(404)
def send_header(self, keyword, value):
"""
Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
This is against what is defined in RFC 3986: but we need to test that we support this
since some sites incorrectly do this.
"""
if keyword.lower() == 'connection':
return self.super('send_header', keyword, value)
if not hasattr(self, '_headers_buffer'):
self._headers_buffer = []
self._headers_buffer.append('{0}: {1}\r\n'.format(keyword, value).encode('utf-8'))
def end_headers(self):
if hasattr(self, '_headers_buffer'):
self.wfile.write(b''.join(self._headers_buffer))
self._headers_buffer = []
self.super('end_headers')
class TestHTTP(unittest.TestCase): class TestHTTP(unittest.TestCase):
def setUp(self): def setUp(self):
self.httpd = compat_http_server.HTTPServer( # HTTP server
self.http_httpd = compat_http_server.HTTPServer(
('127.0.0.1', 0), HTTPTestRequestHandler) ('127.0.0.1', 0), HTTPTestRequestHandler)
self.port = http_server_port(self.httpd) self.http_port = http_server_port(self.http_httpd)
self.server_thread = threading.Thread(target=self.httpd.serve_forever)
self.server_thread.daemon = True
self.server_thread.start()
def test_unicode_path_redirection(self): self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever)
# XXX: Python 3 http server does not allow non-ASCII header values self.http_server_thread.daemon = True
if sys.version_info[0] == 3: self.http_server_thread.start()
return
ydl = YoutubeDL({'logger': FakeLogger()}) try:
r = ydl.extract_info('http://127.0.0.1:%d/302' % self.port) from http.server import ThreadingHTTPServer
self.assertEqual(r['entries'][0]['url'], 'http://127.0.0.1:%d/vid.mp4' % self.port) except ImportError:
try:
from socketserver import ThreadingMixIn
except ImportError:
from SocketServer import ThreadingMixIn
class ThreadingHTTPServer(ThreadingMixIn, compat_http_server.HTTPServer):
pass
class TestHTTPS(unittest.TestCase): # HTTPS server
def setUp(self):
certfn = os.path.join(TEST_DIR, 'testcert.pem') certfn = os.path.join(TEST_DIR, 'testcert.pem')
self.httpd = compat_http_server.HTTPServer( self.https_httpd = ThreadingHTTPServer(
('127.0.0.1', 0), HTTPTestRequestHandler) ('127.0.0.1', 0), HTTPTestRequestHandler)
self.httpd.socket = ssl.wrap_socket( try:
self.httpd.socket, certfile=certfn, server_side=True) sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
self.port = http_server_port(self.httpd) sslctx.verify_mode = ssl.CERT_NONE
self.server_thread = threading.Thread(target=self.httpd.serve_forever) sslctx.check_hostname = False
self.server_thread.daemon = True sslctx.load_cert_chain(certfn, None)
self.server_thread.start() self.https_httpd.socket = sslctx.wrap_socket(
self.https_httpd.socket, server_side=True)
except AttributeError:
self.https_httpd.socket = ssl.wrap_socket(
self.https_httpd.socket, certfile=certfn, server_side=True)
self.https_port = http_server_port(self.https_httpd)
self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever)
self.https_server_thread.daemon = True
self.https_server_thread.start()
def tearDown(self):
def closer(svr):
def _closer():
svr.shutdown()
svr.server_close()
return _closer
shutdown_thread = threading.Thread(target=closer(self.http_httpd))
shutdown_thread.start()
self.http_server_thread.join(2.0)
shutdown_thread = threading.Thread(target=closer(self.https_httpd))
shutdown_thread.start()
self.https_server_thread.join(2.0)
def _test_url(self, path, host='127.0.0.1', scheme='http', port=None):
return '{0}://{1}:{2}/{3}'.format(
scheme, host,
port if port is not None
else self.https_port if scheme == 'https'
else self.http_port, path)
def test_nocheckcertificate(self): def test_nocheckcertificate(self):
if sys.version_info >= (2, 7, 9): # No certificate checking anyways with FakeYDL({'logger': FakeLogger()}) as ydl:
ydl = YoutubeDL({'logger': FakeLogger()}) with self.assertRaises(compat_urllib_error.URLError):
self.assertRaises( ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https')))
Exception,
ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port) with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
r = ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https')))
self.assertEqual(r.getcode(), 200)
r.close()
def test_percent_encode(self):
with FakeYDL() as ydl:
# Unicode characters should be encoded with uppercase percent-encoding
res = ydl.urlopen(sanitized_Request(self._test_url('中文.html')))
self.assertEqual(res.getcode(), 200)
res.close()
# don't normalize existing percent encodings
res = ydl.urlopen(sanitized_Request(self._test_url('%c7%9f')))
self.assertEqual(res.getcode(), 200)
res.close()
def test_unicode_path_redirection(self):
with FakeYDL() as ydl:
r = ydl.urlopen(sanitized_Request(self._test_url('302-non-ascii-redirect')))
self.assertEqual(r.url, self._test_url('%E4%B8%AD%E6%96%87.html'))
r.close()
def test_redirect(self):
with FakeYDL() as ydl:
def do_req(redirect_status, method, check_no_content=False):
data = b'testdata' if method in ('POST', 'PUT') else None
res = ydl.urlopen(sanitized_Request(
self._test_url('redirect_{0}'.format(redirect_status)),
method=method, data=data))
if check_no_content:
self.assertNotIn('Content-Type', res.headers)
return res.read().decode('utf-8'), res.headers.get('method', '')
# A 303 must either use GET or HEAD for subsequent request
self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))
self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))
# 301 and 302 turn POST only into a GET, with no Content-Type
self.assertEqual(do_req(301, 'POST', True), ('', 'GET'))
self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
self.assertEqual(do_req(302, 'POST', True), ('', 'GET'))
self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))
ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True}) self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port) self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))
self.assertEqual(r['entries'][0]['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port)
# 307 and 308 should not change method
for m in ('POST', 'PUT'):
self.assertEqual(do_req(307, m), ('testdata', m))
self.assertEqual(do_req(308, m), ('testdata', m))
self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))
# These should not redirect and instead raise an HTTPError
for code in (300, 304, 305, 306):
with self.assertRaises(compat_urllib_HTTPError):
do_req(code, 'GET')
def test_content_type(self):
# https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
with FakeYDL({'nocheckcertificate': True}) as ydl:
# method should be auto-detected as POST
r = sanitized_Request(self._test_url('headers', scheme='https'), data=urlencode_postdata({'test': 'test'}))
headers = ydl.urlopen(r).read().decode('utf-8')
self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
# test http
r = sanitized_Request(self._test_url('headers'), data=urlencode_postdata({'test': 'test'}))
headers = ydl.urlopen(r).read().decode('utf-8')
self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
def test_cookiejar(self):
with FakeYDL() as ydl:
ydl.cookiejar.set_cookie(compat_http_cookiejar_Cookie(
0, 'test', 'ytdl', None, False, '127.0.0.1', True,
False, '/headers', True, False, None, False, None, None, {}))
data = ydl.urlopen(sanitized_Request(self._test_url('headers'))).read()
self.assertIn(b'Cookie: test=ytdl', data)
def test_no_compression_compat_header(self):
with FakeYDL() as ydl:
data = ydl.urlopen(
sanitized_Request(
self._test_url('headers'),
headers={'Youtubedl-no-compression': True})).read()
self.assertIn(b'Accept-Encoding: identity', data)
self.assertNotIn(b'youtubedl-no-compression', data.lower())
def test_gzip_trailing_garbage(self):
# https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
# https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
with FakeYDL() as ydl:
data = ydl.urlopen(sanitized_Request(self._test_url('trailing_garbage'))).read().decode('utf-8')
self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
def __test_compression(self, encoding):
with FakeYDL() as ydl:
res = ydl.urlopen(
sanitized_Request(
self._test_url('content-encoding'),
headers={'ytdl-encoding': encoding}))
self.assertEqual(res.headers.get('Content-Encoding'), encoding)
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
@unittest.skipUnless(brotli, 'brotli support is not installed')
@unittest.expectedFailure
def test_brotli(self):
self.__test_compression('br')
@unittest.expectedFailure
def test_deflate(self):
self.__test_compression('deflate')
@unittest.expectedFailure
def test_gzip(self):
self.__test_compression('gzip')
@unittest.expectedFailure # not yet implemented
def test_multiple_encodings(self):
# https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
with FakeYDL() as ydl:
for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
res = ydl.urlopen(
sanitized_Request(
self._test_url('content-encoding'),
headers={'ytdl-encoding': pair}))
self.assertEqual(res.headers.get('Content-Encoding'), pair)
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
def test_unsupported_encoding(self):
# it should return the raw content
with FakeYDL() as ydl:
res = ydl.urlopen(
sanitized_Request(
self._test_url('content-encoding'),
headers={'ytdl-encoding': 'unsupported'}))
self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
self.assertEqual(res.read(), b'raw')
def _build_proxy_handler(name): def _build_proxy_handler(name):
@ -109,7 +460,7 @@ def _build_proxy_handler(name):
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8') self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.end_headers() self.end_headers()
self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8')) self.wfile.write('{0}: {1}'.format(self.proxy_name, self.path).encode('utf-8'))
return HTTPTestRequestHandler return HTTPTestRequestHandler
@ -129,10 +480,30 @@ class TestProxy(unittest.TestCase):
self.geo_proxy_thread.daemon = True self.geo_proxy_thread.daemon = True
self.geo_proxy_thread.start() self.geo_proxy_thread.start()
def tearDown(self):
def closer(svr):
def _closer():
svr.shutdown()
svr.server_close()
return _closer
shutdown_thread = threading.Thread(target=closer(self.proxy))
shutdown_thread.start()
self.proxy_thread.join(2.0)
shutdown_thread = threading.Thread(target=closer(self.geo_proxy))
shutdown_thread.start()
self.geo_proxy_thread.join(2.0)
def _test_proxy(self, host='127.0.0.1', port=None):
return '{0}:{1}'.format(
host, port if port is not None else self.port)
def test_proxy(self): def test_proxy(self):
geo_proxy = '127.0.0.1:{0}'.format(self.geo_port) geo_proxy = self._test_proxy(port=self.geo_port)
ydl = YoutubeDL({ ydl = YoutubeDL({
'proxy': '127.0.0.1:{0}'.format(self.port), 'proxy': self._test_proxy(),
'geo_verification_proxy': geo_proxy, 'geo_verification_proxy': geo_proxy,
}) })
url = 'http://foo.com/bar' url = 'http://foo.com/bar'
@ -146,7 +517,7 @@ class TestProxy(unittest.TestCase):
def test_proxy_with_idn(self): def test_proxy_with_idn(self):
ydl = YoutubeDL({ ydl = YoutubeDL({
'proxy': '127.0.0.1:{0}'.format(self.port), 'proxy': self._test_proxy(),
}) })
url = 'http://中文.tw/' url = 'http://中文.tw/'
response = ydl.urlopen(url).read().decode('utf-8') response = ydl.urlopen(url).read().decode('utf-8')
@ -154,5 +525,25 @@ class TestProxy(unittest.TestCase):
self.assertEqual(response, 'normal: http://xn--fiq228c.tw/') self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
class TestFileURL(unittest.TestCase):
# See https://github.com/ytdl-org/youtube-dl/issues/8227
def test_file_urls(self):
tf = tempfile.NamedTemporaryFile(delete=False)
tf.write(b'foobar')
tf.close()
url = compat_urllib_parse.urljoin('file://', pathname2url(tf.name))
with FakeYDL() as ydl:
self.assertRaisesRegexp(
compat_urllib_error.URLError, 'file:// scheme is explicitly disabled in youtube-dl for security reasons', ydl.urlopen, url)
# not yet implemented
"""
with FakeYDL({'enable_file_urls': True}) as ydl:
res = ydl.urlopen(url)
self.assertEqual(res.read(), b'foobar')
res.close()
"""
os.unlink(tf.name)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

@ -41,7 +41,6 @@ import zlib
from .compat import ( from .compat import (
compat_HTMLParseError, compat_HTMLParseError,
compat_HTMLParser, compat_HTMLParser,
compat_HTTPError,
compat_basestring, compat_basestring,
compat_casefold, compat_casefold,
compat_chr, compat_chr,
@ -64,6 +63,7 @@ from .compat import (
compat_struct_pack, compat_struct_pack,
compat_struct_unpack, compat_struct_unpack,
compat_urllib_error, compat_urllib_error,
compat_urllib_HTTPError,
compat_urllib_parse, compat_urllib_parse,
compat_urllib_parse_parse_qs as compat_parse_qs, compat_urllib_parse_parse_qs as compat_parse_qs,
compat_urllib_parse_urlencode, compat_urllib_parse_urlencode,
@ -2614,7 +2614,8 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
Part of this code was copied from: Part of this code was copied from:
http://techknack.net/python-urllib2-handlers/ http://techknack.net/python-urllib2-handlers/, archived at
https://web.archive.org/web/20130527205558/http://techknack.net/python-urllib2-handlers/
Andrew Rowls, the author of that code, agreed to release it to the Andrew Rowls, the author of that code, agreed to release it to the
public domain. public domain.
@ -2672,7 +2673,9 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
req._Request__original = req._Request__original.partition('#')[0] req._Request__original = req._Request__original.partition('#')[0]
req._Request__r_type = req._Request__r_type.partition('#')[0] req._Request__r_type = req._Request__r_type.partition('#')[0]
return req # Use the totally undocumented AbstractHTTPHandler per
# https://github.com/yt-dlp/yt-dlp/pull/4158
return compat_urllib_request.AbstractHTTPHandler.do_request_(self, req)
def http_response(self, req, resp): def http_response(self, req, resp):
old_resp = resp old_resp = resp
@ -2683,7 +2686,7 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
try: try:
uncompressed = io.BytesIO(gz.read()) uncompressed = io.BytesIO(gz.read())
except IOError as original_ioerror: except IOError as original_ioerror:
# There may be junk add the end of the file # There may be junk at the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details # See http://stackoverflow.com/q/4928560/35070 for details
for i in range(1, 1024): for i in range(1, 1024):
try: try:
@ -2710,8 +2713,7 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
if location: if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3 # As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
if sys.version_info >= (3, 0): if sys.version_info >= (3, 0):
location = location.encode('iso-8859-1').decode('utf-8') location = location.encode('iso-8859-1')
else:
location = location.decode('utf-8') location = location.decode('utf-8')
location_escaped = escape_url(location) location_escaped = escape_url(location)
if location != location_escaped: if location != location_escaped:
@ -2940,17 +2942,16 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):
The code is based on HTTPRedirectHandler implementation from CPython [1]. The code is based on HTTPRedirectHandler implementation from CPython [1].
This redirect handler solves two issues: This redirect handler fixes and improves the logic to better align with RFC7261
- ensures redirect URL is always unicode under python 2 and what browsers tend to do [2][3]
- introduces support for experimental HTTP response status code
308 Permanent Redirect [2] used by some sites [3]
1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py 1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
2. https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/308 2. https://datatracker.ietf.org/doc/html/rfc7231
3. https://github.com/ytdl-org/youtube-dl/issues/28768 3. https://github.com/python/cpython/issues/91306
""" """
http_error_301 = http_error_303 = http_error_307 = http_error_308 = compat_urllib_request.HTTPRedirectHandler.http_error_302 # Supply possibly missing alias
http_error_308 = compat_urllib_request.HTTPRedirectHandler.http_error_302
def redirect_request(self, req, fp, code, msg, headers, newurl): def redirect_request(self, req, fp, code, msg, headers, newurl):
"""Return a Request or None in response to a redirect. """Return a Request or None in response to a redirect.
@ -2962,19 +2963,16 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):
else should try to handle this url. Return None if you can't else should try to handle this url. Return None if you can't
but another Handler might. but another Handler might.
""" """
m = req.get_method() if code not in (301, 302, 303, 307, 308):
if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD") raise compat_urllib_HTTPError(req.full_url, code, msg, headers, fp)
or code in (301, 302, 303) and m == "POST")):
raise compat_HTTPError(req.full_url, code, msg, headers, fp) new_method = req.get_method()
# Strictly (according to RFC 2616), 301 or 302 in response to new_data = req.data
# a POST MUST NOT cause a redirection without confirmation remove_headers = []
# from the user (of urllib.request, in this case). In practice,
# essentially all clients do redirect in this case, so we do
# the same.
# On python 2 urlh.geturl() may sometimes return redirect URL # On python 2 urlh.geturl() may sometimes return redirect URL
# as byte string instead of unicode. This workaround allows # as a byte string instead of unicode. This workaround forces
# to force it always return unicode. # it to return unicode.
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
newurl = compat_str(newurl) newurl = compat_str(newurl)
@ -2983,13 +2981,29 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):
# but it is kept for compatibility with other callers. # but it is kept for compatibility with other callers.
newurl = newurl.replace(' ', '%20') newurl = newurl.replace(' ', '%20')
CONTENT_HEADERS = ("content-length", "content-type") # A 303 must either use GET or HEAD for subsequent request
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
if code == 303 and req.get_method() != 'HEAD':
new_method = 'GET'
# 301 and 302 redirects are commonly turned into a GET from a POST
# for subsequent requests by browsers, so we'll do the same.
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
elif code in (301, 302) and req.get_method() == 'POST':
new_method = 'GET'
# only remove payload if method changed (e.g. POST to GET)
if new_method != req.get_method():
new_data = None
remove_headers.extend(['Content-Length', 'Content-Type'])
# NB: don't use dict comprehension for python 2.6 compatibility # NB: don't use dict comprehension for python 2.6 compatibility
newheaders = dict((k, v) for k, v in req.headers.items() new_headers = dict((k, v) for k, v in req.header_items()
if k.lower() not in CONTENT_HEADERS) if k.lower() not in remove_headers)
return compat_urllib_request.Request( return compat_urllib_request.Request(
newurl, headers=newheaders, origin_req_host=req.origin_req_host, newurl, headers=new_headers, origin_req_host=req.origin_req_host,
unverifiable=True) unverifiable=True, method=new_method, data=new_data)
def extract_timezone(date_str): def extract_timezone(date_str):

Loading…
Cancel
Save