跳转至

项目1: RAG知识库问答系统

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

难度: ⭐⭐⭐ 中等 时间: 10-15小时 涉及知识: RAG技术、向量数据库、LangChain、API开发


📖 项目概述

项目背景

随着大语言模型的普及,企业需要一个能够基于内部文档回答用户问题的智能问答系统。传统的搜索引擎只能基于关键词匹配,无法理解用户意图;而大语言模型虽然能理解意图,但缺乏企业内部知识。RAG(Retrieval-Augmented Generation,检索增强生成)技术结合了两者的优势,先从知识库中检索相关文档,然后基于检索结果生成答案。

项目目标

构建一个完整的RAG知识库问答系统,能够: - 上传和管理企业文档 - 自动文档切分和向量化 - 基于用户问题检索相关文档 - 生成准确的答案并引用来源 - 支持多轮对话 - 提供友好的Web界面

技术栈

  • 后端框架: FastAPI
  • LLM框架: LangChain
  • 向量数据库: ChromaDB
  • 大模型: OpenAI GPT-4 / 通义千问 / 文心一言
  • 前端框架: Streamlit
  • 文档处理: PyPDF2, python-docx
  • 文本嵌入: OpenAI Embeddings / HuggingFace Embeddings

🏗️ 项目结构

Text Only
rag-qa-system/
├── app/                      # 应用主目录
│   ├── __init__.py
│   ├── main.py              # FastAPI主应用
│   ├── config.py            # 配置文件
│   ├── document_processor.py # 文档处理模块
│   ├── vector_store.py      # 向量数据库管理
│   ├── rag_chain.py         # RAG链构建
│   └── api/                 # API路由
│       ├── __init__.py
│       ├── documents.py     # 文档管理API
│       └── chat.py          # 聊天API
├── frontend/                 # 前端目录
│   ├── app.py              # Streamlit应用
│   ├── pages/              # 页面组件
│   └── components/         # UI组件
├── data/                    # 数据目录
│   ├── documents/          # 原始文档
│   ├── processed/          # 处理后文档
│   └── vector_store/       # 向量数据库
├── tests/                   # 测试目录
│   ├── test_document_processor.py
│   ├── test_vector_store.py
│   └── test_rag_chain.py
├── utils/                   # 工具函数
│   ├── __init__.py
│   ├── text_splitter.py    # 文本切分工具
│   └── logger.py          # 日志工具
├── requirements.txt         # Python依赖
├── Dockerfile              # Docker配置
├── docker-compose.yml      # Docker Compose配置
└── README.md              # 项目说明

🎯 核心功能

1. 文档管理

  • 文档上传: 支持PDF、Word、TXT、Markdown等格式
  • 文档预处理: 自动提取文本、清理格式
  • 文档切分: 智能切分文档为合适大小的文本块
  • 文档索引: 将文本块向量化并存储到向量数据库

2. 向量检索

  • 相似度检索: 基于用户问题检索最相关的文档片段
  • 混合检索: 结合关键词检索和语义检索
  • 重排序: 对检索结果进行重新排序,提高准确性

3. 答案生成

  • 上下文构建: 将检索到的文档片段组合成上下文
  • 答案生成: 基于上下文和问题生成准确答案
  • 来源引用: 标注答案引用的文档来源
  • 置信度评分: 评估答案的可信度

4. 对话管理

  • 多轮对话: 支持连续对话,理解上下文
  • 历史记录: 保存用户对话历史
  • 会话管理: 支持多个独立会话

5. Web界面

  • 文档管理界面: 上传、查看、删除文档
  • 聊天界面: 用户友好的对话界面
  • 结果展示: 清晰展示答案和来源引用

💻 代码实现

1. 配置文件 (app/config.py)

