You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
yt-dlp/yt_dlp/plugins.py

291 lines
9.3 KiB
Python

import contextlib
import dataclasses
import importlib
import importlib.abc
import importlib.machinery
import importlib.util
import inspect
import itertools
import os
import pkgutil
import sys
import traceback
import zipimport
import functools
from pathlib import Path
from zipfile import ZipFile
from ._globals import (
plugin_dirs,
all_plugins_loaded,
plugin_specs,
plugins_enabled,
Indirect,
)
from .utils import (
get_executable_path,
get_system_config_dirs,
get_user_config_dirs,
merge_dicts,
orderedSet,
write_string,
YoutubeDLError,
)
PACKAGE_NAME = 'yt_dlp_plugins'
COMPAT_PACKAGE_NAME = 'ytdlp_plugins'
_BASE_PACKAGE_PATH = Path(__file__).parent
# Public APIs
# Anything else is NOT public and no backwards compatibility is guaranteed
__all__ = [
'directories',
'load_plugins',
'load_all_plugins',
'register_plugin_spec',
'add_plugin_dirs',
'set_plugin_dirs',
'disable_plugins',
'get_plugin_spec',
'PACKAGE_NAME',
'COMPAT_PACKAGE_NAME',
]
@dataclasses.dataclass
class PluginSpec:
module_name: str
suffix: str
destination: Indirect
plugin_destination: Indirect
class PluginLoader(importlib.abc.Loader):
"""Dummy loader for virtual namespace packages"""
def exec_module(self, module):
return None
@functools.cache
def dirs_in_zip(archive):
try:
with ZipFile(archive) as zip_:
return set(itertools.chain.from_iterable(
Path(file).parents for file in zip_.namelist()))
except FileNotFoundError:
pass
except Exception as e:
write_string(f'WARNING: Could not read zip file {archive}: {e}\n')
return ()
def external_plugin_paths():
def _get_package_paths(*root_paths, containing_folder):
for config_dir in orderedSet(map(Path, root_paths), lazy=True):
# We need to filter the base path added when running __main__.py directly
if config_dir == _BASE_PACKAGE_PATH:
continue
with contextlib.suppress(OSError):
yield from (config_dir / containing_folder).iterdir()
# Load from yt-dlp config folders
yield from _get_package_paths(
*get_user_config_dirs('yt-dlp'),
*get_system_config_dirs('yt-dlp'),
containing_folder='plugins',
)
# Load from yt-dlp-plugins folders
yield from _get_package_paths(
get_executable_path(),
*get_user_config_dirs(''),
*get_system_config_dirs(''),
containing_folder='yt-dlp-plugins',
)
yield from internal_plugin_paths()
def internal_plugin_paths():
# Always try load from PYTHONPATH folders
yield from (path for path in map(Path, sys.path) if path != _BASE_PACKAGE_PATH)
def candidate_plugin_paths(candidate):
candidate_path = Path(candidate)
if not candidate_path.is_dir():
raise ValueError(f'Invalid plugin directory: {candidate_path}')
yield from candidate_path.iterdir()
yield from internal_plugin_paths()
class PluginFinder(importlib.abc.MetaPathFinder):
"""
This class provides one or multiple namespace packages.
It searches in sys.path and yt-dlp config folders for
the existing subdirectories from which the modules can be imported
"""
def __init__(self, *packages):
self._zip_content_cache = {}
self.packages = set(
itertools.chain.from_iterable(
itertools.accumulate(name.split('.'), lambda a, b: '.'.join((a, b)))
for name in packages
),
)
def search_locations(self, fullname):
candidate_locations = itertools.chain.from_iterable(
external_plugin_paths() if candidate == 'external' else candidate_plugin_paths(candidate)
for candidate in plugin_dirs.value
)
parts = Path(*fullname.split('.'))
for path in orderedSet(candidate_locations, lazy=True):
candidate = path / parts
try:
if candidate.is_dir():
yield candidate
elif path.suffix in ('.zip', '.egg', '.whl') and path.is_file():
if parts in dirs_in_zip(path):
yield candidate
except PermissionError as e:
write_string(f'Permission error while accessing modules in "{e.filename}"\n')
def find_spec(self, fullname, path=None, target=None):
if not plugins_enabled.value:
return None
if fullname not in self.packages:
return None
search_locations = list(map(str, self.search_locations(fullname)))
if not search_locations:
return None
spec = importlib.machinery.ModuleSpec(fullname, PluginLoader(), is_package=True)
spec.submodule_search_locations = search_locations
return spec
def invalidate_caches(self):
dirs_in_zip.cache_clear()
for package in self.packages:
if package in sys.modules:
del sys.modules[package]
def directories():
spec = importlib.util.find_spec(PACKAGE_NAME)
return list(spec.submodule_search_locations) if spec else []
def iter_modules(subpackage):
fullname = f'{PACKAGE_NAME}.{subpackage}'
with contextlib.suppress(ModuleNotFoundError):
pkg = importlib.import_module(fullname)
yield from pkgutil.iter_modules(path=pkg.__path__, prefix=f'{fullname}.')
def get_regular_classes(module, module_name, suffix):
# Find standard public plugin classes (not overrides)
return inspect.getmembers(module, lambda obj: (
inspect.isclass(obj)
and obj.__name__.endswith(suffix)
and obj.__module__.startswith(module_name)
and not obj.__name__.startswith('_')
and obj.__name__ in getattr(module, '__all__', [obj.__name__])
and getattr(obj, 'PLUGIN_NAME', None) is None
))
def load_plugins(plugin_spec: PluginSpec):
name, suffix = plugin_spec.module_name, plugin_spec.suffix
regular_classes = {}
if os.environ.get('YTDLP_NO_PLUGINS') or plugins_enabled.value is False:
return regular_classes
for finder, module_name, _ in iter_modules(name):
if any(x.startswith('_') for x in module_name.split('.')):
continue
try:
if sys.version_info < (3, 10) and isinstance(finder, zipimport.zipimporter):
# zipimporter.load_module() is deprecated in 3.10 and removed in 3.12
# The exec_module branch below is the replacement for >= 3.10
# See: https://docs.python.org/3/library/zipimport.html#zipimport.zipimporter.exec_module
module = finder.load_module(module_name)
else:
spec = finder.find_spec(module_name)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
except Exception:
write_string(
f'Error while importing module {module_name!r}\n{traceback.format_exc(limit=-1)}',
)
continue
regular_classes.update(get_regular_classes(module, module_name, suffix))
# Compat: old plugin system using __init__.py
# Note: plugins imported this way do not show up in directories()
# nor are considered part of the yt_dlp_plugins namespace package
if 'external' in plugin_dirs.value:
with contextlib.suppress(FileNotFoundError):
spec = importlib.util.spec_from_file_location(
name,
Path(get_executable_path(), COMPAT_PACKAGE_NAME, name, '__init__.py'),
)
plugins = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = plugins
spec.loader.exec_module(plugins)
regular_classes.update(get_regular_classes(plugins, spec.name, suffix))
# Add the classes into the global plugin lookup for that type
plugin_spec.plugin_destination.value = regular_classes
# We want to prepend to the main lookup for that type
plugin_spec.destination.value = merge_dicts(regular_classes, plugin_spec.destination.value)
return regular_classes
def load_all_plugins():
for plugin_spec in plugin_specs.value.values():
load_plugins(plugin_spec)
all_plugins_loaded.value = True
def register_plugin_spec(plugin_spec: PluginSpec):
# If the plugin spec for a module is already registered, it will not be added again
if plugin_spec.module_name not in plugin_specs.value:
plugin_specs.value[plugin_spec.module_name] = plugin_spec
sys.meta_path.insert(0, PluginFinder(f'{PACKAGE_NAME}.{plugin_spec.module_name}'))
def add_plugin_dirs(*paths):
"""Add external plugin dirs to the existing ones"""
plugin_dirs.value.extend(paths)
def set_plugin_dirs(*paths):
"""Set external plugin dirs, overriding the default ones"""
plugin_dirs.value = list(paths)
def get_plugin_spec(module_name):
return plugin_specs.value.get(module_name)
def disable_plugins():
if (
all_plugins_loaded.value
or any(len(plugin_spec.plugin_destination.value) != 0 for plugin_spec in plugin_specs.value.values())
):
# note: we can't detect all cases when plugins are loaded (e.g. if spec isn't registered)
raise YoutubeDLError('Plugins have already been loaded. Cannot disable plugins after loading plugins.')
plugins_enabled.value = False