mirror of https://github.com/yt-dlp/yt-dlp
Merge branch 'yt-dlp:master' into fix-ximalaya-album
commit
ebe8c36176
@ -0,0 +1,71 @@
|
||||
import collections
|
||||
|
||||
import pytest
|
||||
|
||||
from yt_dlp import YoutubeDL
|
||||
from yt_dlp.cookies import YoutubeDLCookieJar
|
||||
from yt_dlp.extractor.common import InfoExtractor
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProviderLogger
|
||||
from yt_dlp.extractor.youtube.pot.provider import PoTokenRequest, PoTokenContext
|
||||
from yt_dlp.utils.networking import HTTPHeaderDict
|
||||
|
||||
|
||||
class MockLogger(IEContentProviderLogger):
|
||||
|
||||
log_level = IEContentProviderLogger.LogLevel.TRACE
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.messages = collections.defaultdict(list)
|
||||
|
||||
def trace(self, message: str):
|
||||
self.messages['trace'].append(message)
|
||||
|
||||
def debug(self, message: str):
|
||||
self.messages['debug'].append(message)
|
||||
|
||||
def info(self, message: str):
|
||||
self.messages['info'].append(message)
|
||||
|
||||
def warning(self, message: str, *, once=False):
|
||||
self.messages['warning'].append(message)
|
||||
|
||||
def error(self, message: str):
|
||||
self.messages['error'].append(message)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ie() -> InfoExtractor:
|
||||
ydl = YoutubeDL()
|
||||
return ydl.get_info_extractor('Youtube')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def logger() -> MockLogger:
|
||||
return MockLogger()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def pot_request() -> PoTokenRequest:
|
||||
return PoTokenRequest(
|
||||
context=PoTokenContext.GVS,
|
||||
innertube_context={'client': {'clientName': 'WEB'}},
|
||||
innertube_host='youtube.com',
|
||||
session_index=None,
|
||||
player_url=None,
|
||||
is_authenticated=False,
|
||||
video_webpage=None,
|
||||
|
||||
visitor_data='example-visitor-data',
|
||||
data_sync_id='example-data-sync-id',
|
||||
video_id='example-video-id',
|
||||
|
||||
request_cookiejar=YoutubeDLCookieJar(),
|
||||
request_proxy=None,
|
||||
request_headers=HTTPHeaderDict(),
|
||||
request_timeout=None,
|
||||
request_source_address=None,
|
||||
request_verify_tls=True,
|
||||
|
||||
bypass_cache=False,
|
||||
)
|
@ -0,0 +1,117 @@
|
||||
import threading
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
import pytest
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProvider, BuiltinIEContentProvider
|
||||
from yt_dlp.utils import bug_reports_message
|
||||
from yt_dlp.extractor.youtube.pot._builtin.memory_cache import MemoryLRUPCP, memorylru_preference, initialize_global_cache
|
||||
from yt_dlp.version import __version__
|
||||
from yt_dlp.extractor.youtube.pot._registry import _pot_cache_providers, _pot_memory_cache
|
||||
|
||||
|
||||
class TestMemoryLRUPCS:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(MemoryLRUPCP, IEContentProvider)
|
||||
assert issubclass(MemoryLRUPCP, BuiltinIEContentProvider)
|
||||
|
||||
@pytest.fixture
|
||||
def pcp(self, ie, logger) -> MemoryLRUPCP:
|
||||
return MemoryLRUPCP(ie, logger, {}, initialize_cache=lambda max_size: (OrderedDict(), threading.Lock(), max_size))
|
||||
|
||||
def test_is_registered(self):
|
||||
assert _pot_cache_providers.value.get('MemoryLRU') == MemoryLRUPCP
|
||||
|
||||
def test_initialization(self, pcp):
|
||||
assert pcp.PROVIDER_NAME == 'memory'
|
||||
assert pcp.PROVIDER_VERSION == __version__
|
||||
assert pcp.BUG_REPORT_MESSAGE == bug_reports_message(before='')
|
||||
assert pcp.is_available()
|
||||
|
||||
def test_store_and_get(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) + 60)
|
||||
assert pcp.get('key1') == 'value1'
|
||||
assert len(pcp.cache) == 1
|
||||
|
||||
def test_store_ignore_expired(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) - 1)
|
||||
assert len(pcp.cache) == 0
|
||||
assert pcp.get('key1') is None
|
||||
assert len(pcp.cache) == 0
|
||||
|
||||
def test_store_override_existing_key(self, ie, logger):
|
||||
MAX_SIZE = 2
|
||||
pcp = MemoryLRUPCP(ie, logger, {}, initialize_cache=lambda max_size: (OrderedDict(), threading.Lock(), MAX_SIZE))
|
||||
pcp.store('key1', 'value1', int(time.time()) + 60)
|
||||
pcp.store('key2', 'value2', int(time.time()) + 60)
|
||||
assert len(pcp.cache) == 2
|
||||
pcp.store('key1', 'value2', int(time.time()) + 60)
|
||||
# Ensure that the override key gets added to the end of the cache instead of in the same position
|
||||
pcp.store('key3', 'value3', int(time.time()) + 60)
|
||||
assert pcp.get('key1') == 'value2'
|
||||
|
||||
def test_store_ignore_expired_existing_key(self, pcp):
|
||||
pcp.store('key1', 'value2', int(time.time()) + 60)
|
||||
pcp.store('key1', 'value1', int(time.time()) - 1)
|
||||
assert len(pcp.cache) == 1
|
||||
assert pcp.get('key1') == 'value2'
|
||||
assert len(pcp.cache) == 1
|
||||
|
||||
def test_get_key_expired(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) + 60)
|
||||
assert pcp.get('key1') == 'value1'
|
||||
assert len(pcp.cache) == 1
|
||||
pcp.cache['key1'] = ('value1', int(time.time()) - 1)
|
||||
assert pcp.get('key1') is None
|
||||
assert len(pcp.cache) == 0
|
||||
|
||||
def test_lru_eviction(self, ie, logger):
|
||||
MAX_SIZE = 2
|
||||
provider = MemoryLRUPCP(ie, logger, {}, initialize_cache=lambda max_size: (OrderedDict(), threading.Lock(), MAX_SIZE))
|
||||
provider.store('key1', 'value1', int(time.time()) + 5)
|
||||
provider.store('key2', 'value2', int(time.time()) + 5)
|
||||
assert len(provider.cache) == 2
|
||||
|
||||
assert provider.get('key1') == 'value1'
|
||||
|
||||
provider.store('key3', 'value3', int(time.time()) + 5)
|
||||
assert len(provider.cache) == 2
|
||||
|
||||
assert provider.get('key2') is None
|
||||
|
||||
provider.store('key4', 'value4', int(time.time()) + 5)
|
||||
assert len(provider.cache) == 2
|
||||
|
||||
assert provider.get('key1') is None
|
||||
assert provider.get('key3') == 'value3'
|
||||
assert provider.get('key4') == 'value4'
|
||||
|
||||
def test_delete(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) + 5)
|
||||
assert len(pcp.cache) == 1
|
||||
assert pcp.get('key1') == 'value1'
|
||||
pcp.delete('key1')
|
||||
assert len(pcp.cache) == 0
|
||||
assert pcp.get('key1') is None
|
||||
|
||||
def test_use_global_cache_default(self, ie, logger):
|
||||
pcp = MemoryLRUPCP(ie, logger, {})
|
||||
assert pcp.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
assert pcp.cache is _pot_memory_cache.value['cache']
|
||||
assert pcp.lock is _pot_memory_cache.value['lock']
|
||||
|
||||
pcp2 = MemoryLRUPCP(ie, logger, {})
|
||||
assert pcp.max_size == pcp2.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
assert pcp.cache is pcp2.cache is _pot_memory_cache.value['cache']
|
||||
assert pcp.lock is pcp2.lock is _pot_memory_cache.value['lock']
|
||||
|
||||
def test_fail_max_size_change_global(self, ie, logger):
|
||||
pcp = MemoryLRUPCP(ie, logger, {})
|
||||
assert pcp.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
with pytest.raises(ValueError, match='Cannot change max_size of initialized global memory cache'):
|
||||
initialize_global_cache(50)
|
||||
|
||||
assert pcp.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
|
||||
def test_memory_lru_preference(self, pcp, ie, pot_request):
|
||||
assert memorylru_preference(pcp, pot_request) == 10000
|
@ -0,0 +1,46 @@
|
||||
import pytest
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenContext,
|
||||
|
||||
)
|
||||
|
||||
from yt_dlp.extractor.youtube.pot.utils import get_webpo_content_binding, ContentBindingType
|
||||
|
||||
|
||||
class TestGetWebPoContentBinding:
|
||||
|
||||
@pytest.mark.parametrize('client_name, context, is_authenticated, expected', [
|
||||
*[(client, context, is_authenticated, expected) for client in [
|
||||
'WEB', 'MWEB', 'TVHTML5', 'WEB_EMBEDDED_PLAYER', 'WEB_CREATOR', 'TVHTML5_SIMPLY_EMBEDDED_PLAYER']
|
||||
for context, is_authenticated, expected in [
|
||||
(PoTokenContext.GVS, False, ('example-visitor-data', ContentBindingType.VISITOR_DATA)),
|
||||
(PoTokenContext.PLAYER, False, ('example-video-id', ContentBindingType.VIDEO_ID)),
|
||||
(PoTokenContext.GVS, True, ('example-data-sync-id', ContentBindingType.DATASYNC_ID)),
|
||||
]],
|
||||
('WEB_REMIX', PoTokenContext.GVS, False, ('example-visitor-data', ContentBindingType.VISITOR_DATA)),
|
||||
('WEB_REMIX', PoTokenContext.PLAYER, False, ('example-visitor-data', ContentBindingType.VISITOR_DATA)),
|
||||
('ANDROID', PoTokenContext.GVS, False, (None, None)),
|
||||
('IOS', PoTokenContext.GVS, False, (None, None)),
|
||||
])
|
||||
def test_get_webpo_content_binding(self, pot_request, client_name, context, is_authenticated, expected):
|
||||
pot_request.innertube_context['client']['clientName'] = client_name
|
||||
pot_request.context = context
|
||||
pot_request.is_authenticated = is_authenticated
|
||||
assert get_webpo_content_binding(pot_request) == expected
|
||||
|
||||
def test_extract_visitor_id(self, pot_request):
|
||||
pot_request.visitor_data = 'CgsxMjNhYmNYWVpfLSiA4s%2DqBg%3D%3D'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == ('123abcXYZ_-', ContentBindingType.VISITOR_ID)
|
||||
|
||||
def test_invalid_visitor_id(self, pot_request):
|
||||
# visitor id not alphanumeric (i.e. protobuf extraction failed)
|
||||
pot_request.visitor_data = 'CggxMjM0NTY3OCiA4s-qBg%3D%3D'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == (pot_request.visitor_data, ContentBindingType.VISITOR_DATA)
|
||||
|
||||
def test_no_visitor_id(self, pot_request):
|
||||
pot_request.visitor_data = 'KIDiz6oG'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == (pot_request.visitor_data, ContentBindingType.VISITOR_DATA)
|
||||
|
||||
def test_invalid_base64(self, pot_request):
|
||||
pot_request.visitor_data = 'invalid-base64'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == (pot_request.visitor_data, ContentBindingType.VISITOR_DATA)
|
@ -0,0 +1,92 @@
|
||||
import pytest
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProvider, BuiltinIEContentProvider
|
||||
from yt_dlp.extractor.youtube.pot.cache import CacheProviderWritePolicy
|
||||
from yt_dlp.utils import bug_reports_message
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenRequest,
|
||||
PoTokenContext,
|
||||
|
||||
)
|
||||
from yt_dlp.version import __version__
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._builtin.webpo_cachespec import WebPoPCSP
|
||||
from yt_dlp.extractor.youtube.pot._registry import _pot_pcs_providers
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def pot_request(pot_request) -> PoTokenRequest:
|
||||
pot_request.visitor_data = 'CgsxMjNhYmNYWVpfLSiA4s%2DqBg%3D%3D' # visitor_id=123abcXYZ_-
|
||||
return pot_request
|
||||
|
||||
|
||||
class TestWebPoPCSP:
|
||||
def test_base_type(self):
|
||||
assert issubclass(WebPoPCSP, IEContentProvider)
|
||||
assert issubclass(WebPoPCSP, BuiltinIEContentProvider)
|
||||
|
||||
def test_init(self, ie, logger):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
assert pcs.PROVIDER_NAME == 'webpo'
|
||||
assert pcs.PROVIDER_VERSION == __version__
|
||||
assert pcs.BUG_REPORT_MESSAGE == bug_reports_message(before='')
|
||||
assert pcs.is_available()
|
||||
|
||||
def test_is_registered(self):
|
||||
assert _pot_pcs_providers.value.get('WebPo') == WebPoPCSP
|
||||
|
||||
@pytest.mark.parametrize('client_name, context, is_authenticated', [
|
||||
('ANDROID', PoTokenContext.GVS, False),
|
||||
('IOS', PoTokenContext.GVS, False),
|
||||
('IOS', PoTokenContext.PLAYER, False),
|
||||
])
|
||||
def test_not_supports(self, ie, logger, pot_request, client_name, context, is_authenticated):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
pot_request.innertube_context['client']['clientName'] = client_name
|
||||
pot_request.context = context
|
||||
pot_request.is_authenticated = is_authenticated
|
||||
assert pcs.generate_cache_spec(pot_request) is None
|
||||
|
||||
@pytest.mark.parametrize('client_name, context, is_authenticated, remote_host, source_address, request_proxy, expected', [
|
||||
*[(client, context, is_authenticated, remote_host, source_address, request_proxy, expected) for client in [
|
||||
'WEB', 'MWEB', 'TVHTML5', 'WEB_EMBEDDED_PLAYER', 'WEB_CREATOR', 'TVHTML5_SIMPLY_EMBEDDED_PLAYER']
|
||||
for context, is_authenticated, remote_host, source_address, request_proxy, expected in [
|
||||
(PoTokenContext.GVS, False, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': '123abcXYZ_-', 'cbt': 'visitor_id'}),
|
||||
(PoTokenContext.PLAYER, False, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': '123abcXYZ_-', 'cbt': 'video_id'}),
|
||||
(PoTokenContext.GVS, True, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': 'example-data-sync-id', 'cbt': 'datasync_id'}),
|
||||
]],
|
||||
('WEB_REMIX', PoTokenContext.PLAYER, False, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': '123abcXYZ_-', 'cbt': 'visitor_id'}),
|
||||
('WEB', PoTokenContext.GVS, False, None, None, None, {'t': 'webpo', 'cb': '123abcXYZ_-', 'cbt': 'visitor_id', 'ip': None, 'sa': None, 'px': None}),
|
||||
('TVHTML5', PoTokenContext.PLAYER, False, None, None, 'http://example.com', {'t': 'webpo', 'cb': '123abcXYZ_-', 'cbt': 'video_id', 'ip': None, 'sa': None, 'px': 'http://example.com'}),
|
||||
|
||||
])
|
||||
def test_generate_key_bindings(self, ie, logger, pot_request, client_name, context, is_authenticated, remote_host, source_address, request_proxy, expected):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
pot_request.innertube_context['client']['clientName'] = client_name
|
||||
pot_request.context = context
|
||||
pot_request.is_authenticated = is_authenticated
|
||||
pot_request.innertube_context['client']['remoteHost'] = remote_host
|
||||
pot_request.request_source_address = source_address
|
||||
pot_request.request_proxy = request_proxy
|
||||
pot_request.video_id = '123abcXYZ_-' # same as visitor id to test type
|
||||
|
||||
assert pcs.generate_cache_spec(pot_request).key_bindings == expected
|
||||
|
||||
def test_no_bind_visitor_id(self, ie, logger, pot_request):
|
||||
# Should not bind to visitor id if setting is set to False
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={'bind_to_visitor_id': ['false']})
|
||||
pot_request.innertube_context['client']['clientName'] = 'WEB'
|
||||
pot_request.context = PoTokenContext.GVS
|
||||
pot_request.is_authenticated = False
|
||||
assert pcs.generate_cache_spec(pot_request).key_bindings == {'t': 'webpo', 'ip': None, 'sa': None, 'px': None, 'cb': 'CgsxMjNhYmNYWVpfLSiA4s%2DqBg%3D%3D', 'cbt': 'visitor_data'}
|
||||
|
||||
def test_default_ttl(self, ie, logger, pot_request):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
assert pcs.generate_cache_spec(pot_request).default_ttl == 6 * 60 * 60 # should default to 6 hours
|
||||
|
||||
def test_write_policy(self, ie, logger, pot_request):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
pot_request.context = PoTokenContext.GVS
|
||||
assert pcs.generate_cache_spec(pot_request).write_policy == CacheProviderWritePolicy.WRITE_ALL
|
||||
pot_request.context = PoTokenContext.PLAYER
|
||||
assert pcs.generate_cache_spec(pot_request).write_policy == CacheProviderWritePolicy.WRITE_FIRST
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,629 @@
|
||||
import pytest
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProvider
|
||||
from yt_dlp.cookies import YoutubeDLCookieJar
|
||||
from yt_dlp.utils.networking import HTTPHeaderDict
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenRequest,
|
||||
PoTokenContext,
|
||||
ExternalRequestFeature,
|
||||
|
||||
)
|
||||
|
||||
from yt_dlp.extractor.youtube.pot.cache import (
|
||||
PoTokenCacheProvider,
|
||||
PoTokenCacheSpec,
|
||||
PoTokenCacheSpecProvider,
|
||||
CacheProviderWritePolicy,
|
||||
)
|
||||
|
||||
import yt_dlp.extractor.youtube.pot.cache as cache
|
||||
|
||||
from yt_dlp.networking import Request
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenResponse,
|
||||
PoTokenProvider,
|
||||
PoTokenProviderRejectedRequest,
|
||||
provider_bug_report_message,
|
||||
register_provider,
|
||||
register_preference,
|
||||
)
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._registry import _pot_providers, _ptp_preferences, _pot_pcs_providers, _pot_cache_providers, _pot_cache_provider_preferences
|
||||
|
||||
|
||||
class ExamplePTP(PoTokenProvider):
|
||||
PROVIDER_NAME = 'example'
|
||||
PROVIDER_VERSION = '0.0.1'
|
||||
BUG_REPORT_LOCATION = 'https://example.com/issues'
|
||||
|
||||
_SUPPORTED_CLIENTS = ('WEB',)
|
||||
_SUPPORTED_CONTEXTS = (PoTokenContext.GVS, )
|
||||
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
|
||||
ExternalRequestFeature.PROXY_SCHEME_HTTP,
|
||||
ExternalRequestFeature.PROXY_SCHEME_SOCKS5H,
|
||||
)
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
return PoTokenResponse('example-token', expires_at=123)
|
||||
|
||||
|
||||
class ExampleCacheProviderPCP(PoTokenCacheProvider):
|
||||
|
||||
PROVIDER_NAME = 'example'
|
||||
PROVIDER_VERSION = '0.0.1'
|
||||
BUG_REPORT_LOCATION = 'https://example.com/issues'
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
|
||||
class ExampleCacheSpecProviderPCSP(PoTokenCacheSpecProvider):
|
||||
|
||||
PROVIDER_NAME = 'example'
|
||||
PROVIDER_VERSION = '0.0.1'
|
||||
BUG_REPORT_LOCATION = 'https://example.com/issues'
|
||||
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return PoTokenCacheSpec(
|
||||
key_bindings={'field': 'example-key'},
|
||||
default_ttl=60,
|
||||
write_policy=CacheProviderWritePolicy.WRITE_FIRST,
|
||||
)
|
||||
|
||||
|
||||
class TestPoTokenProvider:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(PoTokenProvider, IEContentProvider)
|
||||
|
||||
def test_create_provider_missing_fetch_method(self, ie, logger):
|
||||
class MissingMethodsPTP(PoTokenProvider):
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_available_method(self, ie, logger):
|
||||
class MissingMethodsPTP(PoTokenProvider):
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_barebones_provider(self, ie, logger):
|
||||
class BarebonesProviderPTP(PoTokenProvider):
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
provider = BarebonesProviderPTP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_KEY == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.0'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at (developer has not provided a bug report location) .'
|
||||
|
||||
def test_example_provider_success(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'example'
|
||||
assert provider.PROVIDER_KEY == 'Example'
|
||||
assert provider.PROVIDER_VERSION == '0.0.1'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
assert provider.is_available()
|
||||
|
||||
response = provider.request_pot(pot_request)
|
||||
|
||||
assert response.po_token == 'example-token'
|
||||
assert response.expires_at == 123
|
||||
|
||||
def test_provider_unsupported_context(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
pot_request.context = PoTokenContext.PLAYER
|
||||
|
||||
with pytest.raises(PoTokenProviderRejectedRequest):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_client(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
pot_request.innertube_context['client']['clientName'] = 'ANDROID'
|
||||
|
||||
with pytest.raises(PoTokenProviderRejectedRequest):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_proxy_scheme(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
pot_request.request_proxy = 'socks4://example.com'
|
||||
|
||||
with pytest.raises(
|
||||
PoTokenProviderRejectedRequest,
|
||||
match='External requests by "example" provider do not support proxy scheme "socks4". Supported proxy '
|
||||
'schemes: http, socks5h',
|
||||
):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_proxy = 'http://example.com'
|
||||
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_ignore_external_request_features(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = None
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_proxy = 'socks5://example.com'
|
||||
assert provider.request_pot(pot_request)
|
||||
pot_request.request_source_address = '0.0.0.0'
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_external_request_source_address(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = tuple()
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_source_address = None
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_source_address = '0.0.0.0'
|
||||
with pytest.raises(
|
||||
PoTokenProviderRejectedRequest,
|
||||
match='External requests by "example" provider do not support setting source address',
|
||||
):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_supported_external_request_source_address(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
|
||||
ExternalRequestFeature.SOURCE_ADDRESS,
|
||||
)
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_source_address = None
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_source_address = '0.0.0.0'
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_external_request_tls_verification(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = tuple()
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_verify_tls = True
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_verify_tls = False
|
||||
with pytest.raises(
|
||||
PoTokenProviderRejectedRequest,
|
||||
match='External requests by "example" provider do not support ignoring TLS certificate failures',
|
||||
):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_supported_external_request_tls_verification(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
|
||||
ExternalRequestFeature.DISABLE_TLS_VERIFICATION,
|
||||
)
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_verify_tls = True
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_verify_tls = False
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_request_webpage(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
cookiejar = YoutubeDLCookieJar()
|
||||
pot_request.request_headers = HTTPHeaderDict({'User-Agent': 'example-user-agent'})
|
||||
pot_request.request_proxy = 'socks5://example-proxy.com'
|
||||
pot_request.request_cookiejar = cookiejar
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
), pot_request=pot_request)
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
assert sent_request.headers['User-Agent'] == 'example-user-agent'
|
||||
assert sent_request.proxies == {'all': 'socks5://example-proxy.com'}
|
||||
assert sent_request.extensions['cookiejar'] is cookiejar
|
||||
assert 'Requesting webpage' in logger.messages['info']
|
||||
|
||||
def test_provider_request_webpage_override(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
cookiejar_request = YoutubeDLCookieJar()
|
||||
pot_request.request_headers = HTTPHeaderDict({'User-Agent': 'example-user-agent'})
|
||||
pot_request.request_proxy = 'socks5://example-proxy.com'
|
||||
pot_request.request_cookiejar = cookiejar_request
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
headers={'User-Agent': 'override-user-agent-override'},
|
||||
proxies={'http': 'http://example-proxy-override.com'},
|
||||
extensions={'cookiejar': YoutubeDLCookieJar()},
|
||||
), pot_request=pot_request, note='Custom requesting webpage')
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
assert sent_request.headers['User-Agent'] == 'override-user-agent-override'
|
||||
assert sent_request.proxies == {'http': 'http://example-proxy-override.com'}
|
||||
assert sent_request.extensions['cookiejar'] is not cookiejar_request
|
||||
assert 'Custom requesting webpage' in logger.messages['info']
|
||||
|
||||
def test_provider_request_webpage_no_log(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
), note=False)
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
assert 'info' not in logger.messages
|
||||
|
||||
def test_provider_request_webpage_no_pot_request(self, ie, logger):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
), pot_request=None)
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
|
||||
def test_get_config_arg(self, ie, logger):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={'abc': ['123D'], 'xyz': ['456a', '789B']})
|
||||
|
||||
assert provider._configuration_arg('abc') == ['123d']
|
||||
assert provider._configuration_arg('abc', default=['default']) == ['123d']
|
||||
assert provider._configuration_arg('ABC', default=['default']) == ['default']
|
||||
assert provider._configuration_arg('abc', casesense=True) == ['123D']
|
||||
assert provider._configuration_arg('xyz', casesense=False) == ['456a', '789b']
|
||||
|
||||
def test_require_class_end_with_suffix(self, ie, logger):
|
||||
class InvalidSuffix(PoTokenProvider):
|
||||
PROVIDER_NAME = 'invalid-suffix'
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
provider = InvalidSuffix(ie=ie, logger=logger, settings={})
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
provider.PROVIDER_KEY # noqa: B018
|
||||
|
||||
|
||||
class TestPoTokenCacheProvider:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(PoTokenCacheProvider, IEContentProvider)
|
||||
|
||||
def test_create_provider_missing_get_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_store_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_delete_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
pass
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_is_available_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
pass
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_barebones_provider(self, ie, logger):
|
||||
class BarebonesProviderPCP(PoTokenCacheProvider):
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
provider = BarebonesProviderPCP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_KEY == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.0'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at (developer has not provided a bug report location) .'
|
||||
|
||||
def test_create_provider_example(self, ie, logger):
|
||||
provider = ExampleCacheProviderPCP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'example'
|
||||
assert provider.PROVIDER_KEY == 'ExampleCacheProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.1'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
assert provider.is_available()
|
||||
|
||||
def test_get_config_arg(self, ie, logger):
|
||||
provider = ExampleCacheProviderPCP(ie=ie, logger=logger, settings={'abc': ['123D'], 'xyz': ['456a', '789B']})
|
||||
assert provider._configuration_arg('abc') == ['123d']
|
||||
assert provider._configuration_arg('abc', default=['default']) == ['123d']
|
||||
assert provider._configuration_arg('ABC', default=['default']) == ['default']
|
||||
assert provider._configuration_arg('abc', casesense=True) == ['123D']
|
||||
assert provider._configuration_arg('xyz', casesense=False) == ['456a', '789b']
|
||||
|
||||
def test_require_class_end_with_suffix(self, ie, logger):
|
||||
class InvalidSuffix(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
provider = InvalidSuffix(ie=ie, logger=logger, settings={})
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
provider.PROVIDER_KEY # noqa: B018
|
||||
|
||||
|
||||
class TestPoTokenCacheSpecProvider:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(PoTokenCacheSpecProvider, IEContentProvider)
|
||||
|
||||
def test_create_provider_missing_supports_method(self, ie, logger):
|
||||
class MissingMethodsPCS(PoTokenCacheSpecProvider):
|
||||
pass
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCS(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_barebones(self, ie, pot_request, logger):
|
||||
class BarebonesProviderPCSP(PoTokenCacheSpecProvider):
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return PoTokenCacheSpec(
|
||||
default_ttl=100,
|
||||
key_bindings={},
|
||||
)
|
||||
|
||||
provider = BarebonesProviderPCSP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_KEY == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.0'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at (developer has not provided a bug report location) .'
|
||||
assert provider.is_available()
|
||||
assert provider.generate_cache_spec(request=pot_request).default_ttl == 100
|
||||
assert provider.generate_cache_spec(request=pot_request).key_bindings == {}
|
||||
assert provider.generate_cache_spec(request=pot_request).write_policy == CacheProviderWritePolicy.WRITE_ALL
|
||||
|
||||
def test_create_provider_example(self, ie, pot_request, logger):
|
||||
provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'example'
|
||||
assert provider.PROVIDER_KEY == 'ExampleCacheSpecProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.1'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
assert provider.is_available()
|
||||
assert provider.generate_cache_spec(pot_request)
|
||||
assert provider.generate_cache_spec(pot_request).key_bindings == {'field': 'example-key'}
|
||||
assert provider.generate_cache_spec(pot_request).default_ttl == 60
|
||||
assert provider.generate_cache_spec(pot_request).write_policy == CacheProviderWritePolicy.WRITE_FIRST
|
||||
|
||||
def test_get_config_arg(self, ie, logger):
|
||||
provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={'abc': ['123D'], 'xyz': ['456a', '789B']})
|
||||
|
||||
assert provider._configuration_arg('abc') == ['123d']
|
||||
assert provider._configuration_arg('abc', default=['default']) == ['123d']
|
||||
assert provider._configuration_arg('ABC', default=['default']) == ['default']
|
||||
assert provider._configuration_arg('abc', casesense=True) == ['123D']
|
||||
assert provider._configuration_arg('xyz', casesense=False) == ['456a', '789b']
|
||||
|
||||
def test_require_class_end_with_suffix(self, ie, logger):
|
||||
class InvalidSuffix(PoTokenCacheSpecProvider):
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return None
|
||||
|
||||
provider = InvalidSuffix(ie=ie, logger=logger, settings={})
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
provider.PROVIDER_KEY # noqa: B018
|
||||
|
||||
|
||||
class TestPoTokenRequest:
|
||||
def test_copy_request(self, pot_request):
|
||||
copied_request = pot_request.copy()
|
||||
|
||||
assert copied_request is not pot_request
|
||||
assert copied_request.context == pot_request.context
|
||||
assert copied_request.innertube_context == pot_request.innertube_context
|
||||
assert copied_request.innertube_context is not pot_request.innertube_context
|
||||
copied_request.innertube_context['client']['clientName'] = 'ANDROID'
|
||||
assert pot_request.innertube_context['client']['clientName'] != 'ANDROID'
|
||||
assert copied_request.innertube_host == pot_request.innertube_host
|
||||
assert copied_request.session_index == pot_request.session_index
|
||||
assert copied_request.player_url == pot_request.player_url
|
||||
assert copied_request.is_authenticated == pot_request.is_authenticated
|
||||
assert copied_request.visitor_data == pot_request.visitor_data
|
||||
assert copied_request.data_sync_id == pot_request.data_sync_id
|
||||
assert copied_request.video_id == pot_request.video_id
|
||||
assert copied_request.request_cookiejar is pot_request.request_cookiejar
|
||||
assert copied_request.request_proxy == pot_request.request_proxy
|
||||
assert copied_request.request_headers == pot_request.request_headers
|
||||
assert copied_request.request_headers is not pot_request.request_headers
|
||||
assert copied_request.request_timeout == pot_request.request_timeout
|
||||
assert copied_request.request_source_address == pot_request.request_source_address
|
||||
assert copied_request.request_verify_tls == pot_request.request_verify_tls
|
||||
assert copied_request.bypass_cache == pot_request.bypass_cache
|
||||
|
||||
|
||||
def test_provider_bug_report_message(ie, logger):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
|
||||
message = provider_bug_report_message(provider)
|
||||
assert message == '; please report this issue to the provider developer at https://example.com/issues .'
|
||||
|
||||
message_before = provider_bug_report_message(provider, before='custom message!')
|
||||
assert message_before == 'custom message! Please report this issue to the provider developer at https://example.com/issues .'
|
||||
|
||||
|
||||
def test_register_provider(ie):
|
||||
|
||||
@register_provider
|
||||
class UnavailableProviderPTP(PoTokenProvider):
|
||||
def is_available(self) -> bool:
|
||||
return False
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
assert _pot_providers.value.get('UnavailableProvider') == UnavailableProviderPTP
|
||||
_pot_providers.value.pop('UnavailableProvider')
|
||||
|
||||
|
||||
def test_register_pot_preference(ie):
|
||||
before = len(_ptp_preferences.value)
|
||||
|
||||
@register_preference(ExamplePTP)
|
||||
def unavailable_preference(provider: PoTokenProvider, request: PoTokenRequest):
|
||||
return 1
|
||||
|
||||
assert len(_ptp_preferences.value) == before + 1
|
||||
|
||||
|
||||
def test_register_cache_provider(ie):
|
||||
|
||||
@cache.register_provider
|
||||
class UnavailableCacheProviderPCP(PoTokenCacheProvider):
|
||||
def is_available(self) -> bool:
|
||||
return False
|
||||
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
assert _pot_cache_providers.value.get('UnavailableCacheProvider') == UnavailableCacheProviderPCP
|
||||
_pot_cache_providers.value.pop('UnavailableCacheProvider')
|
||||
|
||||
|
||||
def test_register_cache_provider_spec(ie):
|
||||
|
||||
@cache.register_spec
|
||||
class UnavailableCacheProviderPCSP(PoTokenCacheSpecProvider):
|
||||
def is_available(self) -> bool:
|
||||
return False
|
||||
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return None
|
||||
|
||||
assert _pot_pcs_providers.value.get('UnavailableCacheProvider') == UnavailableCacheProviderPCSP
|
||||
_pot_pcs_providers.value.pop('UnavailableCacheProvider')
|
||||
|
||||
|
||||
def test_register_cache_provider_preference(ie):
|
||||
before = len(_pot_cache_provider_preferences.value)
|
||||
|
||||
@cache.register_preference(ExampleCacheProviderPCP)
|
||||
def unavailable_preference(provider: PoTokenCacheProvider, request: PoTokenRequest):
|
||||
return 1
|
||||
|
||||
assert len(_pot_cache_provider_preferences.value) == before + 1
|
||||
|
||||
|
||||
def test_logger_log_level(logger):
|
||||
assert logger.LogLevel('INFO') == logger.LogLevel.INFO
|
||||
assert logger.LogLevel('debuG') == logger.LogLevel.DEBUG
|
||||
assert logger.LogLevel(10) == logger.LogLevel.DEBUG
|
||||
assert logger.LogLevel('UNKNOWN') == logger.LogLevel.INFO
|
@ -0,0 +1,84 @@
|
||||
import json
|
||||
import time
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..utils import (
|
||||
determine_ext,
|
||||
float_or_none,
|
||||
jwt_decode_hs256,
|
||||
parse_iso8601,
|
||||
url_or_none,
|
||||
variadic,
|
||||
)
|
||||
from ..utils.traversal import traverse_obj
|
||||
|
||||
|
||||
class CanalsurmasIE(InfoExtractor):
|
||||
_VALID_URL = r'https?://(?:www\.)?canalsurmas\.es/videos/(?P<id>\d+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://www.canalsurmas.es/videos/44006-el-gran-queo-1-lora-del-rio-sevilla-20072014',
|
||||
'md5': '861f86fdc1221175e15523047d0087ef',
|
||||
'info_dict': {
|
||||
'id': '44006',
|
||||
'ext': 'mp4',
|
||||
'title': 'Lora del Río (Sevilla)',
|
||||
'description': 'md5:3d9ee40a9b1b26ed8259e6b71ed27b8b',
|
||||
'thumbnail': 'https://cdn2.rtva.interactvty.com/content_cards/00f3e8f67b0a4f3b90a4a14618a48b0d.jpg',
|
||||
'timestamp': 1648123182,
|
||||
'upload_date': '20220324',
|
||||
},
|
||||
}]
|
||||
_API_BASE = 'https://api-rtva.interactvty.com'
|
||||
_access_token = None
|
||||
|
||||
@staticmethod
|
||||
def _is_jwt_expired(token):
|
||||
return jwt_decode_hs256(token)['exp'] - time.time() < 300
|
||||
|
||||
def _call_api(self, endpoint, video_id, fields=None):
|
||||
if not self._access_token or self._is_jwt_expired(self._access_token):
|
||||
self._access_token = self._download_json(
|
||||
f'{self._API_BASE}/jwt/token/', None,
|
||||
'Downloading access token', 'Failed to download access token',
|
||||
headers={'Content-Type': 'application/json'},
|
||||
data=json.dumps({
|
||||
'username': 'canalsur_demo',
|
||||
'password': 'dsUBXUcI',
|
||||
}).encode())['access']
|
||||
|
||||
return self._download_json(
|
||||
f'{self._API_BASE}/api/2.0/contents/{endpoint}/{video_id}/', video_id,
|
||||
f'Downloading {endpoint} API JSON', f'Failed to download {endpoint} API JSON',
|
||||
headers={'Authorization': f'jwtok {self._access_token}'},
|
||||
query={'optional_fields': ','.join(variadic(fields))} if fields else None)
|
||||
|
||||
def _real_extract(self, url):
|
||||
video_id = self._match_id(url)
|
||||
video_info = self._call_api('content', video_id, fields=[
|
||||
'description', 'image', 'duration', 'created_at', 'tags',
|
||||
])
|
||||
stream_info = self._call_api('content_resources', video_id, 'media_url')
|
||||
|
||||
formats, subtitles = [], {}
|
||||
for stream_url in traverse_obj(stream_info, ('results', ..., 'media_url', {url_or_none})):
|
||||
if determine_ext(stream_url) == 'm3u8':
|
||||
fmts, subs = self._extract_m3u8_formats_and_subtitles(
|
||||
stream_url, video_id, m3u8_id='hls', fatal=False)
|
||||
formats.extend(fmts)
|
||||
self._merge_subtitles(subs, target=subtitles)
|
||||
else:
|
||||
formats.append({'url': stream_url})
|
||||
|
||||
return {
|
||||
'id': video_id,
|
||||
'formats': formats,
|
||||
'subtitles': subtitles,
|
||||
**traverse_obj(video_info, {
|
||||
'title': ('name', {str.strip}),
|
||||
'description': ('description', {str}),
|
||||
'thumbnail': ('image', {url_or_none}),
|
||||
'duration': ('duration', {float_or_none}),
|
||||
'timestamp': ('created_at', {parse_iso8601}),
|
||||
'tags': ('tags', ..., {str}),
|
||||
}),
|
||||
}
|
@ -1,59 +0,0 @@
|
||||
from .turner import TurnerBaseIE
|
||||
from ..utils import int_or_none
|
||||
|
||||
|
||||
class CartoonNetworkIE(TurnerBaseIE):
|
||||
_VALID_URL = r'https?://(?:www\.)?cartoonnetwork\.com/video/(?:[^/]+/)+(?P<id>[^/?#]+)-(?:clip|episode)\.html'
|
||||
_TEST = {
|
||||
'url': 'https://www.cartoonnetwork.com/video/ben-10/how-to-draw-upgrade-episode.html',
|
||||
'info_dict': {
|
||||
'id': '6e3375097f63874ebccec7ef677c1c3845fa850e',
|
||||
'ext': 'mp4',
|
||||
'title': 'How to Draw Upgrade',
|
||||
'description': 'md5:2061d83776db7e8be4879684eefe8c0f',
|
||||
},
|
||||
'params': {
|
||||
# m3u8 download
|
||||
'skip_download': True,
|
||||
},
|
||||
}
|
||||
|
||||
def _real_extract(self, url):
|
||||
display_id = self._match_id(url)
|
||||
webpage = self._download_webpage(url, display_id)
|
||||
|
||||
def find_field(global_re, name, content_re=None, value_re='[^"]+', fatal=False):
|
||||
metadata_re = ''
|
||||
if content_re:
|
||||
metadata_re = r'|video_metadata\.content_' + content_re
|
||||
return self._search_regex(
|
||||
rf'(?:_cnglobal\.currentVideo\.{global_re}{metadata_re})\s*=\s*"({value_re})";',
|
||||
webpage, name, fatal=fatal)
|
||||
|
||||
media_id = find_field('mediaId', 'media id', 'id', '[0-9a-f]{40}', True)
|
||||
title = find_field('episodeTitle', 'title', '(?:episodeName|name)', fatal=True)
|
||||
|
||||
info = self._extract_ngtv_info(
|
||||
media_id, {'networkId': 'cartoonnetwork'}, {
|
||||
'url': url,
|
||||
'site_name': 'CartoonNetwork',
|
||||
'auth_required': find_field('authType', 'auth type') != 'unauth',
|
||||
})
|
||||
|
||||
series = find_field(
|
||||
'propertyName', 'series', 'showName') or self._html_search_meta('partOfSeries', webpage)
|
||||
info.update({
|
||||
'id': media_id,
|
||||
'display_id': display_id,
|
||||
'title': title,
|
||||
'description': self._html_search_meta('description', webpage),
|
||||
'series': series,
|
||||
'episode': title,
|
||||
})
|
||||
|
||||
for field in ('season', 'episode'):
|
||||
field_name = field + 'Number'
|
||||
info[field + '_number'] = int_or_none(find_field(
|
||||
field_name, field + ' number', value_re=r'\d+') or self._html_search_meta(field_name, webpage))
|
||||
|
||||
return info
|
@ -1,142 +0,0 @@
|
||||
import json
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
int_or_none,
|
||||
orderedSet,
|
||||
)
|
||||
|
||||
|
||||
class DeezerBaseInfoExtractor(InfoExtractor):
|
||||
def get_data(self, url):
|
||||
if not self.get_param('test'):
|
||||
self.report_warning('For now, this extractor only supports the 30 second previews. Patches welcome!')
|
||||
|
||||
mobj = self._match_valid_url(url)
|
||||
data_id = mobj.group('id')
|
||||
|
||||
webpage = self._download_webpage(url, data_id)
|
||||
geoblocking_msg = self._html_search_regex(
|
||||
r'<p class="soon-txt">(.*?)</p>', webpage, 'geoblocking message',
|
||||
default=None)
|
||||
if geoblocking_msg is not None:
|
||||
raise ExtractorError(
|
||||
f'Deezer said: {geoblocking_msg}', expected=True)
|
||||
|
||||
data_json = self._search_regex(
|
||||
(r'__DZR_APP_STATE__\s*=\s*({.+?})\s*</script>',
|
||||
r'naboo\.display\(\'[^\']+\',\s*(.*?)\);\n'),
|
||||
webpage, 'data JSON')
|
||||
data = json.loads(data_json)
|
||||
return data_id, webpage, data
|
||||
|
||||
|
||||
class DeezerPlaylistIE(DeezerBaseInfoExtractor):
|
||||
_VALID_URL = r'https?://(?:www\.)?deezer\.com/(../)?playlist/(?P<id>[0-9]+)'
|
||||
_TEST = {
|
||||
'url': 'http://www.deezer.com/playlist/176747451',
|
||||
'info_dict': {
|
||||
'id': '176747451',
|
||||
'title': 'Best!',
|
||||
'uploader': 'anonymous',
|
||||
'thumbnail': r're:^https?://(e-)?cdns-images\.dzcdn\.net/images/cover/.*\.jpg$',
|
||||
},
|
||||
'playlist_count': 29,
|
||||
}
|
||||
|
||||
def _real_extract(self, url):
|
||||
playlist_id, webpage, data = self.get_data(url)
|
||||
|
||||
playlist_title = data.get('DATA', {}).get('TITLE')
|
||||
playlist_uploader = data.get('DATA', {}).get('PARENT_USERNAME')
|
||||
playlist_thumbnail = self._search_regex(
|
||||
r'<img id="naboo_playlist_image".*?src="([^"]+)"', webpage,
|
||||
'playlist thumbnail')
|
||||
|
||||
entries = []
|
||||
for s in data.get('SONGS', {}).get('data'):
|
||||
formats = [{
|
||||
'format_id': 'preview',
|
||||
'url': s.get('MEDIA', [{}])[0].get('HREF'),
|
||||
'preference': -100, # Only the first 30 seconds
|
||||
'ext': 'mp3',
|
||||
}]
|
||||
artists = ', '.join(
|
||||
orderedSet(a.get('ART_NAME') for a in s.get('ARTISTS')))
|
||||
entries.append({
|
||||
'id': s.get('SNG_ID'),
|
||||
'duration': int_or_none(s.get('DURATION')),
|
||||
'title': '{} - {}'.format(artists, s.get('SNG_TITLE')),
|
||||
'uploader': s.get('ART_NAME'),
|
||||
'uploader_id': s.get('ART_ID'),
|
||||
'age_limit': 16 if s.get('EXPLICIT_LYRICS') == '1' else 0,
|
||||
'formats': formats,
|
||||
})
|
||||
|
||||
return {
|
||||
'_type': 'playlist',
|
||||
'id': playlist_id,
|
||||
'title': playlist_title,
|
||||
'uploader': playlist_uploader,
|
||||
'thumbnail': playlist_thumbnail,
|
||||
'entries': entries,
|
||||
}
|
||||
|
||||
|
||||
class DeezerAlbumIE(DeezerBaseInfoExtractor):
|
||||
_VALID_URL = r'https?://(?:www\.)?deezer\.com/(../)?album/(?P<id>[0-9]+)'
|
||||
_TEST = {
|
||||
'url': 'https://www.deezer.com/fr/album/67505622',
|
||||
'info_dict': {
|
||||
'id': '67505622',
|
||||
'title': 'Last Week',
|
||||
'uploader': 'Home Brew',
|
||||
'thumbnail': r're:^https?://(e-)?cdns-images\.dzcdn\.net/images/cover/.*\.jpg$',
|
||||
},
|
||||
'playlist_count': 7,
|
||||
}
|
||||
|
||||
def _real_extract(self, url):
|
||||
album_id, webpage, data = self.get_data(url)
|
||||
|
||||
album_title = data.get('DATA', {}).get('ALB_TITLE')
|
||||
album_uploader = data.get('DATA', {}).get('ART_NAME')
|
||||
album_thumbnail = self._search_regex(
|
||||
r'<img id="naboo_album_image".*?src="([^"]+)"', webpage,
|
||||
'album thumbnail')
|
||||
|
||||
entries = []
|
||||
for s in data.get('SONGS', {}).get('data'):
|
||||
formats = [{
|
||||
'format_id': 'preview',
|
||||
'url': s.get('MEDIA', [{}])[0].get('HREF'),
|
||||
'preference': -100, # Only the first 30 seconds
|
||||
'ext': 'mp3',
|
||||
}]
|
||||
artists = ', '.join(
|
||||
orderedSet(a.get('ART_NAME') for a in s.get('ARTISTS')))
|
||||
entries.append({
|
||||
'id': s.get('SNG_ID'),
|
||||
'duration': int_or_none(s.get('DURATION')),
|
||||
'title': '{} - {}'.format(artists, s.get('SNG_TITLE')),
|
||||
'uploader': s.get('ART_NAME'),
|
||||
'uploader_id': s.get('ART_ID'),
|
||||
'age_limit': 16 if s.get('EXPLICIT_LYRICS') == '1' else 0,
|
||||
'formats': formats,
|
||||
'track': s.get('SNG_TITLE'),
|
||||
'track_number': int_or_none(s.get('TRACK_NUMBER')),
|
||||
'track_id': s.get('SNG_ID'),
|
||||
'artist': album_uploader,
|
||||
'album': album_title,
|
||||
'album_artist': album_uploader,
|
||||
})
|
||||
|
||||
return {
|
||||
'_type': 'playlist',
|
||||
'id': album_id,
|
||||
'title': album_title,
|
||||
'uploader': album_uploader,
|
||||
'thumbnail': album_thumbnail,
|
||||
'entries': entries,
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
import urllib.parse
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..networking.exceptions import HTTPError
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
float_or_none,
|
||||
url_or_none,
|
||||
)
|
||||
from ..utils.traversal import traverse_obj
|
||||
|
||||
|
||||
class FrancaisFacileIE(InfoExtractor):
|
||||
_VALID_URL = r'https?://francaisfacile\.rfi\.fr/[a-z]{2}/(?:actualit%C3%A9|podcasts/[^/#?]+)/(?P<id>[^/#?]+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://francaisfacile.rfi.fr/fr/actualit%C3%A9/20250305-r%C3%A9concilier-les-jeunes-avec-la-lecture-gr%C3%A2ce-aux-r%C3%A9seaux-sociaux',
|
||||
'md5': '4f33674cb205744345cc835991100afa',
|
||||
'info_dict': {
|
||||
'id': 'WBMZ58952-FLE-FR-20250305',
|
||||
'display_id': '20250305-réconcilier-les-jeunes-avec-la-lecture-grâce-aux-réseaux-sociaux',
|
||||
'title': 'Réconcilier les jeunes avec la lecture grâce aux réseaux sociaux',
|
||||
'url': 'https://aod-fle.akamaized.net/fle/sounds/fr/2025/03/05/6b6af52a-f9ba-11ef-a1f8-005056a97652.mp3',
|
||||
'ext': 'mp3',
|
||||
'description': 'md5:b903c63d8585bd59e8cc4d5f80c4272d',
|
||||
'duration': 103.15,
|
||||
'timestamp': 1741177984,
|
||||
'upload_date': '20250305',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://francaisfacile.rfi.fr/fr/actualit%C3%A9/20250307-argentine-le-sac-d-un-alpiniste-retrouv%C3%A9-40-ans-apr%C3%A8s-sa-mort',
|
||||
'md5': 'b8c3a63652d4ae8e8092dda5700c1cd9',
|
||||
'info_dict': {
|
||||
'id': 'WBMZ59102-FLE-FR-20250307',
|
||||
'display_id': '20250307-argentine-le-sac-d-un-alpiniste-retrouvé-40-ans-après-sa-mort',
|
||||
'title': 'Argentine: le sac d\'un alpiniste retrouvé 40 ans après sa mort',
|
||||
'url': 'https://aod-fle.akamaized.net/fle/sounds/fr/2025/03/07/8edf4082-fb46-11ef-8a37-005056bf762b.mp3',
|
||||
'ext': 'mp3',
|
||||
'description': 'md5:7fd088fbdf4a943bb68cf82462160dca',
|
||||
'duration': 117.74,
|
||||
'timestamp': 1741352789,
|
||||
'upload_date': '20250307',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://francaisfacile.rfi.fr/fr/podcasts/un-mot-une-histoire/20250317-le-mot-de-david-foenkinos-peut-%C3%AAtre',
|
||||
'md5': 'db83c2cc2589b4c24571c6b6cf14f5f1',
|
||||
'info_dict': {
|
||||
'id': 'WBMZ59441-FLE-FR-20250317',
|
||||
'display_id': '20250317-le-mot-de-david-foenkinos-peut-être',
|
||||
'title': 'Le mot de David Foenkinos: «peut-être» - Un mot, une histoire',
|
||||
'url': 'https://aod-fle.akamaized.net/fle/sounds/fr/2025/03/17/4ca6cbbe-0315-11f0-a85b-005056a97652.mp3',
|
||||
'ext': 'mp3',
|
||||
'description': 'md5:3fe35fae035803df696bfa7af2496e49',
|
||||
'duration': 198.96,
|
||||
'timestamp': 1742210897,
|
||||
'upload_date': '20250317',
|
||||
},
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
display_id = urllib.parse.unquote(self._match_id(url))
|
||||
|
||||
try: # yt-dlp's default user-agents are too old and blocked by the site
|
||||
webpage = self._download_webpage(url, display_id, headers={
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:136.0) Gecko/20100101 Firefox/136.0',
|
||||
})
|
||||
except ExtractorError as e:
|
||||
if not isinstance(e.cause, HTTPError) or e.cause.status != 403:
|
||||
raise
|
||||
# Retry with impersonation if hardcoded UA is insufficient
|
||||
webpage = self._download_webpage(url, display_id, impersonate=True)
|
||||
|
||||
data = self._search_json(
|
||||
r'<script[^>]+\bdata-media-id=[^>]+\btype="application/json"[^>]*>',
|
||||
webpage, 'audio data', display_id)
|
||||
|
||||
return {
|
||||
'id': data['mediaId'],
|
||||
'display_id': display_id,
|
||||
'vcodec': 'none',
|
||||
'title': self._html_extract_title(webpage),
|
||||
**self._search_json_ld(webpage, display_id, fatal=False),
|
||||
**traverse_obj(data, {
|
||||
'title': ('title', {str}),
|
||||
'url': ('sources', ..., 'url', {url_or_none}, any),
|
||||
'duration': ('sources', ..., 'duration', {float_or_none}, any),
|
||||
}),
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
from .common import InfoExtractor
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
urlencode_postdata,
|
||||
)
|
||||
|
||||
|
||||
class GigyaBaseIE(InfoExtractor):
|
||||
def _gigya_login(self, auth_data):
|
||||
auth_info = self._download_json(
|
||||
'https://accounts.eu1.gigya.com/accounts.login', None,
|
||||
note='Logging in', errnote='Unable to log in',
|
||||
data=urlencode_postdata(auth_data))
|
||||
|
||||
error_message = auth_info.get('errorDetails') or auth_info.get('errorMessage')
|
||||
if error_message:
|
||||
raise ExtractorError(
|
||||
f'Unable to login: {error_message}', expected=True)
|
||||
return auth_info
|
@ -0,0 +1,78 @@
|
||||
from .common import InfoExtractor
|
||||
from ..utils import int_or_none, parse_iso8601, url_or_none, urljoin
|
||||
from ..utils.traversal import traverse_obj
|
||||
|
||||
|
||||
class IvooxIE(InfoExtractor):
|
||||
_VALID_URL = (
|
||||
r'https?://(?:www\.)?ivoox\.com/(?:\w{2}/)?[^/?#]+_rf_(?P<id>[0-9]+)_1\.html',
|
||||
r'https?://go\.ivoox\.com/rf/(?P<id>[0-9]+)',
|
||||
)
|
||||
_TESTS = [{
|
||||
'url': 'https://www.ivoox.com/dex-08x30-rostros-del-mal-los-asesinos-en-audios-mp3_rf_143594959_1.html',
|
||||
'md5': '993f712de5b7d552459fc66aa3726885',
|
||||
'info_dict': {
|
||||
'id': '143594959',
|
||||
'ext': 'mp3',
|
||||
'timestamp': 1742731200,
|
||||
'channel': 'DIAS EXTRAÑOS con Santiago Camacho',
|
||||
'title': 'DEx 08x30 Rostros del mal: Los asesinos en serie que aterrorizaron España',
|
||||
'description': 'md5:eae8b4b9740d0216d3871390b056bb08',
|
||||
'uploader': 'Santiago Camacho',
|
||||
'thumbnail': 'https://static-1.ivoox.com/audios/c/d/5/2/cd52f46783fe735000c33a803dce2554_XXL.jpg',
|
||||
'upload_date': '20250323',
|
||||
'episode': 'DEx 08x30 Rostros del mal: Los asesinos en serie que aterrorizaron España',
|
||||
'duration': 11837,
|
||||
'tags': ['españa', 'asesinos en serie', 'arropiero', 'historia criminal', 'mataviejas'],
|
||||
},
|
||||
}, {
|
||||
'url': 'https://go.ivoox.com/rf/143594959',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'https://www.ivoox.com/en/campodelgas-28-03-2025-audios-mp3_rf_144036942_1.html',
|
||||
'only_matching': True,
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
media_id = self._match_id(url)
|
||||
webpage = self._download_webpage(url, media_id, fatal=False)
|
||||
|
||||
data = self._search_nuxt_data(
|
||||
webpage, media_id, fatal=False, traverse=('data', 0, 'data', 'audio'))
|
||||
|
||||
direct_download = self._download_json(
|
||||
f'https://vcore-web.ivoox.com/v1/public/audios/{media_id}/download-url', media_id, fatal=False,
|
||||
note='Fetching direct download link', headers={'Referer': url})
|
||||
|
||||
download_paths = {
|
||||
*traverse_obj(direct_download, ('data', 'downloadUrl', {str}, filter, all)),
|
||||
*traverse_obj(data, (('downloadUrl', 'mediaUrl'), {str}, filter)),
|
||||
}
|
||||
|
||||
formats = []
|
||||
for path in download_paths:
|
||||
formats.append({
|
||||
'url': urljoin('https://ivoox.com', path),
|
||||
'http_headers': {'Referer': url},
|
||||
})
|
||||
|
||||
return {
|
||||
'id': media_id,
|
||||
'formats': formats,
|
||||
'uploader': self._html_search_regex(r'data-prm-author="([^"]+)"', webpage, 'author', default=None),
|
||||
'timestamp': parse_iso8601(
|
||||
self._html_search_regex(r'data-prm-pubdate="([^"]+)"', webpage, 'timestamp', default=None)),
|
||||
'channel': self._html_search_regex(r'data-prm-podname="([^"]+)"', webpage, 'channel', default=None),
|
||||
'title': self._html_search_regex(r'data-prm-title="([^"]+)"', webpage, 'title', default=None),
|
||||
'thumbnail': self._og_search_thumbnail(webpage, default=None),
|
||||
'description': self._og_search_description(webpage, default=None),
|
||||
**self._search_json_ld(webpage, media_id, default={}),
|
||||
**traverse_obj(data, {
|
||||
'title': ('title', {str}),
|
||||
'description': ('description', {str}),
|
||||
'thumbnail': ('image', {url_or_none}),
|
||||
'timestamp': ('uploadDate', {parse_iso8601(delimiter=' ')}),
|
||||
'duration': ('duration', {int_or_none}),
|
||||
'tags': ('tags', ..., 'name', {str}),
|
||||
}),
|
||||
}
|
@ -0,0 +1,159 @@
|
||||
import json
|
||||
import random
|
||||
import time
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..utils import int_or_none, jwt_decode_hs256, try_call, url_or_none
|
||||
from ..utils.traversal import require, traverse_obj
|
||||
|
||||
|
||||
class LocoIE(InfoExtractor):
|
||||
_VALID_URL = r'https?://(?:www\.)?loco\.com/(?P<type>streamers|stream)/(?P<id>[^/?#]+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://loco.com/streamers/teuzinfps',
|
||||
'info_dict': {
|
||||
'id': 'teuzinfps',
|
||||
'ext': 'mp4',
|
||||
'title': r're:MS BOLADAO, RESENHA & GAMEPLAY ALTO NIVEL',
|
||||
'description': 'bom e novo',
|
||||
'uploader_id': 'RLUVE3S9JU',
|
||||
'channel': 'teuzinfps',
|
||||
'channel_follower_count': int,
|
||||
'comment_count': int,
|
||||
'view_count': int,
|
||||
'concurrent_view_count': int,
|
||||
'like_count': int,
|
||||
'thumbnail': 'https://static.ivory.getloconow.com/default_thumb/743701a9-98ca-41ae-9a8b-70bd5da070ad.jpg',
|
||||
'tags': ['MMORPG', 'Gameplay'],
|
||||
'series': 'Tibia',
|
||||
'timestamp': int,
|
||||
'modified_timestamp': int,
|
||||
'live_status': 'is_live',
|
||||
'upload_date': str,
|
||||
'modified_date': str,
|
||||
},
|
||||
'params': {
|
||||
'skip_download': 'Livestream',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://loco.com/stream/c64916eb-10fb-46a9-9a19-8c4b7ed064e7',
|
||||
'md5': '45ebc8a47ee1c2240178757caf8881b5',
|
||||
'info_dict': {
|
||||
'id': 'c64916eb-10fb-46a9-9a19-8c4b7ed064e7',
|
||||
'ext': 'mp4',
|
||||
'title': 'PAULINHO LOKO NA LOCO!',
|
||||
'description': 'live on na loco',
|
||||
'uploader_id': '2MDO7Z1DPM',
|
||||
'channel': 'paulinholokobr',
|
||||
'channel_follower_count': int,
|
||||
'comment_count': int,
|
||||
'view_count': int,
|
||||
'concurrent_view_count': int,
|
||||
'like_count': int,
|
||||
'duration': 14491,
|
||||
'thumbnail': 'https://static.ivory.getloconow.com/default_thumb/59b5970b-23c1-4518-9e96-17ce341299fe.jpg',
|
||||
'tags': ['Gameplay'],
|
||||
'series': 'GTA 5',
|
||||
'timestamp': 1740612872,
|
||||
'modified_timestamp': 1740613037,
|
||||
'upload_date': '20250226',
|
||||
'modified_date': '20250226',
|
||||
},
|
||||
}, {
|
||||
# Requires video authorization
|
||||
'url': 'https://loco.com/stream/ac854641-ae0f-497c-a8ea-4195f6d8cc53',
|
||||
'md5': '0513edf85c1e65c9521f555f665387d5',
|
||||
'info_dict': {
|
||||
'id': 'ac854641-ae0f-497c-a8ea-4195f6d8cc53',
|
||||
'ext': 'mp4',
|
||||
'title': 'DUAS CONTAS DESAFIANTE, RUSH TOP 1 NO BRASIL!',
|
||||
'description': 'md5:aa77818edd6fe00dd4b6be75cba5f826',
|
||||
'uploader_id': '7Y9JNAZC3Q',
|
||||
'channel': 'ayellol',
|
||||
'channel_follower_count': int,
|
||||
'comment_count': int,
|
||||
'view_count': int,
|
||||
'concurrent_view_count': int,
|
||||
'like_count': int,
|
||||
'duration': 1229,
|
||||
'thumbnail': 'https://static.ivory.getloconow.com/default_thumb/f5aa678b-6d04-45d9-a89a-859af0a8028f.jpg',
|
||||
'tags': ['Gameplay', 'Carry'],
|
||||
'series': 'League of Legends',
|
||||
'timestamp': 1741182253,
|
||||
'upload_date': '20250305',
|
||||
'modified_timestamp': 1741182419,
|
||||
'modified_date': '20250305',
|
||||
},
|
||||
}]
|
||||
|
||||
# From _app.js
|
||||
_CLIENT_ID = 'TlwKp1zmF6eKFpcisn3FyR18WkhcPkZtzwPVEEC3'
|
||||
_CLIENT_SECRET = 'Kp7tYlUN7LXvtcSpwYvIitgYcLparbtsQSe5AdyyCdiEJBP53Vt9J8eB4AsLdChIpcO2BM19RA3HsGtqDJFjWmwoonvMSG3ZQmnS8x1YIM8yl82xMXZGbE3NKiqmgBVU'
|
||||
|
||||
def _is_jwt_expired(self, token):
|
||||
return jwt_decode_hs256(token)['exp'] - time.time() < 300
|
||||
|
||||
def _get_access_token(self, video_id):
|
||||
access_token = try_call(lambda: self._get_cookies('https://loco.com')['access_token'].value)
|
||||
if access_token and not self._is_jwt_expired(access_token):
|
||||
return access_token
|
||||
access_token = traverse_obj(self._download_json(
|
||||
'https://api.getloconow.com/v3/user/device_profile/', video_id,
|
||||
'Downloading access token', fatal=False, data=json.dumps({
|
||||
'platform': 7,
|
||||
'client_id': self._CLIENT_ID,
|
||||
'client_secret': self._CLIENT_SECRET,
|
||||
'model': 'Mozilla',
|
||||
'os_name': 'Win32',
|
||||
'os_ver': '5.0 (Windows)',
|
||||
'app_ver': '5.0 (Windows)',
|
||||
}).encode(), headers={
|
||||
'Content-Type': 'application/json;charset=utf-8',
|
||||
'DEVICE-ID': ''.join(random.choices('0123456789abcdef', k=32)) + 'live',
|
||||
'X-APP-LANG': 'en',
|
||||
'X-APP-LOCALE': 'en-US',
|
||||
'X-CLIENT-ID': self._CLIENT_ID,
|
||||
'X-CLIENT-SECRET': self._CLIENT_SECRET,
|
||||
'X-PLATFORM': '7',
|
||||
}), 'access_token')
|
||||
if access_token and not self._is_jwt_expired(access_token):
|
||||
self._set_cookie('.loco.com', 'access_token', access_token)
|
||||
return access_token
|
||||
|
||||
def _real_extract(self, url):
|
||||
video_type, video_id = self._match_valid_url(url).group('type', 'id')
|
||||
webpage = self._download_webpage(url, video_id)
|
||||
stream = traverse_obj(self._search_nextjs_data(webpage, video_id), (
|
||||
'props', 'pageProps', ('liveStreamData', 'stream', 'liveStream'), {dict}, any, {require('stream info')}))
|
||||
|
||||
if access_token := self._get_access_token(video_id):
|
||||
self._request_webpage(
|
||||
'https://drm.loco.com/v1/streams/playback/', video_id,
|
||||
'Downloading video authorization', fatal=False, headers={
|
||||
'authorization': access_token,
|
||||
}, query={
|
||||
'stream_uid': stream['uid'],
|
||||
})
|
||||
|
||||
return {
|
||||
'formats': self._extract_m3u8_formats(stream['conf']['hls'], video_id),
|
||||
'id': video_id,
|
||||
'is_live': video_type == 'streamers',
|
||||
**traverse_obj(stream, {
|
||||
'title': ('title', {str}),
|
||||
'series': ('game_name', {str}),
|
||||
'uploader_id': ('user_uid', {str}),
|
||||
'channel': ('alias', {str}),
|
||||
'description': ('description', {str}),
|
||||
'concurrent_view_count': ('viewersCurrent', {int_or_none}),
|
||||
'view_count': ('total_views', {int_or_none}),
|
||||
'thumbnail': ('thumbnail_url_small', {url_or_none}),
|
||||
'like_count': ('likes', {int_or_none}),
|
||||
'tags': ('tags', ..., {str}),
|
||||
'timestamp': ('started_at', {int_or_none(scale=1000)}),
|
||||
'modified_timestamp': ('updated_at', {int_or_none(scale=1000)}),
|
||||
'comment_count': ('comments_count', {int_or_none}),
|
||||
'channel_follower_count': ('followers_count', {int_or_none}),
|
||||
'duration': ('duration', {int_or_none}),
|
||||
}),
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
import re
|
||||
|
||||
from .common import InfoExtractor
|
||||
|
||||
|
||||
class OnceIE(InfoExtractor): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
|
||||
_VALID_URL = r'https?://.+?\.unicornmedia\.com/now/(?:ads/vmap/)?[^/]+/[^/]+/(?P<domain_id>[^/]+)/(?P<application_id>[^/]+)/(?:[^/]+/)?(?P<media_item_id>[^/]+)/content\.(?:once|m3u8|mp4)'
|
||||
ADAPTIVE_URL_TEMPLATE = 'http://once.unicornmedia.com/now/master/playlist/%s/%s/%s/content.m3u8'
|
||||
PROGRESSIVE_URL_TEMPLATE = 'http://once.unicornmedia.com/now/media/progressive/%s/%s/%s/%s/content.mp4'
|
||||
|
||||
def _extract_once_formats(self, url, http_formats_preference=None):
|
||||
domain_id, application_id, media_item_id = re.match(
|
||||
OnceIE._VALID_URL, url).groups()
|
||||
formats = self._extract_m3u8_formats(
|
||||
self.ADAPTIVE_URL_TEMPLATE % (
|
||||
domain_id, application_id, media_item_id),
|
||||
media_item_id, 'mp4', m3u8_id='hls', fatal=False)
|
||||
progressive_formats = []
|
||||
for adaptive_format in formats:
|
||||
# Prevent advertisement from embedding into m3u8 playlist (see
|
||||
# https://github.com/ytdl-org/youtube-dl/issues/8893#issuecomment-199912684)
|
||||
adaptive_format['url'] = re.sub(
|
||||
r'\badsegmentlength=\d+', r'adsegmentlength=0', adaptive_format['url'])
|
||||
rendition_id = self._search_regex(
|
||||
r'/now/media/playlist/[^/]+/[^/]+/([^/]+)',
|
||||
adaptive_format['url'], 'redition id', default=None)
|
||||
if rendition_id:
|
||||
progressive_format = adaptive_format.copy()
|
||||
progressive_format.update({
|
||||
'url': self.PROGRESSIVE_URL_TEMPLATE % (
|
||||
domain_id, application_id, rendition_id, media_item_id),
|
||||
'format_id': adaptive_format['format_id'].replace(
|
||||
'hls', 'http'),
|
||||
'protocol': 'http',
|
||||
'preference': http_formats_preference,
|
||||
})
|
||||
progressive_formats.append(progressive_format)
|
||||
self._check_formats(progressive_formats, media_item_id)
|
||||
formats.extend(progressive_formats)
|
||||
return formats
|
@ -0,0 +1,101 @@
|
||||
from .common import InfoExtractor
|
||||
from ..utils import UserNotLive, int_or_none, parse_iso8601, url_or_none, urljoin
|
||||
from ..utils.traversal import traverse_obj
|
||||
|
||||
|
||||
class PartiBaseIE(InfoExtractor):
|
||||
def _call_api(self, path, video_id, note=None):
|
||||
return self._download_json(
|
||||
f'https://api-backend.parti.com/parti_v2/profile/{path}', video_id, note)
|
||||
|
||||
|
||||
class PartiVideoIE(PartiBaseIE):
|
||||
IE_NAME = 'parti:video'
|
||||
_VALID_URL = r'https?://(?:www\.)?parti\.com/video/(?P<id>\d+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://parti.com/video/66284',
|
||||
'info_dict': {
|
||||
'id': '66284',
|
||||
'ext': 'mp4',
|
||||
'title': 'NOW LIVE ',
|
||||
'upload_date': '20250327',
|
||||
'categories': ['Gaming'],
|
||||
'thumbnail': 'https://assets.parti.com/351424_eb9e5250-2821-484a-9c5f-ca99aa666c87.png',
|
||||
'channel': 'ItZTMGG',
|
||||
'timestamp': 1743044379,
|
||||
},
|
||||
'params': {'skip_download': 'm3u8'},
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
video_id = self._match_id(url)
|
||||
data = self._call_api(f'get_livestream_channel_info/recent/{video_id}', video_id)
|
||||
|
||||
return {
|
||||
'id': video_id,
|
||||
'formats': self._extract_m3u8_formats(
|
||||
urljoin('https://watch.parti.com', data['livestream_recording']), video_id, 'mp4'),
|
||||
**traverse_obj(data, {
|
||||
'title': ('event_title', {str}),
|
||||
'channel': ('user_name', {str}),
|
||||
'thumbnail': ('event_file', {url_or_none}),
|
||||
'categories': ('category_name', {str}, filter, all),
|
||||
'timestamp': ('event_start_ts', {int_or_none}),
|
||||
}),
|
||||
}
|
||||
|
||||
|
||||
class PartiLivestreamIE(PartiBaseIE):
|
||||
IE_NAME = 'parti:livestream'
|
||||
_VALID_URL = r'https?://(?:www\.)?parti\.com/creator/(?P<service>[\w]+)/(?P<id>[\w/-]+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://parti.com/creator/parti/Capt_Robs_Adventures',
|
||||
'info_dict': {
|
||||
'id': 'Capt_Robs_Adventures',
|
||||
'ext': 'mp4',
|
||||
'title': r"re:I'm Live on Parti \d{4}-\d{2}-\d{2} \d{2}:\d{2}",
|
||||
'view_count': int,
|
||||
'thumbnail': r're:https://assets\.parti\.com/.+\.png',
|
||||
'timestamp': 1743879776,
|
||||
'upload_date': '20250405',
|
||||
'live_status': 'is_live',
|
||||
},
|
||||
'params': {'skip_download': 'm3u8'},
|
||||
}, {
|
||||
'url': 'https://parti.com/creator/discord/sazboxgaming/0',
|
||||
'only_matching': True,
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
service, creator_slug = self._match_valid_url(url).group('service', 'id')
|
||||
|
||||
encoded_creator_slug = creator_slug.replace('/', '%23')
|
||||
creator_id = self._call_api(
|
||||
f'get_user_by_social_media/{service}/{encoded_creator_slug}',
|
||||
creator_slug, note='Fetching user ID')
|
||||
|
||||
data = self._call_api(
|
||||
f'get_livestream_channel_info/{creator_id}', creator_id,
|
||||
note='Fetching user profile feed')['channel_info']
|
||||
|
||||
if not traverse_obj(data, ('channel', 'is_live', {bool})):
|
||||
raise UserNotLive(video_id=creator_id)
|
||||
|
||||
channel_info = data['channel']
|
||||
|
||||
return {
|
||||
'id': creator_slug,
|
||||
'formats': self._extract_m3u8_formats(
|
||||
channel_info['playback_url'], creator_slug, live=True, query={
|
||||
'token': channel_info['playback_auth_token'],
|
||||
'player_version': '1.17.0',
|
||||
}),
|
||||
'is_live': True,
|
||||
**traverse_obj(data, {
|
||||
'title': ('livestream_event_info', 'event_name', {str}),
|
||||
'description': ('livestream_event_info', 'event_description', {str}),
|
||||
'thumbnail': ('livestream_event_info', 'livestream_preview_file', {url_or_none}),
|
||||
'timestamp': ('stream', 'start_time', {parse_iso8601}),
|
||||
'view_count': ('stream', 'viewer_count', {int_or_none}),
|
||||
}),
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
from .common import InfoExtractor
|
||||
from ..utils.traversal import traverse_obj
|
||||
|
||||
|
||||
class RoyaLiveIE(InfoExtractor):
|
||||
_VALID_URL = r'https?://roya\.tv/live-stream/(?P<id>\d+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://roya.tv/live-stream/1',
|
||||
'info_dict': {
|
||||
'id': '1',
|
||||
'title': r're:Roya TV \d{4}-\d{2}-\d{2} \d{2}:\d{2}',
|
||||
'ext': 'mp4',
|
||||
'live_status': 'is_live',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://roya.tv/live-stream/21',
|
||||
'info_dict': {
|
||||
'id': '21',
|
||||
'title': r're:Roya News \d{4}-\d{2}-\d{2} \d{2}:\d{2}',
|
||||
'ext': 'mp4',
|
||||
'live_status': 'is_live',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://roya.tv/live-stream/10000',
|
||||
'only_matching': True,
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
media_id = self._match_id(url)
|
||||
|
||||
stream_url = self._download_json(
|
||||
f'https://ticket.roya-tv.com/api/v5/fastchannel/{media_id}', media_id)['data']['secured_url']
|
||||
|
||||
title = traverse_obj(
|
||||
self._download_json('https://backend.roya.tv/api/v01/channels/schedule-pagination', media_id, fatal=False),
|
||||
('data', 0, 'channel', lambda _, v: str(v['id']) == media_id, 'title', {str}, any))
|
||||
|
||||
return {
|
||||
'id': media_id,
|
||||
'formats': self._extract_m3u8_formats(stream_url, media_id, 'mp4', m3u8_id='hls', live=True),
|
||||
'title': title,
|
||||
'is_live': True,
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue