实战项目: 数据可视化仪表板¶
难度: ⭐⭐⭐ 中等 时间: 12-15小时 涉及知识: Pandas、Matplotlib、Seaborn、Streamlit、数据可视化
📖 项目概述¶
项目背景¶
在数据驱动的时代,能够将数据转化为直观、易懂的可视化图表是一项重要技能。数据可视化仪表板可以实时展示关键指标,帮助决策者快速理解数据背后的含义,做出明智的决策。
本项目的目标是构建一个完整的数据可视化仪表板,能够: - 加载和处理各种数据源 - 创建多种类型的可视化图表 - 支持交互式数据探索 - 提供美观的用户界面 - 实现数据过滤和筛选
项目目标¶
构建一个功能完整的数据可视化仪表板,能够: - 加载CSV、Excel等格式的数据 - 数据清洗和预处理 - 创建多种可视化图表(折线图、柱状图、饼图、散点图等) - 支持交互式数据探索 - 实现数据过滤和筛选 - 提供美观的Web界面
技术栈¶
- 数据处理: Pandas
- 数据可视化: Matplotlib, Seaborn, Plotly
- Web框架: Streamlit
- 数值计算: NumPy
- 文件处理: openpyxl, xlrd
🏗️ 项目结构¶
Text Only
data-dashboard/
├── app/ # 应用主目录
│ ├── __init__.py
│ ├── main.py # Streamlit主应用
│ ├── data_loader.py # 数据加载器
│ ├── data_processor.py # 数据处理器
│ ├── visualizer.py # 可视化工具
│ └── filters.py # 过滤器
├── data/ # 数据目录
│ ├── raw/ # 原始数据
│ ├── processed/ # 处理后数据
│ └── sample/ # 示例数据
├── components/ # UI组件
│ ├── __init__.py
│ ├── charts.py # 图表组件
│ ├── filters.py # 过滤器组件
│ └── stats.py # 统计组件
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── helpers.py # 辅助函数
│ └── config.py # 配置文件
├── tests/ # 测试目录
│ ├── test_data_loader.py
│ ├── test_data_processor.py
│ └── test_visualizer.py
├── requirements.txt # Python依赖
├── README.md # 项目说明
└── .streamlit/ # Streamlit配置
└── config.toml
🎯 核心功能¶
1. 数据加载¶
- 多格式支持: 支持CSV、Excel、JSON等格式
- 批量加载: 批量加载多个文件
- 数据预览: 快速预览数据
- 元数据提取: 提取数据元信息
2. 数据处理¶
- 数据清洗: 处理缺失值、异常值
- 数据转换: 数据类型转换
- 特征工程: 创建新特征
- 数据聚合: 数据分组和聚合
3. 数据可视化¶
- 折线图: 展示趋势变化
- 柱状图: 比较不同类别
- 饼图: 显示占比分布
- 散点图: 探索变量关系
- 热力图: 显示相关性矩阵
- 箱线图: 显示数据分布
4. 交互功能¶
- 数据过滤: 根据条件过滤数据
- 时间范围选择: 选择时间范围
- 图表交互: 缩放、平移、选择
- 数据导出: 导出处理后的数据
5. 统计分析¶
- 描述性统计: 计算基本统计量
- 相关性分析: 计算相关性
- 趋势分析: 分析数据趋势
- 异常检测: 检测异常值
💻 代码实现¶
1. 配置文件 (utils/config.py)¶
Python
"""
配置文件
"""
from pathlib import Path
class Config:
"""应用配置"""
# 数据目录
DATA_DIR = Path("./data")
RAW_DATA_DIR = DATA_DIR / "raw"
PROCESSED_DATA_DIR = DATA_DIR / "processed"
SAMPLE_DATA_DIR = DATA_DIR / "sample"
# 支持的文件格式
SUPPORTED_FORMATS = [".csv", ".xlsx", ".xls", ".json"]
# 图表配置
DEFAULT_FIGURE_SIZE = (10, 6)
DEFAULT_COLOR_PALETTE = "Set2"
# Streamlit配置
PAGE_TITLE = "数据可视化仪表板"
PAGE_ICON = "📊"
LAYOUT = "wide"
# 数据处理配置
MAX_ROWS_PREVIEW = 1000
MAX_CATEGORIES = 20
@classmethod # @classmethod定义类方法,第一个参数为类本身
def create_directories(cls):
"""创建必要的目录"""
cls.DATA_DIR.mkdir(parents=True, exist_ok=True)
cls.RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
cls.PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)
cls.SAMPLE_DATA_DIR.mkdir(parents=True, exist_ok=True)
# 创建目录
Config.create_directories()
2. 数据加载器 (app/data_loader.py)¶
Python
"""
数据加载器
"""
import pandas as pd
from pathlib import Path
from typing import Any
from utils.config import Config
class DataLoader:
"""数据加载器"""
def __init__(self):
"""初始化数据加载器"""
self.supported_formats = Config.SUPPORTED_FORMATS
def load_file(
self,
file_path: str | Path,
**kwargs
) -> pd.DataFrame:
"""
加载单个文件
Args:
file_path: 文件路径
**kwargs: 传递给pandas读取函数的额外参数
Returns:
DataFrame对象
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
# 根据文件扩展名选择读取方法
extension = file_path.suffix.lower()
if extension == ".csv":
return self._load_csv(file_path, **kwargs)
elif extension in [".xlsx", ".xls"]:
return self._load_excel(file_path, **kwargs)
elif extension == ".json":
return self._load_json(file_path, **kwargs)
else:
raise ValueError(f"不支持的文件格式: {extension}")
def _load_csv(
self,
file_path: Path,
encoding: str = "utf-8",
**kwargs
) -> pd.DataFrame:
"""加载CSV文件"""
return pd.read_csv(file_path, encoding=encoding, **kwargs)
def _load_excel(
self,
file_path: Path,
sheet_name: str | int = 0,
**kwargs
) -> pd.DataFrame:
"""加载Excel文件"""
return pd.read_excel(file_path, sheet_name=sheet_name, **kwargs)
def _load_json(
self,
file_path: Path,
**kwargs
) -> pd.DataFrame:
"""加载JSON文件"""
return pd.read_json(file_path, **kwargs)
def load_directory(
self,
directory: str | Path,
pattern: str = "*",
combine: bool = False
) -> list[pd.DataFrame] | pd.DataFrame:
"""
加载目录中的所有文件
Args:
directory: 目录路径
pattern: 文件匹配模式
combine: 是否合并所有文件
Returns:
DataFrame列表或合并后的DataFrame
"""
directory = Path(directory)
# 查找所有支持的文件
files = []
for ext in self.supported_formats:
files.extend(directory.glob(f"{pattern}{ext}"))
if not files:
raise FileNotFoundError(f"目录中没有找到支持的文件: {directory}")
# 加载所有文件
dataframes = []
for file_path in files:
try:
df = self.load_file(file_path)
df["source_file"] = file_path.name
dataframes.append(df)
except Exception as e:
print(f"加载文件失败 {file_path}: {str(e)}")
if combine:
return pd.concat(dataframes, ignore_index=True)
else:
return dataframes
def get_file_info(
self,
file_path: str | Path
) -> dict:
"""
获取文件信息
Args:
file_path: 文件路径
Returns:
文件信息字典
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
# 加载数据
df = self.load_file(file_path)
# 提取信息
info = {
"file_name": file_path.name,
"file_size": file_path.stat().st_size,
"file_format": file_path.suffix,
"rows": len(df),
"columns": len(df.columns),
"column_names": df.columns.tolist(),
"column_types": df.dtypes.astype(str).to_dict(),
"memory_usage": df.memory_usage(deep=True).sum(),
"has_missing": df.isnull().any().any(),
"missing_counts": df.isnull().sum().to_dict()
}
return info
3. 数据处理器 (app/data_processor.py)¶
Python
"""
数据处理器
"""
import pandas as pd
import numpy as np
from typing import Any
from utils.config import Config
class DataProcessor:
"""数据处理器"""
def __init__(self, df: pd.DataFrame):
"""
初始化数据处理器
Args:
df: DataFrame对象
"""
self.df = df.copy()
self.original_df = df.copy()
def clean_data(
self,
drop_duplicates: bool = True,
handle_missing: str = "drop",
missing_threshold: float = 0.5
) -> 'DataProcessor':
"""
清洗数据
Args:
drop_duplicates: 是否删除重复行
handle_missing: 处理缺失值的方式 (drop, fill, keep)
missing_threshold: 缺失值阈值,超过此比例的列将被删除
Returns:
自身对象
"""
# 删除重复行
if drop_duplicates:
self.df = self.df.drop_duplicates()
# 处理缺失值
if handle_missing == "drop":
# 删除缺失值比例过高的列
missing_ratio = self.df.isnull().mean()
cols_to_drop = missing_ratio[missing_ratio > missing_threshold].index.tolist()
if cols_to_drop:
self.df = self.df.drop(columns=cols_to_drop)
# 删除包含缺失值的行
self.df = self.df.dropna()
elif handle_missing == "fill":
# 数值列用中位数填充
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
self.df[numeric_cols] = self.df[numeric_cols].fillna(
self.df[numeric_cols].median()
)
# 分类列用众数填充
categorical_cols = self.df.select_dtypes(include=['object']).columns
for col in categorical_cols:
mode = self.df[col].mode()
if len(mode) > 0:
self.df[col] = self.df[col].fillna(mode[0])
return self
def convert_types(
self,
type_mapping: dict[str, str] | None = None
) -> 'DataProcessor':
"""
转换数据类型
Args:
type_mapping: 列名到类型的映射
Returns:
自身对象
"""
if type_mapping:
for col, dtype in type_mapping.items():
if col in self.df.columns:
try:
if dtype == "datetime":
self.df[col] = pd.to_datetime(self.df[col])
elif dtype == "category":
self.df[col] = self.df[col].astype('category')
else:
self.df[col] = self.df[col].astype(dtype)
except Exception as e:
print(f"转换列 {col} 失败: {str(e)}")
return self
def remove_outliers(
self,
columns: list[str] | None = None,
method: str = "iqr",
threshold: float = 1.5
) -> 'DataProcessor':
"""
移除异常值
Args:
columns: 要处理的列名列表
method: 异常值检测方法 (iqr, zscore)
threshold: 阈值
Returns:
自身对象
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns.tolist()
for col in columns:
if col not in self.df.columns:
continue
if method == "iqr":
Q1 = self.df[col].quantile(0.25)
Q3 = self.df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - threshold * IQR
upper_bound = Q3 + threshold * IQR
self.df = self.df[
(self.df[col] >= lower_bound) &
(self.df[col] <= upper_bound)
]
elif method == "zscore":
z_scores = np.abs(
(self.df[col] - self.df[col].mean()) / self.df[col].std()
)
self.df = self.df[z_scores < threshold]
return self
def aggregate_data(
self,
group_by: str | list[str],
agg_func: str | dict[str, str] = "mean"
) -> pd.DataFrame:
"""
聚合数据
Args:
group_by: 分组列
agg_func: 聚合函数
Returns:
聚合后的DataFrame
"""
return self.df.groupby(group_by).agg(agg_func).reset_index()
def create_features(
self,
feature_configs: list[dict[str, Any]]
) -> 'DataProcessor':
"""
创建新特征
Args:
feature_configs: 特征配置列表
Returns:
自身对象
"""
for config in feature_configs:
feature_name = config["name"]
feature_type = config["type"]
params = config.get("params", {})
if feature_type == "arithmetic":
# 算术运算(使用 df.eval() 替代内置 eval,避免代码注入风险)
self.df[feature_name] = self.df.eval(
params["expression"]
)
elif feature_type == "datetime":
# 日期时间特征
date_col = params["column"]
if date_col in self.df.columns:
self.df[feature_name] = getattr( # getattr()动态获取对象属性,可设默认值
self.df[date_col].dt,
params["attribute"]
)
elif feature_type == "binning":
# 分箱
self.df[feature_name] = pd.cut(
self.df[params["column"]],
bins=params["bins"],
labels=params.get("labels")
)
return self
def get_statistics(self) -> dict[str, Any]:
"""
获取统计信息
Returns:
统计信息字典
"""
stats = {
"shape": self.df.shape,
"columns": self.df.columns.tolist(),
"dtypes": self.df.dtypes.astype(str).to_dict(),
"memory_usage": self.df.memory_usage(deep=True).sum(),
"missing_values": self.df.isnull().sum().to_dict(),
"numeric_stats": {},
"categorical_stats": {}
}
# 数值列统计
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
stats["numeric_stats"] = self.df[numeric_cols].describe().to_dict()
# 分类列统计
categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
stats["categorical_stats"][col] = {
"unique": self.df[col].nunique(),
"top": self.df[col].mode().iloc[0] if len(self.df[col].mode()) > 0 else None,
"freq": self.df[col].value_counts().iloc[0] if len(self.df[col].value_counts()) > 0 else 0
}
return stats
def get_data(self) -> pd.DataFrame:
"""
获取处理后的数据
Returns:
DataFrame对象
"""
return self.df.copy()
def reset(self):
"""重置为原始数据"""
self.df = self.original_df.copy()
4. 可视化工具 (app/visualizer.py)¶
Python
"""
可视化工具
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from typing import Any
from utils.config import Config
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
# 设置样式
sns.set_style("whitegrid")
sns.set_palette(Config.DEFAULT_COLOR_PALETTE)
class DataVisualizer:
"""数据可视化工具"""
def __init__(self, df: pd.DataFrame):
"""
初始化可视化工具
Args:
df: DataFrame对象
"""
self.df = df
self.default_figsize = Config.DEFAULT_FIGURE_SIZE
def line_plot(
self,
x: str,
y: str | list[str],
title: str | None = None,
figsize: tuple | None = None,
interactive: bool = False
) -> plt.Figure | go.Figure:
"""
绘制折线图
Args:
x: x轴列名
y: y轴列名或列名列表
title: 图表标题
figsize: 图表大小
interactive: 是否使用交互式图表
Returns:
图表对象
"""
if figsize is None:
figsize = self.default_figsize
if interactive:
return self._line_plot_interactive(x, y, title)
else:
return self._line_plot_static(x, y, title, figsize)
def _line_plot_static(
self,
x: str,
y: str | list[str],
title: str | None,
figsize: tuple
) -> plt.Figure:
"""绘制静态折线图"""
fig, ax = plt.subplots(figsize=figsize)
if isinstance(y, str):
y = [y]
for col in y:
ax.plot(self.df[x], self.df[col], label=col, marker='o')
ax.set_xlabel(x)
ax.set_ylabel('值')
ax.set_title(title or f'{x} vs {", ".join(y)}')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
return fig
def _line_plot_interactive(
self,
x: str,
y: str | list[str],
title: str | None
) -> go.Figure:
"""绘制交互式折线图"""
if isinstance(y, str):
y = [y]
fig = go.Figure()
for col in y:
fig.add_trace(go.Scatter(
x=self.df[x],
y=self.df[col],
mode='lines+markers',
name=col,
line=dict(width=2),
marker=dict(size=6)
))
fig.update_layout(
title=title or f'{x} vs {", ".join(y)}',
xaxis_title=x,
yaxis_title='值',
hovermode='x unified',
template='plotly_white'
)
return fig
def bar_plot(
self,
x: str,
y: str,
title: str | None = None,
figsize: tuple | None = None,
interactive: bool = False,
horizontal: bool = False
) -> plt.Figure | go.Figure:
"""
绘制柱状图
Args:
x: x轴列名
y: y轴列名
title: 图表标题
figsize: 图表大小
interactive: 是否使用交互式图表
horizontal: 是否水平显示
Returns:
图表对象
"""
if figsize is None:
figsize = self.default_figsize
if interactive:
return self._bar_plot_interactive(x, y, title, horizontal)
else:
return self._bar_plot_static(x, y, title, figsize, horizontal)
def _bar_plot_static(
self,
x: str,
y: str,
title: str | None,
figsize: tuple,
horizontal: bool
) -> plt.Figure:
"""绘制静态柱状图"""
fig, ax = plt.subplots(figsize=figsize)
if horizontal:
ax.barh(self.df[y], self.df[x])
ax.set_xlabel(x)
ax.set_ylabel(y)
else:
ax.bar(self.df[x], self.df[y])
ax.set_xlabel(x)
ax.set_ylabel(y)
ax.set_title(title or f'{x} vs {y}')
ax.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
return fig
def _bar_plot_interactive(
self,
x: str,
y: str,
title: str | None,
horizontal: bool
) -> go.Figure:
"""绘制交互式柱状图"""
if horizontal:
fig = px.bar(self.df, x=y, y=x, orientation='h')
else:
fig = px.bar(self.df, x=x, y=y)
fig.update_layout(
title=title or f'{x} vs {y}',
template='plotly_white'
)
return fig
def pie_plot(
self,
values: str,
names: str,
title: str | None = None,
figsize: tuple | None = None,
interactive: bool = False
) -> plt.Figure | go.Figure:
"""
绘制饼图
Args:
values: 值列名
names: 名称列名
title: 图表标题
figsize: 图表大小
interactive: 是否使用交互式图表
Returns:
图表对象
"""
if figsize is None:
figsize = (8, 8)
if interactive:
return self._pie_plot_interactive(values, names, title)
else:
return self._pie_plot_static(values, names, title, figsize)
def _pie_plot_static(
self,
values: str,
names: str,
title: str | None,
figsize: tuple
) -> plt.Figure:
"""绘制静态饼图"""
fig, ax = plt.subplots(figsize=figsize)
ax.pie(
self.df[values],
labels=self.df[names],
autopct='%1.1f%%',
startangle=90
)
ax.set_title(title or f'{names} 分布')
return fig
def _pie_plot_interactive(
self,
values: str,
names: str,
title: str | None
) -> go.Figure:
"""绘制交互式饼图"""
fig = px.pie(
self.df,
values=values,
names=names,
title=title or f'{names} 分布'
)
return fig
def scatter_plot(
self,
x: str,
y: str,
color: str | None = None,
size: str | None = None,
title: str | None = None,
figsize: tuple | None = None,
interactive: bool = False
) -> plt.Figure | go.Figure:
"""
绘制散点图
Args:
x: x轴列名
y: y轴列名
color: 颜色列名
size: 大小列名
title: 图表标题
figsize: 图表大小
interactive: 是否使用交互式图表
Returns:
图表对象
"""
if figsize is None:
figsize = self.default_figsize
if interactive:
return self._scatter_plot_interactive(x, y, color, size, title)
else:
return self._scatter_plot_static(x, y, color, size, title, figsize)
def _scatter_plot_static(
self,
x: str,
y: str,
color: str | None,
size: str | None,
title: str | None,
figsize: tuple
) -> plt.Figure:
"""绘制静态散点图"""
fig, ax = plt.subplots(figsize=figsize)
if color:
scatter = ax.scatter(
self.df[x],
self.df[y],
c=self.df[color],
s=self.df[size] if size else 50,
alpha=0.6,
cmap='viridis'
)
plt.colorbar(scatter, ax=ax, label=color)
else:
ax.scatter(
self.df[x],
self.df[y],
s=self.df[size] if size else 50,
alpha=0.6
)
ax.set_xlabel(x)
ax.set_ylabel(y)
ax.set_title(title or f'{x} vs {y}')
ax.grid(True, alpha=0.3)
plt.tight_layout()
return fig
def _scatter_plot_interactive(
self,
x: str,
y: str,
color: str | None,
size: str | None,
title: str | None
) -> go.Figure:
"""绘制交互式散点图"""
fig = px.scatter(
self.df,
x=x,
y=y,
color=color,
size=size,
title=title or f'{x} vs {y}',
template='plotly_white'
)
return fig
def heatmap(
self,
title: str | None = None,
figsize: tuple | None = None,
interactive: bool = False
) -> plt.Figure | go.Figure:
"""
绘制热力图
Args:
title: 图表标题
figsize: 图表大小
interactive: 是否使用交互式图表
Returns:
图表对象
"""
if figsize is None:
figsize = (10, 8)
# 计算相关性矩阵
corr_matrix = self.df.corr(numeric_only=True)
if interactive:
return self._heatmap_interactive(corr_matrix, title)
else:
return self._heatmap_static(corr_matrix, title, figsize)
def _heatmap_static(
self,
corr_matrix: pd.DataFrame,
title: str | None,
figsize: tuple
) -> plt.Figure:
"""绘制静态热力图"""
fig, ax = plt.subplots(figsize=figsize)
sns.heatmap(
corr_matrix,
annot=True,
fmt='.2f',
cmap='coolwarm',
center=0,
square=True,
ax=ax
)
ax.set_title(title or '相关性矩阵')
plt.tight_layout()
return fig
def _heatmap_interactive(
self,
corr_matrix: pd.DataFrame,
title: str | None
) -> go.Figure:
"""绘制交互式热力图"""
fig = px.imshow(
corr_matrix,
text_auto=True,
aspect='auto',
color_continuous_scale='RdBu_r',
title=title or '相关性矩阵'
)
return fig
def box_plot(
self,
y: str,
x: str | None = None,
title: str | None = None,
figsize: tuple | None = None,
interactive: bool = False
) -> plt.Figure | go.Figure:
"""
绘制箱线图
Args:
y: y轴列名
x: x轴列名(可选)
title: 图表标题
figsize: 图表大小
interactive: 是否使用交互式图表
Returns:
图表对象
"""
if figsize is None:
figsize = self.default_figsize
if interactive:
return self._box_plot_interactive(y, x, title)
else:
return self._box_plot_static(y, x, title, figsize)
def _box_plot_static(
self,
y: str,
x: str | None,
title: str | None,
figsize: tuple
) -> plt.Figure:
"""绘制静态箱线图"""
fig, ax = plt.subplots(figsize=figsize)
if x:
sns.boxplot(data=self.df, x=x, y=y, ax=ax)
else:
sns.boxplot(data=self.df, y=y, ax=ax)
ax.set_title(title or f'{y} 分布')
ax.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
return fig
def _box_plot_interactive(
self,
y: str,
x: str | None,
title: str | None
) -> go.Figure:
"""绘制交互式箱线图"""
if x:
fig = px.box(self.df, x=x, y=y)
else:
fig = px.box(self.df, y=y)
fig.update_layout(
title=title or f'{y} 分布',
template='plotly_white'
)
return fig
5. Streamlit主应用 (app/main.py)¶
Python
"""
Streamlit主应用
"""
import streamlit as st
import pandas as pd
from pathlib import Path
import io
from app.data_loader import DataLoader
from app.data_processor import DataProcessor
from app.visualizer import DataVisualizer
from utils.config import Config
# 页面配置
st.set_page_config(
page_title=Config.PAGE_TITLE,
page_icon=Config.PAGE_ICON,
layout=Config.LAYOUT
)
# 初始化session state
if "df" not in st.session_state:
st.session_state.df = None
if "processor" not in st.session_state:
st.session_state.processor = None
if "visualizer" not in st.session_state:
st.session_state.visualizer = None
def main():
"""主函数"""
st.title("📊 数据可视化仪表板")
# 侧边栏
with st.sidebar:
st.header("📁 数据加载")
# 文件上传
uploaded_file = st.file_uploader(
"上传数据文件",
type=['csv', 'xlsx', 'xls', 'json']
)
if uploaded_file:
load_data(uploaded_file)
st.divider()
# 数据处理选项
if st.session_state.df is not None:
st.header("🔧 数据处理")
# 数据清洗
if st.button("清洗数据"):
with st.spinner("正在清洗数据..."):
processor = DataProcessor(st.session_state.df)
processor.clean_data()
st.session_state.df = processor.get_data()
st.success("数据清洗完成!")
# 移除异常值
if st.button("移除异常值"):
with st.spinner("正在移除异常值..."):
processor = DataProcessor(st.session_state.df)
processor.remove_outliers()
st.session_state.df = processor.get_data()
st.success("异常值移除完成!")
# 重置数据
if st.button("重置数据"):
if "original_df" in st.session_state:
st.session_state.df = st.session_state.original_df.copy()
st.success("数据已重置!")
# 主区域
if st.session_state.df is not None:
show_dashboard()
else:
show_welcome()
def load_data(uploaded_file):
"""加载数据"""
try:
# 保存文件
file_path = Path(Config.RAW_DATA_DIR) / uploaded_file.name
with open(file_path, "wb") as f:
f.write(uploaded_file.getbuffer())
# 加载数据
loader = DataLoader()
df = loader.load_file(file_path)
# 保存到session state
st.session_state.df = df
st.session_state.original_df = df.copy()
st.session_state.processor = DataProcessor(df)
st.session_state.visualizer = DataVisualizer(df)
st.success(f"数据加载成功!共 {len(df)} 行, {len(df.columns)} 列")
# 显示数据预览
st.subheader("📋 数据预览")
st.dataframe(df.head())
# 显示数据信息
st.subheader("📊 数据信息")
file_info = loader.get_file_info(file_path)
col1, col2, col3 = st.columns(3)
with col1:
st.metric("行数", file_info["rows"])
with col2:
st.metric("列数", file_info["columns"])
with col3:
st.metric("文件大小", f"{file_info['file_size'] / 1024:.2f} KB")
except Exception as e:
st.error(f"加载数据失败: {str(e)}")
def show_welcome():
"""显示欢迎页面"""
st.markdown("""
## 欢迎使用数据可视化仪表板!
### 功能特点
- 📁 **多格式支持**: 支持CSV、Excel、JSON等格式
- 🔧 **数据清洗**: 自动处理缺失值和异常值
- 📊 **多种图表**: 折线图、柱状图、饼图、散点图等
- 🎨 **交互式图表**: 支持缩放、平移、选择等交互
- 🔍 **数据过滤**: 灵活的数据过滤和筛选
### 使用步骤
1. 在侧边栏上传数据文件
2. 查看数据预览和统计信息
3. 选择要创建的图表类型
4. 自定义图表参数
5. 导出图表或数据
### 示例数据
您可以使用以下示例数据进行测试:
- 销售数据
- 用户数据
- 产品数据
开始上传您的数据吧! 🚀
""")
def show_dashboard():
"""显示仪表板"""
# 创建标签页
tab1, tab2, tab3, tab4 = st.tabs(["📊 数据概览", "📈 趋势分析", "📉 分布分析", "🔍 关系分析"])
with tab1:
show_overview()
with tab2:
show_trend_analysis()
with tab3:
show_distribution_analysis()
with tab4:
show_relationship_analysis()
def show_overview():
"""显示数据概览"""
st.header("📊 数据概览")
# 数据统计
stats = st.session_state.processor.get_statistics()
# 基本统计
st.subheader("基本信息")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("行数", stats["shape"][0])
with col2:
st.metric("列数", stats["shape"][1])
with col3:
st.metric("内存使用", f"{stats['memory_usage'] / 1024:.2f} KB")
with col4:
st.metric("缺失值", sum(stats["missing_values"].values()))
# 数据类型
st.subheader("数据类型")
dtypes_df = pd.DataFrame(list(stats["dtypes"].items()), columns=["列名", "数据类型"])
st.dataframe(dtypes_df)
# 缺失值
if any(stats["missing_values"].values()): # any()任一为True即返回True
st.subheader("缺失值")
missing_df = pd.DataFrame(
[(k, v) for k, v in stats["missing_values"].items() if v > 0],
columns=["列名", "缺失数量"]
)
st.dataframe(missing_df)
# 数值统计
if stats["numeric_stats"]:
st.subheader("数值统计")
for col, col_stats in stats["numeric_stats"].items():
with st.expander(col):
st.json(col_stats)
def show_trend_analysis():
"""显示趋势分析"""
st.header("📈 趋势分析")
# 选择列
numeric_cols = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
if not numeric_cols:
st.warning("没有数值列可供分析")
return
col1, col2 = st.columns(2)
with col1:
x_col = st.selectbox("选择X轴", numeric_cols)
with col2:
y_col = st.selectbox("选择Y轴", numeric_cols)
# 图表选项
interactive = st.checkbox("使用交互式图表", value=True)
# 绘制图表
visualizer = DataVisualizer(st.session_state.df)
fig = visualizer.line_plot(
x=x_col,
y=y_col,
title=f'{x_col} vs {y_col}',
interactive=interactive
)
if interactive:
st.plotly_chart(fig, use_container_width=True)
else:
st.pyplot(fig)
plt.close()
def show_distribution_analysis():
"""显示分布分析"""
st.header("📉 分布分析")
# 选择列
numeric_cols = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
categorical_cols = st.session_state.df.select_dtypes(include=['object', 'category']).columns.tolist()
# 选择分析类型
analysis_type = st.radio(
"选择分析类型",
["数值分布", "分类分布"]
)
if analysis_type == "数值分布":
if not numeric_cols:
st.warning("没有数值列可供分析")
return
col = st.selectbox("选择列", numeric_cols)
# 图表选项
chart_type = st.selectbox("图表类型", ["直方图", "箱线图"])
interactive = st.checkbox("使用交互式图表", value=True)
# 绘制图表
visualizer = DataVisualizer(st.session_state.df)
if chart_type == "直方图":
fig = px.histogram(
st.session_state.df,
x=col,
title=f'{col} 分布',
nbins=30
)
else:
fig = visualizer.box_plot(
y=col,
title=f'{col} 分布',
interactive=interactive
)
if interactive or chart_type == "直方图":
st.plotly_chart(fig, use_container_width=True)
else:
st.pyplot(fig)
plt.close()
else:
if not categorical_cols:
st.warning("没有分类列可供分析")
return
col = st.selectbox("选择列", categorical_cols)
# 统计类别数量
value_counts = st.session_state.df[col].value_counts()
if len(value_counts) > Config.MAX_CATEGORIES:
st.warning(f"类别数量过多({len(value_counts)}),只显示前{Config.MAX_CATEGORIES}个")
value_counts = value_counts.head(Config.MAX_CATEGORIES)
# 绘制饼图
fig = px.pie(
values=value_counts.values,
names=value_counts.index,
title=f'{col} 分布'
)
st.plotly_chart(fig, use_container_width=True)
def show_relationship_analysis():
"""显示关系分析"""
st.header("🔍 关系分析")
# 选择列
numeric_cols = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
if len(numeric_cols) < 2:
st.warning("至少需要2个数值列才能进行关系分析")
return
# 选择分析类型
analysis_type = st.radio(
"选择分析类型",
["散点图", "相关性热力图"]
)
if analysis_type == "散点图":
col1, col2 = st.columns(2)
with col1:
x_col = st.selectbox("选择X轴", numeric_cols)
with col2:
y_col = st.selectbox("选择Y轴", numeric_cols)
# 颜色和大小
color_col = st.selectbox("颜色列(可选)", ["无"] + numeric_cols)
size_col = st.selectbox("大小列(可选)", ["无"] + numeric_cols)
# 绘制图表
visualizer = DataVisualizer(st.session_state.df)
fig = visualizer.scatter_plot(
x=x_col,
y=y_col,
color=color_col if color_col != "无" else None,
size=size_col if size_col != "无" else None,
title=f'{x_col} vs {y_col}',
interactive=True
)
st.plotly_chart(fig, use_container_width=True)
else:
# 绘制相关性热力图
visualizer = DataVisualizer(st.session_state.df)
fig = visualizer.heatmap(
title="相关性矩阵",
interactive=True
)
st.plotly_chart(fig, use_container_width=True)
if __name__ == "__main__":
main()
6. 依赖文件 (requirements.txt)¶
Text Only
streamlit==1.28.0
pandas==2.1.0
numpy==1.24.0
matplotlib==3.7.0
seaborn==0.12.0
plotly==5.17.0
openpyxl==3.1.2
xlrd==2.0.1
python-dotenv==1.0.0
7. Streamlit配置文件 (.streamlit/config.toml)¶
TOML
[theme]
primaryColor = "#F63366"
backgroundColor = "#FFFFFF"
secondaryBackgroundColor = "#F0F2F6"
textColor = "#262730"
font = "sans serif"
[client]
showErrorDetails = false
maxUploadSize = 200
[logger]
level = "info"
🚀 部署说明¶
1. 本地部署¶
步骤1: 克隆项目¶
步骤2: 创建虚拟环境¶
Bash
python -m venv venv
# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
步骤3: 安装依赖¶
步骤4: 运行应用¶
2. Docker部署¶
Bash
# 构建镜像
docker build -t data-dashboard .
# 运行容器
docker run -d \
--name data-dashboard \
-p 8501:8501 \
data-dashboard
3. 云部署¶
Streamlit Cloud¶
- 将代码推送到GitHub
- 访问 https://share.streamlit.io/
- 连接GitHub仓库
- 部署应用
其他云平台¶
- AWS: 使用EC2或Elastic Beanstalk
- Google Cloud: 使用App Engine
- Azure: 使用App Service
🔧 扩展方向¶
1. 功能扩展¶
- 实时数据: 支持实时数据更新
- 数据源扩展: 支持数据库、API等数据源
- 更多图表: 添加更多图表类型
- 导出功能: 支持导出图表和数据
- 报告生成: 自动生成分析报告
2. 性能优化¶
- 数据缓存: 缓存处理结果
- 懒加载: 大数据集的懒加载
- 并行处理: 并行处理数据
- 内存优化: 优化内存使用
3. 用户体验¶
- 自定义主题: 支持自定义主题
- 图表模板: 预定义图表模板
- 拖拽布局: 支持拖拽调整布局
- 快捷键: 支持键盘快捷键
4. 企业功能¶
- 用户认证: 添加用户登录
- 权限管理: 细粒度权限控制
- 审计日志: 操作审计
- 数据安全: 数据加密和脱敏
📚 学习收获¶
完成本项目后,你将掌握:
- Pandas: 熟练使用Pandas处理数据
- 数据可视化: 掌握多种可视化技术
- Streamlit: 使用Streamlit构建Web应用
- 数据分析: 数据清洗和分析技能
- 交互式图表: 创建交互式可视化
- 项目开发: 完整的项目开发流程
🎉 开始学习¶
现在你已经了解了整个数据可视化仪表板的实现,开始动手构建你自己的数据可视化应用吧!
推荐学习顺序: 1. 先实现基本的数据加载和预览 2. 然后添加数据清洗功能 3. 接着实现各种可视化图表 4. 最后添加交互功能和高级特性
祝你学习顺利! 💪