跳转至

📖 第05章 API安全

API安全

📚 章节概述

本章将深入讲解API安全,包括API网关、速率限制、认证、安全测试等。通过本章学习,你将能够保护API安全。

🎯 学习目标

完成本章后,你将能够:

  1. 理解API安全的核心概念
  2. 掌握API网关的使用
  3. 了解速率限制技术
  4. 掌握API认证方法
  5. 能够保护API安全

5.1 API安全概述

5.1.1 什么是API安全

API安全是保护API免受未授权访问、滥用和攻击的实践。

API安全威胁

  1. 未授权访问
  2. 缺少认证
  3. 弱认证机制
  4. 会话管理不当

  5. 数据泄露

  6. 过度暴露数据
  7. 敏感信息泄露
  8. 错误信息泄露

  9. 滥用攻击

  10. DDoS攻击
  11. 暴力破解
  12. 速率限制绕过

5.1.2 API安全最佳实践

  1. 认证
  2. 强认证机制
  3. OAuth2/JWT
  4. 多因素认证

  5. 授权

  6. RBAC/ABAC
  7. 最小权限
  8. 权限审查

  9. 速率限制

  10. 防止滥用
  11. 保护资源
  12. 公平使用

5.2 API网关

5.2.1 API网关概述

API网关是管理、保护和路由API请求的中间层。

API网关功能

  1. 路由
  2. 请求路由
  3. 负载均衡
  4. 服务发现

  5. 安全

  6. 认证授权
  7. 速率限制
  8. WAF防护

5.2.2 API网关实现

Python
from flask import Flask, request, jsonify
from functools import wraps
import time

app = Flask(__name__)

class APIGateway:
    """API网关类"""

    def __init__(self):
        self.rate_limits = {}
        self.blocked_ips = set()

    def rate_limit(self, max_requests=100, window=60):
        """速率限制装饰器"""
        def decorator(f):
            @wraps(f)
            def wrapper(*args, **kwargs):
                ip = request.remote_addr

                if ip in self.blocked_ips:
                    return jsonify({'error': 'IP blocked'}), 403

                current_time = time.time()

                if ip not in self.rate_limits:
                    self.rate_limits[ip] = []

                # 清理过期请求
                self.rate_limits[ip] = [
                    req_time for req_time in self.rate_limits[ip]
                    if current_time - req_time < window
                ]

                # 检查速率限制
                if len(self.rate_limits[ip]) >= max_requests:
                    return jsonify({'error': 'Rate limit exceeded'}), 429

                # 记录请求
                self.rate_limits[ip].append(current_time)

                return f(*args, **kwargs)

            return wrapper
        return decorator

    def block_ip(self, ip):
        """阻止IP"""
        self.blocked_ips.add(ip)

    def unblock_ip(self, ip):
        """解除阻止IP"""
        if ip in self.blocked_ips:
            self.blocked_ips.remove(ip)

# 使用API网关
gateway = APIGateway()

@app.route('/api/users', methods=['GET'])
@gateway.rate_limit(max_requests=10, window=60)
def get_users():
    """获取用户列表"""
    # 实际的业务逻辑
    return jsonify({'users': []})

@app.route('/api/users', methods=['POST'])
@gateway.rate_limit(max_requests=5, window=60)
def create_user():
    """创建用户"""
    # 实际的业务逻辑
    return jsonify({'message': 'User created'})

5.3 速率限制

5.3.1 速率限制概述

速率限制是控制API请求频率的技术。

速率限制策略

  1. 固定窗口
  2. 固定时间窗口
  3. 简单实现
  4. 可能突发

  5. 滑动窗口

  6. 滑动时间窗口
  7. 更平滑
  8. 更复杂

  9. 令牌桶

  10. 令牌桶算法
  11. 平滑限流
  12. 广泛使用

5.3.2 速率限制实现

Python
import time
from collections import deque

class RateLimiter:
    """速率限制器"""

    def __init__(self, max_requests=100, window=60):
        self.max_requests = max_requests
        self.window = window
        self.requests = {}

    def is_allowed(self, identifier):
        """检查是否允许请求"""
        current_time = time.time()

        if identifier not in self.requests:
            self.requests[identifier] = deque()

        # 清理过期请求
        while self.requests[identifier] and current_time - self.requests[identifier][0] > self.window:
            self.requests[identifier].popleft()

        # 检查是否超过限制
        if len(self.requests[identifier]) >= self.max_requests:
            return False

        # 记录请求
        self.requests[identifier].append(current_time)
        return True

    def get_remaining(self, identifier):
        """获取剩余请求数"""
        if identifier not in self.requests:
            return self.max_requests

        return self.max_requests - len(self.requests[identifier])

    def get_retry_after(self, identifier):
        """获取重试时间"""
        if identifier not in self.requests or not self.requests[identifier]:
            return 0

        oldest_request = self.requests[identifier][0]
        current_time = time.time()
        return int(self.window - (current_time - oldest_request))

5.4 API认证

5.4.1 API认证方法

  1. API Key
  2. 简单易用
  3. 适合内部API
  4. 需要安全管理

  5. OAuth2

  6. 标准协议
  7. 适合第三方集成
  8. 复杂实现

  9. JWT

  10. 无状态
  11. 适合分布式系统
  12. 需要密钥管理

5.4.2 API认证实现

Python
from flask import Flask, request, jsonify
from datetime import datetime, timedelta, timezone
import jwt
from functools import wraps

app = Flask(__name__)
# ⚠️ 仅用于本地学习演示!生产环境必须从环境变量获取
import os
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'your-secret-key-FOR-DEV-ONLY')