Python
"""
RAG系统配置文件
"""
import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """应用配置"""

    # API配置
    API_HOST: str = "0.0.0.0"
    API_PORT: int = 8000
    API_PREFIX: str = "/api/v1"

    # LLM配置
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    OPENAI_MODEL: str = "gpt-4o"
    OPENAI_TEMPERATURE: float = 0.7

    # Embeddings配置
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    EMBEDDING_DIMENSION: int = 1536

    # 向量数据库配置
    CHROMA_PERSIST_DIR: str = "./data/vector_store"
    CHROMA_COLLECTION_NAME: str = "documents"

    # 文档处理配置
    CHUNK_SIZE: int = 500
    CHUNK_OVERLAP: int = 50
    MAX_DOCUMENTS: int = 1000

    # RAG配置
    TOP_K_RETRIEVAL: int = 3
    MIN_SIMILARITY_SCORE: float = 0.7

    # 文件上传配置
    UPLOAD_DIR: str = "./data/documents"
    MAX_FILE_SIZE: int = 10 * 1024 * 1024  # 10MB
    ALLOWED_EXTENSIONS: set = {".pdf", ".docx", ".txt", ".md"}

    class Config:
        env_file = ".env"
        case_sensitive = True

# 全局配置实例
settings = Settings()

2. 文档处理模块 (app/document_processor.py)

Python
"""
文档处理模块
"""
import os
from pathlib import Path
import PyPDF2
from docx import Document
import chardet

