跳转至

实战项目: 数据可视化仪表板

难度: ⭐⭐⭐ 中等 时间: 12-15小时 涉及知识: Pandas、Matplotlib、Seaborn、Streamlit、数据可视化


📖 项目概述

项目背景

在数据驱动的时代,能够将数据转化为直观、易懂的可视化图表是一项重要技能。数据可视化仪表板可以实时展示关键指标,帮助决策者快速理解数据背后的含义,做出明智的决策。

本项目的目标是构建一个完整的数据可视化仪表板,能够: - 加载和处理各种数据源 - 创建多种类型的可视化图表 - 支持交互式数据探索 - 提供美观的用户界面 - 实现数据过滤和筛选

项目目标

构建一个功能完整的数据可视化仪表板,能够: - 加载CSV、Excel等格式的数据 - 数据清洗和预处理 - 创建多种可视化图表(折线图、柱状图、饼图、散点图等) - 支持交互式数据探索 - 实现数据过滤和筛选 - 提供美观的Web界面

技术栈

  • 数据处理: Pandas
  • 数据可视化: Matplotlib, Seaborn, Plotly
  • Web框架: Streamlit
  • 数值计算: NumPy
  • 文件处理: openpyxl, xlrd

🏗️ 项目结构

Text Only
data-dashboard/
├── app/                      # 应用主目录
│   ├── __init__.py
│   ├── main.py              # Streamlit主应用
│   ├── data_loader.py       # 数据加载器
│   ├── data_processor.py    # 数据处理器
│   ├── visualizer.py        # 可视化工具
│   └── filters.py          # 过滤器
├── data/                    # 数据目录
│   ├── raw/                # 原始数据
│   ├── processed/          # 处理后数据
│   └── sample/            # 示例数据
├── components/              # UI组件
│   ├── __init__.py
│   ├── charts.py          # 图表组件
│   ├── filters.py         # 过滤器组件
│   └── stats.py          # 统计组件
├── utils/                  # 工具函数
│   ├── __init__.py
│   ├── helpers.py         # 辅助函数
│   └── config.py         # 配置文件
├── tests/                  # 测试目录
│   ├── test_data_loader.py
│   ├── test_data_processor.py
│   └── test_visualizer.py
├── requirements.txt         # Python依赖
├── README.md              # 项目说明
└── .streamlit/            # Streamlit配置
    └── config.toml

🎯 核心功能

1. 数据加载

  • 多格式支持: 支持CSV、Excel、JSON等格式
  • 批量加载: 批量加载多个文件
  • 数据预览: 快速预览数据
  • 元数据提取: 提取数据元信息

2. 数据处理

  • 数据清洗: 处理缺失值、异常值
  • 数据转换: 数据类型转换
  • 特征工程: 创建新特征
  • 数据聚合: 数据分组和聚合

3. 数据可视化

  • 折线图: 展示趋势变化
  • 柱状图: 比较不同类别
  • 饼图: 显示占比分布
  • 散点图: 探索变量关系
  • 热力图: 显示相关性矩阵
  • 箱线图: 显示数据分布

4. 交互功能

  • 数据过滤: 根据条件过滤数据
  • 时间范围选择: 选择时间范围
  • 图表交互: 缩放、平移、选择
  • 数据导出: 导出处理后的数据

5. 统计分析

  • 描述性统计: 计算基本统计量
  • 相关性分析: 计算相关性
  • 趋势分析: 分析数据趋势
  • 异常检测: 检测异常值

💻 代码实现

1. 配置文件 (utils/config.py)

Python
"""
配置文件
"""
from pathlib import Path

class Config:
    """应用配置"""

    # 数据目录
    DATA_DIR = Path("./data")
    RAW_DATA_DIR = DATA_DIR / "raw"
    PROCESSED_DATA_DIR = DATA_DIR / "processed"
    SAMPLE_DATA_DIR = DATA_DIR / "sample"

    # 支持的文件格式
    SUPPORTED_FORMATS = [".csv", ".xlsx", ".xls", ".json"]

    # 图表配置
    DEFAULT_FIGURE_SIZE = (10, 6)
    DEFAULT_COLOR_PALETTE = "Set2"

    # Streamlit配置
    PAGE_TITLE = "数据可视化仪表板"
    PAGE_ICON = "📊"
    LAYOUT = "wide"

    # 数据处理配置
    MAX_ROWS_PREVIEW = 1000
    MAX_CATEGORIES = 20

    @classmethod  # @classmethod定义类方法,第一个参数为类本身
    def create_directories(cls):
        """创建必要的目录"""
        cls.DATA_DIR.mkdir(parents=True, exist_ok=True)
        cls.RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
        cls.PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)
        cls.SAMPLE_DATA_DIR.mkdir(parents=True, exist_ok=True)

