10-实战项目案例¶
📋 本章概览¶
本章将通过三个完整的实战项目,综合运用前面章节所学的数据库知识。每个项目都包含完整的需求分析、数据库设计、代码实现和优化建议,帮助你建立从理论到实践的完整能力。
项目列表: 1. 电商用户行为分析系统 - 数据仓库与分析型应用 2. 智能推荐系统 - 特征存储与实时推荐 3. MLOps模型管理平台 - 完整的机器学习生命周期管理
预计学习时间: 10-15小时
前置章节: 第09章:数据库与AI应用
项目一:电商用户行为分析系统¶
1.1 项目背景与需求¶
业务场景¶
构建一个电商平台的用户行为分析系统,支持实时数据摄取、离线分析和报表生成。
功能需求¶
- 记录用户行为(浏览、点击、加购、购买)
- 实时统计关键指标(PV、UV、转化率)
- 用户画像分析
- 商品销售分析
- 支持时间维度的多维分析
技术选型¶
- MySQL: 存储用户、商品等基础数据
- ClickHouse: 存储行为日志,支持OLAP分析
- Redis: 缓存实时统计结果
- Python: 数据处理和分析
1.2 数据库设计¶
SQL
-- ==================== MySQL: base transactional data ====================
-- Users: one row per registered account.
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE,
    phone VARCHAR(20),
    register_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    last_login_time DATETIME,
    user_level TINYINT DEFAULT 1 COMMENT '用户等级: 1-普通, 2-VIP, 3-SVIP',
    status TINYINT DEFAULT 1 COMMENT '状态: 0-禁用, 1-正常',
    -- secondary indexes for signup-cohort and user-level filters
    INDEX idx_register_time (register_time),
    INDEX idx_user_level (user_level)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Products: catalog entries with price and stock.
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    product_name VARCHAR(200) NOT NULL,
    category_id INT,
    brand_id INT,
    price DECIMAL(10, 2) NOT NULL,
    stock INT DEFAULT 0,
    status TINYINT DEFAULT 1,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_category (category_id),
    INDEX idx_price (price),
    -- full-text index supports keyword search over product names
    FULLTEXT INDEX idx_name (product_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Orders: one row per order header.
-- NOTE(review): no FOREIGN KEY constraints are declared anywhere in this
-- schema — referential integrity is left to the application layer; confirm
-- this is intentional.
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    total_amount DECIMAL(12, 2) NOT NULL,
    status TINYINT COMMENT '0-待支付, 1-已支付, 2-已发货, 3-已完成, 4-已取消',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    paid_at DATETIME,
    INDEX idx_user_id (user_id),
    INDEX idx_created_at (created_at),
    INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Order line items: one row per (order, product) line.
CREATE TABLE order_items (
    item_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL,
    total_price DECIMAL(10, 2) NOT NULL,
    INDEX idx_order_id (order_id),
    INDEX idx_product_id (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ==================== ClickHouse: behavior log (OLAP) ====================
-- Append-only user behavior event log.
CREATE TABLE user_behavior_log (
    event_id UUID,
    user_id UInt64,
    session_id String,
    event_type Enum('view' = 1, 'click' = 2, 'cart' = 3, 'purchase' = 4),
    product_id UInt64,
    category_id UInt32,
    brand_id UInt32,
    price Decimal(10, 2),
    quantity UInt32,
    device_type Enum('pc' = 1, 'mobile' = 2, 'tablet' = 3),
    os String,
    browser String,
    ip_address IPv4,
    country String,
    city String,
    referrer String,
    event_time DateTime,
    -- derived partition-key column, filled from event_time on insert
    event_date Date DEFAULT toDate(event_time)
) ENGINE = MergeTree()
-- monthly partitions keep range scans narrow and make TTL drops cheap
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_type, user_id)
-- rows expire one year after the event
TTL event_time + INTERVAL 1 YEAR;
-- Materialized view: hourly pre-aggregation for dashboard queries.
-- NOTE(review): SummingMergeTree sums plain numeric columns when parts
-- merge, so `unique_users` (uniqExact) will be SUMMED across merged parts
-- and overcount distinct users; the usual fix is AggregatingMergeTree with
-- uniqState(...) here and uniqMerge(...) at query time — confirm and adjust.
CREATE MATERIALIZED VIEW user_behavior_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, hour, event_type)
AS SELECT
    toDate(event_time) as event_date,
    toHour(event_time) as hour,
    event_type,
    count() as event_count,
    uniqExact(user_id) as unique_users,
    sum(price * quantity) as total_amount
FROM user_behavior_log
GROUP BY event_date, hour, event_type;
1.3 核心代码实现¶
Python
"""
电商用户行为分析系统 - 核心实现
"""
# 需要安装: pip install clickhouse-driver
import json
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
from sqlalchemy import create_engine, text
import redis
import clickhouse_driver
class ECommerceAnalytics:
    """E-commerce user-behavior analytics.

    Storage roles:
      * MySQL      - transactional base data (users, products, orders)
      * ClickHouse - append-only behavior log for OLAP queries
      * Redis      - per-day real-time counters for the live dashboard
    """

    def __init__(self):
        # MySQL: base data.
        self.mysql_engine = create_engine(
            'mysql+pymysql://user:pass@localhost/ecommerce'
        )
        # ClickHouse: behavior log.
        self.ch_client = clickhouse_driver.Client(
            host='localhost',
            database='ecommerce'
        )
        # Redis: real-time counters.
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    def log_user_behavior(self, event: Dict):
        """Record one behavior event.

        Double-write strategy:
          1. append to ClickHouse for offline/OLAP analysis;
          2. bump the Redis real-time counters.

        Args:
            event: dict with at least event_id / user_id / session_id /
                event_type; other fields fall back to neutral defaults.
        """
        self.ch_client.execute('''
            INSERT INTO user_behavior_log
            (event_id, user_id, session_id, event_type, product_id,
             category_id, brand_id, price, quantity, device_type,
             os, browser, ip_address, country, city, referrer, event_time)
            VALUES
        ''', [(
            event['event_id'],
            event['user_id'],
            event['session_id'],
            event['event_type'],
            event.get('product_id', 0),
            event.get('category_id', 0),
            event.get('brand_id', 0),
            event.get('price', 0),
            event.get('quantity', 0),
            event.get('device_type', 'pc'),
            event.get('os', ''),
            event.get('browser', ''),
            event.get('ip_address', '0.0.0.0'),
            event.get('country', ''),
            event.get('city', ''),
            event.get('referrer', ''),
            # Honor a caller-supplied event time so replayed/batched events
            # keep their true timestamp; default to ingestion time.
            event.get('event_time', datetime.now()),
        )])
        self._update_realtime_stats(event)

    def _update_realtime_stats(self, event: Dict):
        """Bump today's Redis counters for one event (pipelined)."""
        now = datetime.now()  # single snapshot avoids a midnight race
        today = now.strftime('%Y-%m-%d')
        hour = now.strftime('%H')
        event_type = event['event_type']
        ttl = 7 * 24 * 3600  # keep per-day counters for 7 days

        pipe = self.redis_client.pipeline()
        pipe.incr(f'stats:{today}:pv')                     # daily page views
        pipe.pfadd(f'stats:{today}:uv', event['user_id'])  # daily uniques (HLL)
        pipe.hincrby(f'stats:{today}:events', event_type, 1)
        pipe.hincrby(f'stats:{today}:{hour}:events', event_type, 1)
        # BUG FIX: the original expired only the PV key, so the UV
        # HyperLogLog and the event hashes accumulated in Redis forever.
        for key in (f'stats:{today}:pv',
                    f'stats:{today}:uv',
                    f'stats:{today}:events',
                    f'stats:{today}:{hour}:events'):
            pipe.expire(key, ttl)
        pipe.execute()

    def get_realtime_dashboard(self) -> Dict:
        """Return today's live metrics (PV, UV, event mix, conversion %)."""
        today = datetime.now().strftime('%Y-%m-%d')
        pv = int(self.redis_client.get(f'stats:{today}:pv') or 0)
        uv = self.redis_client.pfcount(f'stats:{today}:uv')
        raw = self.redis_client.hgetall(f'stats:{today}:events')
        # Redis returns bytes keys/values unless decode_responses=True;
        # normalize both cases instead of assuming bytes.
        events = {
            (k.decode() if isinstance(k, bytes) else k): int(v)
            for k, v in raw.items()
        }
        views = events.get('view', 0)
        purchases = events.get('purchase', 0)
        conversion_rate = (purchases / views * 100) if views > 0 else 0
        return {
            'date': today,
            'pv': pv,
            'uv': uv,
            'events': events,
            'conversion_rate': round(conversion_rate, 2)
        }

    def _ch_query_df(self, query: str, params: Dict) -> pd.DataFrame:
        """Run a ClickHouse query and return the result as a DataFrame.

        BUG FIX: the original passed the native clickhouse_driver Client to
        pandas.read_sql(), which requires a DBAPI/SQLAlchemy connection and
        fails at runtime; query through the native client instead.
        """
        rows, columns = self.ch_client.execute(
            query, params, with_column_types=True
        )
        return pd.DataFrame(rows, columns=[name for name, _ in columns])

    def get_user_behavior_analysis(self,
                                   start_date: str,
                                   end_date: str) -> pd.DataFrame:
        """Daily per-event-type OLAP rollup between two dates (inclusive)."""
        query = '''
            SELECT
                event_date,
                event_type,
                count() as event_count,
                uniqExact(user_id) as unique_users,
                avg(price) as avg_price,
                sum(price * quantity) as total_revenue
            FROM user_behavior_log
            WHERE event_date BETWEEN %(start)s AND %(end)s
            GROUP BY event_date, event_type
            ORDER BY event_date, event_type
        '''
        return self._ch_query_df(query, {'start': start_date, 'end': end_date})

    def get_user_funnel_analysis(self, date: str) -> Dict:
        """view -> click -> cart -> purchase conversion funnel for one day."""
        query = '''
            SELECT
                event_type,
                uniqExact(user_id) as user_count,
                count() as event_count
            FROM user_behavior_log
            WHERE event_date = %(date)s
            GROUP BY event_type
        '''
        result = self.ch_client.execute(query, {'date': date})
        funnel = {row[0]: {'users': row[1], 'events': row[2]} for row in result}

        views = funnel.get('view', {}).get('users', 0)
        clicks = funnel.get('click', {}).get('users', 0)
        carts = funnel.get('cart', {}).get('users', 0)
        purchases = funnel.get('purchase', {}).get('users', 0)
        return {
            'date': date,
            'funnel': {
                'view': views,
                'click': clicks,
                'cart': carts,
                'purchase': purchases
            },
            'conversion_rates': {
                'view_to_click': round(clicks / views * 100, 2) if views > 0 else 0,
                'click_to_cart': round(carts / clicks * 100, 2) if clicks > 0 else 0,
                'cart_to_purchase': round(purchases / carts * 100, 2) if carts > 0 else 0,
                'overall': round(purchases / views * 100, 2) if views > 0 else 0
            }
        }

    def get_user_rfm_analysis(self, analysis_date: str = None) -> pd.DataFrame:
        """RFM (Recency / Frequency / Monetary) segmentation.

        Considers purchase events in the 365 days before `analysis_date`
        (defaults to today).  Returns one row per purchasing user with
        R/F/M quintile scores, the combined RFM_score and a segment label.
        """
        if analysis_date is None:
            analysis_date = datetime.now().strftime('%Y-%m-%d')
        # BUG FIX: the original used MySQL-style date_sub(%(date)s, 365),
        # which is not valid ClickHouse syntax; use INTERVAL arithmetic and
        # cast the string parameter explicitly.
        query = '''
            SELECT
                user_id,
                dateDiff('day', max(event_time), toDateTime(%(date)s)) as recency,
                count() as frequency,
                sum(price * quantity) as monetary
            FROM user_behavior_log
            WHERE event_type = 'purchase'
              AND event_time >= toDateTime(%(date)s) - INTERVAL 365 DAY
            GROUP BY user_id
        '''
        df = self._ch_query_df(query, {'date': analysis_date})
        if df.empty:
            # No purchases in the window: return an empty frame with the full
            # output schema so callers can groupby('segment') safely.
            for col in ('R_score', 'F_score', 'M_score', 'RFM_score', 'segment'):
                df[col] = pd.Series(dtype=object)
            return df

        # Rank first so qcut always sees strictly increasing values; qcut on
        # the raw columns raises ValueError on duplicate bin edges (e.g. many
        # users with frequency == 1).  Lower recency is better, hence the
        # reversed labels on R.
        df['R_score'] = pd.qcut(df['recency'].rank(method='first'),
                                5, labels=[5, 4, 3, 2, 1])
        df['F_score'] = pd.qcut(df['frequency'].rank(method='first'),
                                5, labels=[1, 2, 3, 4, 5])
        df['M_score'] = pd.qcut(df['monetary'].rank(method='first'),
                                5, labels=[1, 2, 3, 4, 5])
        df['RFM_score'] = (df['R_score'].astype(str) +
                           df['F_score'].astype(str) +
                           df['M_score'].astype(str))
        df['segment'] = df['RFM_score'].map(self._segment_rfm)
        return df

    @staticmethod
    def _segment_rfm(rfm_score: str) -> str:
        """Map a 3-character RFM score (e.g. '555') to a customer segment."""
        if rfm_score in ('555', '554', '544', '545', '454', '455', '445'):
            return '重要价值客户'
        if rfm_score in ('543', '444', '435', '355', '354', '345', '344', '335'):
            return '重要保持客户'
        if rfm_score in ('512', '511', '422', '421', '412', '411', '311'):
            return '新客户'
        if rfm_score in ('155', '154', '144', '214', '215', '115', '114'):
            return '重要挽留客户'
        return '一般客户'
# Usage example
analytics = ECommerceAnalytics()

# Log a single purchase event.
event = dict(
    event_id='uuid-123',
    user_id=10001,
    session_id='sess-abc',
    event_type='purchase',
    product_id=5001,
    category_id=10,
    brand_id=100,
    price=299.00,
    quantity=2,
    device_type='mobile',
)
analytics.log_user_behavior(event)

# Real-time dashboard snapshot.
dashboard = analytics.get_realtime_dashboard()
print(f"今日PV: {dashboard['pv']}, UV: {dashboard['uv']}")

# RFM segmentation report.
rfm_df = analytics.get_user_rfm_analysis()
print(rfm_df.groupby('segment').size())
1.4 性能优化建议¶
Markdown
## 性能优化清单
### 1. 写入优化
- [ ] 使用批量插入(每批1000-10000条)
- [ ] ClickHouse使用异步插入
- [ ] 使用Redis Pipeline减少网络往返
- [ ] 行为日志采用异步写入(消息队列)
### 2. 查询优化
- [ ] ClickHouse按时间分区,减少扫描范围
- [ ] 常用查询字段建立索引
- [ ] 物化视图预聚合常用统计
- [ ] Redis缓存热点数据
### 3. 存储优化
- [ ] ClickHouse设置TTL自动清理旧数据
- [ ] 冷热数据分离(近期数据SSD,历史数据HDD)
- [ ] 定期归档历史数据
### 4. 监控告警
- [ ] 监控ClickHouse查询性能
- [ ] 监控Redis内存使用
- [ ] 监控MySQL连接数
- [ ] 设置慢查询告警
项目二:智能推荐系统¶
2.1 项目背景与需求¶
业务场景¶
构建一个电商平台的智能推荐系统,支持个性化商品推荐。
功能需求¶
- 用户画像管理
- 商品特征管理
- 实时推荐API
- 推荐效果追踪
- A/B测试支持
技术选型¶
- PostgreSQL + pgvector: 存储用户/商品向量
- Redis: 缓存推荐结果
- FastAPI: 推荐服务API
- Python: 推荐算法
2.2 数据库设计¶
SQL
-- ==================== User profiles ====================
CREATE TABLE user_profiles (
    user_id BIGINT PRIMARY KEY,
    user_vector VECTOR(128), -- user embedding (pgvector)
    preferences JSONB, -- preference tags
    purchase_history JSONB, -- aggregated purchase history
    browse_history JSONB, -- aggregated browse history
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Behavior sequences (input for sequence-based recommenders)
CREATE TABLE user_behavior_sequences (
    user_id BIGINT,
    sequence_id INT,
    product_id BIGINT,
    event_type VARCHAR(20),
    event_time TIMESTAMP,
    PRIMARY KEY (user_id, sequence_id)
);
-- ==================== Product features ====================
CREATE TABLE product_features (
    product_id BIGINT PRIMARY KEY,
    product_vector VECTOR(128), -- product embedding (pgvector)
    category_id INT,
    brand_id INT,
    price DECIMAL(10, 2),
    attributes JSONB, -- product attributes
    popularity_score FLOAT, -- popularity score
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Approximate-nearest-neighbour indexes for cosine similarity search.
-- NOTE(review): lists = 100 is a starting point; retune as row counts grow.
CREATE INDEX idx_user_vector ON user_profiles
USING ivfflat (user_vector vector_cosine_ops) WITH (lists = 100);
CREATE INDEX idx_product_vector ON product_features
USING ivfflat (product_vector vector_cosine_ops) WITH (lists = 100);
-- ==================== Recommendation audit log ====================
CREATE TABLE recommendation_logs (
    log_id BIGSERIAL PRIMARY KEY,
    user_id BIGINT,
    request_id VARCHAR(64),
    context JSONB, -- request context
    recommendations JSONB, -- served results
    algorithm VARCHAR(50), -- algorithm that produced them
    response_time_ms INT, -- serving latency
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- ==================== A/B testing ====================
CREATE TABLE ab_tests (
    test_id SERIAL PRIMARY KEY,
    test_name VARCHAR(100),
    algorithm_a VARCHAR(50),
    algorithm_b VARCHAR(50),
    traffic_split FLOAT DEFAULT 0.5, -- share of traffic sent to group A
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    status VARCHAR(20) DEFAULT 'running'
);
CREATE TABLE ab_test_assignments (
    user_id BIGINT,
    test_id INT,
    group_name VARCHAR(10), -- 'A' or 'B'
    assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, test_id)
);
2.3 核心代码实现¶
Python
"""
智能推荐系统 - 核心实现
"""
import hashlib
import json
import random
from typing import List, Dict, Tuple

import numpy as np
import redis
from fastapi import FastAPI, HTTPException
from pgvector.sqlalchemy import Vector
from pydantic import BaseModel
from sqlalchemy import create_engine, text
app = FastAPI(title="推荐系统API")  # module-level ASGI application
class RecommendationRequest(BaseModel):
    """Request body for POST /recommendations."""
    user_id: int
    # Pydantic deep-copies mutable defaults per model instance, so the `{}`
    # default is safe here (unlike a plain-function default argument).
    context: Dict = {}
    num_recommendations: int = 10
class RecommendationSystem:
    """Recommendation engine backed by PostgreSQL/pgvector and Redis.

    Responsibilities:
      * A/B test assignment and algorithm selection
      * candidate generation (vector / collaborative / content / hybrid)
      * business-rule post-processing, audit logging and result caching
    """

    def __init__(self):
        # PostgreSQL stores profiles, features, logs and A/B metadata.
        self.engine = create_engine('postgresql://user:pass@localhost/recommendation')
        # Redis caches rendered recommendation lists.
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        # Cached recommendations live for one hour.
        self.cache_ttl = 3600

    def get_recommendations(self,
                            user_id: int,
                            context: Dict = None,
                            num_results: int = 10) -> List[Dict]:
        """Return up to `num_results` recommendations for `user_id`.

        Pipeline: A/B group -> cache lookup -> candidate generation ->
        business rules -> audit log -> cache store.
        """
        import json  # BUG FIX: json was used but never imported in this module
        context = context or {}

        # 1. The A/B test group decides which algorithm serves this user.
        ab_group = self._get_ab_test_group(user_id)
        algorithm = self._select_algorithm(ab_group)

        # 2. Cache lookup.  BUG FIX: the original key used built-in hash(),
        # which is randomized per process (PYTHONHASHSEED), so entries
        # written by one worker were invisible to every other worker; use a
        # stable digest of the canonical-JSON context instead.
        context_digest = hashlib.md5(
            json.dumps(context, sort_keys=True).encode('utf-8')
        ).hexdigest()
        cache_key = f"rec:{user_id}:{algorithm}:{context_digest}"
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # 3. Candidate generation.
        if algorithm == 'collaborative_filtering':
            recommendations = self._collaborative_filtering(user_id, num_results)
        elif algorithm == 'content_based':
            recommendations = self._content_based(user_id, num_results)
        elif algorithm == 'vector_similarity':
            recommendations = self._vector_similarity(user_id, num_results)
        else:
            recommendations = self._hybrid_recommendation(user_id, num_results)

        # 4. Business-rule filtering (stock, price band, dedup, ...).
        recommendations = self._apply_business_rules(recommendations, context)
        # 5. Audit log for offline effect tracking.
        self._log_recommendation(user_id, context, recommendations, algorithm)
        # 6. Cache the final list.
        self.redis.setex(cache_key, self.cache_ttl, json.dumps(recommendations))
        return recommendations

    def _get_ab_test_group(self, user_id: int) -> str:
        """Assign (or look up) the user's group for the active A/B test.

        Returns 'A'/'B', or 'control' when no test is running.
        """
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT test_id, traffic_split
                FROM ab_tests
                WHERE status = 'running'
                LIMIT 1
            '''))
            test = result.fetchone()
            if not test:
                return 'control'
            test_id, traffic_split = test

            # Sticky assignment: reuse the stored group when present.
            result = conn.execute(text('''
                SELECT group_name FROM ab_test_assignments
                WHERE user_id = :user_id AND test_id = :test_id
            '''), {'user_id': user_id, 'test_id': test_id})
            assignment = result.fetchone()
            if assignment:
                return assignment.group_name

            # First time we see this user for this test: draw a group.
            group = 'A' if random.random() < traffic_split else 'B'
            # BUG FIX: concurrent requests for the same new user raced on
            # the (user_id, test_id) primary key; DO NOTHING keeps the
            # first write and lets the duplicate insert no-op.
            conn.execute(text('''
                INSERT INTO ab_test_assignments (user_id, test_id, group_name)
                VALUES (:user_id, :test_id, :group)
                ON CONFLICT (user_id, test_id) DO NOTHING
            '''), {'user_id': user_id, 'test_id': test_id, 'group': group})
            conn.commit()
            return group

    def _select_algorithm(self, ab_group: str) -> str:
        """Map an A/B group to the algorithm it is served by."""
        algorithms = {
            'A': 'vector_similarity',
            'B': 'collaborative_filtering',
            'control': 'hybrid'
        }
        return algorithms.get(ab_group, 'hybrid')

    def _vector_similarity(self, user_id: int, num_results: int) -> List[Dict]:
        """Recommend by cosine distance between user and product embeddings
        (pgvector `<=>`); falls back to popularity for cold-start users."""
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT user_vector FROM user_profiles WHERE user_id = :user_id
            '''), {'user_id': user_id})
            row = result.fetchone()
            if not row or row.user_vector is None:
                # Cold start: no embedding yet, serve popular items.
                return self._get_popular_products(num_results)
            user_vector = row.user_vector

            # Over-fetch 2x so downstream business rules can still filter.
            result = conn.execute(text('''
                SELECT
                    p.product_id,
                    p.product_vector <=> :user_vector as distance,
                    p.price,
                    p.popularity_score
                FROM product_features p
                WHERE p.product_vector IS NOT NULL
                ORDER BY p.product_vector <=> :user_vector
                LIMIT :limit
            '''), {'user_vector': str(user_vector), 'limit': num_results * 2})

            recommendations = []
            for row in result:
                recommendations.append({
                    'product_id': row.product_id,
                    'score': 1 - float(row.distance),  # distance -> similarity
                    'price': float(row.price),
                    'algorithm': 'vector_similarity'
                })
            return recommendations[:num_results]

    def _collaborative_filtering(self, user_id: int, num_results: int) -> List[Dict]:
        """User-based collaborative filtering.

        Finds the 20 users sharing the most purchased products with the
        target user, then recommends products those neighbours bought that
        the target user has not.
        """
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                WITH similar_users AS (
                    SELECT
                        b.user_id as other_user,
                        COUNT(*) as common_products
                    FROM user_behavior_sequences a
                    JOIN user_behavior_sequences b ON a.product_id = b.product_id
                    WHERE a.user_id = :user_id
                      AND b.user_id != :user_id
                      AND a.event_type = 'purchase'
                      AND b.event_type = 'purchase'
                    GROUP BY b.user_id
                    ORDER BY common_products DESC
                    LIMIT 20
                )
                SELECT
                    ub.product_id,
                    COUNT(*) as score,
                    AVG(p.price) as avg_price
                FROM user_behavior_sequences ub
                JOIN similar_users su ON ub.user_id = su.other_user
                JOIN product_features p ON ub.product_id = p.product_id
                WHERE ub.event_type = 'purchase'
                  AND ub.product_id NOT IN (
                      SELECT product_id FROM user_behavior_sequences
                      WHERE user_id = :user_id AND event_type = 'purchase'
                  )
                GROUP BY ub.product_id
                ORDER BY score DESC
                LIMIT :limit
            '''), {'user_id': user_id, 'limit': num_results})

            recommendations = []
            for row in result:
                recommendations.append({
                    'product_id': row.product_id,
                    'score': row.score,
                    'price': float(row.avg_price),
                    'algorithm': 'collaborative_filtering'
                })
            return recommendations

    def _content_based(self, user_id: int, num_results: int) -> List[Dict]:
        """Recommend popular products from the user's preferred categories;
        falls back to global popularity when no profile exists."""
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT preferences FROM user_profiles WHERE user_id = :user_id
            '''), {'user_id': user_id})
            row = result.fetchone()
            if not row:
                return self._get_popular_products(num_results)
            preferences = row.preferences or {}
            preferred_categories = preferences.get('categories', [])

            result = conn.execute(text('''
                SELECT
                    product_id,
                    price,
                    popularity_score,
                    attributes
                FROM product_features
                WHERE category_id = ANY(:categories)
                ORDER BY popularity_score DESC
                LIMIT :limit
            '''), {'categories': preferred_categories, 'limit': num_results})

            recommendations = []
            for row in result:
                recommendations.append({
                    'product_id': row.product_id,
                    'score': row.popularity_score,
                    'price': float(row.price),
                    'algorithm': 'content_based'
                })
            return recommendations

    def _hybrid_recommendation(self, user_id: int, num_results: int) -> List[Dict]:
        """Blend vector-similarity and collaborative candidates,
        de-duplicated by product_id (vector results take precedence)."""
        vector_recs = self._vector_similarity(user_id, num_results // 2)
        cf_recs = self._collaborative_filtering(user_id, num_results // 2)
        seen_products = set()
        recommendations = []
        for rec in vector_recs + cf_recs:
            if rec['product_id'] not in seen_products:
                seen_products.add(rec['product_id'])
                rec['algorithm'] = 'hybrid'
                recommendations.append(rec)
        return recommendations[:num_results]

    def _apply_business_rules(self,
                              recommendations: List[Dict],
                              context: Dict) -> List[Dict]:
        """Post-process candidates with business rules (currently pass-through).

        TODO: filter already-purchased products, boost new arrivals, apply
        price-band and stock filters based on `context`.
        """
        return recommendations

    def _get_popular_products(self, num_results: int) -> List[Dict]:
        """Global popularity fallback used for cold-start users."""
        with self.engine.connect() as conn:
            result = conn.execute(text('''
                SELECT product_id, price, popularity_score
                FROM product_features
                ORDER BY popularity_score DESC
                LIMIT :limit
            '''), {'limit': num_results})
            return [
                {
                    'product_id': row.product_id,
                    'score': row.popularity_score,
                    'price': float(row.price),
                    'algorithm': 'popular'
                }
                for row in result
            ]

    def _log_recommendation(self,
                            user_id: int,
                            context: Dict,
                            recommendations: List[Dict],
                            algorithm: str):
        """Persist one served recommendation list for offline evaluation."""
        import json  # BUG FIX: json was used but never imported in this module
        import uuid
        with self.engine.connect() as conn:
            conn.execute(text('''
                INSERT INTO recommendation_logs
                (user_id, request_id, context, recommendations, algorithm)
                VALUES (:user_id, :request_id, :context, :recommendations, :algorithm)
            '''), {
                'user_id': user_id,
                'request_id': str(uuid.uuid4()),
                'context': json.dumps(context),
                'recommendations': json.dumps(recommendations),
                'algorithm': algorithm
            })
            conn.commit()
# FastAPI wiring: one shared engine instance serves every request.
rec_system = RecommendationSystem()

@app.post("/recommendations")
def get_recommendations(request: RecommendationRequest):
    """Serve personalized recommendations.

    FIX: declared as a plain `def` instead of `async def` — the engine does
    blocking SQL/Redis I/O, and FastAPI runs sync endpoints in a worker
    threadpool instead of stalling the event loop.
    """
    try:
        recommendations = rec_system.get_recommendations(
            user_id=request.user_id,
            context=request.context,
            num_results=request.num_recommendations
        )
    except Exception as e:
        # Surface engine failures as HTTP 500, keeping the original
        # exception chained for server-side logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {
        'user_id': request.user_id,
        'recommendations': recommendations,
        'total': len(recommendations)
    }

@app.post("/feedback")
def record_feedback(user_id: int, product_id: int, action: str):
    """Record user feedback (click, purchase, ...).

    TODO: update the user profile/embedding and feed the retraining queue.
    """
    return {'status': 'success'}
项目三:MLOps模型管理平台¶
3.1 项目背景与需求¶
业务场景¶
构建一个企业级MLOps平台,管理完整的机器学习生命周期。
功能需求¶
- 数据集管理(版本控制、血缘追踪)
- 实验管理(参数、指标、Artifact)
- 模型注册与版本管理
- 模型部署与服务(Serving)
- 模型监控与告警
- 工作流编排
技术选型¶
- PostgreSQL: 元数据存储
- MinIO/S3: 模型和Artifact存储
- Airflow: 工作流编排
- FastAPI: API服务
- Prometheus + Grafana: 监控
3.2 数据库设计¶
SQL
-- ==================== Dataset management ====================
CREATE TABLE datasets (
    dataset_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    version VARCHAR(20) NOT NULL,
    storage_path VARCHAR(500),
    size_bytes BIGINT,
    num_rows BIGINT,
    num_features INT,
    schema JSONB,
    tags JSONB,
    created_by VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- a (name, version) pair identifies exactly one dataset snapshot
    UNIQUE(name, version)
);
-- Dataset lineage: which dataset was derived from which, and how.
CREATE TABLE dataset_lineage (
    parent_dataset_id INT REFERENCES datasets(dataset_id),
    child_dataset_id INT REFERENCES datasets(dataset_id),
    transformation_type VARCHAR(50),
    transformation_sql TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (parent_dataset_id, child_dataset_id)
);
-- ==================== Experiment tracking ====================
CREATE TABLE experiments (
    experiment_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    project_id INT,
    tags JSONB,
    created_by VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE runs (
    run_id VARCHAR(32) PRIMARY KEY,
    experiment_id INT REFERENCES experiments(experiment_id),
    status VARCHAR(20) DEFAULT 'RUNNING', -- RUNNING, COMPLETED, FAILED
    start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    end_time TIMESTAMP,
    git_commit VARCHAR(40),
    git_branch VARCHAR(50),
    source_code_path VARCHAR(500),
    tags JSONB
);
-- Run parameters: one value per key (upserted on re-log).
CREATE TABLE run_params (
    run_id VARCHAR(32) REFERENCES runs(run_id),
    param_key VARCHAR(100),
    param_value TEXT,
    PRIMARY KEY (run_id, param_key)
);
-- Run metrics: append-only history (one row per observation/step).
CREATE TABLE run_metrics (
    metric_id BIGSERIAL PRIMARY KEY,
    run_id VARCHAR(32) REFERENCES runs(run_id),
    metric_key VARCHAR(100),
    metric_value FLOAT,
    step INT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE run_artifacts (
    artifact_id SERIAL PRIMARY KEY,
    run_id VARCHAR(32) REFERENCES runs(run_id),
    artifact_name VARCHAR(100),
    artifact_type VARCHAR(50), -- model, dataset, plot, etc.
    storage_path VARCHAR(500),
    size_bytes BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- ==================== Model registry ====================
CREATE TABLE registered_models (
    model_id SERIAL PRIMARY KEY,
    name VARCHAR(100) UNIQUE NOT NULL,
    description TEXT,
    tags JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE model_versions (
    version_id SERIAL PRIMARY KEY,
    model_id INT REFERENCES registered_models(model_id),
    version INT NOT NULL,
    run_id VARCHAR(32) REFERENCES runs(run_id),
    status VARCHAR(20) DEFAULT 'NONE', -- NONE, STAGING, PRODUCTION, ARCHIVED
    storage_path VARCHAR(500),
    model_format VARCHAR(20), -- sklearn, tensorflow, pytorch, onnx
    model_signature JSONB, -- input/output signature
    training_metrics JSONB,
    validation_metrics JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- version numbers are unique per model
    UNIQUE(model_id, version)
);
-- ==================== Model deployment ====================
CREATE TABLE model_deployments (
    deployment_id SERIAL PRIMARY KEY,
    model_id INT REFERENCES registered_models(model_id),
    version_id INT REFERENCES model_versions(version_id),
    deployment_name VARCHAR(100),
    endpoint_url VARCHAR(500),
    status VARCHAR(20), -- DEPLOYING, RUNNING, STOPPED, ERROR
    environment VARCHAR(20), -- dev, staging, production
    replicas INT DEFAULT 1,
    resource_config JSONB, -- CPU, memory, GPU configuration
    deployed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    stopped_at TIMESTAMP
);
-- ==================== Model monitoring ====================
-- Periodic serving metrics per deployment (latency, errors, drift).
CREATE TABLE model_monitoring (
    monitor_id BIGSERIAL PRIMARY KEY,
    deployment_id INT REFERENCES model_deployments(deployment_id),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    request_count INT DEFAULT 0,
    latency_p50 FLOAT,
    latency_p99 FLOAT,
    error_rate FLOAT,
    input_drift_score FLOAT,
    output_drift_score FLOAT,
    custom_metrics JSONB
);
-- ==================== Workflows ====================
CREATE TABLE workflows (
    workflow_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    dag_config JSONB, -- Airflow DAG configuration
    schedule_interval VARCHAR(50),
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE workflow_runs (
    run_id VARCHAR(32) PRIMARY KEY,
    workflow_id INT REFERENCES workflows(workflow_id),
    execution_date TIMESTAMP,
    status VARCHAR(20), -- running, success, failed
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    run_config JSONB
);
3.3 核心代码实现¶
Python
"""
MLOps模型管理平台 - 核心实现
"""
import os
import json
import hashlib
import shutil
from datetime import datetime
from typing import List, Dict, Optional
from sqlalchemy import create_engine, text
import boto3
from botocore.exceptions import ClientError
class MLOpsPlatform:
"""MLOps平台主类"""
def __init__(self, db_url: str, storage_endpoint: str, storage_bucket: str):
    """
    Args:
        db_url: SQLAlchemy URL of the metadata database (PostgreSQL).
        storage_endpoint: S3-compatible endpoint (e.g. MinIO) for artifacts.
        storage_bucket: bucket holding datasets, artifacts and models.
    """
    self.engine = create_engine(db_url)
    # SECURITY FIX: credentials were hard-coded ('minioadmin'); read them
    # from the environment instead, keeping the old values as a
    # development-only fallback so existing local setups keep working.
    self.s3_client = boto3.client(
        's3',
        endpoint_url=storage_endpoint,
        aws_access_key_id=os.environ.get('MLOPS_S3_ACCESS_KEY', 'minioadmin'),
        aws_secret_access_key=os.environ.get('MLOPS_S3_SECRET_KEY', 'minioadmin')
    )
    self.bucket = storage_bucket
    self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
    """Make sure the target bucket exists, creating it when absent."""
    client, bucket = self.s3_client, self.bucket
    try:
        client.head_bucket(Bucket=bucket)
    except ClientError:
        # head_bucket raises when the bucket is missing (or inaccessible).
        client.create_bucket(Bucket=bucket)
# ==================== 数据集管理 ====================
def register_dataset(self,
                     name: str,
                     version: str,
                     local_path: str,
                     description: str = "",
                     tags: Dict = None) -> int:
    """Upload a dataset file to object storage and record its metadata.

    Returns:
        The new dataset_id.
    """
    # assumes local_path is a single file — TODO confirm directories are out of scope
    file_size = os.path.getsize(local_path)

    # Push the payload to the object store first, then record metadata.
    object_key = f"datasets/{name}/{version}/data"
    self.s3_client.upload_file(local_path, self.bucket, object_key)

    insert_params = {
        'name': name,
        'version': version,
        'desc': description,
        'path': object_key,
        'size': file_size,
        'tags': json.dumps(tags or {}),
        'user': 'system',
    }
    with self.engine.connect() as conn:
        result = conn.execute(text('''
            INSERT INTO datasets
            (name, version, description, storage_path, size_bytes, tags, created_by)
            VALUES (:name, :version, :desc, :path, :size, :tags, :user)
            RETURNING dataset_id
        '''), insert_params)
        conn.commit()
        dataset_id = result.fetchone()[0]
    print(f"✅ 数据集注册成功: {name} v{version} (ID: {dataset_id})")
    return dataset_id
def get_dataset(self, name: str, version: str = None) -> Optional[Dict]:
    """Look up a dataset by name.

    Args:
        name: dataset name.
        version: exact version; when omitted, the most recently created
            version is returned.

    Returns:
        Metadata dict, or None when no matching dataset exists.
    """
    if version:
        sql = '''
            SELECT * FROM datasets
            WHERE name = :name AND version = :version
        '''
        params = {'name': name, 'version': version}
    else:
        sql = '''
            SELECT * FROM datasets
            WHERE name = :name
            ORDER BY created_at DESC LIMIT 1
        '''
        params = {'name': name}
    with self.engine.connect() as conn:
        row = conn.execute(text(sql), params).fetchone()
    if row is None:
        return None
    # BUG FIX: psycopg2 already decodes JSONB columns to Python dicts, and
    # json.loads(dict) raises TypeError; only parse when we got a string.
    raw_tags = row.tags
    if isinstance(raw_tags, dict):
        tags = raw_tags
    elif raw_tags:
        tags = json.loads(raw_tags)
    else:
        tags = {}
    return {
        'dataset_id': row.dataset_id,
        'name': row.name,
        'version': row.version,
        'storage_path': row.storage_path,
        'size_bytes': row.size_bytes,
        'tags': tags
    }
def download_dataset(self, dataset_id: int, local_path: str):
    """Download a registered dataset from object storage to `local_path`.

    Raises:
        ValueError: when no dataset with `dataset_id` exists.
    """
    # NOTE(review): get_dataset_by_id is not defined anywhere in this chunk —
    # presumably implemented elsewhere in the class; verify it exists.
    dataset = self.get_dataset_by_id(dataset_id)
    if not dataset:
        raise ValueError(f"数据集不存在: {dataset_id}")
    self.s3_client.download_file(
        self.bucket,
        dataset['storage_path'],
        local_path
    )
    print(f"✅ 数据集下载完成: {local_path}")
# ==================== 实验管理 ====================
def create_experiment(self, name: str, description: str = "", tags: Dict = None) -> int:
    """Create an experiment record and return its experiment_id."""
    insert_params = {
        'name': name,
        'desc': description,
        'tags': json.dumps(tags or {}),
        'user': 'system',
    }
    with self.engine.connect() as conn:
        result = conn.execute(text('''
            INSERT INTO experiments (name, description, tags, created_by)
            VALUES (:name, :desc, :tags, :user)
            RETURNING experiment_id
        '''), insert_params)
        conn.commit()
        exp_id = result.fetchone()[0]
    print(f"✅ 实验创建成功: {name} (ID: {exp_id})")
    return exp_id
def start_run(self, experiment_id: int, run_name: str = None) -> str:
    """Open a new tracking run under an experiment and return its run id."""
    import uuid

    run_id = uuid.uuid4().hex[:16]  # short 16-hex-char identifier
    run_tags = {'mlflow.runName': run_name} if run_name else {}
    with self.engine.connect() as conn:
        conn.execute(text('''
            INSERT INTO runs (run_id, experiment_id, tags)
            VALUES (:run_id, :exp_id, :tags)
        '''), {
            'run_id': run_id,
            'exp_id': experiment_id,
            'tags': json.dumps(run_tags),
        })
        conn.commit()
    print(f"▶️ 运行开始: {run_id}")
    return run_id
def log_param(self, run_id: str, key: str, value: any):
    """Record one run parameter (stringified); re-logging a key overwrites it."""
    payload = {'run_id': run_id, 'key': key, 'value': str(value)}
    with self.engine.connect() as conn:
        # Upsert keyed on (run_id, param_key): last write wins.
        conn.execute(text('''
            INSERT INTO run_params (run_id, param_key, param_value)
            VALUES (:run_id, :key, :value)
            ON CONFLICT (run_id, param_key) DO UPDATE
            SET param_value = EXCLUDED.param_value
        '''), payload)
        conn.commit()
def log_metric(self, run_id: str, key: str, value: float, step: int = None):
    """Append one metric observation (metrics keep full history, unlike params)."""
    payload = {'run_id': run_id, 'key': key, 'value': value, 'step': step}
    with self.engine.connect() as conn:
        conn.execute(text('''
            INSERT INTO run_metrics (run_id, metric_key, metric_value, step)
            VALUES (:run_id, :key, :value, :step)
        '''), payload)
        conn.commit()
def log_artifact(self, run_id: str, local_path: str, artifact_path: str = None):
    """Upload a file or directory as a run artifact and record its metadata.

    Args:
        run_id: run the artifact belongs to.
        local_path: local file or directory to upload.
        artifact_path: optional storage name override (defaults to basename).
    """
    artifact_name = artifact_path or os.path.basename(local_path)
    storage_path = f"artifacts/{run_id}/{artifact_name}"

    is_file = os.path.isfile(local_path)
    if is_file:
        self.s3_client.upload_file(local_path, self.bucket, storage_path)
        size_bytes = os.path.getsize(local_path)
    else:
        # Directory: upload every file, preserving relative paths, and
        # accumulate the size in the same pass (the original walked the
        # tree twice).
        size_bytes = 0
        for root, dirs, files in os.walk(local_path):
            for file in files:
                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, local_path)
                self.s3_client.upload_file(
                    file_path, self.bucket, f"{storage_path}/{rel_path}"
                )
                size_bytes += os.path.getsize(file_path)

    with self.engine.connect() as conn:
        conn.execute(text('''
            INSERT INTO run_artifacts (run_id, artifact_name, artifact_type, storage_path, size_bytes)
            VALUES (:run_id, :name, :type, :path, :size)
        '''), {
            'run_id': run_id,
            'name': artifact_name,
            # BUG FIX: the original recorded every artifact as 'file',
            # including directory uploads; record 'dir' for directories.
            'type': 'file' if is_file else 'dir',
            'path': storage_path,
            'size': size_bytes
        })
        conn.commit()
    print(f"📎 Artifact已记录: {artifact_name}")
def end_run(self, run_id: str, status: str = "COMPLETED"):
    """Close a run, stamping its final status and end time."""
    with self.engine.connect() as conn:
        conn.execute(
            text('''
                UPDATE runs
                SET status = :status, end_time = NOW()
                WHERE run_id = :run_id
            '''),
            {'status': status, 'run_id': run_id},
        )
        conn.commit()
    print(f"✅ 运行结束: {run_id} ({status})")
# ==================== 模型管理 ====================
def register_model(self,
                   name: str,
                   run_id: str,
                   model_path: str,
                   description: str = "",
                   tags: Dict = None,
                   model_format: str = 'sklearn') -> Dict:
    """Register a model payload and create a new version for it.

    Args:
        name: registered-model name (the entry is created on first use).
        run_id: tracking run that produced the model.
        model_path: local file or directory holding the serialized model.
        description: description for a newly created model entry.
        tags: tags for a newly created model entry.
        model_format: serialization format tag (was hard-coded 'sklearn';
            now a backward-compatible parameter).

    Returns:
        dict with model_name, version and version_id.
    """
    with self.engine.connect() as conn:
        # Create the model entry if missing.  FIX: ON CONFLICT replaces the
        # original check-then-insert, which raced under concurrent callers.
        conn.execute(text('''
            INSERT INTO registered_models (name, description, tags)
            VALUES (:name, :desc, :tags)
            ON CONFLICT (name) DO NOTHING
        '''), {
            'name': name,
            'desc': description,
            'tags': json.dumps(tags or {})
        })

        # Next version number for this model.
        result = conn.execute(text('''
            SELECT COALESCE(MAX(version), 0) + 1
            FROM model_versions mv
            JOIN registered_models rm ON mv.model_id = rm.model_id
            WHERE rm.name = :name
        '''), {'name': name})
        version = result.fetchone()[0]

        # Upload the model payload (directory trees keep relative paths).
        model_storage_path = f"models/{name}/{version}/model"
        if os.path.isdir(model_path):
            for root, dirs, files in os.walk(model_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    rel_path = os.path.relpath(file_path, model_path)
                    self.s3_client.upload_file(
                        file_path, self.bucket,
                        f"{model_storage_path}/{rel_path}"
                    )
        else:
            self.s3_client.upload_file(model_path, self.bucket, model_storage_path)

        # Record the new version row.
        result = conn.execute(text('''
            INSERT INTO model_versions
            (model_id, version, run_id, storage_path, model_format, training_metrics)
            SELECT model_id, :version, :run_id, :path, :format, :metrics
            FROM registered_models WHERE name = :name
            RETURNING version_id
        '''), {
            'name': name,
            'version': version,
            'run_id': run_id,
            'path': model_storage_path,
            'format': model_format,
            'metrics': '{}'
        })
        conn.commit()
        version_id = result.fetchone()[0]
    print(f"✅ 模型注册成功: {name} v{version}")
    return {'model_name': name, 'version': version, 'version_id': version_id}
def transition_model_stage(self, name: str, version: int, stage: str):
    """Move a model version to a new lifecycle stage.

    When promoting to PRODUCTION, any version of the same model currently
    in PRODUCTION is archived first, so at most one version is live.
    """
    demote_stmt = text('''
        UPDATE model_versions mv
        SET status = 'ARCHIVED'
        FROM registered_models rm
        WHERE mv.model_id = rm.model_id
          AND rm.name = :name
          AND mv.status = 'PRODUCTION'
    ''')
    promote_stmt = text('''
        UPDATE model_versions mv
        SET status = :stage
        FROM registered_models rm
        WHERE mv.model_id = rm.model_id
          AND rm.name = :name
          AND mv.version = :version
    ''')
    with self.engine.connect() as conn:
        if stage == 'PRODUCTION':
            # Archive the currently-live version (if any) before promotion.
            conn.execute(demote_stmt, {'name': name})
        conn.execute(promote_stmt, {'name': name, 'version': version, 'stage': stage})
        conn.commit()
    print(f"🔄 模型 {name} v{version} 阶段变更为: {stage}")
def load_model(self, name: str, version: int = None, stage: str = None):
    """Download a registered model from object storage and deserialize it.

    Resolution order: explicit `version` wins, then `stage` (latest
    matching version), then the latest version overall.

    Args:
        name: Registered model name.
        version: Exact version number to load.
        stage: Lifecycle stage (e.g. 'PRODUCTION') to resolve.

    Returns:
        The deserialized model object (loaded with joblib).

    Raises:
        ValueError: If no matching version or no stored artifacts exist.
    """
    with self.engine.connect() as conn:
        if version:
            result = conn.execute(text('''
                SELECT mv.storage_path, mv.model_format
                FROM model_versions mv
                JOIN registered_models rm ON mv.model_id = rm.model_id
                WHERE rm.name = :name AND mv.version = :version
            '''), {'name': name, 'version': version})
        elif stage:
            result = conn.execute(text('''
                SELECT mv.storage_path, mv.model_format
                FROM model_versions mv
                JOIN registered_models rm ON mv.model_id = rm.model_id
                WHERE rm.name = :name AND mv.status = :stage
                ORDER BY mv.version DESC LIMIT 1
            '''), {'name': name, 'stage': stage})
        else:
            result = conn.execute(text('''
                SELECT mv.storage_path, mv.model_format
                FROM model_versions mv
                JOIN registered_models rm ON mv.model_id = rm.model_id
                WHERE rm.name = :name
                ORDER BY mv.version DESC LIMIT 1
            '''), {'name': name})
        row = result.fetchone()
    if not row:
        raise ValueError(f"模型不存在: {name}")
    # Download all artifacts under the version's storage prefix.
    # NOTE(review): /tmp/models/{name} is shared across versions, so stale
    # files from a previously loaded version may remain — confirm whether a
    # per-version local directory is needed.
    local_path = f"/tmp/models/{name}"
    os.makedirs(local_path, exist_ok=True)
    response = self.s3_client.list_objects_v2(
        Bucket=self.bucket,
        Prefix=row.storage_path
    )
    objects = response.get('Contents', [])
    if not objects:
        # Fail fast with a clear message instead of a FileNotFoundError
        # at deserialization time.
        raise ValueError(f"模型不存在: {name}")
    downloaded = []
    for obj in objects:
        rel_path = os.path.relpath(obj['Key'], row.storage_path)
        # Legacy single-file models may be stored AT the prefix itself;
        # relpath is then '.', which is not a usable filename — fall back
        # to the object's basename.
        if rel_path == '.':
            rel_path = os.path.basename(obj['Key'])
        file_path = os.path.join(local_path, rel_path)
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        self.s3_client.download_file(self.bucket, obj['Key'], file_path)
        downloaded.append(file_path)
    # Deserialize: prefer the conventional model.pkl; if absent and exactly
    # one artifact was downloaded, load that one (single-file models are
    # not necessarily named model.pkl).
    import joblib
    model_file = os.path.join(local_path, 'model.pkl')
    if not os.path.exists(model_file) and len(downloaded) == 1:
        model_file = downloaded[0]
    return joblib.load(model_file)
# Usage example: exercises the full MLOps lifecycle end-to-end.
# Requires a reachable PostgreSQL instance and an S3-compatible object
# store (endpoint suggests MinIO) — this block performs live I/O.
mlops = MLOpsPlatform(
    db_url='postgresql://user:pass@localhost/mlops',
    storage_endpoint='http://localhost:9000',
    storage_bucket='mlops-storage'
)
# Register a dataset (uploads data/customer_churn.csv to object storage).
dataset_id = mlops.register_dataset(
    name='customer_churn',
    version='1.0.0',
    local_path='data/customer_churn.csv',
    description='客户流失预测数据集',
    tags={'domain': 'finance', 'type': 'classification'}
)
# Create an experiment and start a tracked run.
exp_id = mlops.create_experiment('churn_prediction', '客户流失预测模型')
run_id = mlops.start_run(exp_id, run_name='xgboost_baseline')
# Log hyperparameters and (simulated) per-epoch metrics.
mlops.log_param(run_id, 'model', 'xgboost')
mlops.log_param(run_id, 'n_estimators', 100)
mlops.log_param(run_id, 'max_depth', 6)
for epoch in range(10):
    mlops.log_metric(run_id, 'train_auc', 0.8 + epoch * 0.02, step=epoch)
    mlops.log_metric(run_id, 'val_auc', 0.78 + epoch * 0.015, step=epoch)
mlops.log_metric(run_id, 'test_auc', 0.95)
# Save the model artifact and register it as a new model version.
mlops.log_artifact(run_id, 'model.pkl')
model_info = mlops.register_model(
    name='churn_predictor',
    run_id=run_id,
    model_path='model.pkl',
    description='XGBoost客户流失预测模型'
)
# Promote the new version to production, then close out the run.
mlops.transition_model_stage('churn_predictor', model_info['version'], 'PRODUCTION')
mlops.end_run(run_id)
4. 本章小结¶
三个项目的技术对比¶
| 维度 | 项目一:电商分析 | 项目二:推荐系统 | 项目三:MLOps平台 |
|---|---|---|---|
| 数据库 | MySQL + ClickHouse + Redis | PostgreSQL + pgvector + Redis | PostgreSQL + MinIO |
| 数据规模 | TB级 | GB级 | 可变 |
| 查询类型 | OLAP分析 | 向量搜索 + 实时查询 | 元数据管理 |
| 延迟要求 | 秒级 | 毫秒级 | 秒级 |
| 核心挑战 | 大数据处理 | 向量检索性能 | 版本管理 |
学习检查清单¶
Markdown
## 完成本教程后,你应该能够:
### 基础能力
- [ ] 设计符合范式要求的数据库schema
- [ ] 编写高效的SQL查询
- [ ] 使用Python操作关系型数据库
- [ ] 理解索引原理并能优化查询
### 进阶能力
- [ ] 设计分布式数据系统架构
- [ ] 实现读写分离和数据同步
- [ ] 使用NoSQL数据库(MongoDB、Redis)
- [ ] 处理高并发场景的数据一致性
### 高级能力
- [ ] 构建特征存储系统
- [ ] 使用向量数据库
- [ ] 实现MLOps数据管理
- [ ] 设计AI数据流水线
### 实战能力
- [ ] 独立完成电商分析系统
- [ ] 构建推荐系统数据层
- [ ] 搭建MLOps数据平台
下一步学习建议¶
- 深入学习
  - 分布式数据库(TiDB、CockroachDB)
  - 数据仓库(Snowflake、BigQuery)
  - 流处理(Kafka、Flink)
- 实践项目
  - 参与开源项目
  - 构建个人项目
  - 参加数据竞赛
- 持续学习
  - 关注数据库领域新趋势
  - 学习云原生数据库
  - 了解AI Native数据库
恭喜完成全部十章学习!
你已经掌握了从基础SQL到高级AI应用的数据库完整知识体系。继续实践和探索,将所学知识应用到实际项目中!
参考资源: - Designing Data-Intensive Applications (书籍) - Google Cloud Database Best Practices - AWS Database Blog - MLflow Documentation