mirror of https://github.com/yt-dlp/yt-dlp
Proposal v2, with carried state
parent
8dbf2cf66d
commit
96f9bbf392
@ -0,0 +1,33 @@
|
||||
from .common import InfoExtractor
|
||||
from ..utils.lazy import lazy_ie, lazy_fields
|
||||
|
||||
|
||||
@lazy_ie
|
||||
class LazyExtractorIE(InfoExtractor):
|
||||
IE_NAME = 'lazy'
|
||||
_VALID_URL = r"lazy://(?P<id>.*)"
|
||||
|
||||
def _lazy_webpage(self, storage):
|
||||
return self._download_webpage(storage.url, storage.id)
|
||||
|
||||
@lazy_fields("creator")
|
||||
def _extract_other(self, storage):
|
||||
self.to_screen("Extracting something else from webpage")
|
||||
return {
|
||||
"creator": storage.webpage.partition(" - ")[0],
|
||||
}
|
||||
|
||||
@lazy_fields("title", "description")
|
||||
def _extract_website(self, storage):
|
||||
self.to_screen("Extracting title and description from webpage")
|
||||
title, _, description = storage.webpage.partition("\n")
|
||||
|
||||
return {
|
||||
"title": title,
|
||||
"description": description,
|
||||
}
|
||||
|
||||
# Fake downloading the webpage for testing purposes
|
||||
def _download_webpage(self, url_or_request, video_id, *args, **kwargs):
|
||||
self.to_screen(f"[{video_id}] Downloaded webpage ({url_or_request})")
|
||||
return "<creator> - Fake Webpage title\nThis is the description.\n..."
|
@ -0,0 +1,151 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import functools
|
||||
import inspect
|
||||
from collections.abc import MutableMapping
|
||||
|
||||
from ..utils import try_call
|
||||
from ..extractor.common import InfoExtractor
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Callable, Any
|
||||
|
||||
|
||||
class _LazyStorage:
|
||||
def __init__(self, ie, **kwargs):
|
||||
self._ie = ie
|
||||
self._cache = kwargs
|
||||
|
||||
def __setattr__(self, name, value, /) -> None:
|
||||
if name.startswith("_"):
|
||||
super().__setattr__(name, value)
|
||||
else:
|
||||
self._cache[name] = value
|
||||
|
||||
def __getattr__(self, name: str):
|
||||
if name in self._cache:
|
||||
return self._cache[name]
|
||||
|
||||
resolver = getattr(self._ie, f"_lazy_{name}")
|
||||
result = try_call(resolver, args=(self,))
|
||||
self._cache[name] = result
|
||||
return result
|
||||
|
||||
def __delattr__(self, name: str) -> None:
|
||||
if name.startswith("_"):
|
||||
super().__delattr__(name)
|
||||
elif name in self._cache:
|
||||
del self._cache[name]
|
||||
|
||||
|
||||
class _LazyInfoDict(MutableMapping):
|
||||
def __init__(self, data: dict, lazy: dict, ie: InfoExtractor, **kwargs):
|
||||
self._data = data
|
||||
self._lazy = lazy
|
||||
self._ie = ie
|
||||
self._storage = _LazyStorage(self._ie, **kwargs)
|
||||
|
||||
for key in self._data.keys() & self._lazy.keys():
|
||||
del self._lazy[key]
|
||||
|
||||
self._data.update(dict.fromkeys(self._lazy.keys()))
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self._data
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key in self._lazy:
|
||||
compute_func = self._lazy[key]
|
||||
|
||||
# updates = try_call(compute_func, args=(self._storage,), expected_type=dict) or {}
|
||||
updates = compute_func(self._ie, self._storage)
|
||||
self._data.update(updates)
|
||||
for field in updates:
|
||||
self._lazy.pop(field, None)
|
||||
|
||||
fields = getattr(compute_func, lazy_fields._field_name, None) or ()
|
||||
for field in fields:
|
||||
self._lazy.pop(field, None)
|
||||
|
||||
return self._data[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key in self._lazy:
|
||||
del self._lazy[key]
|
||||
|
||||
self._data[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
if key in self._lazy:
|
||||
del self._lazy[key]
|
||||
|
||||
del self._data[key]
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._data)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._data)
|
||||
|
||||
def __repr__(self):
|
||||
if self._lazy:
|
||||
lazy = ", ".join(f"{key!r}: ..." for key in self._lazy.keys())
|
||||
data = ", ".join(f"{key!r}: {value!r}" for key, value in self._data.items() if key not in self._lazy)
|
||||
data = f"{{{data}}}, lazy={{{lazy}}}"
|
||||
else:
|
||||
data = f"{self._data!r}"
|
||||
return f"{type(self).__name__}({data})"
|
||||
|
||||
|
||||
def _default_lazy_extract(self, url):
|
||||
return dict(id=self._match_id(url))
|
||||
|
||||
|
||||
def lazy_ie(klass: type[InfoExtractor] | None = None, /):
|
||||
if not klass:
|
||||
return lazy_ie
|
||||
|
||||
_old_extract = klass._real_extract
|
||||
if _old_extract is InfoExtractor._real_extract:
|
||||
_old_extract = _default_lazy_extract
|
||||
|
||||
lazy_members = {}
|
||||
for _, member in inspect.getmembers(klass):
|
||||
fields = getattr(member, lazy_fields._field_name, None)
|
||||
if not isinstance(fields, tuple):
|
||||
continue
|
||||
|
||||
for field in fields:
|
||||
lazy_members[field] = member
|
||||
|
||||
@functools.wraps(_old_extract)
|
||||
def _real_extract(self, url):
|
||||
result = _old_extract(self, url)
|
||||
assert isinstance(result, dict), 'Lazy extractors need to return a dict'
|
||||
return _LazyInfoDict(result, lazy_members, self, url=url, **result)
|
||||
|
||||
klass._real_extract = _real_extract
|
||||
return klass
|
||||
|
||||
|
||||
def lazy_fields(*fields: str) -> Callable[[Callable[[Any, _LazyStorage], dict[str, Any]]], Callable[[Any, _LazyStorage], dict[str, Any]]]:
|
||||
def _lazy_fields(func):
|
||||
setattr(func, lazy_fields._field_name, fields)
|
||||
return func
|
||||
|
||||
return _lazy_fields
|
||||
|
||||
|
||||
lazy_fields._field_name = "_lazy_fields"
|
||||
|
||||
if __name__ == '__main__':
|
||||
from yt_dlp import YoutubeDL
|
||||
|
||||
with YoutubeDL() as ydl:
|
||||
result = ydl.extract_info("lazy://<URL>", process=False)
|
||||
assert result
|
||||
|
||||
for name in "id", "title", "creator", "description":
|
||||
print(f"{name:<10} = {result[name]!r}")
|
@ -1,76 +0,0 @@
|
||||
from collections.abc import MutableMapping
|
||||
|
||||
from yt_dlp.utils import try_call
|
||||
|
||||
|
||||
class LazyInfoDict(MutableMapping):
|
||||
def __init__(self, data=None, lazy=None):
|
||||
self._data = data or {}
|
||||
self._lazy = lazy or {}
|
||||
|
||||
for key in self._data.keys() & self._lazy.keys():
|
||||
del self._lazy[key]
|
||||
|
||||
self._data.update(dict.fromkeys(self._lazy.keys()))
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self._data
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key in self._lazy:
|
||||
compute_func = self._lazy[key]
|
||||
|
||||
print(f"Evaluating key {key!r}")
|
||||
updates = try_call(compute_func, expected_type=dict) or {}
|
||||
self._data.update(updates)
|
||||
for update in updates:
|
||||
self._lazy.pop(update, None)
|
||||
|
||||
return self._data[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key in self._lazy:
|
||||
del self._lazy[key]
|
||||
|
||||
self._data[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
if key in self._lazy:
|
||||
del self._lazy[key]
|
||||
|
||||
del self._data[key]
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._data)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._data)
|
||||
|
||||
def __repr__(self):
|
||||
data = f"{self._data!r}"
|
||||
if self._lazy:
|
||||
data += f", lazy={set(self._lazy.keys())!r}"
|
||||
return f"{type(self).__name__}({data})"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
def eval_test():
|
||||
print('eval_test')
|
||||
return {'test': 'test'}
|
||||
|
||||
def eval_else():
|
||||
print('eval_else')
|
||||
return {'something': 'something', 'else': 'else'}
|
||||
|
||||
data = LazyInfoDict({
|
||||
'nonlazy': 'nonlazy',
|
||||
'attribute': 'attribute',
|
||||
}, {
|
||||
'test': eval_test,
|
||||
'something': eval_else,
|
||||
'else': eval_else,
|
||||
})
|
||||
print(f'{data["else"]=}')
|
||||
print('-----')
|
||||
for key, value in data.items():
|
||||
print(f'data[{key!r}]={value!r}')
|
Loading…
Reference in New Issue