跳转至

04 - 实战项目

学习时间: 15-20小时 | 重要性: ⭐⭐⭐⭐⭐ | 综合应用所学知识


🎯 学习目标

  • 完成端到端的深度学习项目
  • 掌握实际工程中的最佳实践
  • 学会模型训练和调优
  • 了解模型部署流程

📚 项目列表

  1. 项目1: 图像分类(PyTorch)
  2. 项目2: 文本分类(Hugging Face)
  3. 项目3: 端到端ML项目(scikit-learn)

项目1: 图像分类(PyTorch)

项目概述

使用PyTorch构建一个CNN模型,在CIFAR-10数据集上进行图像分类。

技术栈: PyTorch, torchvision, tensorboard | 预期准确率: 85%+

完整代码

Python
# train_cifar10.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import torchvision
import torchvision.transforms as transforms
from tqdm import tqdm
import os

# Fix the random seed for reproducible runs
torch.manual_seed(42)

# Device configuration: use the GPU when available, otherwise fall back to CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Data augmentation and preprocessing for the training split.
# The Normalize constants are the per-channel mean/std values commonly used for CIFAR-10.
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

# Test-time preprocessing: no augmentation, same normalization as training
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

# Download (if needed) and load the CIFAR-10 train/test splits
train_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=train_transform
)
test_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=test_transform
)

# Batched loaders; only the training split is shuffled
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=2)

# Human-readable names for the 10 CIFAR-10 class indices
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# Model definition (ResNet-18 style)
class BasicBlock(nn.Module):
    """Two 3x3 conv+BN layers with a residual (shortcut) connection."""

    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # Main path: conv-bn -> relu -> conv-bn
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut path: identity unless spatial size or channel count changes,
        # in which case a 1x1 projection conv matches the dimensions.
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        residual = self.shortcut(x)
        y = torch.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        return torch.relu(y + residual)

class ResNet(nn.Module):
    """ResNet for 32x32 inputs (CIFAR variant): a 3x3 stem conv and four residual stages.

    Args:
        block: residual block class exposing an `expansion` class attribute and a
            (in_channels, out_channels, stride) constructor.
        num_blocks: four ints — number of blocks per stage.
        num_classes: size of the final classification layer.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()
        self.in_channels = 64

        # Stem: single 3x3 conv (no max-pool — suited to small CIFAR images)
        self.conv1 = nn.Conv2d(3, 64, 3, 1, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # Four stages; every stage after the first halves the spatial resolution
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        # Only the first block of a stage downsamples; the rest use stride 1
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = torch.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # FIX: use the canonical nn.functional.avg_pool2d — avg_pool2d is exposed
        # under torch.nn.functional, not as a documented torch-level attribute,
        # so `torch.avg_pool2d` can fail with AttributeError on some versions.
        out = nn.functional.avg_pool2d(out, 4)  # global average over the final 4x4 map
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

def ResNet18():
    """Build a ResNet-18: four stages of two BasicBlocks each."""
    stage_depths = [2] * 4
    return ResNet(BasicBlock, stage_depths)

# Training function
def train_epoch(model, loader, criterion, optimizer, scaler, device):
    """Run one training epoch with mixed precision.

    Args:
        model: network being trained (set to train mode here).
        loader: DataLoader yielding (inputs, targets) batches.
        criterion: loss function.
        optimizer: optimizer stepped through the AMP scaler.
        scaler: torch.amp.GradScaler for mixed-precision loss scaling.
        device: torch.device the batches are moved to.

    Returns:
        (average loss over batches, accuracy percentage).
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    pbar = tqdm(loader, desc='Training')
    for inputs, targets in pbar:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()

        # Mixed-precision forward pass.
        # FIX: use the actual device type instead of hard-coding 'cuda' (and
        # disable autocast off-GPU) so the function also runs on CPU-only hosts.
        with torch.amp.autocast(device.type, enabled=(device.type == 'cuda')):
            outputs = model(inputs)
            loss = criterion(outputs, targets)

        # Scaled backward + optimizer step; the scaler is a no-op when disabled
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()
        _, predicted = outputs.max(1)  # argmax over classes; max values discarded
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()  # count correct predictions as a Python int

        pbar.set_postfix({'loss': loss.item(), 'acc': 100. * correct / total})

    return running_loss / len(loader), 100. * correct / total