# 创建目录
Config.create_directories()

2. 数据加载器 (app/data_loader.py)

Python
"""
数据加载器
"""
import pandas as pd
from pathlib import Path
from typing import Any
from utils.config import Config

class DataLoader:
    """数据加载器"""

    def __init__(self):
        """初始化数据加载器"""
        self.supported_formats = Config.SUPPORTED_FORMATS

    def load_file(
        self,
        file_path: str | Path,
        **kwargs
    ) -> pd.DataFrame:
        """
        加载单个文件

        Args:
            file_path: 文件路径
            **kwargs: 传递给pandas读取函数的额外参数

        Returns:
            DataFrame对象
        """
        file_path = Path(file_path)

        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        # 根据文件扩展名选择读取方法
        extension = file_path.suffix.lower()

        if extension == ".csv":
            return self._load_csv(file_path, **kwargs)
        elif extension in [".xlsx", ".xls"]:
            return self._load_excel(file_path, **kwargs)
        elif extension == ".json":
            return self._load_json(file_path, **kwargs)
        else:
            raise ValueError(f"不支持的文件格式: {extension}")

    def _load_csv(
        self,
        file_path: Path,
        encoding: str = "utf-8",
        **kwargs
    ) -> pd.DataFrame:
        """加载CSV文件"""
        return pd.read_csv(file_path, encoding=encoding, **kwargs)

    def _load_excel(
        self,
        file_path: Path,
        sheet_name: str | int = 0,
        **kwargs
    ) -> pd.DataFrame:
        """加载Excel文件"""
        return pd.read_excel(file_path, sheet_name=sheet_name, **kwargs)

    def _load_json(
        self,
        file_path: Path,
        **kwargs
    ) -> pd.DataFrame:
        """加载JSON文件"""
        return pd.read_json(file_path, **kwargs)

    def load_directory(
        self,
        directory: str | Path,
        pattern: str = "*",
        combine: bool = False
    ) -> list[pd.DataFrame] | pd.DataFrame:
        """
        加载目录中的所有文件

        Args:
            directory: 目录路径
            pattern: 文件匹配模式
            combine: 是否合并所有文件

        Returns:
            DataFrame列表或合并后的DataFrame
        """
        directory = Path(directory)

        # 查找所有支持的文件
        files = []
        for ext in self.supported_formats:
            files.extend(directory.glob(f"{pattern}{ext}"))

        if not files:
            raise FileNotFoundError(f"目录中没有找到支持的文件: {directory}")

        # 加载所有文件
        dataframes = []
        for file_path in files:
            try:
                df = self.load_file(file_path)
                df["source_file"] = file_path.name
                dataframes.append(df)
            except Exception as e:
                print(f"加载文件失败 {file_path}: {str(e)}")

        if combine:
            return pd.concat(dataframes, ignore_index=True)
        else:
            return dataframes

    def get_file_info(
        self,
        file_path: str | Path
    ) -> dict:
        """
        获取文件信息

        Args:
            file_path: 文件路径

        Returns:
            文件信息字典
        """
        file_path = Path(file_path)

        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        # 加载数据
        df = self.load_file(file_path)

        # 提取信息
        info = {
            "file_name": file_path.name,
            "file_size": file_path.stat().st_size,
            "file_format": file_path.suffix,
            "rows": len(df),
            "columns": len(df.columns),
            "column_names": df.columns.tolist(),
            "column_types": df.dtypes.astype(str).to_dict(),
            "memory_usage": df.memory_usage(deep=True).sum(),
            "has_missing": df.isnull().any().any(),
            "missing_counts": df.isnull().sum().to_dict()
        }

        return info

3. 数据处理器 (app/data_processor.py)

Python
"""
数据处理器
"""
import pandas as pd
import numpy as np
from typing import Any
from utils.config import Config

