01 - Web安全¶
目标: 掌握Web安全知识,能防范常见攻击
时间: 2周
核心原则: 安全是设计出来的,不是后期添加的
🎯 为什么安全很重要?¶
真实案例¶
Text Only
案例1: SQL注入
- 某电商网站被注入,数据库被拖库
- 损失:数百万用户信息泄露
案例2: XSS攻击
- 某社交平台被XSS,用户Cookie被盗
- 损失:大量账号被盗用
案例3: CSRF攻击
- 某银行网站被CSRF,用户资金被转走
- 损失:巨额资金损失
📚 OWASP Top 10¶
1. 注入攻击(Injection)¶
Python
# ❌ 不安全的代码 - SQL注入
@app.get("/user")
def get_user(username: str):
query = f"SELECT * FROM users WHERE username = '{username}'"
# 攻击:username = "' OR '1'='1"
# 结果:query = "SELECT * FROM users WHERE username = '' OR '1'='1'"
# 返回所有用户!
# ✅ 安全的代码 - 参数化查询
@app.get("/user")
def get_user(username: str):
query = "SELECT * FROM users WHERE username = %s"
cursor.execute(query, (username,)) # 参数化查询
2. 失效的身份认证¶
Python
# ❌ 不安全的认证
# 1. 明文存储密码
# 2. 弱密码策略
# 3. 会话不过期
# 4. 没有登录失败限制
# ✅ 安全的认证
from passlib.context import CryptContext
import jwt
from datetime import datetime, timedelta, timezone
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 密码哈希存储
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
# JWT Token
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
# 登录限制(防止暴力破解)
from functools import wraps
from collections import defaultdict # defaultdict带默认值的字典,避免KeyError
import time
login_attempts = defaultdict(list)
def rate_limit(max_attempts: int = 5, window: int = 300):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs): # *args接收任意位置参数;**kwargs接收任意关键字参数
# 获取IP地址
ip = kwargs.get('request').client.host
now = time.time()
# 清理过期记录
login_attempts[ip] = [t for t in login_attempts[ip] if now - t < window]
if len(login_attempts[ip]) >= max_attempts:
raise HTTPException(status_code=429, detail="Too many login attempts")
login_attempts[ip].append(now)
return await func(*args, **kwargs)
return wrapper
return decorator
@app.post("/login")
@rate_limit(max_attempts=5, window=300) # 5分钟内最多5次尝试
def login(credentials: LoginCredentials):
# 验证逻辑
pass
3. 敏感数据泄露¶
Python
# ❌ 不安全的数据处理
# 1. 日志中记录敏感信息
# 2. 返回完整的错误信息给客户端
# 3. 使用HTTP而不是HTTPS
# ✅ 安全的数据处理
import logging
from fastapi import HTTPException
# 配置日志,不记录敏感信息
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@app.post("/payment")
def process_payment(payment_info: PaymentInfo):
try: # try/except捕获异常
# 处理支付
logger.info(f"Processing payment for user: {payment_info.user_id}")
# 不要记录信用卡号!
except Exception as e:
# 不要返回详细错误给客户端
logger.error(f"Payment failed: {str(e)}") # 详细错误记录到日志
raise HTTPException(status_code=500, detail="Payment processing failed")
# HTTPS强制
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
app.add_middleware(HTTPSRedirectMiddleware)
# 安全响应头
@app.middleware("http")
async def add_security_headers(request, call_next): # async def定义异步函数;用await调用
response = await call_next(request) # await等待异步操作完成
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
return response
4. XSS(跨站脚本攻击)¶
Python
# ❌ 不安全的代码
@app.post("/comment")
def add_comment(comment: str):
# 直接存储用户输入
save_to_db(comment)
# 在页面显示时:{{ comment }} - 如果包含<script>alert('xss')</script>,就会执行
# ✅ 安全的代码
import nh3
from html import escape
@app.post("/comment")
def add_comment(comment: str):
# 方案1: 转义HTML
safe_comment = escape(comment)
# 方案2: 使用 nh3 清理HTML(bleach 已于2023年废弃,nh3 是推荐替代)
safe_comment = nh3.clean(
comment,
tags={"p", "br", "strong", "em"},
attributes={},
strip_comments=True,
)
save_to_db(safe_comment)
# 前端也需要防护
# React/Vue 默认会转义内容,但使用 dangerouslySetInnerHTML/v-html 时要小心
5. CSRF(跨站请求伪造)¶
Python
# ❌ 不安全的代码
@app.post("/transfer")
def transfer_money(to_account: str, amount: float):
# 直接执行转账,不验证请求来源
execute_transfer(to_account, amount)
# ✅ 安全的代码
from fastapi import Request, Cookie
import secrets
# CSRF Token
@app.get("/")
def get_form(csrf_token: str = Cookie(None)):
if not csrf_token:
csrf_token = secrets.token_urlsafe(32)
return {"csrf_token": csrf_token}
@app.post("/transfer")
def transfer_money(
request: Request,
to_account: str,
amount: float,
csrf_token: str = Header(...),
csrf_cookie: str = Cookie(...)
):
# 验证CSRF Token
if csrf_token != csrf_cookie:
raise HTTPException(status_code=403, detail="Invalid CSRF token")
# 验证Referer
referer = request.headers.get("referer")
if not referer or not referer.startswith("https://yourdomain.com"):
raise HTTPException(status_code=403, detail="Invalid referer")
execute_transfer(to_account, amount)
# 更好的方案:使用SameSite Cookie
response.set_cookie(
key="session_id",
value=session_id,
httponly=True,
secure=True,
samesite="Strict" # 或 "Lax"
)
✅ 安全开发清单¶
开发前¶
- 进行威胁建模
- 定义安全需求
- 选择安全的框架和库
开发中¶
- 输入验证(白名单)
- 输出编码
- 参数化查询
- 安全的认证和授权
- 最小权限原则
开发后¶
- 代码审计
- 安全测试(渗透测试)
- 依赖漏洞扫描
- 日志审查
📚 推荐资源¶
书籍¶
- 《Web安全深度剖析》
- 《黑客攻防技术宝典:Web实战篇》
- 《OWASP Testing Guide》
在线资源¶
- OWASP官网
- HackerOne(漏洞赏金平台)
- PortSwigger Web Security Academy
记住:安全是每个人的责任,不是安全团队的事! 🔒