# Validation function
@torch.no_grad()
def validate(model, loader, criterion, device):
    """Evaluate `model` on `loader`; returns (average loss, accuracy percentage)."""
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_x, batch_y in tqdm(loader, desc='Validating'):
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        logits = model(batch_x)

        loss_sum += criterion(logits, batch_y).item()
        preds = logits.argmax(dim=1)
        n_seen += batch_y.size(0)
        n_correct += (preds == batch_y).sum().item()

    return loss_sum / len(loader), 100. * n_correct / n_seen

# Main training loop
def main():
    """Train ResNet-18 on CIFAR-10; logs to TensorBoard and checkpoints the best model."""
    # Build the model on the selected device
    model = ResNet18().to(device)

    # Loss, SGD with momentum/weight decay, cosine LR schedule, AMP grad scaler
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)
    scaler = torch.amp.GradScaler('cuda')

    # TensorBoard writer
    writer = SummaryWriter('runs/cifar10_experiment')

    # Training loop state
    best_acc = 0.0
    num_epochs = 200

    for epoch in range(num_epochs):
        print(f'\nEpoch: {epoch+1}/{num_epochs}')

        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, scaler, device)
        val_loss, val_acc = validate(model, test_loader, criterion, device)

        scheduler.step()

        # Log metrics and the current learning rate to TensorBoard
        writer.add_scalar('Loss/train', train_loss, epoch)
        writer.add_scalar('Loss/val', val_loss, epoch)
        writer.add_scalar('Accuracy/train', train_acc, epoch)
        writer.add_scalar('Accuracy/val', val_acc, epoch)
        writer.add_scalar('Learning_rate', optimizer.param_groups[0]['lr'], epoch)

        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

        # Checkpoint whenever validation accuracy improves
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_acc': best_acc,
            }, 'best_model.pth')
            print(f'Model saved with accuracy: {best_acc:.2f}%')

    writer.close()
    print(f'\nBest accuracy: {best_acc:.2f}%')

# Script entry point
if __name__ == '__main__':
    main()

项目要点

  1. 数据增强: RandomCrop, RandomHorizontalFlip, ColorJitter
  2. 模型架构: ResNet-18,包含残差连接
  3. 训练技巧: 混合精度训练、学习率调度、权重衰减
  4. 监控: TensorBoard记录训练过程
  5. 模型保存: 保存最佳模型和训练状态

项目2: 文本分类(Hugging Face)

项目概述

使用BERT模型进行情感分析,在IMDb电影评论数据集上进行二分类。

技术栈: Transformers, datasets, PyTorch | 预期准确率: 90%+

完整代码

Python
# bert_sentiment.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import (
    BertTokenizer, BertForSequenceClassification,
    get_linear_schedule_with_warmup
)
from datasets import load_dataset
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, classification_report

# Device setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Load the IMDb dataset (downloaded on first run)
print("Loading dataset...")
dataset = load_dataset('imdb')

# Load the tokenizer and a BERT classifier with a 2-label head
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name, num_labels=2)
model.to(device)

# Data preprocessing
def preprocess_function(examples):
    """Tokenize a batch of examples, truncating/padding every text to 512 tokens."""
    texts = examples['text']
    return tokenizer(texts, max_length=512, truncation=True, padding='max_length')

print("Preprocessing data...")
encoded_dataset = dataset.map(preprocess_function, batched=True)
encoded_dataset = encoded_dataset.remove_columns(['text'])  # raw text no longer needed after tokenization
encoded_dataset = encoded_dataset.rename_column('label', 'labels')  # the model expects the 'labels' key
encoded_dataset.set_format('torch')  # yield torch tensors

# Create DataLoaders over fixed random subsets (5k train / 1k val) to keep runtime modest
train_loader = DataLoader(
    encoded_dataset['train'].shuffle(seed=42).select(range(5000)),
    batch_size=16,
    shuffle=True
)
val_loader = DataLoader(
    encoded_dataset['test'].shuffle(seed=42).select(range(1000)),
    batch_size=16
)

# Training setup: AdamW with a small fine-tuning LR and a linear decay schedule (no warmup)
optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)
num_epochs = 3
num_training_steps = num_epochs * len(train_loader)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps
)