class DataProcessor:
    """数据处理器"""

    def __init__(self, df: pd.DataFrame):
        """
        初始化数据处理器

        Args:
            df: DataFrame对象
        """
        self.df = df.copy()
        self.original_df = df.copy()

    def clean_data(
        self,
        drop_duplicates: bool = True,
        handle_missing: str = "drop",
        missing_threshold: float = 0.5
    ) -> 'DataProcessor':
        """
        清洗数据

        Args:
            drop_duplicates: 是否删除重复行
            handle_missing: 处理缺失值的方式 (drop, fill, keep)
            missing_threshold: 缺失值阈值,超过此比例的列将被删除

        Returns:
            自身对象
        """
        # 删除重复行
        if drop_duplicates:
            self.df = self.df.drop_duplicates()

        # 处理缺失值
        if handle_missing == "drop":
            # 删除缺失值比例过高的列
            missing_ratio = self.df.isnull().mean()
            cols_to_drop = missing_ratio[missing_ratio > missing_threshold].index.tolist()
            if cols_to_drop:
                self.df = self.df.drop(columns=cols_to_drop)

            # 删除包含缺失值的行
            self.df = self.df.dropna()

        elif handle_missing == "fill":
            # 数值列用中位数填充
            numeric_cols = self.df.select_dtypes(include=[np.number]).columns
            self.df[numeric_cols] = self.df[numeric_cols].fillna(
                self.df[numeric_cols].median()
            )

            # 分类列用众数填充
            categorical_cols = self.df.select_dtypes(include=['object']).columns
            for col in categorical_cols:
                mode = self.df[col].mode()
                if len(mode) > 0:
                    self.df[col] = self.df[col].fillna(mode[0])

        return self

    def convert_types(
        self,
        type_mapping: dict[str, str] | None = None
    ) -> 'DataProcessor':
        """
        转换数据类型

        Args:
            type_mapping: 列名到类型的映射

        Returns:
            自身对象
        """
        if type_mapping:
            for col, dtype in type_mapping.items():
                if col in self.df.columns:
                    try:
                        if dtype == "datetime":
                            self.df[col] = pd.to_datetime(self.df[col])
                        elif dtype == "category":
                            self.df[col] = self.df[col].astype('category')
                        else:
                            self.df[col] = self.df[col].astype(dtype)
                    except Exception as e:
                        print(f"转换列 {col} 失败: {str(e)}")

        return self

    def remove_outliers(
        self,
        columns: list[str] | None = None,
        method: str = "iqr",
        threshold: float = 1.5
    ) -> 'DataProcessor':
        """
        移除异常值

        Args:
            columns: 要处理的列名列表
            method: 异常值检测方法 (iqr, zscore)
            threshold: 阈值

        Returns:
            自身对象
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns.tolist()

        for col in columns:
            if col not in self.df.columns:
                continue

            if method == "iqr":
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1

                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR

                self.df = self.df[
                    (self.df[col] >= lower_bound) &
                    (self.df[col] <= upper_bound)
                ]

            elif method == "zscore":
                z_scores = np.abs(
                    (self.df[col] - self.df[col].mean()) / self.df[col].std()
                )
                self.df = self.df[z_scores < threshold]

        return self

    def aggregate_data(
        self,
        group_by: str | list[str],
        agg_func: str | dict[str, str] = "mean"
    ) -> pd.DataFrame:
        """
        聚合数据

        Args:
            group_by: 分组列
            agg_func: 聚合函数

        Returns:
            聚合后的DataFrame
        """
        return self.df.groupby(group_by).agg(agg_func).reset_index()

    def create_features(
        self,
        feature_configs: list[dict[str, Any]]
    ) -> 'DataProcessor':
        """
        创建新特征

        Args:
            feature_configs: 特征配置列表

        Returns:
            自身对象
        """
        for config in feature_configs:
            feature_name = config["name"]
            feature_type = config["type"]
            params = config.get("params", {})

            if feature_type == "arithmetic":
                # 算术运算(使用 df.eval() 替代内置 eval,避免代码注入风险)
                self.df[feature_name] = self.df.eval(
                    params["expression"]
                )

            elif feature_type == "datetime":
                # 日期时间特征
                date_col = params["column"]
                if date_col in self.df.columns:
                    self.df[feature_name] = getattr(  # getattr()动态获取对象属性,可设默认值
                        self.df[date_col].dt,
                        params["attribute"]
                    )

            elif feature_type == "binning":
                # 分箱
                self.df[feature_name] = pd.cut(
                    self.df[params["column"]],
                    bins=params["bins"],
                    labels=params.get("labels")
                )

        return self

    def get_statistics(self) -> dict[str, Any]:
        """
        获取统计信息

        Returns:
            统计信息字典
        """
        stats = {
            "shape": self.df.shape,
            "columns": self.df.columns.tolist(),
            "dtypes": self.df.dtypes.astype(str).to_dict(),
            "memory_usage": self.df.memory_usage(deep=True).sum(),
            "missing_values": self.df.isnull().sum().to_dict(),
            "numeric_stats": {},
            "categorical_stats": {}
        }

        # 数值列统计
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            stats["numeric_stats"] = self.df[numeric_cols].describe().to_dict()

        # 分类列统计
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            stats["categorical_stats"][col] = {
                "unique": self.df[col].nunique(),
                "top": self.df[col].mode().iloc[0] if len(self.df[col].mode()) > 0 else None,
                "freq": self.df[col].value_counts().iloc[0] if len(self.df[col].value_counts()) > 0 else 0
            }

        return stats

    def get_data(self) -> pd.DataFrame:
        """
        获取处理后的数据

        Returns:
            DataFrame对象
        """
        return self.df.copy()

    def reset(self):
        """重置为原始数据"""
        self.df = self.original_df.copy()

4. 可视化工具 (app/visualizer.py)

Python
"""
可视化工具
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from typing import Any
from utils.config import Config

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False

