跳转至

第13章:安全架构

安全架构

13.1 安全架构概述

安全架构的目标

  1. 保密性:保护数据不被未授权访问
  2. 完整性:保护数据不被篡改
  3. 可用性:保证服务持续可用
  4. 可追溯性:记录所有操作

安全架构的原则

  1. 最小权限原则:只授予必要的权限
  2. 纵深防御:多层防护
  3. 零信任:不信任任何内部网络
  4. 安全左移:在开发阶段就考虑安全

13.2 认证与授权

13.2.1 认证

基本认证

Python
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets

app = FastAPI()
security = HTTPBasic()

def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
    correct_username = secrets.compare_digest(credentials.username, "admin")
    correct_password = secrets.compare_digest(credentials.password, "secret")

    if not (correct_username and correct_password):
        raise HTTPException(status_code=401, detail="Incorrect username or password")

    return credentials.username

@app.get("/users/me")
def read_current_user(username: str = Depends(get_current_user)):
    return {"username": username}

JWT认证

Python
import jwt
from datetime import datetime, timedelta, timezone

# ⚠️ 仅用于本地学习演示!生产环境必须从环境变量或密钥管理服务获取
import os
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "your-secret-key-FOR-DEV-ONLY")
ALGORITHM = "HS256"

def create_access_token(data: dict):
    to_encode = data.copy()
    # datetime.utcnow() 在 Python 3.12 中已弃用,使用 timezone-aware 的 UTC 时间
    expire = datetime.now(timezone.utc) + timedelta(minutes=30)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str):
    try:  # try/except捕获异常
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.InvalidTokenError:
        return None

# 使用示例
token = create_access_token({"sub": "user123"})
payload = verify_token(token)

OAuth2

Python
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme)):
    # 验证token
    payload = verify_token(token)
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid token")
    return payload

@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
    return current_user

13.2.2 授权

RBAC(基于角色的访问控制)

Python
from functools import wraps

# 角色定义
ROLES = {
    "admin": ["read", "write", "delete"],
    "user": ["read"],
    "guest": []
}

def require_role(role: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_user = kwargs.get('current_user')
            if not current_user:
                raise HTTPException(status_code=401, detail="Not authenticated")

            user_role = current_user.get('role')
            if user_role != role:
                raise HTTPException(status_code=403, detail="Insufficient permissions")

            return await func(*args, **kwargs)
        return wrapper
    return decorator

# 使用示例
@app.get("/admin")
@require_role("admin")
async def admin_endpoint(current_user: dict = Depends(get_current_user)):
    return {"message": "Admin access"}

ABAC(基于属性的访问控制)

Python
def require_permission(permission: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
            current_user = kwargs.get('current_user')
            if not current_user:
                raise HTTPException(status_code=401, detail="Not authenticated")

            user_permissions = current_user.get('permissions', [])
            if permission not in user_permissions:
                raise HTTPException(status_code=403, detail="Insufficient permissions")

            return await func(*args, **kwargs)
        return wrapper
    return decorator

# 使用示例
@app.get("/users")
@require_permission("user:read")
async def read_users(current_user: dict = Depends(get_current_user)):
    return {"users": []}

13.3 数据加密

13.3.1 对称加密

Python
from cryptography.fernet import Fernet

# 生成密钥
key = Fernet.generate_key()
cipher_suite = Fernet(key)

# 加密
message = "Hello, World!"
encrypted_message = cipher_suite.encrypt(message.encode())

# 解密
decrypted_message = cipher_suite.decrypt(encrypted_message).decode()

13.3.2 非对称加密

Python
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes

# 生成密钥对
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048
)
public_key = private_key.public_key()

# 加密
message = "Hello, World!"
encrypted_message = public_key.encrypt(
    message.encode(),
    padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None
    )
)

# 解密
decrypted_message = private_key.decrypt(
    encrypted_message,
    padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None
    )
).decode()

13.3.3 哈希

Python
import hashlib

