04 - 实战项目¶
学习时间: 15-20小时 重要性: ⭐⭐⭐⭐⭐ 综合应用所学知识
🎯 学习目标¶
- 完成端到端的深度学习项目
- 掌握实际工程中的最佳实践
- 学会模型训练和调优
- 了解模型部署流程
📚 项目列表¶
项目1: 图像分类(PyTorch)¶
项目概述¶
使用PyTorch构建一个CNN模型,在CIFAR-10数据集上进行图像分类。
技术栈: PyTorch, torchvision, tensorboard 预期准确率: 85%+
完整代码¶
Python
# train_cifar10.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import torchvision
import torchvision.transforms as transforms
from tqdm import tqdm
import os
# Reproducibility: fix the RNG seed used for weight init and shuffling.
torch.manual_seed(42)

# Prefer the GPU when one is present.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# CIFAR-10 per-channel statistics used to normalize both splits.
_CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
_CIFAR10_STD = (0.2023, 0.1994, 0.2010)

# Augmentation is applied to the training split only; the test split is
# just converted to tensors and normalized.
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(_CIFAR10_MEAN, _CIFAR10_STD),
])
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_CIFAR10_MEAN, _CIFAR10_STD),
])

# Download (first run only) and wrap the two CIFAR-10 splits.
train_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=train_transform)
test_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=2)

# Human-readable class names, indexed by label id.
classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')
# 定义模型(ResNet-18风格)
class BasicBlock(nn.Module):
    """Residual block: two 3x3 convs with batch norm plus a shortcut.

    When the block changes resolution or channel count (stride != 1 or
    in_channels != out_channels) the shortcut is a 1x1 projection conv;
    otherwise it is the identity.
    """

    expansion = 1  # BasicBlock does not widen its output channels

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            # 1x1 conv matches the shape of the residual branch.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        identity = self.shortcut(x)
        y = torch.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        return torch.relu(y + identity)
class ResNet(nn.Module):
    """CIFAR-style ResNet: 3x3 stem, four residual stages, linear head.

    Args:
        block: residual block class exposing an ``expansion`` attribute and
            an ``(in_channels, out_channels, stride)`` constructor.
        num_blocks: number of blocks in each of the four stages.
        num_classes: size of the output logit vector.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()
        self.in_channels = 64

        # Stem: no downsampling (CIFAR images are only 32x32).
        self.conv1 = nn.Conv2d(3, 64, 3, 1, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # Each stage doubles the channels; stages 2-4 halve the resolution.
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        # Only the first block of a stage downsamples; the rest use stride 1.
        layers = []
        for s in [stride] + [1] * (num_blocks - 1):
            layers.append(block(self.in_channels, out_channels, s))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = torch.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Fix: 2-D average pooling lives in torch.nn.functional, not on the
        # top-level torch module; torch.avg_pool2d raised AttributeError.
        out = nn.functional.avg_pool2d(out, 4)  # 4x4 -> 1x1 global pool
        out = out.view(out.size(0), -1)         # flatten to (batch, 512*expansion)
        return self.linear(out)
def ResNet18():
    """Build a ResNet-18 (four stages of two BasicBlocks) for CIFAR-10."""
    return ResNet(BasicBlock, num_blocks=[2, 2, 2, 2])
# 训练函数
def train_epoch(model, loader, criterion, optimizer, scaler, device):
    """Run one training epoch with (optional) mixed precision.

    Args:
        model: network being optimized (switched to train mode here).
        loader: DataLoader yielding (inputs, targets) batches.
        criterion: classification loss, e.g. nn.CrossEntropyLoss.
        optimizer: optimizer stepped once per batch.
        scaler: torch.amp.GradScaler; no-ops cleanly when disabled.
        device: device batches are moved to.

    Returns:
        (mean loss over batches, accuracy in percent).
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    pbar = tqdm(loader, desc='Training')
    for inputs, targets in pbar:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        # Fix: autocast on the actual device type instead of hard-coding
        # 'cuda'; the original warned/failed on CPU-only machines even
        # though `device` is selected dynamically.
        with torch.amp.autocast(device_type=device.type,
                                enabled=device.type == 'cuda'):
            outputs = model(inputs)
            loss = criterion(outputs, targets)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        running_loss += loss.item()
        # max over the class dim: drop the values, keep predicted labels.
        _, predicted = outputs.max(1)
        total += targets.size(0)
        # element-wise compare -> sum of True -> Python int.
        correct += predicted.eq(targets).sum().item()
        pbar.set_postfix({'loss': loss.item(), 'acc': 100. * correct / total})
    return running_loss / len(loader), 100. * correct / total