# 设置样式
sns.set_style("whitegrid")
sns.set_palette(Config.DEFAULT_COLOR_PALETTE)

class DataVisualizer:
    """数据可视化工具"""

    def __init__(self, df: pd.DataFrame):
        """
        初始化可视化工具

        Args:
            df: DataFrame对象
        """
        self.df = df
        self.default_figsize = Config.DEFAULT_FIGURE_SIZE

    def line_plot(
        self,
        x: str,
        y: str | list[str],
        title: str | None = None,
        figsize: tuple | None = None,
        interactive: bool = False
    ) -> plt.Figure | go.Figure:
        """
        绘制折线图

        Args:
            x: x轴列名
            y: y轴列名或列名列表
            title: 图表标题
            figsize: 图表大小
            interactive: 是否使用交互式图表

        Returns:
            图表对象
        """
        if figsize is None:
            figsize = self.default_figsize

        if interactive:
            return self._line_plot_interactive(x, y, title)
        else:
            return self._line_plot_static(x, y, title, figsize)

    def _line_plot_static(
        self,
        x: str,
        y: str | list[str],
        title: str | None,
        figsize: tuple
    ) -> plt.Figure:
        """绘制静态折线图"""
        fig, ax = plt.subplots(figsize=figsize)

        if isinstance(y, str):
            y = [y]

        for col in y:
            ax.plot(self.df[x], self.df[col], label=col, marker='o')

        ax.set_xlabel(x)
        ax.set_ylabel('值')
        ax.set_title(title or f'{x} vs {", ".join(y)}')
        ax.legend()
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        return fig

    def _line_plot_interactive(
        self,
        x: str,
        y: str | list[str],
        title: str | None
    ) -> go.Figure:
        """绘制交互式折线图"""
        if isinstance(y, str):
            y = [y]

        fig = go.Figure()

        for col in y:
            fig.add_trace(go.Scatter(
                x=self.df[x],
                y=self.df[col],
                mode='lines+markers',
                name=col,
                line=dict(width=2),
                marker=dict(size=6)
            ))

        fig.update_layout(
            title=title or f'{x} vs {", ".join(y)}',
            xaxis_title=x,
            yaxis_title='值',
            hovermode='x unified',
            template='plotly_white'
        )

        return fig

    def bar_plot(
        self,
        x: str,
        y: str,
        title: str | None = None,
        figsize: tuple | None = None,
        interactive: bool = False,
        horizontal: bool = False
    ) -> plt.Figure | go.Figure:
        """
        绘制柱状图

        Args:
            x: x轴列名
            y: y轴列名
            title: 图表标题
            figsize: 图表大小
            interactive: 是否使用交互式图表
            horizontal: 是否水平显示

        Returns:
            图表对象
        """
        if figsize is None:
            figsize = self.default_figsize

        if interactive:
            return self._bar_plot_interactive(x, y, title, horizontal)
        else:
            return self._bar_plot_static(x, y, title, figsize, horizontal)

    def _bar_plot_static(
        self,
        x: str,
        y: str,
        title: str | None,
        figsize: tuple,
        horizontal: bool
    ) -> plt.Figure:
        """绘制静态柱状图"""
        fig, ax = plt.subplots(figsize=figsize)

        if horizontal:
            ax.barh(self.df[y], self.df[x])
            ax.set_xlabel(x)
            ax.set_ylabel(y)
        else:
            ax.bar(self.df[x], self.df[y])
            ax.set_xlabel(x)
            ax.set_ylabel(y)

        ax.set_title(title or f'{x} vs {y}')
        ax.grid(True, alpha=0.3, axis='y')

        plt.tight_layout()
        return fig

    def _bar_plot_interactive(
        self,
        x: str,
        y: str,
        title: str | None,
        horizontal: bool
    ) -> go.Figure:
        """绘制交互式柱状图"""
        if horizontal:
            fig = px.bar(self.df, x=y, y=x, orientation='h')
        else:
            fig = px.bar(self.df, x=x, y=y)

        fig.update_layout(
            title=title or f'{x} vs {y}',
            template='plotly_white'
        )

        return fig

    def pie_plot(
        self,
        values: str,
        names: str,
        title: str | None = None,
        figsize: tuple | None = None,
        interactive: bool = False
    ) -> plt.Figure | go.Figure:
        """
        绘制饼图

        Args:
            values: 值列名
            names: 名称列名
            title: 图表标题
            figsize: 图表大小
            interactive: 是否使用交互式图表

        Returns:
            图表对象
        """
        if figsize is None:
            figsize = (8, 8)

        if interactive:
            return self._pie_plot_interactive(values, names, title)
        else:
            return self._pie_plot_static(values, names, title, figsize)

    def _pie_plot_static(
        self,
        values: str,
        names: str,
        title: str | None,
        figsize: tuple
    ) -> plt.Figure:
        """绘制静态饼图"""
        fig, ax = plt.subplots(figsize=figsize)

        ax.pie(
            self.df[values],
            labels=self.df[names],
            autopct='%1.1f%%',
            startangle=90
        )

        ax.set_title(title or f'{names} 分布')

        return fig

    def _pie_plot_interactive(
        self,
        values: str,
        names: str,
        title: str | None
    ) -> go.Figure:
        """绘制交互式饼图"""
        fig = px.pie(
            self.df,
            values=values,
            names=names,
            title=title or f'{names} 分布'
        )

        return fig

    def scatter_plot(
        self,
        x: str,
        y: str,
        color: str | None = None,
        size: str | None = None,
        title: str | None = None,
        figsize: tuple | None = None,
        interactive: bool = False
    ) -> plt.Figure | go.Figure:
        """
        绘制散点图

        Args:
            x: x轴列名
            y: y轴列名
            color: 颜色列名
            size: 大小列名
            title: 图表标题
            figsize: 图表大小
            interactive: 是否使用交互式图表

        Returns:
            图表对象
        """
        if figsize is None:
            figsize = self.default_figsize

        if interactive:
            return self._scatter_plot_interactive(x, y, color, size, title)
        else:
            return self._scatter_plot_static(x, y, color, size, title, figsize)

    def _scatter_plot_static(
        self,
        x: str,
        y: str,
        color: str | None,
        size: str | None,
        title: str | None,
        figsize: tuple
    ) -> plt.Figure:
        """绘制静态散点图"""
        fig, ax = plt.subplots(figsize=figsize)

        if color:
            scatter = ax.scatter(
                self.df[x],
                self.df[y],
                c=self.df[color],
                s=self.df[size] if size else 50,
                alpha=0.6,
                cmap='viridis'
            )
            plt.colorbar(scatter, ax=ax, label=color)
        else:
            ax.scatter(
                self.df[x],
                self.df[y],
                s=self.df[size] if size else 50,
                alpha=0.6
            )

        ax.set_xlabel(x)
        ax.set_ylabel(y)
        ax.set_title(title or f'{x} vs {y}')
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        return fig

    def _scatter_plot_interactive(
        self,
        x: str,
        y: str,
        color: str | None,
        size: str | None,
        title: str | None
    ) -> go.Figure:
        """绘制交互式散点图"""
        fig = px.scatter(
            self.df,
            x=x,
            y=y,
            color=color,
            size=size,
            title=title or f'{x} vs {y}',
            template='plotly_white'
        )

        return fig

    def heatmap(
        self,
        title: str | None = None,
        figsize: tuple | None = None,
        interactive: bool = False
    ) -> plt.Figure | go.Figure:
        """
        绘制热力图

        Args:
            title: 图表标题
            figsize: 图表大小
            interactive: 是否使用交互式图表

        Returns:
            图表对象
        """
        if figsize is None:
            figsize = (10, 8)

        # 计算相关性矩阵
        corr_matrix = self.df.corr(numeric_only=True)

        if interactive:
            return self._heatmap_interactive(corr_matrix, title)
        else:
            return self._heatmap_static(corr_matrix, title, figsize)

    def _heatmap_static(
        self,
        corr_matrix: pd.DataFrame,
        title: str | None,
        figsize: tuple
    ) -> plt.Figure:
        """绘制静态热力图"""
        fig, ax = plt.subplots(figsize=figsize)

        sns.heatmap(
            corr_matrix,
            annot=True,
            fmt='.2f',
            cmap='coolwarm',
            center=0,
            square=True,
            ax=ax
        )

        ax.set_title(title or '相关性矩阵')

        plt.tight_layout()
        return fig

    def _heatmap_interactive(
        self,
        corr_matrix: pd.DataFrame,
        title: str | None
    ) -> go.Figure:
        """绘制交互式热力图"""
        fig = px.imshow(
            corr_matrix,
            text_auto=True,
            aspect='auto',
            color_continuous_scale='RdBu_r',
            title=title or '相关性矩阵'
        )

        return fig

    def box_plot(
        self,
        y: str,
        x: str | None = None,
        title: str | None = None,
        figsize: tuple | None = None,
        interactive: bool = False
    ) -> plt.Figure | go.Figure:
        """
        绘制箱线图

        Args:
            y: y轴列名
            x: x轴列名(可选)
            title: 图表标题
            figsize: 图表大小
            interactive: 是否使用交互式图表

        Returns:
            图表对象
        """
        if figsize is None:
            figsize = self.default_figsize

        if interactive:
            return self._box_plot_interactive(y, x, title)
        else:
            return self._box_plot_static(y, x, title, figsize)

    def _box_plot_static(
        self,
        y: str,
        x: str | None,
        title: str | None,
        figsize: tuple
    ) -> plt.Figure:
        """绘制静态箱线图"""
        fig, ax = plt.subplots(figsize=figsize)

        if x:
            sns.boxplot(data=self.df, x=x, y=y, ax=ax)
        else:
            sns.boxplot(data=self.df, y=y, ax=ax)

        ax.set_title(title or f'{y} 分布')
        ax.grid(True, alpha=0.3, axis='y')

        plt.tight_layout()
        return fig

    def _box_plot_interactive(
        self,
        y: str,
        x: str | None,
        title: str | None
    ) -> go.Figure:
        """绘制交互式箱线图"""
        if x:
            fig = px.box(self.df, x=x, y=y)
        else:
            fig = px.box(self.df, y=y)

        fig.update_layout(
            title=title or f'{y} 分布',
            template='plotly_white'
        )

        return fig

