跳转至

项目5: 完整ML项目

难度: ⭐⭐⭐⭐⭐ 高级 时间: 8-10小时 涉及知识: 全流程ML工程、MLOps、部署


🎯 项目目标

完成一个生产级的机器学习项目: 1. 端到端的数据处理流程 2. 模型训练和调优 3. 模型评估和解释 4. API服务化 5. 容器化部署


📋 项目架构

Text Only
ml_project/
├── data/                   # 数据目录
│   ├── raw/               # 原始数据
│   ├── processed/         # 处理后数据
│   └── external/          # 外部数据
├── models/                # 模型文件
├── notebooks/             # 探索性分析
├── src/                   # 源代码
│   ├── __init__.py
│   ├── data/              # 数据处理
│   ├── features/          # 特征工程
│   ├── models/            # 模型定义
│   └── api/               # API服务
├── tests/                 # 测试代码
├── configs/               # 配置文件
├── Dockerfile             # Docker配置
├── requirements.txt       # 依赖
└── README.md             # 项目说明

🚀 实现步骤

步骤1: 项目初始化

Bash
# Create the project directory skeleton in one shot (brace expansion)
mkdir -p ml_project/{data/{raw,processed,external},models,notebooks,src/{data,features,models,api},tests,configs}
cd ml_project

# Initialize version control
git init

# Create and activate an isolated virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install dependencies, then pin the exact versions for reproducibility
pip install pandas numpy scikit-learn fastapi uvicorn jupyter matplotlib seaborn
pip freeze > requirements.txt

步骤2: 数据模块

Python
# src/data/load_data.py
import pandas as pd
from pathlib import Path
import logging

logger = logging.getLogger(__name__)

def load_data(filepath: str) -> pd.DataFrame:
    """Read a dataset from disk into a DataFrame.

    Args:
        filepath: Path to a ``.csv``, ``.xls`` or ``.xlsx`` file.

    Returns:
        The file contents as a DataFrame.

    Raises:
        ValueError: For any other file extension.
    """
    source = Path(filepath)

    if source.suffix == '.csv':
        frame = pd.read_csv(source)
    elif source.suffix in ('.xls', '.xlsx'):
        frame = pd.read_excel(source)
    else:
        raise ValueError(f"Unsupported file format: {source.suffix}")

    logger.info(f"Loaded {len(frame)} rows from {filepath}")
    return frame

def save_data(df: pd.DataFrame, filepath: str) -> None:
    """Persist a DataFrame to disk as CSV or Excel.

    Parent directories are created as needed.

    Args:
        df: The DataFrame to write.
        filepath: Destination ending in ``.csv``, ``.xls`` or ``.xlsx``.

    Raises:
        ValueError: If the file extension is not supported.
    """
    path = Path(filepath)
    path.parent.mkdir(parents=True, exist_ok=True)

    if path.suffix == '.csv':
        df.to_csv(path, index=False)
    elif path.suffix in ['.xls', '.xlsx']:
        df.to_excel(path, index=False)
    else:
        # Bug fix: an unknown suffix was previously ignored silently while
        # still logging success. Fail loudly instead, mirroring load_data.
        raise ValueError(f"Unsupported file format: {path.suffix}")

    logging.getLogger(__name__).info(f"Saved data to {filepath}")

# src/data/preprocess.py
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, LabelEncoder
import pickle