class DocumentProcessor:
    """文档处理器"""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """
        初始化文档处理器

        Args:
            chunk_size: 文本块大小
            chunk_overlap: 文本块重叠大小
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def load_document(self, file_path: str) -> str:
        """
        加载文档内容

        Args:
            file_path: 文档路径

        Returns:
            文档文本内容
        """
        file_path = Path(file_path)
        extension = file_path.suffix.lower()

        if extension == ".pdf":
            return self._load_pdf(file_path)
        elif extension == ".docx":
            return self._load_docx(file_path)
        elif extension == ".txt":
            return self._load_txt(file_path)
        elif extension == ".md":
            return self._load_md(file_path)
        else:
            raise ValueError(f"不支持的文件格式: {extension}")

    def _load_pdf(self, file_path: Path) -> str:
        """加载PDF文档"""
        text = ""
        with open(file_path, "rb") as file:  # with自动管理文件关闭
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
        return text

    def _load_docx(self, file_path: Path) -> str:
        """加载Word文档"""
        doc = Document(file_path)
        text = ""
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"
        return text

    def _load_txt(self, file_path: Path) -> str:
        """加载TXT文档"""
        # 检测文件编码
        with open(file_path, "rb") as file:
            raw_data = file.read()
            result = chardet.detect(raw_data)
            encoding = result["encoding"]

        with open(file_path, "r", encoding=encoding) as file:
            return file.read()

    def _load_md(self, file_path: Path) -> str:
        """加载Markdown文档"""
        with open(file_path, "r", encoding="utf-8") as file:
            return file.read()

    def split_text(self, text: str) -> list[str]:
        """
        切分文本为块

        Args:
            text: 原始文本

        Returns:
            文本块列表
        """
        chunks = []
        start = 0

        while start < len(text):
            end = start + self.chunk_size

            # 如果不是最后一块,尝试在句子边界切分
            if end < len(text):
                # 查找最近的句号、问号、感叹号
                for sep in ["。", "!", "?", ".", "!", "?", "\n"]:
                    last_sep = text.rfind(sep, start, end)
                    if last_sep != -1:
                        end = last_sep + 1
                        break

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            start = end - self.chunk_overlap

        return chunks

    def process_document(self, file_path: str) -> list[str]:
        """
        处理文档:加载和切分

        Args:
            file_path: 文档路径

        Returns:
            文本块列表
        """
        # 加载文档
        text = self.load_document(file_path)

        # 清理文本
        text = self._clean_text(text)

        # 切分文本
        chunks = self.split_text(text)

        return chunks

    def _clean_text(self, text: str) -> str:
        """清理文本"""
        # 移除多余空行
        text = "\n".join(line.strip() for line in text.split("\n") if line.strip())  # 链式调用:strip去除空白
        return text

3. 向量数据库管理 (app/vector_store.py)

Python
"""
向量数据库管理模块
"""
import os
from typing import Any
from pathlib import Path
import chromadb
from chromadb.config import Settings as ChromaSettings
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document

class VectorStoreManager:
    """向量数据库管理器"""

    def __init__(
        self,
        persist_directory: str = "./data/vector_store",
        collection_name: str = "documents",
        embedding_model: str = "text-embedding-3-small"
    ):
        """
        初始化向量数据库管理器

        Args:
            persist_directory: 持久化目录
            collection_name: 集合名称
            embedding_model: 嵌入模型
        """
        self.persist_directory = persist_directory
        self.collection_name = collection_name
        self.embedding_model = embedding_model

        # 创建持久化目录
        Path(persist_directory).mkdir(parents=True, exist_ok=True)

        # 初始化嵌入模型
        self.embeddings = OpenAIEmbeddings(
            model=embedding_model,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        # 初始化向量数据库
        self._init_vector_store()

    def _init_vector_store(self):
        """初始化向量数据库"""
        self.vector_store = Chroma(
            collection_name=self.collection_name,
            embedding_function=self.embeddings,
            persist_directory=self.persist_directory
        )

    def add_documents(
        self,
        chunks: list[str],
        metadata: list[dict[str, Any]] | None = None
    ) -> list[str]:
        """
        添加文档到向量数据库

        Args:
            chunks: 文本块列表
            metadata: 元数据列表

        Returns:
            文档ID列表
        """
        # 创建Document对象
        documents = []
        for i, chunk in enumerate(chunks):  # enumerate同时获取索引和元素
            doc_metadata = metadata[i] if metadata and i < len(metadata) else {}  # 三元表达式:metadata存在且索引有效时取值,否则用空字典
            doc = Document(
                page_content=chunk,
                metadata=doc_metadata
            )
            documents.append(doc)

        # 添加到向量数据库
        ids = self.vector_store.add_documents(documents)

        return ids

    def similarity_search(
        self,
        query: str,
        k: int = 3,
        filter: dict[str, Any] | None = None
    ) -> list[Document]:
        """
        相似度搜索

        Args:
            query: 查询文本
            k: 返回结果数量
            filter: 过滤条件

        Returns:
            相关文档列表
        """
        results = self.vector_store.similarity_search(
            query=query,
            k=k,
            filter=filter
        )
        return results

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 3,
        filter: dict[str, Any] | None = None
    ) -> list[tuple[Document, float]]:
        """
        带分数的相似度搜索

        Args:
            query: 查询文本
            k: 返回结果数量
            filter: 过滤条件

        Returns:
            (文档, 相似度分数)列表
        """
        results = self.vector_store.similarity_search_with_score(
            query=query,
            k=k,
            filter=filter
        )
        return results

    def delete_collection(self):
        """删除集合"""
        # ChromaDB不支持直接删除集合,需要重新初始化
        import shutil
        if Path(self.persist_directory).exists():
            shutil.rmtree(self.persist_directory)
        self._init_vector_store()

    def get_collection_stats(self) -> dict[str, Any]:
        """获取集合统计信息"""
        collection = self.vector_store._collection
        count = collection.count()
        return {
            "document_count": count,
            "collection_name": self.collection_name
        }

4. RAG链构建 (app/rag_chain.py)

Python
"""
RAG链构建模块
"""
from typing import Any
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from app.vector_store import VectorStoreManager
from app.config import settings

class RAGChain:
    """RAG链"""

    def __init__(self, vector_store: VectorStoreManager):
        """
        初始化RAG链

        Args:
            vector_store: 向量数据库管理器
        """
        self.vector_store = vector_store

        # 初始化LLM
        self.llm = ChatOpenAI(
            model=settings.OPENAI_MODEL,
            temperature=settings.OPENAI_TEMPERATURE,
            openai_api_key=settings.OPENAI_API_KEY
        )

        # 创建检索器
        self.retriever = vector_store.vector_store.as_retriever(
            search_kwargs={"k": settings.TOP_K_RETRIEVAL}
        )

        # 创建RAG链
        self.qa_chain = self._create_qa_chain()

    def _create_qa_chain(self):
        """创建问答链(LCEL方式,替代已废弃的 RetrievalQA)"""

        # 定义提示模板
        system_prompt = """你是一个智能助手,请基于以下已知信息回答用户的问题。
