[fragment] Merge during download for `-N`, and refactor `hls`/`dash` (#364)

pull/310/head
pukkandan 4 years ago committed by GitHub
parent e6779b9400
commit 4c7853de14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,21 +1,9 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import errno
try:
import concurrent.futures
can_threaded_download = True
except ImportError:
can_threaded_download = False
from ..downloader import _get_real_downloader from ..downloader import _get_real_downloader
from .fragment import FragmentFD from .fragment import FragmentFD
from ..compat import compat_urllib_error from ..utils import urljoin
from ..utils import (
DownloadError,
sanitize_open,
urljoin,
)
class DashSegmentsFD(FragmentFD): class DashSegmentsFD(FragmentFD):
@ -43,9 +31,6 @@ class DashSegmentsFD(FragmentFD):
else: else:
self._prepare_and_start_frag_download(ctx) self._prepare_and_start_frag_download(ctx)
fragment_retries = self.params.get('fragment_retries', 0)
skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
fragments_to_download = [] fragments_to_download = []
frag_index = 0 frag_index = 0
for i, fragment in enumerate(fragments): for i, fragment in enumerate(fragments):
@ -76,116 +61,5 @@ class DashSegmentsFD(FragmentFD):
if not success: if not success:
return False return False
else: else:
def download_fragment(fragment): self.download_and_append_fragments(ctx, fragments_to_download, info_dict)
i = fragment['index']
frag_index = fragment['frag_index']
fragment_url = fragment['url']
ctx['fragment_index'] = frag_index
# In DASH, the first segment contains necessary headers to
# generate a valid MP4 file, so always abort for the first segment
fatal = i == 0 or not skip_unavailable_fragments
count = 0
while count <= fragment_retries:
try:
success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
if not success:
return False, frag_index
break
except compat_urllib_error.HTTPError as err:
# YouTube may often return 404 HTTP error for a fragment causing the
# whole download to fail. However if the same fragment is immediately
# retried with the same request data this usually succeeds (1-2 attempts
# is usually enough) thus allowing to download the whole file successfully.
# To be future-proof we will retry all fragments that fail with any
# HTTP error.
count += 1
if count <= fragment_retries:
self.report_retry_fragment(err, frag_index, count, fragment_retries)
except DownloadError:
# Don't retry fragment if error occurred during HTTP downloading
# itself since it has own retry settings
if not fatal:
break
raise
if count > fragment_retries:
if not fatal:
return False, frag_index
ctx['dest_stream'].close()
self.report_error('Giving up after %s fragment retries' % fragment_retries)
return False, frag_index
return frag_content, frag_index
def append_fragment(frag_content, frag_index):
fatal = frag_index == 1 or not skip_unavailable_fragments
if frag_content:
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
try:
file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
ctx['fragment_filename_sanitized'] = frag_sanitized
file.close()
self._append_fragment(ctx, frag_content)
return True
except EnvironmentError as ose:
if ose.errno != errno.ENOENT:
raise
# FileNotFoundError
if not fatal:
self.report_skip_fragment(frag_index)
return True
else:
ctx['dest_stream'].close()
self.report_error(
'fragment %s not found, unable to continue' % frag_index)
return False
else:
if not fatal:
self.report_skip_fragment(frag_index)
return True
else:
ctx['dest_stream'].close()
self.report_error(
'fragment %s not found, unable to continue' % frag_index)
return False
max_workers = self.params.get('concurrent_fragment_downloads', 1)
if can_threaded_download and max_workers > 1:
self.report_warning('The download speed shown is only of one thread. This is a known issue')
_download_fragment = lambda f: (f, download_fragment(f)[1])
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
futures = [pool.submit(_download_fragment, fragment) for fragment in fragments_to_download]
# timeout must be 0 to return instantly
done, not_done = concurrent.futures.wait(futures, timeout=0)
try:
while not_done:
# Check every 1 second for KeyboardInterrupt
freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
done |= freshly_done
except KeyboardInterrupt:
for future in not_done:
future.cancel()
# timeout must be none to cancel
concurrent.futures.wait(not_done, timeout=None)
raise KeyboardInterrupt
for fragment, frag_index in map(lambda x: x.result(), futures):
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
down, frag_sanitized = sanitize_open(fragment_filename, 'rb')
fragment['fragment_filename_sanitized'] = frag_sanitized
frag_content = down.read()
down.close()
result = append_fragment(frag_content, frag_index)
if not result:
return False
else:
for fragment in fragments_to_download:
frag_content, frag_index = download_fragment(fragment)
result = append_fragment(frag_content, frag_index)
if not result:
return False
self._finish_frag_download(ctx)
return True return True

@ -4,9 +4,26 @@ import os
import time import time
import json import json
try:
from Crypto.Cipher import AES
can_decrypt_frag = True
except ImportError:
can_decrypt_frag = False
try:
import concurrent.futures
can_threaded_download = True
except ImportError:
can_threaded_download = False
from .common import FileDownloader from .common import FileDownloader
from .http import HttpFD from .http import HttpFD
from ..compat import (
compat_urllib_error,
compat_struct_pack,
)
from ..utils import ( from ..utils import (
DownloadError,
error_to_compat_str, error_to_compat_str,
encodeFilename, encodeFilename,
sanitize_open, sanitize_open,
@ -56,7 +73,7 @@ class FragmentFD(FileDownloader):
def report_retry_fragment(self, err, frag_index, count, retries): def report_retry_fragment(self, err, frag_index, count, retries):
self.to_screen( self.to_screen(
'[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...' '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
% (error_to_compat_str(err), frag_index, count, self.format_retries(retries))) % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))
def report_skip_fragment(self, frag_index): def report_skip_fragment(self, frag_index):
@ -112,11 +129,15 @@ class FragmentFD(FileDownloader):
return False, None return False, None
if fragment_info_dict.get('filetime'): if fragment_info_dict.get('filetime'):
ctx['fragment_filetime'] = fragment_info_dict.get('filetime') ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
down, frag_sanitized = sanitize_open(fragment_filename, 'rb') ctx['fragment_filename_sanitized'] = fragment_filename
return True, self._read_fragment(ctx)
def _read_fragment(self, ctx):
down, frag_sanitized = sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
ctx['fragment_filename_sanitized'] = frag_sanitized ctx['fragment_filename_sanitized'] = frag_sanitized
frag_content = down.read() frag_content = down.read()
down.close() down.close()
return True, frag_content return frag_content
def _append_fragment(self, ctx, frag_content): def _append_fragment(self, ctx, frag_content):
try: try:
@ -304,3 +325,106 @@ class FragmentFD(FileDownloader):
'tmpfilename': tmpfilename, 'tmpfilename': tmpfilename,
'fragment_index': 0, 'fragment_index': 0,
}) })
def download_and_append_fragments(self, ctx, fragments, info_dict, pack_func=None):
fragment_retries = self.params.get('fragment_retries', 0)
skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
test = self.params.get('test', False)
if not pack_func:
pack_func = lambda frag_content, _: frag_content
def download_fragment(fragment, ctx):
frag_index = ctx['fragment_index'] = fragment['frag_index']
headers = info_dict.get('http_headers', {})
byte_range = fragment.get('byte_range')
if byte_range:
headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
# Never skip the first fragment
fatal = (fragment.get('index') or frag_index) == 0 or not skip_unavailable_fragments
count, frag_content = 0, None
while count <= fragment_retries:
try:
success, frag_content = self._download_fragment(ctx, fragment['url'], info_dict, headers)
if not success:
return False, frag_index
break
except compat_urllib_error.HTTPError as err:
# Unavailable (possibly temporary) fragments may be served.
# First we try to retry then either skip or abort.
# See https://github.com/ytdl-org/youtube-dl/issues/10165,
# https://github.com/ytdl-org/youtube-dl/issues/10448).
count += 1
if count <= fragment_retries:
self.report_retry_fragment(err, frag_index, count, fragment_retries)
except DownloadError:
# Don't retry fragment if error occurred during HTTP downloading
# itself since it has own retry settings
if not fatal:
break
raise
if count > fragment_retries:
if not fatal:
return False, frag_index
ctx['dest_stream'].close()
self.report_error('Giving up after %s fragment retries' % fragment_retries)
return False, frag_index
return frag_content, frag_index
def decrypt_fragment(fragment, frag_content):
decrypt_info = fragment.get('decrypt_info')
if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
return frag_content
iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
# Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
# size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
# not what it decrypts to.
if test:
return frag_content
return AES.new(decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
def append_fragment(frag_content, frag_index, ctx):
if not frag_content:
fatal = frag_index == 1 or not skip_unavailable_fragments
if not fatal:
self.report_skip_fragment(frag_index)
return True
else:
ctx['dest_stream'].close()
self.report_error(
'fragment %s not found, unable to continue' % frag_index)
return False
self._append_fragment(ctx, pack_func(frag_content, frag_index))
return True
max_workers = self.params.get('concurrent_fragment_downloads', 1)
if can_threaded_download and max_workers > 1:
def _download_fragment(fragment):
try:
ctx_copy = ctx.copy()
frag_content, frag_index = download_fragment(fragment, ctx_copy)
return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
except Exception:
# Return immediately on exception so that it is raised in the main thread
return
self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
ctx['fragment_filename_sanitized'] = frag_filename
ctx['fragment_index'] = frag_index
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
if not result:
return False
else:
for fragment in fragments:
frag_content, frag_index = download_fragment(fragment, ctx)
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
if not result:
return False
self._finish_frag_download(ctx)

@ -1,32 +1,18 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import errno
import re import re
import io import io
import binascii import binascii
try:
from Crypto.Cipher import AES
can_decrypt_frag = True
except ImportError:
can_decrypt_frag = False
try:
import concurrent.futures
can_threaded_download = True
except ImportError:
can_threaded_download = False
from ..downloader import _get_real_downloader from ..downloader import _get_real_downloader
from .fragment import FragmentFD from .fragment import FragmentFD, can_decrypt_frag
from .external import FFmpegFD from .external import FFmpegFD
from ..compat import ( from ..compat import (
compat_urllib_error,
compat_urlparse, compat_urlparse,
compat_struct_pack,
) )
from ..utils import ( from ..utils import (
parse_m3u8_attributes, parse_m3u8_attributes,
sanitize_open,
update_url_query, update_url_query,
bug_reports_message, bug_reports_message,
) )
@ -151,10 +137,6 @@ class HlsFD(FragmentFD):
extra_state = ctx.setdefault('extra_state', {}) extra_state = ctx.setdefault('extra_state', {})
fragment_retries = self.params.get('fragment_retries', 0)
skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
test = self.params.get('test', False)
format_index = info_dict.get('format_index') format_index = info_dict.get('format_index')
extra_query = None extra_query = None
extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url') extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url')
@ -258,7 +240,7 @@ class HlsFD(FragmentFD):
media_sequence += 1 media_sequence += 1
# We only download the first fragment during the test # We only download the first fragment during the test
if test: if self.params.get('test', False):
fragments = [fragments[0] if fragments else None] fragments = [fragments[0] if fragments else None]
if real_downloader: if real_downloader:
@ -272,55 +254,6 @@ class HlsFD(FragmentFD):
if not success: if not success:
return False return False
else: else:
def decrypt_fragment(fragment, frag_content):
decrypt_info = fragment['decrypt_info']
if decrypt_info['METHOD'] != 'AES-128':
return frag_content
iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
# Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
# size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
# not what it decrypts to.
if test:
return frag_content
return AES.new(decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
def download_fragment(fragment):
frag_index = fragment['frag_index']
frag_url = fragment['url']
byte_range = fragment['byte_range']
ctx['fragment_index'] = frag_index
count = 0
headers = info_dict.get('http_headers', {})
if byte_range:
headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
while count <= fragment_retries:
try:
success, frag_content = self._download_fragment(
ctx, frag_url, info_dict, headers)
if not success:
return False, frag_index
break
except compat_urllib_error.HTTPError as err:
# Unavailable (possibly temporary) fragments may be served.
# First we try to retry then either skip or abort.
# See https://github.com/ytdl-org/youtube-dl/issues/10165,
# https://github.com/ytdl-org/youtube-dl/issues/10448).
count += 1
if count <= fragment_retries:
self.report_retry_fragment(err, frag_index, count, fragment_retries)
if count > fragment_retries:
ctx['dest_stream'].close()
self.report_error('Giving up after %s fragment retries' % fragment_retries)
return False, frag_index
return decrypt_fragment(fragment, frag_content), frag_index
pack_fragment = lambda frag_content, _: frag_content
if is_webvtt: if is_webvtt:
def pack_fragment(frag_content, frag_index): def pack_fragment(frag_content, frag_index):
output = io.StringIO() output = io.StringIO()
@ -388,75 +321,7 @@ class HlsFD(FragmentFD):
block.write_into(output) block.write_into(output)
return output.getvalue().encode('utf-8') return output.getvalue().encode('utf-8')
def append_fragment(frag_content, frag_index):
fatal = frag_index == 1 or not skip_unavailable_fragments
if frag_content:
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
try:
file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
ctx['fragment_filename_sanitized'] = frag_sanitized
file.close()
frag_content = pack_fragment(frag_content, frag_index)
self._append_fragment(ctx, frag_content)
return True
except EnvironmentError as ose:
if ose.errno != errno.ENOENT:
raise
# FileNotFoundError
if not fatal:
self.report_skip_fragment(frag_index)
return True
else:
ctx['dest_stream'].close()
self.report_error(
'fragment %s not found, unable to continue' % frag_index)
return False
else: else:
if not fatal: pack_fragment = None
self.report_skip_fragment(frag_index) self.download_and_append_fragments(ctx, fragments, info_dict, pack_fragment)
return True
else:
ctx['dest_stream'].close()
self.report_error(
'fragment %s not found, unable to continue' % frag_index)
return False
max_workers = self.params.get('concurrent_fragment_downloads', 1)
if can_threaded_download and max_workers > 1:
self.report_warning('The download speed shown is only of one thread. This is a known issue')
_download_fragment = lambda f: (f, download_fragment(f)[1])
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
futures = [pool.submit(_download_fragment, fragment) for fragment in fragments]
# timeout must be 0 to return instantly
done, not_done = concurrent.futures.wait(futures, timeout=0)
try:
while not_done:
# Check every 1 second for KeyboardInterrupt
freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
done |= freshly_done
except KeyboardInterrupt:
for future in not_done:
future.cancel()
# timeout must be none to cancel
concurrent.futures.wait(not_done, timeout=None)
raise KeyboardInterrupt
for fragment, frag_index in map(lambda x: x.result(), futures):
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
down, frag_sanitized = sanitize_open(fragment_filename, 'rb')
fragment['fragment_filename_sanitized'] = frag_sanitized
frag_content = down.read()
down.close()
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index)
if not result:
return False
else:
for fragment in fragments:
frag_content, frag_index = download_fragment(fragment)
result = append_fragment(frag_content, frag_index)
if not result:
return False
self._finish_frag_download(ctx)
return True return True

Loading…
Cancel
Save