pull/11787/head
c-basalt 2 months ago
parent 6fef824025
commit 29a955d63d

@ -1,5 +1,7 @@
import functools
import json
import re
import urllib.parse
from .common import InfoExtractor
from ..utils import (
@ -12,40 +14,84 @@ from ..utils import (
traverse_obj,
unified_timestamp,
url_or_none,
urljoin,
)
class NiconicoChannelPlusBaseIE(InfoExtractor):
_WEBPAGE_BASE_URL = 'https://nicochannel.jp'
def _call_api(self, path, item_id, **kwargs):
return self._download_json(
f'https://nfc-api.nicochannel.jp/fc/{path}', video_id=item_id, **kwargs)
def _find_fanclub_site_id(self, channel_name):
fanclub_list_json = self._call_api(
'content_providers/channels', item_id=f'channels/{channel_name}',
note='Fetching channel list', errnote='Unable to fetch channel list',
)['data']['content_providers']
fanclub_id = traverse_obj(fanclub_list_json, (
lambda _, v: v['domain'] == f'{self._WEBPAGE_BASE_URL}/{channel_name}', 'id'),
get_all=False)
if not fanclub_id:
raise ExtractorError(f'Channel {channel_name} does not exist', expected=True)
return fanclub_id
def _get_channel_base_info(self, fanclub_site_id):
return traverse_obj(self._call_api(
f'fanclub_sites/{fanclub_site_id}/page_base_info', item_id=f'fanclub_sites/{fanclub_site_id}',
note='Fetching channel base info', errnote='Unable to fetch channel base info', fatal=False,
), ('data', 'fanclub_site', {dict})) or {}
def _get_channel_user_info(self, fanclub_site_id):
return traverse_obj(self._call_api(
f'fanclub_sites/{fanclub_site_id}/user_info', item_id=f'fanclub_sites/{fanclub_site_id}',
note='Fetching channel user info', errnote='Unable to fetch channel user info', fatal=False,
data=json.dumps('null').encode('ascii'),
), ('data', 'fanclub_site', {dict})) or {}
_SITE_SETTINGS = {}
_DOMAIN_SITE_ID = {}
_CHANNEL_NAMES = {}
_CHANNEL_AGE_LIMIT = {}
def _get_settings(self, url, video_id=None):
base_url = urljoin(url, '/')
if base_url not in self._SITE_SETTINGS:
self._SITE_SETTINGS[base_url] = self._download_json(
urljoin(base_url, '/site/settings.json'), video_id, note='Downloading site settings')
if self._SITE_SETTINGS[base_url].get('platform_id') not in ['CHPL', 'SHTA', 'JOQR', 'TKFM']:
self.report_warning(f'Unknown platform type: {self._SITE_SETTINGS[base_url].get("platform_id")}')
return self._SITE_SETTINGS[base_url]
def _download_api_json(self, site_url, path, video_id, headers={}, **kwargs):
path = f'/{path}' if path[0] != '/' else path
settings = self._get_settings(site_url, video_id)
headers = {
'origin': urljoin(site_url, '/').strip('/'),
'referer': urljoin(site_url, '/'),
'fc_site_id': settings['fanclub_site_id'],
'fc_use_device': 'null',
**headers,
}
return self._download_json(f'{settings["api_base_url"]}{path}', video_id, headers=headers, **kwargs)
def _get_fanclub_site_id(self, url):
settings = self._get_settings(url)
if settings['platform_id'] == 'SHTA':
return str(settings['fanclub_site_id'])
else:
parsed = urllib.parse.urlparse(url)
# parsed.path starts with '/', so index 0 is empty string
domain_url = f'{parsed.scheme}://{parsed.netloc}/{parsed.path.split("/")[1].lower()}'
if domain_url not in self._DOMAIN_SITE_ID:
self._DOMAIN_SITE_ID[domain_url] = str(self._download_api_json(
url, '/content_providers/channel_domain', domain_url,
query={'current_site_domain': domain_url})['data']['content_providers']['id'])
return self._DOMAIN_SITE_ID[domain_url]
def _get_channel_id(self, url):
parsed = urllib.parse.urlparse(url)
if self._get_settings(url)['platform_id'] == 'SHTA':
return parsed.hostname.replace('.', '_')
elif self._get_settings(url)['platform_id'] == 'CHPL':
return parsed.path.split('/')[1]
else:
return f'{parsed.hostname.replace(".", "_")}_{parsed.path.split("/")[1]}'
def _get_channel_url(self, url):
parsed = urllib.parse.urlparse(url)
if self._get_settings(url)['platform_id'] == 'SHTA':
return f'{parsed.scheme}://{parsed.netloc}'
else:
return f'{parsed.scheme}://{parsed.netloc}/{parsed.path.split("/")[1]}'
def _get_channel_name(self, url):
fanclub_site_id = self._get_fanclub_site_id(url)
if fanclub_site_id not in self._CHANNEL_NAMES:
self._CHANNEL_NAMES[fanclub_site_id] = traverse_obj(self._download_api_json(
url, f'/fanclub_sites/{fanclub_site_id}/page_base_info', video_id=str(fanclub_site_id),
note='Downloading channel name', fatal=False,
), ('data', 'fanclub_site', 'fanclub_site_name', {str}))
return self._CHANNEL_NAMES[fanclub_site_id]
def _get_age_limit(self, url):
fanclub_site_id = self._get_fanclub_site_id(url)
if fanclub_site_id not in self._CHANNEL_AGE_LIMIT:
self._CHANNEL_AGE_LIMIT[fanclub_site_id] = traverse_obj(self._download_api_json(
url, f'/fanclub_sites/{fanclub_site_id}/user_info', video_id=str(fanclub_site_id), data=b'',
note='Downloading channel age limit', fatal=False,
), ('data', 'fanclub_site', 'content_provider', 'age_limit', {int}))
return self._CHANNEL_AGE_LIMIT[fanclub_site_id]
class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
@ -53,6 +99,25 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
IE_DESC = 'ニコニコチャンネルプラス'
_VALID_URL = r'https?://nicochannel\.jp/(?P<channel>[\w.-]+)/(?:video|live)/(?P<code>sm\w+)'
_TESTS = [{
'url': 'https://nicochannel.jp/renge/video/smjHSEPCxd4ohY4zg8iyGKnX',
'info_dict': {
'id': 'smjHSEPCxd4ohY4zg8iyGKnX',
'title': '【両耳舐め】あまいちゃトロらぶ両耳舐め【本多ぽこちゃんと耳舐めASMR②】',
'ext': 'mp4',
'channel': '狐月れんげのあまとろASMR',
'channel_id': 'renge',
'channel_url': 'https://nicochannel.jp/renge',
'live_status': 'not_live',
'thumbnail': 'https://nicochannel.jp/public_html/contents/video_pages/35690/thumbnail_path?time=1722439868',
'description': 'お耳が癒されて疲れもヌケる♡\n本多ぽこちゃんとの2024年7月24日の耳舐めコラボアーカイブです。',
'timestamp': 1722439866,
'duration': 2698,
'comment_count': int,
'view_count': int,
'tags': list,
'upload_date': '20240731',
},
}, {
'url': 'https://nicochannel.jp/kaorin/video/smsDd8EdFLcVZk9yyAhD6H7H',
'info_dict': {
'id': 'smsDd8EdFLcVZk9yyAhD6H7H',
@ -71,9 +136,7 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
'tags': [],
'upload_date': '20220105',
},
'params': {
'skip_download': True,
},
'skip': 'subscriber only',
}, {
# age limited video; test purpose channel.
'url': 'https://nicochannel.jp/testman/video/smDXbcrtyPNxLx9jc4BW69Ve',
@ -93,49 +156,37 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
'tags': [],
'upload_date': '20221021',
},
'params': {
'skip_download': True,
},
'skip': 'subscriber only',
}]
def _real_extract(self, url):
content_code, channel_id = self._match_valid_url(url).group('code', 'channel')
fanclub_site_id = self._find_fanclub_site_id(channel_id)
data_json = self._call_api(
f'video_pages/{content_code}', item_id=content_code, headers={'fc_use_device': 'null'},
note='Fetching video page info', errnote='Unable to fetch video page info',
)['data']['video_page']
live_status, session_id = self._get_live_status_and_session_id(content_code, data_json)
def _parse_video_id(self, url):
parsed = urllib.parse.urlparse(url)
return re.search(r'/(?:video|live)/(?P<id>\w+)', parsed.path)[1]
release_timestamp_str = data_json.get('live_scheduled_start_at')
def _real_extract(self, url):
video_id = self._parse_video_id(url)
formats = []
video_info = self._download_api_json(url, f'/video_pages/{video_id}', video_id,
note='Downloading video info')['data']['video_page']
if live_status == 'is_upcoming':
if release_timestamp_str:
msg = f'This live event will begin at {release_timestamp_str} UTC'
else:
msg = 'This event has not started yet'
self.raise_no_formats(msg, expected=True, video_id=content_code)
else:
formats = self._extract_m3u8_formats(
# "authenticated_url" is a format string that contains "{session_id}".
m3u8_url=data_json['video_stream']['authenticated_url'].format(session_id=session_id),
video_id=content_code)
live_status, session_payload, timestamp = self._parse_live_status(video_id, video_info)
session_info = self._download_api_json(
url, f'/video_pages/{video_id}/session_ids', video_id, data=json.dumps(session_payload).encode(),
headers={'content-type': 'application/json'}, note='Downloading video session')['data']
formats = self._extract_m3u8_formats(
video_info['video_stream']['authenticated_url'].format(**session_info), video_id)
return {
'id': content_code,
'id': video_id,
'formats': formats,
'_format_sort_fields': ('tbr', 'vcodec', 'acodec'),
'channel': self._get_channel_base_info(fanclub_site_id).get('fanclub_site_name'),
'channel_id': channel_id,
'channel_url': f'{self._WEBPAGE_BASE_URL}/{channel_id}',
'age_limit': traverse_obj(self._get_channel_user_info(fanclub_site_id), ('content_provider', 'age_limit')),
'channel': self._get_channel_name(url),
'channel_id': self._get_channel_id(url),
'channel_url': self._get_channel_url(url),
'age_limit': self._get_age_limit(url),
'live_status': live_status,
'release_timestamp': unified_timestamp(release_timestamp_str),
**traverse_obj(data_json, {
'release_timestamp': timestamp,
**traverse_obj(video_info, {
'title': ('title', {str}),
'thumbnail': ('thumbnail_url', {url_or_none}),
'description': ('description', {str}),
@ -146,23 +197,22 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
'tags': ('video_tags', ..., 'tag', {str}),
}),
'__post_extractor': self.extract_comments(
content_code=content_code,
comment_group_id=traverse_obj(data_json, ('video_comment_setting', 'comment_group_id'))),
url=url,
comment_group_id=traverse_obj(video_info, ('video_comment_setting', 'comment_group_id'))),
}
def _get_comments(self, content_code, comment_group_id):
item_id = f'{content_code}/comments'
def _get_comments(self, url, comment_group_id):
if not comment_group_id:
return None
video_id = self._parse_video_id(url)
comment_access_token = self._call_api(
f'video_pages/{content_code}/comments_user_token', item_id,
comment_access_token = self._download_api_json(
url, f'video_pages/{video_id}/comments_user_token', f'{video_id}/comments',
note='Getting comment token', errnote='Unable to get comment token',
)['data']['access_token']
comment_list = self._download_json(
'https://comm-api.sheeta.com/messages.history', video_id=item_id,
'https://comm-api.sheeta.com/messages.history', video_id=f'{video_id}/comments',
note='Fetching comments', errnote='Unable to fetch comments',
headers={'Content-Type': 'application/json'},
query={
@ -184,9 +234,10 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
'author_is_uploader': ('sender_id', {lambda x: x == '-1'}),
}, get_all=False)
def _get_live_status_and_session_id(self, content_code, data_json):
video_type = data_json.get('type')
live_finished_at = data_json.get('live_finished_at')
def _parse_live_status(self, video_id, video_info):
video_type = video_info.get('type')
live_finished_at = video_info.get('live_finished_at')
release_timestamp_str = video_info.get('live_scheduled_start_at')
payload = {}
if video_type == 'vod':
@ -195,8 +246,13 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
else:
live_status = 'not_live'
elif video_type == 'live':
if not data_json.get('live_started_at'):
return 'is_upcoming', ''
if not video_info.get('live_started_at'):
live_status = 'is_upcoming'
if release_timestamp_str:
msg = f'This live event will begin at {release_timestamp_str} UTC'
else:
msg = 'This event has not started yet'
self.raise_no_formats(msg, expected=True, video_id=video_id)
if not live_finished_at:
live_status = 'is_live'
@ -204,50 +260,39 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
live_status = 'was_live'
payload = {'broadcast_type': 'dvr'}
video_allow_dvr_flg = traverse_obj(data_json, ('video', 'allow_dvr_flg'))
video_convert_to_vod_flg = traverse_obj(data_json, ('video', 'convert_to_vod_flg'))
video_allow_dvr_flg = traverse_obj(video_info, ('video', 'allow_dvr_flg'))
video_convert_to_vod_flg = traverse_obj(video_info, ('video', 'convert_to_vod_flg'))
self.write_debug(f'allow_dvr_flg = {video_allow_dvr_flg}, convert_to_vod_flg = {video_convert_to_vod_flg}.')
if not (video_allow_dvr_flg and video_convert_to_vod_flg):
raise ExtractorError(
'Live was ended, there is no video for download.', video_id=content_code, expected=True)
'Live was ended, there is no video for download.', video_id=video_id, expected=True)
else:
raise ExtractorError(f'Unknown type: {video_type}', video_id=content_code, expected=False)
self.write_debug(f'{content_code}: video_type={video_type}, live_status={live_status}')
raise ExtractorError(f'Unknown type: {video_type}', video_id=video_id, expected=False)
session_id = self._call_api(
f'video_pages/{content_code}/session_ids', item_id=f'{content_code}/session',
data=json.dumps(payload).encode('ascii'), headers={
'Content-Type': 'application/json',
'fc_use_device': 'null',
'origin': 'https://nicochannel.jp',
},
note='Getting session id', errnote='Unable to get session id',
)['data']['session_id']
self.write_debug(f'{video_id}: video_type={video_type}, live_status={live_status}')
return live_status, session_id
return live_status, payload, unified_timestamp(release_timestamp_str)
class NiconicoChannelPlusChannelBaseIE(NiconicoChannelPlusBaseIE):
_PAGE_SIZE = 12
def _fetch_paged_channel_video_list(self, path, query, channel_name, item_id, page):
response = self._call_api(
path, item_id, query={
def _fetch_paged_channel_video_list(self, site_url, path, query, video_id, page):
response = self._download_api_json(
site_url, path, video_id, query={
**query,
'page': (page + 1),
'per_page': self._PAGE_SIZE,
},
headers={'fc_use_device': 'null'},
note=f'Getting channel info (page {page + 1})',
errnote=f'Unable to get channel info (page {page + 1})')
for content_code in traverse_obj(response, ('data', 'video_pages', 'list', ..., 'content_code')):
# "video/{content_code}" works for both VOD and live, but "live/{content_code}" doesn't work for VOD
yield self.url_result(
f'{self._WEBPAGE_BASE_URL}/{channel_name}/video/{content_code}', NiconicoChannelPlusIE)
f'{self._get_channel_url(site_url)}/video/{content_code}', NiconicoChannelPlusIE)
class NiconicoChannelPlusChannelVideosIE(NiconicoChannelPlusChannelBaseIE):
@ -275,7 +320,7 @@ class NiconicoChannelPlusChannelVideosIE(NiconicoChannelPlusChannelBaseIE):
'url': 'https://nicochannel.jp/testjirou/videos',
'info_dict': {
'id': 'testjirou-videos',
'title': 'チャンネルプラステスト二郎-videos',
'title': 'チャンネルプラステスト"二郎21-videos',
},
'playlist_mincount': 12,
}, {
@ -353,23 +398,23 @@ class NiconicoChannelPlusChannelVideosIE(NiconicoChannelPlusChannelBaseIE):
5 アップロード動画 (uploaded videos)
"""
channel_id = self._match_id(url)
fanclub_site_id = self._find_fanclub_site_id(channel_id)
channel_name = self._get_channel_base_info(fanclub_site_id).get('fanclub_site_name')
channel_id = self._get_channel_id(url)
qs = parse_qs(url)
return self.playlist_result(
OnDemandPagedList(
functools.partial(
self._fetch_paged_channel_video_list, f'fanclub_sites/{fanclub_site_id}/video_pages',
self._fetch_paged_channel_video_list,
url,
f'fanclub_sites/{self._get_fanclub_site_id(url)}/video_pages',
filter_dict({
'tag': traverse_obj(qs, ('tag', 0)),
'sort': traverse_obj(qs, ('sort', 0), default='-released_at'),
'vod_type': traverse_obj(qs, ('vodType', 0), default='0'),
}),
channel_id, f'{channel_id}/videos'),
f'{channel_id}/videos'),
self._PAGE_SIZE),
playlist_id=f'{channel_id}-videos', playlist_title=f'{channel_name}-videos')
playlist_id=f'{channel_id}-videos', playlist_title=f'{self._get_channel_name(url)}-videos')
class NiconicoChannelPlusChannelLivesIE(NiconicoChannelPlusChannelBaseIE):
@ -410,17 +455,15 @@ class NiconicoChannelPlusChannelLivesIE(NiconicoChannelPlusChannelBaseIE):
We use "4" instead of "3" because some recently ended live streams could not be downloaded.
"""
channel_id = self._match_id(url)
fanclub_site_id = self._find_fanclub_site_id(channel_id)
channel_name = self._get_channel_base_info(fanclub_site_id).get('fanclub_site_name')
channel_id = self._get_channel_id(url)
return self.playlist_result(
OnDemandPagedList(
functools.partial(
self._fetch_paged_channel_video_list, f'fanclub_sites/{fanclub_site_id}/live_pages',
{
'live_type': 4,
},
channel_id, f'{channel_id}/lives'),
self._fetch_paged_channel_video_list,
url,
f'fanclub_sites/{self._get_fanclub_site_id(url)}/live_pages',
{'live_type': 4},
f'{channel_id}/lives'),
self._PAGE_SIZE),
playlist_id=f'{channel_id}-lives', playlist_title=f'{channel_name}-lives')
playlist_id=f'{channel_id}-lives', playlist_title=f'{self._get_channel_name(url)}-lives')

Loading…
Cancel
Save