如果已知信息不足以回答问题,请明确说明。
请用中文回答,并在回答中引用相关信息的来源。

{context}"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}")
        ])

        # 创建问答链
        question_answer_chain = create_stuff_documents_chain(self.llm, prompt)
        qa_chain = create_retrieval_chain(self.retriever, question_answer_chain)

        return qa_chain

    def query(self, question: str) -> dict[str, Any]:
        """
        查询RAG系统

        Args:
            question: 用户问题

        Returns:
            包含答案和来源的字典
        """
        # 执行查询
        result = self.qa_chain.invoke({"input": question})

        # 提取答案和来源
        answer = result["answer"]
        source_documents = result["context"]

        # 提取来源信息
        sources = []
        for doc in source_documents:
            source_info = {
                "content": doc.page_content,
                "metadata": doc.metadata
            }
            sources.append(source_info)

        return {
            "answer": answer,
            "sources": sources,
            "question": question
        }

    def query_with_score(self, question: str) -> dict[str, Any]:
        """
        带相似度分数的查询

        Args:
            question: 用户问题

        Returns:
            包含答案、来源和分数的字典
        """
        # 检索相关文档
        docs_with_scores = self.vector_store.similarity_search_with_score(
            query=question,
            k=settings.TOP_K_RETRIEVAL
        )

        # 过滤低相似度文档
        filtered_docs = [
            (doc, score) for doc, score in docs_with_scores
            if score <= (1 - settings.MIN_SIMILARITY_SCORE)
        ]

        if not filtered_docs:
            return {
                "answer": "抱歉,我没有找到相关信息来回答您的问题。",
                "sources": [],
                "question": question,
                "confidence": 0.0
            }

        # 构建上下文
        context = "\n\n".join([doc.page_content for doc, _ in filtered_docs])

        # 生成答案
        prompt = f"""基于以下信息回答问题:

{context}

问题:{question}

请用中文回答。"""

        response = self.llm.invoke(prompt)

        # 提取来源信息
        sources = []
        for doc, score in filtered_docs:
            source_info = {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "similarity_score": float(1 - score)
            }
            sources.append(source_info)

        # 计算平均置信度
        avg_confidence = sum(1 - score for _, score in filtered_docs) / len(filtered_docs)

        return {
            "answer": response.content,
            "sources": sources,
            "question": question,
            "confidence": avg_confidence
        }

5. FastAPI主应用 (app/main.py)

Python
"""
FastAPI主应用
"""
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
import shutil
import uvicorn

from app.config import settings
from app.document_processor import DocumentProcessor
from app.vector_store import VectorStoreManager
from app.rag_chain import RAGChain
from app.api import documents, chat

# 创建FastAPI应用
app = FastAPI(
    title="RAG知识库问答系统",
    description="基于RAG技术的智能问答系统",
    version="1.0.0"
)

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 初始化组件
document_processor = DocumentProcessor(
    chunk_size=settings.CHUNK_SIZE,
    chunk_overlap=settings.CHUNK_OVERLAP
)
vector_store_manager = VectorStoreManager(
    persist_directory=settings.CHROMA_PERSIST_DIR,
    collection_name=settings.CHROMA_COLLECTION_NAME,
    embedding_model=settings.EMBEDDING_MODEL
)
rag_chain = RAGChain(vector_store_manager)

# 创建上传目录
Path(settings.UPLOAD_DIR).mkdir(parents=True, exist_ok=True)

# 注册路由
app.include_router(
    documents.router,
    prefix=settings.API_PREFIX,
    tags=["documents"]
)
app.include_router(
    chat.router,
    prefix=settings.API_PREFIX,
    tags=["chat"]
)

