[hls,dash] When using `concurrent_fragment_downloads`, do not keep the fragment content in memory

Partial fix for #359
This is a temporary solution until #364 can be implemented
pull/310/head
pukkandan 4 years ago
parent 5dcd8e1d88
commit d89da64b1d
No known key found for this signature in database
GPG Key ID: 0F00D95A001F4698

@ -154,8 +154,9 @@ class DashSegmentsFD(FragmentFD):
max_workers = self.params.get('concurrent_fragment_downloads', 1) max_workers = self.params.get('concurrent_fragment_downloads', 1)
if can_threaded_download and max_workers > 1: if can_threaded_download and max_workers > 1:
self.report_warning('The download speed shown is only of one thread. This is a known issue') self.report_warning('The download speed shown is only of one thread. This is a known issue')
_download_fragment = lambda f: (f, download_fragment(f)[1])
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool: with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
futures = [pool.submit(download_fragment, fragment) for fragment in fragments_to_download] futures = [pool.submit(_download_fragment, fragment) for fragment in fragments_to_download]
# timeout must be 0 to return instantly # timeout must be 0 to return instantly
done, not_done = concurrent.futures.wait(futures, timeout=0) done, not_done = concurrent.futures.wait(futures, timeout=0)
try: try:
@ -169,9 +170,13 @@ class DashSegmentsFD(FragmentFD):
# timeout must be none to cancel # timeout must be none to cancel
concurrent.futures.wait(not_done, timeout=None) concurrent.futures.wait(not_done, timeout=None)
raise KeyboardInterrupt raise KeyboardInterrupt
results = [future.result() for future in futures]
for frag_content, frag_index in results: for fragment, frag_index in map(lambda x: x.result(), futures):
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
down, frag_sanitized = sanitize_open(fragment_filename, 'rb')
fragment['fragment_filename_sanitized'] = frag_sanitized
frag_content = down.read()
down.close()
result = append_fragment(frag_content, frag_index) result = append_fragment(frag_content, frag_index)
if not result: if not result:
return False return False

@ -424,8 +424,9 @@ class HlsFD(FragmentFD):
max_workers = self.params.get('concurrent_fragment_downloads', 1) max_workers = self.params.get('concurrent_fragment_downloads', 1)
if can_threaded_download and max_workers > 1: if can_threaded_download and max_workers > 1:
self.report_warning('The download speed shown is only of one thread. This is a known issue') self.report_warning('The download speed shown is only of one thread. This is a known issue')
_download_fragment = lambda f: (f, download_fragment(f)[1])
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool: with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
futures = [pool.submit(download_fragment, fragment) for fragment in fragments] futures = [pool.submit(_download_fragment, fragment) for fragment in fragments]
# timeout must be 0 to return instantly # timeout must be 0 to return instantly
done, not_done = concurrent.futures.wait(futures, timeout=0) done, not_done = concurrent.futures.wait(futures, timeout=0)
try: try:
@ -439,9 +440,13 @@ class HlsFD(FragmentFD):
# timeout must be none to cancel # timeout must be none to cancel
concurrent.futures.wait(not_done, timeout=None) concurrent.futures.wait(not_done, timeout=None)
raise KeyboardInterrupt raise KeyboardInterrupt
results = [future.result() for future in futures]
for frag_content, frag_index in results: for fragment, frag_index in map(lambda x: x.result(), futures):
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
down, frag_sanitized = sanitize_open(fragment_filename, 'rb')
fragment['fragment_filename_sanitized'] = frag_sanitized
frag_content = down.read()
down.close()
result = append_fragment(frag_content, frag_index) result = append_fragment(frag_content, frag_index)
if not result: if not result:
return False return False

Loading…
Cancel
Save