Merge branch 'yt-dlp:master' into playsuisse-handle-locale-parameters-from-urls

pull/12466/head
v3DJG6GL 2 months ago committed by GitHub
commit f4e4ab0c55

@@ -720,6 +720,15 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
rh, Request(
f'http://127.0.0.1:{self.http_port}/headers', proxies={'all': 'http://10.255.255.255'})).close()
@pytest.mark.skip_handlers_if(lambda _, handler: handler not in ['Urllib', 'CurlCFFI'], 'handler does not support keep_header_casing')
def test_keep_header_casing(self, handler):
with handler() as rh:
res = validate_and_send(
rh, Request(
f'http://127.0.0.1:{self.http_port}/headers', headers={'X-test-heaDer': 'test'}, extensions={'keep_header_casing': True})).read().decode()
assert 'X-test-heaDer: test' in res
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestClientCertificate:
@@ -1289,6 +1298,7 @@ class TestRequestHandlerValidation:
({'legacy_ssl': False}, False),
({'legacy_ssl': True}, False),
({'legacy_ssl': 'notabool'}, AssertionError),
({'keep_header_casing': True}, UnsupportedRequest),
]),
('Requests', 'http', [
({'cookiejar': 'notacookiejar'}, AssertionError),
@@ -1299,6 +1309,9 @@ class TestRequestHandlerValidation:
({'legacy_ssl': False}, False),
({'legacy_ssl': True}, False),
({'legacy_ssl': 'notabool'}, AssertionError),
({'keep_header_casing': False}, False),
({'keep_header_casing': True}, False),
({'keep_header_casing': 'notabool'}, AssertionError),
]),
('CurlCFFI', 'http', [
({'cookiejar': 'notacookiejar'}, AssertionError),
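
The tests above exercise the new `keep_header_casing` request extension end to end. As a minimal sketch of how a caller opts in (the URL here is illustrative, mirroring the test server above):

from yt_dlp.networking import Request

# a sketch, not from the diff: header names are normally title-cased on send;
# the extension asks the handler to transmit them exactly as written
req = Request(
    'http://127.0.0.1:8080/headers',
    headers={'X-test-heaDer': 'test'},
    extensions={'keep_header_casing': True},
)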

@@ -3,19 +3,20 @@
# Allow direct execution
import os
import sys
import unittest
import unittest.mock
import warnings
import datetime as dt
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import contextlib
import datetime as dt
import io
import itertools
import json
import pickle
import subprocess
import unittest
import unittest.mock
import warnings
import xml.etree.ElementTree
from yt_dlp.compat import (
@@ -2087,21 +2088,26 @@ Line 1
headers = HTTPHeaderDict()
headers['ytdl-test'] = b'0'
self.assertEqual(list(headers.items()), [('Ytdl-Test', '0')])
self.assertEqual(list(headers.sensitive().items()), [('ytdl-test', '0')])
headers['ytdl-test'] = 1
self.assertEqual(list(headers.items()), [('Ytdl-Test', '1')])
self.assertEqual(list(headers.sensitive().items()), [('ytdl-test', '1')])
headers['Ytdl-test'] = '2'
self.assertEqual(list(headers.items()), [('Ytdl-Test', '2')])
self.assertEqual(list(headers.sensitive().items()), [('Ytdl-test', '2')])
self.assertTrue('ytDl-Test' in headers)
self.assertEqual(str(headers), str(dict(headers)))
self.assertEqual(repr(headers), str(dict(headers)))
headers.update({'X-dlp': 'data'})
self.assertEqual(set(headers.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data')})
self.assertEqual(set(headers.sensitive().items()), {('Ytdl-test', '2'), ('X-dlp', 'data')})
self.assertEqual(dict(headers), {'Ytdl-Test': '2', 'X-Dlp': 'data'})
self.assertEqual(len(headers), 2)
self.assertEqual(headers.copy(), headers)
headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, **headers, **{'X-dlp': 'data2'})
headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, headers, **{'X-dlP': 'data2'})
self.assertEqual(set(headers2.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data2')})
self.assertEqual(set(headers2.sensitive().items()), {('Ytdl-test', '2'), ('X-dlP', 'data2')})
self.assertEqual(len(headers2), 2)
headers2.clear()
self.assertEqual(len(headers2), 0)
@@ -2109,16 +2115,23 @@ Line 1
# ensure we prefer latter headers
headers3 = HTTPHeaderDict({'Ytdl-TeSt': 1}, {'Ytdl-test': 2})
self.assertEqual(set(headers3.items()), {('Ytdl-Test', '2')})
self.assertEqual(set(headers3.sensitive().items()), {('Ytdl-test', '2')})
del headers3['ytdl-tesT']
self.assertEqual(dict(headers3), {})
headers4 = HTTPHeaderDict({'ytdl-test': 'data;'})
self.assertEqual(set(headers4.items()), {('Ytdl-Test', 'data;')})
self.assertEqual(set(headers4.sensitive().items()), {('ytdl-test', 'data;')})
# common mistake: strip whitespace from values
# https://github.com/yt-dlp/yt-dlp/issues/8729
headers5 = HTTPHeaderDict({'ytdl-test': ' data; '})
self.assertEqual(set(headers5.items()), {('Ytdl-Test', 'data;')})
self.assertEqual(set(headers5.sensitive().items()), {('ytdl-test', 'data;')})
# test if picklable
headers6 = HTTPHeaderDict(a=1, b=2)
self.assertEqual(pickle.loads(pickle.dumps(headers6)), headers6)
def test_extract_basic_auth(self):
assert extract_basic_auth('http://:foo.bar') == ('http://:foo.bar', None)

@@ -44,7 +44,7 @@ def websocket_handler(websocket):
return websocket.send('2')
elif isinstance(message, str):
if message == 'headers':
return websocket.send(json.dumps(dict(websocket.request.headers)))
return websocket.send(json.dumps(dict(websocket.request.headers.raw_items())))
elif message == 'path':
return websocket.send(websocket.request.path)
elif message == 'source_address':
@@ -266,18 +266,18 @@ class TestWebsSocketRequestHandlerConformance:
with handler(cookiejar=cookiejar) as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
assert HTTPHeaderDict(json.loads(ws.recv()))['cookie'] == 'test=ytdlp'
ws.close()
with handler() as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert 'cookie' not in json.loads(ws.recv())
assert 'cookie' not in HTTPHeaderDict(json.loads(ws.recv()))
ws.close()
ws = ws_validate_and_send(rh, Request(self.ws_base_url, extensions={'cookiejar': cookiejar}))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
assert HTTPHeaderDict(json.loads(ws.recv()))['cookie'] == 'test=ytdlp'
ws.close()
@pytest.mark.skip_handler('Websockets', 'Set-Cookie not supported by websockets')
@@ -287,7 +287,7 @@ class TestWebsSocketRequestHandlerConformance:
ws_validate_and_send(rh, Request(f'{self.ws_base_url}/get_cookie', extensions={'cookiejar': YoutubeDLCookieJar()}))
ws = ws_validate_and_send(rh, Request(self.ws_base_url, extensions={'cookiejar': YoutubeDLCookieJar()}))
ws.send('headers')
assert 'cookie' not in json.loads(ws.recv())
assert 'cookie' not in HTTPHeaderDict(json.loads(ws.recv()))
ws.close()
@pytest.mark.skip_handler('Websockets', 'Set-Cookie not supported by websockets')
@@ -298,12 +298,12 @@ class TestWebsSocketRequestHandlerConformance:
ws_validate_and_send(rh, Request(f'{self.ws_base_url}/get_cookie'))
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
assert HTTPHeaderDict(json.loads(ws.recv()))['cookie'] == 'test=ytdlp'
ws.close()
cookiejar.clear_session_cookies()
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert 'cookie' not in json.loads(ws.recv())
assert 'cookie' not in HTTPHeaderDict(json.loads(ws.recv()))
ws.close()
def test_source_address(self, handler):
@@ -341,6 +341,14 @@ class TestWebsSocketRequestHandlerConformance:
assert headers['test3'] == 'test3'
ws.close()
def test_keep_header_casing(self, handler):
with handler(headers=HTTPHeaderDict({'x-TeSt1': 'test'})) as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url, headers={'x-TeSt2': 'test'}, extensions={'keep_header_casing': True}))
ws.send('headers')
headers = json.loads(ws.recv())
assert 'x-TeSt1' in headers
assert 'x-TeSt2' in headers
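
Wrapping the echoed JSON in HTTPHeaderDict in the assertions above follows from the server change at the top of this file: `raw_items()` now echoes headers with their original casing, so client-side lookups must be case-insensitive. A small standalone sketch:

from yt_dlp.utils.networking import HTTPHeaderDict

echoed = {'Cookie': 'test=ytdlp'}  # raw casing, as the test server now echoes it
assert 'cookie' not in echoed                            # plain dict: miss
assert HTTPHeaderDict(echoed)['cookie'] == 'test=ytdlp'  # case-insensitive hit
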
@pytest.mark.parametrize('client_cert', (
{'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithkey.crt')},
{

@@ -2224,6 +2224,7 @@ from .tvplay import (
TVPlayIE,
)
from .tvplayer import TVPlayerIE
from .tvw import TvwIE
from .tweakers import TweakersIE
from .twentymin import TwentyMinutenIE
from .twentythreevideo import TwentyThreeVideoIE

@@ -3,7 +3,7 @@ from ..utils import int_or_none
class CultureUnpluggedIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?cultureunplugged\.com/documentary/watch-online/play/(?P<id>\d+)(?:/(?P<display_id>[^/]+))?'
_VALID_URL = r'https?://(?:www\.)?cultureunplugged\.com/(?:documentary/watch-online/)?play/(?P<id>\d+)(?:/(?P<display_id>[^/#?]+))?'
_TESTS = [{
'url': 'http://www.cultureunplugged.com/documentary/watch-online/play/53662/The-Next--Best-West',
'md5': 'ac6c093b089f7d05e79934dcb3d228fc',
@@ -12,12 +12,25 @@ class CultureUnpluggedIE(InfoExtractor):
'display_id': 'The-Next--Best-West',
'ext': 'mp4',
'title': 'The Next, Best West',
'description': 'md5:0423cd00833dea1519cf014e9d0903b1',
'description': 'md5:770033a3b7c2946a3bcfb7f1c6fb7045',
'thumbnail': r're:^https?://.*\.jpg$',
'creator': 'Coldstream Creative',
'creators': ['Coldstream Creative'],
'duration': 2203,
'view_count': int,
},
}, {
'url': 'https://www.cultureunplugged.com/play/2833/Koi-Sunta-Hai--Journeys-with-Kumar---Kabir--Someone-is-Listening-',
'md5': 'dc2014bc470dfccba389a1c934fa29fa',
'info_dict': {
'id': '2833',
'display_id': 'Koi-Sunta-Hai--Journeys-with-Kumar---Kabir--Someone-is-Listening-',
'ext': 'mp4',
'title': 'Koi Sunta Hai: Journeys with Kumar & Kabir (Someone is Listening)',
'description': 'md5:fa94ac934927c98660362b8285b2cda5',
'view_count': int,
'thumbnail': 'https://s3.amazonaws.com/cdn.cultureunplugged.com/thumbnails_16_9/lg/2833.jpg',
'creators': ['Srishti'],
},
}, {
'url': 'http://www.cultureunplugged.com/documentary/watch-online/play/53662',
'only_matching': True,
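
The loosened _VALID_URL makes the `documentary/watch-online/` prefix optional and stops `display_id` at `/`, `#` or `?`. A standalone check against the two test URLs above, using the pattern from this diff:

import re

pattern = r'https?://(?:www\.)?cultureunplugged\.com/(?:documentary/watch-online/)?play/(?P<id>\d+)(?:/(?P<display_id>[^/#?]+))?'
for url, video_id in (
    ('http://www.cultureunplugged.com/documentary/watch-online/play/53662/The-Next--Best-West', '53662'),
    ('https://www.cultureunplugged.com/play/2833/Koi-Sunta-Hai--Journeys-with-Kumar---Kabir--Someone-is-Listening-', '2833'),
):
    assert re.match(pattern, url).group('id') == video_id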

@@ -100,7 +100,7 @@ class DailymotionBaseInfoExtractor(InfoExtractor):
class DailymotionIE(DailymotionBaseInfoExtractor):
_VALID_URL = r'''(?ix)
https?://
(?:https?:)?//
(?:
dai\.ly/|
(?:
@@ -116,7 +116,7 @@ class DailymotionIE(DailymotionBaseInfoExtractor):
(?P<id>[^/?_&#]+)(?:[\w-]*\?playlist=(?P<playlist_id>x[0-9a-z]+))?
'''
IE_NAME = 'dailymotion'
_EMBED_REGEX = [r'<(?:(?:embed|iframe)[^>]+?src=|input[^>]+id=[\'"]dmcloudUrlEmissionSelect[\'"][^>]+value=)(["\'])(?P<url>(?:https?:)?//(?:www\.)?dailymotion\.com/(?:embed|swf)/video/.+?)\1']
_EMBED_REGEX = [rf'(?ix)<(?:(?:embed|iframe)[^>]+?src=|input[^>]+id=[\'"]dmcloudUrlEmissionSelect[\'"][^>]+value=)["\'](?P<url>{_VALID_URL[5:]})']
_TESTS = [{
'url': 'http://www.dailymotion.com/video/x5kesuj_office-christmas-party-review-jason-bateman-olivia-munn-t-j-miller_news',
'md5': '074b95bdee76b9e3654137aee9c79dfe',
@@ -308,6 +308,25 @@ class DailymotionIE(DailymotionBaseInfoExtractor):
'description': 'Que lindura',
'tags': [],
},
}, {
# //geo.dailymotion.com/player/xysxq.html?video=k2Y4Mjp7krAF9iCuINM
'url': 'https://lcp.fr/programmes/avant-la-catastrophe-la-naissance-de-la-dictature-nazie-1933-1936-346819',
'info_dict': {
'id': 'k2Y4Mjp7krAF9iCuINM',
'ext': 'mp4',
'title': 'Avant la catastrophe la naissance de la dictature nazie 1933 -1936',
'description': 'md5:7b620d5e26edbe45f27bbddc1c0257c1',
'uploader': 'LCP Assemblée nationale',
'uploader_id': 'xbz33d',
'view_count': int,
'like_count': int,
'age_limit': 0,
'duration': 3220,
'thumbnail': 'https://s1.dmcdn.net/v/Xvumk1djJBUZfjj2a/x1080',
'tags': [],
'timestamp': 1739919947,
'upload_date': '20250218',
},
}]
_GEO_BYPASS = False
_COMMON_MEDIA_FIELDS = '''description
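
Two details of this rewrite are easy to miss: the scheme becomes `(?:https?:)?//` so protocol-relative player URLs (like the `//geo.dailymotion.com/...` embed in the new test) also match, and `_VALID_URL[5:]` strips the leading `(?ix)` inline-flags token, which Python requires at the very start of a pattern, so _EMBED_REGEX re-adds it itself. A sketch of the invariant being relied on:

from yt_dlp.extractor.dailymotion import DailymotionIE

# len('(?ix)') == 5, hence the slice in _EMBED_REGEX
assert DailymotionIE._VALID_URL[:5] == '(?ix)'
assert DailymotionIE._EMBED_REGEX[0].startswith('(?ix)')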

@@ -1,35 +1,36 @@
from .common import InfoExtractor
from ..utils import parse_age_limit, parse_duration, traverse_obj
from ..utils import parse_age_limit, parse_duration, url_or_none
from ..utils.traversal import traverse_obj
class MagellanTVIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?magellantv\.com/(?:watch|video)/(?P<id>[\w-]+)'
_TESTS = [{
'url': 'https://www.magellantv.com/watch/my-dads-on-death-row?type=v',
'url': 'https://www.magellantv.com/watch/incas-the-new-story?type=v',
'info_dict': {
'id': 'my-dads-on-death-row',
'id': 'incas-the-new-story',
'ext': 'mp4',
'title': 'My Dad\'s On Death Row',
'description': 'md5:33ba23b9f0651fc4537ed19b1d5b0d7a',
'duration': 3780.0,
'title': 'Incas: The New Story',
'description': 'md5:936c7f6d711c02dfb9db22a067b586fe',
'age_limit': 14,
'tags': ['Justice', 'Reality', 'United States', 'True Crime'],
'duration': 3060.0,
'tags': ['Ancient History', 'Archaeology', 'Anthropology'],
},
'params': {'skip_download': 'm3u8'},
}, {
'url': 'https://www.magellantv.com/video/james-bulger-the-new-revelations',
'url': 'https://www.magellantv.com/video/tortured-to-death-murdering-the-nanny',
'info_dict': {
'id': 'james-bulger-the-new-revelations',
'id': 'tortured-to-death-murdering-the-nanny',
'ext': 'mp4',
'title': 'James Bulger: The New Revelations',
'description': 'md5:7b97922038bad1d0fe8d0470d8a189f2',
'title': 'Tortured to Death: Murdering the Nanny',
'description': 'md5:d87033594fa218af2b1a8b49f52511e5',
'age_limit': 14,
'duration': 2640.0,
'age_limit': 0,
'tags': ['Investigation', 'True Crime', 'Justice', 'Europe'],
'tags': ['True Crime', 'Murder'],
},
'params': {'skip_download': 'm3u8'},
}, {
'url': 'https://www.magellantv.com/watch/celebration-nation',
'url': 'https://www.magellantv.com/watch/celebration-nation?type=s',
'info_dict': {
'id': 'celebration-nation',
'ext': 'mp4',
@@ -43,10 +44,19 @@ class MagellanTVIE(InfoExtractor):
def _real_extract(self, url):
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id)
data = traverse_obj(self._search_nextjs_data(webpage, video_id), (
'props', 'pageProps', 'reactContext',
(('video', 'detail'), ('series', 'currentEpisode')), {dict}), get_all=False)
formats, subtitles = self._extract_m3u8_formats_and_subtitles(data['jwpVideoUrl'], video_id)
context = self._search_nextjs_data(webpage, video_id)['props']['pageProps']['reactContext']
data = traverse_obj(context, ((('video', 'detail'), ('series', 'currentEpisode')), {dict}, any))
formats, subtitles = [], {}
for m3u8_url in set(traverse_obj(data, ((('manifests', ..., 'hls'), 'jwp_video_url'), {url_or_none}))):
fmts, subs = self._extract_m3u8_formats_and_subtitles(
m3u8_url, video_id, 'mp4', m3u8_id='hls', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
if not formats and (error := traverse_obj(context, ('errorDetailPage', 'errorMessage', {str}))):
if 'available in your country' in error:
self.raise_geo_restricted(msg=error)
self.raise_no_formats(f'{self.IE_NAME} said: {error}', expected=True)
return {
'id': video_id,

@@ -0,0 +1,117 @@
import json
from .common import InfoExtractor
from ..utils import clean_html, remove_end, unified_timestamp, url_or_none
from ..utils.traversal import traverse_obj
class TvwIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?tvw\.org/video/(?P<id>[^/?#]+)'
_TESTS = [{
'url': 'https://tvw.org/video/billy-frank-jr-statue-maquette-unveiling-ceremony-2024011211/',
'md5': '9ceb94fe2bb7fd726f74f16356825703',
'info_dict': {
'id': '2024011211',
'ext': 'mp4',
'title': 'Billy Frank Jr. Statue Maquette Unveiling Ceremony',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:58a8150017d985b4f377e11ee8f6f36e',
'timestamp': 1704902400,
'upload_date': '20240110',
'location': 'Legislative Building',
'display_id': 'billy-frank-jr-statue-maquette-unveiling-ceremony-2024011211',
'categories': ['General Interest'],
},
}, {
'url': 'https://tvw.org/video/ebeys-landing-state-park-2024081007/',
'md5': '71e87dae3deafd65d75ff3137b9a32fc',
'info_dict': {
'id': '2024081007',
'ext': 'mp4',
'title': 'Ebey\'s Landing State Park',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:50c5bd73bde32fa6286a008dbc853386',
'timestamp': 1724310900,
'upload_date': '20240822',
'location': 'Ebeys Landing State Park',
'display_id': 'ebeys-landing-state-park-2024081007',
'categories': ['Washington State Parks'],
},
}, {
'url': 'https://tvw.org/video/home-warranties-workgroup-2',
'md5': 'f678789bf94d07da89809f213cf37150',
'info_dict': {
'id': '1999121000',
'ext': 'mp4',
'title': 'Home Warranties Workgroup',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:861396cc523c9641d0dce690bc5c35f3',
'timestamp': 946389600,
'upload_date': '19991228',
'display_id': 'home-warranties-workgroup-2',
'categories': ['Legislative'],
},
}, {
'url': 'https://tvw.org/video/washington-to-washington-a-new-space-race-2022041111/?eventID=2022041111',
'md5': '6f5551090b351aba10c0d08a881b4f30',
'info_dict': {
'id': '2022041111',
'ext': 'mp4',
'title': 'Washington to Washington - A New Space Race',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:f65a24eec56107afbcebb3aa5cd26341',
'timestamp': 1650394800,
'upload_date': '20220419',
'location': 'Hayner Media Center',
'display_id': 'washington-to-washington-a-new-space-race-2022041111',
'categories': ['Washington to Washington', 'General Interest'],
},
}]
def _real_extract(self, url):
display_id = self._match_id(url)
webpage = self._download_webpage(url, display_id)
client_id = self._html_search_meta('clientID', webpage, fatal=True)
video_id = self._html_search_meta('eventID', webpage, fatal=True)
video_data = self._download_json(
'https://api.v3.invintus.com/v2/Event/getDetailed', video_id,
headers={
'authorization': 'embedder',
'wsc-api-key': '7WhiEBzijpritypp8bqcU7pfU9uicDR',
},
data=json.dumps({
'clientID': client_id,
'eventID': video_id,
'showStreams': True,
}).encode())['data']
formats = []
subtitles = {}
for stream_url in traverse_obj(video_data, ('streamingURIs', ..., {url_or_none})):
fmts, subs = self._extract_m3u8_formats_and_subtitles(
stream_url, video_id, 'mp4', m3u8_id='hls', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
if caption_url := traverse_obj(video_data, ('captionPath', {url_or_none})):
subtitles.setdefault('en', []).append({'url': caption_url, 'ext': 'vtt'})
return {
'id': video_id,
'display_id': display_id,
'formats': formats,
'subtitles': subtitles,
'title': remove_end(self._og_search_title(webpage, default=None), ' - TVW'),
'description': self._og_search_description(webpage, default=None),
**traverse_obj(video_data, {
'title': ('title', {str}),
'description': ('description', {clean_html}),
'categories': ('categories', ..., {str}),
'thumbnail': ('videoThumbnail', {url_or_none}),
'timestamp': ('startDateTime', {unified_timestamp}),
'location': ('locationName', {str}),
'is_live': ('eventStatus', {lambda x: x == 'live'}),
}),
}
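
For reference, the metadata request the new extractor performs can be reproduced standalone (a sketch; the clientID placeholder would come from the page's `clientID` meta tag, and the API key is the embedder key from the diff):

import json
import urllib.request

payload = {'clientID': '<clientID from the page>', 'eventID': '2024011211', 'showStreams': True}
req = urllib.request.Request(
    'https://api.v3.invintus.com/v2/Event/getDetailed',
    data=json.dumps(payload).encode(),
    headers={'authorization': 'embedder', 'wsc-api-key': '7WhiEBzijpritypp8bqcU7pfU9uicDR'})
data = json.loads(urllib.request.urlopen(req).read())['data']  # streamingURIs, title, captionPath, ...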

@@ -4266,6 +4266,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
} for range_start in range(0, f['filesize'], CHUNK_SIZE))
for fmt in streaming_formats:
client_name = fmt[STREAMING_DATA_CLIENT_NAME]
if fmt.get('targetDurationSec'):
continue
@@ -4310,6 +4311,12 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
fmt_url = url_or_none(try_get(sc, lambda x: x['url'][0]))
encrypted_sig = try_get(sc, lambda x: x['s'][0])
if not all((sc, fmt_url, player_url, encrypted_sig)):
self.report_warning(
f'Some {client_name} client formats have been skipped as they are missing a url. '
f'{"Your account" if self.is_authenticated else "The current session"} may have '
f'the SSAP (server-side ads) experiment which may be interfering with yt-dlp. '
f'Please see https://github.com/yt-dlp/yt-dlp/issues/12482 for more details.',
only_once=True)
continue
try:
fmt_url += '&{}={}'.format(
@@ -4356,7 +4363,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self.report_warning(
f'{video_id}: Some formats are possibly damaged. They will be deprioritized', only_once=True)
client_name = fmt[STREAMING_DATA_CLIENT_NAME]
po_token = fmt.get(STREAMING_DATA_INITIAL_PO_TOKEN)
if po_token:

@@ -296,6 +296,7 @@ class RequestsRH(RequestHandler, InstanceStoreMixin):
extensions.pop('cookiejar', None)
extensions.pop('timeout', None)
extensions.pop('legacy_ssl', None)
extensions.pop('keep_header_casing', None)
def _create_instance(self, cookiejar, legacy_ssl_support=None):
session = RequestsSession()
@@ -312,11 +313,12 @@ class RequestsRH(RequestHandler, InstanceStoreMixin):
session.trust_env = False # no need, we already load proxies from env
return session
def _send(self, request):
headers = self._merge_headers(request.headers)
def _prepare_headers(self, _, headers):
add_accept_encoding_header(headers, SUPPORTED_ENCODINGS)
def _send(self, request):
headers = self._get_headers(request)
max_redirects_exceeded = False
session = self._get_instance(

@@ -379,13 +379,15 @@ class UrllibRH(RequestHandler, InstanceStoreMixin):
opener.addheaders = []
return opener
def _send(self, request):
headers = self._merge_headers(request.headers)
def _prepare_headers(self, _, headers):
add_accept_encoding_header(headers, SUPPORTED_ENCODINGS)
def _send(self, request):
headers = self._get_headers(request)
urllib_req = urllib.request.Request(
url=request.url,
data=request.data,
headers=dict(headers),
headers=headers,
method=request.method,
)

@@ -116,6 +116,7 @@ class WebsocketsRH(WebSocketRequestHandler):
extensions.pop('timeout', None)
extensions.pop('cookiejar', None)
extensions.pop('legacy_ssl', None)
extensions.pop('keep_header_casing', None)
def close(self):
# Remove the logging handler that contains a reference to our logger
@@ -123,15 +124,16 @@ class WebsocketsRH(WebSocketRequestHandler):
for name, handler in self.__logging_handlers.items():
logging.getLogger(name).removeHandler(handler)
def _send(self, request):
timeout = self._calculate_timeout(request)
headers = self._merge_headers(request.headers)
def _prepare_headers(self, request, headers):
if 'cookie' not in headers:
cookiejar = self._get_cookiejar(request)
cookie_header = cookiejar.get_cookie_header(request.url)
if cookie_header:
headers['cookie'] = cookie_header
def _send(self, request):
timeout = self._calculate_timeout(request)
headers = self._get_headers(request)
wsuri = parse_uri(request.url)
create_conn_kwargs = {
'source_address': (self.source_address, 0) if self.source_address else None,

@@ -206,6 +206,7 @@ class RequestHandler(abc.ABC):
- `cookiejar`: Cookiejar to use for this request.
- `timeout`: socket timeout to use for this request.
- `legacy_ssl`: Enable legacy SSL options for this request. See legacy_ssl_support.
- `keep_header_casing`: Keep the casing of headers when sending the request.
To enable these, add extensions.pop('<extension>', None) to _check_extensions
Apart from the url protocol, proxies dict may contain the following keys:
@@ -259,6 +260,23 @@ class RequestHandler(abc.ABC):
def _merge_headers(self, request_headers):
return HTTPHeaderDict(self.headers, request_headers)
def _prepare_headers(self, request: Request, headers: HTTPHeaderDict) -> None: # noqa: B027
"""Additional operations to prepare headers before building. To be extended by subclasses.
@param request: Request object
@param headers: Merged headers to prepare
"""
def _get_headers(self, request: Request) -> dict[str, str]:
"""
Get headers for external use.
Subclasses may define a _prepare_headers method to modify headers after merge but before building.
"""
headers = self._merge_headers(request.headers)
self._prepare_headers(request, headers)
if request.extensions.get('keep_header_casing'):
return headers.sensitive()
return dict(headers)
def _calculate_timeout(self, request):
return float(request.extensions.get('timeout') or self.timeout)
@@ -317,6 +335,7 @@ class RequestHandler(abc.ABC):
assert isinstance(extensions.get('cookiejar'), (YoutubeDLCookieJar, NoneType))
assert isinstance(extensions.get('timeout'), (float, int, NoneType))
assert isinstance(extensions.get('legacy_ssl'), (bool, NoneType))
assert isinstance(extensions.get('keep_header_casing'), (bool, NoneType))
def _validate(self, request):
self._check_url_scheme(request)
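
The net effect is a small template-method pattern: handlers no longer call _merge_headers() directly in _send(); instead _get_headers() merges, hands the HTTPHeaderDict to the subclass hook _prepare_headers(), and only then resolves casing. A hedged sketch of the new subclass contract (ExampleRH and the header name are illustrative):

from yt_dlp.networking.common import RequestHandler

class ExampleRH(RequestHandler):
    def _prepare_headers(self, request, headers):
        # mutate the merged HTTPHeaderDict in place, before casing is resolved
        headers['x-example'] = '1'

    def _send(self, request):
        headers = self._get_headers(request)
        # `headers` is a plain dict here: original casing if the request was
        # sent with extensions={'keep_header_casing': True}, title-cased otherwise
        ...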

@@ -5,11 +5,11 @@ from abc import ABC
from dataclasses import dataclass
from typing import Any
from .common import RequestHandler, register_preference
from .common import RequestHandler, register_preference, Request
from .exceptions import UnsupportedRequest
from ..compat.types import NoneType
from ..utils import classproperty, join_nonempty
from ..utils.networking import std_headers
from ..utils.networking import std_headers, HTTPHeaderDict
@dataclass(order=True, frozen=True)
@@ -123,7 +123,17 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
"""Get the requested target for the request"""
return self._resolve_target(request.extensions.get('impersonate') or self.impersonate)
def _get_impersonate_headers(self, request):
def _prepare_impersonate_headers(self, request: Request, headers: HTTPHeaderDict) -> None: # noqa: B027
"""Additional operations to prepare headers before building. To be extended by subclasses.
@param request: Request object
@param headers: Merged headers to prepare
"""
def _get_impersonate_headers(self, request: Request) -> dict[str, str]:
"""
Get headers for external impersonation use.
Subclasses may define a _prepare_impersonate_headers method to modify headers after merge but before building.
"""
headers = self._merge_headers(request.headers)
if self._get_request_target(request) is not None:
# remove all headers present in std_headers
@@ -131,7 +141,11 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
for k, v in std_headers.items():
if headers.get(k) == v:
headers.pop(k)
return headers
self._prepare_impersonate_headers(request, headers)
if request.extensions.get('keep_header_casing'):
return headers.sensitive()
return dict(headers)
@register_preference(ImpersonateRequestHandler)

@@ -1,9 +1,16 @@
from __future__ import annotations
import collections
import collections.abc
import random
import typing
import urllib.parse
import urllib.request
from ._utils import remove_start
if typing.TYPE_CHECKING:
T = typing.TypeVar('T')
from ._utils import NO_DEFAULT, remove_start
def random_user_agent():
@@ -51,32 +58,141 @@ def random_user_agent():
return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
class HTTPHeaderDict(collections.UserDict, dict):
class HTTPHeaderDict(dict):
"""
Store and access keys case-insensitively.
The constructor can take multiple dicts, in which keys in the latter are prioritised.
Retains a case sensitive mapping of the headers, which can be accessed via `.sensitive()`.
"""
def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> typing.Self:
obj = dict.__new__(cls, *args, **kwargs)
obj.__sensitive_map = {}
return obj
def __init__(self, *args, **kwargs):
def __init__(self, /, *args, **kwargs):
super().__init__()
for dct in args:
if dct is not None:
self.update(dct)
self.update(kwargs)
self.__sensitive_map = {}
for dct in filter(None, args):
self.update(dct)
if kwargs:
self.update(kwargs)
def sensitive(self, /) -> dict[str, str]:
return {
self.__sensitive_map[key]: value
for key, value in self.items()
}
def __contains__(self, key: str, /) -> bool:
return super().__contains__(key.title() if isinstance(key, str) else key)
def __delitem__(self, key: str, /) -> None:
key = key.title()
del self.__sensitive_map[key]
super().__delitem__(key)
def __setitem__(self, key, value):
def __getitem__(self, key, /) -> str:
return super().__getitem__(key.title())
def __ior__(self, other, /):
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
self.update(other)
return
return NotImplemented
def __or__(self, other, /) -> typing.Self:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
return type(self)(self.sensitive(), other)
return NotImplemented
def __ror__(self, other, /) -> typing.Self:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
return type(self)(other, self.sensitive())
return NotImplemented
def __setitem__(self, key: str, value, /) -> None:
if isinstance(value, bytes):
value = value.decode('latin-1')
super().__setitem__(key.title(), str(value).strip())
key_title = key.title()
self.__sensitive_map[key_title] = key
super().__setitem__(key_title, str(value).strip())
def __getitem__(self, key):
return super().__getitem__(key.title())
def clear(self, /) -> None:
self.__sensitive_map.clear()
super().clear()
def __delitem__(self, key):
super().__delitem__(key.title())
def copy(self, /) -> typing.Self:
return type(self)(self.sensitive())
def __contains__(self, key):
return super().__contains__(key.title() if isinstance(key, str) else key)
@typing.overload
def get(self, key: str, /) -> str | None: ...
@typing.overload
def get(self, key: str, /, default: T) -> str | T: ...
def get(self, key, /, default=NO_DEFAULT):
key = key.title()
if default is NO_DEFAULT:
return super().get(key)
return super().get(key, default)
@typing.overload
def pop(self, key: str, /) -> str: ...
@typing.overload
def pop(self, key: str, /, default: T) -> str | T: ...
def pop(self, key, /, default=NO_DEFAULT):
key = key.title()
if default is NO_DEFAULT:
self.__sensitive_map.pop(key)
return super().pop(key)
self.__sensitive_map.pop(key, default)
return super().pop(key, default)
def popitem(self) -> tuple[str, str]:
self.__sensitive_map.popitem()
return super().popitem()
@typing.overload
def setdefault(self, key: str, /) -> str: ...
@typing.overload
def setdefault(self, key: str, /, default) -> str: ...
def setdefault(self, key, /, default=None) -> str:
key = key.title()
if key in self.__sensitive_map:
return super().__getitem__(key)
self[key] = default or ''
return self[key]
def update(self, other, /, **kwargs) -> None:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, collections.abc.Mapping):
for key, value in other.items():
self[key] = value
elif hasattr(other, 'keys'):
for key in other.keys(): # noqa: SIM118
self[key] = other[key]
else:
for key, value in other:
self[key] = value
for key, value in kwargs.items():
self[key] = value
std_headers = HTTPHeaderDict({
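
Putting the rewritten class through its paces (a usage sketch consistent with the updated tests earlier in this commit):

from yt_dlp.utils.networking import HTTPHeaderDict

h = HTTPHeaderDict({'x-TeSt': '1'})
h['ytdl-test'] = b'0'                     # bytes values are decoded as latin-1
assert h['X-TEST'] == '1'                 # access is case-insensitive
assert dict(h) == {'X-Test': '1', 'Ytdl-Test': '0'}        # stored title-cased
assert h.sensitive() == {'x-TeSt': '1', 'ytdl-test': '0'}  # original casing kept
merged = h | {'X-TEST': '2'}              # latter mappings win on merge
assert merged['x-test'] == '2' and merged.sensitive()['X-TEST'] == '2'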
