implement --cookies-from-browser (#1606)

most of the code is adapted from yt-dlp's implementation
and *should* work the same.
pull/2584/head
Mike Fährmann 2 years ago
parent c4b9f7bab8
commit 6742f3bc1e
No known key found for this signature in database
GPG Key ID: 5680CA389D365A88

@ -399,16 +399,19 @@ Description
extractor.*.cookies
-------------------
Type
|Path|_ or ``object``
|Path|_ or ``object`` or ``list``
Default
``null``
Description
Source to read additional cookies from. Either as
Source to read additional cookies from. This can be
* the |Path|_ to a Mozilla/Netscape format cookies.txt file or
* a JSON ``object`` specifying cookies as a name-to-value mapping
* The |Path|_ to a Mozilla/Netscape format cookies.txt file
Example:
.. code:: json
"~/.local/share/cookies-instagram-com.txt"
* An ``object`` specifying cookies as name-value pairs
.. code:: json
@ -418,6 +421,17 @@ Description
"isAdult" : "1"
}
* A ``list`` with up to 3 entries specifying a browser profile.
* The first entry is the browser name
* The optional second entry is a profile name or an absolote path to a profile directory
* The optional third entry is the keyring to retrieve passwords for decrypting cookies from
.. code:: json
["firefox"]
["chromium", "Private", "kwallet"]
extractor.*.cookies-update
--------------------------

@ -131,6 +131,10 @@ def main():
config.set((), "skip", "abort:" + str(args.abort))
if args.terminate:
config.set((), "skip", "terminate:" + str(args.terminate))
if args.cookies_from_browser:
browser, _, profile = args.cookies_from_browser.partition(":")
browser, _, keyring = browser.partition("+")
config.set((), "cookies", (browser, profile, keyring))
for opts in args.options:
config.set(*opts)

