@ -1,3 +1,4 @@
import contextlib
import ctypes
import json
import os
@ -7,6 +8,7 @@ import subprocess
import sys
import tempfile
from datetime import datetime , timedelta , timezone
from enum import Enum , auto
from hashlib import pbkdf2_hmac
from . aes import aes_cbc_decrypt_bytes , aes_gcm_decrypt_and_verify_bytes
@ -15,7 +17,6 @@ from .compat import (
compat_cookiejar_Cookie ,
)
from . utils import (
bug_reports_message ,
expand_path ,
Popen ,
YoutubeDLCookieJar ,
@ -31,19 +32,16 @@ except ImportError:
try :
import keyring
KEYRING_AVAILABLE = True
KEYRING_UNAVAILABLE_REASON = f ' due to unknown reasons { bug_reports_message ( ) } '
import secretstorage
SECRETSTORAGE_AVAILABLE = True
except ImportError :
KEYRING_AVAILABLE = False
KEYRING_UNAVAILABLE_REASON = (
' as the `keyring` module is not installed. '
' Please install by running `python3 -m pip install keyring`. '
' Depending on your platform, additional packages may be required '
' to access the keyring; see https://pypi.org/project/keyring ' )
SECRETSTORAGE_AVAILABLE = False
SECRETSTORAGE_UNAVAILABLE_REASON = (
' as the `secretstorage` module is not installed. '
' Please install by running `python3 -m pip install secretstorage`. ' )
except Exception as _err :
KEYRING _AVAILABLE = False
KEYRING_UNAVAILABLE_REASON = ' as the `keyring` module could not be initialized: %s ' % _err
SECRETSTORAGE_AVAILABLE = False
SECRETSTORAGE_UNAVAILABLE_REASON = f ' as the `secretstorage` module could not be initialized. { _err } '
CHROMIUM_BASED_BROWSERS = { ' brave ' , ' chrome ' , ' chromium ' , ' edge ' , ' opera ' , ' vivaldi ' }
@ -74,8 +72,8 @@ class YDLLogger:
def load_cookies ( cookie_file , browser_specification , ydl ) :
cookie_jars = [ ]
if browser_specification is not None :
browser_name , profile = _parse_browser_specification ( * browser_specification )
cookie_jars . append ( extract_cookies_from_browser ( browser_name , profile , YDLLogger ( ydl ) ))
browser_name , profile , keyring = _parse_browser_specification ( * browser_specification )
cookie_jars . append ( extract_cookies_from_browser ( browser_name , profile , YDLLogger ( ydl ) , keyring = keyring ))
if cookie_file is not None :
cookie_file = expand_path ( cookie_file )
@ -87,13 +85,13 @@ def load_cookies(cookie_file, browser_specification, ydl):
return _merge_cookie_jars ( cookie_jars )
def extract_cookies_from_browser ( browser_name , profile = None , logger = YDLLogger ( ) ):
def extract_cookies_from_browser ( browser_name , profile = None , logger = YDLLogger ( ) , * , keyring = None ):
if browser_name == ' firefox ' :
return _extract_firefox_cookies ( profile , logger )
elif browser_name == ' safari ' :
return _extract_safari_cookies ( profile , logger )
elif browser_name in CHROMIUM_BASED_BROWSERS :
return _extract_chrome_cookies ( browser_name , profile , logger)
return _extract_chrome_cookies ( browser_name , profile , keyring, logger)
else :
raise ValueError ( ' unknown browser: {} ' . format ( browser_name ) )
@ -207,7 +205,7 @@ def _get_chromium_based_browser_settings(browser_name):
}
def _extract_chrome_cookies ( browser_name , profile , logger) :
def _extract_chrome_cookies ( browser_name , profile , keyring, logger) :
logger . info ( ' Extracting cookies from {} ' . format ( browser_name ) )
if not SQLITE_AVAILABLE :
@ -234,7 +232,7 @@ def _extract_chrome_cookies(browser_name, profile, logger):
raise FileNotFoundError ( ' could not find {} cookies database in " {} " ' . format ( browser_name , search_root ) )
logger . debug ( ' Extracting cookies from: " {} " ' . format ( cookie_database_path ) )
decryptor = get_cookie_decryptor ( config [ ' browser_dir ' ] , config [ ' keyring_name ' ] , logger )
decryptor = get_cookie_decryptor ( config [ ' browser_dir ' ] , config [ ' keyring_name ' ] , logger , keyring = keyring )
with tempfile . TemporaryDirectory ( prefix = ' yt_dlp ' ) as tmpdir :
cursor = None
@ -247,6 +245,7 @@ def _extract_chrome_cookies(browser_name, profile, logger):
' expires_utc, {} FROM cookies ' . format ( secure_column ) )
jar = YoutubeDLCookieJar ( )
failed_cookies = 0
unencrypted_cookies = 0
for host_key , name , value , encrypted_value , path , expires_utc , is_secure in cursor . fetchall ( ) :
host_key = host_key . decode ( ' utf-8 ' )
name = name . decode ( ' utf-8 ' )
@ -258,6 +257,8 @@ def _extract_chrome_cookies(browser_name, profile, logger):
if value is None :
failed_cookies + = 1
continue
else :
unencrypted_cookies + = 1
cookie = compat_cookiejar_Cookie (
version = 0 , name = name , value = value , port = None , port_specified = False ,
@ -270,6 +271,9 @@ def _extract_chrome_cookies(browser_name, profile, logger):
else :
failed_message = ' '
logger . info ( ' Extracted {} cookies from {} {} ' . format ( len ( jar ) , browser_name , failed_message ) )
counts = decryptor . cookie_counts . copy ( )
counts [ ' unencrypted ' ] = unencrypted_cookies
logger . debug ( ' cookie version breakdown: {} ' . format ( counts ) )
return jar
finally :
if cursor is not None :
@ -305,10 +309,14 @@ class ChromeCookieDecryptor:
def decrypt ( self , encrypted_value ) :
raise NotImplementedError
@property
def cookie_counts ( self ) :
raise NotImplementedError
def get_cookie_decryptor ( browser_root , browser_keyring_name , logger ) :
def get_cookie_decryptor ( browser_root , browser_keyring_name , logger , * , keyring = None ):
if sys . platform in ( ' linux ' , ' linux2 ' ) :
return LinuxChromeCookieDecryptor ( browser_keyring_name , logger )
return LinuxChromeCookieDecryptor ( browser_keyring_name , logger , keyring = keyring )
elif sys . platform == ' darwin ' :
return MacChromeCookieDecryptor ( browser_keyring_name , logger )
elif sys . platform == ' win32 ' :
@ -319,13 +327,12 @@ def get_cookie_decryptor(browser_root, browser_keyring_name, logger):
class LinuxChromeCookieDecryptor ( ChromeCookieDecryptor ) :
def __init__ ( self , browser_keyring_name , logger ):
def __init__ ( self , browser_keyring_name , logger , * , keyring = None ):
self . _logger = logger
self . _v10_key = self . derive_key ( b ' peanuts ' )
if KEYRING_AVAILABLE :
self . _v11_key = self . derive_key ( _get_linux_keyring_password ( browser_keyring_name ) )
else :
self . _v11_key = None
password = _get_linux_keyring_password ( browser_keyring_name , keyring , logger )
self . _v11_key = None if password is None else self . derive_key ( password )
self . _cookie_counts = { ' v10 ' : 0 , ' v11 ' : 0 , ' other ' : 0 }
@staticmethod
def derive_key ( password ) :
@ -333,20 +340,27 @@ class LinuxChromeCookieDecryptor(ChromeCookieDecryptor):
# https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_linux.cc
return pbkdf2_sha1 ( password , salt = b ' saltysalt ' , iterations = 1 , key_length = 16 )
@property
def cookie_counts ( self ) :
return self . _cookie_counts
def decrypt ( self , encrypted_value ) :
version = encrypted_value [ : 3 ]
ciphertext = encrypted_value [ 3 : ]
if version == b ' v10 ' :
self . _cookie_counts [ ' v10 ' ] + = 1
return _decrypt_aes_cbc ( ciphertext , self . _v10_key , self . _logger )
elif version == b ' v11 ' :
self . _cookie_counts [ ' v11 ' ] + = 1
if self . _v11_key is None :
self . _logger . warning ( f' cannot decrypt cookie { KEYRING_UNAVAILABLE_REASON } ' , only_once = True )
self . _logger . warning ( ' cannot decrypt v11 cookies: no key found ' , only_once = True )
return None
return _decrypt_aes_cbc ( ciphertext , self . _v11_key , self . _logger )
else :
self . _cookie_counts [ ' other ' ] + = 1
return None
@ -355,6 +369,7 @@ class MacChromeCookieDecryptor(ChromeCookieDecryptor):
self . _logger = logger
password = _get_mac_keyring_password ( browser_keyring_name , logger )
self . _v10_key = None if password is None else self . derive_key ( password )
self . _cookie_counts = { ' v10 ' : 0 , ' other ' : 0 }
@staticmethod
def derive_key ( password ) :
@ -362,11 +377,16 @@ class MacChromeCookieDecryptor(ChromeCookieDecryptor):
# https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
return pbkdf2_sha1 ( password , salt = b ' saltysalt ' , iterations = 1003 , key_length = 16 )
@property
def cookie_counts ( self ) :
return self . _cookie_counts
def decrypt ( self , encrypted_value ) :
version = encrypted_value [ : 3 ]
ciphertext = encrypted_value [ 3 : ]
if version == b ' v10 ' :
self . _cookie_counts [ ' v10 ' ] + = 1
if self . _v10_key is None :
self . _logger . warning ( ' cannot decrypt v10 cookies: no key found ' , only_once = True )
return None
@ -374,6 +394,7 @@ class MacChromeCookieDecryptor(ChromeCookieDecryptor):
return _decrypt_aes_cbc ( ciphertext , self . _v10_key , self . _logger )
else :
self . _cookie_counts [ ' other ' ] + = 1
# other prefixes are considered 'old data' which were stored as plaintext
# https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_mac.mm
return encrypted_value
@ -383,12 +404,18 @@ class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
def __init__ ( self , browser_root , logger ) :
self . _logger = logger
self . _v10_key = _get_windows_v10_key ( browser_root , logger )
self . _cookie_counts = { ' v10 ' : 0 , ' other ' : 0 }
@property
def cookie_counts ( self ) :
return self . _cookie_counts
def decrypt ( self , encrypted_value ) :
version = encrypted_value [ : 3 ]
ciphertext = encrypted_value [ 3 : ]
if version == b ' v10 ' :
self . _cookie_counts [ ' v10 ' ] + = 1
if self . _v10_key is None :
self . _logger . warning ( ' cannot decrypt v10 cookies: no key found ' , only_once = True )
return None
@ -408,6 +435,7 @@ class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
return _decrypt_aes_gcm ( ciphertext , self . _v10_key , nonce , authentication_tag , self . _logger )
else :
self . _cookie_counts [ ' other ' ] + = 1
# any other prefix means the data is DPAPI encrypted
# https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/os_crypt/os_crypt_win.cc
return _decrypt_windows_dpapi ( encrypted_value , self . _logger ) . decode ( ' utf-8 ' )
@ -577,42 +605,221 @@ def parse_safari_cookies(data, jar=None, logger=YDLLogger()):
return jar
def _get_linux_keyring_password ( browser_keyring_name ) :
password = keyring . get_password ( ' {} Keys ' . format ( browser_keyring_name ) ,
' {} Safe Storage ' . format ( browser_keyring_name ) )
if password is None :
# this sometimes occurs in KDE because chrome does not check hasEntry and instead
# just tries to read the value (which kwallet returns "") whereas keyring checks hasEntry
# to verify this:
# dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
# while starting chrome.
# this may be a bug as the intended behaviour is to generate a random password and store
# it, but that doesn't matter here.
password = ' '
return password . encode ( ' utf-8 ' )
class _LinuxDesktopEnvironment ( Enum ) :
"""
https : / / chromium . googlesource . com / chromium / src / + / refs / heads / main / base / nix / xdg_util . h
DesktopEnvironment
"""
OTHER = auto ( )
CINNAMON = auto ( )
GNOME = auto ( )
KDE = auto ( )
PANTHEON = auto ( )
UNITY = auto ( )
XFCE = auto ( )
def _get_mac_keyring_password ( browser_keyring_name , logger ) :
if KEYRING_AVAILABLE :
logger . debug ( ' using keyring to obtain password ' )
password = keyring . get_password ( ' {} Safe Storage ' . format ( browser_keyring_name ) , browser_keyring_name )
return password . encode ( ' utf-8 ' )
class _LinuxKeyring ( Enum ) :
"""
https : / / chromium . googlesource . com / chromium / src / + / refs / heads / main / components / os_crypt / key_storage_util_linux . h
SelectedLinuxBackend
"""
KWALLET = auto ( )
GNOMEKEYRING = auto ( )
BASICTEXT = auto ( )
SUPPORTED_KEYRINGS = _LinuxKeyring . __members__ . keys ( )
def _get_linux_desktop_environment ( env ) :
"""
https : / / chromium . googlesource . com / chromium / src / + / refs / heads / main / base / nix / xdg_util . cc
GetDesktopEnvironment
"""
xdg_current_desktop = env . get ( ' XDG_CURRENT_DESKTOP ' , None )
desktop_session = env . get ( ' DESKTOP_SESSION ' , None )
if xdg_current_desktop is not None :
xdg_current_desktop = xdg_current_desktop . split ( ' : ' ) [ 0 ] . strip ( )
if xdg_current_desktop == ' Unity ' :
if desktop_session is not None and ' gnome-fallback ' in desktop_session :
return _LinuxDesktopEnvironment . GNOME
else :
return _LinuxDesktopEnvironment . UNITY
elif xdg_current_desktop == ' GNOME ' :
return _LinuxDesktopEnvironment . GNOME
elif xdg_current_desktop == ' X-Cinnamon ' :
return _LinuxDesktopEnvironment . CINNAMON
elif xdg_current_desktop == ' KDE ' :
return _LinuxDesktopEnvironment . KDE
elif xdg_current_desktop == ' Pantheon ' :
return _LinuxDesktopEnvironment . PANTHEON
elif xdg_current_desktop == ' XFCE ' :
return _LinuxDesktopEnvironment . XFCE
elif desktop_session is not None :
if desktop_session in ( ' mate ' , ' gnome ' ) :
return _LinuxDesktopEnvironment . GNOME
elif ' kde ' in desktop_session :
return _LinuxDesktopEnvironment . KDE
elif ' xfce ' in desktop_session :
return _LinuxDesktopEnvironment . XFCE
else :
if ' GNOME_DESKTOP_SESSION_ID ' in env :
return _LinuxDesktopEnvironment . GNOME
elif ' KDE_FULL_SESSION ' in env :
return _LinuxDesktopEnvironment . KDE
else :
return _LinuxDesktopEnvironment . OTHER
def _choose_linux_keyring ( logger ) :
"""
https : / / chromium . googlesource . com / chromium / src / + / refs / heads / main / components / os_crypt / key_storage_util_linux . cc
SelectBackend
"""
desktop_environment = _get_linux_desktop_environment ( os . environ )
logger . debug ( ' detected desktop environment: {} ' . format ( desktop_environment . name ) )
if desktop_environment == _LinuxDesktopEnvironment . KDE :
linux_keyring = _LinuxKeyring . KWALLET
elif desktop_environment == _LinuxDesktopEnvironment . OTHER :
linux_keyring = _LinuxKeyring . BASICTEXT
else :
logger . debug ( ' using find-generic-password to obtain password ' )
linux_keyring = _LinuxKeyring . GNOMEKEYRING
return linux_keyring
def _get_kwallet_network_wallet ( logger ) :
""" The name of the wallet used to store network passwords.
https : / / chromium . googlesource . com / chromium / src / + / refs / heads / main / components / os_crypt / kwallet_dbus . cc
KWalletDBus : : NetworkWallet
which does a dbus call to the following function :
https : / / api . kde . org / frameworks / kwallet / html / classKWallet_1_1Wallet . html
Wallet : : NetworkWallet
"""
default_wallet = ' kdewallet '
try :
proc = Popen ( [
' dbus-send ' , ' --session ' , ' --print-reply=literal ' ,
' --dest=org.kde.kwalletd5 ' ,
' /modules/kwalletd5 ' ,
' org.kde.KWallet.networkWallet '
] , stdout = subprocess . PIPE , stderr = subprocess . DEVNULL )
stdout , stderr = proc . communicate_or_kill ( )
if proc . returncode != 0 :
logger . warning ( ' failed to read NetworkWallet ' )
return default_wallet
else :
network_wallet = stdout . decode ( ' utf-8 ' ) . strip ( )
logger . debug ( ' NetworkWallet = " {} " ' . format ( network_wallet ) )
return network_wallet
except BaseException as e :
logger . warning ( ' exception while obtaining NetworkWallet: {} ' . format ( e ) )
return default_wallet
def _get_kwallet_password ( browser_keyring_name , logger ) :
logger . debug ( ' using kwallet-query to obtain password from kwallet ' )
if shutil . which ( ' kwallet-query ' ) is None :
logger . error ( ' kwallet-query command not found. KWallet and kwallet-query '
' must be installed to read from KWallet. kwallet-query should be '
' included in the kwallet package for your distribution ' )
return b ' '
network_wallet = _get_kwallet_network_wallet ( logger )
try :
proc = Popen ( [
' kwallet-query ' ,
' --read-password ' , ' {} Safe Storage ' . format ( browser_keyring_name ) ,
' --folder ' , ' {} Keys ' . format ( browser_keyring_name ) ,
network_wallet
] , stdout = subprocess . PIPE , stderr = subprocess . DEVNULL )
stdout , stderr = proc . communicate_or_kill ( )
if proc . returncode != 0 :
logger . error ( ' kwallet-query failed with return code {} . Please consult '
' the kwallet-query man page for details ' . format ( proc . returncode ) )
return b ' '
else :
if stdout . lower ( ) . startswith ( b ' failed to read ' ) :
logger . debug ( ' failed to read password from kwallet. Using empty string instead ' )
# this sometimes occurs in KDE because chrome does not check hasEntry and instead
# just tries to read the value (which kwallet returns "") whereas kwallet-query
# checks hasEntry. To verify this:
# dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
# while starting chrome.
# this may be a bug as the intended behaviour is to generate a random password and store
# it, but that doesn't matter here.
return b ' '
else :
logger . debug ( ' password found ' )
if stdout [ - 1 : ] == b ' \n ' :
stdout = stdout [ : - 1 ]
return stdout
except BaseException as e :
logger . warning ( f ' exception running kwallet-query: { type ( e ) . __name__ } ( { e } ) ' )
return b ' '
def _get_gnome_keyring_password ( browser_keyring_name , logger ) :
if not SECRETSTORAGE_AVAILABLE :
logger . error ( ' secretstorage not available {} ' . format ( SECRETSTORAGE_UNAVAILABLE_REASON ) )
return b ' '
# the Gnome keyring does not seem to organise keys in the same way as KWallet,
# using `dbus-monitor` during startup, it can be observed that chromium lists all keys
# and presumably searches for its key in the list. It appears that we must do the same.
# https://github.com/jaraco/keyring/issues/556
with contextlib . closing ( secretstorage . dbus_init ( ) ) as con :
col = secretstorage . get_default_collection ( con )
for item in col . get_all_items ( ) :
if item . get_label ( ) == ' {} Safe Storage ' . format ( browser_keyring_name ) :
return item . get_secret ( )
else :
logger . error ( ' failed to read from keyring ' )
return b ' '
def _get_linux_keyring_password ( browser_keyring_name , keyring , logger ) :
# note: chrome/chromium can be run with the following flags to determine which keyring backend
# it has chosen to use
# chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
# Chromium supports a flag: --password-store=<basic|gnome|kwallet> so the automatic detection
# will not be sufficient in all cases.
keyring = _LinuxKeyring [ keyring ] or _choose_linux_keyring ( logger )
logger . debug ( f ' Chosen keyring: { keyring . name } ' )
if keyring == _LinuxKeyring . KWALLET :
return _get_kwallet_password ( browser_keyring_name , logger )
elif keyring == _LinuxKeyring . GNOMEKEYRING :
return _get_gnome_keyring_password ( browser_keyring_name , logger )
elif keyring == _LinuxKeyring . BASICTEXT :
# when basic text is chosen, all cookies are stored as v10 (so no keyring password is required)
return None
assert False , f ' Unknown keyring { keyring } '
def _get_mac_keyring_password ( browser_keyring_name , logger ) :
logger . debug ( ' using find-generic-password to obtain password from OSX keychain ' )
try :
proc = Popen (
[ ' security ' , ' find-generic-password ' ,
' -w ' , # write password to stdout
' -a ' , browser_keyring_name , # match 'account'
' -s ' , ' {} Safe Storage ' . format ( browser_keyring_name ) ] , # match 'service'
stdout = subprocess . PIPE , stderr = subprocess . DEVNULL )
try :
stdout , stderr = proc . communicate_or_kill ( )
if stdout [ - 1 : ] == b ' \n ' :
stdout = stdout [ : - 1 ]
return stdout
except BaseException as e :
logger . warning ( f ' exception running find-generic-password: { type ( e ) . __name__ } ( { e } ) ' )
return None
stdout , stderr = proc . communicate_or_kill ( )
if stdout [ - 1 : ] == b ' \n ' :
stdout = stdout [ : - 1 ]
return stdout
except BaseException as e :
logger . warning ( f ' exception running find-generic-password: { type ( e ) . __name__ } ( { e } ) ' )
return None
def _get_windows_v10_key ( browser_root , logger ) :
@ -736,10 +943,11 @@ def _is_path(value):
return os . path . sep in value
def _parse_browser_specification ( browser_name , profile = None ) :
browser_name = browser_name . lower ( )
def _parse_browser_specification ( browser_name , profile = None , keyring = None ) :
if browser_name not in SUPPORTED_BROWSERS :
raise ValueError ( f ' unsupported browser: " { browser_name } " ' )
if keyring not in ( None , * SUPPORTED_KEYRINGS ) :
raise ValueError ( f ' unsupported keyring: " { keyring } " ' )
if profile is not None and _is_path ( profile ) :
profile = os . path . expanduser ( profile )
return browser_name , profile
return browser_name , profile , keyring