Project 2: Classification in Practice
Difficulty: ⭐⭐⭐ Moderately difficult | Time: 10-15 hours | Topics: logistic regression, decision trees, random forest, SVM, XGBoost
📖 Project Overview
Background
Classification is one of the most common tasks in machine learning: predicting a discrete category. It is widely used for spam detection, image classification, disease diagnosis, credit scoring, and more. In this project you will build a complete classification system.
Goals
Build a complete classification system that can: - handle imbalanced data - implement multiple classification algorithms - perform feature selection and engineering - evaluate model performance - visualize results - deploy the model as a web service
Tech Stack
- Machine learning: scikit-learn, XGBoost
- Data processing: pandas, numpy
- Visualization: matplotlib, seaborn
- Web framework: Streamlit
🏗️ Project Structure
Text Only
classification/
├── data/                        # data directory
│   ├── raw/                     # raw data
│   ├── processed/               # processed data
│   └── features/                # feature data
├── models/                      # model directory
│   ├── __init__.py
│   ├── logistic.py              # logistic regression
│   ├── decision_tree.py         # decision tree
│   ├── random_forest.py         # random forest
│   ├── svm.py                   # support vector machine
│   └── xgboost.py               # XGBoost
├── utils/                       # utility functions
│   ├── __init__.py
│   ├── data_preprocessing.py    # data preprocessing
│   ├── feature_engineering.py   # feature engineering
│   ├── model_evaluation.py      # model evaluation
│   └── visualization.py         # visualization
├── train.py                     # training script
├── evaluate.py                  # evaluation script
├── app.py                       # web application
├── config.py                    # configuration
└── requirements.txt             # dependencies
🎯 Core Features
1. Data Preprocessing
- Data loading: load CSV/Excel files
- Missing values: impute or drop missing entries
- Categorical encoding: label encoding, one-hot encoding
- Standardization: standardize or normalize numeric features
- Imbalance handling: oversampling, undersampling
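To make the oversampling idea concrete, here is a minimal sketch in plain numpy (not the imbalanced-learn API used later in this project) that duplicates minority-class rows until every class matches the majority count:

```python
import numpy as np

def random_oversample(X, y, seed=42):
    """Duplicate minority-class rows until every class matches the majority count."""
    rng = np.random.default_rng(seed)
    classes, counts = np.unique(y, return_counts=True)
    n_max = counts.max()
    idx = []
    for c in classes:
        c_idx = np.where(y == c)[0]
        # Sample with replacement up to the majority-class size
        idx.append(rng.choice(c_idx, size=n_max, replace=True))
    idx = np.concatenate(idx)
    return X[idx], y[idx]

X = np.arange(20).reshape(10, 2)
y = np.array([0] * 8 + [1] * 2)   # imbalanced: 8 vs 2
X_res, y_res = random_oversample(X, y)
print(np.bincount(y_res))          # [8 8]
```

SMOTE, used later via imbalanced-learn, goes one step further and synthesizes new minority samples by interpolating between neighbors instead of duplicating rows.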
2. Feature Engineering
- Feature selection: keep the most informative features
- Feature transforms: log transform, Box-Cox transform
- Feature interactions: generate interaction terms
- Feature scaling: standardization, normalization
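Feature selection can be sketched with scikit-learn's `SelectKBest`, which keeps the k columns with the strongest univariate relationship to the target (the dataset here is synthetic):

```python
from sklearn.datasets import make_classification
from sklearn.feature_selection import SelectKBest, f_classif

# 20 features, only 5 of which carry real signal
X, y = make_classification(n_samples=200, n_features=20,
                           n_informative=5, random_state=42)

selector = SelectKBest(score_func=f_classif, k=5)
X_new = selector.fit_transform(X, y)
print(X_new.shape)                         # (200, 5)
print(selector.get_support(indices=True))  # indices of the kept columns
```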
3. Classification Models
- Logistic regression: a basic linear classifier
- Decision tree: a rule-based classifier
- Random forest: an ensemble method
- SVM: support vector machine
- XGBoost: gradient-boosted trees
4. Model Evaluation
- Accuracy: overall fraction of correct predictions
- Precision: fraction of predicted positives that are actually positive
- Recall: fraction of actual positives that are predicted positive
- F1 score: harmonic mean of precision and recall
- ROC-AUC: area under the ROC curve
- Confusion matrix: visualizes classification results
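These definitions are easy to verify by hand against scikit-learn on a small example:

```python
import numpy as np
from sklearn.metrics import precision_score, recall_score

y_true = np.array([1, 0, 1, 1, 0, 1, 0, 0])
y_pred = np.array([1, 0, 1, 0, 0, 1, 1, 0])

tp = np.sum((y_true == 1) & (y_pred == 1))  # true positives: 3
fp = np.sum((y_true == 0) & (y_pred == 1))  # false positives: 1
fn = np.sum((y_true == 1) & (y_pred == 0))  # false negatives: 1

precision = tp / (tp + fp)                        # 3/4 = 0.75
recall = tp / (tp + fn)                           # 3/4 = 0.75
f1 = 2 * precision * recall / (precision + recall)

assert precision == precision_score(y_true, y_pred)
assert recall == recall_score(y_true, y_pred)
```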
5. Result Visualization
- Confusion matrix: visualizes classification results
- ROC curve: evaluates classifier performance
- PR curve: precision-recall curve
- Feature importance: feature importance plots
- Learning curves: visualizes the training process
💻 Code Implementation
1. Configuration (config.py)
Python
"""
Configuration for the classification project.
"""
from dataclasses import dataclass


@dataclass  # @dataclass auto-generates __init__ and other methods
class Config:
    """Configuration class."""
    # Data
    data_dir: str = "./data"
    train_size: float = 0.8
    random_state: int = 42
    # Feature engineering
    feature_selection_method: str = "all"  # all, k_best, rfe
    n_features: int = 10
    scaling_method: str = "standard"  # standard, minmax, robust
    # Imbalanced-data handling
    handle_imbalance: bool = False
    sampling_method: str = "smote"  # smote, random_oversample, random_undersample
    # Model
    model_type: str = "random_forest"  # logistic, decision_tree, random_forest, svm, xgboost
    # Model parameters (None means "use library defaults")
    logistic_params: dict | None = None
    decision_tree_params: dict | None = None
    random_forest_params: dict | None = None
    svm_params: dict | None = None
    xgboost_params: dict | None = None
    # Training
    cv_folds: int = 5
    scoring: str = "accuracy"
    # Visualization
    figsize: tuple[int, int] = (12, 8)
    style: str = "seaborn-v0_8"  # "seaborn" was deprecated in matplotlib 3.6+
    # Output paths
    model_dir: str = "./models"
    plot_dir: str = "./plots"


config = Config()
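Because `Config` is a dataclass, per-experiment variants can be derived with `dataclasses.replace` without mutating the shared base configuration. A minimal sketch, using a trimmed-down `Config` with only three of the fields above:

```python
from dataclasses import dataclass, replace

@dataclass
class Config:
    model_type: str = "random_forest"
    cv_folds: int = 5
    random_state: int = 42

base = Config()
# replace() returns a new instance with the chosen fields overridden
xgb_cfg = replace(base, model_type="xgboost")
print(xgb_cfg.model_type, xgb_cfg.cv_folds)  # xgboost 5
print(base.model_type)                       # random_forest (unchanged)
```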
2. Data Preprocessing (utils/data_preprocessing.py)
💡 Install the dependency first: this module requires the imbalanced-learn package (`pip install imbalanced-learn`).
Python
"""
Data preprocessing utilities.
"""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler


class DataPreprocessor:
    """Data preprocessor."""

    def __init__(self, config):  # __init__ is the constructor, called automatically on instantiation
        """Initialize the preprocessor."""
        self.config = config
        self.scaler = None
        self.imputer = None
        self.label_encoder = None
        self.onehot_encoder = None

    def load_data(self, filepath) -> pd.DataFrame:
        """Load data (accepts a file path or a Streamlit UploadedFile object)."""
        # Streamlit's UploadedFile exposes the file name via its .name attribute
        filename = filepath.name if hasattr(filepath, 'name') else filepath
        if filename.endswith('.csv'):
            return pd.read_csv(filepath)
        elif filename.endswith('.xlsx'):
            return pd.read_excel(filepath)
        else:
            raise ValueError("Unsupported file format")

    def encode_categorical(
        self,
        df: pd.DataFrame,
        columns: list[str],
        method: str = "label",
    ) -> pd.DataFrame:
        """
        Encode categorical features.
        Args:
            df: input DataFrame
            columns: names of the categorical columns
            method: encoding method (label, onehot)
        Returns:
            The encoded DataFrame.
        """
        df_encoded = df.copy()
        if method == "label":
            self.label_encoder = {}
            for col in columns:
                le = LabelEncoder()
                df_encoded[col] = le.fit_transform(df_encoded[col])
                self.label_encoder[col] = le
        elif method == "onehot":
            self.onehot_encoder = {}
            for col in columns:
                ohe = OneHotEncoder(sparse_output=False, drop='first')
                encoded = ohe.fit_transform(df_encoded[[col]])
                # Build names for the new columns
                feature_names = [f"{col}_{int(i)}" for i in range(encoded.shape[1])]
                # Append them to the DataFrame
                for i, feature_name in enumerate(feature_names):
                    df_encoded[feature_name] = encoded[:, i]
                # Drop the original column (assignment instead of inplace=True)
                df_encoded = df_encoded.drop(col, axis=1)
                self.onehot_encoder[col] = ohe
        return df_encoded

    def handle_imbalance(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        method: str = "smote",
    ) -> tuple[pd.DataFrame, pd.Series]:
        """
        Resample imbalanced data.
        Args:
            X: features
            y: target
            method: sampling method
        Returns:
            The resampled features and target.
        """
        if method == "smote":
            sampler = SMOTE(random_state=self.config.random_state)
        elif method == "random_oversample":
            sampler = RandomOverSampler(random_state=self.config.random_state)
        elif method == "random_undersample":
            sampler = RandomUnderSampler(random_state=self.config.random_state)
        else:
            raise ValueError(f"Unsupported sampling method: {method}")
        X_resampled, y_resampled = sampler.fit_resample(X, y)
        return pd.DataFrame(X_resampled, columns=X.columns), pd.Series(y_resampled)

    def scale_features(
        self,
        df: pd.DataFrame,
        method: str = "standard",
    ) -> pd.DataFrame:
        """
        Scale features.
        Args:
            df: input DataFrame
            method: scaling method
        Returns:
            The scaled DataFrame.
        """
        if method == "standard":
            self.scaler = StandardScaler()
        elif method == "minmax":
            self.scaler = MinMaxScaler()
        elif method == "robust":
            self.scaler = RobustScaler()
        else:
            raise ValueError("Unsupported scaling method")
        df_scaled = pd.DataFrame(
            self.scaler.fit_transform(df),
            columns=df.columns,
            index=df.index,
        )
        return df_scaled

    def split_data(
        self,
        X: pd.DataFrame,
        y: pd.Series,
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """Split the data into train and test sets."""
        return train_test_split(
            X, y,
            train_size=self.config.train_size,
            random_state=self.config.random_state,
            stratify=y,
        )
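The `stratify=y` argument in `split_data` is what preserves the class proportions in both splits, which matters for imbalanced data. A quick self-contained check:

```python
import numpy as np
from sklearn.model_selection import train_test_split

X = np.arange(200).reshape(100, 2)
y = np.array([0] * 90 + [1] * 10)   # 90/10 class split

X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, train_size=0.8, random_state=42, stratify=y
)
# Both splits keep the original 9:1 ratio
print(np.bincount(y_tr))  # [72  8]
print(np.bincount(y_te))  # [18  2]
```

Without `stratify`, a random 80/20 split could easily leave only a handful of minority-class samples in the test set.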
3. Classification Model (models/random_forest.py)
Python
"""
Random forest classifier.
"""
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, roc_auc_score, confusion_matrix,
    classification_report
)
from sklearn.model_selection import cross_val_score


class RandomForestModel:
    """Random forest classifier."""

    def __init__(self, **kwargs):  # *args collects positional arguments, **kwargs collects keyword arguments
        """
        Initialize the model.
        Args:
            **kwargs: model parameters
        """
        self.model = RandomForestClassifier(**kwargs)

    def fit(self, X_train, y_train):
        """
        Train the model.
        Args:
            X_train: training features
            y_train: training targets
        """
        self.model.fit(X_train, y_train)

    def predict(self, X):
        """
        Predict class labels.
        Args:
            X: features
        Returns:
            Predicted labels.
        """
        return self.model.predict(X)

    def predict_proba(self, X):
        """
        Predict class probabilities.
        Args:
            X: features
        Returns:
            Predicted probabilities.
        """
        return self.model.predict_proba(X)

    def evaluate(self, X_test, y_test):
        """
        Evaluate the model.
        Args:
            X_test: test features
            y_test: test targets
        Returns:
            A dictionary of evaluation metrics.
        """
        y_pred = self.predict(X_test)
        y_proba = self.predict_proba(X_test)[:, 1] if len(np.unique(y_test)) == 2 else None
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, average='weighted'),
            "recall": recall_score(y_test, y_pred, average='weighted'),
            "f1": f1_score(y_test, y_pred, average='weighted'),
            "confusion_matrix": confusion_matrix(y_test, y_pred),
            "classification_report": classification_report(y_test, y_pred),
        }
        # AUC only applies to the binary case here
        if y_proba is not None:
            metrics["auc"] = roc_auc_score(y_test, y_proba)
        return metrics

    def cross_validate(self, X, y, cv=5):
        """
        Cross-validate the model.
        Args:
            X: features
            y: targets
            cv: number of folds
        Returns:
            Cross-validation scores.
        """
        scores = cross_val_score(
            self.model, X, y,
            cv=cv,
            scoring="accuracy",
        )
        return scores

    def get_feature_importance(self):
        """
        Get feature importances.
        Returns:
            The importance array, or None if unavailable.
        """
        if hasattr(self.model, 'feature_importances_'):  # hasattr checks whether the attribute exists
            return self.model.feature_importances_
        else:
            return None
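The `cross_validate` method above is a thin wrapper around `cross_val_score`; used directly with a `RandomForestClassifier` on synthetic data it looks like this:

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

X, y = make_classification(n_samples=300, n_features=10, random_state=42)
clf = RandomForestClassifier(n_estimators=50, random_state=42)

# One accuracy score per fold; the spread hints at variance across splits
scores = cross_val_score(clf, X, y, cv=5, scoring="accuracy")
print(scores)
print(scores.mean())  # mean accuracy across the 5 folds
```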
4. XGBoost Model (models/xgboost.py)
Python
"""
XGBoost classifier.
"""
import numpy as np
from xgboost import XGBClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, roc_auc_score, confusion_matrix,
    classification_report
)
from sklearn.model_selection import cross_val_score


class XGBoostModel:
    """XGBoost classifier."""

    def __init__(self, **kwargs):
        """
        Initialize the model.
        Args:
            **kwargs: model parameters
        """
        # Note: XGBClassifier expects class labels encoded as integers 0..n_classes-1
        self.model = XGBClassifier(**kwargs)

    def fit(self, X_train, y_train):
        """
        Train the model.
        Args:
            X_train: training features
            y_train: training targets
        """
        self.model.fit(X_train, y_train)

    def predict(self, X):
        """
        Predict class labels.
        Args:
            X: features
        Returns:
            Predicted labels.
        """
        return self.model.predict(X)

    def predict_proba(self, X):
        """
        Predict class probabilities.
        Args:
            X: features
        Returns:
            Predicted probabilities.
        """
        return self.model.predict_proba(X)

    def evaluate(self, X_test, y_test):
        """
        Evaluate the model.
        Args:
            X_test: test features
            y_test: test targets
        Returns:
            A dictionary of evaluation metrics.
        """
        y_pred = self.predict(X_test)
        y_proba = self.predict_proba(X_test)[:, 1] if len(np.unique(y_test)) == 2 else None
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, average='weighted'),
            "recall": recall_score(y_test, y_pred, average='weighted'),
            "f1": f1_score(y_test, y_pred, average='weighted'),
            "confusion_matrix": confusion_matrix(y_test, y_pred),
            "classification_report": classification_report(y_test, y_pred),
        }
        # AUC only applies to the binary case here
        if y_proba is not None:
            metrics["auc"] = roc_auc_score(y_test, y_proba)
        return metrics

    def cross_validate(self, X, y, cv=5):
        """
        Cross-validate the model.
        Args:
            X: features
            y: targets
            cv: number of folds
        Returns:
            Cross-validation scores.
        """
        scores = cross_val_score(
            self.model, X, y,
            cv=cv,
            scoring="accuracy",
        )
        return scores

    def get_feature_importance(self):
        """
        Get feature importances.
        Returns:
            The importance array, or None if unavailable.
        """
        if hasattr(self.model, 'feature_importances_'):
            return self.model.feature_importances_
        else:
            return None
5. Visualization Utilities (utils/visualization.py)
Python
"""
Visualization utilities.
"""
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import roc_curve, auc, precision_recall_curve


def plot_confusion_matrix(
    cm: np.ndarray,
    classes: list,
    figsize: tuple[int, int] = (10, 8),
):
    """
    Plot a confusion matrix.
    Args:
        cm: confusion matrix
        classes: class names
        figsize: figure size
    """
    plt.figure(figsize=figsize)
    sns.heatmap(
        cm, annot=True, fmt='d', cmap='Blues',
        xticklabels=classes, yticklabels=classes
    )
    plt.xlabel('Predicted label', fontsize=12)
    plt.ylabel('True label', fontsize=12)
    plt.title('Confusion Matrix', fontsize=14)
    plt.tight_layout()
    plt.show()


def plot_roc_curve(
    y_true: np.ndarray,
    y_proba: np.ndarray,
    figsize: tuple[int, int] = (10, 6),
):
    """
    Plot a ROC curve.
    Args:
        y_true: true labels
        y_proba: predicted probabilities
        figsize: figure size
    """
    fpr, tpr, _ = roc_curve(y_true, y_proba)
    roc_auc = auc(fpr, tpr)
    plt.figure(figsize=figsize)
    plt.plot(fpr, tpr, color='darkorange', lw=2,
             label=f'ROC curve (AUC = {roc_auc:.4f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate', fontsize=12)
    plt.ylabel('True Positive Rate', fontsize=12)
    plt.title('Receiver Operating Characteristic (ROC) Curve', fontsize=14)
    plt.legend(loc="lower right", fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


def plot_precision_recall_curve(
    y_true: np.ndarray,
    y_proba: np.ndarray,
    figsize: tuple[int, int] = (10, 6),
):
    """
    Plot a precision-recall curve.
    Args:
        y_true: true labels
        y_proba: predicted probabilities
        figsize: figure size
    """
    precision, recall, _ = precision_recall_curve(y_true, y_proba)
    pr_auc = auc(recall, precision)
    plt.figure(figsize=figsize)
    plt.plot(recall, precision, color='blue', lw=2,
             label=f'PR curve (AUC = {pr_auc:.4f})')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('Recall', fontsize=12)
    plt.ylabel('Precision', fontsize=12)
    plt.title('Precision-Recall Curve', fontsize=14)
    plt.legend(loc="lower left", fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


def plot_feature_importance(
    feature_names: list,
    importance: np.ndarray,
    top_n: int = 10,
    figsize: tuple[int, int] = (10, 6),
):
    """
    Plot feature importances.
    Args:
        feature_names: feature names
        importance: importance array
        top_n: number of top features to show
        figsize: figure size
    """
    # Sort by importance, descending
    indices = np.argsort(importance)[::-1][:top_n]
    plt.figure(figsize=figsize)
    plt.bar(range(len(indices)), importance[indices])
    plt.xticks(range(len(indices)), [feature_names[i] for i in indices], rotation=45)
    plt.xlabel('Feature', fontsize=12)
    plt.ylabel('Importance', fontsize=12)
    plt.title(f'Top {top_n} Features', fontsize=14)
    plt.tight_layout()
    plt.show()
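The values behind `plot_roc_curve` come straight from `roc_curve` and `auc`, which can be checked by hand on a tiny example: of the four (positive, negative) score pairs, three are ranked correctly, so the AUC is 3/4.

```python
import numpy as np
from sklearn.metrics import roc_curve, auc

y_true  = np.array([0, 0, 1, 1])
y_proba = np.array([0.1, 0.4, 0.35, 0.8])

fpr, tpr, thresholds = roc_curve(y_true, y_proba)
roc_auc = auc(fpr, tpr)      # trapezoidal area under the (fpr, tpr) points
print(roc_auc)               # 0.75
```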
6. Streamlit App (app.py)
Python
"""
Web application for the classification system.
"""
import streamlit as st
import pandas as pd
import numpy as np
import joblib
import os
import matplotlib.pyplot as plt

from config import config
from utils.data_preprocessing import DataPreprocessor
from models.random_forest import RandomForestModel
from models.xgboost import XGBoostModel
from utils.visualization import (
    plot_confusion_matrix, plot_roc_curve,
    plot_precision_recall_curve, plot_feature_importance
)

# Page configuration
st.set_page_config(
    page_title="Classification System",
    page_icon="🎯",
    layout="wide"
)

# Title
st.title("🎯 Classification System")
st.markdown("---")

# Sidebar
st.sidebar.header("Model Settings")

# Model selection
model_type = st.sidebar.selectbox(
    "Model",
    ["random_forest", "xgboost", "logistic", "decision_tree", "svm"],
    format_func=lambda x: {  # lambda: a concise one-line anonymous function
        "random_forest": "Random Forest",
        "xgboost": "XGBoost",
        "logistic": "Logistic Regression",
        "decision_tree": "Decision Tree",
        "svm": "Support Vector Machine",
    }[x],
)

# Imbalanced-data handling
handle_imbalance = st.sidebar.checkbox("Handle imbalanced data", value=False)
if handle_imbalance:
    sampling_method = st.sidebar.selectbox(
        "Sampling method",
        ["smote", "random_oversample", "random_undersample"],
    )
else:
    sampling_method = "smote"

# Main layout
col1, col2 = st.columns(2)

with col1:
    st.subheader("Data Upload")
    # Upload data
    uploaded_file = st.file_uploader(
        "Upload a CSV or Excel file",
        type=['csv', 'xlsx'],
    )
    if uploaded_file is not None:
        # Load the data
        preprocessor = DataPreprocessor(config)
        df = preprocessor.load_data(uploaded_file)
        st.write("Data preview:")
        st.dataframe(df.head())
        # Pick the target column
        target_column = st.selectbox(
            "Target column",
            df.columns,
        )
        # Pick the feature columns
        feature_columns = st.multiselect(
            "Feature columns",
            [col for col in df.columns if col != target_column],
            default=[col for col in df.columns if col != target_column][:5],
        )
        # Pick the categorical-encoding method
        categorical_columns = df[feature_columns].select_dtypes(include=['object']).columns.tolist()
        if len(categorical_columns) > 0:
            encoding_method = st.selectbox(
                "Categorical encoding",
                ["label", "onehot"],
            )
        else:
            encoding_method = "label"
        if st.button("Train model", type="primary"):
            if not feature_columns:
                st.warning("Please select at least one feature column")
            else:
                # Prepare the data
                X = df[feature_columns]
                y = df[target_column]
                # Encode categorical features
                if len(categorical_columns) > 0:
                    X = preprocessor.encode_categorical(X, categorical_columns, encoding_method)
                # Resample imbalanced data
                if handle_imbalance:
                    X, y = preprocessor.handle_imbalance(X, y, sampling_method)
                # Split the data
                X_train, X_test, y_train, y_test = preprocessor.split_data(X, y)
                # Train the model
                with st.spinner("Training..."):
                    if model_type == "random_forest":
                        model = RandomForestModel(
                            n_estimators=100,
                            random_state=config.random_state,
                        )
                    elif model_type == "xgboost":
                        model = XGBoostModel(
                            n_estimators=100,
                            random_state=config.random_state,
                            eval_metric='logloss',
                        )
                    else:
                        # logistic/decision_tree/svm are left as an exercise; fall back to random forest
                        model = RandomForestModel(
                            n_estimators=100,
                            random_state=config.random_state,
                        )
                    model.fit(X_train, y_train)
                    # Evaluate the model
                    metrics = model.evaluate(X_test, y_test)
                # Show the results
                with col2:
                    st.subheader("Model Evaluation")
                    # Metrics
                    st.metric("Accuracy", f"{metrics['accuracy']:.4f}")
                    st.metric("Precision", f"{metrics['precision']:.4f}")
                    st.metric("Recall", f"{metrics['recall']:.4f}")
                    st.metric("F1 score", f"{metrics['f1']:.4f}")
                    if "auc" in metrics:
                        st.metric("AUC", f"{metrics['auc']:.4f}")
                    # Visualizations: plt.show() does nothing inside Streamlit,
                    # so hand the current matplotlib figure to st.pyplot
                    st.subheader("Visualizations")
                    # Confusion matrix
                    plot_confusion_matrix(
                        metrics['confusion_matrix'],
                        classes=np.unique(y_test),
                    )
                    st.pyplot(plt.gcf())
                    # ROC and PR curves (binary classification only)
                    if len(np.unique(y_test)) == 2:
                        y_proba = model.predict_proba(X_test)[:, 1]
                        plot_roc_curve(y_test.values, y_proba)
                        st.pyplot(plt.gcf())
                        plot_precision_recall_curve(y_test.values, y_proba)
                        st.pyplot(plt.gcf())
                    # Feature importance
                    importance = model.get_feature_importance()
                    if importance is not None:
                        plot_feature_importance(
                            X_train.columns,
                            importance,
                            top_n=min(10, len(X_train.columns)),
                        )
                        st.pyplot(plt.gcf())
🧪 Testing
1. Unit Test
Python
"""
Unit test example.
"""
import pytest
import numpy as np
from models.random_forest import RandomForestModel


def test_random_forest():
    """Test the random forest model."""
    # Create the model
    model = RandomForestModel(n_estimators=10, random_state=42)
    # Create synthetic data
    X_train = np.random.randn(100, 5)
    y_train = np.random.randint(0, 2, 100)
    X_test = np.random.randn(20, 5)
    y_test = np.random.randint(0, 2, 20)
    # Train
    model.fit(X_train, y_train)
    # Predict
    y_pred = model.predict(X_test)
    # Verify
    assert y_pred.shape == y_test.shape
    metrics = model.evaluate(X_test, y_test)
    assert 0.0 <= metrics['accuracy'] <= 1.0
    print("✓ random forest test passed")
2. Integration Test
Python
"""
Integration test example.
"""
import pandas as pd
import numpy as np
from models.random_forest import RandomForestModel


def test_classification_pipeline():
    """Test the end-to-end classification pipeline."""
    from config import config
    from utils.data_preprocessing import DataPreprocessor
    # Create the preprocessor
    preprocessor = DataPreprocessor(config)
    # Create synthetic data
    df = pd.DataFrame({
        'feature1': np.random.randn(100),
        'feature2': np.random.randn(100),
        'feature3': np.random.randn(100),
        'target': np.random.randint(0, 2, 100),
    })
    # Prepare the data
    X = df[['feature1', 'feature2', 'feature3']]
    y = df['target']
    X_train, X_test, y_train, y_test = preprocessor.split_data(X, y)
    # Train the model
    model = RandomForestModel(n_estimators=10, random_state=42)
    model.fit(X_train, y_train)
    # Evaluate
    metrics = model.evaluate(X_test, y_test)
    # Verify
    assert 'accuracy' in metrics
    assert 'precision' in metrics
    assert 'recall' in metrics
    assert 'f1' in metrics
    print("✓ classification pipeline test passed")
📊 Extension Ideas
1. Features
- Multi-label classification: allow multiple labels per sample
- Multi-class classification: support more than two classes
- Ensembles: stacking, blending
- AutoML: Auto-sklearn, TPOT
2. Performance
- Hyperparameter tuning: grid search, random search, Bayesian optimization
- Feature selection: recursive feature elimination
- Model fusion: voting, stacking
- Parallel training: multi-core parallelism
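As a starting point for the hyperparameter-tuning suggestion, here is a minimal `GridSearchCV` sketch; the grid values are illustrative placeholders, not tuned recommendations:

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=200, n_features=8, random_state=42)

# Every combination in the grid is evaluated with 3-fold cross-validation
param_grid = {
    "n_estimators": [50, 100],
    "max_depth": [3, None],
}
search = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid,
    cv=3,
    scoring="f1",
)
search.fit(X, y)
print(search.best_params_)   # the best-scoring combination
print(search.best_score_)    # its mean cross-validated F1
```

`RandomizedSearchCV` has the same interface but samples a fixed number of combinations, which scales better to large grids.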
3. Deployment
- Model export: ONNX, PMML
- API service: FastAPI, Flask
- Batch prediction: bulk processing
- Real-time prediction: low-latency inference
📚 What You Will Learn
After completing this project you will have mastered:
- ✅ Classification algorithms and their implementation
- ✅ Imbalanced-data handling
- ✅ Feature engineering techniques
- ✅ Model evaluation metrics
- ✅ Visualization techniques
- ✅ Streamlit web app development
- ✅ Building a complete classification system
🔗 References
Estimated time: 10-15 hours | Difficulty: ⭐⭐⭐ Moderately difficult | Recommendation: ⭐⭐⭐⭐⭐