# Training function
def train_epoch(model, dataloader, optimizer, scheduler, device):
    """Fine-tune for one epoch; returns (mean loss, accuracy)."""
    model.train()
    loss_total = 0.0
    preds_seen = []
    labels_seen = []

    bar = tqdm(dataloader, desc='Training')
    for batch in bar:
        # Move every tensor in the batch dict onto the target device
        batch = {name: tensor.to(device) for name, tensor in batch.items()}

        # The model returns a loss because the batch contains a 'labels' key
        outputs = model(**batch)
        loss = outputs.loss

        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        loss_total += loss.item()
        batch_preds = torch.argmax(outputs.logits, dim=-1)
        preds_seen.extend(batch_preds.cpu().numpy())
        labels_seen.extend(batch['labels'].cpu().numpy())

        bar.set_postfix({'loss': loss.item()})

    mean_loss = loss_total / len(dataloader)
    return mean_loss, accuracy_score(labels_seen, preds_seen)

# Validation function
@torch.no_grad()
def evaluate(model, dataloader, device):
    """Evaluate on `dataloader`; returns (mean loss, accuracy, labels, predictions)."""
    model.eval()
    loss_total = 0.0
    collected_preds = []
    collected_labels = []

    for batch in tqdm(dataloader, desc='Evaluating'):
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        outputs = model(**batch)

        loss_total += outputs.loss.item()
        collected_preds.extend(torch.argmax(outputs.logits, dim=-1).cpu().numpy())
        collected_labels.extend(batch['labels'].cpu().numpy())

    mean_loss = loss_total / len(dataloader)
    accuracy = accuracy_score(collected_labels, collected_preds)
    return mean_loss, accuracy, collected_labels, collected_preds

# Training loop
print("\nStarting training...")
best_accuracy = 0

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")

    train_loss, train_acc = train_epoch(model, train_loader, optimizer, lr_scheduler, device)
    val_loss, val_acc, val_labels, val_preds = evaluate(model, val_loader, device)

    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    print("\nClassification Report:")
    print(classification_report(val_labels, val_preds, target_names=['Negative', 'Positive']))

    # Save the best model (state dict only)
    if val_acc > best_accuracy:
        best_accuracy = val_acc
        torch.save(model.state_dict(), 'best_bert_model.pth')
        print(f"Best model saved with accuracy: {best_accuracy:.4f}")

print(f"\nBest validation accuracy: {best_accuracy:.4f}")

# Prediction function
def predict_sentiment(text, model, tokenizer, device):
    """Classify one text; returns ('Positive'|'Negative', confidence in [0, 1])."""
    model.eval()
    encoded = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding=True,
        max_length=512
    ).to(device)

    with torch.no_grad():
        logits = model(**encoded).logits
        probs = torch.softmax(logits, dim=-1)
        label_idx = probs.argmax(dim=-1).item()
        confidence = probs[0, label_idx].item()

    return ('Positive' if label_idx == 1 else 'Negative'), confidence

# Quick smoke test of the prediction helper
test_texts = [
    "This movie was absolutely fantastic! I loved every minute of it.",
    "Terrible waste of time. The plot was boring and predictable.",
    "An okay movie, nothing special but not bad either."
]

print("\nTest predictions:")
for text in test_texts:
    sentiment, confidence = predict_sentiment(text, model, tokenizer, device)
    print(f"Text: {text[:50]}...")  # truncate long inputs for display
    print(f"Sentiment: {sentiment} (confidence: {confidence:.4f})\n")

项目要点

  1. 预训练模型: 使用BERT-base进行迁移学习
  2. 数据处理: 使用datasets库和tokenizer进行预处理
  3. 微调策略: 使用较小的学习率(2e-5)进行微调
  4. 评估: 使用accuracy和classification_report
  5. 推理: 封装预测函数用于实际应用

项目3: 端到端ML项目(scikit-learn)

项目概述

完成一个完整的数据科学项目,包括数据探索、特征工程、模型训练、调优和评估。

技术栈: scikit-learn, pandas, matplotlib, seaborn | 数据集: 示例代码使用scikit-learn内置的乳腺癌数据集(也可替换为Kaggle的Titanic等自定义数据集)

完整代码

Python
# ml_pipeline.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
import joblib
import warnings
warnings.filterwarnings('ignore')  # NOTE(review): silences ALL warnings — fine for a demo, hides real issues

# Plot styling
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# 1. Data loading and exploration
print("="*50)
print("1. 数据加载和探索")
print("="*50)

# Load the data (a bundled sklearn dataset here; replace with real data in practice)
from sklearn.datasets import load_breast_cancer
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = data.target  # 0 = malignant, 1 = benign (matches the label names used below)

print(f"Dataset shape: {X.shape}")
print(f"\nFirst 5 rows:")
print(X.head())
print(f"\nTarget distribution:")
print(pd.Series(y).value_counts())

