跳转至

01 - Web安全

目标: 掌握Web安全知识,能防范常见攻击

时间: 2周

核心原则: 安全是设计出来的,不是后期添加的


🎯 为什么安全很重要?

真实案例

Text Only
案例1: SQL注入
- 某电商网站被注入,数据库被拖库
- 损失:数百万用户信息泄露

案例2: XSS攻击
- 某社交平台被XSS,用户Cookie被盗
- 损失:大量账号被盗用

案例3: CSRF攻击
- 某银行网站被CSRF,用户资金被转走
- 损失:巨额资金损失

📚 OWASP Top 10

1. 注入攻击(Injection)

Python
# ❌ 不安全的代码 - SQL注入
@app.get("/user")
def get_user(username: str):
    query = f"SELECT * FROM users WHERE username = '{username}'"
    # 攻击:username = "' OR '1'='1"
    # 结果:query = "SELECT * FROM users WHERE username = '' OR '1'='1'"
    # 返回所有用户!

# ✅ 安全的代码 - 参数化查询
@app.get("/user")
def get_user(username: str):
    query = "SELECT * FROM users WHERE username = %s"
    cursor.execute(query, (username,))  # 参数化查询

2. 失效的身份认证

Python
# ❌ 不安全的认证
# 1. 明文存储密码
# 2. 弱密码策略
# 3. 会话不过期
# 4. 没有登录失败限制

# ✅ 安全的认证
from passlib.context import CryptContext
import jwt
from datetime import datetime, timedelta, timezone

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 密码哈希存储
def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

# JWT Token
def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")

# 登录限制(防止暴力破解)
from functools import wraps
from collections import defaultdict  # defaultdict带默认值的字典,避免KeyError
import time

login_attempts = defaultdict(list)

def rate_limit(max_attempts: int = 5, window: int = 300):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
            # 获取IP地址
            ip = kwargs.get('request').client.host
            now = time.time()

            # 清理过期记录
            login_attempts[ip] = [t for t in login_attempts[ip] if now - t < window]

            if len(login_attempts[ip]) >= max_attempts:
                raise HTTPException(status_code=429, detail="Too many login attempts")

            login_attempts[ip].append(now)
            return await func(*args, **kwargs)
        return wrapper
    return decorator

@app.post("/login")
@rate_limit(max_attempts=5, window=300)  # 5分钟内最多5次尝试
def login(credentials: LoginCredentials):
    # 验证逻辑
    pass

3. 敏感数据泄露

Python
# ❌ 不安全的数据处理
# 1. 日志中记录敏感信息
# 2. 返回完整的错误信息给客户端
# 3. 使用HTTP而不是HTTPS

# ✅ 安全的数据处理
import logging
from fastapi import HTTPException

# 配置日志,不记录敏感信息
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@app.post("/payment")
def process_payment(payment_info: PaymentInfo):
    try:  # try/except捕获异常
        # 处理支付
        logger.info(f"Processing payment for user: {payment_info.user_id}")
        # 不要记录信用卡号!
    except Exception as e:
        # 不要返回详细错误给客户端
        logger.error(f"Payment failed: {str(e)}")  # 详细错误记录到日志
        raise HTTPException(status_code=500, detail="Payment processing failed")

# HTTPS强制
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

app.add_middleware(HTTPSRedirectMiddleware)

# 安全响应头
@app.middleware("http")
async def add_security_headers(request, call_next):  # async def定义异步函数;用await调用
    response = await call_next(request)  # await等待异步操作完成
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    return response

4. XSS(跨站脚本攻击)

Python
# ❌ 不安全的代码
@app.post("/comment")
def add_comment(comment: str):
    # 直接存储用户输入
    save_to_db(comment)
    # 在页面显示时:{{ comment }} - 如果包含<script>alert('xss')</script>,就会执行

# ✅ 安全的代码
import nh3
from html import escape

@app.post("/comment")
def add_comment(comment: str):
    # 方案1: 转义HTML
    safe_comment = escape(comment)

    # 方案2: 使用 nh3 清理HTML(bleach 已于2023年废弃,nh3 是推荐替代)
    safe_comment = nh3.clean(
        comment,
        tags={"p", "br", "strong", "em"},
        attributes={},
        strip_comments=True,
    )

    save_to_db(safe_comment)

# 前端也需要防护
# React/Vue 默认会转义内容,但使用 dangerouslySetInnerHTML/v-html 时要小心

5. CSRF(跨站请求伪造)

Python
# ❌ 不安全的代码
@app.post("/transfer")
def transfer_money(to_account: str, amount: float):
    # 直接执行转账,不验证请求来源
    execute_transfer(to_account, amount)

# ✅ 安全的代码
from fastapi import Request, Cookie
import secrets

# CSRF Token
@app.get("/")
def get_form(csrf_token: str = Cookie(None)):
    if not csrf_token:
        csrf_token = secrets.token_urlsafe(32)
    return {"csrf_token": csrf_token}

@app.post("/transfer")
def transfer_money(
    request: Request,
    to_account: str,
    amount: float,
    csrf_token: str = Header(...),
    csrf_cookie: str = Cookie(...)
):
    # 验证CSRF Token
    if csrf_token != csrf_cookie:
        raise HTTPException(status_code=403, detail="Invalid CSRF token")

    # 验证Referer
    referer = request.headers.get("referer")
    if not referer or not referer.startswith("https://yourdomain.com"):
        raise HTTPException(status_code=403, detail="Invalid referer")

    execute_transfer(to_account, amount)

# 更好的方案:使用SameSite Cookie
response.set_cookie(
    key="session_id",
    value=session_id,
    httponly=True,
    secure=True,
    samesite="Strict"  # 或 "Lax"
)

✅ 安全开发清单

开发前

  • 进行威胁建模
  • 定义安全需求
  • 选择安全的框架和库

开发中

  • 输入验证(白名单)
  • 输出编码
  • 参数化查询
  • 安全的认证和授权
  • 最小权限原则

开发后

  • 代码审计
  • 安全测试(渗透测试)
  • 依赖漏洞扫描
  • 日志审查

📚 推荐资源

书籍

  • 《Web安全深度剖析》
  • 《黑客攻防技术宝典:Web实战篇》
  • 《OWASP Testing Guide》

在线资源

  • OWASP官网
  • HackerOne(漏洞赏金平台)
  • PortSwigger Web Security Academy

记住:安全是每个人的责任,不是安全团队的事! 🔒