
Hands-On Project Case Studies

📋 Chapter Overview

This chapter ties together the database knowledge from the previous chapters through three complete hands-on projects. Each project covers requirements analysis, database design, code implementation, and optimization guidance, so you can build an end-to-end path from theory to practice.

Project list:

  1. E-commerce User Behavior Analytics System - data warehousing and analytical workloads
  2. Intelligent Recommendation System - feature storage and real-time recommendation
  3. MLOps Model Management Platform - end-to-end machine learning lifecycle management

Estimated study time: 10-15 hours

Prerequisite: Chapter 09 - Databases and AI Applications


Project 1: E-commerce User Behavior Analytics System

1.1 Background and Requirements

Business Scenario

Build a user behavior analytics system for an e-commerce platform that supports real-time data ingestion, offline analysis, and report generation.

Functional Requirements

  • Record user behavior (views, clicks, add-to-cart, purchases)
  • Compute key metrics in real time (PV, UV, conversion rate)
  • User profiling and analysis
  • Product sales analysis
  • Multi-dimensional analysis along the time dimension

Technology Choices

  • MySQL: stores core entities such as users and products
  • ClickHouse: stores behavior logs and serves OLAP analysis
  • Redis: caches real-time statistics
  • Python: data processing and analysis

1.2 Database Design

SQL
-- ==================== MySQL: core entities ====================

-- Users table
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE,
    phone VARCHAR(20),
    register_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    last_login_time DATETIME,
    user_level TINYINT DEFAULT 1 COMMENT 'User level: 1=regular, 2=VIP, 3=SVIP',
    status TINYINT DEFAULT 1 COMMENT 'Status: 0=disabled, 1=active',
    INDEX idx_register_time (register_time),
    INDEX idx_user_level (user_level)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Products table
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    product_name VARCHAR(200) NOT NULL,
    category_id INT,
    brand_id INT,
    price DECIMAL(10, 2) NOT NULL,
    stock INT DEFAULT 0,
    status TINYINT DEFAULT 1,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_category (category_id),
    INDEX idx_price (price),
    FULLTEXT INDEX idx_name (product_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Orders table
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    total_amount DECIMAL(12, 2) NOT NULL,
    status TINYINT COMMENT '0=pending payment, 1=paid, 2=shipped, 3=completed, 4=cancelled',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    paid_at DATETIME,
    INDEX idx_user_id (user_id),
    INDEX idx_created_at (created_at),
    INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Order items table
CREATE TABLE order_items (
    item_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL,
    total_price DECIMAL(10, 2) NOT NULL,
    INDEX idx_order_id (order_id),
    INDEX idx_product_id (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ==================== ClickHouse: behavior logs ====================

-- User behavior log table
CREATE TABLE user_behavior_log (
    event_id UUID,
    user_id UInt64,
    session_id String,
    event_type Enum('view' = 1, 'click' = 2, 'cart' = 3, 'purchase' = 4),
    product_id UInt64,
    category_id UInt32,
    brand_id UInt32,
    price Decimal(10, 2),
    quantity UInt32,
    device_type Enum('pc' = 1, 'mobile' = 2, 'tablet' = 3),
    os String,
    browser String,
    ip_address IPv4,
    country String,
    city String,
    referrer String,
    event_time DateTime,
    event_date Date DEFAULT toDate(event_time)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_type, user_id)
TTL event_time + INTERVAL 1 YEAR;

-- Materialized view: pre-aggregated hourly statistics.
-- AggregatingMergeTree is used (rather than SummingMergeTree) because
-- uniqExact state cannot be correctly merged by summing; the view stores
-- partial aggregate states finalized with -Merge combinators at read time.
CREATE MATERIALIZED VIEW user_behavior_stats_mv
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, hour, event_type)
AS SELECT
    toDate(event_time) as event_date,
    toHour(event_time) as hour,
    event_type,
    countState() as event_count,
    uniqState(user_id) as unique_users,
    sumState(price * quantity) as total_amount
FROM user_behavior_log
GROUP BY event_date, hour, event_type;
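
Since the materialized view stores partial aggregate states, reads must finalize them with -Merge combinators. A minimal read-side sketch, assuming a local ClickHouse instance and the schema above:

Python
# Hourly PV/UV/revenue from the pre-aggregated view.
import clickhouse_driver

ch_client = clickhouse_driver.Client(host='localhost', database='ecommerce')
hourly = ch_client.execute('''
    SELECT
        event_date,
        hour,
        event_type,
        countMerge(event_count) AS pv,
        uniqMerge(unique_users) AS uv,
        sumMerge(total_amount)  AS revenue
    FROM user_behavior_stats_mv
    WHERE event_date = today()
    GROUP BY event_date, hour, event_type
    ORDER BY hour, event_type
''')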

1.3 Core Implementation

Python
"""
电商用户行为分析系统 - 核心实现
"""
# 需要安装: pip install clickhouse-driver
import json
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
from sqlalchemy import create_engine, text
import redis
import clickhouse_driver

class ECommerceAnalytics:
    """Main entry point for the e-commerce analytics system"""

    def __init__(self):
        # MySQL connection
        self.mysql_engine = create_engine(
            'mysql+pymysql://user:pass@localhost/ecommerce'
        )

        # ClickHouse connection
        self.ch_client = clickhouse_driver.Client(
            host='localhost',
            database='ecommerce'
        )

        # Redis connection
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    def log_user_behavior(self, event: Dict):
        """
        Record a user behavior event.

        Dual-write strategy:
        1. Write to ClickHouse for analysis
        2. Update real-time statistics in Redis
        """
        # 1. Write to ClickHouse
        self.ch_client.execute('''
            INSERT INTO user_behavior_log
            (event_id, user_id, session_id, event_type, product_id,
             category_id, brand_id, price, quantity, device_type,
             os, browser, ip_address, country, city, referrer, event_time)
            VALUES
        ''', [(
            event['event_id'],
            event['user_id'],
            event['session_id'],
            event['event_type'],
            event.get('product_id', 0),
            event.get('category_id', 0),
            event.get('brand_id', 0),
            event.get('price', 0),
            event.get('quantity', 0),
            event.get('device_type', 'pc'),
            event.get('os', ''),
            event.get('browser', ''),
            event.get('ip_address', '0.0.0.0'),
            event.get('country', ''),
            event.get('city', ''),
            event.get('referrer', ''),
            datetime.now()
        )])

        # 2. Update real-time statistics in Redis
        self._update_realtime_stats(event)

    def _update_realtime_stats(self, event: Dict):
        """Update real-time statistics in Redis"""
        today = datetime.now().strftime('%Y-%m-%d')
        hour = datetime.now().strftime('%H')

        # Batch the updates with a Redis pipeline
        pipe = self.redis_client.pipeline()

        # Today's total PV
        pipe.incr(f'stats:{today}:pv')

        # Today's UV (HyperLogLog)
        pipe.pfadd(f'stats:{today}:uv', event['user_id'])

        # Per-event-type counters
        event_type = event['event_type']
        pipe.hincrby(f'stats:{today}:events', event_type, 1)

        # Hourly counters
        pipe.hincrby(f'stats:{today}:{hour}:events', event_type, 1)

        # Expire the daily keys after 7 days
        pipe.expire(f'stats:{today}:pv', 7 * 24 * 3600)
        pipe.expire(f'stats:{today}:uv', 7 * 24 * 3600)
        pipe.expire(f'stats:{today}:events', 7 * 24 * 3600)

        pipe.execute()

    def get_realtime_dashboard(self) -> Dict:
        """Return data for the real-time dashboard"""
        today = datetime.now().strftime('%Y-%m-%d')

        # Read real-time data from Redis
        pv = int(self.redis_client.get(f'stats:{today}:pv') or 0)
        uv = self.redis_client.pfcount(f'stats:{today}:uv')
        events = self.redis_client.hgetall(f'stats:{today}:events')

        # Conversion rate
        views = int(events.get(b'view', 0))
        purchases = int(events.get(b'purchase', 0))
        conversion_rate = (purchases / views * 100) if views > 0 else 0

        return {
            'date': today,
            'pv': pv,
            'uv': uv,
            'events': {k.decode(): int(v) for k, v in events.items()},
            'conversion_rate': round(conversion_rate, 2)
        }

    def get_user_behavior_analysis(self,
                                   start_date: str,
                                   end_date: str) -> pd.DataFrame:
        """
        User behavior analysis over a date range.

        Runs an OLAP query against ClickHouse.
        """
        query = '''
            SELECT
                event_date,
                event_type,
                count() as event_count,
                uniqExact(user_id) as unique_users,
                avg(price) as avg_price,
                sum(price * quantity) as total_revenue
            FROM user_behavior_log
            WHERE event_date BETWEEN %(start)s AND %(end)s
            GROUP BY event_date, event_type
            ORDER BY event_date, event_type
        '''

        # pd.read_sql does not accept a clickhouse-driver Client;
        # use the driver's own DataFrame helper instead.
        return self.ch_client.query_dataframe(
            query,
            params={'start': start_date, 'end': end_date}
        )

    def get_user_funnel_analysis(self, date: str) -> Dict:
        """
        User funnel analysis.

        Computes the conversion funnel from view to purchase.
        """
        query = '''
            SELECT
                event_type,
                uniqExact(user_id) as user_count,
                count() as event_count
            FROM user_behavior_log
            WHERE event_date = %(date)s
            GROUP BY event_type
        '''

        result = self.ch_client.execute(query, {'date': date})

        # Build the funnel data
        funnel = {}
        for row in result:
            funnel[row[0]] = {
                'users': row[1],
                'events': row[2]
            }

        # Conversion rates
        views = funnel.get('view', {}).get('users', 0)
        clicks = funnel.get('click', {}).get('users', 0)
        carts = funnel.get('cart', {}).get('users', 0)
        purchases = funnel.get('purchase', {}).get('users', 0)

        return {
            'date': date,
            'funnel': {
                'view': views,
                'click': clicks,
                'cart': carts,
                'purchase': purchases
            },
            'conversion_rates': {
                'view_to_click': round(clicks / views * 100, 2) if views > 0 else 0,
                'click_to_cart': round(carts / clicks * 100, 2) if clicks > 0 else 0,
                'cart_to_purchase': round(purchases / carts * 100, 2) if carts > 0 else 0,
                'overall': round(purchases / views * 100, 2) if views > 0 else 0
            }
        }

    def get_user_rfm_analysis(self, analysis_date: str = None) -> pd.DataFrame:
        """
        RFM customer value analysis.

        R: Recency - days since the most recent purchase
        F: Frequency - number of purchases
        M: Monetary - total purchase amount
        """
        if analysis_date is None:
            analysis_date = datetime.now().strftime('%Y-%m-%d')

        query = '''
            SELECT
                user_id,
                dateDiff('day', max(event_time), toDate(%(date)s)) as recency,
                count() as frequency,
                sum(price * quantity) as monetary
            FROM user_behavior_log
            WHERE event_type = 'purchase'
              AND event_time >= subtractDays(toDate(%(date)s), 365)
            GROUP BY user_id
        '''

        df = self.ch_client.query_dataframe(
            query,
            params={'date': analysis_date}
        )

        # RFM scores via quintiles; rank first so duplicate bin edges
        # (common with skewed data) do not break pd.qcut
        df['R_score'] = pd.qcut(df['recency'].rank(method='first'), 5, labels=[5,4,3,2,1])
        df['F_score'] = pd.qcut(df['frequency'].rank(method='first'), 5, labels=[1,2,3,4,5])
        df['M_score'] = pd.qcut(df['monetary'].rank(method='first'), 5, labels=[1,2,3,4,5])

        # Combined RFM score
        df['RFM_score'] = (df['R_score'].astype(str) +
                          df['F_score'].astype(str) +
                          df['M_score'].astype(str))

        # Customer segmentation
        def segment_user(row):
            if row['RFM_score'] in ['555', '554', '544', '545', '454', '455', '445']:
                return 'champions'
            elif row['RFM_score'] in ['543', '444', '435', '355', '354', '345', '344', '335']:
                return 'loyal customers'
            elif row['RFM_score'] in ['512', '511', '422', '421', '412', '411', '311']:
                return 'new customers'
            elif row['RFM_score'] in ['155', '154', '144', '214', '215', '115', '114']:
                return 'at-risk customers'
            else:
                return 'regular customers'

        df['segment'] = df.apply(segment_user, axis=1)

        return df

# Usage example
analytics = ECommerceAnalytics()

# Record a behavior event
event = {
    'event_id': str(uuid.uuid4()),  # event_id column is a ClickHouse UUID
    'user_id': 10001,
    'session_id': 'sess-abc',
    'event_type': 'purchase',
    'product_id': 5001,
    'category_id': 10,
    'brand_id': 100,
    'price': 299.00,
    'quantity': 2,
    'device_type': 'mobile'
}
analytics.log_user_behavior(event)

# Real-time dashboard
dashboard = analytics.get_realtime_dashboard()
print(f"PV today: {dashboard['pv']}, UV: {dashboard['uv']}")

# RFM analysis
rfm_df = analytics.get_user_rfm_analysis()
print(rfm_df.groupby('segment').size())

1.4 Performance Optimization Recommendations

Markdown
## Performance optimization checklist

### 1. Write path
- [ ] Use batched inserts (1,000-10,000 rows per batch; see the sketch after this checklist)
- [ ] Use asynchronous inserts in ClickHouse
- [ ] Use Redis pipelines to cut network round trips
- [ ] Write behavior logs asynchronously (via a message queue)

### 2. Query path
- [ ] Partition ClickHouse tables by time to limit scan ranges
- [ ] Index frequently queried columns
- [ ] Pre-aggregate common statistics with materialized views
- [ ] Cache hot data in Redis

### 3. Storage
- [ ] Set ClickHouse TTLs to purge old data automatically
- [ ] Separate hot and cold data (recent data on SSD, history on HDD)
- [ ] Archive historical data periodically

### 4. Monitoring and alerting
- [ ] Monitor ClickHouse query performance
- [ ] Monitor Redis memory usage
- [ ] Monitor MySQL connection counts
- [ ] Alert on slow queries
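
As referenced in the write-path checklist, here is a minimal batching sketch. Assumptions: a clickhouse-driver client and an in-process buffer; in production the buffer would usually be fed from a message queue such as Kafka.

Python
import clickhouse_driver

class BehaviorBuffer:
    """Buffer events in memory and flush them to ClickHouse in batches."""

    def __init__(self, batch_size: int = 5000):
        self.client = clickhouse_driver.Client(host='localhost', database='ecommerce')
        self.batch_size = batch_size
        self.buffer = []

    def add(self, row: tuple):
        """Queue one event row; flush when the batch is full."""
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """One INSERT per batch: far fewer parts and much less merge pressure."""
        if not self.buffer:
            return
        self.client.execute(
            'INSERT INTO user_behavior_log '
            '(event_id, user_id, session_id, event_type, product_id, '
            'category_id, brand_id, price, quantity, device_type, '
            'os, browser, ip_address, country, city, referrer, event_time) VALUES',
            self.buffer
        )
        self.buffer.clear()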

Project 2: Intelligent Recommendation System

2.1 Background and Requirements

Business Scenario

Build an intelligent recommendation system for an e-commerce platform that serves personalized product recommendations.

Functional Requirements

  • User profile management
  • Product feature management
  • Real-time recommendation API
  • Recommendation effectiveness tracking
  • A/B testing support

Technology Choices

  • PostgreSQL + pgvector: stores user/product vectors
  • Redis: caches recommendation results
  • FastAPI: recommendation service API
  • Python: recommendation algorithms

2.2 Database Design

SQL
-- ==================== User profiles ====================

CREATE TABLE user_profiles (
    user_id BIGINT PRIMARY KEY,
    user_vector VECTOR(128),  -- user embedding vector
    preferences JSONB,        -- preference tags
    purchase_history JSONB,   -- purchase history aggregates
    browse_history JSONB,     -- browsing history aggregates
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- User behavior sequences (for sequential recommendation)
CREATE TABLE user_behavior_sequences (
    user_id BIGINT,
    sequence_id INT,
    product_id BIGINT,
    event_type VARCHAR(20),
    event_time TIMESTAMP,
    PRIMARY KEY (user_id, sequence_id)
);

-- ==================== Product features ====================

CREATE TABLE product_features (
    product_id BIGINT PRIMARY KEY,
    product_vector VECTOR(128),  -- product embedding vector
    category_id INT,
    brand_id INT,
    price DECIMAL(10, 2),
    attributes JSONB,            -- product attributes
    popularity_score FLOAT,      -- popularity score
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create vector indexes (IVFFlat, cosine distance)
CREATE INDEX idx_user_vector ON user_profiles
USING ivfflat (user_vector vector_cosine_ops) WITH (lists = 100);

CREATE INDEX idx_product_vector ON product_features
USING ivfflat (product_vector vector_cosine_ops) WITH (lists = 100);
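
-- Query-time sketch for the IVFFlat indexes above (illustrative values):
-- raising ivfflat.probes trades query speed for recall.
SET ivfflat.probes = 10;

-- Nearest products to a stored user embedding (user_id 10001 is illustrative);
-- the subquery form lets the planner drive the scan through the vector index.
SELECT product_id
FROM product_features
ORDER BY product_vector <=> (SELECT user_vector FROM user_profiles WHERE user_id = 10001)
LIMIT 10;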

-- ==================== Recommendation logs ====================

CREATE TABLE recommendation_logs (
    log_id BIGSERIAL PRIMARY KEY,
    user_id BIGINT,
    request_id VARCHAR(64),
    context JSONB,              -- recommendation context
    recommendations JSONB,      -- recommendation results
    algorithm VARCHAR(50),      -- algorithm used
    response_time_ms INT,       -- response time
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- ==================== A/B testing ====================

CREATE TABLE ab_tests (
    test_id SERIAL PRIMARY KEY,
    test_name VARCHAR(100),
    algorithm_a VARCHAR(50),
    algorithm_b VARCHAR(50),
    traffic_split FLOAT DEFAULT 0.5,  -- share of traffic sent to group A
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    status VARCHAR(20) DEFAULT 'running'
);

CREATE TABLE ab_test_assignments (
    user_id BIGINT,
    test_id INT,
    group_name VARCHAR(10),  -- 'A' or 'B'
    assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, test_id)
);
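
Before the service below can recommend anything, embeddings must land in these tables. A minimal write-path sketch, assuming the vectors come from an upstream embedding model and using an illustrative connection URL:

Python
import json
import numpy as np
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:pass@localhost/recommendation')

def upsert_user_profile(user_id: int, vector: np.ndarray, preferences: dict):
    """Insert or refresh one user's embedding and preference tags."""
    with engine.connect() as conn:
        conn.execute(text('''
            INSERT INTO user_profiles (user_id, user_vector, preferences)
            VALUES (:user_id, CAST(:vec AS vector), CAST(:prefs AS jsonb))
            ON CONFLICT (user_id) DO UPDATE
            SET user_vector = EXCLUDED.user_vector,
                preferences = EXCLUDED.preferences,
                last_updated = CURRENT_TIMESTAMP
        '''), {
            'user_id': user_id,
            # pgvector accepts the '[x1,x2,...]' text form cast to vector
            'vec': '[' + ','.join(str(float(x)) for x in vector) + ']',
            'prefs': json.dumps(preferences),
        })
        conn.commit()

# Illustrative call with a random 128-dimensional embedding:
upsert_user_profile(10001, np.random.rand(128), {'categories': [10, 12]})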

2.3 Core Implementation

Python
"""
智能推荐系统 - 核心实现
"""
import numpy as np
from typing import List, Dict, Tuple
from sqlalchemy import create_engine, text
from pgvector.sqlalchemy import Vector
import redis
import hashlib
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel  # Pydantic数据验证模型
import random

app = FastAPI(title="推荐系统API")

class RecommendationRequest(BaseModel):
    user_id: int
    context: Dict = {}
    num_recommendations: int = 10

class RecommendationSystem:
    """Main class for the recommendation system"""

    def __init__(self):
        # PostgreSQL connection
        self.engine = create_engine('postgresql://user:pass@localhost/recommendation')

        # Redis connection
        self.redis = redis.Redis(host='localhost', port=6379, db=0)

        # Cache configuration
        self.cache_ttl = 3600  # 1 hour

    def get_recommendations(self,
                           user_id: int,
                           context: Dict = None,
                           num_results: int = 10) -> List[Dict]:
        """
        Return recommendations for a user.

        Flow:
        1. Check the A/B test assignment
        2. Try the cache
        3. Pick an algorithm based on the assignment
        4. Generate recommendations
        5. Cache the result
        """
        context = context or {}

        # 1. A/B test assignment
        ab_group = self._get_ab_test_group(user_id)
        algorithm = self._select_algorithm(ab_group)

        # 2. Cache lookup (stable hash; Python's hash() varies across processes)
        context_hash = hashlib.md5(
            json.dumps(context, sort_keys=True).encode()
        ).hexdigest()
        cache_key = f"rec:{user_id}:{algorithm}:{context_hash}"
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # 3. Generate recommendations
        if algorithm == 'collaborative_filtering':
            recommendations = self._collaborative_filtering(user_id, num_results)
        elif algorithm == 'content_based':
            recommendations = self._content_based(user_id, num_results)
        elif algorithm == 'vector_similarity':
            recommendations = self._vector_similarity(user_id, num_results)
        else:
            recommendations = self._hybrid_recommendation(user_id, num_results)

        # 4. Apply business rules
        recommendations = self._apply_business_rules(recommendations, context)

        # 5. Log the recommendation
        self._log_recommendation(user_id, context, recommendations, algorithm)

        # 6. Cache the result
        self.redis.setex(cache_key, self.cache_ttl, json.dumps(recommendations))

        return recommendations

    def _get_ab_test_group(self, user_id: int) -> str:
        """Return the user's A/B test group"""
        # Check for a running test
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT test_id, traffic_split
                FROM ab_tests
                WHERE status = 'running'
                LIMIT 1
            '''))
            test = result.fetchone()

            if not test:
                return 'control'

            test_id, traffic_split = test

            # Has this user already been assigned?
            result = conn.execute(text('''
                SELECT group_name FROM ab_test_assignments
                WHERE user_id = :user_id AND test_id = :test_id
            '''), {'user_id': user_id, 'test_id': test_id})

            assignment = result.fetchone()
            if assignment:
                return assignment.group_name

            # Assign a new user to a group
            group = 'A' if random.random() < traffic_split else 'B'
            conn.execute(text('''
                INSERT INTO ab_test_assignments (user_id, test_id, group_name)
                VALUES (:user_id, :test_id, :group)
            '''), {'user_id': user_id, 'test_id': test_id, 'group': group})
            conn.commit()

            return group

    def _select_algorithm(self, ab_group: str) -> str:
        """Pick an algorithm based on the A/B group"""
        algorithms = {
            'A': 'vector_similarity',
            'B': 'collaborative_filtering',
            'control': 'hybrid'
        }
        return algorithms.get(ab_group, 'hybrid')

    def _vector_similarity(self, user_id: int, num_results: int) -> List[Dict]:
        """
        Vector-similarity recommendations.

        Uses pgvector for nearest-neighbor search.
        """
        with self.engine.connect() as conn:
            # Fetch the user vector
            result = conn.execute(text('''
                SELECT user_vector FROM user_profiles WHERE user_id = :user_id
            '''), {'user_id': user_id})

            row = result.fetchone()
            if not row or row.user_vector is None:
                # New user: fall back to popular products
                return self._get_popular_products(num_results)

            user_vector = row.user_vector

            # Nearest-neighbor search (cast the text parameter to vector)
            result = conn.execute(text('''
                SELECT
                    p.product_id,
                    p.product_vector <=> CAST(:user_vector AS vector) as distance,
                    p.price,
                    p.popularity_score
                FROM product_features p
                WHERE p.product_vector IS NOT NULL
                ORDER BY p.product_vector <=> CAST(:user_vector AS vector)
                LIMIT :limit
            '''), {'user_vector': str(user_vector), 'limit': num_results * 2})

            recommendations = []
            for row in result:
                recommendations.append({
                    'product_id': row.product_id,
                    'score': 1 - float(row.distance),  # cosine distance -> similarity
                    'price': float(row.price),
                    'algorithm': 'vector_similarity'
                })

            return recommendations[:num_results]

    def _collaborative_filtering(self, user_id: int, num_results: int) -> List[Dict]:
        """
        Collaborative-filtering recommendations.

        Based on the purchase behavior of similar users.
        """
        with self.engine.connect() as conn:
            # Find similar users
            result = conn.execute(text('''
                WITH similar_users AS (
                    SELECT
                        b.user_id as other_user,
                        COUNT(*) as common_products
                    FROM user_behavior_sequences a
                    JOIN user_behavior_sequences b ON a.product_id = b.product_id
                    WHERE a.user_id = :user_id
                      AND b.user_id != :user_id
                      AND a.event_type = 'purchase'
                      AND b.event_type = 'purchase'
                    GROUP BY b.user_id
                    ORDER BY common_products DESC
                    LIMIT 20
                )
                SELECT
                    ub.product_id,
                    COUNT(*) as score,
                    AVG(p.price) as avg_price
                FROM user_behavior_sequences ub
                JOIN similar_users su ON ub.user_id = su.other_user
                JOIN product_features p ON ub.product_id = p.product_id
                WHERE ub.event_type = 'purchase'
                  AND ub.product_id NOT IN (
                      SELECT product_id FROM user_behavior_sequences
                      WHERE user_id = :user_id AND event_type = 'purchase'
                  )
                GROUP BY ub.product_id
                ORDER BY score DESC
                LIMIT :limit
            '''), {'user_id': user_id, 'limit': num_results})

            recommendations = []
            for row in result:
                recommendations.append({
                    'product_id': row.product_id,
                    'score': row.score,
                    'price': float(row.avg_price),
                    'algorithm': 'collaborative_filtering'
                })

            return recommendations

    def _content_based(self, user_id: int, num_results: int) -> List[Dict]:
        """Content-based recommendations"""
        with self.engine.connect() as conn:
            # Fetch the user's preferences
            result = conn.execute(text('''
                SELECT preferences FROM user_profiles WHERE user_id = :user_id
            '''), {'user_id': user_id})

            row = result.fetchone()
            if not row:
                return self._get_popular_products(num_results)

            preferences = row.preferences or {}
            preferred_categories = preferences.get('categories', [])

            # Recommend by preferred categories
            result = conn.execute(text('''
                SELECT
                    product_id,
                    price,
                    popularity_score,
                    attributes
                FROM product_features
                WHERE category_id = ANY(:categories)
                ORDER BY popularity_score DESC
                LIMIT :limit
            '''), {'categories': preferred_categories, 'limit': num_results})

            recommendations = []
            for row in result:
                recommendations.append({
                    'product_id': row.product_id,
                    'score': row.popularity_score,
                    'price': float(row.price),
                    'algorithm': 'content_based'
                })

            return recommendations

    def _hybrid_recommendation(self, user_id: int, num_results: int) -> List[Dict]:
        """Hybrid recommendations"""
        # Gather candidates from multiple algorithms
        vector_recs = self._vector_similarity(user_id, num_results // 2)
        cf_recs = self._collaborative_filtering(user_id, num_results // 2)

        # Merge and de-duplicate
        seen_products = set()
        recommendations = []

        for rec in vector_recs + cf_recs:
            if rec['product_id'] not in seen_products:
                seen_products.add(rec['product_id'])
                rec['algorithm'] = 'hybrid'
                recommendations.append(rec)

        return recommendations[:num_results]

    def _apply_business_rules(self,
                             recommendations: List[Dict],
                             context: Dict) -> List[Dict]:
        """Apply business rules"""
        # Filter out already purchased products
        # Boost exposure of new products
        # Filter by price range
        # Filter by stock availability
        return recommendations

    def _get_popular_products(self, num_results: int) -> List[Dict]:
        """Return popular products (cold start)"""
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT product_id, price, popularity_score
                FROM product_features
                ORDER BY popularity_score DESC
                LIMIT :limit
            '''), {'limit': num_results})

            return [
                {
                    'product_id': row.product_id,
                    'score': row.popularity_score,
                    'price': float(row.price),
                    'algorithm': 'popular'
                }
                for row in result
            ]

    def _log_recommendation(self,
                           user_id: int,
                           context: Dict,
                           recommendations: List[Dict],
                           algorithm: str):
        """Persist the recommendation log"""
        import uuid

        with self.engine.connect() as conn:
            conn.execute(text('''
                INSERT INTO recommendation_logs
                (user_id, request_id, context, recommendations, algorithm)
                VALUES (:user_id, :request_id, :context, :recommendations, :algorithm)
            '''), {
                'user_id': user_id,
                'request_id': str(uuid.uuid4()),
                'context': json.dumps(context),
                'recommendations': json.dumps(recommendations),
                'algorithm': algorithm
            })
            conn.commit()

# FastAPI routes
rec_system = RecommendationSystem()

@app.post("/recommendations")
async def get_recommendations(request: RecommendationRequest):
    """Recommendation API"""
    try:
        recommendations = rec_system.get_recommendations(
            user_id=request.user_id,
            context=request.context,
            num_results=request.num_recommendations
        )

        return {
            'user_id': request.user_id,
            'recommendations': recommendations,
            'total': len(recommendations)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/feedback")
async def record_feedback(user_id: int, product_id: int, action: str):
    """Record user feedback (clicks, purchases, etc.)"""
    # Update the user profile
    # Update the recommendation model
    return {'status': 'success'}
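
Once the service is running, the endpoint can be exercised with any HTTP client. A quick sketch, assuming the app above is served locally (for example with uvicorn on port 8000):

Python
import requests

resp = requests.post(
    'http://localhost:8000/recommendations',
    json={
        'user_id': 10001,
        'context': {'page': 'home'},
        'num_recommendations': 5,
    },
    timeout=3,
)
resp.raise_for_status()
for rec in resp.json()['recommendations']:
    print(rec['product_id'], rec['score'], rec['algorithm'])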

Project 3: MLOps Model Management Platform

3.1 Background and Requirements

Business Scenario

Build an enterprise MLOps platform that manages the complete machine learning lifecycle.

Functional Requirements

  • Dataset management (versioning, lineage tracking)
  • Experiment management (parameters, metrics, artifacts)
  • Model registry and versioning
  • Model deployment and serving
  • Model monitoring and alerting
  • Workflow orchestration

Technology Choices

  • PostgreSQL: metadata store
  • MinIO/S3: model and artifact storage
  • Airflow: workflow orchestration
  • FastAPI: API service
  • Prometheus + Grafana: monitoring

3.2 Database Design

SQL
-- ==================== Dataset management ====================

CREATE TABLE datasets (
    dataset_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    version VARCHAR(20) NOT NULL,
    storage_path VARCHAR(500),
    size_bytes BIGINT,
    num_rows BIGINT,
    num_features INT,
    schema JSONB,
    tags JSONB,
    created_by VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(name, version)
);

-- Dataset lineage
CREATE TABLE dataset_lineage (
    parent_dataset_id INT REFERENCES datasets(dataset_id),
    child_dataset_id INT REFERENCES datasets(dataset_id),
    transformation_type VARCHAR(50),
    transformation_sql TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (parent_dataset_id, child_dataset_id)
);

-- ==================== Experiment management ====================

CREATE TABLE experiments (
    experiment_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    project_id INT,
    tags JSONB,
    created_by VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE runs (
    run_id VARCHAR(32) PRIMARY KEY,
    experiment_id INT REFERENCES experiments(experiment_id),
    status VARCHAR(20) DEFAULT 'RUNNING',  -- RUNNING, COMPLETED, FAILED
    start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    end_time TIMESTAMP,
    git_commit VARCHAR(40),
    git_branch VARCHAR(50),
    source_code_path VARCHAR(500),
    tags JSONB
);

CREATE TABLE run_params (
    run_id VARCHAR(32) REFERENCES runs(run_id),
    param_key VARCHAR(100),
    param_value TEXT,
    PRIMARY KEY (run_id, param_key)
);

CREATE TABLE run_metrics (
    metric_id BIGSERIAL PRIMARY KEY,
    run_id VARCHAR(32) REFERENCES runs(run_id),
    metric_key VARCHAR(100),
    metric_value FLOAT,
    step INT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE run_artifacts (
    artifact_id SERIAL PRIMARY KEY,
    run_id VARCHAR(32) REFERENCES runs(run_id),
    artifact_name VARCHAR(100),
    artifact_type VARCHAR(50),  -- model, dataset, plot, etc.
    storage_path VARCHAR(500),
    size_bytes BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- ==================== Model management ====================

CREATE TABLE registered_models (
    model_id SERIAL PRIMARY KEY,
    name VARCHAR(100) UNIQUE NOT NULL,
    description TEXT,
    tags JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE model_versions (
    version_id SERIAL PRIMARY KEY,
    model_id INT REFERENCES registered_models(model_id),
    version INT NOT NULL,
    run_id VARCHAR(32) REFERENCES runs(run_id),
    status VARCHAR(20) DEFAULT 'NONE',  -- NONE, STAGING, PRODUCTION, ARCHIVED
    storage_path VARCHAR(500),
    model_format VARCHAR(20),  -- sklearn, tensorflow, pytorch, onnx
    model_signature JSONB,     -- input/output signature
    training_metrics JSONB,
    validation_metrics JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(model_id, version)
);

-- ==================== Model deployment ====================

CREATE TABLE model_deployments (
    deployment_id SERIAL PRIMARY KEY,
    model_id INT REFERENCES registered_models(model_id),
    version_id INT REFERENCES model_versions(version_id),
    deployment_name VARCHAR(100),
    endpoint_url VARCHAR(500),
    status VARCHAR(20),  -- DEPLOYING, RUNNING, STOPPED, ERROR
    environment VARCHAR(20),  -- dev, staging, production
    replicas INT DEFAULT 1,
    resource_config JSONB,  -- CPU, memory, GPU configuration
    deployed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    stopped_at TIMESTAMP
);

-- ==================== Model monitoring ====================

CREATE TABLE model_monitoring (
    monitor_id BIGSERIAL PRIMARY KEY,
    deployment_id INT REFERENCES model_deployments(deployment_id),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    request_count INT DEFAULT 0,
    latency_p50 FLOAT,
    latency_p99 FLOAT,
    error_rate FLOAT,
    input_drift_score FLOAT,
    output_drift_score FLOAT,
    custom_metrics JSONB
);

-- ==================== Workflows ====================

CREATE TABLE workflows (
    workflow_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    dag_config JSONB,  -- Airflow DAG configuration
    schedule_interval VARCHAR(50),
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE workflow_runs (
    run_id VARCHAR(32) PRIMARY KEY,
    workflow_id INT REFERENCES workflows(workflow_id),
    execution_date TIMESTAMP,
    status VARCHAR(20),  -- running, success, failed
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    run_config JSONB
);
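
The dataset_lineage table makes derivation chains queryable. A minimal ancestry walk, as a sketch against this schema (connection URL is illustrative):

Python
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:pass@localhost/mlops')

def get_dataset_ancestry(dataset_id: int) -> list:
    """Walk dataset_lineage upward from one dataset to all of its ancestors."""
    with engine.connect() as conn:
        rows = conn.execute(text('''
            WITH RECURSIVE ancestry AS (
                SELECT parent_dataset_id, child_dataset_id, transformation_type
                FROM dataset_lineage
                WHERE child_dataset_id = :dataset_id
                UNION ALL
                SELECT l.parent_dataset_id, l.child_dataset_id, l.transformation_type
                FROM dataset_lineage l
                JOIN ancestry a ON l.child_dataset_id = a.parent_dataset_id
            )
            SELECT a.parent_dataset_id, d.name, d.version, a.transformation_type
            FROM ancestry a
            JOIN datasets d ON d.dataset_id = a.parent_dataset_id
        '''), {'dataset_id': dataset_id})
        return [dict(r._mapping) for r in rows]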

3.3 Core Implementation

Python
"""
MLOps模型管理平台 - 核心实现
"""
import os
import json
import hashlib
import shutil
from datetime import datetime
from typing import List, Dict, Optional
from sqlalchemy import create_engine, text
import boto3
from botocore.exceptions import ClientError

class MLOpsPlatform:
    """Main class for the MLOps platform"""

    def __init__(self, db_url: str, storage_endpoint: str, storage_bucket: str):
        self.engine = create_engine(db_url)
        self.s3_client = boto3.client(
            's3',
            endpoint_url=storage_endpoint,
            aws_access_key_id='minioadmin',
            aws_secret_access_key='minioadmin'
        )
        self.bucket = storage_bucket
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create the storage bucket if it does not exist"""
        try:
            self.s3_client.head_bucket(Bucket=self.bucket)
        except ClientError:
            self.s3_client.create_bucket(Bucket=self.bucket)

    # ==================== Dataset management ====================

    def register_dataset(self,
                        name: str,
                        version: str,
                        local_path: str,
                        description: str = "",
                        tags: Dict = None) -> int:
        """Register a dataset"""
        # Collect dataset info
        size_bytes = os.path.getsize(local_path)

        # Upload to object storage
        storage_path = f"datasets/{name}/{version}/data"
        self.s3_client.upload_file(local_path, self.bucket, storage_path)

        # Record in the database
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                INSERT INTO datasets
                (name, version, description, storage_path, size_bytes, tags, created_by)
                VALUES (:name, :version, :desc, :path, :size, :tags, :user)
                RETURNING dataset_id
            '''), {
                'name': name,
                'version': version,
                'desc': description,
                'path': storage_path,
                'size': size_bytes,
                'tags': json.dumps(tags or {}),
                'user': 'system'
            })
            # Fetch the returned ID before committing
            dataset_id = result.fetchone()[0]
            conn.commit()

        print(f"✅ Dataset registered: {name} v{version} (ID: {dataset_id})")
        return dataset_id

    def get_dataset(self, name: str, version: str = None) -> Optional[Dict]:
        """Fetch dataset metadata"""
        with self.engine.connect() as conn:
            if version:
                result = conn.execute(text('''
                    SELECT * FROM datasets
                    WHERE name = :name AND version = :version
                '''), {'name': name, 'version': version})
            else:
                result = conn.execute(text('''
                    SELECT * FROM datasets
                    WHERE name = :name
                    ORDER BY created_at DESC LIMIT 1
                '''), {'name': name})

            row = result.fetchone()
            if row:
                return {
                    'dataset_id': row.dataset_id,
                    'name': row.name,
                    'version': row.version,
                    'storage_path': row.storage_path,
                    'size_bytes': row.size_bytes,
                    'tags': row.tags or {}  # JSONB columns come back as Python dicts
                }
            return None

    def get_dataset_by_id(self, dataset_id: int) -> Optional[Dict]:
        """Fetch dataset metadata by ID"""
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT * FROM datasets WHERE dataset_id = :dataset_id
            '''), {'dataset_id': dataset_id})
            row = result.fetchone()
            if row:
                return {'dataset_id': row.dataset_id, 'name': row.name,
                        'version': row.version, 'storage_path': row.storage_path}
            return None

    def download_dataset(self, dataset_id: int, local_path: str):
        """Download a dataset"""
        dataset = self.get_dataset_by_id(dataset_id)
        if not dataset:
            raise ValueError(f"Dataset not found: {dataset_id}")

        self.s3_client.download_file(
            self.bucket,
            dataset['storage_path'],
            local_path
        )
        print(f"✅ Dataset downloaded: {local_path}")

    # ==================== Experiment management ====================

    def create_experiment(self, name: str, description: str = "", tags: Dict = None) -> int:
        """Create an experiment"""
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                INSERT INTO experiments (name, description, tags, created_by)
                VALUES (:name, :desc, :tags, :user)
                RETURNING experiment_id
            '''), {
                'name': name,
                'desc': description,
                'tags': json.dumps(tags or {}),
                'user': 'system'
            })
            exp_id = result.fetchone()[0]
            conn.commit()

        print(f"✅ Experiment created: {name} (ID: {exp_id})")
        return exp_id

    def start_run(self, experiment_id: int, run_name: str = None) -> str:
        """Start a run"""
        import uuid
        run_id = uuid.uuid4().hex[:16]  # 16-character hex run ID

        with self.engine.connect() as conn:
            conn.execute(text('''
                INSERT INTO runs (run_id, experiment_id, tags)
                VALUES (:run_id, :exp_id, :tags)
            '''), {
                'run_id': run_id,
                'exp_id': experiment_id,
                'tags': json.dumps({'mlflow.runName': run_name} if run_name else {})
            })
            conn.commit()

        print(f"▶️ Run started: {run_id}")
        return run_id

    def log_param(self, run_id: str, key: str, value: Any):
        """Log a parameter"""
        with self.engine.connect() as conn:
            conn.execute(text('''
                INSERT INTO run_params (run_id, param_key, param_value)
                VALUES (:run_id, :key, :value)
                ON CONFLICT (run_id, param_key) DO UPDATE
                SET param_value = EXCLUDED.param_value
            '''), {
                'run_id': run_id,
                'key': key,
                'value': str(value)
            })
            conn.commit()

    def log_metric(self, run_id: str, key: str, value: float, step: int = None):
        """Log a metric"""
        with self.engine.connect() as conn:
            conn.execute(text('''
                INSERT INTO run_metrics (run_id, metric_key, metric_value, step)
                VALUES (:run_id, :key, :value, :step)
            '''), {
                'run_id': run_id,
                'key': key,
                'value': value,
                'step': step
            })
            conn.commit()

    def log_artifact(self, run_id: str, local_path: str, artifact_path: str = None):
        """Log an artifact"""
        artifact_name = artifact_path or os.path.basename(local_path)
        storage_path = f"artifacts/{run_id}/{artifact_name}"

        # Upload to object storage
        if os.path.isfile(local_path):
            self.s3_client.upload_file(local_path, self.bucket, storage_path)
            size_bytes = os.path.getsize(local_path)
        else:
            # Upload a directory tree
            for root, dirs, files in os.walk(local_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    rel_path = os.path.relpath(file_path, local_path)
                    s3_path = f"{storage_path}/{rel_path}"
                    self.s3_client.upload_file(file_path, self.bucket, s3_path)
            size_bytes = sum(
                os.path.getsize(os.path.join(dirpath, filename))
                for dirpath, dirnames, filenames in os.walk(local_path)
                for filename in filenames
            )

        # Record in the database
        with self.engine.connect() as conn:
            conn.execute(text('''
                INSERT INTO run_artifacts (run_id, artifact_name, artifact_type, storage_path, size_bytes)
                VALUES (:run_id, :name, :type, :path, :size)
            '''), {
                'run_id': run_id,
                'name': artifact_name,
                'type': 'file',
                'path': storage_path,
                'size': size_bytes
            })
            conn.commit()

        print(f"📎 Artifact已记录: {artifact_name}")

    def end_run(self, run_id: str, status: str = "COMPLETED"):
        """End a run"""
        with self.engine.connect() as conn:
            conn.execute(text('''
                UPDATE runs
                SET status = :status, end_time = NOW()
                WHERE run_id = :run_id
            '''), {'status': status, 'run_id': run_id})
            conn.commit()

        print(f"✅ Run finished: {run_id} ({status})")

    # ==================== Model management ====================

    def register_model(self,
                      name: str,
                      run_id: str,
                      model_path: str,
                      description: str = "",
                      tags: Dict = None,
                      model_format: str = 'sklearn') -> Dict:
        """Register a model"""
        # Ensure the registered model exists
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT 1 FROM registered_models WHERE name = :name
            '''), {'name': name})

            if not result.fetchone():
                conn.execute(text('''
                    INSERT INTO registered_models (name, description, tags)
                    VALUES (:name, :desc, :tags)
                '''), {
                    'name': name,
                    'desc': description,
                    'tags': json.dumps(tags or {})
                })

            # Next version number
            result = conn.execute(text('''
                SELECT COALESCE(MAX(version), 0) + 1
                FROM model_versions mv
                JOIN registered_models rm ON mv.model_id = rm.model_id
                WHERE rm.name = :name
            '''), {'name': name})

            version = result.fetchone()[0]

            # Upload the model files
            model_storage_path = f"models/{name}/{version}/model"
            if os.path.isdir(model_path):
                for root, dirs, files in os.walk(model_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        rel_path = os.path.relpath(file_path, model_path)
                        s3_path = f"{model_storage_path}/{rel_path}"
                        self.s3_client.upload_file(file_path, self.bucket, s3_path)
            else:
                # Store single-file models under their own name so that
                # load_model can locate them after download
                s3_path = f"{model_storage_path}/{os.path.basename(model_path)}"
                self.s3_client.upload_file(model_path, self.bucket, s3_path)

            # Record the version
            result = conn.execute(text('''
                INSERT INTO model_versions
                (model_id, version, run_id, storage_path, model_format, training_metrics)
                SELECT model_id, :version, :run_id, :path, :format, :metrics
                FROM registered_models WHERE name = :name
                RETURNING version_id
            '''), {
                'name': name,
                'version': version,
                'run_id': run_id,
                'path': model_storage_path,
                'format': model_format,
                'metrics': '{}'
            })
            version_id = result.fetchone()[0]
            conn.commit()

        print(f"✅ Model registered: {name} v{version}")
        return {'model_name': name, 'version': version, 'version_id': version_id}

    def transition_model_stage(self, name: str, version: int, stage: str):
        """Transition a model version to a new stage"""
        with self.engine.connect() as conn:
            # When promoting to PRODUCTION, archive the current PRODUCTION version
            if stage == 'PRODUCTION':
                conn.execute(text('''
                    UPDATE model_versions mv
                    SET status = 'ARCHIVED'
                    FROM registered_models rm
                    WHERE mv.model_id = rm.model_id
                      AND rm.name = :name
                      AND mv.status = 'PRODUCTION'
                '''), {'name': name})

            conn.execute(text('''
                UPDATE model_versions mv
                SET status = :stage
                FROM registered_models rm
                WHERE mv.model_id = rm.model_id
                  AND rm.name = :name
                  AND mv.version = :version
            '''), {'name': name, 'version': version, 'stage': stage})
            conn.commit()

        print(f"🔄 模型 {name} v{version} 阶段变更为: {stage}")

    def load_model(self, name: str, version: int = None, stage: str = None):
        """Load a model"""
        with self.engine.connect() as conn:
            if version:
                result = conn.execute(text('''
                    SELECT mv.storage_path, mv.model_format
                    FROM model_versions mv
                    JOIN registered_models rm ON mv.model_id = rm.model_id
                    WHERE rm.name = :name AND mv.version = :version
                '''), {'name': name, 'version': version})
            elif stage:
                result = conn.execute(text('''
                    SELECT mv.storage_path, mv.model_format
                    FROM model_versions mv
                    JOIN registered_models rm ON mv.model_id = rm.model_id
                    WHERE rm.name = :name AND mv.status = :stage
                    ORDER BY mv.version DESC LIMIT 1
                '''), {'name': name, 'stage': stage})
            else:
                result = conn.execute(text('''
                    SELECT mv.storage_path, mv.model_format
                    FROM model_versions mv
                    JOIN registered_models rm ON mv.model_id = rm.model_id
                    WHERE rm.name = :name
                    ORDER BY mv.version DESC LIMIT 1
                '''), {'name': name})

            row = result.fetchone()
            if not row:
                raise ValueError(f"Model not found: {name}")

            # Download the model files
            local_path = f"/tmp/models/{name}"
            os.makedirs(local_path, exist_ok=True)

            # List and download every object under the model prefix
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket,
                Prefix=row.storage_path
            )

            for obj in response.get('Contents', []):
                rel_path = os.path.relpath(obj['Key'], row.storage_path)
                file_path = os.path.join(local_path, rel_path)
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                self.s3_client.download_file(self.bucket, obj['Key'], file_path)

            # Load the model (assumes a joblib-serialized model.pkl)
            import joblib
            model_file = os.path.join(local_path, 'model.pkl')
            return joblib.load(model_file)

# Usage example
mlops = MLOpsPlatform(
    db_url='postgresql://user:pass@localhost/mlops',
    storage_endpoint='http://localhost:9000',
    storage_bucket='mlops-storage'
)

# Register a dataset
dataset_id = mlops.register_dataset(
    name='customer_churn',
    version='1.0.0',
    local_path='data/customer_churn.csv',
    description='Customer churn prediction dataset',
    tags={'domain': 'finance', 'type': 'classification'}
)

# Create an experiment and track a run
exp_id = mlops.create_experiment('churn_prediction', 'Customer churn prediction model')
run_id = mlops.start_run(exp_id, run_name='xgboost_baseline')

# Log parameters and metrics
mlops.log_param(run_id, 'model', 'xgboost')
mlops.log_param(run_id, 'n_estimators', 100)
mlops.log_param(run_id, 'max_depth', 6)

for epoch in range(10):
    mlops.log_metric(run_id, 'train_auc', 0.8 + epoch * 0.02, step=epoch)
    mlops.log_metric(run_id, 'val_auc', 0.78 + epoch * 0.015, step=epoch)

mlops.log_metric(run_id, 'test_auc', 0.95)

# Save and register the model
mlops.log_artifact(run_id, 'model.pkl')
model_info = mlops.register_model(
    name='churn_predictor',
    run_id=run_id,
    model_path='model.pkl',
    description='XGBoost customer churn model'
)

# Promote to production
mlops.transition_model_stage('churn_predictor', model_info['version'], 'PRODUCTION')

mlops.end_run(run_id)
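
The model_monitoring table from 3.2 has no writer in the platform class above. A minimal recording-and-alerting sketch; the thresholds are illustrative assumptions, not part of the original design:

Python
import numpy as np
from sqlalchemy import text

def record_monitoring_snapshot(deployment_id: int, latencies_ms: list,
                               errors: int, total: int):
    """Persist one monitoring window and flag obvious problems."""
    p50 = float(np.percentile(latencies_ms, 50))
    p99 = float(np.percentile(latencies_ms, 99))
    error_rate = errors / total if total else 0.0

    with mlops.engine.connect() as conn:  # reuse the platform's engine
        conn.execute(text('''
            INSERT INTO model_monitoring
            (deployment_id, request_count, latency_p50, latency_p99, error_rate)
            VALUES (:dep, :cnt, :p50, :p99, :err)
        '''), {'dep': deployment_id, 'cnt': total,
               'p50': p50, 'p99': p99, 'err': error_rate})
        conn.commit()

    # Illustrative alert thresholds; tune to the service's SLOs.
    if error_rate > 0.01 or p99 > 500:
        print(f"⚠️ Deployment {deployment_id}: error_rate={error_rate:.2%}, p99={p99:.0f}ms")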

4. Chapter Summary

Technical comparison of the three projects

| Dimension | Project 1: E-commerce Analytics | Project 2: Recommendation System | Project 3: MLOps Platform |
| --- | --- | --- | --- |
| Databases | MySQL + ClickHouse + Redis | PostgreSQL + pgvector + Redis | PostgreSQL + MinIO |
| Data volume | TB-scale | GB-scale | Variable |
| Query types | OLAP analytics | Vector search + real-time queries | Metadata management |
| Latency target | Seconds | Milliseconds | Seconds |
| Core challenge | Large-scale data processing | Vector retrieval performance | Version management |

Learning Checklist

Markdown
## After finishing this tutorial, you should be able to:

### Fundamentals
- [ ] Design database schemas that satisfy normalization requirements
- [ ] Write efficient SQL queries
- [ ] Work with relational databases from Python
- [ ] Understand how indexes work and use them to optimize queries

### Intermediate
- [ ] Design distributed data system architectures
- [ ] Implement read/write splitting and data synchronization
- [ ] Use NoSQL databases (MongoDB, Redis)
- [ ] Maintain data consistency under high concurrency

### Advanced
- [ ] Build a feature store
- [ ] Use vector databases
- [ ] Implement MLOps data management
- [ ] Design AI data pipelines

### Hands-on
- [ ] Build an e-commerce analytics system end to end
- [ ] Build the data layer of a recommendation system
- [ ] Stand up an MLOps data platform

Where to Go Next

  1. Go deeper: distributed databases (TiDB, CockroachDB), data warehouses (Snowflake, BigQuery), stream processing (Kafka, Flink)
  2. Practice: contribute to open-source projects, build personal projects, enter data competitions
  3. Keep learning: follow new trends in the database field, study cloud-native databases, explore AI-native databases

Congratulations on completing all ten chapters!

You now have a complete database knowledge base, from basic SQL to advanced AI applications. Keep practicing and apply what you have learned to real projects!


References:
  • Designing Data-Intensive Applications (book)
  • Google Cloud Database Best Practices
  • AWS Database Blog
  • MLflow Documentation