跳转至

📖 第 05 章 API 安全

API 安全

图源:项目内既有威胁建模示意图(人工复核:未见 AI 生成痕迹)。该图适合帮助读者从接口、数据流、外部系统调用与缓解措施的角度理解 API 安全。

📚 章节概述

本章将深入讲解 API 安全,包括 API 网关、速率限制、认证、安全测试等。通过本章学习,你将能够保护 API 安全。

🎯 学习目标

完成本章后,你将能够:

  1. 理解 API 安全的核心概念
  2. 掌握 API 网关的使用
  3. 了解速率限制技术
  4. 掌握 API 认证方法
  5. 能够保护 API 安全

5.1 API 安全概述

5.1.1 什么是 API 安全

API 安全是保护 API 免受未授权访问、滥用和攻击的实践。

API 安全威胁

  1. 未授权访问
  2. 缺少认证
  3. 弱认证机制
  4. 会话管理不当

  5. 数据泄露

  6. 过度暴露数据
  7. 敏感信息泄露
  8. 错误信息泄露

  9. 滥用攻击

  10. DDoS 攻击
  11. 暴力破解
  12. 速率限制绕过

5.1.2 API 安全最佳实践

  1. 认证
  2. 强认证机制
  3. OAuth2/JWT
  4. 多因素认证

  5. 授权

  6. RBAC/ABAC
  7. 最小权限
  8. 权限审查

  9. 速率限制

  10. 防止滥用
  11. 保护资源
  12. 公平使用

5.2 API 网关

5.2.1 API 网关概述

API 网关是管理、保护和路由 API 请求的中间层。

API 网关功能

  1. 路由
  2. 请求路由
  3. 负载均衡
  4. 服务发现

  5. 安全

  6. 认证授权
  7. 速率限制
  8. WAF 防护

5.2.2 API 网关实现

Python
from flask import Flask, request, jsonify
from functools import wraps
import time

app = Flask(__name__)

class APIGateway:
    """API网关类"""

    def __init__(self):
        self.rate_limits = {}
        self.blocked_ips = set()

    def rate_limit(self, max_requests=100, window=60):
        """速率限制装饰器"""
        def decorator(f):
            @wraps(f)
            def wrapper(*args, **kwargs):
                ip = request.remote_addr

                if ip in self.blocked_ips:
                    return jsonify({'error': 'IP blocked'}), 403

                current_time = time.time()

                if ip not in self.rate_limits:
                    self.rate_limits[ip] = []

                # 清理过期请求
                self.rate_limits[ip] = [
                    req_time for req_time in self.rate_limits[ip]
                    if current_time - req_time < window
                ]

                # 检查速率限制
                if len(self.rate_limits[ip]) >= max_requests:
                    return jsonify({'error': 'Rate limit exceeded'}), 429

                # 记录请求
                self.rate_limits[ip].append(current_time)

                return f(*args, **kwargs)

            return wrapper
        return decorator

    def block_ip(self, ip):
        """阻止IP"""
        self.blocked_ips.add(ip)

    def unblock_ip(self, ip):
        """解除阻止IP"""
        if ip in self.blocked_ips:
            self.blocked_ips.remove(ip)

# 使用API网关
gateway = APIGateway()

@app.route('/api/users', methods=['GET'])
@gateway.rate_limit(max_requests=10, window=60)
def get_users():
    """获取用户列表"""
    # 实际的业务逻辑
    return jsonify({'users': []})

@app.route('/api/users', methods=['POST'])
@gateway.rate_limit(max_requests=5, window=60)
def create_user():
    """创建用户"""
    # 实际的业务逻辑
    return jsonify({'message': 'User created'})

5.3 速率限制

5.3.1 速率限制概述

速率限制是控制 API 请求频率的技术。

速率限制策略

  1. 固定窗口
  2. 固定时间窗口
  3. 简单实现
  4. 可能突发

  5. 滑动窗口

  6. 滑动时间窗口
  7. 更平滑
  8. 更复杂

  9. 令牌桶

  10. 令牌桶算法
  11. 平滑限流
  12. 广泛使用