@ -0,0 +1,641 @@
# -*- coding: utf-8 -*-
# This is a slightly modified version of yt-dlp's aes module.
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/aes.py
import struct
import binascii
from math import ceil
try:
from Cryptodome.Cipher import AES as Cryptodome_AES
except ImportError:
try:
from Crypto.Cipher import AES as Cryptodome_AES
except ImportError:
Cryptodome_AES = None
if Cryptodome_AES:
def aes_cbc_decrypt_bytes(data, key, iv):
"""Decrypt bytes with AES-CBC using pycryptodome"""
return Cryptodome_AES.new(
key, Cryptodome_AES.MODE_CBC, iv).decrypt(data)
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
"""Decrypt bytes with AES-GCM using pycryptodome"""
return Cryptodome_AES.new(
key, Cryptodome_AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
else:
def aes_cbc_decrypt_bytes(data, key, iv):
"""Decrypt bytes with AES-CBC using native implementation"""
return intlist_to_bytes(aes_cbc_decrypt(
bytes_to_intlist(data),
bytes_to_intlist(key),
bytes_to_intlist(iv),
))
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
"""Decrypt bytes with AES-GCM using native implementation"""
return intlist_to_bytes(aes_gcm_decrypt_and_verify(
bytes_to_intlist(data),
bytes_to_intlist(key),
bytes_to_intlist(tag),
bytes_to_intlist(nonce),
))
bytes_to_intlist = list
def intlist_to_bytes(xs):
if not xs:
return b""
return struct.pack("%dB" % len(xs), *xs)
def unpad_pkcs7(data):
return data[:-data[-1]]
BLOCK_SIZE_BYTES = 16
def aes_ecb_encrypt(data, key, iv=None):
"""
Encrypt with aes in ECB mode
@param {int[]} data cleartext
@param {int[]} key 16/24/32-Byte cipher key
@param {int[]} iv Unused for this mode
@returns {int[]} encrypted data
"""
expanded_key = key_expansion(key)
block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES))
encrypted_data = []
for i in range(block_count):
block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]
encrypted_data += aes_encrypt(block, expanded_key)
encrypted_data = encrypted_data[:len(data)]
return encrypted_data
def aes_ecb_decrypt(data, key, iv=None):
"""
Decrypt with aes in ECB mode
@param {int[]} data cleartext
@param {int[]} key 16/24/32-Byte cipher key
@param {int[]} iv Unused for this mode
@returns {int[]} decrypted data
"""
expanded_key = key_expansion(key)
block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES))
encrypted_data = []
for i in range(block_count):
block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]
encrypted_data += aes_decrypt(block, expanded_key)
encrypted_data = encrypted_data[:len(data)]
return encrypted_data
def aes_ctr_decrypt(data, key, iv):
"""
Decrypt with aes in counter mode
@param {int[]} data cipher
@param {int[]} key 16/24/32-Byte cipher key
@param {int[]} iv 16-Byte initialization vector
@returns {int[]} decrypted data
"""
return aes_ctr_encrypt(data, key, iv)
def aes_ctr_encrypt(data, key, iv):
"""
Encrypt with aes in counter mode
@param {int[]} data cleartext
@param {int[]} key 16/24/32-Byte cipher key
@param {int[]} iv 16-Byte initialization vector
@returns {int[]} encrypted data
"""
expanded_key = key_expansion(key)
block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES))
counter = iter_vector(iv)
encrypted_data = []
for i in range(block_count):
counter_block = next(counter)
block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]
block += [0] * (BLOCK_SIZE_BYTES - len(block))
cipher_counter_block = aes_encrypt(counter_block, expanded_key)
encrypted_data += xor(block, cipher_counter_block)
encrypted_data = encrypted_data[:len(data)]
return encrypted_data
def aes_cbc_decrypt(data, key, iv):
"""
Decrypt with aes in CBC mode
@param {int[]} data cipher
@param {int[]} key 16/24/32-Byte cipher key
@param {int[]} iv 16-Byte IV
@returns {int[]} decrypted data
"""
expanded_key = key_expansion(key)
block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES))
decrypted_data = []
previous_cipher_block = iv
for i in range(block_count):
block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]
block += [0] * (BLOCK_SIZE_BYTES - len(block))
decrypted_block = aes_decrypt(block, expanded_key)
decrypted_data += xor(decrypted_block, previous_cipher_block)
previous_cipher_block = block
decrypted_data = decrypted_data[:len(data)]
return decrypted_data
def aes_cbc_encrypt(data, key, iv):
"""
Encrypt with aes in CBC mode. Using PKCS#7 padding
@param {int[]} data cleartext
@param {int[]} key 16/24/32-Byte cipher key
@param {int[]} iv 16-Byte IV
@returns {int[]} encrypted data
"""
expanded_key = key_expansion(key)
block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES))
encrypted_data = []
previous_cipher_block = iv
for i in range(block_count):
block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]
remaining_length = BLOCK_SIZE_BYTES - len(block)
block += [remaining_length] * remaining_length
mixed_block = xor(block, previous_cipher_block)
encrypted_block = aes_encrypt(mixed_block, expanded_key)
encrypted_data += encrypted_block
previous_cipher_block = encrypted_block
return encrypted_data
def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
"""
Decrypt with aes in GBM mode and checks authenticity using tag
@param {int[]} data cipher
@param {int[]} key 16-Byte cipher key
@param {int[]} tag authentication tag
@param {int[]} nonce IV (recommended 12-Byte)
@returns {int[]} decrypted data
"""
# XXX: check aes, gcm param
hash_subkey = aes_encrypt([0] * BLOCK_SIZE_BYTES, key_expansion(key))
if len(nonce) == 12:
j0 = nonce + [0, 0, 0, 1]
else:
fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % \
BLOCK_SIZE_BYTES + 8
ghash_in = nonce + [0] * fill + bytes_to_intlist(
(8 * len(nonce)).to_bytes(8, "big"))
j0 = ghash(hash_subkey, ghash_in)
# TODO: add nonce support to aes_ctr_decrypt
# nonce_ctr = j0[:12]
iv_ctr = inc(j0)
decrypted_data = aes_ctr_decrypt(
data, key, iv_ctr + [0] * (BLOCK_SIZE_BYTES - len(iv_ctr)))
pad_len = len(data) // 16 * 16
s_tag = ghash(
hash_subkey,
data +
[0] * (BLOCK_SIZE_BYTES - len(data) + pad_len) + # pad
bytes_to_intlist(
(0 * 8).to_bytes(8, "big") + # length of associated data
((len(data) * 8).to_bytes(8, "big")) # length of data
)
)
if tag != aes_ctr_encrypt(s_tag, key, j0):
raise ValueError("Mismatching authentication tag")
return decrypted_data
def aes_encrypt(data, expanded_key):
"""
Encrypt one block with aes
@param {int[]} data 16-Byte state
@param {int[]} expanded_key 176/208/240-Byte expanded key
@returns {int[]} 16-Byte cipher
"""
rounds = len(expanded_key) // BLOCK_SIZE_BYTES - 1
data = xor(data, expanded_key[:BLOCK_SIZE_BYTES])
for i in range(1, rounds + 1):
data = sub_bytes(data)
data = shift_rows(data)
if i != rounds:
data = list(iter_mix_columns(data, MIX_COLUMN_MATRIX))
data = xor(data, expanded_key[
i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES])
return data
def aes_decrypt(data, expanded_key):
"""
Decrypt one block with aes
@param {int[]} data 16-Byte cipher
@param {int[]} expanded_key 176/208/240-Byte expanded key
@returns {int[]} 16-Byte state
"""
rounds = len(expanded_key) // BLOCK_SIZE_BYTES - 1
for i in range(rounds, 0, -1):
data = xor(data, expanded_key[
i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES])
if i != rounds:
data = list(iter_mix_columns(data, MIX_COLUMN_MATRIX_INV))
data = shift_rows_inv(data)
data = sub_bytes_inv(data)
data = xor(data, expanded_key[:BLOCK_SIZE_BYTES])
return data
def aes_decrypt_text(data, password, key_size_bytes):
"""
Decrypt text
- The first 8 Bytes of decoded 'data' are the 8 high Bytes of the counter
- The cipher key is retrieved by encrypting the first 16 Byte of 'password'
with the first 'key_size_bytes' Bytes from 'password'
(if necessary filled with 0's)
- Mode of operation is 'counter'
@param {str} data Base64 encoded string
@param {str,unicode} password Password (will be encoded with utf-8)
@param {int} key_size_bytes Possible values: 16 for 128-Bit,
24 for 192-Bit, or
32 for 256-Bit
@returns {str} Decrypted data
"""
NONCE_LENGTH_BYTES = 8
data = bytes_to_intlist(binascii.a2b_base64(data))
password = bytes_to_intlist(password.encode("utf-8"))
key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password))
key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * \
(key_size_bytes // BLOCK_SIZE_BYTES)
nonce = data[:NONCE_LENGTH_BYTES]
cipher = data[NONCE_LENGTH_BYTES:]
return intlist_to_bytes(aes_ctr_decrypt(
cipher, key, nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES)
))
RCON = (
0x8d, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80, 0x1b, 0x36,
)
SBOX = (
0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5,
0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76,
0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0,
0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0,
0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC,
0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15,
0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A,
0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75,
0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0,
0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84,
0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B,
0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF,
0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85,
0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8,
0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5,
0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2,
0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17,
0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73,
0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88,
0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB,
0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C,
0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79,
0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9,
0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08,
0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6,
0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A,
0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E,
0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E,
0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94,
0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF,
0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68,
0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16,
)
SBOX_INV = (
0x52, 0x09, 0x6a, 0xd5, 0x30, 0x36, 0xa5, 0x38,
0xbf, 0x40, 0xa3, 0x9e, 0x81, 0xf3, 0xd7, 0xfb,
0x7c, 0xe3, 0x39, 0x82, 0x9b, 0x2f, 0xff, 0x87,
0x34, 0x8e, 0x43, 0x44, 0xc4, 0xde, 0xe9, 0xcb,
0x54, 0x7b, 0x94, 0x32, 0xa6, 0xc2, 0x23, 0x3d,
0xee, 0x4c, 0x95, 0x0b, 0x42, 0xfa, 0xc3, 0x4e,
0x08, 0x2e, 0xa1, 0x66, 0x28, 0xd9, 0x24, 0xb2,
0x76, 0x5b, 0xa2, 0x49, 0x6d, 0x8b, 0xd1, 0x25,
0x72, 0xf8, 0xf6, 0x64, 0x86, 0x68, 0x98, 0x16,
0xd4, 0xa4, 0x5c, 0xcc, 0x5d, 0x65, 0xb6, 0x92,
0x6c, 0x70, 0x48, 0x50, 0xfd, 0xed, 0xb9, 0xda,
0x5e, 0x15, 0x46, 0x57, 0xa7, 0x8d, 0x9d, 0x84,
0x90, 0xd8, 0xab, 0x00, 0x8c, 0xbc, 0xd3, 0x0a,
0xf7, 0xe4, 0x58, 0x05, 0xb8, 0xb3, 0x45, 0x06,
0xd0, 0x2c, 0x1e, 0x8f, 0xca, 0x3f, 0x0f, 0x02,
0xc1, 0xaf, 0xbd, 0x03, 0x01, 0x13, 0x8a, 0x6b,
0x3a, 0x91, 0x11, 0x41, 0x4f, 0x67, 0xdc, 0xea,
0x97, 0xf2, 0xcf, 0xce, 0xf0, 0xb4, 0xe6, 0x73,
0x96, 0xac, 0x74, 0x22, 0xe7, 0xad, 0x35, 0x85,
0xe2, 0xf9, 0x37, 0xe8, 0x1c, 0x75, 0xdf, 0x6e,
0x47, 0xf1, 0x1a, 0x71, 0x1d, 0x29, 0xc5, 0x89,
0x6f, 0xb7, 0x62, 0x0e, 0xaa, 0x18, 0xbe, 0x1b,
0xfc, 0x56, 0x3e, 0x4b, 0xc6, 0xd2, 0x79, 0x20,
0x9a, 0xdb, 0xc0, 0xfe, 0x78, 0xcd, 0x5a, 0xf4,
0x1f, 0xdd, 0xa8, 0x33, 0x88, 0x07, 0xc7, 0x31,
0xb1, 0x12, 0x10, 0x59, 0x27, 0x80, 0xec, 0x5f,
0x60, 0x51, 0x7f, 0xa9, 0x19, 0xb5, 0x4a, 0x0d,
0x2d, 0xe5, 0x7a, 0x9f, 0x93, 0xc9, 0x9c, 0xef,
0xa0, 0xe0, 0x3b, 0x4d, 0xae, 0x2a, 0xf5, 0xb0,
0xc8, 0xeb, 0xbb, 0x3c, 0x83, 0x53, 0x99, 0x61,
0x17, 0x2b, 0x04, 0x7e, 0xba, 0x77, 0xd6, 0x26,
0xe1, 0x69, 0x14, 0x63, 0x55, 0x21, 0x0c, 0x7d
)
MIX_COLUMN_MATRIX = (
(0x2, 0x3, 0x1, 0x1),
(0x1, 0x2, 0x3, 0x1),
(0x1, 0x1, 0x2, 0x3),
(0x3, 0x1, 0x1, 0x2),
)
MIX_COLUMN_MATRIX_INV = (
(0xE, 0xB, 0xD, 0x9),
(0x9, 0xE, 0xB, 0xD),
(0xD, 0x9, 0xE, 0xB),
(0xB, 0xD, 0x9, 0xE),
)
RIJNDAEL_EXP_TABLE = (
0x01, 0x03, 0x05, 0x0F, 0x11, 0x33, 0x55, 0xFF,
0x1A, 0x2E, 0x72, 0x96, 0xA1, 0xF8, 0x13, 0x35,
0x5F, 0xE1, 0x38, 0x48, 0xD8, 0x73, 0x95, 0xA4,
0xF7, 0x02, 0x06, 0x0A, 0x1E, 0x22, 0x66, 0xAA,
0xE5, 0x34, 0x5C, 0xE4, 0x37, 0x59, 0xEB, 0x26,
0x6A, 0xBE, 0xD9, 0x70, 0x90, 0xAB, 0xE6, 0x31,
0x53, 0xF5, 0x04, 0x0C, 0x14, 0x3C, 0x44, 0xCC,
0x4F, 0xD1, 0x68, 0xB8, 0xD3, 0x6E, 0xB2, 0xCD,
0x4C, 0xD4, 0x67, 0xA9, 0xE0, 0x3B, 0x4D, 0xD7,
0x62, 0xA6, 0xF1, 0x08, 0x18, 0x28, 0x78, 0x88,
0x83, 0x9E, 0xB9, 0xD0, 0x6B, 0xBD, 0xDC, 0x7F,
0x81, 0x98, 0xB3, 0xCE, 0x49, 0xDB, 0x76, 0x9A,
0xB5, 0xC4, 0x57, 0xF9, 0x10, 0x30, 0x50, 0xF0,
0x0B, 0x1D, 0x27, 0x69, 0xBB, 0xD6, 0x61, 0xA3,
0xFE, 0x19, 0x2B, 0x7D, 0x87, 0x92, 0xAD, 0xEC,
0x2F, 0x71, 0x93, 0xAE, 0xE9, 0x20, 0x60, 0xA0,
0xFB, 0x16, 0x3A, 0x4E, 0xD2, 0x6D, 0xB7, 0xC2,
0x5D, 0xE7, 0x32, 0x56, 0xFA, 0x15, 0x3F, 0x41,
0xC3, 0x5E, 0xE2, 0x3D, 0x47, 0xC9, 0x40, 0xC0,
0x5B, 0xED, 0x2C, 0x74, 0x9C, 0xBF, 0xDA, 0x75,
0x9F, 0xBA, 0xD5, 0x64, 0xAC, 0xEF, 0x2A, 0x7E,
0x82, 0x9D, 0xBC, 0xDF, 0x7A, 0x8E, 0x89, 0x80,
0x9B, 0xB6, 0xC1, 0x58, 0xE8, 0x23, 0x65, 0xAF,
0xEA, 0x25, 0x6F, 0xB1, 0xC8, 0x43, 0xC5, 0x54,
0xFC, 0x1F, 0x21, 0x63, 0xA5, 0xF4, 0x07, 0x09,
0x1B, 0x2D, 0x77, 0x99, 0xB0, 0xCB, 0x46, 0xCA,
0x45, 0xCF, 0x4A, 0xDE, 0x79, 0x8B, 0x86, 0x91,
0xA8, 0xE3, 0x3E, 0x42, 0xC6, 0x51, 0xF3, 0x0E,
0x12, 0x36, 0x5A, 0xEE, 0x29, 0x7B, 0x8D, 0x8C,
0x8F, 0x8A, 0x85, 0x94, 0xA7, 0xF2, 0x0D, 0x17,
0x39, 0x4B, 0xDD, 0x7C, 0x84, 0x97, 0xA2, 0xFD,
0x1C, 0x24, 0x6C, 0xB4, 0xC7, 0x52, 0xF6, 0x01,
)
RIJNDAEL_LOG_TABLE = (
0x00, 0x00, 0x19, 0x01, 0x32, 0x02, 0x1a, 0xc6,
0x4b, 0xc7, 0x1b, 0x68, 0x33, 0xee, 0xdf, 0x03,
0x64, 0x04, 0xe0, 0x0e, 0x34, 0x8d, 0x81, 0xef,
0x4c, 0x71, 0x08, 0xc8, 0xf8, 0x69, 0x1c, 0xc1,
0x7d, 0xc2, 0x1d, 0xb5, 0xf9, 0xb9, 0x27, 0x6a,
0x4d, 0xe4, 0xa6, 0x72, 0x9a, 0xc9, 0x09, 0x78,
0x65, 0x2f, 0x8a, 0x05, 0x21, 0x0f, 0xe1, 0x24,
0x12, 0xf0, 0x82, 0x45, 0x35, 0x93, 0xda, 0x8e,
0x96, 0x8f, 0xdb, 0xbd, 0x36, 0xd0, 0xce, 0x94,
0x13, 0x5c, 0xd2, 0xf1, 0x40, 0x46, 0x83, 0x38,
0x66, 0xdd, 0xfd, 0x30, 0xbf, 0x06, 0x8b, 0x62,
0xb3, 0x25, 0xe2, 0x98, 0x22, 0x88, 0x91, 0x10,
0x7e, 0x6e, 0x48, 0xc3, 0xa3, 0xb6, 0x1e, 0x42,
0x3a, 0x6b, 0x28, 0x54, 0xfa, 0x85, 0x3d, 0xba,
0x2b, 0x79, 0x0a, 0x15, 0x9b, 0x9f, 0x5e, 0xca,
0x4e, 0xd4, 0xac, 0xe5, 0xf3, 0x73, 0xa7, 0x57,
0xaf, 0x58, 0xa8, 0x50, 0xf4, 0xea, 0xd6, 0x74,
0x4f, 0xae, 0xe9, 0xd5, 0xe7, 0xe6, 0xad, 0xe8,
0x2c, 0xd7, 0x75, 0x7a, 0xeb, 0x16, 0x0b, 0xf5,
0x59, 0xcb, 0x5f, 0xb0, 0x9c, 0xa9, 0x51, 0xa0,
0x7f, 0x0c, 0xf6, 0x6f, 0x17, 0xc4, 0x49, 0xec,
0xd8, 0x43, 0x1f, 0x2d, 0xa4, 0x76, 0x7b, 0xb7,
0xcc, 0xbb, 0x3e, 0x5a, 0xfb, 0x60, 0xb1, 0x86,
0x3b, 0x52, 0xa1, 0x6c, 0xaa, 0x55, 0x29, 0x9d,
0x97, 0xb2, 0x87, 0x90, 0x61, 0xbe, 0xdc, 0xfc,
0xbc, 0x95, 0xcf, 0xcd, 0x37, 0x3f, 0x5b, 0xd1,
0x53, 0x39, 0x84, 0x3c, 0x41, 0xa2, 0x6d, 0x47,
0x14, 0x2a, 0x9e, 0x5d, 0x56, 0xf2, 0xd3, 0xab,
0x44, 0x11, 0x92, 0xd9, 0x23, 0x20, 0x2e, 0x89,
0xb4, 0x7c, 0xb8, 0x26, 0x77, 0x99, 0xe3, 0xa5,
0x67, 0x4a, 0xed, 0xde, 0xc5, 0x31, 0xfe, 0x18,
0x0d, 0x63, 0x8c, 0x80, 0xc0, 0xf7, 0x70, 0x07,
)
def key_expansion(data):
"""
Generate key schedule
@param {int[]} data 16/24/32-Byte cipher key
@returns {int[]} 176/208/240-Byte expanded key
"""
data = data[:] # copy
rcon_iteration = 1
key_size_bytes = len(data)
expanded_key_size_bytes = (key_size_bytes // 4 + 7) * BLOCK_SIZE_BYTES
while len(data) < expanded_key_size_bytes:
temp = data[-4:]
temp = key_schedule_core(temp, rcon_iteration)
rcon_iteration += 1
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
for _ in range(3):
temp = data[-4:]
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
if key_size_bytes == 32:
temp = data[-4:]
temp = sub_bytes(temp)
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
for _ in range(3 if key_size_bytes == 32 else
2 if key_size_bytes == 24 else 0):
temp = data[-4:]
data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes])
data = data[:expanded_key_size_bytes]
return data
def iter_vector(iv):
while True:
yield iv
iv = inc(iv)
def sub_bytes(data):
return [SBOX[x] for x in data]
def sub_bytes_inv(data):
return [SBOX_INV[x] for x in data]
def rotate(data):
return data[1:] + [data[0]]
def key_schedule_core(data, rcon_iteration):
data = rotate(data)
data = sub_bytes(data)
data[0] = data[0] ^ RCON[rcon_iteration]
return data
def xor(data1, data2):
return [x ^ y for x, y in zip(data1, data2)]
def iter_mix_columns(data, matrix):
for i in (0, 4, 8, 12):
for row in matrix:
mixed = 0
for j in range(4):
if data[i:i + 4][j] == 0 or row[j] == 0:
mixed ^= 0
else:
mixed ^= RIJNDAEL_EXP_TABLE[
(RIJNDAEL_LOG_TABLE[data[i + j]] +
RIJNDAEL_LOG_TABLE[row[j]]) % 0xFF
]
yield mixed
def shift_rows(data):
return [
data[((column + row) & 0b11) * 4 + row]
for column in range(4)
for row in range(4)
]
def shift_rows_inv(data):
return [
data[((column - row) & 0b11) * 4 + row]
for column in range(4)
for row in range(4)
]
def shift_block(data):
data_shifted = []
bit = 0
for n in data:
if bit:
n |= 0x100
bit = n & 1
n >>= 1
data_shifted.append(n)
return data_shifted
def inc(data):
data = data[:] # copy
for i in range(len(data) - 1, -1, -1):
if data[i] == 255:
data[i] = 0
else:
data[i] = data[i] + 1
break
return data
def block_product(block_x, block_y):
# NIST SP 800-38D, Algorithm 1
if len(block_x) != BLOCK_SIZE_BYTES or len(block_y) != BLOCK_SIZE_BYTES:
raise ValueError(
"Length of blocks need to be %d bytes" % BLOCK_SIZE_BYTES)
block_r = [0xE1] + [0] * (BLOCK_SIZE_BYTES - 1)
block_v = block_y[:]
block_z = [0] * BLOCK_SIZE_BYTES
for i in block_x:
for bit in range(7, -1, -1):
if i & (1 << bit):
block_z = xor(block_z, block_v)
do_xor = block_v[-1] & 1
block_v = shift_block(block_v)
if do_xor:
block_v = xor(block_v, block_r)
return block_z
def ghash(subkey, data):
# NIST SP 800-38D, Algorithm 2
if len(data) % BLOCK_SIZE_BYTES:
raise ValueError(
"Length of data should be %d bytes" % BLOCK_SIZE_BYTES)
last_y = [0] * BLOCK_SIZE_BYTES
for i in range(0, len(data), BLOCK_SIZE_BYTES):
block = data[i: i + BLOCK_SIZE_BYTES]
last_y = block_product(xor(last_y, block), subkey)
return last_y