class APIAuth:
    """API认证类"""

    def __init__(self, secret_key):
        self.secret_key = secret_key

    def generate_token(self, user_id):
        """生成JWT令牌"""
        payload = {
            'user_id': user_id,
            'exp': datetime.now(timezone.utc) + timedelta(hours=24)
        }

        token = jwt.encode(payload, self.secret_key, algorithm='HS256')
        return token

    def verify_token(self, token):
        """验证JWT令牌"""
        try:  # try/except捕获异常
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

    def require_auth(self, f):
        """认证装饰器"""
        @wraps(f)
        def wrapper(*args, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
            token = request.headers.get('Authorization')

            if not token:
                return jsonify({'error': 'Missing token'}), 401

            if not token.startswith('Bearer '):
                return jsonify({'error': 'Invalid token format'}), 401

            token = token.split(' ')[1]
            payload = self.verify_token(token)

            if not payload:
                return jsonify({'error': 'Invalid token'}), 401

            request.user = payload
            return f(*args, **kwargs)

        return wrapper

# 使用API认证
auth = APIAuth(app.secret_key)

@app.route('/api/login', methods=['POST'])
def login():
    """用户登录"""
    data = request.json
    # 实际的认证逻辑
    user_id = 1  # 假设认证成功

    token = auth.generate_token(user_id)

    return jsonify({'token': token})

@app.route('/api/protected', methods=['GET'])
@auth.require_auth
def protected():
    """受保护的API"""
    return jsonify({'message': 'Access granted', 'user_id': request.user['user_id']})

5.5 API安全测试

5.5.1 API安全测试工具

  1. Postman
  2. API测试
  3. 自动化测试
  4. 集合管理

  5. OWASP ZAP

  6. 安全扫描
  7. 漏洞检测
  8. 报告生成

5.5.2 API安全测试实践

Python
import requests
import json

class APISecurityTester:
    """API安全测试器"""

    def __init__(self, base_url):
        self.base_url = base_url
        self.vulnerabilities = []

    def test_authentication(self):
        """测试认证"""
        print("Testing authentication...")

        # 测试无认证
        response = requests.get(f"{self.base_url}/api/users")
        if response.status_code == 200:
            self.vulnerabilities.append({
                'type': 'Missing Authentication',
                'endpoint': '/api/users',
                'severity': 'High'
            })

        # 测试弱认证
        response = requests.get(
            f"{self.base_url}/api/users",
            headers={'Authorization': 'Bearer weak-token'}
        )
        if response.status_code == 200:
            self.vulnerabilities.append({
                'type': 'Weak Authentication',
                'endpoint': '/api/users',
                'severity': 'High'
            })

    def test_rate_limiting(self):
        """测试速率限制"""
        print("Testing rate limiting...")

        # 快速发送多个请求
        responses = []
        for i in range(20):
            response = requests.get(f"{self.base_url}/api/users")
            responses.append(response.status_code)

        # 检查是否被限制
        if 429 not in responses:
            self.vulnerabilities.append({
                'type': 'Missing Rate Limiting',
                'endpoint': '/api/users',
                'severity': 'Medium'
            })

    def test_input_validation(self):
        """测试输入验证"""
        print("Testing input validation...")

        # 测试SQL注入
        malicious_inputs = [
            "' OR '1'='1",
            "'; DROP TABLE users--",
            "<script>alert('XSS')</script>"
        ]

        for malicious_input in malicious_inputs:
            response = requests.get(
                f"{self.base_url}/api/search",
                params={'q': malicious_input}
            )

            if response.status_code == 200 and malicious_input in response.text:
                self.vulnerabilities.append({
                    'type': 'Input Validation',
                    'endpoint': '/api/search',
                    'payload': malicious_input,
                    'severity': 'High'
                })

    def generate_report(self):
        """生成报告"""
        return {
            'base_url': self.base_url,
            'total_vulnerabilities': len(self.vulnerabilities),
            'vulnerabilities': self.vulnerabilities
        }

5.6 练习题

基础题

  1. 选择题
  2. API安全威胁不包括什么?

    • A. 未授权访问
    • B. 数据泄露
    • C. 滥用攻击
    • D. 性能优化
  3. 简答题

  4. 解释API安全的核心概念。
  5. 说明API网关的功能。

进阶题

  1. 实践题
  2. 搭建API网关。
  3. 实施速率限制。
  4. 实施API认证。

  5. 设计题

  6. 设计API安全架构。
  7. 设计API安全测试流程。

答案

1. 选择题答案

  1. D(API安全威胁不包括性能优化)

2. 简答题答案

API安全的核心概念: - 未授权访问、数据泄露、滥用攻击

API网关的功能: - 路由、安全、监控

3. 实践题答案

参见5.2-5.4节的示例。

4. 设计题答案

参见5.1-5.5节的设计方案。

5.7 面试准备

大厂面试题

字节跳动

  1. 解释API安全的核心概念。
  2. API网关的功能是什么?
  3. 如何实施速率限制?
  4. 如何设计API认证?

腾讯

  1. API安全的最佳实践是什么?
  2. 如何设计API网关?
  3. 如何设计API授权?
  4. 如何设计API监控?

阿里云

  1. API安全的挑战是什么?
  2. 如何设计API安全架构?
  3. 如何处理API滥用?
  4. 如何设计API文档?

📚 参考资料

🎯 本章小结

本章深入讲解了API安全,包括:

  1. API安全的核心概念
  2. API网关的使用
  3. 速率限制技术
  4. API认证方法

通过本章学习,你掌握了API安全的核心技术,能够保护API安全。下一章将深入学习安全合规。