项目5: 完整ML项目¶
难度: ⭐⭐⭐⭐⭐ 高级 时间: 8-10小时 涉及知识: 全流程ML工程、MLOps、部署
🎯 项目目标¶
完成一个生产级的机器学习项目: 1. 端到端的数据处理流程 2. 模型训练和调优 3. 模型评估和解释 4. API服务化 5. 容器化部署
📋 项目架构¶
Text Only
ml_project/
├── data/ # 数据目录
│ ├── raw/ # 原始数据
│ ├── processed/ # 处理后数据
│ └── external/ # 外部数据
├── models/ # 模型文件
├── notebooks/ # 探索性分析
├── src/ # 源代码
│ ├── __init__.py
│ ├── data/ # 数据处理
│ ├── features/ # 特征工程
│ ├── models/ # 模型定义
│ └── api/ # API服务
├── tests/ # 测试代码
├── configs/ # 配置文件
├── Dockerfile # Docker配置
├── requirements.txt # 依赖
└── README.md # 项目说明
🚀 实现步骤¶
步骤1: 项目初始化¶
Bash
# Create the project directory skeleton in one shot (brace expansion)
mkdir -p ml_project/{data/{raw,processed,external},models,notebooks,src/{data,features,models,api},tests,configs}
cd ml_project
# Initialize Git version control
git init
# Create and activate an isolated virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
# Install runtime and development dependencies
pip install pandas numpy scikit-learn fastapi uvicorn jupyter matplotlib seaborn
# Pin the exact installed versions for reproducible installs
pip freeze > requirements.txt
步骤2: 数据模块¶
Python
# src/data/load_data.py
import pandas as pd
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def load_data(filepath: str) -> pd.DataFrame:
    """Load a dataset from a CSV or Excel file.

    Args:
        filepath: Path to a ``.csv``, ``.xls`` or ``.xlsx`` file.

    Returns:
        The loaded DataFrame.

    Raises:
        ValueError: If the file extension is not one of the supported ones.
    """
    path = Path(filepath)
    # Dispatch on suffix instead of an if/elif chain.
    readers = {
        '.csv': pd.read_csv,
        '.xls': pd.read_excel,
        '.xlsx': pd.read_excel,
    }
    reader = readers.get(path.suffix)
    if reader is None:
        raise ValueError(f"Unsupported file format: {path.suffix}")
    df = reader(path)
    logger.info(f"Loaded {len(df)} rows from {filepath}")
    return df
def save_data(df: pd.DataFrame, filepath: str):
    """Save a DataFrame to CSV or Excel, creating parent directories.

    Args:
        df: DataFrame to persist.
        filepath: Destination path ending in ``.csv``, ``.xls`` or ``.xlsx``.

    Raises:
        ValueError: If the file extension is not supported.
    """
    path = Path(filepath)
    path.parent.mkdir(parents=True, exist_ok=True)
    if path.suffix == '.csv':
        df.to_csv(path, index=False)
    elif path.suffix in ['.xls', '.xlsx']:
        df.to_excel(path, index=False)
    else:
        # Bug fix: previously an unsupported suffix fell through silently and
        # the function still logged "Saved ..." without writing any file.
        raise ValueError(f"Unsupported file format: {path.suffix}")
    logger.info(f"Saved data to {filepath}")
