API keys: skip undecryptable entries on load

APIKeyManager.load() decrypts every stored key with a dict comprehension
and no error handling. If the .key file no longer matches the ciphertext in
api_keys.json — key rotated, a partial/!mismatched data restore, or a
corrupted .key — Fernet.decrypt raises cryptography.fernet.InvalidToken.

app_initializer.py calls api_key_manager.load() during startup, so a single
undecryptable entry takes down the whole app at boot, and the user can't
reach the UI to fix it.

Decrypt each key in a loop and, on InvalidToken/ValueError, log a warning
and skip that one entry while still returning every key that decrypts
cleanly. One bad/stale key no longer blocks startup.

tests/test_api_key_manager_resilience.py saves a valid key, then injects an
entry encrypted under a different Fernet key (InvalidToken) and a malformed
token (ValueError), and asserts load() returns the good key and skips the
bad ones without raising. Fails before this change.
This commit is contained in:
Tatlatat
2026-06-02 18:28:26 +07:00
committed by GitHub
parent da3876c168
commit 9389cabed0
2 changed files with 47 additions and 5 deletions

View File

@@ -1,7 +1,10 @@
import os
import json
import logging
from typing import Dict
from cryptography.fernet import Fernet
from cryptography.fernet import Fernet, InvalidToken
logger = logging.getLogger(__name__)
class APIKeyManager:
def __init__(self, data_dir: str):
@@ -47,8 +50,12 @@ class APIKeyManager:
return {}
with open(self.api_keys_file, 'r', encoding="utf-8") as f:
encrypted_keys = json.load(f)
return {
provider: self.decrypt_api_key(key)
for provider, key in encrypted_keys.items()
}
decrypted = {}
for provider, key in encrypted_keys.items():
try:
decrypted[provider] = self.decrypt_api_key(key)
except (InvalidToken, ValueError) as e:
logger.warning("Failed to decrypt API key for %s: %s", provider, e)
return decrypted

View File

@@ -0,0 +1,35 @@
import os
import json
from src.api_key_manager import APIKeyManager
from cryptography.fernet import Fernet
def test_api_key_manager_load_resilience(tmp_path):
mgr = APIKeyManager(str(tmp_path))
# Save a valid key
mgr.save("good_provider", "good_value")
# Create another key manager/Fernet instance with a different key to produce an undecryptable token
other_key = Fernet.generate_key()
other_f = Fernet(other_key)
undecryptable_token = other_f.encrypt(b"bad_value").decode()
# Manually edit api_keys.json to include the undecryptable token
with open(mgr.api_keys_file, "r", encoding="utf-8") as f:
keys = json.load(f)
keys["bad_provider"] = undecryptable_token
# Also add a malformed/garbage token (causes ValueError/binascii.Error)
keys["garbage_provider"] = "not-a-valid-base64-fernet-token"
with open(mgr.api_keys_file, "w", encoding="utf-8") as f:
json.dump(keys, f)
# Load keys
loaded = mgr.load()
# Assert load() returns the still-decryptable key and skips the bad ones without raising
assert "good_provider" in loaded
assert loaded["good_provider"] == "good_value"
assert "bad_provider" not in loaded
assert "garbage_provider" not in loaded