第13章:安全架构¶
13.1 安全架构概述¶
安全架构的目标¶
- 保密性:保护数据不被未授权访问
- 完整性:保护数据不被篡改
- 可用性:保证服务持续可用
- 可追溯性:记录所有操作
安全架构的原则¶
- 最小权限原则:只授予必要的权限
- 纵深防御:多层防护
- 零信任:不信任任何内部网络
- 安全左移:在开发阶段就考虑安全
13.2 认证与授权¶
13.2.1 认证¶
基本认证¶
Python
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
app = FastAPI()
security = HTTPBasic()
def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
correct_username = secrets.compare_digest(credentials.username, "admin")
correct_password = secrets.compare_digest(credentials.password, "secret")
if not (correct_username and correct_password):
raise HTTPException(status_code=401, detail="Incorrect username or password")
return credentials.username
@app.get("/users/me")
def read_current_user(username: str = Depends(get_current_user)):
return {"username": username}
JWT认证¶
Python
import jwt
from datetime import datetime, timedelta, timezone
# ⚠️ 仅用于本地学习演示!生产环境必须从环境变量或密钥管理服务获取
import os
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "your-secret-key-FOR-DEV-ONLY")
ALGORITHM = "HS256"
def create_access_token(data: dict):
to_encode = data.copy()
# datetime.utcnow() 在 Python 3.12 中已弃用,使用 timezone-aware 的 UTC 时间
expire = datetime.now(timezone.utc) + timedelta(minutes=30)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str):
try: # try/except捕获异常
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.InvalidTokenError:
return None
# 使用示例
token = create_access_token({"sub": "user123"})
payload = verify_token(token)
OAuth2¶
Python
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
# 验证token
payload = verify_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid token")
return payload
@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
return current_user
13.2.2 授权¶
RBAC(基于角色的访问控制)¶
Python
from functools import wraps
# 角色定义
ROLES = {
"admin": ["read", "write", "delete"],
"user": ["read"],
"guest": []
}
def require_role(role: str):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
current_user = kwargs.get('current_user')
if not current_user:
raise HTTPException(status_code=401, detail="Not authenticated")
user_role = current_user.get('role')
if user_role != role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
@app.get("/admin")
@require_role("admin")
async def admin_endpoint(current_user: dict = Depends(get_current_user)):
return {"message": "Admin access"}
ABAC(基于属性的访问控制)¶
Python
def require_permission(permission: str):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs): # *args接收任意位置参数;**kwargs接收任意关键字参数
current_user = kwargs.get('current_user')
if not current_user:
raise HTTPException(status_code=401, detail="Not authenticated")
user_permissions = current_user.get('permissions', [])
if permission not in user_permissions:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
@app.get("/users")
@require_permission("user:read")
async def read_users(current_user: dict = Depends(get_current_user)):
return {"users": []}
13.3 数据加密¶
13.3.1 对称加密¶
Python
from cryptography.fernet import Fernet
# 生成密钥
key = Fernet.generate_key()
cipher_suite = Fernet(key)
# 加密
message = "Hello, World!"
encrypted_message = cipher_suite.encrypt(message.encode())
# 解密
decrypted_message = cipher_suite.decrypt(encrypted_message).decode()
13.3.2 非对称加密¶
Python
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
# 生成密钥对
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
public_key = private_key.public_key()
# 加密
message = "Hello, World!"
encrypted_message = public_key.encrypt(
message.encode(),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# 解密
decrypted_message = private_key.decrypt(
encrypted_message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
).decode()
13.3.3 哈希¶
Python
import hashlib
# SHA256
message = "Hello, World!"
hash_value = hashlib.sha256(message.encode()).hexdigest()
# 密码哈希(使用bcrypt)
import bcrypt
password = "mypassword".encode()
salt = bcrypt.gensalt()
hashed_password = bcrypt.hashpw(password, salt)
# 验证密码
if bcrypt.checkpw(password, hashed_password):
print("Password is correct")
13.4 API安全¶
13.4.1 HTTPS¶
Python
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.get("/")
def read_root():
return {"message": "Hello, World!"}
# 启动HTTPS服务器
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=443,
ssl_keyfile="key.pem",
ssl_certfile="cert.pem"
)
13.4.2 输入验证¶
Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator # Pydantic数据验证模型
app = FastAPI()
class UserCreate(BaseModel):
username: str
email: str
password: str
@validator('email')
def validate_email(cls, v):
if '@' not in v:
raise ValueError('Invalid email')
return v
@validator('password')
def validate_password(cls, v):
if len(v) < 8:
raise ValueError('Password must be at least 8 characters')
return v
@app.post("/users")
def create_user(user: UserCreate):
# 处理用户创建
return {"message": "User created"}
13.4.3 速率限制¶
Python
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
return {"data": []}
13.5 安全防护¶
13.5.1 SQL注入防护¶
Python
# 不安全的代码
# query = f"SELECT * FROM users WHERE username = '{username}'"
# 安全的代码
query = "SELECT * FROM users WHERE username = %s"
cursor.execute(query, (username,))
13.5.2 XSS防护¶
Python
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from markupsafe import escape
app = FastAPI()
templates = Jinja2Templates(directory="templates")
@app.get("/search")
def search(request: Request, q: str):
# 转义用户输入
safe_query = escape(q)
return templates.TemplateResponse(
"search.html",
{"request": request, "query": safe_query}
)
13.5.3 CSRF防护¶
Python
from fastapi import FastAPI, Depends, HTTPException, Request
# 需安装: pip install fastapi-csrf-protect
from fastapi_csrf_protect import CsrfProtect
app = FastAPI()
class CsrfSettings:
# ⚠️ 仅用于演示,生产环境必须使用环境变量
secret_key: str = os.environ.get("CSRF_SECRET_KEY", "your-secret-key-FOR-DEV-ONLY")
@CsrfProtect.load_config
def get_csrf_config():
return CsrfSettings()
@app.post("/form")
async def handle_form(request: Request, csrf_protect: CsrfProtect = Depends()): # async def定义异步函数;用await调用
await csrf_protect.validate_csrf(request) # await等待异步操作完成
# 处理表单
return {"message": "Form submitted"}
13.6 实战练习¶
练习1:实现一个认证系统¶
实现一个认证系统,包括: 1. 用户注册 2. 用户登录 3. JWT认证 4. 权限控制
练习2:实现一个API安全方案¶
实现一个API安全方案,包括: 1. HTTPS 2. 输入验证 3. 速率限制 4. 日志审计
练习3:实现一个数据加密方案¶
实现一个数据加密方案,包括: 1. 对称加密 2. 非对称加密 3. 密码哈希 4. 密钥管理
13.7 面试准备¶
常见面试题¶
- 什么是认证?什么是授权?
- JWT的工作原理是什么?
- 如何防止SQL注入?
- 如何防止XSS攻击?
- 什么是零信任架构?
项目经验准备¶
准备一个安全项目: - 使用的安全技术 - 遇到的挑战 - 解决方案 - 项目成果
13.8 总结¶
本章介绍了安全架构,包括认证授权、数据加密、API安全和安全防护。安全是系统设计的重要考虑因素。
关键要点¶
- 安全架构包括认证、授权、加密、防护
- 认证包括基本认证、JWT、OAuth2
- 授权包括RBAC、ABAC
- 数据加密包括对称加密、非对称加密、哈希
- 安全防护包括SQL注入、XSS、CSRF防护
下一步¶
下一章将深入学习性能优化,包括性能分析、优化策略等内容。