mirror of https://github.com/yt-dlp/yt-dlp
Merge branch 'master' into extract-common-_is_jwt_token_expired-to-InfoExtractor
commit
f837652d3e
@ -0,0 +1,41 @@
|
||||
name: Signature Tests
|
||||
on:
|
||||
push:
|
||||
paths:
|
||||
- .github/workflows/signature-tests.yml
|
||||
- test/test_youtube_signature.py
|
||||
- yt_dlp/jsinterp.py
|
||||
pull_request:
|
||||
paths:
|
||||
- .github/workflows/signature-tests.yml
|
||||
- test/test_youtube_signature.py
|
||||
- yt_dlp/jsinterp.py
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
concurrency:
|
||||
group: signature-tests-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
|
||||
|
||||
jobs:
|
||||
tests:
|
||||
name: Signature Tests
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, windows-latest]
|
||||
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13', pypy-3.11]
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install test requirements
|
||||
run: python3 ./devscripts/install_deps.py --only-optional --include test
|
||||
- name: Run tests
|
||||
timeout-minutes: 15
|
||||
run: |
|
||||
python3 -m yt_dlp -v || true # Print debug head
|
||||
python3 ./devscripts/run_tests.py test/test_youtube_signature.py
|
@ -0,0 +1,235 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
import datetime as dt
|
||||
import json
|
||||
import math
|
||||
import re
|
||||
import unittest
|
||||
|
||||
from yt_dlp.utils.jslib import devalue
|
||||
|
||||
|
||||
TEST_CASES_EQUALS = [{
|
||||
'name': 'int',
|
||||
'unparsed': [-42],
|
||||
'parsed': -42,
|
||||
}, {
|
||||
'name': 'str',
|
||||
'unparsed': ['woo!!!'],
|
||||
'parsed': 'woo!!!',
|
||||
}, {
|
||||
'name': 'Number',
|
||||
'unparsed': [['Object', 42]],
|
||||
'parsed': 42,
|
||||
}, {
|
||||
'name': 'String',
|
||||
'unparsed': [['Object', 'yar']],
|
||||
'parsed': 'yar',
|
||||
}, {
|
||||
'name': 'Infinity',
|
||||
'unparsed': -4,
|
||||
'parsed': math.inf,
|
||||
}, {
|
||||
'name': 'negative Infinity',
|
||||
'unparsed': -5,
|
||||
'parsed': -math.inf,
|
||||
}, {
|
||||
'name': 'negative zero',
|
||||
'unparsed': -6,
|
||||
'parsed': -0.0,
|
||||
}, {
|
||||
'name': 'RegExp',
|
||||
'unparsed': [['RegExp', 'regexp', 'gim']], # XXX: flags are ignored
|
||||
'parsed': re.compile('regexp'),
|
||||
}, {
|
||||
'name': 'Date',
|
||||
'unparsed': [['Date', '2001-09-09T01:46:40.000Z']],
|
||||
'parsed': dt.datetime.fromtimestamp(1e9, tz=dt.timezone.utc),
|
||||
}, {
|
||||
'name': 'Array',
|
||||
'unparsed': [[1, 2, 3], 'a', 'b', 'c'],
|
||||
'parsed': ['a', 'b', 'c'],
|
||||
}, {
|
||||
'name': 'Array (empty)',
|
||||
'unparsed': [[]],
|
||||
'parsed': [],
|
||||
}, {
|
||||
'name': 'Array (sparse)',
|
||||
'unparsed': [[-2, 1, -2], 'b'],
|
||||
'parsed': [None, 'b', None],
|
||||
}, {
|
||||
'name': 'Object',
|
||||
'unparsed': [{'foo': 1, 'x-y': 2}, 'bar', 'z'],
|
||||
'parsed': {'foo': 'bar', 'x-y': 'z'},
|
||||
}, {
|
||||
'name': 'Set',
|
||||
'unparsed': [['Set', 1, 2, 3], 1, 2, 3],
|
||||
'parsed': [1, 2, 3],
|
||||
}, {
|
||||
'name': 'Map',
|
||||
'unparsed': [['Map', 1, 2], 'a', 'b'],
|
||||
'parsed': [['a', 'b']],
|
||||
}, {
|
||||
'name': 'BigInt',
|
||||
'unparsed': [['BigInt', '1']],
|
||||
'parsed': 1,
|
||||
}, {
|
||||
'name': 'Uint8Array',
|
||||
'unparsed': [['Uint8Array', 'AQID']],
|
||||
'parsed': [1, 2, 3],
|
||||
}, {
|
||||
'name': 'ArrayBuffer',
|
||||
'unparsed': [['ArrayBuffer', 'AQID']],
|
||||
'parsed': [1, 2, 3],
|
||||
}, {
|
||||
'name': 'str (repetition)',
|
||||
'unparsed': [[1, 1], 'a string'],
|
||||
'parsed': ['a string', 'a string'],
|
||||
}, {
|
||||
'name': 'None (repetition)',
|
||||
'unparsed': [[1, 1], None],
|
||||
'parsed': [None, None],
|
||||
}, {
|
||||
'name': 'dict (repetition)',
|
||||
'unparsed': [[1, 1], {}],
|
||||
'parsed': [{}, {}],
|
||||
}, {
|
||||
'name': 'Object without prototype',
|
||||
'unparsed': [['null']],
|
||||
'parsed': {},
|
||||
}, {
|
||||
'name': 'cross-realm POJO',
|
||||
'unparsed': [{}],
|
||||
'parsed': {},
|
||||
}]
|
||||
|
||||
TEST_CASES_IS = [{
|
||||
'name': 'bool',
|
||||
'unparsed': [True],
|
||||
'parsed': True,
|
||||
}, {
|
||||
'name': 'Boolean',
|
||||
'unparsed': [['Object', False]],
|
||||
'parsed': False,
|
||||
}, {
|
||||
'name': 'undefined',
|
||||
'unparsed': -1,
|
||||
'parsed': None,
|
||||
}, {
|
||||
'name': 'null',
|
||||
'unparsed': [None],
|
||||
'parsed': None,
|
||||
}, {
|
||||
'name': 'NaN',
|
||||
'unparsed': -3,
|
||||
'parsed': math.nan,
|
||||
}]
|
||||
|
||||
TEST_CASES_INVALID = [{
|
||||
'name': 'empty string',
|
||||
'unparsed': '',
|
||||
'error': ValueError,
|
||||
'pattern': r'expected int or list as input',
|
||||
}, {
|
||||
'name': 'hole',
|
||||
'unparsed': -2,
|
||||
'error': ValueError,
|
||||
'pattern': r'invalid integer input',
|
||||
}, {
|
||||
'name': 'string',
|
||||
'unparsed': 'hello',
|
||||
'error': ValueError,
|
||||
'pattern': r'expected int or list as input',
|
||||
}, {
|
||||
'name': 'number',
|
||||
'unparsed': 42,
|
||||
'error': ValueError,
|
||||
'pattern': r'invalid integer input',
|
||||
}, {
|
||||
'name': 'boolean',
|
||||
'unparsed': True,
|
||||
'error': ValueError,
|
||||
'pattern': r'expected int or list as input',
|
||||
}, {
|
||||
'name': 'null',
|
||||
'unparsed': None,
|
||||
'error': ValueError,
|
||||
'pattern': r'expected int or list as input',
|
||||
}, {
|
||||
'name': 'object',
|
||||
'unparsed': {},
|
||||
'error': ValueError,
|
||||
'pattern': r'expected int or list as input',
|
||||
}, {
|
||||
'name': 'empty array',
|
||||
'unparsed': [],
|
||||
'error': ValueError,
|
||||
'pattern': r'expected a non-empty list as input',
|
||||
}, {
|
||||
'name': 'Python negative indexing',
|
||||
'unparsed': [[1, 2, 3, 4, 5, 6, 7, -7], 1, 2, 3, 4, 5, 6, 7],
|
||||
'error': IndexError,
|
||||
'pattern': r'invalid index: -7',
|
||||
}]
|
||||
|
||||
|
||||
class TestDevalue(unittest.TestCase):
|
||||
def test_devalue_parse_equals(self):
|
||||
for tc in TEST_CASES_EQUALS:
|
||||
self.assertEqual(devalue.parse(tc['unparsed']), tc['parsed'], tc['name'])
|
||||
|
||||
def test_devalue_parse_is(self):
|
||||
for tc in TEST_CASES_IS:
|
||||
self.assertIs(devalue.parse(tc['unparsed']), tc['parsed'], tc['name'])
|
||||
|
||||
def test_devalue_parse_invalid(self):
|
||||
for tc in TEST_CASES_INVALID:
|
||||
with self.assertRaisesRegex(tc['error'], tc['pattern'], msg=tc['name']):
|
||||
devalue.parse(tc['unparsed'])
|
||||
|
||||
def test_devalue_parse_cyclical(self):
|
||||
name = 'Map (cyclical)'
|
||||
result = devalue.parse([['Map', 1, 0], 'self'])
|
||||
self.assertEqual(result[0][0], 'self', name)
|
||||
self.assertIs(result, result[0][1], name)
|
||||
|
||||
name = 'Set (cyclical)'
|
||||
result = devalue.parse([['Set', 0, 1], 42])
|
||||
self.assertEqual(result[1], 42, name)
|
||||
self.assertIs(result, result[0], name)
|
||||
|
||||
result = devalue.parse([[0]])
|
||||
self.assertIs(result, result[0], 'Array (cyclical)')
|
||||
|
||||
name = 'Object (cyclical)'
|
||||
result = devalue.parse([{'self': 0}])
|
||||
self.assertIs(result, result['self'], name)
|
||||
|
||||
name = 'Object with null prototype (cyclical)'
|
||||
result = devalue.parse([['null', 'self', 0]])
|
||||
self.assertIs(result, result['self'], name)
|
||||
|
||||
name = 'Objects (cyclical)'
|
||||
result = devalue.parse([[1, 2], {'second': 2}, {'first': 1}])
|
||||
self.assertIs(result[0], result[1]['first'], name)
|
||||
self.assertIs(result[1], result[0]['second'], name)
|
||||
|
||||
def test_devalue_parse_revivers(self):
|
||||
self.assertEqual(
|
||||
devalue.parse([['indirect', 1], {'a': 2}, 'b'], revivers={'indirect': lambda x: x}),
|
||||
{'a': 'b'}, 'revivers (indirect)')
|
||||
|
||||
self.assertEqual(
|
||||
devalue.parse([['parse', 1], '{"a":0}'], revivers={'parse': lambda x: json.loads(x)}),
|
||||
{'a': 0}, 'revivers (parse)')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -0,0 +1,71 @@
|
||||
import collections
|
||||
|
||||
import pytest
|
||||
|
||||
from yt_dlp import YoutubeDL
|
||||
from yt_dlp.cookies import YoutubeDLCookieJar
|
||||
from yt_dlp.extractor.common import InfoExtractor
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProviderLogger
|
||||
from yt_dlp.extractor.youtube.pot.provider import PoTokenRequest, PoTokenContext
|
||||
from yt_dlp.utils.networking import HTTPHeaderDict
|
||||
|
||||
|
||||
class MockLogger(IEContentProviderLogger):
|
||||
|
||||
log_level = IEContentProviderLogger.LogLevel.TRACE
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.messages = collections.defaultdict(list)
|
||||
|
||||
def trace(self, message: str):
|
||||
self.messages['trace'].append(message)
|
||||
|
||||
def debug(self, message: str):
|
||||
self.messages['debug'].append(message)
|
||||
|
||||
def info(self, message: str):
|
||||
self.messages['info'].append(message)
|
||||
|
||||
def warning(self, message: str, *, once=False):
|
||||
self.messages['warning'].append(message)
|
||||
|
||||
def error(self, message: str):
|
||||
self.messages['error'].append(message)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ie() -> InfoExtractor:
|
||||
ydl = YoutubeDL()
|
||||
return ydl.get_info_extractor('Youtube')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def logger() -> MockLogger:
|
||||
return MockLogger()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def pot_request() -> PoTokenRequest:
|
||||
return PoTokenRequest(
|
||||
context=PoTokenContext.GVS,
|
||||
innertube_context={'client': {'clientName': 'WEB'}},
|
||||
innertube_host='youtube.com',
|
||||
session_index=None,
|
||||
player_url=None,
|
||||
is_authenticated=False,
|
||||
video_webpage=None,
|
||||
|
||||
visitor_data='example-visitor-data',
|
||||
data_sync_id='example-data-sync-id',
|
||||
video_id='example-video-id',
|
||||
|
||||
request_cookiejar=YoutubeDLCookieJar(),
|
||||
request_proxy=None,
|
||||
request_headers=HTTPHeaderDict(),
|
||||
request_timeout=None,
|
||||
request_source_address=None,
|
||||
request_verify_tls=True,
|
||||
|
||||
bypass_cache=False,
|
||||
)
|
@ -0,0 +1,117 @@
|
||||
import threading
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
import pytest
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProvider, BuiltinIEContentProvider
|
||||
from yt_dlp.utils import bug_reports_message
|
||||
from yt_dlp.extractor.youtube.pot._builtin.memory_cache import MemoryLRUPCP, memorylru_preference, initialize_global_cache
|
||||
from yt_dlp.version import __version__
|
||||
from yt_dlp.extractor.youtube.pot._registry import _pot_cache_providers, _pot_memory_cache
|
||||
|
||||
|
||||
class TestMemoryLRUPCS:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(MemoryLRUPCP, IEContentProvider)
|
||||
assert issubclass(MemoryLRUPCP, BuiltinIEContentProvider)
|
||||
|
||||
@pytest.fixture
|
||||
def pcp(self, ie, logger) -> MemoryLRUPCP:
|
||||
return MemoryLRUPCP(ie, logger, {}, initialize_cache=lambda max_size: (OrderedDict(), threading.Lock(), max_size))
|
||||
|
||||
def test_is_registered(self):
|
||||
assert _pot_cache_providers.value.get('MemoryLRU') == MemoryLRUPCP
|
||||
|
||||
def test_initialization(self, pcp):
|
||||
assert pcp.PROVIDER_NAME == 'memory'
|
||||
assert pcp.PROVIDER_VERSION == __version__
|
||||
assert pcp.BUG_REPORT_MESSAGE == bug_reports_message(before='')
|
||||
assert pcp.is_available()
|
||||
|
||||
def test_store_and_get(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) + 60)
|
||||
assert pcp.get('key1') == 'value1'
|
||||
assert len(pcp.cache) == 1
|
||||
|
||||
def test_store_ignore_expired(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) - 1)
|
||||
assert len(pcp.cache) == 0
|
||||
assert pcp.get('key1') is None
|
||||
assert len(pcp.cache) == 0
|
||||
|
||||
def test_store_override_existing_key(self, ie, logger):
|
||||
MAX_SIZE = 2
|
||||
pcp = MemoryLRUPCP(ie, logger, {}, initialize_cache=lambda max_size: (OrderedDict(), threading.Lock(), MAX_SIZE))
|
||||
pcp.store('key1', 'value1', int(time.time()) + 60)
|
||||
pcp.store('key2', 'value2', int(time.time()) + 60)
|
||||
assert len(pcp.cache) == 2
|
||||
pcp.store('key1', 'value2', int(time.time()) + 60)
|
||||
# Ensure that the override key gets added to the end of the cache instead of in the same position
|
||||
pcp.store('key3', 'value3', int(time.time()) + 60)
|
||||
assert pcp.get('key1') == 'value2'
|
||||
|
||||
def test_store_ignore_expired_existing_key(self, pcp):
|
||||
pcp.store('key1', 'value2', int(time.time()) + 60)
|
||||
pcp.store('key1', 'value1', int(time.time()) - 1)
|
||||
assert len(pcp.cache) == 1
|
||||
assert pcp.get('key1') == 'value2'
|
||||
assert len(pcp.cache) == 1
|
||||
|
||||
def test_get_key_expired(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) + 60)
|
||||
assert pcp.get('key1') == 'value1'
|
||||
assert len(pcp.cache) == 1
|
||||
pcp.cache['key1'] = ('value1', int(time.time()) - 1)
|
||||
assert pcp.get('key1') is None
|
||||
assert len(pcp.cache) == 0
|
||||
|
||||
def test_lru_eviction(self, ie, logger):
|
||||
MAX_SIZE = 2
|
||||
provider = MemoryLRUPCP(ie, logger, {}, initialize_cache=lambda max_size: (OrderedDict(), threading.Lock(), MAX_SIZE))
|
||||
provider.store('key1', 'value1', int(time.time()) + 5)
|
||||
provider.store('key2', 'value2', int(time.time()) + 5)
|
||||
assert len(provider.cache) == 2
|
||||
|
||||
assert provider.get('key1') == 'value1'
|
||||
|
||||
provider.store('key3', 'value3', int(time.time()) + 5)
|
||||
assert len(provider.cache) == 2
|
||||
|
||||
assert provider.get('key2') is None
|
||||
|
||||
provider.store('key4', 'value4', int(time.time()) + 5)
|
||||
assert len(provider.cache) == 2
|
||||
|
||||
assert provider.get('key1') is None
|
||||
assert provider.get('key3') == 'value3'
|
||||
assert provider.get('key4') == 'value4'
|
||||
|
||||
def test_delete(self, pcp):
|
||||
pcp.store('key1', 'value1', int(time.time()) + 5)
|
||||
assert len(pcp.cache) == 1
|
||||
assert pcp.get('key1') == 'value1'
|
||||
pcp.delete('key1')
|
||||
assert len(pcp.cache) == 0
|
||||
assert pcp.get('key1') is None
|
||||
|
||||
def test_use_global_cache_default(self, ie, logger):
|
||||
pcp = MemoryLRUPCP(ie, logger, {})
|
||||
assert pcp.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
assert pcp.cache is _pot_memory_cache.value['cache']
|
||||
assert pcp.lock is _pot_memory_cache.value['lock']
|
||||
|
||||
pcp2 = MemoryLRUPCP(ie, logger, {})
|
||||
assert pcp.max_size == pcp2.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
assert pcp.cache is pcp2.cache is _pot_memory_cache.value['cache']
|
||||
assert pcp.lock is pcp2.lock is _pot_memory_cache.value['lock']
|
||||
|
||||
def test_fail_max_size_change_global(self, ie, logger):
|
||||
pcp = MemoryLRUPCP(ie, logger, {})
|
||||
assert pcp.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
with pytest.raises(ValueError, match='Cannot change max_size of initialized global memory cache'):
|
||||
initialize_global_cache(50)
|
||||
|
||||
assert pcp.max_size == _pot_memory_cache.value['max_size'] == 25
|
||||
|
||||
def test_memory_lru_preference(self, pcp, ie, pot_request):
|
||||
assert memorylru_preference(pcp, pot_request) == 10000
|
@ -0,0 +1,47 @@
|
||||
import pytest
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenContext,
|
||||
|
||||
)
|
||||
|
||||
from yt_dlp.extractor.youtube.pot.utils import get_webpo_content_binding, ContentBindingType
|
||||
|
||||
|
||||
class TestGetWebPoContentBinding:
|
||||
|
||||
@pytest.mark.parametrize('client_name, context, is_authenticated, expected', [
|
||||
*[(client, context, is_authenticated, expected) for client in [
|
||||
'WEB', 'MWEB', 'TVHTML5', 'WEB_EMBEDDED_PLAYER', 'WEB_CREATOR', 'TVHTML5_SIMPLY_EMBEDDED_PLAYER', 'TVHTML5_SIMPLY']
|
||||
for context, is_authenticated, expected in [
|
||||
(PoTokenContext.GVS, False, ('example-visitor-data', ContentBindingType.VISITOR_DATA)),
|
||||
(PoTokenContext.PLAYER, False, ('example-video-id', ContentBindingType.VIDEO_ID)),
|
||||
(PoTokenContext.SUBS, False, ('example-video-id', ContentBindingType.VIDEO_ID)),
|
||||
(PoTokenContext.GVS, True, ('example-data-sync-id', ContentBindingType.DATASYNC_ID)),
|
||||
]],
|
||||
('WEB_REMIX', PoTokenContext.GVS, False, ('example-visitor-data', ContentBindingType.VISITOR_DATA)),
|
||||
('WEB_REMIX', PoTokenContext.PLAYER, False, ('example-visitor-data', ContentBindingType.VISITOR_DATA)),
|
||||
('ANDROID', PoTokenContext.GVS, False, (None, None)),
|
||||
('IOS', PoTokenContext.GVS, False, (None, None)),
|
||||
])
|
||||
def test_get_webpo_content_binding(self, pot_request, client_name, context, is_authenticated, expected):
|
||||
pot_request.innertube_context['client']['clientName'] = client_name
|
||||
pot_request.context = context
|
||||
pot_request.is_authenticated = is_authenticated
|
||||
assert get_webpo_content_binding(pot_request) == expected
|
||||
|
||||
def test_extract_visitor_id(self, pot_request):
|
||||
pot_request.visitor_data = 'CgsxMjNhYmNYWVpfLSiA4s%2DqBg%3D%3D'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == ('123abcXYZ_-', ContentBindingType.VISITOR_ID)
|
||||
|
||||
def test_invalid_visitor_id(self, pot_request):
|
||||
# visitor id not alphanumeric (i.e. protobuf extraction failed)
|
||||
pot_request.visitor_data = 'CggxMjM0NTY3OCiA4s-qBg%3D%3D'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == (pot_request.visitor_data, ContentBindingType.VISITOR_DATA)
|
||||
|
||||
def test_no_visitor_id(self, pot_request):
|
||||
pot_request.visitor_data = 'KIDiz6oG'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == (pot_request.visitor_data, ContentBindingType.VISITOR_DATA)
|
||||
|
||||
def test_invalid_base64(self, pot_request):
|
||||
pot_request.visitor_data = 'invalid-base64'
|
||||
assert get_webpo_content_binding(pot_request, bind_to_visitor_id=True) == (pot_request.visitor_data, ContentBindingType.VISITOR_DATA)
|
@ -0,0 +1,92 @@
|
||||
import pytest
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProvider, BuiltinIEContentProvider
|
||||
from yt_dlp.extractor.youtube.pot.cache import CacheProviderWritePolicy
|
||||
from yt_dlp.utils import bug_reports_message
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenRequest,
|
||||
PoTokenContext,
|
||||
|
||||
)
|
||||
from yt_dlp.version import __version__
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._builtin.webpo_cachespec import WebPoPCSP
|
||||
from yt_dlp.extractor.youtube.pot._registry import _pot_pcs_providers
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def pot_request(pot_request) -> PoTokenRequest:
|
||||
pot_request.visitor_data = 'CgsxMjNhYmNYWVpfLSiA4s%2DqBg%3D%3D' # visitor_id=123abcXYZ_-
|
||||
return pot_request
|
||||
|
||||
|
||||
class TestWebPoPCSP:
|
||||
def test_base_type(self):
|
||||
assert issubclass(WebPoPCSP, IEContentProvider)
|
||||
assert issubclass(WebPoPCSP, BuiltinIEContentProvider)
|
||||
|
||||
def test_init(self, ie, logger):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
assert pcs.PROVIDER_NAME == 'webpo'
|
||||
assert pcs.PROVIDER_VERSION == __version__
|
||||
assert pcs.BUG_REPORT_MESSAGE == bug_reports_message(before='')
|
||||
assert pcs.is_available()
|
||||
|
||||
def test_is_registered(self):
|
||||
assert _pot_pcs_providers.value.get('WebPo') == WebPoPCSP
|
||||
|
||||
@pytest.mark.parametrize('client_name, context, is_authenticated', [
|
||||
('ANDROID', PoTokenContext.GVS, False),
|
||||
('IOS', PoTokenContext.GVS, False),
|
||||
('IOS', PoTokenContext.PLAYER, False),
|
||||
])
|
||||
def test_not_supports(self, ie, logger, pot_request, client_name, context, is_authenticated):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
pot_request.innertube_context['client']['clientName'] = client_name
|
||||
pot_request.context = context
|
||||
pot_request.is_authenticated = is_authenticated
|
||||
assert pcs.generate_cache_spec(pot_request) is None
|
||||
|
||||
@pytest.mark.parametrize('client_name, context, is_authenticated, remote_host, source_address, request_proxy, expected', [
|
||||
*[(client, context, is_authenticated, remote_host, source_address, request_proxy, expected) for client in [
|
||||
'WEB', 'MWEB', 'TVHTML5', 'WEB_EMBEDDED_PLAYER', 'WEB_CREATOR', 'TVHTML5_SIMPLY_EMBEDDED_PLAYER', 'TVHTML5_SIMPLY']
|
||||
for context, is_authenticated, remote_host, source_address, request_proxy, expected in [
|
||||
(PoTokenContext.GVS, False, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': '123abcXYZ_-', 'cbt': 'visitor_id'}),
|
||||
(PoTokenContext.PLAYER, False, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': '123abcXYZ_-', 'cbt': 'video_id'}),
|
||||
(PoTokenContext.GVS, True, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': 'example-data-sync-id', 'cbt': 'datasync_id'}),
|
||||
]],
|
||||
('WEB_REMIX', PoTokenContext.PLAYER, False, 'example-remote-host', 'example-source-address', 'example-request-proxy', {'t': 'webpo', 'ip': 'example-remote-host', 'sa': 'example-source-address', 'px': 'example-request-proxy', 'cb': '123abcXYZ_-', 'cbt': 'visitor_id'}),
|
||||
('WEB', PoTokenContext.GVS, False, None, None, None, {'t': 'webpo', 'cb': '123abcXYZ_-', 'cbt': 'visitor_id', 'ip': None, 'sa': None, 'px': None}),
|
||||
('TVHTML5', PoTokenContext.PLAYER, False, None, None, 'http://example.com', {'t': 'webpo', 'cb': '123abcXYZ_-', 'cbt': 'video_id', 'ip': None, 'sa': None, 'px': 'http://example.com'}),
|
||||
|
||||
])
|
||||
def test_generate_key_bindings(self, ie, logger, pot_request, client_name, context, is_authenticated, remote_host, source_address, request_proxy, expected):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
pot_request.innertube_context['client']['clientName'] = client_name
|
||||
pot_request.context = context
|
||||
pot_request.is_authenticated = is_authenticated
|
||||
pot_request.innertube_context['client']['remoteHost'] = remote_host
|
||||
pot_request.request_source_address = source_address
|
||||
pot_request.request_proxy = request_proxy
|
||||
pot_request.video_id = '123abcXYZ_-' # same as visitor id to test type
|
||||
|
||||
assert pcs.generate_cache_spec(pot_request).key_bindings == expected
|
||||
|
||||
def test_no_bind_visitor_id(self, ie, logger, pot_request):
|
||||
# Should not bind to visitor id if setting is set to False
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={'bind_to_visitor_id': ['false']})
|
||||
pot_request.innertube_context['client']['clientName'] = 'WEB'
|
||||
pot_request.context = PoTokenContext.GVS
|
||||
pot_request.is_authenticated = False
|
||||
assert pcs.generate_cache_spec(pot_request).key_bindings == {'t': 'webpo', 'ip': None, 'sa': None, 'px': None, 'cb': 'CgsxMjNhYmNYWVpfLSiA4s%2DqBg%3D%3D', 'cbt': 'visitor_data'}
|
||||
|
||||
def test_default_ttl(self, ie, logger, pot_request):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
assert pcs.generate_cache_spec(pot_request).default_ttl == 6 * 60 * 60 # should default to 6 hours
|
||||
|
||||
def test_write_policy(self, ie, logger, pot_request):
|
||||
pcs = WebPoPCSP(ie=ie, logger=logger, settings={})
|
||||
pot_request.context = PoTokenContext.GVS
|
||||
assert pcs.generate_cache_spec(pot_request).write_policy == CacheProviderWritePolicy.WRITE_ALL
|
||||
pot_request.context = PoTokenContext.PLAYER
|
||||
assert pcs.generate_cache_spec(pot_request).write_policy == CacheProviderWritePolicy.WRITE_FIRST
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,629 @@
|
||||
import pytest
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._provider import IEContentProvider
|
||||
from yt_dlp.cookies import YoutubeDLCookieJar
|
||||
from yt_dlp.utils.networking import HTTPHeaderDict
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenRequest,
|
||||
PoTokenContext,
|
||||
ExternalRequestFeature,
|
||||
|
||||
)
|
||||
|
||||
from yt_dlp.extractor.youtube.pot.cache import (
|
||||
PoTokenCacheProvider,
|
||||
PoTokenCacheSpec,
|
||||
PoTokenCacheSpecProvider,
|
||||
CacheProviderWritePolicy,
|
||||
)
|
||||
|
||||
import yt_dlp.extractor.youtube.pot.cache as cache
|
||||
|
||||
from yt_dlp.networking import Request
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenResponse,
|
||||
PoTokenProvider,
|
||||
PoTokenProviderRejectedRequest,
|
||||
provider_bug_report_message,
|
||||
register_provider,
|
||||
register_preference,
|
||||
)
|
||||
|
||||
from yt_dlp.extractor.youtube.pot._registry import _pot_providers, _ptp_preferences, _pot_pcs_providers, _pot_cache_providers, _pot_cache_provider_preferences
|
||||
|
||||
|
||||
class ExamplePTP(PoTokenProvider):
|
||||
PROVIDER_NAME = 'example'
|
||||
PROVIDER_VERSION = '0.0.1'
|
||||
BUG_REPORT_LOCATION = 'https://example.com/issues'
|
||||
|
||||
_SUPPORTED_CLIENTS = ('WEB',)
|
||||
_SUPPORTED_CONTEXTS = (PoTokenContext.GVS, )
|
||||
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
|
||||
ExternalRequestFeature.PROXY_SCHEME_HTTP,
|
||||
ExternalRequestFeature.PROXY_SCHEME_SOCKS5H,
|
||||
)
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
return PoTokenResponse('example-token', expires_at=123)
|
||||
|
||||
|
||||
class ExampleCacheProviderPCP(PoTokenCacheProvider):
|
||||
|
||||
PROVIDER_NAME = 'example'
|
||||
PROVIDER_VERSION = '0.0.1'
|
||||
BUG_REPORT_LOCATION = 'https://example.com/issues'
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
|
||||
class ExampleCacheSpecProviderPCSP(PoTokenCacheSpecProvider):
|
||||
|
||||
PROVIDER_NAME = 'example'
|
||||
PROVIDER_VERSION = '0.0.1'
|
||||
BUG_REPORT_LOCATION = 'https://example.com/issues'
|
||||
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return PoTokenCacheSpec(
|
||||
key_bindings={'field': 'example-key'},
|
||||
default_ttl=60,
|
||||
write_policy=CacheProviderWritePolicy.WRITE_FIRST,
|
||||
)
|
||||
|
||||
|
||||
class TestPoTokenProvider:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(PoTokenProvider, IEContentProvider)
|
||||
|
||||
def test_create_provider_missing_fetch_method(self, ie, logger):
|
||||
class MissingMethodsPTP(PoTokenProvider):
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_available_method(self, ie, logger):
|
||||
class MissingMethodsPTP(PoTokenProvider):
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_barebones_provider(self, ie, logger):
|
||||
class BarebonesProviderPTP(PoTokenProvider):
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
provider = BarebonesProviderPTP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_KEY == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.0'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at (developer has not provided a bug report location) .'
|
||||
|
||||
def test_example_provider_success(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'example'
|
||||
assert provider.PROVIDER_KEY == 'Example'
|
||||
assert provider.PROVIDER_VERSION == '0.0.1'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
assert provider.is_available()
|
||||
|
||||
response = provider.request_pot(pot_request)
|
||||
|
||||
assert response.po_token == 'example-token'
|
||||
assert response.expires_at == 123
|
||||
|
||||
def test_provider_unsupported_context(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
pot_request.context = PoTokenContext.PLAYER
|
||||
|
||||
with pytest.raises(PoTokenProviderRejectedRequest):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_client(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
pot_request.innertube_context['client']['clientName'] = 'ANDROID'
|
||||
|
||||
with pytest.raises(PoTokenProviderRejectedRequest):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_proxy_scheme(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
pot_request.request_proxy = 'socks4://example.com'
|
||||
|
||||
with pytest.raises(
|
||||
PoTokenProviderRejectedRequest,
|
||||
match='External requests by "example" provider do not support proxy scheme "socks4". Supported proxy '
|
||||
'schemes: http, socks5h',
|
||||
):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_proxy = 'http://example.com'
|
||||
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_ignore_external_request_features(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = None
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_proxy = 'socks5://example.com'
|
||||
assert provider.request_pot(pot_request)
|
||||
pot_request.request_source_address = '0.0.0.0'
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_external_request_source_address(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = tuple()
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_source_address = None
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_source_address = '0.0.0.0'
|
||||
with pytest.raises(
|
||||
PoTokenProviderRejectedRequest,
|
||||
match='External requests by "example" provider do not support setting source address',
|
||||
):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_supported_external_request_source_address(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
|
||||
ExternalRequestFeature.SOURCE_ADDRESS,
|
||||
)
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_source_address = None
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_source_address = '0.0.0.0'
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_unsupported_external_request_tls_verification(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = tuple()
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_verify_tls = True
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_verify_tls = False
|
||||
with pytest.raises(
|
||||
PoTokenProviderRejectedRequest,
|
||||
match='External requests by "example" provider do not support ignoring TLS certificate failures',
|
||||
):
|
||||
provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_supported_external_request_tls_verification(self, ie, logger, pot_request):
|
||||
class InternalPTP(ExamplePTP):
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
|
||||
ExternalRequestFeature.DISABLE_TLS_VERIFICATION,
|
||||
)
|
||||
|
||||
provider = InternalPTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
pot_request.request_verify_tls = True
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
pot_request.request_verify_tls = False
|
||||
assert provider.request_pot(pot_request)
|
||||
|
||||
def test_provider_request_webpage(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
cookiejar = YoutubeDLCookieJar()
|
||||
pot_request.request_headers = HTTPHeaderDict({'User-Agent': 'example-user-agent'})
|
||||
pot_request.request_proxy = 'socks5://example-proxy.com'
|
||||
pot_request.request_cookiejar = cookiejar
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
), pot_request=pot_request)
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
assert sent_request.headers['User-Agent'] == 'example-user-agent'
|
||||
assert sent_request.proxies == {'all': 'socks5://example-proxy.com'}
|
||||
assert sent_request.extensions['cookiejar'] is cookiejar
|
||||
assert 'Requesting webpage' in logger.messages['info']
|
||||
|
||||
def test_provider_request_webpage_override(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
cookiejar_request = YoutubeDLCookieJar()
|
||||
pot_request.request_headers = HTTPHeaderDict({'User-Agent': 'example-user-agent'})
|
||||
pot_request.request_proxy = 'socks5://example-proxy.com'
|
||||
pot_request.request_cookiejar = cookiejar_request
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
headers={'User-Agent': 'override-user-agent-override'},
|
||||
proxies={'http': 'http://example-proxy-override.com'},
|
||||
extensions={'cookiejar': YoutubeDLCookieJar()},
|
||||
), pot_request=pot_request, note='Custom requesting webpage')
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
assert sent_request.headers['User-Agent'] == 'override-user-agent-override'
|
||||
assert sent_request.proxies == {'http': 'http://example-proxy-override.com'}
|
||||
assert sent_request.extensions['cookiejar'] is not cookiejar_request
|
||||
assert 'Custom requesting webpage' in logger.messages['info']
|
||||
|
||||
def test_provider_request_webpage_no_log(self, ie, logger, pot_request):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
), note=False)
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
assert 'info' not in logger.messages
|
||||
|
||||
def test_provider_request_webpage_no_pot_request(self, ie, logger):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def mock_urlopen(request):
|
||||
return request
|
||||
|
||||
ie._downloader.urlopen = mock_urlopen
|
||||
|
||||
sent_request = provider._request_webpage(Request(
|
||||
'https://example.com',
|
||||
), pot_request=None)
|
||||
|
||||
assert sent_request.url == 'https://example.com'
|
||||
|
||||
def test_get_config_arg(self, ie, logger):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={'abc': ['123D'], 'xyz': ['456a', '789B']})
|
||||
|
||||
assert provider._configuration_arg('abc') == ['123d']
|
||||
assert provider._configuration_arg('abc', default=['default']) == ['123d']
|
||||
assert provider._configuration_arg('ABC', default=['default']) == ['default']
|
||||
assert provider._configuration_arg('abc', casesense=True) == ['123D']
|
||||
assert provider._configuration_arg('xyz', casesense=False) == ['456a', '789b']
|
||||
|
||||
def test_require_class_end_with_suffix(self, ie, logger):
|
||||
class InvalidSuffix(PoTokenProvider):
|
||||
PROVIDER_NAME = 'invalid-suffix'
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
provider = InvalidSuffix(ie=ie, logger=logger, settings={})
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
provider.PROVIDER_KEY # noqa: B018
|
||||
|
||||
|
||||
class TestPoTokenCacheProvider:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(PoTokenCacheProvider, IEContentProvider)
|
||||
|
||||
def test_create_provider_missing_get_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_store_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_delete_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
pass
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_missing_is_available_method(self, ie, logger):
|
||||
class MissingMethodsPCP(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
pass
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCP(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_barebones_provider(self, ie, logger):
|
||||
class BarebonesProviderPCP(PoTokenCacheProvider):
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
provider = BarebonesProviderPCP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_KEY == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.0'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at (developer has not provided a bug report location) .'
|
||||
|
||||
def test_create_provider_example(self, ie, logger):
|
||||
provider = ExampleCacheProviderPCP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'example'
|
||||
assert provider.PROVIDER_KEY == 'ExampleCacheProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.1'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
assert provider.is_available()
|
||||
|
||||
def test_get_config_arg(self, ie, logger):
|
||||
provider = ExampleCacheProviderPCP(ie=ie, logger=logger, settings={'abc': ['123D'], 'xyz': ['456a', '789B']})
|
||||
assert provider._configuration_arg('abc') == ['123d']
|
||||
assert provider._configuration_arg('abc', default=['default']) == ['123d']
|
||||
assert provider._configuration_arg('ABC', default=['default']) == ['default']
|
||||
assert provider._configuration_arg('abc', casesense=True) == ['123D']
|
||||
assert provider._configuration_arg('xyz', casesense=False) == ['456a', '789b']
|
||||
|
||||
def test_require_class_end_with_suffix(self, ie, logger):
|
||||
class InvalidSuffix(PoTokenCacheProvider):
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
provider = InvalidSuffix(ie=ie, logger=logger, settings={})
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
provider.PROVIDER_KEY # noqa: B018
|
||||
|
||||
|
||||
class TestPoTokenCacheSpecProvider:
|
||||
|
||||
def test_base_type(self):
|
||||
assert issubclass(PoTokenCacheSpecProvider, IEContentProvider)
|
||||
|
||||
def test_create_provider_missing_supports_method(self, ie, logger):
|
||||
class MissingMethodsPCS(PoTokenCacheSpecProvider):
|
||||
pass
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
MissingMethodsPCS(ie=ie, logger=logger, settings={})
|
||||
|
||||
def test_create_provider_barebones(self, ie, pot_request, logger):
|
||||
class BarebonesProviderPCSP(PoTokenCacheSpecProvider):
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return PoTokenCacheSpec(
|
||||
default_ttl=100,
|
||||
key_bindings={},
|
||||
)
|
||||
|
||||
provider = BarebonesProviderPCSP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_KEY == 'BarebonesProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.0'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at (developer has not provided a bug report location) .'
|
||||
assert provider.is_available()
|
||||
assert provider.generate_cache_spec(request=pot_request).default_ttl == 100
|
||||
assert provider.generate_cache_spec(request=pot_request).key_bindings == {}
|
||||
assert provider.generate_cache_spec(request=pot_request).write_policy == CacheProviderWritePolicy.WRITE_ALL
|
||||
|
||||
def test_create_provider_example(self, ie, pot_request, logger):
|
||||
provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
|
||||
assert provider.PROVIDER_NAME == 'example'
|
||||
assert provider.PROVIDER_KEY == 'ExampleCacheSpecProvider'
|
||||
assert provider.PROVIDER_VERSION == '0.0.1'
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
assert provider.is_available()
|
||||
assert provider.generate_cache_spec(pot_request)
|
||||
assert provider.generate_cache_spec(pot_request).key_bindings == {'field': 'example-key'}
|
||||
assert provider.generate_cache_spec(pot_request).default_ttl == 60
|
||||
assert provider.generate_cache_spec(pot_request).write_policy == CacheProviderWritePolicy.WRITE_FIRST
|
||||
|
||||
def test_get_config_arg(self, ie, logger):
|
||||
provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={'abc': ['123D'], 'xyz': ['456a', '789B']})
|
||||
|
||||
assert provider._configuration_arg('abc') == ['123d']
|
||||
assert provider._configuration_arg('abc', default=['default']) == ['123d']
|
||||
assert provider._configuration_arg('ABC', default=['default']) == ['default']
|
||||
assert provider._configuration_arg('abc', casesense=True) == ['123D']
|
||||
assert provider._configuration_arg('xyz', casesense=False) == ['456a', '789b']
|
||||
|
||||
def test_require_class_end_with_suffix(self, ie, logger):
|
||||
class InvalidSuffix(PoTokenCacheSpecProvider):
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return None
|
||||
|
||||
provider = InvalidSuffix(ie=ie, logger=logger, settings={})
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
provider.PROVIDER_KEY # noqa: B018
|
||||
|
||||
|
||||
class TestPoTokenRequest:
|
||||
def test_copy_request(self, pot_request):
|
||||
copied_request = pot_request.copy()
|
||||
|
||||
assert copied_request is not pot_request
|
||||
assert copied_request.context == pot_request.context
|
||||
assert copied_request.innertube_context == pot_request.innertube_context
|
||||
assert copied_request.innertube_context is not pot_request.innertube_context
|
||||
copied_request.innertube_context['client']['clientName'] = 'ANDROID'
|
||||
assert pot_request.innertube_context['client']['clientName'] != 'ANDROID'
|
||||
assert copied_request.innertube_host == pot_request.innertube_host
|
||||
assert copied_request.session_index == pot_request.session_index
|
||||
assert copied_request.player_url == pot_request.player_url
|
||||
assert copied_request.is_authenticated == pot_request.is_authenticated
|
||||
assert copied_request.visitor_data == pot_request.visitor_data
|
||||
assert copied_request.data_sync_id == pot_request.data_sync_id
|
||||
assert copied_request.video_id == pot_request.video_id
|
||||
assert copied_request.request_cookiejar is pot_request.request_cookiejar
|
||||
assert copied_request.request_proxy == pot_request.request_proxy
|
||||
assert copied_request.request_headers == pot_request.request_headers
|
||||
assert copied_request.request_headers is not pot_request.request_headers
|
||||
assert copied_request.request_timeout == pot_request.request_timeout
|
||||
assert copied_request.request_source_address == pot_request.request_source_address
|
||||
assert copied_request.request_verify_tls == pot_request.request_verify_tls
|
||||
assert copied_request.bypass_cache == pot_request.bypass_cache
|
||||
|
||||
|
||||
def test_provider_bug_report_message(ie, logger):
|
||||
provider = ExamplePTP(ie=ie, logger=logger, settings={})
|
||||
assert provider.BUG_REPORT_MESSAGE == 'please report this issue to the provider developer at https://example.com/issues .'
|
||||
|
||||
message = provider_bug_report_message(provider)
|
||||
assert message == '; please report this issue to the provider developer at https://example.com/issues .'
|
||||
|
||||
message_before = provider_bug_report_message(provider, before='custom message!')
|
||||
assert message_before == 'custom message! Please report this issue to the provider developer at https://example.com/issues .'
|
||||
|
||||
|
||||
def test_register_provider(ie):
|
||||
|
||||
@register_provider
|
||||
class UnavailableProviderPTP(PoTokenProvider):
|
||||
def is_available(self) -> bool:
|
||||
return False
|
||||
|
||||
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
|
||||
raise PoTokenProviderRejectedRequest('Not implemented')
|
||||
|
||||
assert _pot_providers.value.get('UnavailableProvider') == UnavailableProviderPTP
|
||||
_pot_providers.value.pop('UnavailableProvider')
|
||||
|
||||
|
||||
def test_register_pot_preference(ie):
|
||||
before = len(_ptp_preferences.value)
|
||||
|
||||
@register_preference(ExamplePTP)
|
||||
def unavailable_preference(provider: PoTokenProvider, request: PoTokenRequest):
|
||||
return 1
|
||||
|
||||
assert len(_ptp_preferences.value) == before + 1
|
||||
|
||||
|
||||
def test_register_cache_provider(ie):
|
||||
|
||||
@cache.register_provider
|
||||
class UnavailableCacheProviderPCP(PoTokenCacheProvider):
|
||||
def is_available(self) -> bool:
|
||||
return False
|
||||
|
||||
def get(self, key: str):
|
||||
return 'example-cache'
|
||||
|
||||
def store(self, key: str, value: str, expires_at: int):
|
||||
pass
|
||||
|
||||
def delete(self, key: str):
|
||||
pass
|
||||
|
||||
assert _pot_cache_providers.value.get('UnavailableCacheProvider') == UnavailableCacheProviderPCP
|
||||
_pot_cache_providers.value.pop('UnavailableCacheProvider')
|
||||
|
||||
|
||||
def test_register_cache_provider_spec(ie):
|
||||
|
||||
@cache.register_spec
|
||||
class UnavailableCacheProviderPCSP(PoTokenCacheSpecProvider):
|
||||
def is_available(self) -> bool:
|
||||
return False
|
||||
|
||||
def generate_cache_spec(self, request: PoTokenRequest):
|
||||
return None
|
||||
|
||||
assert _pot_pcs_providers.value.get('UnavailableCacheProvider') == UnavailableCacheProviderPCSP
|
||||
_pot_pcs_providers.value.pop('UnavailableCacheProvider')
|
||||
|
||||
|
||||
def test_register_cache_provider_preference(ie):
|
||||
before = len(_pot_cache_provider_preferences.value)
|
||||
|
||||
@cache.register_preference(ExampleCacheProviderPCP)
|
||||
def unavailable_preference(provider: PoTokenCacheProvider, request: PoTokenRequest):
|
||||
return 1
|
||||
|
||||
assert len(_pot_cache_provider_preferences.value) == before + 1
|
||||
|
||||
|
||||
def test_logger_log_level(logger):
|
||||
assert logger.LogLevel('INFO') == logger.LogLevel.INFO
|
||||
assert logger.LogLevel('debuG') == logger.LogLevel.DEBUG
|
||||
assert logger.LogLevel(10) == logger.LogLevel.DEBUG
|
||||
assert logger.LogLevel('UNKNOWN') == logger.LogLevel.INFO
|
Binary file not shown.
Before Width: | Height: | Size: 3.8 KiB |
@ -1,33 +0,0 @@
|
||||
from .brightcove import BrightcoveNewBaseIE
|
||||
from ..utils import extract_attributes
|
||||
|
||||
|
||||
class BandaiChannelIE(BrightcoveNewBaseIE):
|
||||
IE_NAME = 'bandaichannel'
|
||||
_VALID_URL = r'https?://(?:www\.)?b-ch\.com/titles/(?P<id>\d+/\d+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://www.b-ch.com/titles/514/001',
|
||||
'md5': 'a0f2d787baa5729bed71108257f613a4',
|
||||
'info_dict': {
|
||||
'id': '6128044564001',
|
||||
'ext': 'mp4',
|
||||
'title': 'メタルファイターMIKU 第1話',
|
||||
'timestamp': 1580354056,
|
||||
'uploader_id': '5797077852001',
|
||||
'upload_date': '20200130',
|
||||
'duration': 1387.733,
|
||||
},
|
||||
'params': {
|
||||
'skip_download': True,
|
||||
},
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
video_id = self._match_id(url)
|
||||
webpage = self._download_webpage(url, video_id)
|
||||
attrs = extract_attributes(self._search_regex(
|
||||
r'(<video-js[^>]+\bid="bcplayer"[^>]*>)', webpage, 'player'))
|
||||
bc = self._download_json(
|
||||
'https://pbifcd.b-ch.com/v1/playbackinfo/ST/70/' + attrs['data-info'],
|
||||
video_id, headers={'X-API-KEY': attrs['data-auth'].strip()})['bc']
|
||||
return self._parse_brightcove_metadata(bc, bc['id'])
|
@ -1,91 +0,0 @@
|
||||
from .common import InfoExtractor
|
||||
|
||||
|
||||
class BellMediaIE(InfoExtractor):
|
||||
_VALID_URL = r'''(?x)https?://(?:www\.)?
|
||||
(?P<domain>
|
||||
(?:
|
||||
ctv|
|
||||
tsn|
|
||||
bnn(?:bloomberg)?|
|
||||
thecomedynetwork|
|
||||
discovery|
|
||||
discoveryvelocity|
|
||||
sciencechannel|
|
||||
investigationdiscovery|
|
||||
animalplanet|
|
||||
bravo|
|
||||
mtv|
|
||||
space|
|
||||
etalk|
|
||||
marilyn
|
||||
)\.ca|
|
||||
(?:much|cp24)\.com
|
||||
)/.*?(?:\b(?:vid(?:eoid)?|clipId)=|-vid|~|%7E|/(?:episode)?)(?P<id>[0-9]{6,})'''
|
||||
_TESTS = [{
|
||||
'url': 'https://www.bnnbloomberg.ca/video/david-cockfield-s-top-picks~1403070',
|
||||
'md5': '3e5b8e38370741d5089da79161646635',
|
||||
'info_dict': {
|
||||
'id': '1403070',
|
||||
'ext': 'flv',
|
||||
'title': 'David Cockfield\'s Top Picks',
|
||||
'description': 'md5:810f7f8c6a83ad5b48677c3f8e5bb2c3',
|
||||
'upload_date': '20180525',
|
||||
'timestamp': 1527288600,
|
||||
'season_id': '73997',
|
||||
'season': '2018',
|
||||
'thumbnail': 'http://images2.9c9media.com/image_asset/2018_5_25_baf30cbd-b28d-4a18-9903-4bb8713b00f5_PNG_956x536.jpg',
|
||||
'tags': [],
|
||||
'categories': ['ETFs'],
|
||||
'season_number': 8,
|
||||
'duration': 272.038,
|
||||
'series': 'Market Call Tonight',
|
||||
},
|
||||
}, {
|
||||
'url': 'http://www.thecomedynetwork.ca/video/player?vid=923582',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'http://www.tsn.ca/video/expectations-high-for-milos-raonic-at-us-open~939549',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'http://www.bnn.ca/video/berman-s-call-part-two-viewer-questions~939654',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'http://www.ctv.ca/YourMorning/Video/S1E6-Monday-August-29-2016-vid938009',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'http://www.much.com/shows/atmidnight/episode948007/tuesday-september-13-2016',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'http://www.much.com/shows/the-almost-impossible-gameshow/928979/episode-6',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'http://www.ctv.ca/DCs-Legends-of-Tomorrow/Video/S2E11-Turncoat-vid1051430',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'http://www.etalk.ca/video?videoid=663455',
|
||||
'only_matching': True,
|
||||
}, {
|
||||
'url': 'https://www.cp24.com/video?clipId=1982548',
|
||||
'only_matching': True,
|
||||
}]
|
||||
_DOMAINS = {
|
||||
'thecomedynetwork': 'comedy',
|
||||
'discoveryvelocity': 'discvel',
|
||||
'sciencechannel': 'discsci',
|
||||
'investigationdiscovery': 'invdisc',
|
||||
'animalplanet': 'aniplan',
|
||||
'etalk': 'ctv',
|
||||
'bnnbloomberg': 'bnn',
|
||||
'marilyn': 'ctv_marilyn',
|
||||
}
|
||||
|
||||
def _real_extract(self, url):
|
||||
domain, video_id = self._match_valid_url(url).groups()
|
||||
domain = domain.split('.')[0]
|
||||
return {
|
||||
'_type': 'url_transparent',
|
||||
'id': video_id,
|
||||
'url': f'9c9media:{self._DOMAINS.get(domain, domain)}_web:{video_id}',
|
||||
'ie_key': 'NineCNineMedia',
|
||||
}
|
@ -1,188 +0,0 @@
|
||||
from .adobepass import AdobePassIE
|
||||
from ..networking import HEADRequest
|
||||
from ..utils import (
|
||||
extract_attributes,
|
||||
float_or_none,
|
||||
get_element_html_by_class,
|
||||
int_or_none,
|
||||
merge_dicts,
|
||||
parse_age_limit,
|
||||
remove_end,
|
||||
str_or_none,
|
||||
traverse_obj,
|
||||
unescapeHTML,
|
||||
unified_timestamp,
|
||||
update_url_query,
|
||||
url_or_none,
|
||||
)
|
||||
|
||||
|
||||
class BravoTVIE(AdobePassIE):
|
||||
_VALID_URL = r'https?://(?:www\.)?(?P<site>bravotv|oxygen)\.com/(?:[^/]+/)+(?P<id>[^/?#]+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://www.bravotv.com/top-chef/season-16/episode-15/videos/the-top-chef-season-16-winner-is',
|
||||
'info_dict': {
|
||||
'id': '3923059',
|
||||
'ext': 'mp4',
|
||||
'title': 'The Top Chef Season 16 Winner Is...',
|
||||
'description': 'Find out who takes the title of Top Chef!',
|
||||
'upload_date': '20190314',
|
||||
'timestamp': 1552591860,
|
||||
'season_number': 16,
|
||||
'episode_number': 15,
|
||||
'series': 'Top Chef',
|
||||
'episode': 'The Top Chef Season 16 Winner Is...',
|
||||
'duration': 190.357,
|
||||
'season': 'Season 16',
|
||||
'thumbnail': r're:^https://.+\.jpg',
|
||||
},
|
||||
'params': {'skip_download': 'm3u8'},
|
||||
}, {
|
||||
'url': 'https://www.bravotv.com/top-chef/season-20/episode-1/london-calling',
|
||||
'info_dict': {
|
||||
'id': '9000234570',
|
||||
'ext': 'mp4',
|
||||
'title': 'London Calling',
|
||||
'description': 'md5:5af95a8cbac1856bd10e7562f86bb759',
|
||||
'upload_date': '20230310',
|
||||
'timestamp': 1678410000,
|
||||
'season_number': 20,
|
||||
'episode_number': 1,
|
||||
'series': 'Top Chef',
|
||||
'episode': 'London Calling',
|
||||
'duration': 3266.03,
|
||||
'season': 'Season 20',
|
||||
'chapters': 'count:7',
|
||||
'thumbnail': r're:^https://.+\.jpg',
|
||||
'age_limit': 14,
|
||||
},
|
||||
'params': {'skip_download': 'm3u8'},
|
||||
'skip': 'This video requires AdobePass MSO credentials',
|
||||
}, {
|
||||
'url': 'https://www.oxygen.com/in-ice-cold-blood/season-1/closing-night',
|
||||
'info_dict': {
|
||||
'id': '3692045',
|
||||
'ext': 'mp4',
|
||||
'title': 'Closing Night',
|
||||
'description': 'md5:3170065c5c2f19548d72a4cbc254af63',
|
||||
'upload_date': '20180401',
|
||||
'timestamp': 1522623600,
|
||||
'season_number': 1,
|
||||
'episode_number': 1,
|
||||
'series': 'In Ice Cold Blood',
|
||||
'episode': 'Closing Night',
|
||||
'duration': 2629.051,
|
||||
'season': 'Season 1',
|
||||
'chapters': 'count:6',
|
||||
'thumbnail': r're:^https://.+\.jpg',
|
||||
'age_limit': 14,
|
||||
},
|
||||
'params': {'skip_download': 'm3u8'},
|
||||
'skip': 'This video requires AdobePass MSO credentials',
|
||||
}, {
|
||||
'url': 'https://www.oxygen.com/in-ice-cold-blood/season-2/episode-16/videos/handling-the-horwitz-house-after-the-murder-season-2',
|
||||
'info_dict': {
|
||||
'id': '3974019',
|
||||
'ext': 'mp4',
|
||||
'title': '\'Handling The Horwitz House After The Murder (Season 2, Episode 16)',
|
||||
'description': 'md5:f9d638dd6946a1c1c0533a9c6100eae5',
|
||||
'upload_date': '20190617',
|
||||
'timestamp': 1560790800,
|
||||
'season_number': 2,
|
||||
'episode_number': 16,
|
||||
'series': 'In Ice Cold Blood',
|
||||
'episode': '\'Handling The Horwitz House After The Murder (Season 2, Episode 16)',
|
||||
'duration': 68.235,
|
||||
'season': 'Season 2',
|
||||
'thumbnail': r're:^https://.+\.jpg',
|
||||
'age_limit': 14,
|
||||
},
|
||||
'params': {'skip_download': 'm3u8'},
|
||||
}, {
|
||||
'url': 'https://www.bravotv.com/below-deck/season-3/ep-14-reunion-part-1',
|
||||
'only_matching': True,
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
site, display_id = self._match_valid_url(url).group('site', 'id')
|
||||
webpage = self._download_webpage(url, display_id)
|
||||
settings = self._search_json(
|
||||
r'<script[^>]+data-drupal-selector="drupal-settings-json"[^>]*>', webpage, 'settings', display_id)
|
||||
tve = extract_attributes(get_element_html_by_class('tve-video-deck-app', webpage) or '')
|
||||
query = {
|
||||
'manifest': 'm3u',
|
||||
'formats': 'm3u,mpeg4',
|
||||
}
|
||||
|
||||
if tve:
|
||||
account_pid = tve.get('data-mpx-media-account-pid') or 'HNK2IC'
|
||||
account_id = tve['data-mpx-media-account-id']
|
||||
metadata = self._parse_json(
|
||||
tve.get('data-normalized-video', ''), display_id, fatal=False, transform_source=unescapeHTML)
|
||||
video_id = tve.get('data-guid') or metadata['guid']
|
||||
if tve.get('data-entitlement') == 'auth':
|
||||
auth = traverse_obj(settings, ('tve_adobe_auth', {dict})) or {}
|
||||
site = remove_end(site, 'tv')
|
||||
release_pid = tve['data-release-pid']
|
||||
resource = self._get_mvpd_resource(
|
||||
tve.get('data-adobe-pass-resource-id') or auth.get('adobePassResourceId') or site,
|
||||
tve['data-title'], release_pid, tve.get('data-rating'))
|
||||
query.update({
|
||||
'switch': 'HLSServiceSecure',
|
||||
'auth': self._extract_mvpd_auth(
|
||||
url, release_pid, auth.get('adobePassRequestorId') or site, resource),
|
||||
})
|
||||
|
||||
else:
|
||||
ls_playlist = traverse_obj(settings, ('ls_playlist', ..., {dict}), get_all=False) or {}
|
||||
account_pid = ls_playlist.get('mpxMediaAccountPid') or 'PHSl-B'
|
||||
account_id = ls_playlist['mpxMediaAccountId']
|
||||
video_id = ls_playlist['defaultGuid']
|
||||
metadata = traverse_obj(
|
||||
ls_playlist, ('videos', lambda _, v: v['guid'] == video_id, {dict}), get_all=False)
|
||||
|
||||
tp_url = f'https://link.theplatform.com/s/{account_pid}/media/guid/{account_id}/{video_id}'
|
||||
tp_metadata = self._download_json(
|
||||
update_url_query(tp_url, {'format': 'preview'}), video_id, fatal=False)
|
||||
|
||||
chapters = traverse_obj(tp_metadata, ('chapters', ..., {
|
||||
'start_time': ('startTime', {float_or_none(scale=1000)}),
|
||||
'end_time': ('endTime', {float_or_none(scale=1000)}),
|
||||
}))
|
||||
# prune pointless single chapters that span the entire duration from short videos
|
||||
if len(chapters) == 1 and not traverse_obj(chapters, (0, 'end_time')):
|
||||
chapters = None
|
||||
|
||||
m3u8_url = self._request_webpage(HEADRequest(
|
||||
update_url_query(f'{tp_url}/stream.m3u8', query)), video_id, 'Checking m3u8 URL').url
|
||||
if 'mpeg_cenc' in m3u8_url:
|
||||
self.report_drm(video_id)
|
||||
formats, subtitles = self._extract_m3u8_formats_and_subtitles(m3u8_url, video_id, 'mp4', m3u8_id='hls')
|
||||
|
||||
return {
|
||||
'id': video_id,
|
||||
'formats': formats,
|
||||
'subtitles': subtitles,
|
||||
'chapters': chapters,
|
||||
**merge_dicts(traverse_obj(tp_metadata, {
|
||||
'title': 'title',
|
||||
'description': 'description',
|
||||
'duration': ('duration', {float_or_none(scale=1000)}),
|
||||
'timestamp': ('pubDate', {float_or_none(scale=1000)}),
|
||||
'season_number': (('pl1$seasonNumber', 'nbcu$seasonNumber'), {int_or_none}),
|
||||
'episode_number': (('pl1$episodeNumber', 'nbcu$episodeNumber'), {int_or_none}),
|
||||
'series': (('pl1$show', 'nbcu$show'), (None, ...), {str}),
|
||||
'episode': (('title', 'pl1$episodeNumber', 'nbcu$episodeNumber'), {str_or_none}),
|
||||
'age_limit': ('ratings', ..., 'rating', {parse_age_limit}),
|
||||
}, get_all=False), traverse_obj(metadata, {
|
||||
'title': 'title',
|
||||
'description': 'description',
|
||||
'duration': ('durationInSeconds', {int_or_none}),
|
||||
'timestamp': ('airDate', {unified_timestamp}),
|
||||
'thumbnail': ('thumbnailUrl', {url_or_none}),
|
||||
'season_number': ('seasonNumber', {int_or_none}),
|
||||
'episode_number': ('episodeNumber', {int_or_none}),
|
||||
'episode': 'episodeTitle',
|
||||
'series': 'show',
|
||||
})),
|
||||
}
|
@ -1,59 +0,0 @@
|
||||
from .turner import TurnerBaseIE
|
||||
from ..utils import int_or_none
|
||||
|
||||
|
||||
class CartoonNetworkIE(TurnerBaseIE):
|
||||
_VALID_URL = r'https?://(?:www\.)?cartoonnetwork\.com/video/(?:[^/]+/)+(?P<id>[^/?#]+)-(?:clip|episode)\.html'
|
||||
_TEST = {
|
||||
'url': 'https://www.cartoonnetwork.com/video/ben-10/how-to-draw-upgrade-episode.html',
|
||||
'info_dict': {
|
||||
'id': '6e3375097f63874ebccec7ef677c1c3845fa850e',
|
||||
'ext': 'mp4',
|
||||
'title': 'How to Draw Upgrade',
|
||||
'description': 'md5:2061d83776db7e8be4879684eefe8c0f',
|
||||
},
|
||||
'params': {
|
||||
# m3u8 download
|
||||
'skip_download': True,
|
||||
},
|
||||
}
|
||||
|
||||
def _real_extract(self, url):
|
||||
display_id = self._match_id(url)
|
||||
webpage = self._download_webpage(url, display_id)
|
||||
|
||||
def find_field(global_re, name, content_re=None, value_re='[^"]+', fatal=False):
|
||||
metadata_re = ''
|
||||
if content_re:
|
||||
metadata_re = r'|video_metadata\.content_' + content_re
|
||||
return self._search_regex(
|
||||
rf'(?:_cnglobal\.currentVideo\.{global_re}{metadata_re})\s*=\s*"({value_re})";',
|
||||
webpage, name, fatal=fatal)
|
||||
|
||||
media_id = find_field('mediaId', 'media id', 'id', '[0-9a-f]{40}', True)
|
||||
title = find_field('episodeTitle', 'title', '(?:episodeName|name)', fatal=True)
|
||||
|
||||
info = self._extract_ngtv_info(
|
||||
media_id, {'networkId': 'cartoonnetwork'}, {
|
||||
'url': url,
|
||||
'site_name': 'CartoonNetwork',
|
||||
'auth_required': find_field('authType', 'auth type') != 'unauth',
|
||||
})
|
||||
|
||||
series = find_field(
|
||||
'propertyName', 'series', 'showName') or self._html_search_meta('partOfSeries', webpage)
|
||||
info.update({
|
||||
'id': media_id,
|
||||
'display_id': display_id,
|
||||
'title': title,
|
||||
'description': self._html_search_meta('description', webpage),
|
||||
'series': series,
|
||||
'episode': title,
|
||||
})
|
||||
|
||||
for field in ('season', 'episode'):
|
||||
field_name = field + 'Number'
|
||||
info[field + '_number'] = int_or_none(find_field(
|
||||
field_name, field + ' number', value_re=r'\d+') or self._html_search_meta(field_name, webpage))
|
||||
|
||||
return info
|
@ -1,49 +0,0 @@
|
||||
from .common import InfoExtractor
|
||||
|
||||
|
||||
class CTVIE(InfoExtractor):
|
||||
_VALID_URL = r'https?://(?:www\.)?ctv\.ca/(?P<id>(?:show|movie)s/[^/]+/[^/?#&]+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://www.ctv.ca/shows/your-morning/wednesday-december-23-2020-s5e88',
|
||||
'info_dict': {
|
||||
'id': '2102249',
|
||||
'ext': 'flv',
|
||||
'title': 'Wednesday, December 23, 2020',
|
||||
'thumbnail': r're:^https?://.*\.jpg$',
|
||||
'description': 'Your Morning delivers original perspectives and unique insights into the headlines of the day.',
|
||||
'timestamp': 1608732000,
|
||||
'upload_date': '20201223',
|
||||
'series': 'Your Morning',
|
||||
'season': '2020-2021',
|
||||
'season_number': 5,
|
||||
'episode_number': 88,
|
||||
'tags': ['Your Morning'],
|
||||
'categories': ['Talk Show'],
|
||||
'duration': 7467.126,
|
||||
},
|
||||
}, {
|
||||
'url': 'https://www.ctv.ca/movies/adam-sandlers-eight-crazy-nights/adam-sandlers-eight-crazy-nights',
|
||||
'only_matching': True,
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
display_id = self._match_id(url)
|
||||
content = self._download_json(
|
||||
'https://www.ctv.ca/space-graphql/graphql', display_id, query={
|
||||
'query': '''{
|
||||
resolvedPath(path: "/%s") {
|
||||
lastSegment {
|
||||
content {
|
||||
... on AxisContent {
|
||||
axisId
|
||||
videoPlayerDestCode
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}''' % display_id, # noqa: UP031
|
||||
})['data']['resolvedPath']['lastSegment']['content']
|
||||
video_id = content['axisId']
|
||||
return self.url_result(
|
||||
'9c9media:{}:{}'.format(content['videoPlayerDestCode'], video_id),
|
||||
'NineCNineMedia', video_id)
|
@ -1,215 +0,0 @@
|
||||
import functools
|
||||
import re
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..networking.exceptions import HTTPError
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
int_or_none,
|
||||
smuggle_url,
|
||||
unsmuggle_url,
|
||||
url_or_none,
|
||||
)
|
||||
|
||||
|
||||
class EaglePlatformIE(InfoExtractor):
|
||||
_VALID_URL = r'''(?x)
|
||||
(?:
|
||||
eagleplatform:(?P<custom_host>[^/]+):|
|
||||
https?://(?P<host>.+?\.media\.eagleplatform\.com)/index/player\?.*\brecord_id=
|
||||
)
|
||||
(?P<id>\d+)
|
||||
'''
|
||||
_EMBED_REGEX = [r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//.+?\.media\.eagleplatform\.com/index/player\?.+?)\1']
|
||||
_TESTS = [{
|
||||
# http://lenta.ru/news/2015/03/06/navalny/
|
||||
'url': 'http://lentaru.media.eagleplatform.com/index/player?player=new&record_id=227304&player_template_id=5201',
|
||||
# Not checking MD5 as sometimes the direct HTTP link results in 404 and HLS is used
|
||||
'info_dict': {
|
||||
'id': '227304',
|
||||
'ext': 'mp4',
|
||||
'title': 'Навальный вышел на свободу',
|
||||
'description': 'md5:d97861ac9ae77377f3f20eaf9d04b4f5',
|
||||
'thumbnail': r're:^https?://.*\.jpg$',
|
||||
'duration': 87,
|
||||
'view_count': int,
|
||||
'age_limit': 0,
|
||||
},
|
||||
}, {
|
||||
# http://muz-tv.ru/play/7129/
|
||||
# http://media.clipyou.ru/index/player?record_id=12820&width=730&height=415&autoplay=true
|
||||
'url': 'eagleplatform:media.clipyou.ru:12820',
|
||||
'md5': '358597369cf8ba56675c1df15e7af624',
|
||||
'info_dict': {
|
||||
'id': '12820',
|
||||
'ext': 'mp4',
|
||||
'title': "'O Sole Mio",
|
||||
'thumbnail': r're:^https?://.*\.jpg$',
|
||||
'duration': 216,
|
||||
'view_count': int,
|
||||
},
|
||||
'skip': 'Georestricted',
|
||||
}, {
|
||||
# referrer protected video (https://tvrain.ru/lite/teleshow/kak_vse_nachinalos/namin-418921/)
|
||||
'url': 'eagleplatform:tvrainru.media.eagleplatform.com:582306',
|
||||
'only_matching': True,
|
||||
}]
|
||||
|
||||
@classmethod
|
||||
def _extract_embed_urls(cls, url, webpage):
|
||||
add_referer = functools.partial(smuggle_url, data={'referrer': url})
|
||||
|
||||
res = tuple(super()._extract_embed_urls(url, webpage))
|
||||
if res:
|
||||
return map(add_referer, res)
|
||||
|
||||
PLAYER_JS_RE = r'''
|
||||
<script[^>]+
|
||||
src=(?P<qjs>["\'])(?:https?:)?//(?P<host>(?:(?!(?P=qjs)).)+\.media\.eagleplatform\.com)/player/player\.js(?P=qjs)
|
||||
.+?
|
||||
'''
|
||||
# "Basic usage" embedding (see http://dultonmedia.github.io/eplayer/)
|
||||
mobj = re.search(
|
||||
rf'''(?xs)
|
||||
{PLAYER_JS_RE}
|
||||
<div[^>]+
|
||||
class=(?P<qclass>["\'])eagleplayer(?P=qclass)[^>]+
|
||||
data-id=["\'](?P<id>\d+)
|
||||
''', webpage)
|
||||
if mobj is not None:
|
||||
return [add_referer('eagleplatform:{host}:{id}'.format(**mobj.groupdict()))]
|
||||
# Generalization of "Javascript code usage", "Combined usage" and
|
||||
# "Usage without attaching to DOM" embeddings (see
|
||||
# http://dultonmedia.github.io/eplayer/)
|
||||
mobj = re.search(
|
||||
r'''(?xs)
|
||||
%s
|
||||
<script>
|
||||
.+?
|
||||
new\s+EaglePlayer\(
|
||||
(?:[^,]+\s*,\s*)?
|
||||
{
|
||||
.+?
|
||||
\bid\s*:\s*["\']?(?P<id>\d+)
|
||||
.+?
|
||||
}
|
||||
\s*\)
|
||||
.+?
|
||||
</script>
|
||||
''' % PLAYER_JS_RE, webpage) # noqa: UP031
|
||||
if mobj is not None:
|
||||
return [add_referer('eagleplatform:{host}:{id}'.format(**mobj.groupdict()))]
|
||||
|
||||
@staticmethod
|
||||
def _handle_error(response):
|
||||
status = int_or_none(response.get('status', 200))
|
||||
if status != 200:
|
||||
raise ExtractorError(' '.join(response['errors']), expected=True)
|
||||
|
||||
def _download_json(self, url_or_request, video_id, *args, **kwargs):
|
||||
try:
|
||||
response = super()._download_json(
|
||||
url_or_request, video_id, *args, **kwargs)
|
||||
except ExtractorError as ee:
|
||||
if isinstance(ee.cause, HTTPError):
|
||||
response = self._parse_json(ee.cause.response.read().decode('utf-8'), video_id)
|
||||
self._handle_error(response)
|
||||
raise
|
||||
return response
|
||||
|
||||
def _get_video_url(self, url_or_request, video_id, note='Downloading JSON metadata'):
|
||||
return self._download_json(url_or_request, video_id, note)['data'][0]
|
||||
|
||||
def _real_extract(self, url):
|
||||
url, smuggled_data = unsmuggle_url(url, {})
|
||||
|
||||
mobj = self._match_valid_url(url)
|
||||
host, video_id = mobj.group('custom_host') or mobj.group('host'), mobj.group('id')
|
||||
|
||||
headers = {}
|
||||
query = {
|
||||
'id': video_id,
|
||||
}
|
||||
|
||||
referrer = smuggled_data.get('referrer')
|
||||
if referrer:
|
||||
headers['Referer'] = referrer
|
||||
query['referrer'] = referrer
|
||||
|
||||
player_data = self._download_json(
|
||||
f'http://{host}/api/player_data', video_id,
|
||||
headers=headers, query=query)
|
||||
|
||||
media = player_data['data']['playlist']['viewports'][0]['medialist'][0]
|
||||
|
||||
title = media['title']
|
||||
description = media.get('description')
|
||||
thumbnail = self._proto_relative_url(media.get('snapshot'), 'http:')
|
||||
duration = int_or_none(media.get('duration'))
|
||||
view_count = int_or_none(media.get('views'))
|
||||
|
||||
age_restriction = media.get('age_restriction')
|
||||
age_limit = None
|
||||
if age_restriction:
|
||||
age_limit = 0 if age_restriction == 'allow_all' else 18
|
||||
|
||||
secure_m3u8 = self._proto_relative_url(media['sources']['secure_m3u8']['auto'], 'http:')
|
||||
|
||||
formats = []
|
||||
|
||||
m3u8_url = self._get_video_url(secure_m3u8, video_id, 'Downloading m3u8 JSON')
|
||||
m3u8_formats = self._extract_m3u8_formats(
|
||||
m3u8_url, video_id, 'mp4', entry_protocol='m3u8_native',
|
||||
m3u8_id='hls', fatal=False)
|
||||
formats.extend(m3u8_formats)
|
||||
|
||||
m3u8_formats_dict = {}
|
||||
for f in m3u8_formats:
|
||||
if f.get('height') is not None:
|
||||
m3u8_formats_dict[f['height']] = f
|
||||
|
||||
mp4_data = self._download_json(
|
||||
# Secure mp4 URL is constructed according to Player.prototype.mp4 from
|
||||
# http://lentaru.media.eagleplatform.com/player/player.js
|
||||
re.sub(r'm3u8|hlsvod|hls|f4m', 'mp4s', secure_m3u8),
|
||||
video_id, 'Downloading mp4 JSON', fatal=False)
|
||||
if mp4_data:
|
||||
for format_id, format_url in mp4_data.get('data', {}).items():
|
||||
if not url_or_none(format_url):
|
||||
continue
|
||||
height = int_or_none(format_id)
|
||||
if height is not None and m3u8_formats_dict.get(height):
|
||||
f = m3u8_formats_dict[height].copy()
|
||||
f.update({
|
||||
'format_id': f['format_id'].replace('hls', 'http'),
|
||||
'protocol': 'http',
|
||||
})
|
||||
else:
|
||||
f = {
|
||||
'format_id': f'http-{format_id}',
|
||||
'height': int_or_none(format_id),
|
||||
}
|
||||
f['url'] = format_url
|
||||
formats.append(f)
|
||||
|
||||
return {
|
||||
'id': video_id,
|
||||
'title': title,
|
||||
'description': description,
|
||||
'thumbnail': thumbnail,
|
||||
'duration': duration,
|
||||
'view_count': view_count,
|
||||
'age_limit': age_limit,
|
||||
'formats': formats,
|
||||
}
|
||||
|
||||
|
||||
class ClipYouEmbedIE(InfoExtractor):
|
||||
_VALID_URL = False
|
||||
|
||||
@classmethod
|
||||
def _extract_embed_urls(cls, url, webpage):
|
||||
mobj = re.search(
|
||||
r'<iframe[^>]+src="https?://(?P<host>media\.clipyou\.ru)/index/player\?.*\brecord_id=(?P<id>\d+).*"', webpage)
|
||||
if mobj is not None:
|
||||
yield smuggle_url('eagleplatform:{host}:{id}'.format(**mobj.groupdict()), {'referrer': url})
|
@ -0,0 +1,105 @@
|
||||
import re
|
||||
import urllib.parse
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..utils import js_to_json, url_or_none
|
||||
from ..utils.traversal import traverse_obj
|
||||
|
||||
|
||||
class FaulioLiveIE(InfoExtractor):
|
||||
_DOMAINS = (
|
||||
'aloula.sba.sa',
|
||||
'bahry.com',
|
||||
'maraya.sba.net.ae',
|
||||
'sat7plus.org',
|
||||
)
|
||||
_VALID_URL = fr'https?://(?:{"|".join(map(re.escape, _DOMAINS))})/(?:(?:en|ar|fa)/)?live/(?P<id>[a-zA-Z0-9-]+)'
|
||||
_TESTS = [{
|
||||
'url': 'https://aloula.sba.sa/live/saudiatv',
|
||||
'info_dict': {
|
||||
'id': 'aloula.faulio.com_saudiatv',
|
||||
'title': str,
|
||||
'description': str,
|
||||
'ext': 'mp4',
|
||||
'live_status': 'is_live',
|
||||
},
|
||||
'params': {
|
||||
'skip_download': 'Livestream',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://bahry.com/live/1',
|
||||
'info_dict': {
|
||||
'id': 'bahry.faulio.com_1',
|
||||
'title': str,
|
||||
'description': str,
|
||||
'ext': 'mp4',
|
||||
'live_status': 'is_live',
|
||||
},
|
||||
'params': {
|
||||
'skip_download': 'Livestream',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://maraya.sba.net.ae/live/1',
|
||||
'info_dict': {
|
||||
'id': 'maraya.faulio.com_1',
|
||||
'title': str,
|
||||
'description': str,
|
||||
'ext': 'mp4',
|
||||
'live_status': 'is_live',
|
||||
},
|
||||
'params': {
|
||||
'skip_download': 'Livestream',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://sat7plus.org/live/pars',
|
||||
'info_dict': {
|
||||
'id': 'sat7.faulio.com_pars',
|
||||
'title': str,
|
||||
'description': str,
|
||||
'ext': 'mp4',
|
||||
'live_status': 'is_live',
|
||||
},
|
||||
'params': {
|
||||
'skip_download': 'Livestream',
|
||||
},
|
||||
}, {
|
||||
'url': 'https://sat7plus.org/fa/live/arabic',
|
||||
'only_matching': True,
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
video_id = self._match_id(url)
|
||||
webpage = self._download_webpage(url, video_id)
|
||||
|
||||
config_data = self._search_json(
|
||||
r'window\.__NUXT__\.config=', webpage, 'config', video_id, transform_source=js_to_json)
|
||||
api_base = config_data['public']['TRANSLATIONS_API_URL']
|
||||
|
||||
channel = traverse_obj(
|
||||
self._download_json(f'{api_base}/channels', video_id),
|
||||
(lambda k, v: v['url'] == video_id, any))
|
||||
|
||||
formats = []
|
||||
subtitles = {}
|
||||
if hls_url := traverse_obj(channel, ('streams', 'hls', {url_or_none})):
|
||||
fmts, subs = self._extract_m3u8_formats_and_subtitles(
|
||||
hls_url, video_id, 'mp4', m3u8_id='hls', live=True, fatal=False)
|
||||
formats.extend(fmts)
|
||||
self._merge_subtitles(subs, target=subtitles)
|
||||
|
||||
if mpd_url := traverse_obj(channel, ('streams', 'mpd', {url_or_none})):
|
||||
fmts, subs = self._extract_mpd_formats_and_subtitles(
|
||||
mpd_url, video_id, mpd_id='dash', fatal=False)
|
||||
formats.extend(fmts)
|
||||
self._merge_subtitles(subs, target=subtitles)
|
||||
|
||||
return {
|
||||
'id': f'{urllib.parse.urlparse(api_base).hostname}_{video_id}',
|
||||
**traverse_obj(channel, {
|
||||
'title': ('title', {str}),
|
||||
'description': ('description', {str}),
|
||||
}),
|
||||
'formats': formats,
|
||||
'subtitles': subtitles,
|
||||
'is_live': True,
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue