You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gallery-dl/gallery_dl/cookies.py

1018 lines
35 KiB

# -*- coding: utf-8 -*-
# Copyright 2022-2023 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
# Adapted from yt-dlp's cookies module.
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/cookies.py
import binascii
import contextlib
import ctypes
import logging
import os
import shutil
import sqlite3
import struct
import subprocess
import sys
import tempfile
from hashlib import pbkdf2_hmac
from http.cookiejar import Cookie
from . import aes, text, util
# Browser names that load_cookies() routes to load_cookies_chrome().
SUPPORTED_BROWSERS_CHROMIUM = {
    "brave",
    "chrome",
    "chromium",
    "edge",
    "opera",
    "vivaldi",
}
# Every browser name accepted by _parse_browser_specification().
SUPPORTED_BROWSERS = {"firefox", "safari"} | SUPPORTED_BROWSERS_CHROMIUM
# Module-wide logger; all functions in this file report through it.
logger = logging.getLogger("cookies")
def load_cookies(cookiejar, browser_specification):
    """Load cookies from a locally installed browser into 'cookiejar'.

    'browser_specification' is an iterable that is unpacked into
    _parse_browser_specification(), i.e.
    (browser[, profile[, keyring[, container[, domain]]]]).

    Raises ValueError for a browser name without a matching loader.
    """
    spec = _parse_browser_specification(*browser_specification)
    browser_name, profile, keyring, container, domain = spec

    if browser_name in SUPPORTED_BROWSERS_CHROMIUM:
        load_cookies_chrome(cookiejar, browser_name, profile, keyring, domain)
        return
    if browser_name == "firefox":
        load_cookies_firefox(cookiejar, profile, container, domain)
        return
    if browser_name == "safari":
        load_cookies_safari(cookiejar, profile, domain)
        return
    raise ValueError("unknown browser '{}'".format(browser_name))
def load_cookies_firefox(cookiejar, profile=None, container=None, domain=None):
    """Copy cookies from a Firefox 'cookies.sqlite' database into 'cookiejar'.

    profile:   profile name or path, forwarded to _firefox_cookies_database()
    container: container name, or "none" for cookies outside any container
    domain:    only load cookies for this host; a leading "." also matches
               every host ending in that suffix

    The container and domain filters are independent; when both are
    given, a cookie has to satisfy both of them.
    """
    path, container_id = _firefox_cookies_database(profile, container)

    sql = ("SELECT name, value, host, path, isSecure, expiry "
           "FROM moz_cookies")
    conditions = []
    parameters = []

    if container_id is False:
        # explicitly requested: cookies that belong to no container
        conditions.append("NOT INSTR(originAttributes,'userContextId=')")
    elif container_id:
        # the ID is either the last attribute or followed by "&..."
        uid = "%userContextId={}".format(container_id)
        conditions.append(
            "originAttributes LIKE ? OR originAttributes LIKE ?")
        parameters += (uid, uid + "&%")

    # Evaluated separately from the container filter. It used to be an
    # 'elif' of the chain above, which dropped the domain restriction
    # whenever a container was selected.
    if domain:
        if domain[0] == ".":
            conditions.append("host == ? OR host LIKE ?")
            parameters += (domain[1:], "%" + domain)
        else:
            conditions.append("host == ? OR host == ?")
            parameters += (domain, "." + domain)

    if conditions:
        # parenthesize each filter so its inner OR cannot leak into the AND
        sql = "{} WHERE ( {} )".format(sql, " ) AND ( ".join(conditions))

    with DatabaseCopy(path) as db:
        set_cookie = cookiejar.set_cookie
        for name, value, host, cookie_path, secure, expires in db.execute(
                sql, parameters):
            set_cookie(Cookie(
                0, name, value, None, False,
                host, bool(host), host.startswith("."),
                cookie_path, bool(cookie_path), secure, expires,
                False, None, None, {},
            ))
        logger.info("Extracted %s cookies from Firefox", len(cookiejar))
def load_cookies_safari(cookiejar, profile=None, domain=None):
    """Load cookies from Safari's 'Cookies.binarycookies' into 'cookiejar'.

    Ref.: https://github.com/libyal/dtformats/blob
          /main/documentation/Safari%20Cookies.asciidoc
    - This data appears to be out of date,
      but the important parts of the database structure are the same
    - There are a few bytes here and there
      which are skipped during parsing

    'profile' is accepted for signature parity with the other loaders
    and is not used. 'domain' restricts the loaded cookies to one host
    (see _safari_parse_cookies_record() for the matching rules).
    """
    with _safari_cookies_database() as fp:
        data = fp.read()
    page_sizes, body_start = _safari_parse_cookies_header(data)
    p = DataParser(data[body_start:])
    for page_size in page_sizes:
        # forward 'domain'; without it the per-record host filter
        # could never take effect
        _safari_parse_cookies_page(
            p.read_bytes(page_size), cookiejar, domain)
def load_cookies_chrome(cookiejar, browser_name, profile=None,
                        keyring=None, domain=None):
    """Load cookies of a Chromium-based browser into 'cookiejar'.

    browser_name: key understood by _get_chromium_based_browser_settings()
    profile:      profile name or path (see _chrome_cookies_database())
    keyring:      keyring backend override, passed to the decryptor
    domain:       only load cookies for this host; a leading "." also
                  matches every host ending in that suffix

    Cookies whose value cannot be decrypted are skipped and counted.
    """
    config = _get_chromium_based_browser_settings(browser_name)
    path = _chrome_cookies_database(profile, config)
    logger.debug("Extracting cookies from %s", path)

    with DatabaseCopy(path) as db:
        # keep TEXT columns as bytes; they get decoded explicitly below
        db.text_factory = bytes
        decryptor = get_cookie_decryptor(
            config["directory"], config["keyring"], keyring)

        if domain:
            if domain[0] == ".":
                condition = " WHERE host_key == ? OR host_key LIKE ?"
                parameters = (domain[1:], "%" + domain)
            else:
                condition = " WHERE host_key == ? OR host_key == ?"
                parameters = (domain, "." + domain)
        else:
            condition = ""
            parameters = ()

        try:
            rows = db.execute(
                "SELECT host_key, name, value, encrypted_value, path, "
                "expires_utc, is_secure FROM cookies" + condition, parameters)
        except sqlite3.OperationalError:
            # fall back to the alternative column name 'secure'
            rows = db.execute(
                "SELECT host_key, name, value, encrypted_value, path, "
                "expires_utc, secure FROM cookies" + condition, parameters)

        set_cookie = cookiejar.set_cookie
        failed_cookies = 0
        unencrypted_cookies = 0

        for host, name, value, enc_value, cookie_path, expires, secure \
                in rows:
            if not value and enc_value:  # encrypted
                value = decryptor.decrypt(enc_value)
                if value is None:
                    failed_cookies += 1
                    continue
            else:
                value = value.decode()
                unencrypted_cookies += 1

            if expires:
                # 'expires_utc' counts microseconds since 1601-01-01 UTC;
                # 11644473600 is the number of seconds from then to the
                # POSIX epoch, which is what http.cookiejar expects
                expires = int(expires) // 1000000 - 11644473600
            else:
                # 0 / NULL: no expiration time recorded
                expires = None

            host = host.decode()
            cookie_path = cookie_path.decode()
            name = name.decode()

            set_cookie(Cookie(
                0, name, value, None, False,
                host, bool(host), host.startswith("."),
                cookie_path, bool(cookie_path), secure, expires,
                False, None, None, {},
            ))

    if failed_cookies > 0:
        failed_message = " ({} could not be decrypted)".format(failed_cookies)
    else:
        failed_message = ""

    logger.info("Extracted %s cookies from %s%s",
                len(cookiejar), browser_name.capitalize(), failed_message)
    counts = decryptor.cookie_counts
    counts["unencrypted"] = unencrypted_cookies
    logger.debug("cookie version breakdown: %s", counts)
# --------------------------------------------------------------------
# firefox
def _firefox_cookies_database(profile=None, container=None):
    """Locate Firefox's cookie database and resolve a container name.

    Returns a (path, container_id) tuple, where 'container_id' is
    - None   if no container was requested
    - False  if 'container' is "none" (cookies outside any container)
    - the 'userContextId' of the container matching 'container'

    Raises FileNotFoundError if no 'cookies.sqlite' exists below the
    search root, OSError if 'containers.json' cannot be read, and
    ValueError if the requested container does not exist.
    """
    if not profile:
        search_root = _firefox_browser_directory()
    elif _is_path(profile):
        search_root = profile
    else:
        search_root = os.path.join(_firefox_browser_directory(), profile)

    path = _find_most_recently_used_file(search_root, "cookies.sqlite")
    if path is None:
        raise FileNotFoundError("Unable to find Firefox cookies database in "
                                "{}".format(search_root))
    logger.debug("Extracting cookies from %s", path)

    if container == "none":
        container_id = False
        logger.debug("Only loading cookies not belonging to any container")

    elif container:
        containers_path = os.path.join(
            os.path.dirname(path), "containers.json")

        try:
            # explicit encoding, so that container names do not depend
            # on the locale's default codec
            with open(containers_path, encoding="utf-8") as file:
                identities = util.json_loads(file.read())["identities"]
        except OSError:
            logger.error("Unable to read Firefox container database at %s",
                         containers_path)
            raise
        except KeyError:
            identities = ()

        for context in identities:
            # match either the user-visible name or the label embedded
            # in the 'l10nID' ("userContext<Label>.label")
            if container == context.get("name") or container == text.extr(
                    context.get("l10nID", ""), "userContext", ".label"):
                container_id = context["userContextId"]
                break
        else:
            raise ValueError("Unable to find Firefox container {}".format(
                container))
        logger.debug("Only loading cookies from container '%s' (ID %s)",
                     container, container_id)
    else:
        container_id = None

    return path, container_id
def _firefox_browser_directory():
if sys.platform in ("win32", "cygwin"):
return os.path.expandvars(r"%APPDATA%\Mozilla\Firefox\Profiles")
if sys.platform == "darwin":
return os.path.expanduser("~/Library/Application Support/Firefox")
return os.path.expanduser("~/.mozilla/firefox")
# --------------------------------------------------------------------
# safari
def _safari_cookies_database():
    """Open Safari's binary cookie file and return the file object.

    The regular location is tried first; if it does not exist, the
    location inside Safari's container directory is opened instead.
    An exception from that second open() propagates to the caller.
    """
    primary = os.path.expanduser("~/Library/Cookies/Cookies.binarycookies")
    try:
        return open(primary, "rb")
    except FileNotFoundError:
        logger.debug("Trying secondary cookie location")
        secondary = os.path.expanduser(
            "~/Library/Containers/com.apple.Safari/Data"
            "/Library/Cookies/Cookies.binarycookies")
        return open(secondary, "rb")
def _safari_parse_cookies_header(data):
    """Parse the header of a Safari cookie file.

    Returns (page_sizes, body_start): the list of page sizes and the
    offset in 'data' directly after the header.
    """
    parser = DataParser(data)
    parser.expect_bytes(b"cook", "database signature")

    # header integers are big-endian
    page_count = parser.read_uint(big_endian=True)
    page_sizes = []
    for _ in range(page_count):
        page_sizes.append(parser.read_uint(big_endian=True))

    return page_sizes, parser.cursor
def _safari_parse_cookies_page(data, cookiejar, domain=None):
    """Parse one page of a Safari cookie file and store its cookies.

    'domain' is handed to _safari_parse_cookies_record() as host filter.
    """
    parser = DataParser(data)
    parser.expect_bytes(b"\x00\x00\x01\x00", "page signature")

    cookie_count = parser.read_uint()
    offsets = [parser.read_uint() for _ in range(cookie_count)]
    if not cookie_count:
        logger.debug("a cookies page of size %s has no cookies", len(data))
        return

    parser.skip_to(offsets[0], "unknown page header field")

    for offset in offsets:
        parser.skip_to(offset, "space between records")
        length = _safari_parse_cookies_record(
            data[offset:], cookiejar, domain)
        # advance the page parser past the record that was just handled
        parser.read_bytes(length)

    parser.skip_to_end("space in between pages")
def _safari_parse_cookies_record(data, cookiejar, host=None):
    """Parse a single cookie record and add it to 'cookiejar'.

    If 'host' is given, records for other domains are skipped:
    a 'host' starting with "." matches the bare domain and every
    domain ending in 'host'; otherwise only 'host' and '.host' match.

    Always returns the record size read from the record header.
    """
    parser = DataParser(data)
    record_size = parser.read_uint()
    parser.skip(4, "unknown record field 1")
    is_secure = bool(parser.read_uint() & 0x0001)
    parser.skip(4, "unknown record field 2")
    domain_offset = parser.read_uint()
    name_offset = parser.read_uint()
    path_offset = parser.read_uint()
    value_offset = parser.read_uint()
    parser.skip(8, "unknown record field 3")
    expiration_date = _mac_absolute_time_to_posix(parser.read_double())
    # the creation date is not used, but has to be consumed
    _creation_date = _mac_absolute_time_to_posix(  # noqa: F841
        parser.read_double())

    try:
        parser.skip_to(domain_offset)
        domain = parser.read_cstring()

        if host:
            if host[0] == ".":
                wanted = (domain == host[1:] or domain.endswith(host))
            else:
                wanted = domain in (host, "." + host)
            if not wanted:
                return record_size

        parser.skip_to(name_offset)
        name = parser.read_cstring()

        parser.skip_to(path_offset)
        path = parser.read_cstring()

        parser.skip_to(value_offset)
        value = parser.read_cstring()
    except UnicodeDecodeError:
        logger.warning("failed to parse Safari cookie "
                       "because UTF-8 decoding failed")
        return record_size

    parser.skip_to(record_size, "space at the end of the record")

    cookie = Cookie(
        0, name, value, None, False,
        domain, bool(domain), domain.startswith("."),
        path, bool(path), is_secure, expiration_date, False,
        None, None, {},
    )
    cookiejar.set_cookie(cookie)
    return record_size
# --------------------------------------------------------------------
# chrome
def _chrome_cookies_database(profile, config):
    """Return the path of the 'Cookies' database to read.

    profile: None, a profile name, or a filesystem path
    config:  settings dict from _get_chromium_based_browser_settings()

    When 'profile' is a path, config["directory"] is updated in place:
    to the parent directory of 'profile' for browsers with profile
    support, to 'profile' itself otherwise.

    Raises FileNotFoundError if no 'Cookies' file is found.
    """
    if profile is None:
        search_root = config["directory"]
    elif _is_path(profile):
        search_root = profile
        config["directory"] = (os.path.dirname(profile)
                               if config["profiles"] else profile)
    elif config["profiles"]:
        search_root = os.path.join(config["directory"], profile)
    else:
        logger.warning("%s does not support profiles", config["browser"])
        search_root = config["directory"]

    path = _find_most_recently_used_file(search_root, "Cookies")
    if path is None:
        raise FileNotFoundError("Unable to find {} cookies database in "
                                "'{}'".format(config["browser"], search_root))
    return path