# src/data/preprocess.py
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, LabelEncoder
import pickle
class DataPreprocessor:
    """Fit imputation, scaling and label encoding on training data and apply
    the identical transformation to new data.

    Numeric columns are mean-imputed and standardized; object (string)
    columns are integer-encoded with one ``LabelEncoder`` per column.
    """

    def __init__(self):
        self.imputer = SimpleImputer(strategy='mean')
        self.scaler = StandardScaler()
        self.label_encoders = {}
        # Column lists are captured at fit time so transform() applies the
        # exact same columns even if dtypes drift in incoming data.
        self.numeric_cols = []
        self.categorical_cols = []
        self.fitted = False

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fit the preprocessor on ``df`` and return a transformed copy."""
        df = df.copy()
        # 'number' also covers int32/float32 etc., not just int64/float64
        # (bug fix: the narrow dtype list silently skipped such columns).
        self.numeric_cols = list(df.select_dtypes(include='number').columns)
        self.categorical_cols = list(df.select_dtypes(include=['object']).columns)
        if self.numeric_cols:
            df[self.numeric_cols] = self.imputer.fit_transform(df[self.numeric_cols])
            df[self.numeric_cols] = self.scaler.fit_transform(df[self.numeric_cols])
        for col in self.categorical_cols:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            self.label_encoders[col] = le
        self.fitted = True
        return df

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform new data with the statistics learned in fit_transform().

        Raises:
            ValueError: If called before fitting.
        """
        if not self.fitted:
            raise ValueError("Preprocessor not fitted yet")
        df = df.copy()
        # Bug fix: use the fit-time column lists; recomputing them from the
        # incoming frame's dtypes could silently select different columns.
        # Fallback keeps instances restored from old pickles working.
        numeric_cols = self.numeric_cols or list(df.select_dtypes(include='number').columns)
        categorical_cols = self.categorical_cols or list(df.select_dtypes(include=['object']).columns)
        if numeric_cols:
            df[numeric_cols] = self.imputer.transform(df[numeric_cols])
            df[numeric_cols] = self.scaler.transform(df[numeric_cols])
        for col in categorical_cols:
            if col in self.label_encoders:
                # NOTE(review): LabelEncoder raises on categories unseen at
                # fit time — confirm this is acceptable for production input.
                df[col] = self.label_encoders[col].transform(df[col].astype(str))
        return df

    def save(self, filepath: str):
        """Pickle the fitted components (and column metadata) to filepath."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'imputer': self.imputer,
                'scaler': self.scaler,
                'label_encoders': self.label_encoders,
                'numeric_cols': self.numeric_cols,
                'categorical_cols': self.categorical_cols
            }, f)

    def load(self, filepath: str):
        """Restore components saved by save(); marks the instance as fitted."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.imputer = data['imputer']
        self.scaler = data['scaler']
        self.label_encoders = data['label_encoders']
        # .get() keeps pickles written by older versions loadable.
        self.numeric_cols = data.get('numeric_cols', [])
        self.categorical_cols = data.get('categorical_cols', [])
        self.fitted = True