5. Streamlit主应用 (app/main.py)

Python
"""
Streamlit主应用
"""
import streamlit as st
import pandas as pd
from pathlib import Path
import io

from app.data_loader import DataLoader
from app.data_processor import DataProcessor
from app.visualizer import DataVisualizer
from utils.config import Config

# 页面配置
st.set_page_config(
    page_title=Config.PAGE_TITLE,
    page_icon=Config.PAGE_ICON,
    layout=Config.LAYOUT
)

# 初始化session state
if "df" not in st.session_state:
    st.session_state.df = None
if "processor" not in st.session_state:
    st.session_state.processor = None
if "visualizer" not in st.session_state:
    st.session_state.visualizer = None

def main():
    """主函数"""
    st.title("📊 数据可视化仪表板")

    # 侧边栏
    with st.sidebar:
        st.header("📁 数据加载")

        # 文件上传
        uploaded_file = st.file_uploader(
            "上传数据文件",
            type=['csv', 'xlsx', 'xls', 'json']
        )

        if uploaded_file:
            load_data(uploaded_file)

        st.divider()

        # 数据处理选项
        if st.session_state.df is not None:
            st.header("🔧 数据处理")

            # 数据清洗
            if st.button("清洗数据"):
                with st.spinner("正在清洗数据..."):
                    processor = DataProcessor(st.session_state.df)
                    processor.clean_data()
                    st.session_state.df = processor.get_data()
                    st.success("数据清洗完成!")

            # 移除异常值
            if st.button("移除异常值"):
                with st.spinner("正在移除异常值..."):
                    processor = DataProcessor(st.session_state.df)
                    processor.remove_outliers()
                    st.session_state.df = processor.get_data()
                    st.success("异常值移除完成!")

            # 重置数据
            if st.button("重置数据"):
                if "original_df" in st.session_state:
                    st.session_state.df = st.session_state.original_df.copy()
                    st.success("数据已重置!")

    # 主区域
    if st.session_state.df is not None:
        show_dashboard()
    else:
        show_welcome()