def _get_chromium_based_browser_settings(browser_name):
# https://chromium.googlesource.com/chromium
# /src/+/HEAD/docs/user_data_dir.md
join = os.path.join
if sys.platform in ("win32", "cygwin"):
appdata_local = os.path.expandvars("%LOCALAPPDATA%")
appdata_roaming = os.path.expandvars("%APPDATA%")
browser_dir = {
"brave" : join(appdata_local,
R"BraveSoftware\Brave-Browser\User Data"),
"chrome" : join(appdata_local, R"Google\Chrome\User Data"),
"chromium": join(appdata_local, R"Chromium\User Data"),
"edge" : join(appdata_local, R"Microsoft\Edge\User Data"),
"opera" : join(appdata_roaming, R"Opera Software\Opera Stable"),
"vivaldi" : join(appdata_local, R"Vivaldi\User Data"),
}[browser_name]
elif sys.platform == "darwin":
appdata = os.path.expanduser("~/Library/Application Support")
browser_dir = {
"brave" : join(appdata, "BraveSoftware/Brave-Browser"),
"chrome" : join(appdata, "Google/Chrome"),
"chromium": join(appdata, "Chromium"),
"edge" : join(appdata, "Microsoft Edge"),
"opera" : join(appdata, "com.operasoftware.Opera"),
"vivaldi" : join(appdata, "Vivaldi"),
}[browser_name]
else:
config = (os.environ.get("XDG_CONFIG_HOME") or
os.path.expanduser("~/.config"))
browser_dir = {
"brave" : join(config, "BraveSoftware/Brave-Browser"),
"chrome" : join(config, "google-chrome"),
"chromium": join(config, "chromium"),
"edge" : join(config, "microsoft-edge"),
"opera" : join(config, "opera"),
"vivaldi" : join(config, "vivaldi"),
}[browser_name]
# Linux keyring names can be determined by snooping on dbus
# while opening the browser in KDE:
# dbus-monitor "interface="org.kde.KWallet"" "type=method_return"
keyring_name = {
"brave" : "Brave",
"chrome" : "Chrome",
"chromium": "Chromium",
"edge" : "Microsoft Edge" if sys.platform == "darwin" else
"Chromium",
"opera" : "Opera" if sys.platform == "darwin" else "Chromium",
"vivaldi" : "Vivaldi" if sys.platform == "darwin" else "Chrome",
}[browser_name]
browsers_without_profiles = {"opera"}
return {
"browser" : browser_name,
"directory": browser_dir,
"keyring" : keyring_name,
"profiles" : browser_name not in browsers_without_profiles
}
class ChromeCookieDecryptor:
    """Interface shared by the platform-specific cookie decryptors.

    Overview:

        Linux:
        - cookies are either v10 or v11
            - v10: AES-CBC encrypted with a fixed key
            - v11: AES-CBC encrypted with an OS protected key (keyring)
            - v11 keys can be stored in various places depending on the
              active desktop environment [2]

        Mac:
        - cookies are either v10 or not v10
            - v10: AES-CBC encrypted with an OS protected key (keyring)
              and more key derivation iterations than linux
            - not v10: "old data" stored as plaintext

        Windows:
        - cookies are either v10 or not v10
            - v10: AES-GCM encrypted with a key which is encrypted with DPAPI
            - not v10: encrypted with DPAPI

    Sources:
    - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads
          /main/components/os_crypt/
    - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads
          /main/components/os_crypt/key_storage_linux.cc
        - KeyStorageLinux::CreateService
    """

    def decrypt(self, encrypted_value):
        """Decrypt one 'encrypted_value'; subclasses have to override this."""
        raise NotImplementedError("Must be implemented by sub classes")

    @property
    def cookie_counts(self):
        """Per-version cookie counters; subclasses have to override this."""
        raise NotImplementedError("Must be implemented by sub classes")
def get_cookie_decryptor(browser_root, browser_keyring_name, keyring=None):
    """Create the cookie decryptor matching the current platform.

    'browser_root' is only used on Windows, 'browser_keyring_name'
    only on macOS and Linux, and 'keyring' only on Linux.
    """
    platform = sys.platform
    if platform == "darwin":
        return MacChromeCookieDecryptor(browser_keyring_name)
    if platform in ("win32", "cygwin"):
        return WindowsChromeCookieDecryptor(browser_root)
    return LinuxChromeCookieDecryptor(browser_keyring_name, keyring)
class LinuxChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decryptor for 'v10' and 'v11' cookies on Linux."""

    def __init__(self, browser_keyring_name, keyring=None):
        # v10 cookies always use a key derived from a fixed password
        self._v10_key = self.derive_key(b"peanuts")
        password = _get_linux_keyring_password(browser_keyring_name, keyring)
        if password is None:
            self._v11_key = None
        else:
            self._v11_key = self.derive_key(password)
        self._cookie_counts = {"v10": 0, "v11": 0, "other": 0}

    @staticmethod
    def derive_key(password):
        """Derive the 16-byte AES key from 'password'."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads
        # /main/components/os_crypt/os_crypt_linux.cc
        return pbkdf2_sha1(password, salt=b"saltysalt",
                           iterations=1, key_length=16)

    @property
    def cookie_counts(self):
        """Number of cookies seen so far, grouped by version prefix."""
        return self._cookie_counts

    def decrypt(self, encrypted_value):
        """Return the decrypted cookie value or None on failure."""
        # the first three bytes hold the version tag
        version, ciphertext = encrypted_value[:3], encrypted_value[3:]

        if version == b"v10":
            self._cookie_counts["v10"] += 1
            return _decrypt_aes_cbc(ciphertext, self._v10_key)

        if version == b"v11":
            self._cookie_counts["v11"] += 1
            if self._v11_key is None:
                logger.warning("cannot decrypt v11 cookies: no key found")
                return None
            return _decrypt_aes_cbc(ciphertext, self._v11_key)

        self._cookie_counts["other"] += 1
        return None
class MacChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decryptor for 'v10' cookies on macOS."""

    def __init__(self, browser_keyring_name):
        password = _get_mac_keyring_password(browser_keyring_name)
        if password is None:
            self._v10_key = None
        else:
            self._v10_key = self.derive_key(password)
        self._cookie_counts = {"v10": 0, "other": 0}

    @staticmethod
    def derive_key(password):
        """Derive the 16-byte AES key from the keychain 'password'."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads
        # /main/components/os_crypt/os_crypt_mac.mm
        return pbkdf2_sha1(password, salt=b"saltysalt",
                           iterations=1003, key_length=16)

    @property
    def cookie_counts(self):
        """Number of cookies seen so far, grouped by version prefix."""
        return self._cookie_counts

    def decrypt(self, encrypted_value):
        """Decrypt a 'v10' value; any other value is returned unchanged."""
        version, ciphertext = encrypted_value[:3], encrypted_value[3:]

        if version != b"v10":
            self._cookie_counts["other"] += 1
            # other prefixes are considered "old data",
            # which were stored as plaintext
            # https://chromium.googlesource.com/chromium/src/+/refs/heads
            # /main/components/os_crypt/os_crypt_mac.mm
            return encrypted_value

        self._cookie_counts["v10"] += 1
        if self._v10_key is None:
            logger.warning("cannot decrypt v10 cookies: no key found")
            return None
        return _decrypt_aes_cbc(ciphertext, self._v10_key)
class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
    """Decryptor for 'v10' (AES-GCM) and DPAPI-protected cookies."""

    def __init__(self, browser_root):
        self._v10_key = _get_windows_v10_key(browser_root)
        self._cookie_counts = {"v10": 0, "other": 0}

    @property
    def cookie_counts(self):
        """Number of cookies seen so far, grouped by version prefix."""
        return self._cookie_counts

    def decrypt(self, encrypted_value):
        """Return the decrypted cookie value as str, or None on failure."""
        version = encrypted_value[:3]
        ciphertext = encrypted_value[3:]

        if version == b"v10":
            self._cookie_counts["v10"] += 1
            if self._v10_key is None:
                logger.warning("cannot decrypt v10 cookies: no key found")
                return None

            # https://chromium.googlesource.com/chromium/src/+/refs/heads
            #   /main/components/os_crypt/os_crypt_win.cc
            #   kNonceLength
            nonce_length = 96 // 8
            # boringssl
            #   EVP_AEAD_AES_GCM_TAG_LEN
            authentication_tag_length = 16

            # layout: nonce | ciphertext | authentication tag
            raw_ciphertext = ciphertext
            nonce = raw_ciphertext[:nonce_length]
            ciphertext = raw_ciphertext[
                nonce_length:-authentication_tag_length]
            authentication_tag = raw_ciphertext[-authentication_tag_length:]

            return _decrypt_aes_gcm(
                ciphertext, self._v10_key, nonce, authentication_tag)

        self._cookie_counts["other"] += 1
        # any other prefix means the data is DPAPI encrypted
        # https://chromium.googlesource.com/chromium/src/+/refs/heads
        # /main/components/os_crypt/os_crypt_win.cc
        plaintext = _decrypt_windows_dpapi(encrypted_value)
        if plaintext is None:
            # DPAPI failure was already logged by the helper; report the
            # cookie as undecryptable instead of failing on None.decode()
            return None
        try:
            return plaintext.decode()
        except UnicodeDecodeError:
            logger.warning("failed to decrypt cookie (DPAPI) because UTF-8 "
                           "decoding failed")
            return None
# --------------------------------------------------------------------
# keyring
def _choose_linux_keyring():
    """Pick the keyring backend based on the detected desktop environment.

    https://chromium.googlesource.com/chromium/src/+/refs/heads
    /main/components/os_crypt/key_storage_util_linux.cc
    SelectBackend
    """
    desktop_environment = _get_linux_desktop_environment(os.environ)
    logger.debug("Detected desktop environment: %s", desktop_environment)

    special_cases = {
        DE_KDE  : KEYRING_KWALLET,
        DE_OTHER: KEYRING_BASICTEXT,
    }
    # every other desktop environment uses the GNOME keyring
    return special_cases.get(desktop_environment, KEYRING_GNOMEKEYRING)
def _get_kwallet_network_wallet():
    """Return the name of the wallet used to store network passwords.

    https://chromium.googlesource.com/chromium/src/+/refs/heads
    /main/components/os_crypt/kwallet_dbus.cc
    KWalletDBus::NetworkWallet
    which does a dbus call to the following function:
    https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
    Wallet::NetworkWallet

    Falls back to "kdewallet" if the name cannot be determined.
    """
    fallback = "kdewallet"
    command = (
        "dbus-send", "--session", "--print-reply=literal",
        "--dest=org.kde.kwalletd5",
        "/modules/kwalletd5",
        "org.kde.KWallet.networkWallet",
    )

    try:
        proc, stdout = Popen_communicate(*command)

        if proc.returncode == 0:
            wallet = stdout.decode().strip()
            logger.debug("NetworkWallet = '%s'", wallet)
            return wallet

        logger.warning("failed to read NetworkWallet")
        return fallback
    except Exception as exc:
        logger.warning("exception while obtaining NetworkWallet (%s: %s)",
                       exc.__class__.__name__, exc)
        return fallback
