@ -15,7 +15,6 @@ import email.utils
import email . header
import errno
import functools
import gzip
import inspect
import io
import itertools
@ -42,6 +41,7 @@ from .compat import (
compat_HTMLParseError ,
compat_HTMLParser ,
compat_basestring ,
compat_brotli as brotli ,
compat_casefold ,
compat_chr ,
compat_collections_abc ,
@ -55,6 +55,7 @@ from .compat import (
compat_http_client ,
compat_integer_types ,
compat_kwargs ,
compat_ncompress as ncompress ,
compat_os_name ,
compat_re_Match ,
compat_re_Pattern ,
@ -2638,11 +2639,44 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
req )
@staticmethod
def deflate ( data ) :
def deflate _gz ( data ) :
try :
return zlib . decompress ( data , - zlib . MAX_WBITS )
# format:zlib,gzip + windowsize:32768
return data and zlib . decompress ( data , 32 + zlib . MAX_WBITS )
except zlib . error :
return zlib . decompress ( data )
# raw zlib * windowsize:32768 (RFC 9110: "non-conformant")
return zlib . decompress ( data , - zlib . MAX_WBITS )
@staticmethod
def gzip ( data ) :
from gzip import GzipFile
def _gzip ( data ) :
with io . BytesIO ( data ) as data_buf :
gz = GzipFile ( fileobj = data_buf , mode = ' rb ' )
return gz . read ( )
try :
return _gzip ( data )
except IOError as original_ioerror :
# There may be junk at the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range ( 1 , 1024 ) :
try :
return _gzip ( data [ : - i ] )
except IOError :
continue
else :
raise original_ioerror
@staticmethod
def brotli ( data ) :
return data and brotli . decompress ( data )
@staticmethod
def compress ( data ) :
return data and ncompress . decompress ( data )
def http_request ( self , req ) :
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
@ -2679,33 +2713,59 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
def http_response ( self , req , resp ) :
old_resp = resp
# gzip
if resp . headers . get ( ' Content-encoding ' , ' ' ) == ' gzip ' :
content = resp . read ( )
gz = gzip . GzipFile ( fileobj = io . BytesIO ( content ) , mode = ' rb ' )
try :
uncompressed = io . BytesIO ( gz . read ( ) )
except IOError as original_ioerror :
# There may be junk at the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range ( 1 , 1024 ) :
# Content-Encoding header lists the encodings in order that they were applied [1].
# To decompress, we simply do the reverse.
# [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
decoded_response = None
decoders = {
' gzip ' : self . deflate_gz ,
' deflate ' : self . deflate_gz ,
}
if brotli :
decoders [ ' br ' ] = self . brotli
if ncompress :
decoders [ ' compress ' ] = self . compress
if sys . platform . startswith ( ' java ' ) :
# Jython zlib implementation misses gzip
decoders [ ' gzip ' ] = self . gzip
def encodings ( hdrs ) :
# A header field that allows multiple values can have multiple instances [2].
# [2]: https://datatracker.ietf.org/doc/html/rfc9110#name-fields
for e in reversed ( ' , ' . join ( hdrs ) . split ( ' , ' ) ) :
if e :
yield e . strip ( )
encodings_left = [ ]
try :
gz = gzip . GzipFile ( fileobj = io . BytesIO ( content [ : - i ] ) , mode = ' rb ' )
uncompressed = io . BytesIO ( gz . read ( ) )
except IOError :
resp . headers . get_all
hdrs = resp . headers
except AttributeError :
# Py2 has no get_all() method: headers are rfc822.Message
from email . message import Message
hdrs = Message ( )
for k , v in resp . headers . items ( ) :
hdrs [ k ] = v
decoder , decoded_response = True , None
for encoding in encodings ( hdrs . get_all ( ' Content-Encoding ' , [ ] ) ) :
# "SHOULD consider" x-compress, x-gzip as compress, gzip
decoder = decoder and decoders . get ( remove_start ( encoding , ' x- ' ) )
if not decoder :
encodings_left . insert ( 0 , encoding )
continue
break
else :
raise original_ioerror
resp = compat_urllib_request . addinfourl ( uncompressed , old_resp . headers , old_resp . url , old_resp . code )
decoded_response = decoder ( decoded_response or resp . read ( ) )
if decoded_response is not Non e:
resp = compat_urllib_request . addinfourl (
io . BytesIO ( decoded_response ) , old_resp . headers , old_resp . url , old_resp . code )
resp . msg = old_resp . msg
del resp . headers [ ' Content-encoding ' ]
# deflate
if resp . headers . get ( ' Content-encoding ' , ' ' ) == ' deflate ' :
gz = io . BytesIO ( self . deflate ( resp . read ( ) ) )
resp = compat_urllib_request . addinfourl ( gz , old_resp . headers , old_resp . url , old_resp . code )
resp . msg = old_resp . msg
del resp . headers [ ' Content-encoding ' ]
del resp . headers [ ' Content-Length ' ]
resp . headers [ ' Content-Length ' ] = ' %d ' % len ( decoded_response )
del resp . headers [ ' Content-Encoding ' ]
if encodings_left :
resp . headers [ ' Content-Encoding ' ] = ' , ' . join ( encodings_left )
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
if 300 < = resp . code < 400 :