class DataPreprocessor:
    """Fit/transform preprocessing for tabular data.

    Numeric columns are mean-imputed then standardized; categorical
    (object) columns are label-encoded. The column lists detected during
    fitting are remembered so ``transform`` applies the exact same
    treatment to new data, regardless of how pandas infers its dtypes.
    """

    def __init__(self):
        self.imputer = SimpleImputer(strategy='mean')
        self.scaler = StandardScaler()
        self.label_encoders = {}
        # Column lists captured at fit time and reused by transform().
        self.numeric_cols = []
        self.categorical_cols = []
        self.fitted = False

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fit the preprocessors on ``df`` and return the transformed copy."""
        df = df.copy()

        # 'number' covers every numeric dtype (int32, float32, ...), not
        # just int64/float64 as the previous hard-coded list did.
        self.numeric_cols = list(df.select_dtypes(include='number').columns)
        self.categorical_cols = list(df.select_dtypes(include=['object']).columns)

        if self.numeric_cols:
            df[self.numeric_cols] = self.imputer.fit_transform(df[self.numeric_cols])
            df[self.numeric_cols] = self.scaler.fit_transform(df[self.numeric_cols])

        for col in self.categorical_cols:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            self.label_encoders[col] = le

        self.fitted = True
        return df

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform new data using the statistics learned at fit time.

        Raises:
            ValueError: If called before ``fit_transform`` (or ``load``).
        """
        if not self.fitted:
            raise ValueError("Preprocessor not fitted yet")

        df = df.copy()

        # Bug fix: reuse the columns recorded at fit time instead of
        # re-detecting dtypes here. New data may infer differently (e.g.
        # an all-missing numeric column arrives as object), which would
        # desync the imputer/scaler from the columns they were fitted on.
        # Fall back to detection only for state loaded from old pickles.
        numeric_cols = self.numeric_cols or list(df.select_dtypes(include='number').columns)
        categorical_cols = self.categorical_cols or list(df.select_dtypes(include=['object']).columns)

        if numeric_cols:
            df[numeric_cols] = self.imputer.transform(df[numeric_cols])
            df[numeric_cols] = self.scaler.transform(df[numeric_cols])

        for col in categorical_cols:
            if col in self.label_encoders:
                # NOTE(review): categories unseen at fit time still raise
                # ValueError inside LabelEncoder — confirm callers
                # validate inputs upstream.
                df[col] = self.label_encoders[col].transform(df[col].astype(str))

        return df

    def save(self, filepath: str):
        """Pickle the fitted state (imputer, scaler, encoders, columns)."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'imputer': self.imputer,
                'scaler': self.scaler,
                'label_encoders': self.label_encoders,
                'numeric_cols': self.numeric_cols,
                'categorical_cols': self.categorical_cols,
            }, f)

    def load(self, filepath: str):
        """Restore state previously written by ``save``."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.imputer = data['imputer']
        self.scaler = data['scaler']
        self.label_encoders = data['label_encoders']
        # .get() keeps pickles written by the old format loadable.
        self.numeric_cols = data.get('numeric_cols', [])
        self.categorical_cols = data.get('categorical_cols', [])
        self.fitted = True

步骤3: 模型模块

Python
# src/models/train.py
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.metrics import mean_squared_error, r2_score
import pickle
import logging

logger = logging.getLogger(__name__)