# Data exploration
print("\nData info:")
print(X.info())
print("\nData description:")
print(X.describe())

# Check for missing values
print(f"\nMissing values:\n{X.isnull().sum()}")

# Visualization
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Target distribution.
# FIX: value_counts() orders by frequency, which put the majority class
# (benign) first while the labels assumed [Malignant, Benign]. Sorting by the
# class index (0 = malignant, 1 = benign) keeps slices and labels aligned.
target_counts = pd.Series(y).value_counts().sort_index()
axes[0, 0].pie(target_counts, labels=['Malignant', 'Benign'], autopct='%1.1f%%')
axes[0, 0].set_title('Target Distribution')

# Feature distributions in the three remaining subplot slots.
# FIX: the original looped over four features, sending the last one to
# axes[2, 0] — out of bounds for a 2x2 grid (IndexError).
for i, feature in enumerate(X.columns[:3]):
    row, col = (i + 1) // 2, (i + 1) % 2
    axes[row, col].hist(X[feature], bins=30, alpha=0.7)
    axes[row, col].set_title(f'Distribution of {feature}')
    axes[row, col].set_xlabel(feature)
    axes[row, col].set_ylabel('Frequency')

plt.tight_layout()
plt.savefig('data_exploration.png')
print("\nData exploration plots saved to 'data_exploration.png'")

# 2. Feature engineering
print("\n" + "="*50)
print("2. 特征工程")
print("="*50)

# Select the numeric feature columns
numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
print(f"Numeric features: {len(numeric_features)}")

# Preprocessing pipeline: median imputation followed by standardization
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features)
    ]
)

# 3. Train/test split
print("\n" + "="*50)
print("3. 数据分割")
print("="*50)

# Stratify so the class ratio is preserved in both splits
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Training set size: {X_train.shape[0]}")
print(f"Test set size: {X_test.shape[0]}")

# 4. Model selection and training
print("\n" + "="*50)
print("4. 模型选择和训练")
print("="*50)

# Candidate models.
# FIX: SVC needs probability=True, otherwise the predict_proba call in the
# evaluation section crashes whenever SVM wins model selection (it only adds
# the extra cost of fitting Platt scaling).
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42),
    'SVM': SVC(probability=True, random_state=42)
}

# Compare candidates with 5-fold cross-validation on the training split
print("\nCross-validation scores:")
cv_results = {}
for name, model in models.items():
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
    cv_results[name] = scores
    print(f"{name}: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")

# Pick the model with the highest mean CV accuracy
best_model_name = max(cv_results, key=lambda x: cv_results[x].mean())
print(f"\nBest model: {best_model_name}")

# 5. Hyperparameter tuning
print("\n" + "="*50)
print("5. 超参数调优")
print("="*50)

# Parameter grids, keyed by model name; keys use the 'classifier__' prefix
# so they target the classifier step inside the Pipeline.
param_grids = {
    'Random Forest': {
        'classifier__n_estimators': [50, 100, 200],
        'classifier__max_depth': [3, 5, 7, None],
        'classifier__min_samples_split': [2, 5, 10]
    },
    'Gradient Boosting': {
        'classifier__n_estimators': [50, 100, 200],
        'classifier__learning_rate': [0.01, 0.1, 0.2],
        'classifier__max_depth': [3, 5, 7]
    },
    'SVM': {
        'classifier__C': [0.1, 1, 10],
        'classifier__kernel': ['rbf', 'linear'],
        'classifier__gamma': ['scale', 'auto']
    },
    'Logistic Regression': {
        'classifier__C': [0.1, 1, 10],
        'classifier__penalty': ['l1', 'l2'],
        # FIX: the default lbfgs solver rejects the l1 penalty, so the original
        # grid crashed inside GridSearchCV. liblinear supports both l1 and l2.
        'classifier__solver': ['liblinear']
    }
}

# Tune only the best-performing model from the CV comparison
best_model = models[best_model_name]
param_grid = param_grids[best_model_name]

pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', best_model)
])

# Exhaustive grid search with 5-fold CV, parallelized across all CPU cores
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

print(f"\nTuning {best_model_name}...")
grid_search.fit(X_train, y_train)

print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best cross-validation score: {grid_search.best_score_:.4f}")

# 6. Model evaluation
print("\n" + "="*50)
print("6. 模型评估")
print("="*50)