@app.get("/")
async def root():  # async def定义协程函数
    """根路径"""
    return {
        "message": "RAG知识库问答系统",
        "version": "1.0.0",
        "docs": "/docs"
    }

@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host=settings.API_HOST,
        port=settings.API_PORT,
        reload=True
    )

6. 文档管理API (app/api/documents.py)

Python
"""
文档管理API
"""
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from pathlib import Path
import shutil
import uuid

from app.config import settings
from app.document_processor import DocumentProcessor
from app.vector_store import VectorStoreManager

router = APIRouter()

# 初始化组件
document_processor = DocumentProcessor(
    chunk_size=settings.CHUNK_SIZE,
    chunk_overlap=settings.CHUNK_OVERLAP
)
vector_store_manager = VectorStoreManager(
    persist_directory=settings.CHROMA_PERSIST_DIR,
    collection_name=settings.CHROMA_COLLECTION_NAME,
    embedding_model=settings.EMBEDDING_MODEL
)

@router.post("/documents/upload")
async def upload_document(file: UploadFile = File(...)):
    """
    上传文档

    Args:
        file: 上传的文件

    Returns:
        上传结果
    """
    # 检查文件扩展名
    file_extension = Path(file.filename).suffix.lower()
    if file_extension not in settings.ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"不支持的文件格式: {file_extension}"
        )

    # 检查文件大小
    content = await file.read()  # await等待异步操作完成
    if len(content) > settings.MAX_FILE_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"文件大小超过限制: {settings.MAX_FILE_SIZE} bytes"
        )

    # 生成唯一文件名
    file_id = str(uuid.uuid4())
    file_path = Path(settings.UPLOAD_DIR) / f"{file_id}{file_extension}"

    # 保存文件
    with open(file_path, "wb") as f:
        f.write(content)

    try:  # try/except捕获异常,防止程序崩溃
        # 处理文档
        chunks = document_processor.process_document(str(file_path))

        # 准备元数据
        metadata = []
        for i, chunk in enumerate(chunks):
            metadata.append({
                "file_id": file_id,
                "file_name": file.filename,
                "chunk_index": i,
                "total_chunks": len(chunks)
            })

        # 添加到向量数据库
        ids = vector_store_manager.add_documents(chunks, metadata)

        return JSONResponse(
            status_code=200,
            content={
                "message": "文档上传成功",
                "file_id": file_id,
                "file_name": file.filename,
                "chunks_count": len(chunks),
                "document_ids": ids
            }
        )
    except Exception as e:
        # 删除文件
        if file_path.exists():
            file_path.unlink()
        raise HTTPException(status_code=500, detail=str(e))

@router.get("/documents/stats")
async def get_documents_stats():
    """
    获取文档统计信息

    Returns:
        统计信息
    """
    stats = vector_store_manager.get_collection_stats()
    return stats

@router.delete("/documents")
async def delete_all_documents():
    """
    删除所有文档

    Returns:
        删除结果
    """
    vector_store_manager.delete_collection()
    return {"message": "所有文档已删除"}

7. 聊天API (app/api/chat.py)

Python
"""
聊天API
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.config import settings
from app.rag_chain import RAGChain
from app.vector_store import VectorStoreManager

router = APIRouter()

# 初始化组件
vector_store_manager = VectorStoreManager(
    persist_directory=settings.CHROMA_PERSIST_DIR,
    collection_name=settings.CHROMA_COLLECTION_NAME,
    embedding_model=settings.EMBEDDING_MODEL
)
rag_chain = RAGChain(vector_store_manager)

class ChatRequest(BaseModel):  # Pydantic BaseModel:自动数据验证和序列化
    """聊天请求"""
    question: str
    use_score: bool = False

class ChatResponse(BaseModel):
    """聊天响应"""
    answer: str
    sources: list[dict]
    question: str
    confidence: float | None = None

@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    聊天接口

    Args:
        request: 聊天请求

    Returns:
        聊天响应
    """
    try:
        if request.use_score:
            result = rag_chain.query_with_score(request.question)
        else:
            result = rag_chain.query(request.question)

        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