class ModelTrainer:
    """Train, evaluate and persist a scikit-learn model.

    ``task_type`` ('classification' or 'regression') selects both the
    available estimators and the evaluation metrics.
    """

    def __init__(self, task_type='classification'):
        self.task_type = task_type
        self.model = None          # fitted estimator; set by train()/load_model()
        self.best_params = None    # set only when hyperparameter tuning ran

    def get_model(self, model_name: str):
        """Return a fresh (unfitted) estimator for ``model_name``.

        Raises:
            ValueError: If ``model_name`` is not offered for the current
                task type. (Previously an unknown name returned None,
                which surfaced later in train() as an opaque
                ``AttributeError: 'NoneType' object has no attribute 'fit'``.)
        """
        if self.task_type == 'classification':
            models = {
                'random_forest': RandomForestClassifier(random_state=42),
                'logistic_regression': LogisticRegression(random_state=42)
            }
        else:
            models = {
                'random_forest': RandomForestRegressor(random_state=42),
                'linear_regression': LinearRegression()
            }

        if model_name not in models:
            raise ValueError(
                f"Unknown model '{model_name}' for task '{self.task_type}'. "
                f"Available: {sorted(models)}"
            )
        return models[model_name]

    def train(self, X_train, y_train, model_name='random_forest', tune_hyperparams=False):
        """Fit ``model_name`` on the training data.

        When ``tune_hyperparams`` is True and the model is a random
        forest, a 5-fold grid search selects the best configuration.
        """
        logger.info(f"Training {model_name} model...")

        model = self.get_model(model_name)

        if tune_hyperparams and model_name == 'random_forest':
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20],
                'min_samples_split': [2, 5, 10]
            }

            grid_search = GridSearchCV(
                model, param_grid,
                cv=5, scoring='accuracy' if self.task_type == 'classification' else 'r2',
                n_jobs=-1
            )
            grid_search.fit(X_train, y_train)

            self.model = grid_search.best_estimator_
            self.best_params = grid_search.best_params_
            logger.info(f"Best parameters: {self.best_params}")
        else:
            model.fit(X_train, y_train)
            self.model = model

        logger.info("Training completed")

    def evaluate(self, X_test, y_test):
        """Score the fitted model on held-out data.

        Returns:
            ``{'accuracy': ...}`` for classification,
            ``{'mse': ..., 'r2': ...}`` for regression.
        """
        predictions = self.model.predict(X_test)

        if self.task_type == 'classification':
            accuracy = accuracy_score(y_test, predictions)
            logger.info(f"Accuracy: {accuracy:.4f}")
            logger.info("\nClassification Report:")
            logger.info(classification_report(y_test, predictions))
            return {'accuracy': accuracy}
        else:
            mse = mean_squared_error(y_test, predictions)
            r2 = r2_score(y_test, predictions)
            logger.info(f"MSE: {mse:.4f}")
            logger.info(f"R2 Score: {r2:.4f}")
            return {'mse': mse, 'r2': r2}

    def save_model(self, filepath: str):
        """Pickle the fitted model together with its metadata."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'model': self.model,
                'task_type': self.task_type,
                'best_params': self.best_params
            }, f)
        logger.info(f"Model saved to {filepath}")

    def load_model(self, filepath: str):
        """Restore a model previously written by ``save_model``."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
            self.model = data['model']
            self.task_type = data['task_type']
            self.best_params = data.get('best_params')
        logger.info(f"Model loaded from {filepath}")

步骤4: API服务

Python
# src/api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import sys
from pathlib import Path

# Make src/ importable so `from data.preprocess import ...` resolves
# regardless of how the app is launched (e.g. `uvicorn src.api.main:app`).
sys.path.append(str(Path(__file__).parent.parent))

from data.preprocess import DataPreprocessor
import pickle

app = FastAPI(
    title="ML Prediction API",
    description="API for ML model predictions",
    version="1.0.0"
)

# Load the trained model and fitted preprocessor once, at import time.
# Paths are relative to the working directory; the Dockerfile copies
# models/ into /app, so starting the server from /app finds them.
try:
    with open('models/model.pkl', 'rb') as f:
        model_data = pickle.load(f)
        model = model_data['model']

    preprocessor = DataPreprocessor()
    preprocessor.load('models/preprocessor.pkl')

    print("Model and preprocessor loaded successfully")
except Exception as e:
    # Keep the API importable even without artifacts; the endpoints
    # check for None and answer 503 instead of crashing on startup.
    print(f"Error loading model: {e}")
    model = None
    preprocessor = None

class PredictionRequest(BaseModel):
    # Each dict is one observation: feature name -> value.
    data: list[dict]

class PredictionResponse(BaseModel):
    # Raw model outputs, one per input record.
    predictions: list
    model_version: str = "1.0.0"

@app.get("/")
def root():
    """Landing endpoint: reports readiness and points at the docs."""
    status = "ready" if model else "not loaded"
    return {"message": "ML Prediction API", "status": status, "docs": "/docs"}

@app.get("/health")
def health_check():
    """Liveness/readiness probe for orchestrators and load balancers."""
    loaded = model is not None
    return {"status": "healthy", "model_loaded": loaded}

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    """Run batch predictions over a list of feature records."""
    if model is None or preprocessor is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    try:
        # Records -> DataFrame, then the same pipeline used at training time.
        frame = pd.DataFrame(request.data)
        features = preprocessor.transform(frame)
        outputs = model.predict(features)
        return PredictionResponse(predictions=outputs.tolist())
    except Exception as e:
        # Any preprocessing/prediction failure surfaces as a client error.
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict_single")
def predict_single(features: dict):
    """Predict a single record.

    Args:
        features: Mapping of feature name -> value for one observation.

    Raises:
        HTTPException: 503 when the model or preprocessor is unavailable,
            400 when preprocessing or prediction fails for this input.
    """
    # Bug fix: also guard against a missing preprocessor. Previously only
    # `model` was checked, so a failed preprocessor load produced a
    # misleading 400 (AttributeError) instead of a 503, inconsistent
    # with the batch /predict endpoint.
    if model is None or preprocessor is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    try:
        df = pd.DataFrame([features])
        df_processed = preprocessor.transform(df)
        prediction = model.predict(df_processed)[0]

        return {"prediction": prediction}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