def load_data(uploaded_file):
    """加载数据"""
    try:
        # 保存文件
        file_path = Path(Config.RAW_DATA_DIR) / uploaded_file.name
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())

        # 加载数据
        loader = DataLoader()
        df = loader.load_file(file_path)

        # 保存到session state
        st.session_state.df = df
        st.session_state.original_df = df.copy()
        st.session_state.processor = DataProcessor(df)
        st.session_state.visualizer = DataVisualizer(df)

        st.success(f"数据加载成功!共 {len(df)} 行, {len(df.columns)} 列")

        # 显示数据预览
        st.subheader("📋 数据预览")
        st.dataframe(df.head())

        # 显示数据信息
        st.subheader("📊 数据信息")
        file_info = loader.get_file_info(file_path)

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("行数", file_info["rows"])
        with col2:
            st.metric("列数", file_info["columns"])
        with col3:
            st.metric("文件大小", f"{file_info['file_size'] / 1024:.2f} KB")

    except Exception as e:
        st.error(f"加载数据失败: {str(e)}")

def show_welcome():
    """显示欢迎页面"""
    st.markdown("""
    ## 欢迎使用数据可视化仪表板!

    ### 功能特点

    - 📁 **多格式支持**: 支持CSV、Excel、JSON等格式
    - 🔧 **数据清洗**: 自动处理缺失值和异常值
    - 📊 **多种图表**: 折线图、柱状图、饼图、散点图等
    - 🎨 **交互式图表**: 支持缩放、平移、选择等交互
    - 🔍 **数据过滤**: 灵活的数据过滤和筛选

    ### 使用步骤

    1. 在侧边栏上传数据文件
    2. 查看数据预览和统计信息
    3. 选择要创建的图表类型
    4. 自定义图表参数
    5. 导出图表或数据

    ### 示例数据

    您可以使用以下示例数据进行测试:
    - 销售数据
    - 用户数据
    - 产品数据

    开始上传您的数据吧! 🚀
    """)