8. Streamlit前端 (frontend/app.py)

Python
"""
Streamlit前端应用
"""
import streamlit as st
import requests

# 配置页面
st.set_page_config(
    page_title="RAG知识库问答系统",
    page_icon="🤖",
    layout="wide"
)

# API配置
API_BASE_URL = "http://localhost:8000/api/v1"

def main():
    """主函数"""
    st.title("🤖 RAG知识库问答系统")

    # 侧边栏
    with st.sidebar:
        st.header("文档管理")

        # 上传文档
        uploaded_file = st.file_uploader(
            "上传文档",
            type=["pdf", "docx", "txt", "md"],
            help="支持PDF、Word、TXT、Markdown格式"
        )

        if uploaded_file:
            if st.button("上传"):
                with st.spinner("正在上传和处理文档..."):
                    try:
                        files = {"file": uploaded_file}
                        response = requests.post(
                            f"{API_BASE_URL}/documents/upload",
                            files=files
                        )

                        if response.status_code == 200:
                            st.success("文档上传成功!")
                            st.json(response.json())
                        else:
                            st.error(f"上传失败: {response.text}")
                    except Exception as e:
                        st.error(f"上传失败: {str(e)}")

        st.divider()

        # 文档统计
        st.subheader("文档统计")
        if st.button("刷新统计"):
            try:
                response = requests.get(f"{API_BASE_URL}/documents/stats")
                if response.status_code == 200:
                    stats = response.json()
                    st.json(stats)
            except Exception as e:
                st.error(f"获取统计失败: {str(e)}")

        st.divider()

        # 删除所有文档
        st.subheader("危险操作")
        confirm_delete = st.checkbox("我确认要删除所有文档(不可恢复)")
        if st.button("删除所有文档", type="primary", disabled=not confirm_delete):
            try:
                response = requests.delete(f"{API_BASE_URL}/documents")
                if response.status_code == 200:
                    st.success("所有文档已删除")
            except Exception as e:
                st.error(f"删除失败: {str(e)}")

    # 主区域 - 聊天界面
    st.header("智能问答")

    # 初始化聊天历史
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # 显示聊天历史
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # 用户输入
    if prompt := st.chat_input("请输入您的问题..."):  # 海象运算符:=:赋值并判断——获取用户输入的同时检查是否非空
        # 显示用户消息
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # 获取AI回复
        with st.chat_message("assistant"):
            with st.spinner("正在思考..."):
                try:
                    response = requests.post(
                        f"{API_BASE_URL}/chat",
                        json={"question": prompt, "use_score": True}
                    )

                    if response.status_code == 200:
                        result = response.json()

                        # 显示答案
                        st.markdown(result["answer"])

                        # 显示来源
                        if result["sources"]:
                            st.divider()
                            st.subheader("📚 参考来源")
                            for i, source in enumerate(result["sources"], 1):
                                with st.expander(f"来源 {i}"):
                                    st.write(f"**相似度**: {source.get('similarity_score', 'N/A'):.2%}")
                                    st.write(f"**文件名**: {source['metadata'].get('file_name', 'N/A')}")
                                    st.write(f"**内容片段**:")
                                    st.write(source["content"])

                        # 显示置信度
                        if "confidence" in result:
                            st.divider()
                            st.write(f"**置信度**: {result['confidence']:.2%}")

                        # 保存到历史
                        st.session_state.messages.append({
                            "role": "assistant",
                            "content": result["answer"]
                        })
                    else:
                        st.error(f"请求失败: {response.text}")
                except Exception as e:
                    st.error(f"请求失败: {str(e)}")

if __name__ == "__main__":
    main()

9. 依赖文件 (requirements.txt)