def _get_kwallet_password(browser_keyring_name):
    """Read the browser's 'Safe Storage' password via 'kwallet-query'.

    Returns the password as bytes, or b"" if it cannot be obtained.
    """
    logger.debug("using kwallet-query to obtain password from kwallet")

    if shutil.which("kwallet-query") is None:
        logger.error(
            "kwallet-query command not found. KWallet and kwallet-query "
            "must be installed to read from KWallet. kwallet-query should be "
            "included in the kwallet package for your distribution")
        return b""

    wallet = _get_kwallet_network_wallet()
    entry = browser_keyring_name + " Safe Storage"
    folder = browser_keyring_name + " Keys"

    try:
        proc, output = Popen_communicate(
            "kwallet-query",
            "--read-password", entry,
            "--folder", folder,
            wallet,
        )

        if proc.returncode != 0:
            logger.error("kwallet-query failed with return code {}. "
                         "Please consult the kwallet-query man page "
                         "for details".format(proc.returncode))
            return b""

        if output.lower().startswith(b"failed to read"):
            logger.debug("Failed to read password from kwallet. "
                         "Using empty string instead")
            # This sometimes occurs in KDE because chrome does not check
            # hasEntry and instead just tries to read the value (which
            # kwallet returns "") whereas kwallet-query checks hasEntry.
            # To verify this:
            # dbus-monitor "interface="org.kde.KWallet"" "type=method_return"
            # while starting chrome.
            # This may be a bug, as the intended behaviour is to generate a
            # random password and store it, but that doesn't matter here.
            return b""

        logger.debug("password found")
        # drop a single trailing newline, if present
        return output[:-1] if output.endswith(b"\n") else output
    except Exception as exc:
        logger.warning("exception running kwallet-query (%s: %s)",
                       exc.__class__.__name__, exc)
        return b""
def _get_gnome_keyring_password(browser_keyring_name):
    """Read the browser's 'Safe Storage' secret via 'secretstorage'.

    Returns the secret of the first item with a matching label,
    or b"" if 'secretstorage' is missing or no such item exists.
    """
    try:
        import secretstorage
    except ImportError:
        logger.error("secretstorage not available")
        return b""

    # Gnome keyring does not seem to organise keys in the same way as KWallet,
    # using `dbus-monitor` during startup, it can be observed that chromium
    # lists all keys and presumably searches for its key in the list.
    # It appears that we must do the same.
    # https://github.com/jaraco/keyring/issues/556
    wanted_label = browser_keyring_name + " Safe Storage"

    with contextlib.closing(secretstorage.dbus_init()) as connection:
        collection = secretstorage.get_default_collection(connection)
        for entry in collection.get_all_items():
            if entry.get_label() == wanted_label:
                return entry.get_secret()

        # no item carried the expected label
        logger.error("failed to read from keyring")
        return b""
def _get_linux_keyring_password(browser_keyring_name, keyring):
    """Fetch the keyring password used for 'v11' cookies.

    A falsy 'keyring' selects the backend automatically.
    Returns None for the 'basictext' backend.
    """
    # Note: chrome/chromium can be run with the following flags
    # to determine which keyring backend it has chosen to use
    # - chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
    #
    # Chromium supports --password-store=<basic|gnome|kwallet>
    # so the automatic detection will not be sufficient in all cases.
    keyring = keyring or _choose_linux_keyring()
    logger.debug("Chosen keyring: %s", keyring)

    if keyring == KEYRING_KWALLET:
        return _get_kwallet_password(browser_keyring_name)
    if keyring == KEYRING_GNOMEKEYRING:
        return _get_gnome_keyring_password(browser_keyring_name)
    if keyring == KEYRING_BASICTEXT:
        # when basic text is chosen, all cookies are stored as v10
        # so no keyring password is required
        return None

    assert False, "Unknown keyring " + keyring
def _get_mac_keyring_password(browser_keyring_name):
    """Read the browser's 'Safe Storage' password via 'security'.

    Returns the command's output as bytes without one trailing newline,
    or None if running the command raised an exception.
    """
    logger.debug("using find-generic-password to obtain "
                 "password from OSX keychain")
    command = (
        "security", "find-generic-password",
        "-w",  # write password to stdout
        "-a", browser_keyring_name,  # match "account"
        "-s", browser_keyring_name + " Safe Storage",  # match "service"
    )

    try:
        _, output = Popen_communicate(*command)
        return output[:-1] if output.endswith(b"\n") else output
    except Exception as exc:
        logger.warning("exception running find-generic-password (%s: %s)",
                       exc.__class__.__name__, exc)
        return None
def _get_windows_v10_key(browser_root):
    """Extract and decrypt the 'v10' cookie key from 'Local State'.

    Returns the result of _decrypt_windows_dpapi() for the stored key,
    or None if the file or a valid key entry cannot be found.
    """
    state_path = _find_most_recently_used_file(browser_root, "Local State")
    if state_path is None:
        logger.error("could not find local state file")
        return None
    logger.debug("Found local state file at '%s'", state_path)

    with open(state_path, encoding="utf-8") as fp:
        local_state = util.json_loads(fp.read())

    try:
        encoded_key = local_state["os_crypt"]["encrypted_key"]
    except KeyError:
        logger.error("no encrypted key in Local State")
        return None

    raw_key = binascii.a2b_base64(encoded_key)
    # the stored key has to begin with this marker
    marker = b"DPAPI"
    if raw_key[:len(marker)] != marker:
        logger.error("invalid key")
        return None

    return _decrypt_windows_dpapi(raw_key[len(marker):])