@ -0,0 +1,968 @@
# -*- coding: utf-8 -*-
# Copyright 2022 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
# Adapted freom yt-dlp's cookies module.
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/cookies.py
import binascii
import contextlib
import ctypes
import json
import logging
import os
import shutil
import sqlite3
import struct
import subprocess
import sys
import tempfile
from datetime import datetime, timedelta, timezone
from hashlib import pbkdf2_hmac
from http.cookiejar import Cookie
from . import aes
SUPPORTED_BROWSERS_CHROMIUM = {
"brave", "chrome", "chromium", "edge", "opera", "vivaldi"}
SUPPORTED_BROWSERS = SUPPORTED_BROWSERS_CHROMIUM | {"firefox", "safari"}
logger = logging.getLogger("cookies")
def load_cookies(cookiejar, browser_specification):
browser_name, profile, keyring = \
_parse_browser_specification(*browser_specification)
if browser_name == "firefox":
load_cookies_firefox(cookiejar, profile)
elif browser_name == "safari":
load_cookies_safari(cookiejar, profile)
elif browser_name in SUPPORTED_BROWSERS_CHROMIUM:
load_cookies_chrome(cookiejar, browser_name, profile, keyring)
else:
raise ValueError("unknown browser '{}'".format(browser_name))
def load_cookies_firefox(cookiejar, profile=None):
set_cookie = cookiejar.set_cookie
with _firefox_cookies_database(profile) as db:
for name, value, domain, path, secure, expires in db.execute(
"SELECT name, value, host, path, isSecure, expiry "
"FROM moz_cookies"):
set_cookie(Cookie(
0, name, value, None, False,
domain, bool(domain), domain.startswith("."),
path, bool(path), secure, expires, False, None, None, {},
))
def load_cookies_safari(cookiejar, profile=None):
"""Ref.: https://github.com/libyal/dtformats/blob
/main/documentation/Safari%20Cookies.asciidoc
- This data appears to be out of date
but the important parts of the database structure is the same
- There are a few bytes here and there
which are skipped during parsing
"""
with _safari_cookies_database() as fp:
data = fp.read()
page_sizes, body_start = _safari_parse_cookies_header(data)
p = DataParser(data[body_start:])
for page_size in page_sizes:
_safari_parse_cookies_page(p.read_bytes(page_size), cookiejar)
def load_cookies_chrome(cookiejar, browser_name, profile, keyring):
config = _get_chromium_based_browser_settings(browser_name)
with _chrome_cookies_database(profile, config) as db:
decryptor = get_cookie_decryptor(
config["directory"], config["keyring"], keyring=keyring)
failed_cookies = 0
unencrypted_cookies = 0
db.text_factory = bytes
set_cookie = cookiejar.set_cookie
try:
rows = db.execute(
"SELECT host_key, name, value, encrypted_value, path, "
"expires_utc, is_secure FROM cookies")
except sqlite3.OperationalError:
print(1)
rows = db.execute(
"SELECT host_key, name, value, encrypted_value, path, "
"expires_utc, secure FROM cookies")
for row in rows:
is_encrypted, cookie = _process_chrome_cookie(decryptor, *row)
if not cookie:
failed_cookies += 1
continue
elif not is_encrypted:
unencrypted_cookies += 1
set_cookie(cookie)
if failed_cookies > 0:
failed_message = " ({} could not be decrypted)".format(failed_cookies)
else:
failed_message = ""
logger.info("Extracted %s cookies from %s%s",
len(cookiejar), browser_name, failed_message)
counts = decryptor.cookie_counts.copy()
counts["unencrypted"] = unencrypted_cookies
logger.debug("cookie version breakdown: %s", counts)
# --------------------------------------------------------------------
# firefox
def _firefox_cookies_database(profile=None):
if profile is None:
search_root = _firefox_browser_directory()
elif _is_path(profile):
search_root = profile
else:
search_root = os.path.join(_firefox_browser_directory(), profile)
path = _find_most_recently_used_file(search_root, "cookies.sqlite")
if path is None:
raise FileNotFoundError("Unable to find Firefox cookies database in "
"{}".format(search_root))
logger.debug("Extracting cookies from %s", path)
return DatabaseCopy(path)
def _firefox_browser_directory():
if sys.platform in ("linux", "linux2"):
return os.path.expanduser("~/.mozilla/firefox")
if sys.platform == "win32":
return os.path.expandvars(R"%APPDATA%\Mozilla\Firefox\Profiles")
if sys.platform == "darwin":
return os.path.expanduser("~/Library/Application Support/Firefox")
raise ValueError("unsupported platform '{}'".format(sys.platform))
# --------------------------------------------------------------------
# safari
def _safari_cookies_database():
try:
path = os.path.expanduser("~/Library/Cookies/Cookies.binarycookies")
return open(path, "rb")
except FileNotFoundError:
logger.debug("Trying secondary cookie location")
path = os.path.expanduser("~/Library/Containers/com.apple.Safari/Data"
"/Library/Cookies/Cookies.binarycookies")
return open(path, "rb")
def _safari_parse_cookies_header(data):
p = DataParser(data)
p.expect_bytes(b"cook", "database signature")
number_of_pages = p.read_uint(big_endian=True)
page_sizes = [p.read_uint(big_endian=True)
for _ in range(number_of_pages)]
return page_sizes, p.cursor
def _safari_parse_cookies_page(data, jar):
p = DataParser(data)
p.expect_bytes(b"\x00\x00\x01\x00", "page signature")
number_of_cookies = p.read_uint()
record_offsets = [p.read_uint() for _ in range(number_of_cookies)]
if number_of_cookies == 0:
logger.debug("a cookies page of size %s has no cookies", len(data))
return
p.skip_to(record_offsets[0], "unknown page header field")
for i, record_offset in enumerate(record_offsets):
p.skip_to(record_offset, "space between records")
record_length = _safari_parse_cookies_record(
data[record_offset:], jar)
p.read_bytes(record_length)
p.skip_to_end("space in between pages")
def _safari_parse_cookies_record(data, cookiejar):
p = DataParser(data)
record_size = p.read_uint()
p.skip(4, "unknown record field 1")
flags = p.read_uint()
is_secure = bool(flags & 0x0001)
p.skip(4, "unknown record field 2")
domain_offset = p.read_uint()
name_offset = p.read_uint()
path_offset = p.read_uint()
value_offset = p.read_uint()
p.skip(8, "unknown record field 3")
expiration_date = _mac_absolute_time_to_posix(p.read_double())
_creation_date = _mac_absolute_time_to_posix(p.read_double()) # noqa: F841
try:
p.skip_to(domain_offset)
domain = p.read_cstring()
p.skip_to(name_offset)
name = p.read_cstring()
p.skip_to(path_offset)
path = p.read_cstring()
p.skip_to(value_offset)
value = p.read_cstring()
except UnicodeDecodeError:
logger.warning("failed to parse Safari cookie "
"because UTF-8 decoding failed")
return record_size
p.skip_to(record_size, "space at the end of the record")
cookiejar.set_cookie(Cookie(
0, name, value, None, False,
domain, bool(domain), domain.startswith('.'),
path, bool(path), is_secure, expiration_date, False,
None, None, {},
))
return record_size
# --------------------------------------------------------------------
# chrome
def _chrome_cookies_database(profile, config):
if profile is None:
search_root = config["directory"]
elif _is_path(profile):
search_root = profile
config["directory"] = (os.path.dirname(profile)
if config["profiles"] else profile)
elif config["profiles"]:
search_root = os.path.join(config["directory"], profile)
else:
logger.warning("%s does not support profiles", config["browser"])
search_root = config["directory"]
path = _find_most_recently_used_file(search_root, "Cookies")
if path is None:
raise FileNotFoundError("Unable tp find {} cookies database in "
"'{}'".format(config["browser"], search_root))
logger.debug("Extracting cookies from %s", path)
return DatabaseCopy(path)
def _get_chromium_based_browser_settings(browser_name):
# https://chromium.googlesource.com/chromium
# /src/+/HEAD/docs/user_data_dir.md
join = os.path.join
if sys.platform in ("linux", "linux2"):
config = (os.environ.get("XDG_CONFIG_HOME") or
os.path.expanduser("~/.config"))
browser_dir = {
"brave" : join(config, "BraveSoftware/Brave-Browser"),
"chrome" : join(config, "google-chrome"),
"chromium": join(config, "chromium"),
"edge" : join(config, "microsoft-edge"),
"opera" : join(config, "opera"),
"vivaldi" : join(config, "vivaldi"),
}[browser_name]
elif sys.platform == "win32":
appdata_local = os.path.expandvars("%LOCALAPPDATA%")
appdata_roaming = os.path.expandvars("%APPDATA%")
browser_dir = {
"brave" : join(appdata_local,
R"BraveSoftware\Brave-Browser\User Data"),
"chrome" : join(appdata_local, R"Google\Chrome\User Data"),
"chromium": join(appdata_local, R"Chromium\User Data"),
"edge" : join(appdata_local, R"Microsoft\Edge\User Data"),
"opera" : join(appdata_roaming, R"Opera Software\Opera Stable"),
"vivaldi" : join(appdata_local, R"Vivaldi\User Data"),
}[browser_name]
elif sys.platform == "darwin":
appdata = os.path.expanduser("~/Library/Application Support")
browser_dir = {
"brave" : join(appdata, "BraveSoftware/Brave-Browser"),
"chrome" : join(appdata, "Google/Chrome"),
"chromium": join(appdata, "Chromium"),
"edge" : join(appdata, "Microsoft Edge"),
"opera" : join(appdata, "com.operasoftware.Opera"),
"vivaldi" : join(appdata, "Vivaldi"),
}[browser_name]
else:
raise ValueError("unsupported platform '{}'".format(sys.platform))
# Linux keyring names can be determined by snooping on dbus
# while opening the browser in KDE:
# dbus-monitor "interface="org.kde.KWallet"" "type=method_return"
keyring_name = {
"brave" : "Brave",
"chrome" : "Chrome",
"chromium": "Chromium",
"edge" : "Microsoft Edge" if sys.platform == "darwin" else
"Chromium",
"opera" : "Opera" if sys.platform == "darwin" else "Chromium",
"vivaldi" : "Vivaldi" if sys.platform == "darwin" else "Chrome",
}[browser_name]
browsers_without_profiles = {"opera"}
return {
"browser" : browser_name,
"directory": browser_dir,
"keyring" : keyring_name,
"profiles" : browser_name not in browsers_without_profiles
}
return {
"browser_dir": browser_dir,
"keyring_name": keyring_name,
"supports_profiles": browser_name not in browsers_without_profiles
}
def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value,
path, expires_utc, is_secure):
host_key = host_key.decode()
name = name.decode()
value = value.decode()
path = path.decode()
is_encrypted = not value and encrypted_value
if is_encrypted:
value = decryptor.decrypt(encrypted_value)
if value is None:
return is_encrypted, None
return is_encrypted, Cookie(
0, name, value, None, False,
host_key, bool(host_key), host_key.startswith("."),
path, bool(path), is_secure, expires_utc, False, None, None, {},
)
class ChromeCookieDecryptor:
"""
Overview:
Linux:
- cookies are either v10 or v11
- v10: AES-CBC encrypted with a fixed key
- v11: AES-CBC encrypted with an OS protected key (keyring)
- v11 keys can be stored in various places depending on the
activate desktop environment [2]
Mac:
- cookies are either v10 or not v10
- v10: AES-CBC encrypted with an OS protected key (keyring)
and more key derivation iterations than linux
- not v10: "old data" stored as plaintext
Windows:
- cookies are either v10 or not v10
- v10: AES-GCM encrypted with a key which is encrypted with DPAPI
- not v10: encrypted with DPAPI
Sources:
- [1] https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/components/os_crypt/
- [2] https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/components/os_crypt/key_storage_linux.cc
- KeyStorageLinux::CreateService
"""
def decrypt(self, encrypted_value):
raise NotImplementedError("Must be implemented by sub classes")
@property
def cookie_counts(self):
raise NotImplementedError("Must be implemented by sub classes")
def get_cookie_decryptor(browser_root, browser_keyring_name, *, keyring=None):
if sys.platform in ("linux", "linux2"):
return LinuxChromeCookieDecryptor(
browser_keyring_name, keyring=keyring)
elif sys.platform == "darwin":
return MacChromeCookieDecryptor(browser_keyring_name)
elif sys.platform == "win32":
return WindowsChromeCookieDecryptor(browser_root)
else:
raise NotImplementedError("Chrome cookie decryption is not supported "
"on {}".format(sys.platform))
class LinuxChromeCookieDecryptor(ChromeCookieDecryptor):
def __init__(self, browser_keyring_name, *, keyring=None):
self._v10_key = self.derive_key(b"peanuts")
password = _get_linux_keyring_password(browser_keyring_name, keyring)
self._v11_key = None if password is None else self.derive_key(password)
self._cookie_counts = {"v10": 0, "v11": 0, "other": 0}
@staticmethod
def derive_key(password):
# values from
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/components/os_crypt/os_crypt_linux.cc
return pbkdf2_sha1(password, salt=b"saltysalt",
iterations=1, key_length=16)
@property
def cookie_counts(self):
return self._cookie_counts
def decrypt(self, encrypted_value):
version = encrypted_value[:3]
ciphertext = encrypted_value[3:]
if version == b"v10":
self._cookie_counts["v10"] += 1
return _decrypt_aes_cbc(ciphertext, self._v10_key)
elif version == b"v11":
self._cookie_counts["v11"] += 1
if self._v11_key is None:
logger.warning("cannot decrypt v11 cookies: no key found")
return None
return _decrypt_aes_cbc(ciphertext, self._v11_key)
else:
self._cookie_counts["other"] += 1
return None
class MacChromeCookieDecryptor(ChromeCookieDecryptor):
def __init__(self, browser_keyring_name):
password = _get_mac_keyring_password(browser_keyring_name)
self._v10_key = None if password is None else self.derive_key(password)
self._cookie_counts = {"v10": 0, "other": 0}
@staticmethod
def derive_key(password):
# values from
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/components/os_crypt/os_crypt_mac.mm
return pbkdf2_sha1(password, salt=b"saltysalt",
iterations=1003, key_length=16)
@property
def cookie_counts(self):
return self._cookie_counts
def decrypt(self, encrypted_value):
version = encrypted_value[:3]
ciphertext = encrypted_value[3:]
if version == b"v10":
self._cookie_counts["v10"] += 1
if self._v10_key is None:
logger.warning("cannot decrypt v10 cookies: no key found")
return None
return _decrypt_aes_cbc(ciphertext, self._v10_key)
else:
self._cookie_counts["other"] += 1
# other prefixes are considered "old data",
# which were stored as plaintext
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/components/os_crypt/os_crypt_mac.mm
return encrypted_value
class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
def __init__(self, browser_root):
self._v10_key = _get_windows_v10_key(browser_root)
self._cookie_counts = {"v10": 0, "other": 0}
@property
def cookie_counts(self):
return self._cookie_counts
def decrypt(self, encrypted_value):
version = encrypted_value[:3]
ciphertext = encrypted_value[3:]
if version == b"v10":
self._cookie_counts["v10"] += 1
if self._v10_key is None:
logger.warning("cannot decrypt v10 cookies: no key found")
return None
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/components/os_crypt/os_crypt_win.cc
# kNonceLength
nonce_length = 96 // 8
# boringssl
# EVP_AEAD_AES_GCM_TAG_LEN
authentication_tag_length = 16
raw_ciphertext = ciphertext
nonce = raw_ciphertext[:nonce_length]
ciphertext = raw_ciphertext[
nonce_length:-authentication_tag_length]
authentication_tag = raw_ciphertext[-authentication_tag_length:]
return _decrypt_aes_gcm(
ciphertext, self._v10_key, nonce, authentication_tag)
else:
self._cookie_counts["other"] += 1
# any other prefix means the data is DPAPI encrypted
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/components/os_crypt/os_crypt_win.cc
return _decrypt_windows_dpapi(encrypted_value).decode()
# --------------------------------------------------------------------
# keyring
def _choose_linux_keyring():
"""
https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/components/os_crypt/key_storage_util_linux.cc
SelectBackend
"""
desktop_environment = _get_linux_desktop_environment(os.environ)
logger.debug("Detected desktop environment: %s", desktop_environment)
if desktop_environment == DE_KDE:
return KEYRING_KWALLET
if desktop_environment == DE_OTHER:
return KEYRING_BASICTEXT
return KEYRING_GNOMEKEYRING
def _get_kwallet_network_wallet():
""" The name of the wallet used to store network passwords.
https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/components/os_crypt/kwallet_dbus.cc
KWalletDBus::NetworkWallet
which does a dbus call to the following function:
https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
Wallet::NetworkWallet
"""
default_wallet = "kdewallet"
try:
proc = Popen(
"dbus-send", "--session", "--print-reply=literal",
"--dest=org.kde.kwalletd5",
"/modules/kwalletd5",
"org.kde.KWallet.networkWallet"
)
stdout, stderr = proc.communicate_or_kill()
if proc.returncode != 0:
logger.warning("failed to read NetworkWallet")
return default_wallet
else:
network_wallet = stdout.decode().strip()
logger.debug("NetworkWallet = '%s'", network_wallet)
return network_wallet
except Exception as exc:
logger.warning("exception while obtaining NetworkWallet (%s: %s)",
exc.__class__.__name__, exc)
return default_wallet
def _get_kwallet_password(browser_keyring_name):
logger.debug("using kwallet-query to obtain password from kwallet")
if shutil.which("kwallet-query") is None:
logger.error(
"kwallet-query command not found. KWallet and kwallet-query "
"must be installed to read from KWallet. kwallet-query should be "
"included in the kwallet package for your distribution")
return b""
network_wallet = _get_kwallet_network_wallet()
try:
proc = Popen(
"kwallet-query",
"--read-password", browser_keyring_name + " Safe Storage",
"--folder", browser_keyring_name + " Keys",
network_wallet,
)
stdout, stderr = proc.communicate_or_kill()
if proc.returncode != 0:
logger.error("kwallet-query failed with return code {}. "
"Please consult the kwallet-query man page "
"for details".format(proc.returncode))
return b""
if stdout.lower().startswith(b"failed to read"):
logger.debug("Failed to read password from kwallet. "
"Using empty string instead")
# This sometimes occurs in KDE because chrome does not check
# hasEntry and instead just tries to read the value (which
# kwallet returns "") whereas kwallet-query checks hasEntry.
# To verify this:
# dbus-monitor "interface="org.kde.KWallet"" "type=method_return"
# while starting chrome.
# This may be a bug, as the intended behaviour is to generate a
# random password and store it, but that doesn't matter here.
return b""
else:
logger.debug("password found")
if stdout[-1:] == b"\n":
stdout = stdout[:-1]
return stdout
except Exception as exc:
logger.warning("exception running kwallet-query (%s: %s)",
exc.__class__.__name__, exc)
return b""
def _get_gnome_keyring_password(browser_keyring_name):
try:
import secretstorage
except ImportError:
logger.error("secretstorage not available")
return b""
# Gnome keyring does not seem to organise keys in the same way as KWallet,
# using `dbus-monitor` during startup, it can be observed that chromium
# lists all keys and presumably searches for its key in the list.
# It appears that we must do the same.
# https://github.com/jaraco/keyring/issues/556
with contextlib.closing(secretstorage.dbus_init()) as con:
col = secretstorage.get_default_collection(con)
label = browser_keyring_name + " Safe Storage"
for item in col.get_all_items():
if item.get_label() == label:
return item.get_secret()
else:
logger.error("failed to read from keyring")
return b""
def _get_linux_keyring_password(browser_keyring_name, keyring):
# Note: chrome/chromium can be run with the following flags
# to determine which keyring backend it has chosen to use
# - chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
#
# Chromium supports --password-store=<basic|gnome|kwallet>
# so the automatic detection will not be sufficient in all cases.
if not keyring:
keyring = _choose_linux_keyring()
logger.debug("Chosen keyring: %s", keyring)
if keyring == KEYRING_KWALLET:
return _get_kwallet_password(browser_keyring_name)
elif keyring == KEYRING_GNOMEKEYRING:
return _get_gnome_keyring_password(browser_keyring_name)
elif keyring == KEYRING_BASICTEXT:
# when basic text is chosen, all cookies are stored as v10
# so no keyring password is required
return None
assert False, "Unknown keyring " + keyring
def _get_mac_keyring_password(browser_keyring_name):
logger.debug("using find-generic-password to obtain "
"password from OSX keychain")
try:
proc = Popen(
"security", "find-generic-password",
"-w", # write password to stdout
"-a", browser_keyring_name, # match "account"
"-s", browser_keyring_name + " Safe Storage", # match "service"
)
stdout, stderr = proc.communicate_or_kill()
if stdout[-1:] == b"\n":
stdout = stdout[:-1]
return stdout
except Exception as exc:
logger.warning("exception running find-generic-password (%s: %s)",
exc.__class__.__name__, exc)
return None
def _get_windows_v10_key(browser_root):
path = _find_most_recently_used_file(browser_root, "Local State")
if path is None:
logger.error("could not find local state file")
return None
logger.debug("Found local state file at '%s'", path)
with open(path, encoding="utf8") as f:
data = json.load(f)
try:
base64_key = data["os_crypt"]["encrypted_key"]
except KeyError:
logger.error("no encrypted key in Local State")
return None
encrypted_key = binascii.a2b_base64(base64_key)
prefix = b"DPAPI"
if not encrypted_key.startswith(prefix):
logger.error("invalid key")
return None
return _decrypt_windows_dpapi(encrypted_key[len(prefix):])
# --------------------------------------------------------------------
# utility
class ParserError(Exception):
pass
class DataParser:
def __init__(self, data):
self.cursor = 0
self._data = data
def read_bytes(self, num_bytes):
if num_bytes < 0:
raise ParserError("invalid read of {} bytes".format(num_bytes))
end = self.cursor + num_bytes
if end > len(self._data):
raise ParserError("reached end of input")
data = self._data[self.cursor:end]
self.cursor = end
return data
def expect_bytes(self, expected_value, message):
value = self.read_bytes(len(expected_value))
if value != expected_value:
raise ParserError("unexpected value: {} != {} ({})".format(
value, expected_value, message))
def read_uint(self, big_endian=False):
data_format = ">I" if big_endian else "<I"
return struct.unpack(data_format, self.read_bytes(4))[0]
def read_double(self, big_endian=False):
data_format = ">d" if big_endian else "<d"
return struct.unpack(data_format, self.read_bytes(8))[0]
def read_cstring(self):
buffer = []
while True:
c = self.read_bytes(1)
if c == b"\x00":
return b"".join(buffer).decode()
else:
buffer.append(c)
def skip(self, num_bytes, description="unknown"):
if num_bytes > 0:
logger.debug("skipping {} bytes ({}): {!r}".format(
num_bytes, description, self.read_bytes(num_bytes)))
elif num_bytes < 0:
raise ParserError("invalid skip of {} bytes".format(num_bytes))
def skip_to(self, offset, description="unknown"):
self.skip(offset - self.cursor, description)
def skip_to_end(self, description="unknown"):
self.skip_to(len(self._data), description)
class DatabaseCopy():
def __init__(self, path):
self.path = path
self.directory = self.database = None
def __enter__(self):
try:
self.directory = tempfile.TemporaryDirectory(prefix="gallery-dl-")
path_copy = os.path.join(self.directory.name, "copy.sqlite")
shutil.copyfile(self.path, path_copy)
self.database = db = sqlite3.connect(
path_copy, isolation_level=None, check_same_thread=False)
return db
except BaseException:
if self.directory:
self.directory.cleanup()
raise
def __exit__(self, exc, value, tb):
self.database.close()
self.directory.cleanup()
def Popen(*args):
return subprocess.Popen(
args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
"""
https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/base/nix/xdg_util.h - DesktopEnvironment
"""
DE_OTHER = "other"
DE_CINNAMON = "cinnamon"
DE_GNOME = "gnome"
DE_KDE = "kde"
DE_PANTHEON = "pantheon"
DE_UNITY = "unity"
DE_XFCE = "xfce"
"""
https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/components/os_crypt/key_storage_util_linux.h - SelectedLinuxBackend
"""
KEYRING_KWALLET = "kwallet"
KEYRING_GNOMEKEYRING = "gnomekeyring"
KEYRING_BASICTEXT = "basictext"
SUPPORTED_KEYRINGS = {"kwallet", "gnomekeyring", "basictext"}
def _get_linux_desktop_environment(env):
"""
Ref: https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/base/nix/xdg_util.cc - GetDesktopEnvironment
"""
xdg_current_desktop = env.get("XDG_CURRENT_DESKTOP")
desktop_session = env.get("DESKTOP_SESSION")
if xdg_current_desktop:
xdg_current_desktop = xdg_current_desktop.partition(
":")[0].strip().lower()
if xdg_current_desktop == "unity":
if desktop_session and "gnome-fallback" in desktop_session:
return DE_GNOME
else:
return DE_UNITY
elif xdg_current_desktop == "gnome":
return DE_GNOME
elif xdg_current_desktop == "x-cinnamon":
return DE_CINNAMON
elif xdg_current_desktop == "kde":
return DE_KDE
elif xdg_current_desktop == "pantheon":
return DE_PANTHEON
elif xdg_current_desktop == "xfce":
return DE_XFCE
if desktop_session:
if desktop_session in ("mate", "gnome"):
return DE_GNOME
if "kde" in desktop_session:
return DE_KDE
if "xfce" in desktop_session:
return DE_XFCE
if "GNOME_DESKTOP_SESSION_ID" in env:
return DE_GNOME
if "KDE_FULL_SESSION" in env:
return DE_KDE
return DE_OTHER
def _mac_absolute_time_to_posix(timestamp):
return int((datetime(2001, 1, 1, 0, 0, tzinfo=timezone.utc) +
timedelta(seconds=timestamp)).timestamp())
def pbkdf2_sha1(password, salt, iterations, key_length):
return pbkdf2_hmac("sha1", password, salt, iterations, key_length)
def _decrypt_aes_cbc(ciphertext, key, initialization_vector=b" " * 16):
plaintext = aes.unpad_pkcs7(
aes.aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector))
try:
return plaintext.decode()
except UnicodeDecodeError:
logger.warning("failed to decrypt cookie (AES-CBC) because UTF-8 "
"decoding failed. Possibly the key is wrong?")
return None
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag):
try:
plaintext = aes.aes_gcm_decrypt_and_verify_bytes(
ciphertext, key, authentication_tag, nonce)
except ValueError:
logger.warning("failed to decrypt cookie (AES-GCM) because MAC check "
"failed. Possibly the key is wrong?")
return None
try:
return plaintext.decode()
except UnicodeDecodeError:
logger.warning("failed to decrypt cookie (AES-GCM) because UTF-8 "
"decoding failed. Possibly the key is wrong?")
return None
def _decrypt_windows_dpapi(ciphertext):
"""
References:
- https://docs.microsoft.com/en-us/windows
/win32/api/dpapi/nf-dpapi-cryptunprotectdata
"""
from ctypes.wintypes import DWORD
class DATA_BLOB(ctypes.Structure):
_fields_ = [("cbData", DWORD),
("pbData", ctypes.POINTER(ctypes.c_char))]
buffer = ctypes.create_string_buffer(ciphertext)
blob_in = DATA_BLOB(ctypes.sizeof(buffer), buffer)
blob_out = DATA_BLOB()
ret = ctypes.windll.crypt32.CryptUnprotectData(
ctypes.byref(blob_in), # pDataIn
None, # ppszDataDescr: human readable description of pDataIn
None, # pOptionalEntropy: salt?
None, # pvReserved: must be NULL
None, # pPromptStruct: information about prompts to display
0, # dwFlags
ctypes.byref(blob_out) # pDataOut
)
if not ret:
logger.warning("failed to decrypt with DPAPI")
return None
result = ctypes.string_at(blob_out.pbData, blob_out.cbData)
ctypes.windll.kernel32.LocalFree(blob_out.pbData)
return result
def _find_most_recently_used_file(root, filename):
# if there are multiple browser profiles, take the most recently used one
paths = []
for curr_root, dirs, files in os.walk(root):
for file in files:
if file == filename:
paths.append(os.path.join(curr_root, file))
if not paths:
return None
return max(paths, key=lambda path: os.lstat(path).st_mtime)
def _is_path(value):
return os.path.sep in value
def _parse_browser_specification(browser, profile=None, keyring=None):
if browser not in SUPPORTED_BROWSERS:
raise ValueError("unsupported browser '{}'".format(browser))
if keyring and keyring not in SUPPORTED_KEYRINGS:
raise ValueError("unsupported keyring '{}'".format(keyring))
if profile and _is_path(profile):
profile = os.path.expanduser(profile)
return browser, profile, keyring