def show_dashboard():
    """显示仪表板"""
    # 创建标签页
    tab1, tab2, tab3, tab4 = st.tabs(["📊 数据概览", "📈 趋势分析", "📉 分布分析", "🔍 关系分析"])

    with tab1:
        show_overview()

    with tab2:
        show_trend_analysis()

    with tab3:
        show_distribution_analysis()

    with tab4:
        show_relationship_analysis()

def show_overview():
    """显示数据概览"""
    st.header("📊 数据概览")

    # 数据统计
    stats = st.session_state.processor.get_statistics()

    # 基本统计
    st.subheader("基本信息")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("行数", stats["shape"][0])
    with col2:
        st.metric("列数", stats["shape"][1])
    with col3:
        st.metric("内存使用", f"{stats['memory_usage'] / 1024:.2f} KB")
    with col4:
        st.metric("缺失值", sum(stats["missing_values"].values()))

    # 数据类型
    st.subheader("数据类型")
    dtypes_df = pd.DataFrame(list(stats["dtypes"].items()), columns=["列名", "数据类型"])
    st.dataframe(dtypes_df)

    # 缺失值
    if any(stats["missing_values"].values()):  # any()任一为True即返回True
        st.subheader("缺失值")
        missing_df = pd.DataFrame(
            [(k, v) for k, v in stats["missing_values"].items() if v > 0],
            columns=["列名", "缺失数量"]
        )
        st.dataframe(missing_df)

    # 数值统计
    if stats["numeric_stats"]:
        st.subheader("数值统计")
        for col, col_stats in stats["numeric_stats"].items():
            with st.expander(col):
                st.json(col_stats)

def show_trend_analysis():
    """显示趋势分析"""
    st.header("📈 趋势分析")

    # 选择列
    numeric_cols = st.session_state.df.select_dtypes(include=['number']).columns.tolist()

    if not numeric_cols:
        st.warning("没有数值列可供分析")
        return

    col1, col2 = st.columns(2)
    with col1:
        x_col = st.selectbox("选择X轴", numeric_cols)
    with col2:
        y_col = st.selectbox("选择Y轴", numeric_cols)

    # 图表选项
    interactive = st.checkbox("使用交互式图表", value=True)

    # 绘制图表
    visualizer = DataVisualizer(st.session_state.df)
    fig = visualizer.line_plot(
        x=x_col,
        y=y_col,
        title=f'{x_col} vs {y_col}',
        interactive=interactive
    )

    if interactive:
        st.plotly_chart(fig, use_container_width=True)
    else:
        st.pyplot(fig)
        plt.close()

def show_distribution_analysis():
    """显示分布分析"""
    st.header("📉 分布分析")

    # 选择列
    numeric_cols = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
    categorical_cols = st.session_state.df.select_dtypes(include=['object', 'category']).columns.tolist()

    # 选择分析类型
    analysis_type = st.radio(
        "选择分析类型",
        ["数值分布", "分类分布"]
    )

    if analysis_type == "数值分布":
        if not numeric_cols:
            st.warning("没有数值列可供分析")
            return

        col = st.selectbox("选择列", numeric_cols)

        # 图表选项
        chart_type = st.selectbox("图表类型", ["直方图", "箱线图"])
        interactive = st.checkbox("使用交互式图表", value=True)

        # 绘制图表
        visualizer = DataVisualizer(st.session_state.df)

        if chart_type == "直方图":
            fig = px.histogram(
                st.session_state.df,
                x=col,
                title=f'{col} 分布',
                nbins=30
            )
        else:
            fig = visualizer.box_plot(
                y=col,
                title=f'{col} 分布',
                interactive=interactive
            )

        if interactive or chart_type == "直方图":
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.pyplot(fig)
            plt.close()

    else:
        if not categorical_cols:
            st.warning("没有分类列可供分析")
            return

        col = st.selectbox("选择列", categorical_cols)

        # 统计类别数量
        value_counts = st.session_state.df[col].value_counts()

        if len(value_counts) > Config.MAX_CATEGORIES:
            st.warning(f"类别数量过多({len(value_counts)}),只显示前{Config.MAX_CATEGORIES}个")
            value_counts = value_counts.head(Config.MAX_CATEGORIES)

        # 绘制饼图
        fig = px.pie(
            values=value_counts.values,
            names=value_counts.index,
            title=f'{col} 分布'
        )

        st.plotly_chart(fig, use_container_width=True)