# 验证函数
@torch.no_grad()
def validate(model, loader, criterion, device):
    """Evaluate on `loader` without gradients; returns (mean loss, accuracy %)."""
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    for batch_x, batch_y in tqdm(loader, desc='Validating'):
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        logits = model(batch_x)
        loss_sum += criterion(logits, batch_y).item()
        preds = logits.max(1)[1]
        n_seen += batch_y.size(0)
        n_correct += (preds == batch_y).sum().item()
    return loss_sum / len(loader), 100. * n_correct / n_seen
# 主训练循环
def main():
    """Train ResNet-18 on CIFAR-10 for 200 epochs, logging to TensorBoard
    and checkpointing whenever validation accuracy improves."""
    model = ResNet18().to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    # Cosine decay of the learning rate over the full run.
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)
    # Fix: only enable gradient scaling when CUDA is actually present; the
    # original hard-coded 'cuda' and warned/failed on CPU-only hosts.
    scaler = torch.amp.GradScaler('cuda', enabled=torch.cuda.is_available())

    # TensorBoard run directory.
    writer = SummaryWriter('runs/cifar10_experiment')

    best_acc = 0.0
    num_epochs = 200
    for epoch in range(num_epochs):
        print(f'\nEpoch: {epoch+1}/{num_epochs}')
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, scaler, device)
        val_loss, val_acc = validate(model, test_loader, criterion, device)
        scheduler.step()

        # Scalar curves: loss, accuracy and the LR schedule.
        writer.add_scalar('Loss/train', train_loss, epoch)
        writer.add_scalar('Loss/val', val_loss, epoch)
        writer.add_scalar('Accuracy/train', train_acc, epoch)
        writer.add_scalar('Accuracy/val', val_acc, epoch)
        writer.add_scalar('Learning_rate', optimizer.param_groups[0]['lr'], epoch)

        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

        # Checkpoint the best model together with its training state.
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_acc': best_acc,
            }, 'best_model.pth')
            print(f'Model saved with accuracy: {best_acc:.2f}%')

    writer.close()
    print(f'\nBest accuracy: {best_acc:.2f}%')


if __name__ == '__main__':
    main()
项目要点¶
- 数据增强: RandomCrop, RandomHorizontalFlip, ColorJitter
- 模型架构: ResNet-18,包含残差连接
- 训练技巧: 混合精度训练、学习率调度、权重衰减
- 监控: TensorBoard记录训练过程
- 模型保存: 保存最佳模型和训练状态
项目2: 文本分类(Hugging Face)¶
项目概述¶
使用BERT模型进行情感分析,在IMDb电影评论数据集上进行二分类。
技术栈: Transformers, datasets, PyTorch 预期准确率: 90%+
完整代码¶
Python
# bert_sentiment.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import (
BertTokenizer, BertForSequenceClassification,
get_linear_schedule_with_warmup
)
from datasets import load_dataset
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, classification_report
# Select the GPU when available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Load the IMDb movie-review dataset from the Hugging Face hub.
print("Loading dataset...")
dataset = load_dataset('imdb')