@ -311,10 +311,17 @@ class Extractor():
self.log.warning("cookies: %s", exc)
else:
self._cookiefile = cookiefile
elif isinstance(cookies, (list, tuple)):
from ..cookies import load_cookies
try:
load_cookies(self._cookiejar, cookies)
except Exception as exc:
self.log.warning("cookies: %s", exc)
else:
self.log.warning(
"expected 'dict' or 'str' value for 'cookies' option, "
"got '%s' (%s)", cookies.__class__.__name__, cookies)
"Expected 'dict', 'list', or 'str' value for 'cookies' "
"option, got '%s' (%s)",
cookies.__class__.__name__, cookies)
def _store_cookies(self):
"""Store the session's cookiejar in a cookies.txt file"""

@ -59,7 +59,7 @@ class ParseAction(argparse.Action):
class Formatter(argparse.HelpFormatter):
"""Custom HelpFormatter class to customize help output"""
def __init__(self, *args, **kwargs):
super().__init__(max_help_position=50, *args, **kwargs)
super().__init__(max_help_position=30, *args, **kwargs)
def _format_action_invocation(self, action):
opts = action.option_strings[:]
@ -113,11 +113,6 @@ def build_parser():
help=("Filename format string for downloaded files "
"('/O' for \"original\" filenames)"),
)
general.add_argument(
"--cookies",
dest="cookies", metavar="FILE", action=ConfigAction,
help="File to load additional cookies from",
)
general.add_argument(
"--proxy",
dest="proxy", metavar="URL", action=ConfigAction,
@ -134,6 +129,18 @@ def build_parser():
help="Delete cached login sessions, cookies, etc. for MODULE "
"(ALL to delete everything)",
)
general.add_argument(
"--cookies",
dest="cookies", metavar="FILE", action=ConfigAction,
help="File to load additional cookies from",
)
general.add_argument(
"--cookies-from_browser",
dest="cookies_from_browser", metavar="BROWSER[+KEYRING][:PROFILE]",
help=("Name of the browser to load cookies from, "
"with optional keyring name prefixed with '+' and "
"profile prefixed with ':'"),
)
output = parser.add_argument_group("Output Options")
output.add_argument(

Loading…
Cancel
Save