def show_relationship_analysis():
    """显示关系分析"""
    st.header("🔍 关系分析")

    # 选择列
    numeric_cols = st.session_state.df.select_dtypes(include=['number']).columns.tolist()

    if len(numeric_cols) < 2:
        st.warning("至少需要2个数值列才能进行关系分析")
        return

    # 选择分析类型
    analysis_type = st.radio(
        "选择分析类型",
        ["散点图", "相关性热力图"]
    )

    if analysis_type == "散点图":
        col1, col2 = st.columns(2)
        with col1:
            x_col = st.selectbox("选择X轴", numeric_cols)
        with col2:
            y_col = st.selectbox("选择Y轴", numeric_cols)

        # 颜色和大小
        color_col = st.selectbox("颜色列(可选)", ["无"] + numeric_cols)
        size_col = st.selectbox("大小列(可选)", ["无"] + numeric_cols)

        # 绘制图表
        visualizer = DataVisualizer(st.session_state.df)
        fig = visualizer.scatter_plot(
            x=x_col,
            y=y_col,
            color=color_col if color_col != "无" else None,
            size=size_col if size_col != "无" else None,
            title=f'{x_col} vs {y_col}',
            interactive=True
        )

        st.plotly_chart(fig, use_container_width=True)

    else:
        # 绘制相关性热力图
        visualizer = DataVisualizer(st.session_state.df)
        fig = visualizer.heatmap(
            title="相关性矩阵",
            interactive=True
        )

        st.plotly_chart(fig, use_container_width=True)

if __name__ == "__main__":
    main()

6. 依赖文件 (requirements.txt)

Text Only
streamlit==1.28.0
pandas==2.1.0
numpy==1.24.0
matplotlib==3.7.0
seaborn==0.12.0
plotly==5.17.0
openpyxl==3.1.2
xlrd==2.0.1
python-dotenv==1.0.0

7. Streamlit配置文件 (.streamlit/config.toml)

TOML
[theme]
primaryColor = "#F63366"
backgroundColor = "#FFFFFF"
secondaryBackgroundColor = "#F0F2F6"
textColor = "#262730"
font = "sans serif"

[client]
showErrorDetails = false
maxUploadSize = 200

[logger]
level = "info"

🚀 部署说明

1. 本地部署

步骤1: 克隆项目

Bash
git clone https://github.com/yourusername/data-dashboard.git
cd data-dashboard

步骤2: 创建虚拟环境

Bash
python -m venv venv

# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate

步骤3: 安装依赖

Bash
pip install -r requirements.txt

步骤4: 运行应用

Bash
streamlit run app/main.py

2. Docker部署

Bash
# 构建镜像
docker build -t data-dashboard .

# 运行容器
docker run -d \
  --name data-dashboard \
  -p 8501:8501 \
  data-dashboard

3. 云部署

Streamlit Cloud

  1. 将代码推送到GitHub
  2. 访问 https://share.streamlit.io/
  3. 连接GitHub仓库
  4. 部署应用

其他云平台

  • AWS: 使用EC2或Elastic Beanstalk
  • Google Cloud: 使用App Engine
  • Azure: 使用App Service

🔧 扩展方向

1. 功能扩展

  • 实时数据: 支持实时数据更新
  • 数据源扩展: 支持数据库、API等数据源
  • 更多图表: 添加更多图表类型
  • 导出功能: 支持导出图表和数据
  • 报告生成: 自动生成分析报告

2. 性能优化

  • 数据缓存: 缓存处理结果
  • 懒加载: 大数据集的懒加载
  • 并行处理: 并行处理数据
  • 内存优化: 优化内存使用

3. 用户体验

  • 自定义主题: 支持自定义主题
  • 图表模板: 预定义图表模板
  • 拖拽布局: 支持拖拽调整布局
  • 快捷键: 支持键盘快捷键

4. 企业功能

  • 用户认证: 添加用户登录
  • 权限管理: 细粒度权限控制
  • 审计日志: 操作审计
  • 数据安全: 数据加密和脱敏

📚 学习收获

完成本项目后,你将掌握:

  • Pandas: 熟练使用Pandas处理数据
  • 数据可视化: 掌握多种可视化技术
  • Streamlit: 使用Streamlit构建Web应用
  • 数据分析: 数据清洗和分析技能
  • 交互式图表: 创建交互式可视化
  • 项目开发: 完整的项目开发流程

🎉 开始学习

现在你已经了解了整个数据可视化仪表板的实现,开始动手构建你自己的数据可视化应用吧!

推荐学习顺序: 1. 先实现基本的数据加载和预览 2. 然后添加数据清洗功能 3. 接着实现各种可视化图表 4. 最后添加交互功能和高级特性

祝你学习顺利! 💪