# --------------------------------------------------------------------
# utility
class ParserError(Exception):
    """Raised by DataParser for invalid or truncated input."""
class DataParser:
    """Sequential reader for binary data with a moving 'cursor'."""

    def __init__(self, data):
        self._data = data
        self.cursor = 0

    def read_bytes(self, num_bytes):
        """Return the next 'num_bytes' bytes and advance the cursor."""
        if num_bytes < 0:
            raise ParserError("invalid read of {} bytes".format(num_bytes))
        start = self.cursor
        stop = start + num_bytes
        if stop > len(self._data):
            raise ParserError("reached end of input")
        self.cursor = stop
        return self._data[start:stop]

    def expect_bytes(self, expected_value, message):
        """Consume len(expected_value) bytes; raise if they differ."""
        actual = self.read_bytes(len(expected_value))
        if actual != expected_value:
            raise ParserError("unexpected value: {} != {} ({})".format(
                actual, expected_value, message))

    def read_uint(self, big_endian=False):
        """Read a 4-byte unsigned integer (little-endian by default)."""
        byte_order = ">" if big_endian else "<"
        value, = struct.unpack(byte_order + "I", self.read_bytes(4))
        return value

    def read_double(self, big_endian=False):
        """Read an 8-byte float (little-endian by default)."""
        byte_order = ">" if big_endian else "<"
        value, = struct.unpack(byte_order + "d", self.read_bytes(8))
        return value

    def read_cstring(self):
        """Read up to and including the next NUL byte; return decoded text."""
        chunks = []
        char = self.read_bytes(1)
        while char != b"\x00":
            chunks.append(char)
            char = self.read_bytes(1)
        return b"".join(chunks).decode()

    def skip(self, num_bytes, description="unknown"):
        """Skip 'num_bytes' bytes, logging their content at debug level."""
        if num_bytes < 0:
            raise ParserError("invalid skip of {} bytes".format(num_bytes))
        if num_bytes > 0:
            # the bytes are consumed as part of building the log message
            logger.debug("skipping {} bytes ({}): {!r}".format(
                num_bytes, description, self.read_bytes(num_bytes)))

    def skip_to(self, offset, description="unknown"):
        """Skip forward to the absolute position 'offset'."""
        self.skip(offset - self.cursor, description)

    def skip_to_end(self, description="unknown"):
        """Skip everything up to the end of the data."""
        self.skip_to(len(self._data), description)
class DatabaseCopy():
    """Context manager providing a connection to a copy of a database.

    On entry, the file at 'path' is copied into a fresh temporary
    directory and a sqlite3 connection to that copy is returned.
    On exit, the connection is closed and the directory is removed.
    """

    def __init__(self, path):
        self.path = path
        self.directory = None
        self.database = None

    def __enter__(self):
        try:
            tmpdir = tempfile.TemporaryDirectory(prefix="gallery-dl-")
            self.directory = tmpdir
            target = os.path.join(tmpdir.name, "copy.sqlite")
            shutil.copyfile(self.path, target)
            connection = sqlite3.connect(
                target, isolation_level=None, check_same_thread=False)
            self.database = connection
            return connection
        except BaseException:
            # do not leave the temporary directory behind on failure
            if self.directory:
                self.directory.cleanup()
            raise

    def __exit__(self, exc, value, tb):
        self.database.close()
        self.directory.cleanup()
def Popen_communicate(*args):
    """Run the command 'args' and return (process, stdout bytes).

    The command's stderr output is discarded. If waiting for the
    process is interrupted by any exception, the process is killed
    before the exception is re-raised.
    """
    process = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    try:
        output = process.communicate()[0]
    except BaseException:  # Including KeyboardInterrupt
        process.kill()
        process.wait()
        raise
    return process, output
"""
https://chromium.googlesource.com/chromium/src/+/refs/heads
/main/base/nix/xdg_util.h - DesktopEnvironment
"""
# Desktop environment identifiers returned by
# _get_linux_desktop_environment()
DE_CINNAMON = "cinnamon"
DE_GNOME = "gnome"
DE_KDE = "kde"
DE_OTHER = "other"
DE_PANTHEON = "pantheon"
DE_UNITY = "unity"
DE_XFCE = "xfce"

# Keyring backend identifiers
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/components/os_crypt/key_storage_util_linux.h - SelectedLinuxBackend
KEYRING_KWALLET = "kwallet"
KEYRING_GNOMEKEYRING = "gnomekeyring"
KEYRING_BASICTEXT = "basictext"
SUPPORTED_KEYRINGS = {
    KEYRING_KWALLET,
    KEYRING_GNOMEKEYRING,
    KEYRING_BASICTEXT,
}
def _get_linux_desktop_environment(env):
    """Determine the desktop environment from the mapping 'env'.

    Ref: https://chromium.googlesource.com/chromium/src/+/refs/heads
         /main/base/nix/xdg_util.cc - GetDesktopEnvironment

    Checks XDG_CURRENT_DESKTOP first, then DESKTOP_SESSION, then the
    presence of GNOME/KDE specific variables.
    Returns one of the DE_* constants.
    """
    desktop_session = env.get("DESKTOP_SESSION")
    current_desktop = env.get("XDG_CURRENT_DESKTOP")

    if current_desktop:
        # only the first entry of a colon-separated list is considered
        current_desktop = current_desktop.partition(":")[0].strip().lower()

        if current_desktop == "unity":
            if desktop_session and "gnome-fallback" in desktop_session:
                return DE_GNOME
            return DE_UNITY

        by_name = {
            "gnome"     : DE_GNOME,
            "x-cinnamon": DE_CINNAMON,
            "kde"       : DE_KDE,
            "pantheon"  : DE_PANTHEON,
            "xfce"      : DE_XFCE,
        }
        if current_desktop in by_name:
            return by_name[current_desktop]

    if desktop_session:
        if desktop_session in ("mate", "gnome"):
            return DE_GNOME
        for marker, environment in (("kde", DE_KDE), ("xfce", DE_XFCE)):
            if marker in desktop_session:
                return environment

    for variable, environment in (
            ("GNOME_DESKTOP_SESSION_ID", DE_GNOME),
            ("KDE_FULL_SESSION", DE_KDE)):
        if variable in env:
            return environment

    return DE_OTHER
