[cleanup] Fix flake8 and minor refactor

Issues from ab029d7e92, 1fb53b946c
pull/4905/head
pukkandan 2 years ago
parent 709ee21417
commit 7a32c70d13
No known key found for this signature in database
GPG Key ID: 7EEE9E1E817D0A39

@ -30,6 +30,7 @@ from ..utils import (
clean_html, clean_html,
datetime_from_str, datetime_from_str,
dict_get, dict_get,
filter_dict,
float_or_none, float_or_none,
format_field, format_field,
get_first, get_first,
@ -617,7 +618,7 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
if auth is not None: if auth is not None:
headers['Authorization'] = auth headers['Authorization'] = auth
headers['X-Origin'] = origin headers['X-Origin'] = origin
return {h: v for h, v in headers.items() if v is not None} return filter_dict(headers)
def _download_ytcfg(self, client, video_id): def _download_ytcfg(self, client, video_id):
url = { url = {
@ -672,20 +673,10 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
if next_continuation: if next_continuation:
return next_continuation return next_continuation
contents = [] return traverse_obj(renderer, (
for key in ('contents', 'items', 'rows'): ('contents', 'items', 'rows'), ..., 'continuationItemRenderer',
contents.extend(try_get(renderer, lambda x: x[key], list) or []) ('continuationEndpoint', ('button', 'buttonRenderer', 'command'))
), get_all=False, expected_type=cls._extract_continuation_ep_data)
for content in contents:
if not isinstance(content, dict):
continue
continuation_ep = try_get(
content, (lambda x: x['continuationItemRenderer']['continuationEndpoint'],
lambda x: x['continuationItemRenderer']['button']['buttonRenderer']['command']),
dict)
continuation = cls._extract_continuation_ep_data(continuation_ep)
if continuation:
return continuation
@classmethod @classmethod
def _extract_alerts(cls, data): def _extract_alerts(cls, data):
@ -4408,8 +4399,8 @@ class YoutubeTabBaseInfoExtractor(YoutubeBaseInfoExtractor):
def _report_history_entries(self, renderer): def _report_history_entries(self, renderer):
for url in traverse_obj(renderer, ( for url in traverse_obj(renderer, (
'rows', ..., 'reportHistoryTableRowRenderer', 'cells', ..., 'rows', ..., 'reportHistoryTableRowRenderer', 'cells', ...,
'reportHistoryTableCellRenderer', 'cell', 'reportHistoryTableTextCellRenderer', 'text', 'runs', ..., 'reportHistoryTableCellRenderer', 'cell', 'reportHistoryTableTextCellRenderer', 'text', 'runs', ...,
'navigationEndpoint', 'commandMetadata', 'webCommandMetadata', 'url')): 'navigationEndpoint', 'commandMetadata', 'webCommandMetadata', 'url')):
yield self.url_result(urljoin('https://www.youtube.com', url), YoutubeIE) yield self.url_result(urljoin('https://www.youtube.com', url), YoutubeIE)
@ -4553,7 +4544,7 @@ class YoutubeTabBaseInfoExtractor(YoutubeBaseInfoExtractor):
uploader['uploader_url'] = urljoin( uploader['uploader_url'] = urljoin(
'https://www.youtube.com/', 'https://www.youtube.com/',
try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], str)) try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], str))
return {k: v for k, v in uploader.items() if v is not None} return filter_dict(uploader)
def _extract_from_tabs(self, item_id, ytcfg, data, tabs): def _extract_from_tabs(self, item_id, ytcfg, data, tabs):
playlist_id = title = description = channel_url = channel_name = channel_id = None playlist_id = title = description = channel_url = channel_name = channel_id = None

@ -5484,7 +5484,7 @@ def jwt_decode_hs256(jwt):
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
@ functools.cache @functools.cache
def supports_terminal_sequences(stream): def supports_terminal_sequences(stream):
if compat_os_name == 'nt': if compat_os_name == 'nt':
if not WINDOWS_VT_MODE: if not WINDOWS_VT_MODE:
@ -5634,7 +5634,7 @@ class Config:
*(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs), *(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
delim='\n') delim='\n')
@ staticmethod @staticmethod
def read_file(filename, default=[]): def read_file(filename, default=[]):
try: try:
optionf = open(filename, 'rb') optionf = open(filename, 'rb')
@ -5655,7 +5655,7 @@ class Config:
optionf.close() optionf.close()
return res return res
@ staticmethod @staticmethod
def hide_login_info(opts): def hide_login_info(opts):
PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'} PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$') eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
@ -5679,7 +5679,7 @@ class Config:
if config.init(*args): if config.init(*args):
self.configs.append(config) self.configs.append(config)
@ property @property
def all_args(self): def all_args(self):
for config in reversed(self.configs): for config in reversed(self.configs):
yield from config.all_args yield from config.all_args
@ -5726,7 +5726,7 @@ class WebSocketsWrapper():
# taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
# for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
@ staticmethod @staticmethod
def run_with_loop(main, loop): def run_with_loop(main, loop):
if not asyncio.iscoroutine(main): if not asyncio.iscoroutine(main):
raise ValueError(f'a coroutine was expected, got {main!r}') raise ValueError(f'a coroutine was expected, got {main!r}')
@ -5738,7 +5738,7 @@ class WebSocketsWrapper():
if hasattr(loop, 'shutdown_default_executor'): if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(loop.shutdown_default_executor()) loop.run_until_complete(loop.shutdown_default_executor())
@ staticmethod @staticmethod
def _cancel_all_tasks(loop): def _cancel_all_tasks(loop):
to_cancel = asyncio.all_tasks(loop) to_cancel = asyncio.all_tasks(loop)
@ -5772,7 +5772,7 @@ def cached_method(f):
"""Cache a method""" """Cache a method"""
signature = inspect.signature(f) signature = inspect.signature(f)
@ functools.wraps(f) @functools.wraps(f)
def wrapper(self, *args, **kwargs): def wrapper(self, *args, **kwargs):
bound_args = signature.bind(self, *args, **kwargs) bound_args = signature.bind(self, *args, **kwargs)
bound_args.apply_defaults() bound_args.apply_defaults()
@ -5804,7 +5804,7 @@ class Namespace(types.SimpleNamespace):
def __iter__(self): def __iter__(self):
return iter(self.__dict__.values()) return iter(self.__dict__.values())
@ property @property
def items_(self): def items_(self):
return self.__dict__.items() return self.__dict__.items()
@ -5843,13 +5843,13 @@ class RetryManager:
def _should_retry(self): def _should_retry(self):
return self._error is not NO_DEFAULT and self.attempt <= self.retries return self._error is not NO_DEFAULT and self.attempt <= self.retries
@ property @property
def error(self): def error(self):
if self._error is NO_DEFAULT: if self._error is NO_DEFAULT:
return None return None
return self._error return self._error
@ error.setter @error.setter
def error(self, value): def error(self, value):
self._error = value self._error = value
@ -5861,7 +5861,7 @@ class RetryManager:
if self.error: if self.error:
self.error_callback(self.error, self.attempt, self.retries) self.error_callback(self.error, self.attempt, self.retries)
@ staticmethod @staticmethod
def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None): def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
"""Utility function for reporting retries""" """Utility function for reporting retries"""
if count > retries: if count > retries:

Loading…
Cancel
Save