[Substack] allow multiple embeds regardless of post type

pull/12660/head
jhwgh1968 4 months ago
parent d49565da20
commit e89132c678

@ -133,7 +133,9 @@ class SubstackIE(InfoExtractor):
canonical_url = urllib.parse.urlparse(url)._replace(netloc=domain).geturl() canonical_url = urllib.parse.urlparse(url)._replace(netloc=domain).geturl()
post_type = webpage_info['post']['type'] post_type = webpage_info['post']['type']
post_title = traverse_obj(webpage_info, ('post', 'title'))
formats, subtitles = [], {} formats, subtitles = [], {}
all_results = []
result_info = { result_info = {
'id': str(webpage_info['post']['id']), 'id': str(webpage_info['post']['id']),
'subtitles': subtitles, 'subtitles': subtitles,
@ -144,7 +146,9 @@ class SubstackIE(InfoExtractor):
'uploader_id': str_or_none(traverse_obj(webpage_info, ('post', 'publication_id'))), 'uploader_id': str_or_none(traverse_obj(webpage_info, ('post', 'publication_id'))),
'webpage_url': canonical_url, 'webpage_url': canonical_url,
} }
# handle specific post types which are based on media
if post_type == 'podcast': if post_type == 'podcast':
podcast_result = dict(result_info)
fmt = {'url': webpage_info['post']['podcast_url']} fmt = {'url': webpage_info['post']['podcast_url']}
if not determine_ext(fmt['url'], default_ext=None): if not determine_ext(fmt['url'], default_ext=None):
# The redirected format URL expires but the original URL doesn't, # The redirected format URL expires but the original URL doesn't,
@ -153,89 +157,87 @@ class SubstackIE(InfoExtractor):
HEADRequest(fmt['url']), display_id, HEADRequest(fmt['url']), display_id,
'Resolving podcast file extension', 'Resolving podcast file extension',
'Podcast URL is invalid').url) 'Podcast URL is invalid').url)
result_info['formats'] = [fmt] podcast_result['formats'] = [fmt]
return result_info all_results.append(podcast_result)
elif post_type == 'video': if post_type == 'video':
video_result = dict(result_info)
formats, subtitles = self._extract_video_formats( formats, subtitles = self._extract_video_formats(
webpage_info['post']['videoUpload']['id'], canonical_url) webpage_info['post']['videoUpload']['id'], canonical_url)
result_info.update({ video_result.update({
'formats': formats, 'formats': formats,
'subtitles': subtitles, 'subtitles': subtitles,
}) })
return result_info all_results.append(video_result)
elif post_type == 'newsletter':
results = [] # search for embedded players on the page
found_items = [] found_items = []
post_id = str(webpage_info['post']['id']) post_id = str(webpage_info['post']['id'])
post_title = traverse_obj(webpage_info, ('post', 'title')) assert result_info['uploader_id'] is not None, 'newsletter posted without user_id'
assert result_info['uploader_id'] is not None, 'newsletter posted without user_id'
video_players = re.finditer(
video_players = re.finditer( r'<div[^>]*data-component-name="VideoEmbedPlayer"[^>]*>',
r'<div[^>]*data-component-name="VideoEmbedPlayer"[^>]*>', webpage)
webpage) for vp in video_players:
for vp in video_players: video_id = self._search_regex(r'id="([^"]+)"',
video_id = self._search_regex(r'id="([^"]+)"', vp.group(0),
vp.group(0), 'video id', group=1).replace('media-', '')
'video id', group=1).replace('media-', '') video_metadata_url = urllib.parse.urljoin(url, f'/api/v1/video/upload/{video_id}')
video_metadata_url = urllib.parse.urljoin(url, f'/api/v1/video/upload/{video_id}') json_vid_data = self._download_json(video_metadata_url, video_id)
json_vid_data = self._download_json(video_metadata_url, video_id) assert video_id == json_vid_data['id'], 'unexpected json metadata retrieved'
assert video_id == json_vid_data['id'], 'unexpected json metadata retrieved' json_vid_data['_type'] = 'video'
json_vid_data['_type'] = 'video' found_items.append(json_vid_data)
found_items.append(json_vid_data)
audio_players = re.finditer(
audio_players = re.finditer( r'<div[^>]*data-component-name="AudioEmbedPlayer"[^>]*>',
r'<div[^>]*data-component-name="AudioEmbedPlayer"[^>]*>', webpage)
webpage) for ap in audio_players:
for ap in audio_players: video_uri = self._search_regex(r'src="(/api/v1/audio/upload/[^"]+)"',
video_uri = self._search_regex(r'src="(/api/v1/audio/upload/[^"]+)"', webpage[ap.start():],
webpage[ap.start():], 'video uri', group=1)
'video uri', group=1) video_metadata_url = urllib.parse.urljoin(url, video_uri.replace('/src', ''))
video_metadata_url = urllib.parse.urljoin(url, video_uri.replace('/src', '')) video_id = self._search_regex(r'upload/([^/]+)/src',
video_id = self._search_regex(r'upload/([^/]+)/src', video_uri,
video_uri, 'video id', group=1)
'video id', group=1) json_vid_data = self._download_json(video_metadata_url, video_id)
json_vid_data = self._download_json(video_metadata_url, video_id) assert video_id == json_vid_data['id'], 'unexpected json metadata retrieved'
assert video_id == json_vid_data['id'], 'unexpected json metadata retrieved' json_vid_data['_type'] = 'audio'
json_vid_data['_type'] = 'audio' json_vid_data['_uri'] = video_uri
json_vid_data['_uri'] = video_uri found_items.append(json_vid_data)
found_items.append(json_vid_data)
for json_vid_data in found_items:
for json_vid_data in found_items: video_id = json_vid_data['id']
video_id = json_vid_data['id'] if 'video' in json_vid_data['_type']:
if 'video' in json_vid_data['_type']: formats, subtitles = self._extract_video_formats(
formats, subtitles = self._extract_video_formats( video_id, canonical_url,
video_id, canonical_url, query_params={'override_publication_id': result_info['uploader_id']})
query_params={'override_publication_id': result_info['uploader_id']})
else:
fmt = {'url': urllib.parse.urljoin(url, json_vid_data['_uri'])}
if not determine_ext(fmt['url'], default_ext=None):
# The redirected format URL expires but the original URL doesn't,
# so we only want to extract the extension from this request
fmt['ext'] = determine_ext(self._request_webpage(
HEADRequest(fmt['url']), video_id,
'Resolving audio file extension',
'Embedded audio URL is invalid').url)
formats.append(fmt)
new_result = dict(result_info)
new_result.update({
'formats': formats,
'subtitles': subtitles,
'id': video_id,
'title': json_vid_data.get('name', None),
'description': None,
'thumbnail': json_vid_data.get('thumbnail_url', None),
'duration': float_or_none(json_vid_data.get('duration', None)),
# videos can be cross-embedded, so the publication id should be of the uploader
'uploader_id': str_or_none(json_vid_data.get('user_id', None)),
'uploader': None, # video uploader username is not included
})
results.append(new_result)
if len(results) > 0:
playlist_title = f'Videos for {post_title}'
return self.playlist_result(results, post_id, playlist_title)
else: else:
self.raise_no_formats(f'Page type "{post_type}" contains no supported embeds') fmt = {'url': urllib.parse.urljoin(url, json_vid_data['_uri'])}
if not determine_ext(fmt['url'], default_ext=None):
# The redirected format URL expires but the original URL doesn't,
# so we only want to extract the extension from this request
fmt['ext'] = determine_ext(self._request_webpage(
HEADRequest(fmt['url']), video_id,
'Resolving audio file extension',
'Embedded audio URL is invalid').url)
formats.append(fmt)
new_result = dict(result_info)
new_result.update({
'formats': formats,
'subtitles': subtitles,
'id': video_id,
'title': json_vid_data.get('name', None),
'description': None,
'thumbnail': json_vid_data.get('thumbnail_url', None),
'duration': float_or_none(json_vid_data.get('duration', None)),
# videos can be cross-embedded, so the publication id should be of the uploader
'uploader_id': str_or_none(json_vid_data.get('user_id', None)),
'uploader': None, # video uploader username is not included
})
all_results.append(new_result)
self.raise_no_formats(f'Page type "{post_type}" is not supported') if len(all_results) > 0:
playlist_title = f'Videos for {post_title}'
return self.playlist_result(all_results, post_id, playlist_title)
else:
self.raise_no_formats(f'Page type "{post_type}" contains no supported embeds')

Loading…
Cancel
Save