Merge branch 'master' into boomplay

pull/11252/head
grqx_wsl 4 months ago
commit 98d9edf823

@@ -278,7 +278,7 @@ py -m bundle.py2exe
 * **`devscripts/update-version.py`** - Update the version number based on the current date.
 * **`devscripts/set-variant.py`** - Set the build variant of the executable.
 * **`devscripts/make_changelog.py`** - Create a markdown changelog using short commit messages and update `CONTRIBUTORS` file.
-* **`devscripts/make_lazy_extractors.py`** - Create lazy extractors. Running this before building the binaries (any variant) will improve their startup performance. Set the environment variable `YTDLP_NO_LAZY_EXTRACTORS=1` if you wish to forcefully disable lazy extractor loading.
+* **`devscripts/make_lazy_extractors.py`** - Create lazy extractors. Running this before building the binaries (any variant) will improve their startup performance. Set the environment variable `YTDLP_NO_LAZY_EXTRACTORS` to something nonempty to forcefully disable lazy extractor loading.

 Note: See their `--help` for more info.
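A quick illustration of the `YTDLP_NO_LAZY_EXTRACTORS` toggle documented in the hunk above. This is a sketch only: it assumes the variable is checked when `yt_dlp.extractor` is first imported (consistent with the build script below setting it before importing), so it must be set beforehand.

```python
import os

# Assumption: any non-empty value disables the pre-generated lazy extractors,
# and the check happens on first import of yt_dlp.extractor
os.environ['YTDLP_NO_LAZY_EXTRACTORS'] = '1'

from yt_dlp.extractor import gen_extractor_classes

# With lazy loading disabled, classes come from their real modules
# instead of yt_dlp.extractor.lazy_extractors
print(gen_extractor_classes()[0].__module__)
```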
@@ -1795,6 +1795,7 @@ The following extractors use this feature:
 * `key_query`: Passthrough the master m3u8 URL query to its HLS AES-128 decryption key URI if no value is provided, or else apply the query string given as `key_query=VALUE`. Note that this will have no effect if the key URI is provided via the `hls_key` extractor-arg. Does not apply to ffmpeg
 * `hls_key`: An HLS AES-128 key URI *or* key (as hex), and optionally the IV (as hex), in the form of `(URI|KEY)[,IV]`; e.g. `generic:hls_key=ABCDEF1234567980,0xFEDCBA0987654321`. Passing any of these values will force usage of the native HLS downloader and override the corresponding values found in the m3u8 playlist
 * `is_live`: Bypass live HLS detection and manually set `live_status` - a value of `false` will set `not_live`, any other value (or no value) will set `is_live`
+* `impersonate`: Target(s) to try and impersonate with the initial webpage request; e.g. `safari,chrome-110`. By default any available target will be used. Use `false` to disable impersonation

 #### funimation
 * `language`: Audio languages to extract, e.g. `funimation:language=english,japanese`
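As a usage sketch for the new `impersonate` extractor-arg, here is how it could be passed through the Python API; the URL is a placeholder, and the CLI equivalent would be `--extractor-args "generic:impersonate=safari,chrome-110"`.

```python
import yt_dlp

# 'extractor_args' mirrors --extractor-args: {lowercase IE name: {arg: [values]}}
opts = {
    'extractor_args': {'generic': {'impersonate': ['safari', 'chrome-110']}},
}
with yt_dlp.YoutubeDL(opts) as ydl:
    # Placeholder URL; any page handled by the generic extractor would do
    info = ydl.extract_info('https://example.com/some-embed-page', download=False)
```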
@@ -1897,6 +1898,7 @@ In other words, the file structure on the disk looks something like:
         myplugin.py

 yt-dlp looks for these `yt_dlp_plugins` namespace folders in many locations (see below) and loads in plugins from **all** of them.
+Set the environment variable `YTDLP_NO_PLUGINS` to something nonempty to disable loading plugins entirely.

 See the [wiki for some known plugins](https://github.com/yt-dlp/yt-dlp/wiki/Plugins)
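A minimal sketch of the new `YTDLP_NO_PLUGINS` switch for embedded use. It assumes, consistent with the `load_plugins()` and `YoutubeDL` hunks further down, that the variable is read at plugin-discovery time, so it has to be set before yt-dlp loads plugins.

```python
import os

os.environ['YTDLP_NO_PLUGINS'] = '1'  # any non-empty value skips yt_dlp_plugins packages

import yt_dlp

# With verbose output, the debug header should report that plugins are forcibly disabled
with yt_dlp.YoutubeDL({'verbose': True}) as ydl:
    pass
```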

@@ -2,7 +2,6 @@
 # Allow direct execution
 import os
-import shutil
 import sys

 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

@@ -34,18 +33,14 @@ MODULE_TEMPLATE = read_file('devscripts/lazy_load_template.py')

 def main():
-    lazy_extractors_filename = get_filename_args(default_outfile='yt_dlp/extractor/lazy_extractors.py')
-    if os.path.exists(lazy_extractors_filename):
-        os.remove(lazy_extractors_filename)
+    os.environ['YTDLP_NO_PLUGINS'] = 'true'
+    os.environ['YTDLP_NO_LAZY_EXTRACTORS'] = 'true'

-    _ALL_CLASSES = get_all_ies()  # Must be before import
+    lazy_extractors_filename = get_filename_args(default_outfile='yt_dlp/extractor/lazy_extractors.py')

-    import yt_dlp.plugins
+    from yt_dlp.extractor.extractors import _ALL_CLASSES
     from yt_dlp.extractor.common import InfoExtractor, SearchInfoExtractor

-    # Filter out plugins
-    _ALL_CLASSES = [cls for cls in _ALL_CLASSES if not cls.__module__.startswith(f'{yt_dlp.plugins.PACKAGE_NAME}.')]
-
     DummyInfoExtractor = type('InfoExtractor', (InfoExtractor,), {'IE_NAME': NO_ATTR})
     module_src = '\n'.join((
         MODULE_TEMPLATE,

@@ -58,20 +53,6 @@ def main():
     write_file(lazy_extractors_filename, f'{module_src}\n')


-def get_all_ies():
-    PLUGINS_DIRNAME = 'ytdlp_plugins'
-    BLOCKED_DIRNAME = f'{PLUGINS_DIRNAME}_blocked'
-    if os.path.exists(PLUGINS_DIRNAME):
-        # os.rename cannot be used, e.g. in Docker. See https://github.com/yt-dlp/yt-dlp/pull/4958
-        shutil.move(PLUGINS_DIRNAME, BLOCKED_DIRNAME)
-    try:
-        from yt_dlp.extractor.extractors import _ALL_CLASSES
-    finally:
-        if os.path.exists(BLOCKED_DIRNAME):
-            shutil.move(BLOCKED_DIRNAME, PLUGINS_DIRNAME)
-    return _ALL_CLASSES
-
-
 def extra_ie_code(ie, base=None):
     for var in STATIC_CLASS_PROPERTIES:
         val = getattr(ie, var)

@@ -16,7 +16,7 @@ fix_test_name = functools.partial(re.compile(r'IE(_all|_\d+)?$').sub, r'\1')
 def parse_args():
     parser = argparse.ArgumentParser(description='Run selected yt-dlp tests')
     parser.add_argument(
-        'test', help='a extractor tests, or one of "core" or "download"', nargs='*')
+        'test', help='an extractor test, test path, or one of "core" or "download"', nargs='*')
     parser.add_argument(
         '-k', help='run a test matching EXPRESSION. Same as "pytest -k"', metavar='EXPRESSION')
     parser.add_argument(

@@ -27,7 +27,6 @@ def parse_args():
 def run_tests(*tests, pattern=None, ci=False):
     run_core = 'core' in tests or (not pattern and not tests)
     run_download = 'download' in tests
-    tests = list(map(fix_test_name, tests))

     pytest_args = args.pytest_args or os.getenv('HATCH_TEST_ARGS', '')
     arguments = ['pytest', '-Werror', '--tb=short', *shlex.split(pytest_args)]

@@ -41,7 +40,9 @@ def run_tests(*tests, pattern=None, ci=False):
         arguments.extend(['-m', 'download'])
     else:
         arguments.extend(
-            f'test/test_download.py::TestDownload::test_{test}' for test in tests)
+            test if '/' in test
+            else f'test/test_download.py::TestDownload::test_{fix_test_name(test)}'
+            for test in tests)

     print(f'Running {arguments}', flush=True)
     try:

@@ -4,8 +4,18 @@ import xml.etree.ElementTree

 import pytest

-from yt_dlp.utils import dict_get, int_or_none, str_or_none
-from yt_dlp.utils.traversal import traverse_obj
+from yt_dlp.utils import (
+    ExtractorError,
+    determine_ext,
+    dict_get,
+    int_or_none,
+    str_or_none,
+)
+from yt_dlp.utils.traversal import (
+    traverse_obj,
+    require,
+    subs_list_to_dict,
+)

 _TEST_DATA = {
     100: 100,

@@ -420,6 +430,71 @@ class TestTraversal:
         assert traverse_obj(morsel, [(None,), any]) == morsel, \
             'Morsel should not be implicitly changed to dict on usage'

+    def test_traversal_filter(self):
+        data = [None, False, True, 0, 1, 0.0, 1.1, '', 'str', {}, {0: 0}, [], [1]]
+
+        assert traverse_obj(data, [..., filter]) == [True, 1, 1.1, 'str', {0: 0}, [1]], \
+            '`filter` should filter falsy values'
+
+
+class TestTraversalHelpers:
+    def test_traversal_require(self):
+        with pytest.raises(ExtractorError):
+            traverse_obj(_TEST_DATA, ['None', {require('value')}])
+        assert traverse_obj(_TEST_DATA, ['str', {require('value')}]) == 'str', \
+            '`require` should pass through non `None` values'
+
+    def test_subs_list_to_dict(self):
+        assert traverse_obj([
+            {'name': 'de', 'url': 'https://example.com/subs/de.vtt'},
+            {'name': 'en', 'url': 'https://example.com/subs/en1.ass'},
+            {'name': 'en', 'url': 'https://example.com/subs/en2.ass'},
+        ], [..., {
+            'id': 'name',
+            'url': 'url',
+        }, all, {subs_list_to_dict}]) == {
+            'de': [{'url': 'https://example.com/subs/de.vtt'}],
+            'en': [
+                {'url': 'https://example.com/subs/en1.ass'},
+                {'url': 'https://example.com/subs/en2.ass'},
+            ],
+        }, 'function should build subtitle dict from list of subtitles'
+        assert traverse_obj([
+            {'name': 'de', 'url': 'https://example.com/subs/de.ass'},
+            {'name': 'de'},
+            {'name': 'en', 'content': 'content'},
+            {'url': 'https://example.com/subs/en'},
+        ], [..., {
+            'id': 'name',
+            'data': 'content',
+            'url': 'url',
+        }, all, {subs_list_to_dict}]) == {
+            'de': [{'url': 'https://example.com/subs/de.ass'}],
+            'en': [{'data': 'content'}],
+        }, 'subs with mandatory items missing should be filtered'
+        assert traverse_obj([
+            {'url': 'https://example.com/subs/de.ass', 'name': 'de'},
+            {'url': 'https://example.com/subs/en', 'name': 'en'},
+        ], [..., {
+            'id': 'name',
+            'ext': ['url', {lambda x: determine_ext(x, default_ext=None)}],
+            'url': 'url',
+        }, all, {subs_list_to_dict(ext='ext')}]) == {
+            'de': [{'url': 'https://example.com/subs/de.ass', 'ext': 'ass'}],
+            'en': [{'url': 'https://example.com/subs/en', 'ext': 'ext'}],
+        }, '`ext` should set default ext but leave existing value untouched'
+        assert traverse_obj([
+            {'name': 'en', 'url': 'https://example.com/subs/en2', 'prio': True},
+            {'name': 'en', 'url': 'https://example.com/subs/en1', 'prio': False},
+        ], [..., {
+            'id': 'name',
+            'quality': ['prio', {int}],
+            'url': 'url',
+        }, all, {subs_list_to_dict(ext='ext')}]) == {'en': [
+            {'url': 'https://example.com/subs/en1', 'ext': 'ext'},
+            {'url': 'https://example.com/subs/en2', 'ext': 'ext'},
+        ]}, '`quality` key should sort subtitle list accordingly'
+

 class TestDictGet:
     def test_dict_get(self):

@@ -221,9 +221,10 @@ class TestUtil(unittest.TestCase):
         self.assertEqual(sanitize_filename('N0Y__7-UOdI', is_id=True), 'N0Y__7-UOdI')

     def test_sanitize_path(self):
-        if sys.platform != 'win32':
-            return
+        with unittest.mock.patch('sys.platform', 'win32'):
+            self._test_sanitize_path()

+    def _test_sanitize_path(self):
         self.assertEqual(sanitize_path('abc'), 'abc')
         self.assertEqual(sanitize_path('abc/def'), 'abc\\def')
         self.assertEqual(sanitize_path('abc\\def'), 'abc\\def')

@@ -256,6 +257,11 @@ class TestUtil(unittest.TestCase):
         self.assertEqual(sanitize_path('./abc'), 'abc')
         self.assertEqual(sanitize_path('./../abc'), '..\\abc')

+        self.assertEqual(sanitize_path('\\abc'), '\\abc')
+        self.assertEqual(sanitize_path('C:abc'), 'C:abc')
+        self.assertEqual(sanitize_path('C:abc\\..\\'), 'C:..')
+        self.assertEqual(sanitize_path('C:\\abc:%(title)s.%(ext)s'), 'C:\\abc#%(title)s.%(ext)s')
+
     def test_sanitize_url(self):
         self.assertEqual(sanitize_url('//foo.bar'), 'http://foo.bar')
         self.assertEqual(sanitize_url('httpss://foo.bar'), 'https://foo.bar')

@@ -4070,6 +4070,10 @@ class YoutubeDL:
         write_debug(f'Proxy map: {self.proxies}')
         write_debug(f'Request Handlers: {", ".join(rh.RH_NAME for rh in self._request_director.handlers.values())}')

+        if os.environ.get('YTDLP_NO_PLUGINS'):
+            write_debug('Plugins are forcibly disabled')
+            return
+
         for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items():
             display_list = ['{}{}'.format(
                 klass.__name__, '' if klass.__name__ == name else f' as {name}')

@@ -4120,7 +4124,8 @@ class YoutubeDL:
                 self.params.get('cookiefile'), self.params.get('cookiesfrombrowser'), self)
         except CookieLoadError as error:
             cause = error.__context__
-            self.report_error(str(cause), tb=''.join(traceback.format_exception(cause)))
+            # compat: <=py3.9: `traceback.format_exception` has a different signature
+            self.report_error(str(cause), tb=''.join(traceback.format_exception(None, cause, cause.__traceback__)))
             raise

     @property

@@ -573,13 +573,13 @@ class InfoExtractor:
     def _login_hint(self, method=NO_DEFAULT, netrc=None):
         password_hint = f'--username and --password, --netrc-cmd, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials'
+        cookies_hint = 'See https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp for how to manually pass cookies'
         return {
             None: '',
-            'any': f'Use --cookies, --cookies-from-browser, {password_hint}',
+            'any': f'Use --cookies, --cookies-from-browser, {password_hint}. {cookies_hint}',
             'password': f'Use {password_hint}',
-            'cookies': (
-                'Use --cookies-from-browser or --cookies for the authentication. '
-                'See https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp for how to manually pass cookies'),
+            'cookies': f'Use --cookies-from-browser or --cookies for the authentication. {cookies_hint}',
+            'session_cookies': f'Use --cookies for the authentication (--cookies-from-browser might not work). {cookies_hint}',
         }[method if method is not NO_DEFAULT else 'any' if self.supports_login() else 'cookies']

     def __init__(self, downloader=None):

@@ -6,6 +6,7 @@ from ..utils import (
     parse_iso8601,
     smuggle_url,
     str_or_none,
+    update_url_query,
 )

@@ -98,7 +99,9 @@ class CWTVIE(InfoExtractor):
             raise ExtractorError(data['msg'], expected=True)
         video_data = data['video']
         title = video_data['title']
-        mpx_url = video_data.get('mpx_url') or f'https://link.theplatform.com/s/cwtv/media/guid/2703454149/{video_id}?formats=M3U'
+        mpx_url = update_url_query(
+            video_data.get('mpx_url') or f'https://link.theplatform.com/s/cwtv/media/guid/2703454149/{video_id}',
+            {'formats': 'M3U+none'})

         season = str_or_none(video_data.get('season'))
         episode = str_or_none(video_data.get('episode'))

@@ -139,12 +139,11 @@ class DRTVIE(InfoExtractor):
             return

         token_response = self._download_json(
-            'https://production.dr-massive.com/api/authorization/anonymous-sso', None,
+            'https://isl.dr-massive.com/api/authorization/anonymous-sso', None,
             note='Downloading anonymous token', headers={
                 'content-type': 'application/json',
             }, query={
-                'device': 'web_browser',
-                'ff': 'idp,ldp,rpt',
+                'device': 'phone_android',
                 'lang': 'da',
                 'supportFallbackToken': 'true',
             }, data=json.dumps({

@@ -8,6 +8,7 @@ from .common import InfoExtractor
 from .commonprotocols import RtmpIE
 from .youtube import YoutubeIE
 from ..compat import compat_etree_fromstring
+from ..networking.impersonate import ImpersonateTarget
 from ..utils import (
     KNOWN_EXTENSIONS,
     MEDIA_EXTENSIONS,

@@ -2373,6 +2374,12 @@ class GenericIE(InfoExtractor):
         else:
             video_id = self._generic_id(url)

+        # Try to impersonate a web-browser by default if possible
+        # Skip impersonation if not available to omit the warning
+        impersonate = self._configuration_arg('impersonate', [''])
+        if 'false' in impersonate or not self._downloader._impersonate_target_available(ImpersonateTarget()):
+            impersonate = None
+
         # Some webservers may serve compressed content of rather big size (e.g. gzipped flac)
         # making it impossible to download only chunk of the file (yet we need only 512kB to
         # test whether it's HTML or not). According to yt-dlp default Accept-Encoding

@@ -2384,7 +2391,7 @@ class GenericIE(InfoExtractor):
         full_response = self._request_webpage(url, video_id, headers=filter_dict({
             'Accept-Encoding': 'identity',
             'Referer': smuggled_data.get('referer'),
-        }))
+        }), impersonate=impersonate)
         new_url = full_response.url
         if new_url != extract_basic_auth(url)[0]:
             self.report_following_redirect(new_url)

@@ -55,6 +55,7 @@ class PatreonBaseIE(InfoExtractor):

 class PatreonIE(PatreonBaseIE):
+    IE_NAME = 'patreon'
     _VALID_URL = r'https?://(?:www\.)?patreon\.com/(?:creation\?hid=|posts/(?:[\w-]+-)?)(?P<id>\d+)'
     _TESTS = [{
         'url': 'http://www.patreon.com/creation?hid=743933',

@@ -433,8 +434,12 @@ class PatreonIE(PatreonBaseIE):

 class PatreonCampaignIE(PatreonBaseIE):
+    IE_NAME = 'patreon:campaign'
-    _VALID_URL = r'https?://(?:www\.)?patreon\.com/(?!rss)(?:(?:m|api/campaigns)/(?P<campaign_id>\d+)|(?P<vanity>[-\w]+))'
+    _VALID_URL = r'''(?x)
+        https?://(?:www\.)?patreon\.com/(?:
+            (?:m|api/campaigns)/(?P<campaign_id>\d+)|
+            (?P<vanity>(?!creation[?/]|posts/|rss[?/])[\w-]+)
+        )(?:/posts)?/?(?:$|[?#])'''
     _TESTS = [{
         'url': 'https://www.patreon.com/dissonancepod/',
         'info_dict': {

@@ -496,10 +501,6 @@ class PatreonCampaignIE(PatreonBaseIE):
         'only_matching': True,
     }]

-    @classmethod
-    def suitable(cls, url):
-        return False if PatreonIE.suitable(url) else super().suitable(url)
-
     def _entries(self, campaign_id):
         cursor = None
         params = {

@@ -1,3 +1,4 @@
+import json
 import urllib.parse

 from .common import InfoExtractor

@@ -17,7 +18,7 @@ from ..utils import (
 class RedditIE(InfoExtractor):
     _NETRC_MACHINE = 'reddit'
-    _VALID_URL = r'https?://(?P<host>(?:\w+\.)?reddit(?:media)?\.com)/(?P<slug>(?:(?:r|user)/[^/]+/)?comments/(?P<id>[^/?#&]+))'
+    _VALID_URL = r'https?://(?:\w+\.)?reddit(?:media)?\.com/(?P<slug>(?:(?:r|user)/[^/]+/)?comments/(?P<id>[^/?#&]+))'
     _TESTS = [{
         'url': 'https://www.reddit.com/r/videos/comments/6rrwyj/that_small_heart_attack/',
         'info_dict': {

@@ -251,15 +252,15 @@ class RedditIE(InfoExtractor):
             return {'en': [{'url': caption_url}]}

     def _real_extract(self, url):
-        host, slug, video_id = self._match_valid_url(url).group('host', 'slug', 'id')
+        slug, video_id = self._match_valid_url(url).group('slug', 'id')

-        data = self._download_json(
-            f'https://{host}/{slug}/.json', video_id, fatal=False, expected_status=403)
-        if not data:
-            fallback_host = 'old.reddit.com' if host != 'old.reddit.com' else 'www.reddit.com'
-            self.to_screen(f'{host} request failed, retrying with {fallback_host}')
-            data = self._download_json(
-                f'https://{fallback_host}/{slug}/.json', video_id, expected_status=403)
+        try:
+            data = self._download_json(
+                f'https://www.reddit.com/{slug}/.json', video_id, expected_status=403)
+        except ExtractorError as e:
+            if isinstance(e.cause, json.JSONDecodeError):
+                self.raise_login_required('Account authentication is required')
+            raise

         if traverse_obj(data, 'error') == 403:
             reason = data.get('reason')

@@ -5,6 +5,7 @@ import importlib.machinery
 import importlib.util
 import inspect
 import itertools
+import os
 import pkgutil
 import sys
 import traceback

@@ -137,6 +138,8 @@ def load_module(module, module_name, suffix):

 def load_plugins(name, suffix):
     classes = {}
+    if os.environ.get('YTDLP_NO_PLUGINS'):
+        return classes

     for finder, module_name, _ in iter_modules(name):
         if any(x.startswith('_') for x in module_name.split('.')):

@@ -664,31 +664,51 @@ def sanitize_filename(s, restricted=False, is_id=NO_DEFAULT):
     return result


+def _sanitize_path_parts(parts):
+    sanitized_parts = []
+    for part in parts:
+        if not part or part == '.':
+            continue
+        elif part == '..':
+            if sanitized_parts and sanitized_parts[-1] != '..':
+                sanitized_parts.pop()
+            sanitized_parts.append('..')
+            continue
+        # Replace invalid segments with `#`
+        # - trailing dots and spaces (`asdf...` => `asdf..#`)
+        # - invalid chars (`<>` => `##`)
+        sanitized_part = re.sub(r'[/<>:"\|\\?\*]|[\s.]$', '#', part)
+        sanitized_parts.append(sanitized_part)
+
+    return sanitized_parts
+
+
 def sanitize_path(s, force=False):
     """Sanitizes and normalizes path on Windows"""
-    # XXX: this handles drive relative paths (c:sth) incorrectly
-    if sys.platform == 'win32':
-        force = False
-        drive_or_unc, _ = os.path.splitdrive(s)
-    elif force:
-        drive_or_unc = ''
-    else:
-        return s
-
-    norm_path = os.path.normpath(remove_start(s, drive_or_unc)).split(os.path.sep)
-    if drive_or_unc:
-        norm_path.pop(0)
-    sanitized_path = [
-        path_part if path_part in ['.', '..'] else re.sub(r'(?:[/<>:"\|\\?\*]|[\s.]$)', '#', path_part)
-        for path_part in norm_path]
-    if drive_or_unc:
-        sanitized_path.insert(0, drive_or_unc + os.path.sep)
-    elif force and s and s[0] == os.path.sep:
-        sanitized_path.insert(0, os.path.sep)
-    # TODO: Fix behavioral differences <3.12
-    # The workaround using `normpath` only superficially passes tests
-    # Ref: https://github.com/python/cpython/pull/100351
-    return os.path.normpath(os.path.join(*sanitized_path))
+    if sys.platform != 'win32':
+        if not force:
+            return s
+        root = '/' if s.startswith('/') else ''
+        return root + '/'.join(_sanitize_path_parts(s.split('/')))
+
+    normed = s.replace('/', '\\')
+
+    if normed.startswith('\\\\'):
+        # UNC path (`\\SERVER\SHARE`) or device path (`\\.`, `\\?`)
+        parts = normed.split('\\')
+        root = '\\'.join(parts[:4]) + '\\'
+        parts = parts[4:]
+    elif normed[1:2] == ':':
+        # absolute path or drive relative path
+        offset = 3 if normed[2:3] == '\\' else 2
+        root = normed[:offset]
+        parts = normed[offset:].split('\\')
+    else:
+        # relative/drive root relative path
+        root = '\\' if normed[:1] == '\\' else ''
+        parts = normed.split('\\')
+
+    return root + '\\'.join(_sanitize_path_parts(parts))


 def sanitize_url(url, *, scheme='http'):

@@ -1964,11 +1984,30 @@ def urljoin(base, path):
     return urllib.parse.urljoin(base, path)


-def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
+def partial_application(func):
+    sig = inspect.signature(func)
+
+    @functools.wraps(func)
+    def wrapped(*args, **kwargs):
+        try:
+            sig.bind(*args, **kwargs)
+        except TypeError:
+            return functools.partial(func, *args, **kwargs)
+        else:
+            return func(*args, **kwargs)
+
+    return wrapped
+
+
+@partial_application
+def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1, base=None):
     if get_attr and v is not None:
         v = getattr(v, get_attr, None)
+    if invscale == 1 and scale < 1:
+        invscale = int(1 / scale)
+        scale = 1
     try:
-        return int(v) * invscale // scale
+        return (int(v) if base is None else int(v, base=base)) * invscale // scale
     except (ValueError, TypeError, OverflowError):
         return default

@@ -1986,9 +2025,13 @@ def str_to_int(int_str):
     return int_or_none(int_str)


+@partial_application
 def float_or_none(v, scale=1, invscale=1, default=None):
     if v is None:
         return default
+    if invscale == 1 and scale < 1:
+        invscale = int(1 / scale)
+        scale = 1
     try:
         return float(v) * invscale / scale
     except (ValueError, TypeError):
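A brief illustration of what the `@partial_application` decorator in the hunk above changes for callers: omitting the positional argument now returns a partial (handy inside `traverse_obj` paths), while complete calls behave as before. The values are made up.

```python
from yt_dlp.utils import int_or_none

# Binding only keyword arguments fails signature binding, so a partial is returned
to_seconds = int_or_none(scale=1000)   # e.g. milliseconds -> seconds
assert to_seconds('1500') == 1
assert to_seconds(None) is None

# Complete calls still evaluate immediately; `base` is the newly added keyword
assert int_or_none('2500', scale=1000) == 2
assert int_or_none('ff', base=16) == 255
```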

@@ -1,18 +1,35 @@
+from __future__ import annotations
+
+import collections
 import collections.abc
 import contextlib
+import functools
 import http.cookies
 import inspect
 import itertools
 import re
+import typing
 import xml.etree.ElementTree

 from ._utils import (
     IDENTITY,
     NO_DEFAULT,
+    ExtractorError,
     LazyList,
     deprecation_warning,
+    get_elements_html_by_class,
+    get_elements_html_by_attribute,
+    get_elements_by_attribute,
+    get_element_html_by_attribute,
+    get_element_by_attribute,
+    get_element_html_by_id,
+    get_element_by_id,
+    get_element_html_by_class,
+    get_elements_by_class,
+    get_element_text_and_html_by_tag,
     is_iterable_like,
     try_call,
+    url_or_none,
     variadic,
 )

@@ -54,6 +71,7 @@ def traverse_obj(
             Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
         - `any`-builtin: Take the first matching object and return it, resetting branching.
         - `all`-builtin: Take all matching objects and return them as a list, resetting branching.
+        - `filter`-builtin: Return the value if it is truthy, `None` otherwise.

     `tuple`, `list`, and `dict` all support nested paths and branches.

@@ -247,6 +265,10 @@ def traverse_obj(
                objs = (list(filtered_objs),)
                continue

+            if key is filter:
+                objs = filter(None, objs)
+                continue
+
            if __debug__ and callable(key):
                # Verify function signature
                inspect.signature(key).bind(None, None)

@@ -277,13 +299,143 @@ def traverse_obj(
         return results[0] if results else {} if allow_empty and is_dict else None

     for index, path in enumerate(paths, 1):
-        result = _traverse_obj(obj, path, index == len(paths), True)
-        if result is not None:
-            return result
+        is_last = index == len(paths)
+        try:
+            result = _traverse_obj(obj, path, is_last, True)
+            if result is not None:
+                return result
+        except _RequiredError as e:
+            if is_last:
+                # Reraise to get cleaner stack trace
+                raise ExtractorError(e.orig_msg, expected=e.expected) from None

     return None if default is NO_DEFAULT else default


+def value(value, /):
+    return lambda _: value
+
+
+def require(name, /, *, expected=False):
+    def func(value):
+        if value is None:
+            raise _RequiredError(f'Unable to extract {name}', expected=expected)
+        return value
+
+    return func
+
+
+class _RequiredError(ExtractorError):
+    pass
+
+
+@typing.overload
+def subs_list_to_dict(*, ext: str | None = None) -> collections.abc.Callable[[list[dict]], dict[str, list[dict]]]: ...
+
+
+@typing.overload
+def subs_list_to_dict(subs: list[dict] | None, /, *, ext: str | None = None) -> dict[str, list[dict]]: ...
+
+
+def subs_list_to_dict(subs: list[dict] | None = None, /, *, ext=None):
+    """
+    Convert subtitles from a traversal into a subtitle dict.
+    The path should have an `all` immediately before this function.
+
+    Arguments:
+    `ext`      The default value for `ext` in the subtitle dict
+
+    In the dict you can set the following additional items:
+    `id`       The subtitle id to sort the dict into
+    `quality`  The sort order for each subtitle
+    """
+    if subs is None:
+        return functools.partial(subs_list_to_dict, ext=ext)
+
+    result = collections.defaultdict(list)
+
+    for sub in subs:
+        if not url_or_none(sub.get('url')) and not sub.get('data'):
+            continue
+        sub_id = sub.pop('id', None)
+        if sub_id is None:
+            continue
+        if ext is not None and not sub.get('ext'):
+            sub['ext'] = ext
+        result[sub_id].append(sub)
+    result = dict(result)
+
+    for subs in result.values():
+        subs.sort(key=lambda x: x.pop('quality', 0) or 0)
+
+    return result
+
+
+@typing.overload
+def find_element(*, attr: str, value: str, tag: str | None = None, html=False): ...
+
+
+@typing.overload
+def find_element(*, cls: str, html=False): ...
+
+
+@typing.overload
+def find_element(*, id: str, tag: str | None = None, html=False): ...
+
+
+@typing.overload
+def find_element(*, tag: str, html=False): ...
+
+
+def find_element(*, tag=None, id=None, cls=None, attr=None, value=None, html=False):
+    # deliberately using `id=` and `cls=` for ease of readability
+    assert tag or id or cls or (attr and value), 'One of tag, id, cls or (attr AND value) is required'
+    if not tag:
+        tag = r'[\w:.-]+'
+
+    if attr and value:
+        assert not cls, 'Cannot match both attr and cls'
+        assert not id, 'Cannot match both attr and id'
+        func = get_element_html_by_attribute if html else get_element_by_attribute
+        return functools.partial(func, attr, value, tag=tag)
+
+    elif cls:
+        assert not id, 'Cannot match both cls and id'
+        assert tag is None, 'Cannot match both cls and tag'
+        func = get_element_html_by_class if html else get_elements_by_class
+        return functools.partial(func, cls)
+
+    elif id:
+        func = get_element_html_by_id if html else get_element_by_id
+        return functools.partial(func, id, tag=tag)
+
+    index = int(bool(html))
+    return lambda html: get_element_text_and_html_by_tag(tag, html)[index]
+
+
+@typing.overload
+def find_elements(*, cls: str, html=False): ...
+
+
+@typing.overload
+def find_elements(*, attr: str, value: str, tag: str | None = None, html=False): ...
+
+
+def find_elements(*, tag=None, cls=None, attr=None, value=None, html=False):
+    # deliberately using `cls=` for ease of readability
+    assert cls or (attr and value), 'One of cls or (attr AND value) is required'
+
+    if attr and value:
+        assert not cls, 'Cannot match both attr and cls'
+        func = get_elements_html_by_attribute if html else get_elements_by_attribute
+        return functools.partial(func, attr, value, tag=tag or r'[\w:.-]+')
+
+    assert not tag, 'Cannot match both cls and tag'
+    func = get_elements_html_by_class if html else get_elements_by_class
+    return functools.partial(func, cls)
+
+
 def get_first(obj, *paths, **kwargs):
     return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False)
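Finally, a rough usage sketch of the traversal additions in the hunks above (`filter` as a path key and `require`), mirroring the new tests; the data is illustrative only.

```python
from yt_dlp.utils import ExtractorError
from yt_dlp.utils.traversal import require, traverse_obj

data = {'items': [None, 0, '', 'a', 'b'], 'id': None}

# `filter` drops falsy values after branching
assert traverse_obj(data, ('items', ..., filter)) == ['a', 'b']

# `require` passes non-None values through ...
assert traverse_obj(data, ('items', 3, {require('item')})) == 'a'

# ... and raises ExtractorError instead of silently returning None
try:
    traverse_obj(data, ('id', {require('video id')}))
except ExtractorError:
    pass  # "Unable to extract video id"
```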
