Building Secure Systems: Putting It All Together
1. Why Should You Care?
You've learned the building blocks:
- Symmetric encryption (AES-GCM, ChaCha20)
- Asymmetric encryption (RSA, ECC)
- Hashing and MACs (SHA-256, HMAC)
- Digital signatures (ECDSA, EdDSA)
- TLS, certificates, key management
But knowing the pieces doesn't mean you can build secure systems. This final article shows how to combine them correctly.
2. Principles of Secure System Design
Defense in Depth
┌─────────────────────────────────────────────────────────────────┐
│                        Defense in Depth                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Network     →  Firewalls, TLS, network segmentation            │
│     ↓                                                           │
│  Transport   →  TLS 1.3, certificate pinning                    │
│     ↓                                                           │
│  Application →  Input validation, output encoding               │
│     ↓                                                           │
│  Data        →  Encryption at rest, field-level encryption      │
│     ↓                                                           │
│  Access      →  Authentication, authorization, audit            │
│                                                                 │
│  Each layer protects against different threats                  │
│  Breach of one layer doesn't compromise everything              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Principle of Least Privilege
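The KeyRing example in this section calls `kms.derive_key(label)`, a method this article does not define. As a rough sketch only, and assuming the KMS holds a single root secret, such a method could be backed by HKDF (RFC 5869) with the label as the `info` input, so that each purpose gets its own independent key. The `LocalKMS` class, the `hkdf_sha256` helper, and the in-memory root key are assumptions made for illustration; a production KMS would keep the root key in an HSM or a managed service.

```python
import hashlib
import hmac
import os


def hkdf_sha256(root_key: bytes, info: bytes, length: int = 32, salt: bytes = b"") -> bytes:
    """HKDF (RFC 5869) with SHA-256: extract, then expand."""
    hash_len = hashlib.sha256().digest_size
    if length > 255 * hash_len:
        raise ValueError("requested length too large for HKDF-SHA256")
    # Extract: an empty salt is treated as hash_len zero bytes
    prk = hmac.new(salt or b"\x00" * hash_len, root_key, hashlib.sha256).digest()
    # Expand: chain HMAC blocks until enough output is available
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


class LocalKMS:
    """Hypothetical stand-in for the `kms` object used by KeyRing."""

    def __init__(self, root_key: bytes):
        self._root_key = root_key

    def derive_key(self, label: str) -> bytes:
        # Different labels yield unrelated keys; the same label always yields the same key
        return hkdf_sha256(self._root_key, info=label.encode())


kms = LocalKMS(os.urandom(32))
log_key = kms.derive_key("log-encryption")
token_key = kms.derive_key("token-signing")
```

Because derivation is deterministic, the derived keys never need to be stored, and leaking one derived key does not reveal the root key or any sibling key.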
# BAD: One key for everything
master_key = load_key()
encrypt_user_data(master_key, data)
encrypt_logs(master_key, logs)
sign_tokens(master_key, token)

# GOOD: Separate keys with minimal permissions
class KeyRing:
    def __init__(self, kms):
        self.kms = kms

    def get_user_data_key(self, user_id: str) -> bytes:
        """Per-user encryption key"""
        return self.kms.derive_key(f"user-data:{user_id}")

    def get_log_encryption_key(self) -> bytes:
        """Separate key for log encryption"""
        return self.kms.derive_key("log-encryption")

    def get_token_signing_key(self) -> bytes:
        """Separate key for token signing"""
        return self.kms.derive_key("token-signing")

Fail Secure
# BAD: Fail open
def check_access(token):
    try:
        claims = verify_token(token)
        return claims.get('authorized', True)  # Default to authorized!
    except Exception:
        return True  # Allow on error!

# GOOD: Fail secure
def check_access(token):
    try:
        claims = verify_token(token)
        if not claims.get('authorized'):
            raise AuthorizationError("Not authorized")
        return claims
    except Exception as e:
        log.warning(f"Access check failed: {e}")
        raise AuthorizationError("Access denied")  # Deny on error

3. Designing an End-to-End Encrypted System
Architecture Overview
┌──────────────────────────────────────────────────────────────────────────┐
│                      End-to-End Encrypted Messaging                      │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  Client A                Server                      Client B            │
│  ────────                ──────                      ────────            │
│                                                                          │
│  [Private Key A]         [Public Keys Store]         [Private Key B]     │
│  [Public Key A] ──────>  [Public Key A] <──────────  [Public Key B]      │
│                          [Public Key B]                                  │
│                                                                          │
│  Encrypt(PubKey_B,       Relay encrypted             Decrypt(PrivKey_B,  │
│    message) ─────────>   message only ────────────>    ciphertext)       │
│                                                                          │
│  Server CANNOT read messages (no access to private keys)                 │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘
Implementation
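The diagram writes `Encrypt(PubKey_B, message)`, but an X25519 key cannot encrypt data directly. The implementation in this section instead runs a Diffie-Hellman exchange between a fresh one-time key and the recipient's public key, then passes the shared secret through HKDF to obtain an AES-GCM key. The following is a minimal sketch of just that key-agreement step, using the same `cryptography` calls as the full client; the helper name `derive_message_key` and the variable names are illustrative.

```python
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.kdf.hkdf import HKDF


def derive_message_key(shared_secret: bytes) -> bytes:
    """Turn a raw X25519 shared secret into a 32-byte AES key."""
    return HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b"e2e-message-encryption",
    ).derive(shared_secret)


# Recipient's long-term key pair (the public half is what the server stores)
recipient_private = x25519.X25519PrivateKey.generate()
recipient_public = recipient_private.public_key()

# Sender creates a one-time key pair for this message
ephemeral_private = x25519.X25519PrivateKey.generate()
ephemeral_public = ephemeral_private.public_key()

# Each side combines its own private key with the other side's public key
sender_key = derive_message_key(ephemeral_private.exchange(recipient_public))
recipient_key = derive_message_key(recipient_private.exchange(ephemeral_public))
```

Both sides end up with the same key, and only the ephemeral public key has to travel with the message. Because the recipient's key is long-term, this design protects past messages if the sender's device is later compromised, but not if the recipient's private key leaks.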
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes, serialization
import os
import json

class E2EClient:
    """End-to-end encrypted messaging client"""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.identity_key = x25519.X25519PrivateKey.generate()
        self.public_key = self.identity_key.public_key()

    def get_public_key_bytes(self) -> bytes:
        """Export public key for server storage"""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

    def encrypt_message(self, recipient_public_key: bytes, message: str) -> dict:
        """Encrypt a message for a recipient"""
        # Load recipient's public key
        recipient_key = x25519.X25519PublicKey.from_public_bytes(recipient_public_key)

        # Generate a fresh ephemeral key pair per message. This gives
        # sender-side forward secrecy only: a leaked recipient private key
        # still exposes every message sent to that recipient.
        ephemeral_private = x25519.X25519PrivateKey.generate()
        ephemeral_public = ephemeral_private.public_key()

        # Derive shared secret
        shared_secret = ephemeral_private.exchange(recipient_key)

        # Derive encryption key using HKDF
        encryption_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b"e2e-message-encryption"
        ).derive(shared_secret)

        # Encrypt message
        nonce = os.urandom(12)
        aesgcm = AESGCM(encryption_key)
        ciphertext = aesgcm.encrypt(nonce, message.encode(), None)

        return {
            'ephemeral_public': ephemeral_public.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ).hex(),
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex(),
            'sender': self.user_id
        }

    def decrypt_message(self, encrypted_message: dict) -> str:
        """Decrypt a message sent to us"""
        # Load sender's ephemeral public key
        ephemeral_public = x25519.X25519PublicKey.from_public_bytes(
            bytes.fromhex(encrypted_message['ephemeral_public'])
        )

        # Derive shared secret
        shared_secret = self.identity_key.exchange(ephemeral_public)

        # Derive decryption key
        decryption_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b"e2e-message-encryption"
        ).derive(shared_secret)

        # Decrypt message (raises InvalidTag if the ciphertext was tampered with)
        aesgcm = AESGCM(decryption_key)
        plaintext = aesgcm.decrypt(
            bytes.fromhex(encrypted_message['nonce']),
            bytes.fromhex(encrypted_message['ciphertext']),
            None
        )
        return plaintext.decode()

class E2EServer:
    """Server that relays encrypted messages (cannot read them)"""

    def __init__(self):
        self.public_keys = {}  # user_id -> public_key_bytes
        self.messages = {}     # user_id -> [encrypted_messages]

    def register_user(self, user_id: str, public_key: bytes):
        """Store user's public key"""
        self.public_keys[user_id] = public_key
        self.messages[user_id] = []

    def get_public_key(self, user_id: str) -> bytes:
        """Get a user's public key for encryption (None if the user is unknown)"""
        return self.public_keys.get(user_id)

    def send_message(self, recipient_id: str, encrypted_message: dict):
        """Store encrypted message for a registered recipient"""
        # Server can only see: sender, recipient, timestamp, size
        # Server CANNOT read message content
        # Note: the 'sender' field is self-reported and not authenticated here
        self.messages[recipient_id].append(encrypted_message)

    def get_messages(self, user_id: str) -> list:
        """Get pending messages for a user"""
        messages = self.messages.get(user_id, [])
        self.messages[user_id] = []  # Clear after retrieval
        return messages

# Usage example
def demo_e2e():
    # Setup
    server = E2EServer()
    alice = E2EClient("alice")
    bob = E2EClient("bob")

    # Register public keys
    server.register_user("alice", alice.get_public_key_bytes())
    server.register_user("bob", bob.get_public_key_bytes())

    # Alice sends message to Bob
    bob_pubkey = server.get_public_key("bob")
    encrypted = alice.encrypt_message(bob_pubkey, "Hello Bob! This is secret.")
    server.send_message("bob", encrypted)

    # Bob receives and decrypts
    messages = server.get_messages("bob")
    for msg in messages:
        plaintext = bob.decrypt_message(msg)
        print(f"Bob received from {msg['sender']}: {plaintext}")

4. Secure API Design
Authentication Layer
import base64
import hashlib
import hmac
import json
import time
from functools import wraps

class SecureAPI:
    """Secure API with signed, expiring access tokens"""

    def __init__(self, secret_key: bytes):
        self.secret_key = secret_key
        self.token_expiry = 3600  # 1 hour

    def create_access_token(self, user_id: str, permissions: list) -> str:
        """Create a signed access token"""
        now = int(time.time())
        payload = {
            'user_id': user_id,
            'permissions': permissions,
            'issued_at': now,
            'expires_at': now + self.token_expiry
        }
        payload_json = json.dumps(payload, sort_keys=True)
        payload_b64 = base64.urlsafe_b64encode(payload_json.encode()).decode()
        signature = hmac.new(
            self.secret_key,
            payload_b64.encode(),
            hashlib.sha256
        ).hexdigest()
        return f"{payload_b64}.{signature}"

    def verify_token(self, token: str) -> dict:
        """Verify and decode an access token"""
        try:
            payload_b64, signature = token.rsplit('.', 1)

            # Verify signature (constant-time comparison) before parsing the payload
            expected_sig = hmac.new(
                self.secret_key,
                payload_b64.encode(),
                hashlib.sha256
            ).hexdigest()
            if not hmac.compare_digest(expected_sig, signature):
                raise ValueError("Invalid signature")

            # Decode payload
            payload = json.loads(base64.urlsafe_b64decode(payload_b64))

            # Check expiry
            if time.time() > payload['expires_at']:
                raise ValueError("Token expired")
            return payload
        except Exception as e:
            raise ValueError(f"Token verification failed: {e}") from e

    def require_permission(self, permission: str):
        """Decorator to require specific permission"""
        def decorator(func):
            @wraps(func)
            def wrapper(request, *args, **kwargs):
                auth_header = request.headers.get('Authorization', '')
                # Strip only a leading "Bearer " scheme (str.removeprefix needs Python 3.9+)
                token = auth_header.removeprefix('Bearer ').strip()
                if not token:
                    raise PermissionError("No token provided")
                payload = self.verify_token(token)
                if permission not in payload.get('permissions', []):
                    raise PermissionError(f"Missing permission: {permission}")
                request.user = payload
                return func(request, *args, **kwargs)
            return wrapper
        return decorator

Request Signing for APIs
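The server class in this section remembers every nonce it has ever seen in a plain `set`, which grows without bound. Since requests whose timestamp is more than `max_clock_skew` seconds away from the server clock are rejected anyway, a nonce only has to be remembered for a limited time. The following is a sketch under that assumption; `NonceCache` and `check_and_store` are hypothetical names, and a multi-process deployment would need a shared store rather than process memory.

```python
import time
from typing import Optional


class NonceCache:
    """Remembers nonces for a limited time (hypothetical helper)."""

    def __init__(self, ttl_seconds: int):
        self.ttl_seconds = ttl_seconds
        self._expiry = {}  # nonce -> time after which it may be forgotten

    def check_and_store(self, nonce: str, now: Optional[float] = None) -> bool:
        """Return True if the nonce is fresh, False if it was seen within the TTL."""
        now = time.time() if now is None else now
        # Drop entries whose retention period has passed
        expired = [n for n, deadline in self._expiry.items() if deadline <= now]
        for n in expired:
            del self._expiry[n]
        if nonce in self._expiry:
            return False
        self._expiry[nonce] = now + self.ttl_seconds
        return True


max_clock_skew = 300
# A timestamp is accepted from `skew` seconds before to `skew` seconds after
# the server clock, so nonces are kept for twice the skew
cache = NonceCache(ttl_seconds=2 * max_clock_skew)

first = cache.check_and_store("abc123", now=1000.0)   # fresh nonce
replay = cache.check_and_store("abc123", now=1100.0)  # same nonce inside the window
later = cache.check_and_store("abc123", now=1700.0)   # retention period has passed
```

Once a nonce has aged out, a replay of the original request would fail the timestamp check instead, so forgetting it does not reopen the replay window. The linear scan on every call keeps the sketch short; a larger system would expire entries with a heap or a store that supports TTLs.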
import hashlib
import hmac
import os
import time
import urllib.parse

class SignedAPIClient:
    """Client that signs all API requests"""

    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret.encode()

    def sign_request(self, method: str, path: str, body: str = "",
                     query_params: dict = None) -> dict:
        """Generate authentication headers for a request"""
        timestamp = str(int(time.time()))
        nonce = os.urandom(16).hex()

        # Canonical request string (must be built identically on the server)
        query_string = urllib.parse.urlencode(sorted(query_params.items())) if query_params else ""
        body_hash = hashlib.sha256(body.encode()).hexdigest()
        canonical = f"{method}\n{path}\n{query_string}\n{timestamp}\n{nonce}\n{body_hash}"

        # Sign
        signature = hmac.new(
            self.api_secret,
            canonical.encode(),
            hashlib.sha256
        ).hexdigest()

        return {
            'X-API-Key': self.api_key,
            'X-Timestamp': timestamp,
            'X-Nonce': nonce,
            'X-Signature': signature
        }

class SignedAPIServer:
    """Server that verifies signed requests"""

    def __init__(self, secrets: dict):
        self.secrets = secrets        # api_key -> api_secret
        self.used_nonces = set()      # Prevent replay attacks (grows without bound here)
        self.max_clock_skew = 300     # 5 minutes

    def verify_request(self, method: str, path: str, headers: dict,
                       body: str = "", query_params: dict = None) -> str:
        """Verify request signature, return the authenticated API key if valid"""
        # Extract auth headers
        api_key = headers.get('X-API-Key')
        timestamp = headers.get('X-Timestamp')
        nonce = headers.get('X-Nonce')
        signature = headers.get('X-Signature')
        if not all([api_key, timestamp, nonce, signature]):
            raise ValueError("Missing authentication headers")

        # Check timestamp (prevent replay of old requests)
        request_time = int(timestamp)
        if abs(time.time() - request_time) > self.max_clock_skew:
            raise ValueError("Request timestamp too far from server time")

        # Get API secret
        api_secret = self.secrets.get(api_key)
        if not api_secret:
            raise ValueError("Unknown API key")

        # Verify signature
        query_string = urllib.parse.urlencode(sorted(query_params.items())) if query_params else ""
        body_hash = hashlib.sha256(body.encode()).hexdigest()
        canonical = f"{method}\n{path}\n{query_string}\n{timestamp}\n{nonce}\n{body_hash}"
        expected_sig = hmac.new(
            api_secret.encode(),
            canonical.encode(),
            hashlib.sha256
        ).hexdigest()
        if not hmac.compare_digest(expected_sig, signature):
            raise ValueError("Invalid signature")

        # Check nonce only after the signature is valid, so unauthenticated
        # callers cannot fill the set or burn nonces for legitimate clients
        if nonce in self.used_nonces:
            raise ValueError("Nonce already used")
        self.used_nonces.add(nonce)

        return api_key  # Return authenticated identity

5. Data Encryption Patterns
Field-Level Encryption
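The field-level code in this section passes the field name to AES-GCM as associated data. That binds each ciphertext to the field it was written for: the data is not encrypted with the name, but decryption fails unless the same name is supplied. The following is a small sketch of that effect using the same `cryptography` API; the key and sample value are made up.

```python
import os

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

key = AESGCM.generate_key(bit_length=256)
aesgcm = AESGCM(key)
nonce = os.urandom(12)

# Encrypt a value bound to the field name "ssn"
ciphertext = aesgcm.encrypt(nonce, b'"123-45-6789"', b"ssn")

# Decrypting with the same field name succeeds
roundtrip = aesgcm.decrypt(nonce, ciphertext, b"ssn")

# Presenting the same ciphertext as a different field fails authentication
try:
    aesgcm.decrypt(nonce, ciphertext, b"credit_card")
    swapped_ok = True
except InvalidTag:
    swapped_ok = False
```

In the class that follows, the per-field derived key already separates fields; the associated data is a second, independent check that a stored ciphertext has not been moved between columns.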
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import json
import os

class FieldEncryption:
    """Encrypt specific fields in a record"""

    def __init__(self, kms):
        self.kms = kms
        self.sensitive_fields = {'ssn', 'credit_card', 'password_hash', 'private_data'}

    def encrypt_record(self, record: dict, context: str) -> dict:
        """Encrypt sensitive fields in a record"""
        encrypted = record.copy()
        encrypted['_encrypted_fields'] = {}
        for field in self.sensitive_fields:
            if field in record:
                # Get field-specific key
                key = self.kms.derive_key(f"field:{field}:{context}")

                # Encrypt, binding the ciphertext to the field name as associated data
                nonce = os.urandom(12)
                aesgcm = AESGCM(key)
                value_bytes = json.dumps(record[field]).encode()
                ciphertext = aesgcm.encrypt(nonce, value_bytes, field.encode())

                # Store encrypted value
                encrypted['_encrypted_fields'][field] = {
                    'nonce': nonce.hex(),
                    'ciphertext': ciphertext.hex()
                }
                encrypted[field] = "[ENCRYPTED]"
        return encrypted

    def decrypt_record(self, encrypted: dict, context: str) -> dict:
        """Decrypt sensitive fields in a record"""
        record = encrypted.copy()
        encrypted_fields = record.pop('_encrypted_fields', {})
        for field, enc_data in encrypted_fields.items():
            key = self.kms.derive_key(f"field:{field}:{context}")
            aesgcm = AESGCM(key)
            plaintext = aesgcm.decrypt(
                bytes.fromhex(enc_data['nonce']),
                bytes.fromhex(enc_data['ciphertext']),
                field.encode()
            )
            record[field] = json.loads(plaintext)
        return record

Searchable Encryption
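The approach in this section is often called a blind index: alongside each randomized ciphertext, the server stores a keyed hash of the normalized value and matches on that. A short standalone sketch of the token step (the key and sample values are made up) makes the trade-off visible. Lookups work without decryption, but equal plaintexts always produce equal tokens, so anyone who can read the index learns which records share a value.

```python
import hashlib
import hmac
from collections import Counter

search_key = b"demo-search-key-not-for-production"  # illustrative key only


def search_token(value: str) -> str:
    """Deterministic keyed hash of the normalized value."""
    normalized = value.lower().strip()
    return hmac.new(search_key, normalized.encode(), hashlib.sha256).hexdigest()


emails = ["Alice@example.com", "bob@example.com", " alice@example.com "]
index = [search_token(e) for e in emails]

# Exact-match lookup works after normalization
matches = [i for i, tok in enumerate(index) if tok == search_token("ALICE@example.com")]

# What an observer of the index sees: how often each opaque token occurs
frequencies = sorted(Counter(index).values(), reverse=True)
```

This frequency leak matters most for low-cardinality fields, such as a yes/no flag, where token counts alone can reveal the underlying values.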
import hashlib
import hmac
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

class SearchableEncryption:
    """Allow searching encrypted data without decrypting everything"""

    def __init__(self, search_key: bytes, encryption_key: bytes):
        self.search_key = search_key
        self.encryption_key = encryption_key

    def create_search_token(self, value: str) -> str:
        """Create a deterministic token for searching"""
        # Normalize value for consistent matching
        normalized = value.lower().strip()

        # Create search token (deterministic, one-way)
        token = hmac.new(
            self.search_key,
            normalized.encode(),
            hashlib.sha256
        ).hexdigest()
        return token

    def encrypt_with_search(self, value: str) -> dict:
        """Encrypt a value while enabling search"""
        # Create search token for indexing
        search_token = self.create_search_token(value)

        # Encrypt the actual value (non-deterministic)
        nonce = os.urandom(12)
        aesgcm = AESGCM(self.encryption_key)
        ciphertext = aesgcm.encrypt(nonce, value.encode(), None)

        return {
            'search_token': search_token,
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex()
        }

    def search(self, query: str, encrypted_records: list) -> list:
        """Search encrypted records (exact match on the normalized value only)"""
        search_token = self.create_search_token(query)

        # Match on search token (server can do this)
        matches = [
            r for r in encrypted_records
            if r.get('search_token') == search_token
        ]

        # Decrypt matches (client does this)
        results = []
        for match in matches:
            aesgcm = AESGCM(self.encryption_key)
            plaintext = aesgcm.decrypt(
                bytes.fromhex(match['nonce']),
                bytes.fromhex(match['ciphertext']),
                None
            )
            results.append(plaintext.decode())
        return results

6. Security Checklist
Before Deployment
┌─────────────────────────────────────────────────────────────────┐
│                  Security Deployment Checklist                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  [ ] TLS 1.3 enabled, TLS 1.2 minimum                           │
│  [ ] All secrets in environment variables or secrets manager    │
│  [ ] No hardcoded keys, tokens, or passwords in code            │
│  [ ] .gitignore includes .env, *.pem, *.key files               │
│  [ ] Passwords hashed with Argon2id or bcrypt                   │
│  [ ] Encryption keys rotated on schedule                        │
│  [ ] Audit logging for security-relevant operations             │
│  [ ] Rate limiting on authentication endpoints                  │
│  [ ] Input validation on all user inputs                        │
│  [ ] HTTPS enforced (HSTS header)                               │
│  [ ] Security headers configured (CSP, X-Frame-Options, etc.)   │
│  [ ] Dependencies checked for vulnerabilities                   │
│  [ ] Error messages don't leak sensitive information            │
│  [ ] Session management secure (httpOnly, secure cookies)       │
│  [ ] CSRF protection enabled                                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Continuous Security
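The `SecurityMonitor` class in this section calls `self.get_auth_failures(window_minutes)` without defining it. One possible backing, sketched here with hypothetical names (`AuthFailureTracker`, `record_failure`, `count_recent`), is an in-memory sliding window of failure timestamps. A real deployment would more likely query a metrics or log store.

```python
import time
from collections import deque
from typing import Optional


class AuthFailureTracker:
    """Sliding-window counter of authentication failures (illustrative only)."""

    def __init__(self, max_window_minutes: int = 60):
        self.max_window_seconds = max_window_minutes * 60
        self._failures = deque()  # failure timestamps, oldest first

    def record_failure(self, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self._failures.append(now)
        self._prune(now)

    def count_recent(self, window_minutes: int, now: Optional[float] = None) -> int:
        """Number of failures recorded in the last `window_minutes` minutes."""
        now = time.time() if now is None else now
        self._prune(now)
        cutoff = now - window_minutes * 60
        return sum(1 for t in self._failures if t >= cutoff)

    def _prune(self, now: float) -> None:
        # Forget anything older than the largest window we will ever query
        while self._failures and self._failures[0] < now - self.max_window_seconds:
            self._failures.popleft()


tracker = AuthFailureTracker()
for t in (0.0, 100.0, 250.0, 290.0):
    tracker.record_failure(now=t)

last_minute = tracker.count_recent(window_minutes=1, now=300.0)
last_five_minutes = tracker.count_recent(window_minutes=5, now=300.0)
```

With a tracker like this, `get_auth_failures(window_minutes)` could simply return `tracker.count_recent(window_minutes)`.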
from datetime import datetime, timezone
from cryptography import x509

class SecurityMonitor:
    """Continuous security monitoring"""

    def __init__(self, alert_service):
        self.alerts = alert_service

    def get_auth_failures(self, window_minutes: int) -> int:
        """Recent auth failure count; not defined in the article, supply your own source"""
        raise NotImplementedError

    def check_certificate_expiry(self, cert_path: str, warn_days: int = 30):
        """Alert before certificates expire"""
        with open(cert_path, 'rb') as f:
            cert = x509.load_pem_x509_certificate(f.read())
        # not_valid_after_utc is timezone-aware, so compare against an aware "now"
        days_until_expiry = (cert.not_valid_after_utc - datetime.now(timezone.utc)).days
        if days_until_expiry < warn_days:
            self.alerts.send(
                level='warning',
                message=f"Certificate expires in {days_until_expiry} days"
            )

    def check_failed_auth_rate(self, window_minutes: int = 5, threshold: int = 100):
        """Alert on suspicious authentication failure rate"""
        failures = self.get_auth_failures(window_minutes)
        if failures > threshold:
            self.alerts.send(
                level='critical',
                message=f"High auth failure rate: {failures} in {window_minutes} min"
            )

    def check_key_age(self, key_metadata: dict, max_age_days: int = 90):
        """Alert when keys need rotation"""
        created = datetime.fromisoformat(key_metadata['created_at'])
        if created.tzinfo is None:
            # Assumption: timestamps stored without an offset are in UTC
            created = created.replace(tzinfo=timezone.utc)
        age_days = (datetime.now(timezone.utc) - created).days
        if age_days > max_age_days:
            self.alerts.send(
                level='warning',
                message=f"Key {key_metadata['name']} is {age_days} days old"
            )

7. Common Mistakes to Avoid
Mistake 1: Security Through Obscurity
# BAD: Relying on obscurity
def insecure_encrypt(data):
    # "Nobody will figure out my custom algorithm"
    result = ""
    for char in data:
        result += chr(ord(char) + 3)  # A Caesar shift is not encryption!
    return result

# GOOD: Use proven algorithms
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

def secure_encrypt(key: bytes, data: bytes) -> tuple[bytes, bytes]:
    nonce = os.urandom(12)
    aesgcm = AESGCM(key)
    return nonce, aesgcm.encrypt(nonce, data, None)

Mistake 2: Insufficient Randomness
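Python's `random` module uses a deterministic generator (Mersenne Twister): anyone who knows or guesses the seed can regenerate every value it produces. The sketch below, with a made-up timestamp and a hypothetical `weak_token` helper, shows an attacker recovering a token by trying the seconds around the time it was issued.

```python
import random
import secrets

ALPHABET = "abcdef0123456789"


def weak_token(seed: int) -> str:
    """Token from a time-seeded PRNG: fully determined by the seed."""
    rng = random.Random(seed)
    return "".join(rng.choices(ALPHABET, k=32))


issued_at = 1_700_000_000  # illustrative Unix timestamp
victim_token = weak_token(issued_at)

# The attacker only knows the token was issued within the last minute
guess_window = range(issued_at - 60, issued_at + 1)
recovered_seed = next(s for s in guess_window if weak_token(s) == victim_token)

# secrets draws from the operating system's CSPRNG and has no seed to guess
strong_token = secrets.token_hex(16)
```

The search space here is 61 candidate seeds, not the 16^32 possible tokens the output format suggests.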
import random
import secrets
import time

# BAD: Predictable randomness
def bad_token():
    return ''.join(random.choices('abcdef0123456789', k=32))

# GOOD: Cryptographically secure randomness
def good_token():
    return secrets.token_hex(16)

# BAD: Seeding with time
random.seed(int(time.time()))  # Predictable!

# GOOD: System entropy
secure_random = secrets.SystemRandom()

Mistake 3: Logging Sensitive Data
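The `safe_log` helper in this section checks only top-level keys, so a password nested inside another dict or list would still reach the log. A sketch of a recursive variant follows; the function name `redact` and the sample payload are illustrative, and the key list is the one used in this section.

```python
from typing import Any

SENSITIVE_KEYS = {"password", "token", "secret", "key", "ssn", "credit_card"}


def redact(value: Any) -> Any:
    """Return a copy of `value` with sensitive dict entries replaced at any depth."""
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if str(k).lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    return value


request_data = {
    "user": "alice",
    "credentials": {"Password": "hunter2", "mfa": {"token": "123456"}},
    "cards": [{"credit_card": "4111111111111111", "label": "personal"}],
}
safe = redact(request_data)
```

Matching is by exact key name, so variants such as `api_key` or `auth_token` would pass through unless they are added to the set.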
import logging

# BAD: Logging sensitive data
def authenticate_bad(username, password):
    logging.info(f"Auth attempt: user={username}, pass={password}")  # NO!

# GOOD: Safe logging
def authenticate_good(username, password):
    logging.info(f"Auth attempt: user={username}")
    # password is never logged

# GOOD: Redact sensitive fields
def safe_log(data: dict) -> dict:
    """Create a copy of data with sensitive fields redacted"""
    sensitive = {'password', 'token', 'secret', 'key', 'ssn', 'credit_card'}
    return {
        k: '[REDACTED]' if k.lower() in sensitive else v
        for k, v in data.items()
    }

logging.info(f"Request: {safe_log(request_data)}")

8. Summary
Three things to remember:
- Defense in depth. Don't rely on any single security measure. Layer your defenses: TLS for transport, encryption for storage, authentication for access, audit for detection.
- Use proven solutions. Don't invent cryptography. Use TLS 1.3, AES-GCM, Argon2, and established libraries. The security community has already solved most of these problems.
- Fail secure, log everything. When something goes wrong, deny access by default. Log all security-relevant events (never the secrets themselves) so you can detect and investigate attacks.
9. Series Conclusion
Congratulations! You've completed the Encryption for Developers series.
You now understand:
- How symmetric and asymmetric encryption work
- When to use AES-GCM vs ChaCha20 vs RSA vs ECC
- Why we hash passwords and how to do it right
- How TLS protects data in transit
- How to manage cryptographic keys
- How to build secure systems
Remember: Cryptography is a tool, not a solution. Security is about understanding threats, making good design decisions, and staying vigilant.
Keep learning. Stay updated on vulnerabilities. And when in doubt, consult a security professional.
Good luck building secure systems!
