From 75a24854073e590f4efc9f037b57dee348f52b61 Mon Sep 17 00:00:00 2001 From: Remita Amine Date: Tue, 28 Jun 2016 18:07:50 +0100 Subject: [PATCH] [fragment,hls,f4m,dash,ism] improve fragment downloading - resume immediately - no need to concatenate segments and decrypt them on every resume - no need to save temp files for segments and for hls downloader: - no need to download keys for segments that are already downloaded --- youtube_dl/downloader/common.py | 31 +++++++++-------- youtube_dl/downloader/dash.py | 43 ++++++----------------- youtube_dl/downloader/f4m.py | 33 ++++++------------ youtube_dl/downloader/fragment.py | 57 +++++++++++++++++++++++++------ youtube_dl/downloader/hls.py | 34 +++++++----------- youtube_dl/downloader/http.py | 8 +++-- youtube_dl/downloader/ism.py | 30 +++++----------- 7 files changed, 112 insertions(+), 124 deletions(-) diff --git a/youtube_dl/downloader/common.py b/youtube_dl/downloader/common.py index 2c4470a95..fdb77b620 100644 --- a/youtube_dl/downloader/common.py +++ b/youtube_dl/downloader/common.py @@ -327,21 +327,22 @@ class FileDownloader(object): os.path.exists(encodeFilename(filename)) ) - continuedl_and_exists = ( - self.params.get('continuedl', True) and - os.path.isfile(encodeFilename(filename)) and - not self.params.get('nopart', False) - ) - - # Check file already present - if filename != '-' and (nooverwrites_and_exists or continuedl_and_exists): - self.report_file_already_downloaded(filename) - self._hook_progress({ - 'filename': filename, - 'status': 'finished', - 'total_bytes': os.path.getsize(encodeFilename(filename)), - }) - return True + if not hasattr(filename, 'write'): + continuedl_and_exists = ( + self.params.get('continuedl', True) and + os.path.isfile(encodeFilename(filename)) and + not self.params.get('nopart', False) + ) + + # Check file already present + if filename != '-' and (nooverwrites_and_exists or continuedl_and_exists): + self.report_file_already_downloaded(filename) + self._hook_progress({ + 
'filename': filename, + 'status': 'finished', + 'total_bytes': os.path.getsize(encodeFilename(filename)), + }) + return True min_sleep_interval = self.params.get('sleep_interval') if min_sleep_interval: diff --git a/youtube_dl/downloader/dash.py b/youtube_dl/downloader/dash.py index e2ddc369e..94a13a543 100644 --- a/youtube_dl/downloader/dash.py +++ b/youtube_dl/downloader/dash.py @@ -1,13 +1,7 @@ from __future__ import unicode_literals -import os - from .fragment import FragmentFD from ..compat import compat_urllib_error -from ..utils import ( - sanitize_open, - encodeFilename, -) class DashSegmentsFD(FragmentFD): @@ -28,31 +22,24 @@ class DashSegmentsFD(FragmentFD): self._prepare_and_start_frag_download(ctx) - segments_filenames = [] - fragment_retries = self.params.get('fragment_retries', 0) skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True) - def process_segment(segment, tmp_filename, num): - segment_url = segment['url'] - segment_name = 'Frag%d' % num - target_filename = '%s-%s' % (tmp_filename, segment_name) + frag_index = 0 + for i, segment in enumerate(segments): + frag_index += 1 + if frag_index <= ctx['frag_index']: + continue # In DASH, the first segment contains necessary headers to # generate a valid MP4 file, so always abort for the first segment - fatal = num == 0 or not skip_unavailable_fragments + fatal = i == 0 or not skip_unavailable_fragments count = 0 while count <= fragment_retries: try: - success = ctx['dl'].download(target_filename, { - 'url': segment_url, - 'http_headers': info_dict.get('http_headers'), - }) + success, frag_content = self._download_fragment(ctx, segment['url'], info_dict) if not success: return False - down, target_sanitized = sanitize_open(target_filename, 'rb') - ctx['dest_stream'].write(down.read()) - down.close() - segments_filenames.append(target_sanitized) + self._append_fragment(ctx, frag_content) break except compat_urllib_error.HTTPError as err: # YouTube may often return 404 HTTP 
error for a fragment causing the @@ -63,22 +50,14 @@ class DashSegmentsFD(FragmentFD): # HTTP error. count += 1 if count <= fragment_retries: - self.report_retry_fragment(err, segment_name, count, fragment_retries) + self.report_retry_fragment(err, frag_index, count, fragment_retries) if count > fragment_retries: if not fatal: - self.report_skip_fragment(segment_name) - return True + self.report_skip_fragment(frag_index) + continue self.report_error('giving up after %s fragment retries' % fragment_retries) return False - return True - - for i, segment in enumerate(segments): - if not process_segment(segment, ctx['tmpfilename'], i): - return False self._finish_frag_download(ctx) - for segment_file in segments_filenames: - os.remove(encodeFilename(segment_file)) - return True diff --git a/youtube_dl/downloader/f4m.py b/youtube_dl/downloader/f4m.py index 688e086eb..e456ed58f 100644 --- a/youtube_dl/downloader/f4m.py +++ b/youtube_dl/downloader/f4m.py @@ -3,7 +3,6 @@ from __future__ import division, unicode_literals import base64 import io import itertools -import os import time from .fragment import FragmentFD @@ -16,9 +15,7 @@ from ..compat import ( compat_struct_unpack, ) from ..utils import ( - encodeFilename, fix_xml_ampersands, - sanitize_open, xpath_text, ) @@ -366,17 +363,21 @@ class F4mFD(FragmentFD): dest_stream = ctx['dest_stream'] - write_flv_header(dest_stream) - if not live: - write_metadata_tag(dest_stream, metadata) + if ctx['complete_frags_downloaded_bytes'] == 0: + write_flv_header(dest_stream) + if not live: + write_metadata_tag(dest_stream, metadata) base_url_parsed = compat_urllib_parse_urlparse(base_url) self._start_frag_download(ctx) - frags_filenames = [] + frag_index = 0 while fragments_list: seg_i, frag_i = fragments_list.pop(0) + frag_index += 1 + if frag_index <= ctx['frag_index']: + continue name = 'Seg%d-Frag%d' % (seg_i, frag_i) query = [] if base_url_parsed.query: @@ -386,17 +387,10 @@ class F4mFD(FragmentFD): if 
info_dict.get('extra_param_to_segment_url'): query.append(info_dict['extra_param_to_segment_url']) url_parsed = base_url_parsed._replace(path=base_url_parsed.path + name, query='&'.join(query)) - frag_filename = '%s-%s' % (ctx['tmpfilename'], name) try: - success = ctx['dl'].download(frag_filename, { - 'url': url_parsed.geturl(), - 'http_headers': info_dict.get('http_headers'), - }) + success, down_data = self._download_fragment(ctx, url_parsed.geturl(), info_dict) if not success: return False - (down, frag_sanitized) = sanitize_open(frag_filename, 'rb') - down_data = down.read() - down.close() reader = FlvReader(down_data) while True: try: @@ -411,12 +405,8 @@ class F4mFD(FragmentFD): break raise if box_type == b'mdat': - dest_stream.write(box_data) + self._append_fragment(ctx, box_data) break - if live: - os.remove(encodeFilename(frag_sanitized)) - else: - frags_filenames.append(frag_sanitized) except (compat_urllib_error.HTTPError, ) as err: if live and (err.code == 404 or err.code == 410): # We didn't keep up with the live window. Continue @@ -436,7 +426,4 @@ class F4mFD(FragmentFD): self._finish_frag_download(ctx) - for frag_file in frags_filenames: - os.remove(encodeFilename(frag_file)) - return True diff --git a/youtube_dl/downloader/fragment.py b/youtube_dl/downloader/fragment.py index 56f975266..44a3c1040 100644 --- a/youtube_dl/downloader/fragment.py +++ b/youtube_dl/downloader/fragment.py @@ -2,6 +2,7 @@ from __future__ import division, unicode_literals import os import time +import io from .common import FileDownloader from .http import HttpFD @@ -10,6 +11,7 @@ from ..utils import ( encodeFilename, sanitize_open, sanitized_Request, + compat_str, ) @@ -30,13 +32,13 @@ class FragmentFD(FileDownloader): Skip unavailable fragments (DASH and hlsnative only) """ - def report_retry_fragment(self, err, fragment_name, count, retries): + def report_retry_fragment(self, err, frag_index, count, retries): self.to_screen( - '[download] Got server HTTP error: %s. 
Retrying fragment %s (attempt %d of %s)...' - % (error_to_compat_str(err), fragment_name, count, self.format_retries(retries))) + '[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s)...' + % (error_to_compat_str(err), frag_index, count, self.format_retries(retries))) - def report_skip_fragment(self, fragment_name): - self.to_screen('[download] Skipping fragment %s...' % fragment_name) + def report_skip_fragment(self, frag_index): + self.to_screen('[download] Skipping fragment %d...' % frag_index) def _prepare_url(self, info_dict, url): headers = info_dict.get('http_headers') @@ -46,6 +48,25 @@ class FragmentFD(FileDownloader): self._prepare_frag_download(ctx) self._start_frag_download(ctx) + def _download_fragment(self, ctx, frag_url, info_dict, headers=None): + down = io.BytesIO() + success = ctx['dl'].download(down, { + 'url': frag_url, + 'http_headers': headers or info_dict.get('http_headers'), + }) + if not success: + return False, None + frag_content = down.getvalue() + down.close() + return True, frag_content + + def _append_fragment(self, ctx, frag_content): + ctx['dest_stream'].write(frag_content) + if not (ctx.get('live') or ctx['tmpfilename'] == '-'): + frag_index_stream, _ = sanitize_open(ctx['tmpfilename'] + '.fragindex', 'w') + frag_index_stream.write(compat_str(ctx['frag_index'])) + frag_index_stream.close() + def _prepare_frag_download(self, ctx): if 'live' not in ctx: ctx['live'] = False @@ -66,11 +87,26 @@ class FragmentFD(FileDownloader): } ) tmpfilename = self.temp_name(ctx['filename']) - dest_stream, tmpfilename = sanitize_open(tmpfilename, 'wb') + open_mode = 'wb' + resume_len = 0 + frag_index = 0 + # Establish possible resume length + if os.path.isfile(encodeFilename(tmpfilename)): + open_mode = 'ab' + resume_len = os.path.getsize(encodeFilename(tmpfilename)) + if os.path.isfile(encodeFilename(tmpfilename + '.fragindex')): + frag_index_stream, _ = sanitize_open(tmpfilename + '.fragindex', 'r') + frag_index = 
int(frag_index_stream.read()) + frag_index_stream.close() + dest_stream, tmpfilename = sanitize_open(tmpfilename, open_mode) + ctx.update({ 'dl': dl, 'dest_stream': dest_stream, 'tmpfilename': tmpfilename, + 'frag_index': frag_index, + # Total complete fragments downloaded so far in bytes + 'complete_frags_downloaded_bytes': resume_len, }) def _start_frag_download(self, ctx): @@ -79,8 +115,8 @@ class FragmentFD(FileDownloader): # hook state = { 'status': 'downloading', - 'downloaded_bytes': 0, - 'frag_index': 0, + 'downloaded_bytes': ctx['complete_frags_downloaded_bytes'], + 'frag_index': ctx['frag_index'], 'frag_count': total_frags, 'filename': ctx['filename'], 'tmpfilename': ctx['tmpfilename'], @@ -89,8 +125,6 @@ class FragmentFD(FileDownloader): start = time.time() ctx.update({ 'started': start, - # Total complete fragments downloaded so far in bytes - 'complete_frags_downloaded_bytes': 0, # Amount of fragment's bytes downloaded by the time of the previous # frag progress hook invocation 'prev_frag_downloaded_bytes': 0, @@ -111,6 +145,7 @@ class FragmentFD(FileDownloader): if s['status'] == 'finished': state['frag_index'] += 1 + ctx['frag_index'] = state['frag_index'] state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes'] ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes'] ctx['prev_frag_downloaded_bytes'] = 0 @@ -132,6 +167,8 @@ class FragmentFD(FileDownloader): def _finish_frag_download(self, ctx): ctx['dest_stream'].close() + if os.path.isfile(encodeFilename(ctx['tmpfilename'] + '.fragindex')): + os.remove(encodeFilename(ctx['tmpfilename'] + '.fragindex')) elapsed = time.time() - ctx['started'] self.try_rename(ctx['tmpfilename'], ctx['filename']) fsize = os.path.getsize(encodeFilename(ctx['filename'])) diff --git a/youtube_dl/downloader/hls.py b/youtube_dl/downloader/hls.py index d0a5f7ba4..9a87d7ca8 100644 --- a/youtube_dl/downloader/hls.py +++ b/youtube_dl/downloader/hls.py @@ -1,6 +1,5 @@ from __future__ import 
unicode_literals -import os.path import re import binascii try: @@ -18,8 +17,6 @@ from ..compat import ( compat_struct_pack, ) from ..utils import ( - encodeFilename, - sanitize_open, parse_m3u8_attributes, update_url_query, ) @@ -103,17 +100,18 @@ class HlsFD(FragmentFD): media_sequence = 0 decrypt_info = {'METHOD': 'NONE'} byte_range = {} - frags_filenames = [] + frag_index = 0 for line in s.splitlines(): line = line.strip() if line: if not line.startswith('#'): + frag_index += 1 + if frag_index <= ctx['frag_index']: + continue frag_url = ( line if re.match(r'^https?://', line) else compat_urlparse.urljoin(man_url, line)) - frag_name = 'Frag%d' % i - frag_filename = '%s-%s' % (ctx['tmpfilename'], frag_name) if extra_query: frag_url = update_url_query(frag_url, extra_query) count = 0 @@ -122,15 +120,10 @@ class HlsFD(FragmentFD): headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end']) while count <= fragment_retries: try: - success = ctx['dl'].download(frag_filename, { - 'url': frag_url, - 'http_headers': headers, - }) + success, frag_content = self._download_fragment( + ctx, frag_url, info_dict, headers) if not success: return False - down, frag_sanitized = sanitize_open(frag_filename, 'rb') - frag_content = down.read() - down.close() break except compat_urllib_error.HTTPError as err: # Unavailable (possibly temporary) fragments may be served. @@ -139,28 +132,29 @@ class HlsFD(FragmentFD): # https://github.com/rg3/youtube-dl/issues/10448). 
count += 1 if count <= fragment_retries: - self.report_retry_fragment(err, frag_name, count, fragment_retries) + self.report_retry_fragment(err, frag_index, count, fragment_retries) if count > fragment_retries: if skip_unavailable_fragments: i += 1 media_sequence += 1 - self.report_skip_fragment(frag_name) + self.report_skip_fragment(frag_index) continue self.report_error( 'giving up after %s fragment retries' % fragment_retries) return False if decrypt_info['METHOD'] == 'AES-128': iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence) + decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(decrypt_info['URI']).read() frag_content = AES.new( decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content) - ctx['dest_stream'].write(frag_content) - frags_filenames.append(frag_sanitized) + self._append_fragment(ctx, frag_content) # We only download the first fragment during the test if test: break i += 1 media_sequence += 1 elif line.startswith('#EXT-X-KEY'): + decrypt_url = decrypt_info.get('URI') decrypt_info = parse_m3u8_attributes(line[11:]) if decrypt_info['METHOD'] == 'AES-128': if 'IV' in decrypt_info: @@ -170,7 +164,8 @@ class HlsFD(FragmentFD): man_url, decrypt_info['URI']) if extra_query: decrypt_info['URI'] = update_url_query(decrypt_info['URI'], extra_query) - decrypt_info['KEY'] = self.ydl.urlopen(decrypt_info['URI']).read() + if decrypt_url != decrypt_info['URI']: + decrypt_info['KEY'] = None elif line.startswith('#EXT-X-MEDIA-SEQUENCE'): media_sequence = int(line[22:]) elif line.startswith('#EXT-X-BYTERANGE'): @@ -183,7 +178,4 @@ class HlsFD(FragmentFD): self._finish_frag_download(ctx) - for frag_file in frags_filenames: - os.remove(encodeFilename(frag_file)) - return True diff --git a/youtube_dl/downloader/http.py b/youtube_dl/downloader/http.py index af405b950..2896a17af 100644 --- a/youtube_dl/downloader/http.py +++ b/youtube_dl/downloader/http.py @@ -20,10 +20,14 @@ from ..utils import ( class HttpFD(FileDownloader): - 
def real_download(self, filename, info_dict): + def real_download(self, filename_or_stream, info_dict): url = info_dict['url'] - tmpfilename = self.temp_name(filename) + filename = filename_or_stream stream = None + if hasattr(filename_or_stream, 'write'): + stream = filename_or_stream + filename = '-' + tmpfilename = self.temp_name(filename) # Do not include the Accept-Encoding header headers = {'Youtubedl-no-compression': 'True'} diff --git a/youtube_dl/downloader/ism.py b/youtube_dl/downloader/ism.py index 63a636cb7..9f0fc36b3 100644 --- a/youtube_dl/downloader/ism.py +++ b/youtube_dl/downloader/ism.py @@ -1,6 +1,5 @@ from __future__ import unicode_literals -import os import time import struct import binascii @@ -8,10 +7,6 @@ import io from .fragment import FragmentFD from ..compat import compat_urllib_error -from ..utils import ( - sanitize_open, - encodeFilename, -) u8 = struct.Struct(b'>B') @@ -225,16 +220,15 @@ class IsmFD(FragmentFD): self._prepare_and_start_frag_download(ctx) - segments_filenames = [] - fragment_retries = self.params.get('fragment_retries', 0) skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True) track_written = False + frag_index = 0 for i, segment in enumerate(segments): - segment_url = segment['url'] - segment_name = 'Frag%d' % i - target_filename = '%s-%s' % (ctx['tmpfilename'], segment_name) + frag_index += 1 + if frag_index <= ctx['frag_index']: + continue count = 0 while count <= fragment_retries: try: @@ -242,33 +236,27 @@ class IsmFD(FragmentFD): 'url': segment_url, 'http_headers': info_dict.get('http_headers'), }) + success, frag_content = self._download_fragment(ctx, segment['url'], info_dict) if not success: return False - down, target_sanitized = sanitize_open(target_filename, 'rb') - down_data = down.read() if not track_written: - tfhd_data = extract_box_data(down_data, [b'moof', b'traf', b'tfhd']) + tfhd_data = extract_box_data(frag_content, [b'moof', b'traf', b'tfhd']) 
info_dict['_download_params']['track_id'] = u32.unpack(tfhd_data[4:8])[0] write_piff_header(ctx['dest_stream'], info_dict['_download_params']) track_written = True - ctx['dest_stream'].write(down_data) - down.close() - segments_filenames.append(target_sanitized) + self._append_fragment(ctx, frag_content) break except compat_urllib_error.HTTPError as err: count += 1 if count <= fragment_retries: - self.report_retry_fragment(err, segment_name, count, fragment_retries) + self.report_retry_fragment(err, frag_index, count, fragment_retries) if count > fragment_retries: if skip_unavailable_fragments: - self.report_skip_fragment(segment_name) + self.report_skip_fragment(frag_index) continue self.report_error('giving up after %s fragment retries' % fragment_retries) return False self._finish_frag_download(ctx) - for segment_file in segments_filenames: - os.remove(encodeFilename(segment_file)) - return True