步骤5: 训练脚本

Python
# train.py
import argparse
import logging
from pathlib import Path
import sys

# Make the src/ package directory importable when running from repo root.
sys.path.append('src')

from data.load_data import load_data
from data.preprocess import DataPreprocessor
from models.train import ModelTrainer
from sklearn.model_selection import train_test_split

# Root logging configuration for the whole training run.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def main():
    """CLI entry point: load data, preprocess, train, evaluate, persist."""
    parser = argparse.ArgumentParser(description='Train ML model')
    parser.add_argument('--data', required=True, help='Path to training data')
    parser.add_argument('--target', required=True, help='Target column name')
    parser.add_argument('--model', default='random_forest', help='Model type')
    parser.add_argument('--tune', action='store_true', help='Tune hyperparameters')
    parser.add_argument('--output', default='models', help='Output directory')
    args = parser.parse_args()

    # Ensure the artifact directory exists before anything is written.
    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Loading data from {args.data}")
    frame = load_data(args.data)

    # Split the target column off from the feature matrix.
    features = frame.drop(columns=[args.target])
    target = frame[args.target]

    # Heuristic: a non-numeric target, or a numeric one with few distinct
    # values, is treated as a classification problem.
    is_classification = target.dtype == 'object' or target.nunique() < 10
    task_type = 'classification' if is_classification else 'regression'
    logger.info(f"Task type: {task_type}")

    # Hold out 20% of the rows for evaluation.
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=0.2, random_state=42
    )

    logger.info("Preprocessing data...")
    preprocessor = DataPreprocessor()
    train_matrix = preprocessor.fit_transform(X_train)
    test_matrix = preprocessor.transform(X_test)

    # Persist the fitted preprocessor so serving applies identical transforms.
    preprocessor.save(out_dir / 'preprocessor.pkl')

    trainer = ModelTrainer(task_type=task_type)
    trainer.train(train_matrix, y_train, model_name=args.model, tune_hyperparams=args.tune)

    logger.info("Evaluating model...")
    trainer.evaluate(test_matrix, y_test)

    trainer.save_model(out_dir / 'model.pkl')

    logger.info("Training completed!")
    logger.info(f"Model saved to {out_dir / 'model.pkl'}")

if __name__ == '__main__':
    main()

步骤6: Docker配置

Docker
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code and trained artifacts
COPY src/ ./src/
COPY models/ ./models/

# Port served by uvicorn
EXPOSE 8000

# Start the API server
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
YAML
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"            # host:container for the FastAPI service
    volumes:
      - ./models:/app/models   # mount artifacts so they update without a rebuild
    environment:
      - PYTHONPATH=/app

🎯 完成标准

  • 完整的项目结构和模块化代码
  • 数据处理、特征工程、模型训练分离
  • 模型和预处理器可持久化
  • REST API可访问
  • Docker容器化
  • 有适当的日志和错误处理
  • 包含README文档

💡 最佳实践

  1. 版本控制 - 使用Git管理代码,DVC管理数据
  2. 配置管理 - 使用YAML/JSON配置文件
  3. 日志记录 - 详细记录训练和预测日志
  4. 测试覆盖 - 为核心功能编写单元测试
  5. 文档 - 使用docstrings和README

📚 参考资源


🎉 恭喜完成所有项目!

你已经完成了从Python基础到完整ML项目的学习!

下一步建议: 1. 完善项目文档 2. 部署到云平台(AWS/GCP/Azure) 3. 设置CI/CD流水线 4. 参与开源项目 5. 持续学习和实践

记住: 最好的学习方式是构建项目!