# Pretrained BERT tokenizer plus a fresh 2-label classification head.
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name, num_labels=2)
model.to(device)
# 数据预处理
def preprocess_function(examples):
    """Tokenize a batch of raw reviews for BERT.

    Pads/truncates every example to exactly 512 tokens so batches can be
    stacked without a collator. Uses the module-level `tokenizer`.
    """
    return tokenizer(
        examples['text'],
        max_length=512,
        padding='max_length',
        truncation=True,
    )
print("Preprocessing data...")
# Batched map runs the tokenizer over the whole dataset in chunks.
encoded_dataset = dataset.map(preprocess_function, batched=True)
# Drop raw text and rename 'label' -> 'labels' (the kwarg BERT's forward expects).
encoded_dataset = encoded_dataset.remove_columns(['text'])
encoded_dataset = encoded_dataset.rename_column('label', 'labels')
encoded_dataset.set_format('torch')

# Subsample 5000 train / 1000 test examples to keep fine-tuning fast.
train_loader = DataLoader(
    encoded_dataset['train'].shuffle(seed=42).select(range(5000)),
    batch_size=16,
    shuffle=True
)
val_loader = DataLoader(
    encoded_dataset['test'].shuffle(seed=42).select(range(1000)),
    batch_size=16
)

# Standard BERT fine-tuning recipe: AdamW, small LR, linear decay.
optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)
num_epochs = 3
num_training_steps = num_epochs * len(train_loader)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,  # no warmup: decay starts at step 0
    num_training_steps=num_training_steps
)
# 训练函数
def train_epoch(model, dataloader, optimizer, scheduler, device):
    """Fine-tune for one epoch; returns (mean loss, accuracy)."""
    model.train()
    loss_total = 0.0
    preds_seen = []
    labels_seen = []
    progress_bar = tqdm(dataloader, desc='Training')
    for batch in progress_bar:
        # Move every tensor in the batch dict onto the target device.
        batch = {key: tensor.to(device) for key, tensor in batch.items()}
        # Unpack the dict as keyword args; HF models return the loss
        # themselves when a 'labels' key is present.
        outputs = model(**batch)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        loss_total += loss.item()
        batch_preds = torch.argmax(outputs.logits, dim=-1)
        preds_seen.extend(batch_preds.cpu().numpy())
        labels_seen.extend(batch['labels'].cpu().numpy())
        progress_bar.set_postfix({'loss': loss.item()})
    return loss_total / len(dataloader), accuracy_score(labels_seen, preds_seen)
# 验证函数
@torch.no_grad()
def evaluate(model, dataloader, device):
    """Run inference over `dataloader` without gradients.

    Returns:
        (mean loss, accuracy, true labels, predicted labels) — the two
        label lists feed sklearn's classification_report downstream.
    """
    model.eval()
    loss_total = 0.0
    y_pred, y_true = [], []
    for batch in tqdm(dataloader, desc='Evaluating'):
        batch = {key: tensor.to(device) for key, tensor in batch.items()}
        outputs = model(**batch)
        loss_total += outputs.loss.item()
        y_pred.extend(torch.argmax(outputs.logits, dim=-1).cpu().numpy())
        y_true.extend(batch['labels'].cpu().numpy())
    return (loss_total / len(dataloader),
            accuracy_score(y_true, y_pred),
            y_true,
            y_pred)
# 训练循环
print("\nStarting training...")
best_accuracy = 0
for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, lr_scheduler, device)
    val_loss, val_acc, val_labels, val_preds = evaluate(model, val_loader, device)
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    print("\nClassification Report:")
    print(classification_report(val_labels, val_preds, target_names=['Negative', 'Positive']))
    # Keep only the weights of the best-performing epoch.
    if val_acc > best_accuracy:
        best_accuracy = val_acc
        torch.save(model.state_dict(), 'best_bert_model.pth')
        print(f"Best model saved with accuracy: {best_accuracy:.4f}")