步骤3: 模型模块¶
Python
# src/models/train.py
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.metrics import mean_squared_error, r2_score
import pickle
import logging
logger = logging.getLogger(__name__)
class ModelTrainer:
    """Train, evaluate and persist a scikit-learn model.

    Supports classification (random forest / logistic regression) and
    regression (random forest / linear regression).
    """

    def __init__(self, task_type='classification'):
        # 'classification' or 'regression'; drives model choice and metrics.
        self.task_type = task_type
        self.model = None
        self.best_params = None

    def get_model(self, model_name: str):
        """Return an unfitted estimator for ``model_name``.

        Raises:
            ValueError: If model_name is unknown for the current task type.
                (Bug fix: previously this returned None, which later crashed
                train() with an opaque AttributeError.)
        """
        if self.task_type == 'classification':
            models = {
                'random_forest': RandomForestClassifier(random_state=42),
                'logistic_regression': LogisticRegression(random_state=42)
            }
        else:
            models = {
                'random_forest': RandomForestRegressor(random_state=42),
                'linear_regression': LinearRegression()
            }
        model = models.get(model_name)
        if model is None:
            raise ValueError(
                f"Unknown model '{model_name}' for task '{self.task_type}'. "
                f"Available: {sorted(models)}"
            )
        return model

    def train(self, X_train, y_train, model_name='random_forest', tune_hyperparams=False):
        """Fit the chosen model, optionally grid-searching random-forest
        hyperparameters with 5-fold cross-validation."""
        logger.info(f"Training {model_name} model...")
        model = self.get_model(model_name)
        if tune_hyperparams and model_name == 'random_forest':
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20],
                'min_samples_split': [2, 5, 10]
            }
            # Scoring must match the task: accuracy for classification, r2
            # for regression.
            grid_search = GridSearchCV(
                model, param_grid,
                cv=5, scoring='accuracy' if self.task_type == 'classification' else 'r2',
                n_jobs=-1
            )
            grid_search.fit(X_train, y_train)
            self.model = grid_search.best_estimator_
            self.best_params = grid_search.best_params_
            logger.info(f"Best parameters: {self.best_params}")
        else:
            model.fit(X_train, y_train)
            self.model = model
        logger.info("Training completed")

    def evaluate(self, X_test, y_test):
        """Score the fitted model on held-out data; returns a metrics dict.

        Raises:
            ValueError: If called before train(). (Bug fix: previously this
                failed with AttributeError on ``None.predict``.)
        """
        if self.model is None:
            raise ValueError("Model not trained yet; call train() first")
        predictions = self.model.predict(X_test)
        if self.task_type == 'classification':
            accuracy = accuracy_score(y_test, predictions)
            logger.info(f"Accuracy: {accuracy:.4f}")
            logger.info("\nClassification Report:")
            logger.info(classification_report(y_test, predictions))
            return {'accuracy': accuracy}
        else:
            mse = mean_squared_error(y_test, predictions)
            r2 = r2_score(y_test, predictions)
            logger.info(f"MSE: {mse:.4f}")
            logger.info(f"R2 Score: {r2:.4f}")
            return {'mse': mse, 'r2': r2}

    def save_model(self, filepath: str):
        """Pickle the fitted model plus metadata to filepath."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'model': self.model,
                'task_type': self.task_type,
                'best_params': self.best_params
            }, f)
        logger.info(f"Model saved to {filepath}")

    def load_model(self, filepath: str):
        """Restore a model previously saved by save_model()."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.model = data['model']
        self.task_type = data['task_type']
        # .get() tolerates artifacts saved without best_params.
        self.best_params = data.get('best_params')
        logger.info(f"Model loaded from {filepath}")
步骤4: API服务¶
Python
# src/api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import sys
from pathlib import Path
# 添加src到路径
sys.path.append(str(Path(__file__).parent.parent))
from data.preprocess import DataPreprocessor
import pickle
# FastAPI application metadata (shown on the auto-generated /docs page).
app = FastAPI(
    title="ML Prediction API",
    description="API for ML model predictions",
    version="1.0.0"
)

# Load the trained model and fitted preprocessor once at import time.
# Paths are relative to the process working directory (repo root, or /app
# inside the container).
try:
    with open('models/model.pkl', 'rb') as f:
        model_data = pickle.load(f)
    model = model_data['model']
    preprocessor = DataPreprocessor()
    preprocessor.load('models/preprocessor.pkl')
    print("Model and preprocessor loaded successfully")
except Exception as e:
    # On any load failure the API still starts; endpoints answer 503 until
    # valid artifacts are available.
    print(f"Error loading model: {e}")
    model = None
    preprocessor = None
class PredictionRequest(BaseModel):
    """Request body for /predict: a list of feature records, one dict per row."""
    data: list[dict]


class PredictionResponse(BaseModel):
    """Response body for /predict: one prediction per input row."""
    predictions: list
    # Bump this when the served model artifact changes.
    model_version: str = "1.0.0"