def _mac_absolute_time_to_posix(timestamp):
# 978307200 is timestamp of 2001-01-01 00:00:00
return 978307200 + int(timestamp)
def pbkdf2_sha1(password, salt, iterations, key_length):
    """Derive 'key_length' bytes from 'password' via PBKDF2-HMAC-SHA1."""
    return pbkdf2_hmac(
        "sha1", password, salt, iterations, dklen=key_length)
def _decrypt_aes_cbc(ciphertext, key, initialization_vector=b" " * 16):
    """Decrypt AES-CBC data and return it as str.

    The default IV consists of 16 space characters. Returns None if
    the unpadded plaintext is not valid UTF-8.
    """
    padded = aes.aes_cbc_decrypt_bytes(ciphertext, key, initialization_vector)
    plaintext = aes.unpad_pkcs7(padded)

    try:
        decoded = plaintext.decode()
    except UnicodeDecodeError:
        logger.warning("failed to decrypt cookie (AES-CBC) because UTF-8 "
                       "decoding failed. Possibly the key is wrong?")
        return None
    return decoded
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag):
    """Decrypt and verify AES-GCM data and return it as str.

    Returns None if decryption raises ValueError or if the plaintext
    is not valid UTF-8.
    """
    try:
        plaintext = aes.aes_gcm_decrypt_and_verify_bytes(
            ciphertext, key, authentication_tag, nonce)
        decoded = plaintext.decode()
    except UnicodeDecodeError:
        # listed first: UnicodeDecodeError is a subclass of ValueError
        logger.warning("failed to decrypt cookie (AES-GCM) because UTF-8 "
                       "decoding failed. Possibly the key is wrong?")
        return None
    except ValueError:
        logger.warning("failed to decrypt cookie (AES-GCM) because MAC check "
                       "failed. Possibly the key is wrong?")
        return None
    return decoded
def _decrypt_windows_dpapi(ciphertext):
    """Decrypt 'ciphertext' with CryptUnprotectData.

    Returns the decrypted bytes, or None if the call reports failure.

    References:
    - https://docs.microsoft.com/en-us/windows
      /win32/api/dpapi/nf-dpapi-cryptunprotectdata
    """
    from ctypes.wintypes import DWORD

    class DATA_BLOB(ctypes.Structure):
        _fields_ = [("cbData", DWORD),
                    ("pbData", ctypes.POINTER(ctypes.c_char))]

    in_buffer = ctypes.create_string_buffer(ciphertext)
    blob_in = DATA_BLOB(ctypes.sizeof(in_buffer), in_buffer)
    blob_out = DATA_BLOB()

    windll = ctypes.windll
    succeeded = windll.crypt32.CryptUnprotectData(
        ctypes.byref(blob_in),  # pDataIn
        None,  # ppszDataDescr: human readable description of pDataIn
        None,  # pOptionalEntropy: salt?
        None,  # pvReserved: must be NULL
        None,  # pPromptStruct: information about prompts to display
        0,  # dwFlags
        ctypes.byref(blob_out)  # pDataOut
    )
    if not succeeded:
        logger.warning("failed to decrypt with DPAPI")
        return None

    # copy the output into a Python bytes object before freeing the buffer
    plaintext = ctypes.string_at(blob_out.pbData, blob_out.cbData)
    windll.kernel32.LocalFree(blob_out.pbData)
    return plaintext
def _find_most_recently_used_file(root, filename):
# if there are multiple browser profiles, take the most recently used one
paths = []
for curr_root, dirs, files in os.walk(root):
for file in files:
if file == filename:
paths.append(os.path.join(curr_root, file))
if not paths:
return None
return max(paths, key=lambda path: os.lstat(path).st_mtime)
def _is_path(value):
return os.path.sep in value
def _parse_browser_specification(
        browser, profile=None, keyring=None, container=None, domain=None):
    """Validate and normalize the parts of a browser specification.

    The browser name is lower-cased and a 'profile' given as path gets
    its "~" expanded. Returns the 5-tuple
    (browser, profile, keyring, container, domain).

    Raises ValueError for an unsupported browser or keyring.
    """
    browser = browser.lower()

    if browser not in SUPPORTED_BROWSERS:
        raise ValueError("unsupported browser '{}'".format(browser))
    if keyring and keyring not in SUPPORTED_KEYRINGS:
        raise ValueError("unsupported keyring '{}'".format(keyring))

    if profile and _is_path(profile):
        profile = os.path.expanduser(profile)

    return (browser, profile, keyring, container, domain)