print(f"\nBest validation accuracy: {best_accuracy:.4f}")
# 预测函数
def predict_sentiment(text, model, tokenizer, device):
    """Classify a single review string.

    Returns:
        (sentiment, confidence): 'Positive'/'Negative' plus the softmax
        probability of the predicted class.
    """
    model.eval()
    encoded = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding=True,
        max_length=512
    ).to(device)
    with torch.no_grad():
        logits = model(**encoded).logits
    probs = torch.softmax(logits, dim=-1)
    label_id = torch.argmax(probs, dim=-1).item()
    confidence = probs[0][label_id].item()
    sentiment = 'Positive' if label_id == 1 else 'Negative'
    return sentiment, confidence
# 测试预测
# Smoke-test the fine-tuned model on a few hand-written reviews.
test_texts = [
    "This movie was absolutely fantastic! I loved every minute of it.",
    "Terrible waste of time. The plot was boring and predictable.",
    "An okay movie, nothing special but not bad either."
]
print("\nTest predictions:")
for text in test_texts:
    sentiment, confidence = predict_sentiment(text, model, tokenizer, device)
    print(f"Text: {text[:50]}...")
    print(f"Sentiment: {sentiment} (confidence: {confidence:.4f})\n")
项目要点¶
- 预训练模型: 使用BERT-base进行迁移学习
- 数据处理: 使用datasets库和tokenizer进行预处理
- 微调策略: 使用较小的学习率(2e-5)进行微调
- 评估: 使用accuracy和classification_report
- 推理: 封装预测函数用于实际应用
项目3: 端到端ML项目(scikit-learn)¶
项目概述¶
完成一个完整的数据科学项目,包括数据探索、特征工程、模型训练、调优和评估。
技术栈: scikit-learn, pandas, matplotlib, seaborn 数据集: 使用Kaggle的Titanic数据集或自定义数据集
完整代码¶
Python
# ml_pipeline.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
import joblib
import warnings
# Silence sklearn/matplotlib deprecation chatter in the demo output.
warnings.filterwarnings('ignore')

# Plot styling.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
# ------------------------------------------------------------------
# 1. Data loading and exploration
# ------------------------------------------------------------------
print("="*50)
print("1. 数据加载和探索")
print("="*50)

# Stand-in dataset; swap in real data (e.g. a CSV) for an actual project.
from sklearn.datasets import load_breast_cancer
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = data.target

print(f"Dataset shape: {X.shape}")
print(f"\nFirst 5 rows:")
print(X.head())
print(f"\nTarget distribution:")
print(pd.Series(y).value_counts())

# Quick structural overview of the feature table.
print("\nData info:")
print(X.info())
print("\nData description:")
print(X.describe())

# Missing-value audit.
print(f"\nMissing values:\n{X.isnull().sum()}")
# Visualization: target balance plus a few feature histograms on a 2x2 grid.
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Target distribution as a pie chart in the first panel.
axes[0, 0].pie(pd.Series(y).value_counts(), labels=['Malignant', 'Benign'], autopct='%1.1f%%')
axes[0, 0].set_title('Target Distribution')

# Feature histograms fill the three remaining panels.
# Fix: the original iterated X.columns[:4] and mapped the 4th feature to
# axes[2, 0], which raised IndexError on a 2x2 grid — only 3 panels are
# free after the pie chart.
for ax, feature in zip(list(axes.flat)[1:], X.columns[:3]):
    ax.hist(X[feature], bins=30, alpha=0.7)
    ax.set_title(f'Distribution of {feature}')
    ax.set_xlabel(feature)
    ax.set_ylabel('Frequency')

plt.tight_layout()
plt.savefig('data_exploration.png')
print("\nData exploration plots saved to 'data_exploration.png'")
# ------------------------------------------------------------------
# 2. Feature engineering
# ------------------------------------------------------------------
print("\n" + "="*50)
print("2. 特征工程")
print("="*50)

# All columns in this dataset are numeric; a real project would add a
# categorical branch to the ColumnTransformer as well.
numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
print(f"Numeric features: {len(numeric_features)}")

# Numeric preprocessing: median imputation, then standardization.
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features)
    ]
)

