密钥管理:密码学最难的部分
Published: Sat Feb 01 2025 | Modified: Sat Feb 07 2026 , 20 minutes reading.
1. 为什么要关心这个问题?
你已经实现了 AES-256-GCM 加密。你的数据受到最强密码之一的保护。但密钥在哪里?
# 你的安全加密
key = b"my-super-secret-key-12345678901" # 🔥 硬编码!
ciphertext = encrypt(key, data)那个硬编码的密钥使你的加密变成了表演。任何能访问你代码的人都能访问你所有的加密数据。
密钥管理是大多数密码系统失败的地方。让我们来解决这个问题。
2. 密钥生命周期
┌─────────────────────────────────────────────────────────────────┐
│ 密钥生命周期 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 生成 → 分发 → 存储 → 使用 → 轮换 → 销毁
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ ▼
│ 安全 安全 安全 最小化 定期 安全
│ 随机 通道 位置 暴露 计划 删除
│ │
└─────────────────────────────────────────────────────────────────┘每个阶段都有自己的风险和要求。
3. 密钥生成
正确的方式
import os
import secrets
# 正确:使用密码学安全的随机性
key_256 = secrets.token_bytes(32) # 256 位
key_128 = secrets.token_bytes(16) # 128 位
# 或使用 os.urandom(等效)
key = os.urandom(32)
# 对于特定算法,使用它们的密钥生成
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = AESGCM.generate_key(bit_length=256)
from cryptography.hazmat.primitives.asymmetric import ed25519
private_key = ed25519.Ed25519PrivateKey.generate()错误的方式
import random
import hashlib
# 错误:使用 random 模块(不是密码学安全的)
key = bytes([random.randint(0, 255) for _ in range(32)])
# 错误:从可预测的来源派生
key = hashlib.sha256(b"password").digest()
# 错误:使用时间戳
key = hashlib.sha256(str(time.time()).encode()).digest()
# 错误:使用用户名或其他可预测数据
key = hashlib.sha256(username.encode()).digest()从密码派生密钥
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
import os
def derive_key_from_password(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
"""从密码派生加密密钥"""
if salt is None:
salt = os.urandom(16)
# scrypt 是内存困难的(首选)
kdf = Scrypt(
salt=salt,
length=32,
n=2**17,
r=8,
p=1
)
key = kdf.derive(password.encode())
return key, salt
# 使用
password = "用户的密码短语"
key, salt = derive_key_from_password(password)
# 将盐值与加密数据一起存储
# 永远不要存储密码或派生的密钥4. 密钥存储
存储选项(从最差到最好)
┌─────────────────────────────────────────────────────────────────┐
│ 选项 │ 安全性 │ 使用场景 │
├─────────────────────┼────────┼────────────────────────────────┤
│ 源代码中硬编码 │ ✗✗✗ │ 永远不要 │
│ 配置文件 │ ✗✗ │ 仅开发环境 │
│ 环境变量 │ ✗ │ 简单部署 │
│ 加密文件 │ ○ │ 当 HSM 不可用时 │
│ 密钥管理器 │ ✓ │ 云部署 │
│ HSM/KMS │ ✓✓ │ 生产环境、合规 │
│ 硬件密钥 │ ✓✓✓ │ 最高安全性 │
└─────────────────────────────────────────────────────────────────┘环境变量
import os
# 比硬编码略有改进
def get_encryption_key():
key_hex = os.environ.get('ENCRYPTION_KEY')
if not key_hex:
raise ValueError("ENCRYPTION_KEY 环境变量未设置")
return bytes.fromhex(key_hex)
# 在环境中设置(不是在代码中!)
# export ENCRYPTION_KEY=$(python -c "import secrets; print(secrets.token_hex(32))")加密密钥文件
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
import os
import json
class KeyFile:
"""在加密文件中存储加密密钥"""
def __init__(self, filepath: str, master_password: str):
self.filepath = filepath
self.master_key = self._derive_master_key(master_password)
def _derive_master_key(self, password: str) -> bytes:
# 在生产中,单独存储盐值或在文件头中
salt = b"static-salt-for-demo" # 生产中使用随机盐值!
kdf = Scrypt(salt=salt, length=32, n=2**17, r=8, p=1)
return kdf.derive(password.encode())
def store_key(self, key_name: str, key_value: bytes):
"""在加密文件中存储密钥"""
keys = self._load_all()
keys[key_name] = key_value.hex()
self._save_all(keys)
def get_key(self, key_name: str) -> bytes:
"""从加密文件中检索密钥"""
keys = self._load_all()
if key_name not in keys:
raise KeyError(f"密钥 '{key_name}' 未找到")
return bytes.fromhex(keys[key_name])
def _load_all(self) -> dict:
if not os.path.exists(self.filepath):
return {}
with open(self.filepath, 'rb') as f:
encrypted = f.read()
fernet = Fernet(self._to_fernet_key(self.master_key))
decrypted = fernet.decrypt(encrypted)
return json.loads(decrypted)
def _save_all(self, keys: dict):
data = json.dumps(keys).encode()
fernet = Fernet(self._to_fernet_key(self.master_key))
encrypted = fernet.encrypt(data)
with open(self.filepath, 'wb') as f:
f.write(encrypted)
def _to_fernet_key(self, key: bytes) -> bytes:
import base64
return base64.urlsafe_b64encode(key)
# 使用
keyfile = KeyFile("/secure/keys.enc", os.environ['KEY_FILE_PASSWORD'])
keyfile.store_key("database_key", os.urandom(32))
db_key = keyfile.get_key("database_key")云 KMS 集成
# AWS KMS
import boto3
class AWSKeyManager:
def __init__(self, key_id: str):
self.kms = boto3.client('kms')
self.key_id = key_id
def encrypt(self, plaintext: bytes) -> bytes:
response = self.kms.encrypt(
KeyId=self.key_id,
Plaintext=plaintext
)
return response['CiphertextBlob']
def decrypt(self, ciphertext: bytes) -> bytes:
response = self.kms.decrypt(
KeyId=self.key_id,
CiphertextBlob=ciphertext
)
return response['Plaintext']
def generate_data_key(self) -> tuple[bytes, bytes]:
"""生成由 KMS 加密的数据密钥"""
response = self.kms.generate_data_key(
KeyId=self.key_id,
KeySpec='AES_256'
)
# 返回明文密钥供立即使用
# 存储加密密钥供以后检索
return response['Plaintext'], response['CiphertextBlob']
# 使用
km = AWSKeyManager('alias/my-master-key')
plaintext_key, encrypted_key = km.generate_data_key()
# 使用 plaintext_key 进行加密
# 将 encrypted_key 与密文一起存储
# 从内存中丢弃 plaintext_key# Google Cloud KMS
from google.cloud import kms
class GCPKeyManager:
def __init__(self, project_id: str, location: str, keyring: str, key_name: str):
self.client = kms.KeyManagementServiceClient()
self.key_path = self.client.crypto_key_path(
project_id, location, keyring, key_name
)
def encrypt(self, plaintext: bytes) -> bytes:
response = self.client.encrypt(
request={'name': self.key_path, 'plaintext': plaintext}
)
return response.ciphertext
def decrypt(self, ciphertext: bytes) -> bytes:
response = self.client.decrypt(
request={'name': self.key_path, 'ciphertext': ciphertext}
)
return response.plaintext5. 密钥层次结构
信封加密
主密钥(在 HSM/KMS 中)
│
├── 解密 → 数据加密密钥 1(加密的)
│ └── 加密 → 数据集 A
│
├── 解密 → 数据加密密钥 2(加密的)
│ └── 加密 → 数据集 B
│
└── 解密 → 数据加密密钥 3(加密的)
└── 加密 → 数据集 C
好处:
- 主密钥永远不离开 HSM
- 数据密钥可以独立轮换
- 数据密钥与加密数据一起存储(加密形式)
- 泄露的数据密钥只影响一个数据集实现
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
class EnvelopeEncryption:
"""使用 KMS 保护的主密钥进行信封加密"""
def __init__(self, kms_client):
self.kms = kms_client
def encrypt(self, plaintext: bytes) -> dict:
"""使用信封加密加密数据"""
# 为此加密生成新的数据密钥
dek_plaintext, dek_encrypted = self.kms.generate_data_key()
# 用 DEK 加密数据
nonce = os.urandom(12)
aesgcm = AESGCM(dek_plaintext)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
# 从内存中清除 DEK(Python 限制,但尽力而为)
del dek_plaintext
return {
'encrypted_dek': dek_encrypted,
'nonce': nonce,
'ciphertext': ciphertext
}
def decrypt(self, encrypted_data: dict) -> bytes:
"""使用信封加密解密数据"""
# 使用 KMS 解密 DEK
dek = self.kms.decrypt(encrypted_data['encrypted_dek'])
# 用 DEK 解密数据
aesgcm = AESGCM(dek)
plaintext = aesgcm.decrypt(
encrypted_data['nonce'],
encrypted_data['ciphertext'],
None
)
# 从内存中清除 DEK
del dek
return plaintext6. 密钥轮换
为什么要轮换密钥?
密钥轮换的原因:
1. 限制潜在泄露的暴露
2. 合规要求(PCI-DSS 等)
3. 人员变动(有密钥访问权限的人离开)
4. 算法更新(SHA-1 → SHA-256)
5. 密钥材料耗尽(对于好的密码理论上的问题)
轮换策略:
- 基于时间:每 N 天/月轮换
- 基于事件:在安全事件时轮换
- 基于使用:在 N 次加密后轮换轮换实现
from datetime import datetime, timedelta
import json
class RotatingKeyManager:
"""管理带版本控制的密钥轮换"""
def __init__(self, storage, kms):
self.storage = storage
self.kms = kms
def get_current_key(self, key_name: str) -> tuple[bytes, int]:
"""获取当前活动密钥及其版本"""
metadata = self._get_metadata(key_name)
current_version = metadata['current_version']
encrypted_key = metadata['versions'][str(current_version)]['key']
key = self.kms.decrypt(encrypted_key)
return key, current_version
def get_key_by_version(self, key_name: str, version: int) -> bytes:
"""获取特定密钥版本用于解密"""
metadata = self._get_metadata(key_name)
version_data = metadata['versions'].get(str(version))
if not version_data:
raise KeyError(f"密钥版本 {version} 未找到")
return self.kms.decrypt(version_data['key'])
def rotate_key(self, key_name: str) -> int:
"""创建新密钥版本并设为当前"""
metadata = self._get_metadata(key_name)
# 生成新密钥
new_key = os.urandom(32)
encrypted_key = self.kms.encrypt(new_key)
# 创建新版本
new_version = metadata['current_version'] + 1
metadata['versions'][str(new_version)] = {
'key': encrypted_key,
'created_at': datetime.now().isoformat(),
'status': 'active'
}
# 更新当前版本
metadata['current_version'] = new_version
metadata['versions'][str(new_version - 1)]['status'] = 'decrypt-only'
self._save_metadata(key_name, metadata)
return new_version
def encrypt_with_rotation(self, key_name: str, plaintext: bytes) -> dict:
"""使用当前密钥加密,包含版本供以后解密"""
key, version = self.get_current_key(key_name)
nonce = os.urandom(12)
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
return {
'version': version,
'nonce': nonce.hex(),
'ciphertext': ciphertext.hex()
}
def decrypt_with_rotation(self, key_name: str, encrypted_data: dict) -> bytes:
"""使用正确的密钥版本解密"""
version = encrypted_data['version']
key = self.get_key_by_version(key_name, version)
aesgcm = AESGCM(key)
return aesgcm.decrypt(
bytes.fromhex(encrypted_data['nonce']),
bytes.fromhex(encrypted_data['ciphertext']),
None
)
def _get_metadata(self, key_name: str) -> dict:
return self.storage.get(f"key_metadata:{key_name}") or {
'current_version': 0,
'versions': {}
}
def _save_metadata(self, key_name: str, metadata: dict):
self.storage.set(f"key_metadata:{key_name}", metadata)轮换后重新加密
def re_encrypt_all_data(key_manager, data_store, key_name: str):
"""用新的当前密钥重新加密所有数据"""
current_key, current_version = key_manager.get_current_key(key_name)
for record_id in data_store.list_all():
record = data_store.get(record_id)
# 如果已经使用当前密钥版本则跳过
if record['key_version'] == current_version:
continue
# 用旧密钥解密
old_key = key_manager.get_key_by_version(key_name, record['key_version'])
plaintext = decrypt(old_key, record['ciphertext'], record['nonce'])
# 用新密钥重新加密
new_ciphertext, new_nonce = encrypt(current_key, plaintext)
# 更新记录
data_store.update(record_id, {
'ciphertext': new_ciphertext,
'nonce': new_nonce,
'key_version': current_version
})7. 密钥销毁
安全密钥删除
import ctypes
import os
def secure_zero(data: bytearray):
"""安全地将内存清零(Python 中尽力而为)"""
# 获取 bytearray 缓冲区的地址
addr = id(data) + 32 # Python 对象头偏移
size = len(data)
# 用零覆盖
ctypes.memset(addr, 0, size)
def secure_delete_key(key: bytes) -> None:
"""尽力安全删除密钥材料"""
# 转换为可变 bytearray
key_array = bytearray(key)
# 多次覆盖
for _ in range(3):
secure_zero(key_array)
for i in range(len(key_array)):
key_array[i] = os.urandom(1)[0]
secure_zero(key_array)
# 清除引用
del key_array
# 注意:Python 的内存管理使真正的安全删除很困难
# 对于关键应用,使用具有更好内存控制的语言
# 或将密钥保存在 HSM 中,它们永远不会离开安全硬件密钥销毁策略
from datetime import datetime, timedelta
class KeyDestructionPolicy:
"""管理密钥生命周期和销毁"""
def __init__(self, key_manager, archive_storage):
self.key_manager = key_manager
self.archive = archive_storage
def schedule_destruction(self, key_name: str, version: int,
days_until_destruction: int = 90):
"""安排密钥版本进行销毁"""
destruction_date = datetime.now() + timedelta(days=days_until_destruction)
self.key_manager.update_version_status(
key_name, version,
status='pending-destruction',
destruction_date=destruction_date.isoformat()
)
def execute_pending_destructions(self):
"""销毁已过销毁日期的密钥"""
pending = self.key_manager.get_pending_destructions()
for key_info in pending:
if datetime.now() >= datetime.fromisoformat(key_info['destruction_date']):
# 归档元数据(不是密钥本身!)
self.archive.store({
'key_name': key_info['key_name'],
'version': key_info['version'],
'destroyed_at': datetime.now().isoformat(),
'created_at': key_info['created_at']
})
# 销毁密钥
self.key_manager.destroy_key_version(
key_info['key_name'],
key_info['version']
)8. 密钥安全最佳实践
访问控制
from enum import Enum
from functools import wraps
class KeyPermission(Enum):
ENCRYPT = 'encrypt'
DECRYPT = 'decrypt'
ROTATE = 'rotate'
DESTROY = 'destroy'
ADMIN = 'admin'
class KeyAccessControl:
"""控制谁可以对密钥做什么"""
def __init__(self):
self.permissions = {} # key_name -> {user_id -> set(permissions)}
def grant(self, key_name: str, user_id: str, permission: KeyPermission):
if key_name not in self.permissions:
self.permissions[key_name] = {}
if user_id not in self.permissions[key_name]:
self.permissions[key_name][user_id] = set()
self.permissions[key_name][user_id].add(permission)
def check(self, key_name: str, user_id: str, permission: KeyPermission) -> bool:
user_perms = self.permissions.get(key_name, {}).get(user_id, set())
return permission in user_perms or KeyPermission.ADMIN in user_perms
def require(self, key_name: str, permission: KeyPermission):
"""装饰器,要求函数有特定权限"""
def decorator(func):
@wraps(func)
def wrapper(self, user_id: str, *args, **kwargs):
if not self.acl.check(key_name, user_id, permission):
raise PermissionError(
f"用户 {user_id} 缺少对 {key_name} 的 {permission.value} 权限"
)
return func(self, user_id, *args, **kwargs)
return wrapper
return decorator审计日志
from datetime import datetime
import json
class KeyAuditLog:
"""记录所有密钥操作以供审计"""
def __init__(self, storage):
self.storage = storage
def log(self, event_type: str, key_name: str, user_id: str,
details: dict = None, success: bool = True):
entry = {
'timestamp': datetime.now().isoformat(),
'event_type': event_type,
'key_name': key_name,
'user_id': user_id,
'success': success,
'details': details or {}
}
self.storage.append('key_audit_log', entry)
def log_key_access(self, key_name: str, user_id: str, operation: str):
self.log('key_access', key_name, user_id, {'operation': operation})
def log_key_rotation(self, key_name: str, user_id: str,
old_version: int, new_version: int):
self.log('key_rotation', key_name, user_id, {
'old_version': old_version,
'new_version': new_version
})
def log_key_destruction(self, key_name: str, user_id: str, version: int):
self.log('key_destruction', key_name, user_id, {'version': version})9. 常见错误
错误 1:密钥在源代码控制中
# 错误:代码中的密钥
SECRET_KEY = "aGVsbG8gd29ybGQK"
# 错误:配置文件中的密钥会被提交
# config.yaml:
# encryption_key: "aGVsbG8gd29ybGQK"
# 正确:从环境或密钥管理器获取密钥
SECRET_KEY = os.environ.get('SECRET_KEY')
# 检查你的 .gitignore!
# .env 文件永远不应该被提交错误 2:所有事情使用同一个密钥
# 错误:一个密钥用于所有目的
MASTER_KEY = os.environ['KEY']
encrypt_data(MASTER_KEY, data)
sign_token(MASTER_KEY, token)
encrypt_session(MASTER_KEY, session)
# 正确:为每个目的派生单独的密钥
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
def derive_purpose_key(master_key: bytes, purpose: str) -> bytes:
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=purpose.encode()
)
return hkdf.derive(master_key)
encryption_key = derive_purpose_key(master_key, "data-encryption")
signing_key = derive_purpose_key(master_key, "token-signing")
session_key = derive_purpose_key(master_key, "session-encryption")错误 3:泄露后不轮换
# 当检测到泄露时:
def incident_response(key_manager, key_name: str):
# 1. 立即轮换到新密钥
new_version = key_manager.rotate_key(key_name)
# 2. 将旧版本标记为已泄露(不仅仅是仅解密)
key_manager.mark_compromised(key_name, new_version - 1)
# 3. 开始重新加密所有数据(优先处理敏感数据)
schedule_reencryption(key_name, priority='critical')
# 4. 审计谁有权访问已泄露的密钥
generate_access_report(key_name, new_version - 1)
# 5. 通知安全团队
alert_security_team(key_name, 'key_compromise')10. 本章小结
三点要记住:
密钥需要完整的生命周期。 生成、存储、分发、轮换和销毁——每个阶段都需要仔细的安全考虑。尽可能使用 HSM/KMS。
使用信封加密。 用 KMS 中的主密钥保护数据密钥。这限制了暴露、启用密钥轮换,并将主密钥保留在硬件中。
定期轮换密钥,在任何疑似泄露后立即轮换。 将密钥版本与加密数据一起包含,这样你可以用旧密钥解密同时用新密钥加密。
11. 下一步
我们已经涵盖了密码学构建块和密钥管理。现在是时候把所有东西放在一起了。
在最后一篇文章中:构建安全系统——应用我们学到的一切来设计端到端加密系统、安全 API 和纵深防御。
