|
|
|
@ -12,6 +12,7 @@ import io
|
|
|
|
|
import itertools
|
|
|
|
|
import json
|
|
|
|
|
import locale
|
|
|
|
|
import logging
|
|
|
|
|
import operator
|
|
|
|
|
import os
|
|
|
|
|
import platform
|
|
|
|
@ -24,6 +25,8 @@ import time
|
|
|
|
|
import tokenize
|
|
|
|
|
import traceback
|
|
|
|
|
import random
|
|
|
|
|
from typing import Any
|
|
|
|
|
from typing import cast
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
from ssl import OPENSSL_VERSION
|
|
|
|
@ -130,6 +133,10 @@ from .version import __version__
|
|
|
|
|
if compat_os_name == 'nt':
|
|
|
|
|
import ctypes
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger('soundcloudutil.downloader')
|
|
|
|
|
|
|
|
|
|
TAGGED_LOG_MSG_REGEX = re.compile(r'^\[(?P<tag>\w+)(:(?P<subtag>\w+))?\]\s*(?P<msg>.+)$')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _catch_unsafe_file_extension(func):
|
|
|
|
|
@functools.wraps(func)
|
|
|
|
@ -536,37 +543,53 @@ class YoutubeDL(object):
|
|
|
|
|
for _ in range(line_count))
|
|
|
|
|
return res[:-len('\n')]
|
|
|
|
|
|
|
|
|
|
def to_screen(self, message, skip_eol=False):
|
|
|
|
|
def to_screen(self, message, skip_eol: bool = False):
|
|
|
|
|
"""Print message to stdout if not in quiet mode."""
|
|
|
|
|
return self.to_stdout(message, skip_eol, check_quiet=True)
|
|
|
|
|
|
|
|
|
|
def _write_string(self, s, out=None, only_once=False, _cache=set()):
|
|
|
|
|
if only_once and s in _cache:
|
|
|
|
|
return
|
|
|
|
|
@property
|
|
|
|
|
def user_logger(self) -> logging.Logger | None:
|
|
|
|
|
return cast(logging.Logger | None, self.params.get('logger'))
|
|
|
|
|
|
|
|
|
|
def _write_string(self, s: str, out: io.TextIOWrapper | None = None) -> None:
|
|
|
|
|
write_string(s, out=out, encoding=self.params.get('encoding'))
|
|
|
|
|
if only_once:
|
|
|
|
|
_cache.add(s)
|
|
|
|
|
|
|
|
|
|
def to_stdout(self, message, skip_eol=False, check_quiet=False, only_once=False):
|
|
|
|
|
def to_stdout(self, message, skip_eol: bool = False, check_quiet: bool = False):
|
|
|
|
|
"""Print message to stdout if not in quiet mode."""
|
|
|
|
|
if self.params.get('logger'):
|
|
|
|
|
self.params['logger'].debug(message)
|
|
|
|
|
elif not check_quiet or not self.params.get('quiet', False):
|
|
|
|
|
message = self._bidi_workaround(message)
|
|
|
|
|
terminator = ['\n', ''][skip_eol]
|
|
|
|
|
output = message + terminator
|
|
|
|
|
|
|
|
|
|
self._write_string(output, self._screen_file, only_once=only_once)
|
|
|
|
|
quiet = check_quiet and self.params.get('quiet', False)
|
|
|
|
|
|
|
|
|
|
debug: bool
|
|
|
|
|
if message.startswith(f'[debug]'):
|
|
|
|
|
debug = True
|
|
|
|
|
message = message.removeprefix('[debug]').lstrip()
|
|
|
|
|
elif message.startswith('[info]'):
|
|
|
|
|
debug = False
|
|
|
|
|
message = message.removeprefix('[info]').lstrip()
|
|
|
|
|
elif quiet:
|
|
|
|
|
debug = True
|
|
|
|
|
else:
|
|
|
|
|
debug = False
|
|
|
|
|
|
|
|
|
|
_logger = logger
|
|
|
|
|
if m := TAGGED_LOG_MSG_REGEX.match(message):
|
|
|
|
|
tag = m.group('tag')
|
|
|
|
|
subtag = m.group('subtag')
|
|
|
|
|
_logger_name = f'youtube_dl.{tag}'
|
|
|
|
|
if m.group('subtag'):
|
|
|
|
|
_logger_name += f'.{subtag}'
|
|
|
|
|
_logger = logging.getLogger(_logger_name)
|
|
|
|
|
message = m.group('msg')
|
|
|
|
|
|
|
|
|
|
if debug:
|
|
|
|
|
_logger.debug(message)
|
|
|
|
|
else:
|
|
|
|
|
_logger.info(message)
|
|
|
|
|
|
|
|
|
|
def to_stderr(self, message, only_once=False):
|
|
|
|
|
"""Print message to stderr."""
|
|
|
|
|
assert isinstance(message, compat_str)
|
|
|
|
|
if self.params.get('logger'):
|
|
|
|
|
self.params['logger'].error(message)
|
|
|
|
|
def to_stderr(self, message: str) -> None:
|
|
|
|
|
if self.user_logger is not None:
|
|
|
|
|
self.user_logger.error(message)
|
|
|
|
|
else:
|
|
|
|
|
message = self._bidi_workaround(message)
|
|
|
|
|
output = message + '\n'
|
|
|
|
|
self._write_string(output, self._err_file, only_once=only_once)
|
|
|
|
|
logger.error(message)
|
|
|
|
|
|
|
|
|
|
def to_console_title(self, message):
|
|
|
|
|
if not self.params.get('consoletitle', False):
|
|
|
|
@ -645,33 +668,26 @@ class YoutubeDL(object):
|
|
|
|
|
raise DownloadError(message, exc_info)
|
|
|
|
|
self._download_retcode = 1
|
|
|
|
|
|
|
|
|
|
def report_warning(self, message, only_once=False):
|
|
|
|
|
'''
|
|
|
|
|
Print the message to stderr, it will be prefixed with 'WARNING:'
|
|
|
|
|
If stderr is a tty file the 'WARNING:' will be colored
|
|
|
|
|
'''
|
|
|
|
|
if self.params.get('logger') is not None:
|
|
|
|
|
self.params['logger'].warning(message)
|
|
|
|
|
def report_warning(self, message: str, only_once: bool = False, _cache: dict[int, int] | None = None) -> None:
|
|
|
|
|
_cache = _cache or {}
|
|
|
|
|
if only_once:
|
|
|
|
|
m_hash = hash((self, message))
|
|
|
|
|
m_cnt = _cache.setdefault(m_hash, 0)
|
|
|
|
|
_cache[m_hash] = m_cnt + 1
|
|
|
|
|
if m_cnt > 0:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if self.user_logger is not None:
|
|
|
|
|
self.user_logger.warning(message)
|
|
|
|
|
else:
|
|
|
|
|
if self.params.get('no_warnings'):
|
|
|
|
|
return
|
|
|
|
|
if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt':
|
|
|
|
|
_msg_header = '\033[0;33mWARNING:\033[0m'
|
|
|
|
|
else:
|
|
|
|
|
_msg_header = 'WARNING:'
|
|
|
|
|
warning_message = '%s %s' % (_msg_header, message)
|
|
|
|
|
self.to_stderr(warning_message, only_once=only_once)
|
|
|
|
|
|
|
|
|
|
def report_error(self, message, *args, **kwargs):
|
|
|
|
|
'''
|
|
|
|
|
Do the same as trouble, but prefixes the message with 'ERROR:', colored
|
|
|
|
|
in red if stderr is a tty file.
|
|
|
|
|
'''
|
|
|
|
|
if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt':
|
|
|
|
|
_msg_header = '\033[0;31mERROR:\033[0m'
|
|
|
|
|
else:
|
|
|
|
|
_msg_header = 'ERROR:'
|
|
|
|
|
kwargs['message'] = '%s %s' % (_msg_header, message)
|
|
|
|
|
logger.warning(message)
|
|
|
|
|
|
|
|
|
|
# TODO: re-implement :meth:`trouble` to output tracebacks with RichHandler
|
|
|
|
|
def report_error(self, message: str, *args: Any, **kwargs: Any) -> None:
|
|
|
|
|
logger.error(message)
|
|
|
|
|
kwargs['message'] = f'ERROR: {message}'
|
|
|
|
|
self.trouble(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
def write_debug(self, message, only_once=False):
|
|
|
|
@ -2663,7 +2679,13 @@ class YoutubeDL(object):
|
|
|
|
|
encoding = preferredencoding()
|
|
|
|
|
return encoding
|
|
|
|
|
|
|
|
|
|
def _write_info_json(self, label, info_dict, infofn, overwrite=None):
|
|
|
|
|
def _write_info_json(
|
|
|
|
|
self,
|
|
|
|
|
label: str,
|
|
|
|
|
info_dict: dict[str, Any],
|
|
|
|
|
infofn: str,
|
|
|
|
|
overwrite: bool | None = None,
|
|
|
|
|
) -> bool | str | None:
|
|
|
|
|
if not self.params.get('writeinfojson', False):
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
@ -2683,7 +2705,7 @@ class YoutubeDL(object):
|
|
|
|
|
return True
|
|
|
|
|
except (OSError, IOError):
|
|
|
|
|
self.report_error(msg('Cannot write %s to JSON file ', label) + infofn)
|
|
|
|
|
return
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _write_thumbnails(self, info_dict, filename):
|
|
|
|
|
if self.params.get('writethumbnail', False):
|
|
|
|
|