# ------------------------------------------------------------------
# 3. Train/test split
# ------------------------------------------------------------------
print("\n" + "="*50)
print("3. 数据分割")
print("="*50)

# stratify=y keeps the class ratio identical across both splits.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Training set size: {X_train.shape[0]}")
print(f"Test set size: {X_test.shape[0]}")
# ------------------------------------------------------------------
# 4. Model selection and training
# ------------------------------------------------------------------
print("\n" + "="*50)
print("4. 模型选择和训练")
print("="*50)

# Candidate classifiers, all seeded for reproducibility.
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42),
    'SVM': SVC(random_state=42)
}

# 5-fold CV on the training split. Preprocessing lives inside the
# pipeline so scaling is fit on each fold's training part only (no leakage).
print("\nCross-validation scores:")
cv_results = {}
for name, model in models.items():
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
    cv_results[name] = scores
    print(f"{name}: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")

# Pick the model with the best mean CV accuracy.
best_model_name = max(cv_results, key=lambda x: cv_results[x].mean())
print(f"\nBest model: {best_model_name}")
# ------------------------------------------------------------------
# 5. Hyperparameter tuning
# ------------------------------------------------------------------
print("\n" + "="*50)
print("5. 超参数调优")
print("="*50)

# Search spaces keyed by model name; 'classifier__' targets the
# pipeline's classifier step.
param_grids = {
    'Random Forest': {
        'classifier__n_estimators': [50, 100, 200],
        'classifier__max_depth': [3, 5, 7, None],
        'classifier__min_samples_split': [2, 5, 10]
    },
    'Gradient Boosting': {
        'classifier__n_estimators': [50, 100, 200],
        'classifier__learning_rate': [0.01, 0.1, 0.2],
        'classifier__max_depth': [3, 5, 7]
    },
    'SVM': {
        'classifier__C': [0.1, 1, 10],
        'classifier__kernel': ['rbf', 'linear'],
        'classifier__gamma': ['scale', 'auto']
    },
    'Logistic Regression': {
        'classifier__C': [0.1, 1, 10],
        'classifier__penalty': ['l1', 'l2'],
        # Fix: the default 'lbfgs' solver rejects the 'l1' penalty, so
        # every l1 candidate in the grid would fail to fit. liblinear
        # supports both penalties.
        'classifier__solver': ['liblinear']
    }
}

# Grid-search only the winner from step 4.
best_model = models[best_model_name]
param_grid = param_grids[best_model_name]
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', best_model)
])
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,   # use every available core
    verbose=1
)
print(f"\nTuning {best_model_name}...")
grid_search.fit(X_train, y_train)
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best cross-validation score: {grid_search.best_score_:.4f}")
# ------------------------------------------------------------------
# 6. Model evaluation
# ------------------------------------------------------------------
print("\n" + "="*50)
print("6. 模型评估")
print("="*50)

# Final evaluation of the tuned pipeline on the held-out test set.
best_pipeline = grid_search.best_estimator_
y_pred = best_pipeline.predict(X_test)
# [:, 1]: probability of the positive class for every test row.
# Fix: SVC (probability=False by default) exposes no predict_proba, so the
# original crashed here whenever SVM won model selection — guard for it.
if hasattr(best_pipeline.named_steps['classifier'], 'predict_proba'):
    y_pred_proba = best_pipeline.predict_proba(X_test)[:, 1]
else:
    y_pred_proba = None  # e.g. SVC without probability=True

print(f"\nTest Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred, target_names=['Malignant', 'Benign']))

# Confusion matrix as a heat map.
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Malignant', 'Benign'],
            yticklabels=['Malignant', 'Benign'])
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.savefig('confusion_matrix.png')
print("\nConfusion matrix saved to 'confusion_matrix.png'")
# ------------------------------------------------------------------
# 7. Feature importance
# ------------------------------------------------------------------
print("\n" + "="*50)
print("7. 特征重要性")
print("="*50)