# SHA256
message = "Hello, World!"
hash_value = hashlib.sha256(message.encode()).hexdigest()

# 密码哈希(使用bcrypt)
import bcrypt

password = "mypassword".encode()
salt = bcrypt.gensalt()
hashed_password = bcrypt.hashpw(password, salt)

# 验证密码
if bcrypt.checkpw(password, hashed_password):
    print("Password is correct")

13.4 API安全

13.4.1 HTTPS

Python
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get("/")
def read_root():
    return {"message": "Hello, World!"}

# 启动HTTPS服务器
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=443,
        ssl_keyfile="key.pem",
        ssl_certfile="cert.pem"
    )

13.4.2 输入验证

Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator  # Pydantic数据验证模型

app = FastAPI()

class UserCreate(BaseModel):
    username: str
    email: str
    password: str

    @validator('email')
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError('Invalid email')
        return v

    @validator('password')
    def validate_password(cls, v):
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        return v

@app.post("/users")
def create_user(user: UserCreate):
    # 处理用户创建
    return {"message": "User created"}

13.4.3 速率限制

Python
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
    return {"data": []}

13.5 安全防护

13.5.1 SQL注入防护

Python
# 不安全的代码
# query = f"SELECT * FROM users WHERE username = '{username}'"

# 安全的代码
query = "SELECT * FROM users WHERE username = %s"
cursor.execute(query, (username,))

13.5.2 XSS防护

Python
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from markupsafe import escape

app = FastAPI()
templates = Jinja2Templates(directory="templates")

@app.get("/search")
def search(request: Request, q: str):
    # 转义用户输入
    safe_query = escape(q)
    return templates.TemplateResponse(
        "search.html",
        {"request": request, "query": safe_query}
    )

13.5.3 CSRF防护

Python
from fastapi import FastAPI, Depends, HTTPException, Request
# 需安装: pip install fastapi-csrf-protect
from fastapi_csrf_protect import CsrfProtect

app = FastAPI()

class CsrfSettings:
    # ⚠️ 仅用于演示,生产环境必须使用环境变量
    secret_key: str = os.environ.get("CSRF_SECRET_KEY", "your-secret-key-FOR-DEV-ONLY")

@CsrfProtect.load_config
def get_csrf_config():
    return CsrfSettings()

@app.post("/form")
async def handle_form(request: Request, csrf_protect: CsrfProtect = Depends()):  # async def定义异步函数;用await调用
    await csrf_protect.validate_csrf(request)  # await等待异步操作完成
    # 处理表单
    return {"message": "Form submitted"}

13.6 实战练习

练习1:实现一个认证系统

实现一个认证系统,包括: 1. 用户注册 2. 用户登录 3. JWT认证 4. 权限控制

练习2:实现一个API安全方案

实现一个API安全方案,包括: 1. HTTPS 2. 输入验证 3. 速率限制 4. 日志审计

练习3:实现一个数据加密方案

实现一个数据加密方案,包括: 1. 对称加密 2. 非对称加密 3. 密码哈希 4. 密钥管理

13.7 面试准备

常见面试题

  1. 什么是认证?什么是授权?
  2. JWT的工作原理是什么?
  3. 如何防止SQL注入?
  4. 如何防止XSS攻击?
  5. 什么是零信任架构?

项目经验准备

准备一个安全项目: - 使用的安全技术 - 遇到的挑战 - 解决方案 - 项目成果

13.8 总结

本章介绍了安全架构,包括认证授权、数据加密、API安全和安全防护。安全是系统设计的重要考虑因素。

关键要点

  1. 安全架构包括认证、授权、加密、防护
  2. 认证包括基本认证、JWT、OAuth2
  3. 授权包括RBAC、ABAC
  4. 数据加密包括对称加密、非对称加密、哈希
  5. 安全防护包括SQL注入、XSS、CSRF防护

下一步

下一章将深入学习性能优化,包括性能分析、优化策略等内容。