5.3.2 速率限制实现

Python
import time
from collections import deque

class RateLimiter:
    """速率限制器"""

    def __init__(self, max_requests=100, window=60):
        self.max_requests = max_requests
        self.window = window
        self.requests = {}

    def is_allowed(self, identifier):
        """检查是否允许请求"""
        current_time = time.time()

        if identifier not in self.requests:
            self.requests[identifier] = deque()

        # 清理过期请求
        while self.requests[identifier] and current_time - self.requests[identifier][0] > self.window:
            self.requests[identifier].popleft()

        # 检查是否超过限制
        if len(self.requests[identifier]) >= self.max_requests:
            return False

        # 记录请求
        self.requests[identifier].append(current_time)
        return True

    def get_remaining(self, identifier):
        """获取剩余请求数"""
        if identifier not in self.requests:
            return self.max_requests

        return self.max_requests - len(self.requests[identifier])

    def get_retry_after(self, identifier):
        """获取重试时间"""
        if identifier not in self.requests or not self.requests[identifier]:
            return 0

        oldest_request = self.requests[identifier][0]
        current_time = time.time()
        return int(self.window - (current_time - oldest_request))

5.4 API 认证

5.4.1 API 认证方法

  1. API Key
  2. 简单易用
  3. 适合内部 API
  4. 需要安全管理

  5. OAuth2

  6. 标准协议
  7. 适合第三方集成
  8. 复杂实现

  9. JWT

  10. 无状态
  11. 适合分布式系统
  12. 需要密钥管理

5.4.2 API 认证实现

Python
from flask import Flask, request, jsonify
from datetime import datetime, timedelta, timezone
import jwt
from functools import wraps

app = Flask(__name__)
# ⚠️ 仅用于本地学习演示!生产环境必须从环境变量获取
import os
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'your-secret-key-FOR-DEV-ONLY')