# Only tree-based models expose feature_importances_; skip otherwise.
if hasattr(best_pipeline.named_steps['classifier'], 'feature_importances_'):
    importances = best_pipeline.named_steps['classifier'].feature_importances_
    # The ColumnTransformer keeps numeric_features order, so names line up.
    feature_names = numeric_features
    # Indices of the ten largest importances, descending.
    indices = np.argsort(importances)[::-1][:10]  # Top 10

    plt.figure(figsize=(10, 6))
    plt.bar(range(10), importances[indices])
    plt.xticks(range(10), [feature_names[i] for i in indices], rotation=45, ha='right')
    plt.title('Top 10 Feature Importances')
    plt.tight_layout()
    plt.savefig('feature_importance.png')
    print("\nFeature importance plot saved to 'feature_importance.png'")

    print("\nTop 10 important features:")
    for i in indices:
        print(f"{feature_names[i]}: {importances[i]:.4f}")
# ------------------------------------------------------------------
# 8. Persist artifacts
# ------------------------------------------------------------------
print("\n" + "="*50)
print("8. 保存模型")
print("="*50)

# The whole fitted pipeline (preprocessing + classifier) in one artifact.
joblib.dump(best_pipeline, 'best_model.pkl')
print("Model saved to 'best_model.pkl'")

# Fix: dump the FITTED preprocessor taken from the best pipeline.
# GridSearchCV fits clones, so the module-level `preprocessor` was never
# fitted — the original saved an unusable, unfitted transformer.
joblib.dump(best_pipeline.named_steps['preprocessor'], 'preprocessor.pkl')
print("Preprocessor saved to 'preprocessor.pkl'")
# ------------------------------------------------------------------
# 9. Inference helper
# ------------------------------------------------------------------
print("\n" + "="*50)
print("9. 预测函数")
print("="*50)

def predict_sample(sample, model_path='best_model.pkl'):
    """Load the persisted pipeline and score `sample`.

    Args:
        sample: DataFrame (or 2-D array) holding the training feature columns.
        model_path: path of the joblib artifact written in step 8.

    Returns:
        (predicted labels, class-probability matrix).
    """
    fitted = joblib.load(model_path)
    return fitted.predict(sample), fitted.predict_proba(sample)

# Sanity-check the helper on a handful of held-out rows.
sample = X_test.iloc[:5]
predictions, probabilities = predict_sample(sample)
print("\nSample predictions:")
# zip pairs each prediction with its probability row, position by position.
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
    print(f"Sample {i+1}: {'Benign' if pred == 1 else 'Malignant'} (probability: {prob[pred]:.4f})")

print("\n" + "="*50)
print("Pipeline completed successfully!")
print("="*50)
项目要点¶
- 数据探索: 使用pandas和可视化了解数据分布
- 特征工程: 使用Pipeline和ColumnTransformer进行预处理
- 模型选择: 交叉验证比较多个模型
- 超参数调优: 使用GridSearchCV寻找最佳参数
- 模型评估: 多维度评估模型性能
- 可解释性: 分析特征重要性
- 部署准备: 保存模型和预处理器
🎯 完成标准¶
完成所有项目后,你应该能够:
- 独立完成深度学习项目的全流程
- 使用PyTorch构建和训练CNN模型
- 使用Hugging Face进行NLP任务
- 使用scikit-learn完成ML项目
- 掌握模型调优和评估方法
- 了解模型部署的基本流程
💡 进阶建议¶
- 尝试不同架构: ResNet, DenseNet, EfficientNet
- 数据增强策略: AutoAugment, Mixup, CutMix
- 模型集成: 多模型投票、堆叠
- 超参数优化: Optuna, Ray Tune
- 模型部署: Docker, FastAPI, ONNX
📚 参考资源¶
恭喜完成阶段4的所有内容!
你已经掌握了:

- PyTorch深度学习框架
- scikit-learn机器学习
- Hugging Face NLP工具
- 实际项目开发经验
下一步: 进入阶段5: 工程最佳实践