Text Only
fastapi==0.115.0
uvicorn[standard]==0.32.0
python-multipart==0.0.12
pydantic==2.10.0
pydantic-settings==2.6.0
langchain==0.3.7
openai==1.55.0
chromadb==0.5.20
pypdf2==3.0.1
python-docx==1.1.0
chardet==5.2.0
streamlit==1.40.0
requests==2.32.0
python-dotenv==1.0.1

10. 环境变量文件 (.env.example)

Text Only
# OpenAI API配置
OPENAI_API_KEY=your_openai_api_key_here

# 可选:使用其他LLM
# OPENAI_BASE_URL=https://api.deepseek.com/v1
# OPENAI_MODEL=deepseek-chat

🚀 部署说明

1. 本地部署

步骤1: 克隆项目

Bash
git clone https://github.com/yourusername/rag-qa-system.git
cd rag-qa-system

步骤2: 创建虚拟环境

Bash
# 创建虚拟环境
python -m venv venv

# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate

步骤3: 安装依赖

Bash
pip install -r requirements.txt

步骤4: 配置环境变量

Bash
# 复制环境变量模板
cp .env.example .env

# 编辑.env文件,填入你的API密钥
# OPENAI_API_KEY=your_api_key_here

步骤5: 启动后端服务

Bash
# 启动FastAPI服务
python -m app.main

# 或使用uvicorn
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

步骤6: 启动前端服务

Bash
# 新开一个终端
cd frontend
streamlit run app.py

步骤7: 访问应用

2. Docker部署

步骤1: 构建镜像

Bash
docker build -t rag-qa-system .

步骤2: 运行容器

Bash
docker run -d \
  --name rag-qa \
  -p 8000:8000 \
  -p 8501:8501 \
  -e OPENAI_API_KEY=your_api_key_here \
  rag-qa-system

步骤3: 使用Docker Compose

Bash
docker-compose up -d

3. 云部署

部署到阿里云

  1. 购买ECS实例
  2. 安装Docker
  3. 上传代码
  4. 运行Docker容器

部署到腾讯云

  1. 购买CVM实例
  2. 安装Docker
  3. 上传代码
  4. 运行Docker容器

部署到AWS

  1. 使用EC2服务
  2. 使用Elastic Beanstalk
  3. 或使用AWS App Runner

🔧 扩展方向

1. 功能扩展

  • 多模态支持: 支持图片、视频等多媒体文档
  • 多语言支持: 支持中英文等多语言问答
  • 用户管理: 添加用户认证和权限管理
  • 文档版本控制: 支持文档版本管理
  • 高级检索: 支持布尔查询、范围查询等

2. 性能优化

  • 缓存机制: 添加Redis缓存常用查询
  • 异步处理: 使用异步IO提高性能
  • 负载均衡: 使用Nginx做负载均衡
  • 数据库优化: 优化向量数据库配置

3. 用户体验

  • 实时反馈: 显示处理进度
  • 导出功能: 支持导出对话记录
  • 分享功能: 支持分享问答结果
  • 移动端适配: 优化移动端体验

4. 企业功能

  • API限流: 添加API访问限流
  • 日志审计: 完整的操作日志
  • 监控告警: 系统监控和告警
  • 数据备份: 自动数据备份

📚 学习收获

完成本项目后,你将掌握:

  • RAG技术原理: 理解检索增强生成的核心思想
  • 向量数据库: 掌握ChromaDB等向量数据库的使用
  • LangChain框架: 熟练使用LangChain构建LLM应用
  • API开发: 掌握FastAPI开发RESTful API
  • 前端开发: 使用Streamlit快速构建Web应用
  • 文档处理: 掌握各种文档格式的处理方法
  • 文本嵌入: 理解文本嵌入和相似度计算
  • 系统设计: 学习完整的系统架构设计

🎉 开始学习

现在你已经了解了整个RAG系统的实现,开始动手构建你自己的知识库问答系统吧!

推荐学习顺序: 1. 先运行后端API,测试基本功能 2. 然后运行前端界面,体验完整流程 3. 修改代码,添加新功能 4. 部署到云服务器,分享给他人使用

祝你学习顺利! 💪