@app.get("/")
def root():
return {
"message": "ML Prediction API",
"status": "ready" if model else "not loaded",
"docs": "/docs"
}
@app.get("/health")
def health_check():
return {
"status": "healthy",
"model_loaded": model is not None
}
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
"""Make predictions"""
if model is None or preprocessor is None:
raise HTTPException(status_code=503, detail="Model not loaded")
try:
# 转换为DataFrame
df = pd.DataFrame(request.data)
# 预处理
df_processed = preprocessor.transform(df)
# 预测
predictions = model.predict(df_processed)
return PredictionResponse(predictions=predictions.tolist())
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/predict_single")
def predict_single(features: dict):
"""预测单条数据"""
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
try:
df = pd.DataFrame([features])
df_processed = preprocessor.transform(df)
prediction = model.predict(df_processed)[0]
return {"prediction": prediction}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
步骤5: 训练脚本¶
Python
# train.py
import argparse
import logging
from pathlib import Path
import sys
sys.path.append('src')
from data.load_data import load_data
from data.preprocess import DataPreprocessor
from models.train import ModelTrainer
from sklearn.model_selection import train_test_split
# Root-logger configuration for the training CLI: timestamped INFO lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
    """CLI entry point: load data, preprocess, train, evaluate, save artifacts."""
    parser = argparse.ArgumentParser(description='Train ML model')
    parser.add_argument('--data', required=True, help='Path to training data')
    parser.add_argument('--target', required=True, help='Target column name')
    parser.add_argument('--model', default='random_forest', help='Model type')
    parser.add_argument('--tune', action='store_true', help='Tune hyperparameters')
    parser.add_argument('--output', default='models', help='Output directory')
    args = parser.parse_args()

    # Make sure the artifact directory exists before anything is written.
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Loading data from {args.data}")
    frame = load_data(args.data)

    # Split into feature matrix and prediction target.
    features = frame.drop(columns=[args.target])
    target = frame[args.target]

    # Heuristic: a string target or fewer than 10 distinct values indicates
    # classification; anything else is treated as regression.
    is_classification = target.dtype == 'object' or target.nunique() < 10
    task_type = 'classification' if is_classification else 'regression'
    logger.info(f"Task type: {task_type}")

    # Hold out 20% of the rows for evaluation.
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=0.2, random_state=42
    )

    logger.info("Preprocessing data...")
    preprocessor = DataPreprocessor()
    X_train_processed = preprocessor.fit_transform(X_train)
    X_test_processed = preprocessor.transform(X_test)
    # Persist the fitted preprocessor next to the model for serving.
    preprocessor.save(output_dir / 'preprocessor.pkl')

    trainer = ModelTrainer(task_type=task_type)
    trainer.train(
        X_train_processed,
        y_train,
        model_name=args.model,
        tune_hyperparams=args.tune,
    )

    logger.info("Evaluating model...")
    trainer.evaluate(X_test_processed, y_test)

    trainer.save_model(output_dir / 'model.pkl')
    logger.info("Training completed!")
    logger.info(f"Model saved to {output_dir / 'model.pkl'}")


if __name__ == '__main__':
    main()
步骤6: Docker配置¶
Docker
# Dockerfile
# Slim Python base keeps the image small while providing CPython 3.11.
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code and trained model artifacts into the image.
COPY src/ ./src/
COPY models/ ./models/

# Port the API listens on.
EXPOSE 8000

# Start the FastAPI app with uvicorn, bound to all interfaces.
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
YAML
# docker-compose.yml
version: '3.8'
services:
  api:
    build: .               # build from the Dockerfile in this directory
    ports:
      - "8000:8000"        # host:container — exposes the FastAPI service
    volumes:
      # Mount models from the host so retraining does not require a rebuild.
      - ./models:/app/models
    environment:
      - PYTHONPATH=/app    # make the src package importable in the container
🎯 完成标准¶
- 完整的项目结构和模块化代码
- 数据处理、特征工程、模型训练分离
- 模型和预处理器可持久化
- REST API可访问
- Docker容器化
- 有适当的日志和错误处理
- 包含README文档
💡 最佳实践¶
- 版本控制 - 使用Git管理代码,DVC管理数据
- 配置管理 - 使用YAML/JSON配置文件
- 日志记录 - 详细记录训练和预测日志
- 测试覆盖 - 为核心功能编写单元测试
- 文档 - 使用docstrings和README
📚 参考资源¶
🎉 恭喜完成所有项目!¶
你已经完成了从Python基础到完整ML项目的学习!
下一步建议: 1. 完善项目文档 2. 部署到云平台(AWS/GCP/Azure) 3. 设置CI/CD流水线 4. 参与开源项目 5. 持续学习和实践
记住: 最好的学习方式是构建项目!