class APIAuth:
    """API认证类"""

    def __init__(self, secret_key):
        self.secret_key = secret_key

    def generate_token(self, user_id):
        """生成JWT令牌"""
        payload = {
            'user_id': user_id,
            'exp': datetime.now(timezone.utc) + timedelta(hours=24)
        }

        token = jwt.encode(payload, self.secret_key, algorithm='HS256')
        return token

    def verify_token(self, token):
        """验证JWT令牌"""
        try:  # try/except捕获异常
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

    def require_auth(self, f):
        """认证装饰器"""
        @wraps(f)
        def wrapper(*args, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
            token = request.headers.get('Authorization')

            if not token:
                return jsonify({'error': 'Missing token'}), 401

            if not token.startswith('Bearer '):
                return jsonify({'error': 'Invalid token format'}), 401

            token = token.split(' ')[1]
            payload = self.verify_token(token)

            if not payload:
                return jsonify({'error': 'Invalid token'}), 401

            request.user = payload
            return f(*args, **kwargs)

        return wrapper

# 使用API认证
auth = APIAuth(app.secret_key)

@app.route('/api/login', methods=['POST'])
def login():
    """用户登录"""
    data = request.json
    # 实际的认证逻辑
    user_id = 1  # 假设认证成功

    token = auth.generate_token(user_id)

    return jsonify({'token': token})

@app.route('/api/protected', methods=['GET'])
@auth.require_auth
def protected():
    """受保护的API"""
    return jsonify({'message': 'Access granted', 'user_id': request.user['user_id']})

5.5 API 安全测试

5.5.1 API 安全测试工具

  1. Postman
  2. API 测试
  3. 自动化测试
  4. 集合管理

  5. OWASP ZAP

  6. 安全扫描
  7. 漏洞检测
  8. 报告生成

5.5.2 API 安全测试实践

Python
import requests
import json

class APISecurityTester:
    """API安全测试器"""

    def __init__(self, base_url):
        self.base_url = base_url
        self.vulnerabilities = []

    def test_authentication(self):
        """测试认证"""
        print("Testing authentication...")

        # 测试无认证
        response = requests.get(f"{self.base_url}/api/users")
        if response.status_code == 200:
            self.vulnerabilities.append({
                'type': 'Missing Authentication',
                'endpoint': '/api/users',
                'severity': 'High'
            })

        # 测试弱认证
        response = requests.get(
            f"{self.base_url}/api/users",
            headers={'Authorization': 'Bearer weak-token'}
        )
        if response.status_code == 200:
            self.vulnerabilities.append({
                'type': 'Weak Authentication',
                'endpoint': '/api/users',
                'severity': 'High'
            })

    def test_rate_limiting(self):
        """测试速率限制"""
        print("Testing rate limiting...")

        # 快速发送多个请求
        responses = []
        for i in range(20):
            response = requests.get(f"{self.base_url}/api/users")
            responses.append(response.status_code)

        # 检查是否被限制
        if 429 not in responses:
            self.vulnerabilities.append({
                'type': 'Missing Rate Limiting',
                'endpoint': '/api/users',
                'severity': 'Medium'
            })

    def test_input_validation(self):
        """测试输入验证"""
        print("Testing input validation...")

        # 测试SQL注入
        malicious_inputs = [
            "' OR '1'='1",
            "'; DROP TABLE users--",
            "<script>alert('XSS')</script>"
        ]

        for malicious_input in malicious_inputs:
            response = requests.get(
                f"{self.base_url}/api/search",
                params={'q': malicious_input}
            )

            if response.status_code == 200 and malicious_input in response.text:
                self.vulnerabilities.append({
                    'type': 'Input Validation',
                    'endpoint': '/api/search',
                    'payload': malicious_input,
                    'severity': 'High'
                })

    def generate_report(self):
        """生成报告"""
        return {
            'base_url': self.base_url,
            'total_vulnerabilities': len(self.vulnerabilities),
            'vulnerabilities': self.vulnerabilities
        }

5.6 练习题

基础题

  1. 选择题
  2. API 安全威胁不包括什么?

    • A. 未授权访问
    • B. 数据泄露
    • C. 滥用攻击
    • D. 性能优化
  3. 简答题

  4. 解释 API 安全的核心概念。
  5. 说明 API 网关的功能。

进阶题

  1. 实践题
  2. 搭建 API 网关。
  3. 实施速率限制。
  4. 实施 API 认证。

  5. 设计题

  6. 设计 API 安全架构。
  7. 设计 API 安全测试流程。

答案

1. 选择题答案

  1. D ( API 安全威胁不包括性能优化)

2. 简答题答案

API 安全的核心概念: - 未授权访问、数据泄露、滥用攻击

API 网关的功能: - 路由、安全、监控

3. 实践题答案

参见 5.2-5.4 节的示例。

4. 设计题答案

参见 5.1-5.5 节的设计方案。

5.7 面试准备

大厂面试题

字节跳动

  1. 解释 API 安全的核心概念。
  2. API 网关的功能是什么?
  3. 如何实施速率限制?
  4. 如何设计 API 认证?

腾讯

  1. API 安全的最佳实践是什么?
  2. 如何设计 API 网关?
  3. 如何设计 API 授权?
  4. 如何设计 API 监控?

阿里云

  1. API 安全的挑战是什么?
  2. 如何设计 API 安全架构?
  3. 如何处理 API 滥用?
  4. 如何设计 API 文档?

📚 参考资料

🎯 本章小结

本章深入讲解了 API 安全,包括:

  1. API 安全的核心概念
  2. API 网关的使用
  3. 速率限制技术
  4. API 认证方法

通过本章学习,你掌握了 API 安全的核心技术,能够保护 API 安全。下一章将深入学习安全合规。

⚠️ 核验说明(2026-03-26):本页已纳入 2026-03-26 全站统一复核批次。若文中涉及外部模型、API、版本号、价格或第三方产品名称,请以官方文档和实际运行环境为准。


最后更新日期: 2026-03-26