# Evaluate the tuned pipeline on the held-out test set
best_pipeline = grid_search.best_estimator_
y_pred = best_pipeline.predict(X_test)
# Column 1 = predicted probability of the positive class.
# NOTE(review): y_pred_proba is never used below — kept for e.g. a ROC curve.
y_pred_proba = best_pipeline.predict_proba(X_test)[:, 1]  # [:, 1] selects the positive-class column

print(f"\nTest Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred, target_names=['Malignant', 'Benign']))

# Confusion matrix heatmap saved to disk
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Malignant', 'Benign'],
            yticklabels=['Malignant', 'Benign'])
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.savefig('confusion_matrix.png')
print("\nConfusion matrix saved to 'confusion_matrix.png'")

# 7. Feature importance
print("\n" + "="*50)
print("7. 特征重要性")
print("="*50)

# Only models that expose feature_importances_ (e.g. the tree ensembles) are analyzed
if hasattr(best_pipeline.named_steps['classifier'], 'feature_importances_'):
    importances = best_pipeline.named_steps['classifier'].feature_importances_
    feature_names = numeric_features

    # Indices of the top-10 features, highest importance first
    indices = np.argsort(importances)[::-1][:10]  # Top 10

    plt.figure(figsize=(10, 6))
    plt.bar(range(10), importances[indices])
    plt.xticks(range(10), [feature_names[i] for i in indices], rotation=45, ha='right')
    plt.title('Top 10 Feature Importances')
    plt.tight_layout()
    plt.savefig('feature_importance.png')
    print("\nFeature importance plot saved to 'feature_importance.png'")

    print("\nTop 10 important features:")
    for i in indices:
        print(f"{feature_names[i]}: {importances[i]:.4f}")

# 8. Save the model
print("\n" + "="*50)
print("8. 保存模型")
print("="*50)

joblib.dump(best_pipeline, 'best_model.pkl')
print("Model saved to 'best_model.pkl'")

# Save the preprocessor as well.
# NOTE(review): this module-level preprocessor was presumably never fitted itself
# (cross_val_score/GridSearchCV fit clones); the fitted one lives inside
# best_pipeline — confirm before relying on this artifact.
joblib.dump(preprocessor, 'preprocessor.pkl')
print("Preprocessor saved to 'preprocessor.pkl'")

# 9. Prediction function
print("\n" + "="*50)
print("9. 预测函数")
print("="*50)

def predict_sample(sample, model_path='best_model.pkl'):
    """Load the persisted pipeline and return (predictions, class probabilities) for `sample`."""
    loaded = joblib.load(model_path)
    preds = loaded.predict(sample)
    probs = loaded.predict_proba(sample)
    return preds, probs

# Smoke-test the saved model on a few held-out rows
sample = X_test.iloc[:5]
predictions, probabilities = predict_sample(sample)
print("\nSample predictions:")
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):  # pair each prediction with its probability row
    print(f"Sample {i+1}: {'Benign' if pred == 1 else 'Malignant'} (probability: {prob[pred]:.4f})")

print("\n" + "="*50)
print("Pipeline completed successfully!")
print("="*50)

项目要点

  1. 数据探索: 使用pandas和可视化了解数据分布
  2. 特征工程: 使用Pipeline和ColumnTransformer进行预处理
  3. 模型选择: 交叉验证比较多个模型
  4. 超参数调优: 使用GridSearchCV寻找最佳参数
  5. 模型评估: 多维度评估模型性能
  6. 可解释性: 分析特征重要性
  7. 部署准备: 保存模型和预处理器

🎯 完成标准

完成所有项目后,你应该能够:

  • 独立完成深度学习项目的全流程
  • 使用PyTorch构建和训练CNN模型
  • 使用Hugging Face进行NLP任务
  • 使用scikit-learn完成ML项目
  • 掌握模型调优和评估方法
  • 了解模型部署的基本流程

💡 进阶建议

  1. 尝试不同架构: ResNet, DenseNet, EfficientNet
  2. 数据增强策略: AutoAugment, Mixup, CutMix
  3. 模型集成: 多模型投票、堆叠
  4. 超参数优化: Optuna, Ray Tune
  5. 模型部署: Docker, FastAPI, ONNX

📚 参考资源


恭喜完成阶段4的所有内容!

你已经掌握了: - PyTorch深度学习框架 - scikit-learn机器学习 - Hugging Face NLP工具 - 实际项目开发经验

下一步: 进入阶段5: 工程最佳实践