项目1: RAG知识库问答系统¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
难度: ⭐⭐⭐ 中等 时间: 10-15小时 涉及知识: RAG技术、向量数据库、LangChain、API开发
📖 项目概述¶
项目背景¶
随着大语言模型的普及,企业需要一个能够基于内部文档回答用户问题的智能问答系统。传统的搜索引擎只能基于关键词匹配,无法理解用户意图;而大语言模型虽然能理解意图,但缺乏企业内部知识。RAG(Retrieval-Augmented Generation,检索增强生成)技术结合了两者的优势,先从知识库中检索相关文档,然后基于检索结果生成答案。
项目目标¶
构建一个完整的RAG知识库问答系统,能够: - 上传和管理企业文档 - 自动文档切分和向量化 - 基于用户问题检索相关文档 - 生成准确的答案并引用来源 - 支持多轮对话 - 提供友好的Web界面
技术栈¶
- 后端框架: FastAPI
- LLM框架: LangChain
- 向量数据库: ChromaDB
- 大模型: OpenAI GPT-4 / 通义千问 / 文心一言
- 前端框架: Streamlit
- 文档处理: PyPDF2, python-docx
- 文本嵌入: OpenAI Embeddings / HuggingFace Embeddings
🏗️ 项目结构¶
Text Only
rag-qa-system/
├── app/ # 应用主目录
│ ├── __init__.py
│ ├── main.py # FastAPI主应用
│ ├── config.py # 配置文件
│ ├── document_processor.py # 文档处理模块
│ ├── vector_store.py # 向量数据库管理
│ ├── rag_chain.py # RAG链构建
│ └── api/ # API路由
│ ├── __init__.py
│ ├── documents.py # 文档管理API
│ └── chat.py # 聊天API
├── frontend/ # 前端目录
│ ├── app.py # Streamlit应用
│ ├── pages/ # 页面组件
│ └── components/ # UI组件
├── data/ # 数据目录
│ ├── documents/ # 原始文档
│ ├── processed/ # 处理后文档
│ └── vector_store/ # 向量数据库
├── tests/ # 测试目录
│ ├── test_document_processor.py
│ ├── test_vector_store.py
│ └── test_rag_chain.py
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── text_splitter.py # 文本切分工具
│ └── logger.py # 日志工具
├── requirements.txt # Python依赖
├── Dockerfile # Docker配置
├── docker-compose.yml # Docker Compose配置
└── README.md # 项目说明
🎯 核心功能¶
1. 文档管理¶
- 文档上传: 支持PDF、Word、TXT、Markdown等格式
- 文档预处理: 自动提取文本、清理格式
- 文档切分: 智能切分文档为合适大小的文本块
- 文档索引: 将文本块向量化并存储到向量数据库
2. 向量检索¶
- 相似度检索: 基于用户问题检索最相关的文档片段
- 混合检索: 结合关键词检索和语义检索
- 重排序: 对检索结果进行重新排序,提高准确性
3. 答案生成¶
- 上下文构建: 将检索到的文档片段组合成上下文
- 答案生成: 基于上下文和问题生成准确答案
- 来源引用: 标注答案引用的文档来源
- 置信度评分: 评估答案的可信度
4. 对话管理¶
- 多轮对话: 支持连续对话,理解上下文
- 历史记录: 保存用户对话历史
- 会话管理: 支持多个独立会话
5. Web界面¶
- 文档管理界面: 上传、查看、删除文档
- 聊天界面: 用户友好的对话界面
- 结果展示: 清晰展示答案和来源引用
💻 代码实现¶
1. 配置文件 (app/config.py)¶
Python
"""
RAG系统配置文件
"""
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""应用配置"""
# API配置
API_HOST: str = "0.0.0.0"
API_PORT: int = 8000
API_PREFIX: str = "/api/v1"
# LLM配置
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
OPENAI_MODEL: str = "gpt-4o"
OPENAI_TEMPERATURE: float = 0.7
# Embeddings配置
EMBEDDING_MODEL: str = "text-embedding-3-small"
EMBEDDING_DIMENSION: int = 1536
# 向量数据库配置
CHROMA_PERSIST_DIR: str = "./data/vector_store"
CHROMA_COLLECTION_NAME: str = "documents"
# 文档处理配置
CHUNK_SIZE: int = 500
CHUNK_OVERLAP: int = 50
MAX_DOCUMENTS: int = 1000
# RAG配置
TOP_K_RETRIEVAL: int = 3
MIN_SIMILARITY_SCORE: float = 0.7
# 文件上传配置
UPLOAD_DIR: str = "./data/documents"
MAX_FILE_SIZE: int = 10 * 1024 * 1024 # 10MB
ALLOWED_EXTENSIONS: set = {".pdf", ".docx", ".txt", ".md"}
class Config:
env_file = ".env"
case_sensitive = True
# 全局配置实例
settings = Settings()
2. 文档处理模块 (app/document_processor.py)¶
Python
"""
文档处理模块
"""
import os
from pathlib import Path
import PyPDF2
from docx import Document
import chardet
class DocumentProcessor:
"""文档处理器"""
def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
"""
初始化文档处理器
Args:
chunk_size: 文本块大小
chunk_overlap: 文本块重叠大小
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def load_document(self, file_path: str) -> str:
"""
加载文档内容
Args:
file_path: 文档路径
Returns:
文档文本内容
"""
file_path = Path(file_path)
extension = file_path.suffix.lower()
if extension == ".pdf":
return self._load_pdf(file_path)
elif extension == ".docx":
return self._load_docx(file_path)
elif extension == ".txt":
return self._load_txt(file_path)
elif extension == ".md":
return self._load_md(file_path)
else:
raise ValueError(f"不支持的文件格式: {extension}")
def _load_pdf(self, file_path: Path) -> str:
"""加载PDF文档"""
text = ""
with open(file_path, "rb") as file: # with自动管理文件关闭
pdf_reader = PyPDF2.PdfReader(file)
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text
def _load_docx(self, file_path: Path) -> str:
"""加载Word文档"""
doc = Document(file_path)
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
return text
def _load_txt(self, file_path: Path) -> str:
"""加载TXT文档"""
# 检测文件编码
with open(file_path, "rb") as file:
raw_data = file.read()
result = chardet.detect(raw_data)
encoding = result["encoding"]
with open(file_path, "r", encoding=encoding) as file:
return file.read()
def _load_md(self, file_path: Path) -> str:
"""加载Markdown文档"""
with open(file_path, "r", encoding="utf-8") as file:
return file.read()
def split_text(self, text: str) -> list[str]:
"""
切分文本为块
Args:
text: 原始文本
Returns:
文本块列表
"""
chunks = []
start = 0
while start < len(text):
end = start + self.chunk_size
# 如果不是最后一块,尝试在句子边界切分
if end < len(text):
# 查找最近的句号、问号、感叹号
for sep in ["。", "!", "?", ".", "!", "?", "\n"]:
last_sep = text.rfind(sep, start, end)
if last_sep != -1:
end = last_sep + 1
break
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
start = end - self.chunk_overlap
return chunks
def process_document(self, file_path: str) -> list[str]:
"""
处理文档:加载和切分
Args:
file_path: 文档路径
Returns:
文本块列表
"""
# 加载文档
text = self.load_document(file_path)
# 清理文本
text = self._clean_text(text)
# 切分文本
chunks = self.split_text(text)
return chunks
def _clean_text(self, text: str) -> str:
"""清理文本"""
# 移除多余空行
text = "\n".join(line.strip() for line in text.split("\n") if line.strip()) # 链式调用:strip去除空白
return text
3. 向量数据库管理 (app/vector_store.py)¶
Python
"""
向量数据库管理模块
"""
import os
from typing import Any
from pathlib import Path
import chromadb
from chromadb.config import Settings as ChromaSettings
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
class VectorStoreManager:
"""向量数据库管理器"""
def __init__(
self,
persist_directory: str = "./data/vector_store",
collection_name: str = "documents",
embedding_model: str = "text-embedding-3-small"
):
"""
初始化向量数据库管理器
Args:
persist_directory: 持久化目录
collection_name: 集合名称
embedding_model: 嵌入模型
"""
self.persist_directory = persist_directory
self.collection_name = collection_name
self.embedding_model = embedding_model
# 创建持久化目录
Path(persist_directory).mkdir(parents=True, exist_ok=True)
# 初始化嵌入模型
self.embeddings = OpenAIEmbeddings(
model=embedding_model,
openai_api_key=os.getenv("OPENAI_API_KEY")
)
# 初始化向量数据库
self._init_vector_store()
def _init_vector_store(self):
"""初始化向量数据库"""
self.vector_store = Chroma(
collection_name=self.collection_name,
embedding_function=self.embeddings,
persist_directory=self.persist_directory
)
def add_documents(
self,
chunks: list[str],
metadata: list[dict[str, Any]] | None = None
) -> list[str]:
"""
添加文档到向量数据库
Args:
chunks: 文本块列表
metadata: 元数据列表
Returns:
文档ID列表
"""
# 创建Document对象
documents = []
for i, chunk in enumerate(chunks): # enumerate同时获取索引和元素
doc_metadata = metadata[i] if metadata and i < len(metadata) else {} # 三元表达式:metadata存在且索引有效时取值,否则用空字典
doc = Document(
page_content=chunk,
metadata=doc_metadata
)
documents.append(doc)
# 添加到向量数据库
ids = self.vector_store.add_documents(documents)
return ids
def similarity_search(
self,
query: str,
k: int = 3,
filter: dict[str, Any] | None = None
) -> list[Document]:
"""
相似度搜索
Args:
query: 查询文本
k: 返回结果数量
filter: 过滤条件
Returns:
相关文档列表
"""
results = self.vector_store.similarity_search(
query=query,
k=k,
filter=filter
)
return results
def similarity_search_with_score(
self,
query: str,
k: int = 3,
filter: dict[str, Any] | None = None
) -> list[tuple[Document, float]]:
"""
带分数的相似度搜索
Args:
query: 查询文本
k: 返回结果数量
filter: 过滤条件
Returns:
(文档, 相似度分数)列表
"""
results = self.vector_store.similarity_search_with_score(
query=query,
k=k,
filter=filter
)
return results
def delete_collection(self):
"""删除集合"""
# ChromaDB不支持直接删除集合,需要重新初始化
import shutil
if Path(self.persist_directory).exists():
shutil.rmtree(self.persist_directory)
self._init_vector_store()
def get_collection_stats(self) -> dict[str, Any]:
"""获取集合统计信息"""
collection = self.vector_store._collection
count = collection.count()
return {
"document_count": count,
"collection_name": self.collection_name
}
4. RAG链构建 (app/rag_chain.py)¶
Python
"""
RAG链构建模块
"""
from typing import Any
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from app.vector_store import VectorStoreManager
from app.config import settings
class RAGChain:
"""RAG链"""
def __init__(self, vector_store: VectorStoreManager):
"""
初始化RAG链
Args:
vector_store: 向量数据库管理器
"""
self.vector_store = vector_store
# 初始化LLM
self.llm = ChatOpenAI(
model=settings.OPENAI_MODEL,
temperature=settings.OPENAI_TEMPERATURE,
openai_api_key=settings.OPENAI_API_KEY
)
# 创建检索器
self.retriever = vector_store.vector_store.as_retriever(
search_kwargs={"k": settings.TOP_K_RETRIEVAL}
)
# 创建RAG链
self.qa_chain = self._create_qa_chain()
def _create_qa_chain(self):
"""创建问答链(LCEL方式,替代已废弃的 RetrievalQA)"""
# 定义提示模板
system_prompt = """你是一个智能助手,请基于以下已知信息回答用户的问题。
如果已知信息不足以回答问题,请明确说明。
请用中文回答,并在回答中引用相关信息的来源。
{context}"""
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{input}")
])
# 创建问答链
question_answer_chain = create_stuff_documents_chain(self.llm, prompt)
qa_chain = create_retrieval_chain(self.retriever, question_answer_chain)
return qa_chain
def query(self, question: str) -> dict[str, Any]:
"""
查询RAG系统
Args:
question: 用户问题
Returns:
包含答案和来源的字典
"""
# 执行查询
result = self.qa_chain.invoke({"input": question})
# 提取答案和来源
answer = result["answer"]
source_documents = result["context"]
# 提取来源信息
sources = []
for doc in source_documents:
source_info = {
"content": doc.page_content,
"metadata": doc.metadata
}
sources.append(source_info)
return {
"answer": answer,
"sources": sources,
"question": question
}
def query_with_score(self, question: str) -> dict[str, Any]:
"""
带相似度分数的查询
Args:
question: 用户问题
Returns:
包含答案、来源和分数的字典
"""
# 检索相关文档
docs_with_scores = self.vector_store.similarity_search_with_score(
query=question,
k=settings.TOP_K_RETRIEVAL
)
# 过滤低相似度文档
filtered_docs = [
(doc, score) for doc, score in docs_with_scores
if score <= (1 - settings.MIN_SIMILARITY_SCORE)
]
if not filtered_docs:
return {
"answer": "抱歉,我没有找到相关信息来回答您的问题。",
"sources": [],
"question": question,
"confidence": 0.0
}
# 构建上下文
context = "\n\n".join([doc.page_content for doc, _ in filtered_docs])
# 生成答案
prompt = f"""基于以下信息回答问题:
{context}
问题:{question}
请用中文回答。"""
response = self.llm.invoke(prompt)
# 提取来源信息
sources = []
for doc, score in filtered_docs:
source_info = {
"content": doc.page_content,
"metadata": doc.metadata,
"similarity_score": float(1 - score)
}
sources.append(source_info)
# 计算平均置信度
avg_confidence = sum(1 - score for _, score in filtered_docs) / len(filtered_docs)
return {
"answer": response.content,
"sources": sources,
"question": question,
"confidence": avg_confidence
}
5. FastAPI主应用 (app/main.py)¶
Python
"""
FastAPI主应用
"""
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
import shutil
import uvicorn
from app.config import settings
from app.document_processor import DocumentProcessor
from app.vector_store import VectorStoreManager
from app.rag_chain import RAGChain
from app.api import documents, chat
# 创建FastAPI应用
app = FastAPI(
title="RAG知识库问答系统",
description="基于RAG技术的智能问答系统",
version="1.0.0"
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 初始化组件
document_processor = DocumentProcessor(
chunk_size=settings.CHUNK_SIZE,
chunk_overlap=settings.CHUNK_OVERLAP
)
vector_store_manager = VectorStoreManager(
persist_directory=settings.CHROMA_PERSIST_DIR,
collection_name=settings.CHROMA_COLLECTION_NAME,
embedding_model=settings.EMBEDDING_MODEL
)
rag_chain = RAGChain(vector_store_manager)
# 创建上传目录
Path(settings.UPLOAD_DIR).mkdir(parents=True, exist_ok=True)
# 注册路由
app.include_router(
documents.router,
prefix=settings.API_PREFIX,
tags=["documents"]
)
app.include_router(
chat.router,
prefix=settings.API_PREFIX,
tags=["chat"]
)
@app.get("/")
async def root(): # async def定义协程函数
"""根路径"""
return {
"message": "RAG知识库问答系统",
"version": "1.0.0",
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(
"app.main:app",
host=settings.API_HOST,
port=settings.API_PORT,
reload=True
)
6. 文档管理API (app/api/documents.py)¶
Python
"""
文档管理API
"""
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from pathlib import Path
import shutil
import uuid
from app.config import settings
from app.document_processor import DocumentProcessor
from app.vector_store import VectorStoreManager
router = APIRouter()
# 初始化组件
document_processor = DocumentProcessor(
chunk_size=settings.CHUNK_SIZE,
chunk_overlap=settings.CHUNK_OVERLAP
)
vector_store_manager = VectorStoreManager(
persist_directory=settings.CHROMA_PERSIST_DIR,
collection_name=settings.CHROMA_COLLECTION_NAME,
embedding_model=settings.EMBEDDING_MODEL
)
@router.post("/documents/upload")
async def upload_document(file: UploadFile = File(...)):
"""
上传文档
Args:
file: 上传的文件
Returns:
上传结果
"""
# 检查文件扩展名
file_extension = Path(file.filename).suffix.lower()
if file_extension not in settings.ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"不支持的文件格式: {file_extension}"
)
# 检查文件大小
content = await file.read() # await等待异步操作完成
if len(content) > settings.MAX_FILE_SIZE:
raise HTTPException(
status_code=400,
detail=f"文件大小超过限制: {settings.MAX_FILE_SIZE} bytes"
)
# 生成唯一文件名
file_id = str(uuid.uuid4())
file_path = Path(settings.UPLOAD_DIR) / f"{file_id}{file_extension}"
# 保存文件
with open(file_path, "wb") as f:
f.write(content)
try: # try/except捕获异常,防止程序崩溃
# 处理文档
chunks = document_processor.process_document(str(file_path))
# 准备元数据
metadata = []
for i, chunk in enumerate(chunks):
metadata.append({
"file_id": file_id,
"file_name": file.filename,
"chunk_index": i,
"total_chunks": len(chunks)
})
# 添加到向量数据库
ids = vector_store_manager.add_documents(chunks, metadata)
return JSONResponse(
status_code=200,
content={
"message": "文档上传成功",
"file_id": file_id,
"file_name": file.filename,
"chunks_count": len(chunks),
"document_ids": ids
}
)
except Exception as e:
# 删除文件
if file_path.exists():
file_path.unlink()
raise HTTPException(status_code=500, detail=str(e))
@router.get("/documents/stats")
async def get_documents_stats():
"""
获取文档统计信息
Returns:
统计信息
"""
stats = vector_store_manager.get_collection_stats()
return stats
@router.delete("/documents")
async def delete_all_documents():
"""
删除所有文档
Returns:
删除结果
"""
vector_store_manager.delete_collection()
return {"message": "所有文档已删除"}
7. 聊天API (app/api/chat.py)¶
Python
"""
聊天API
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from app.config import settings
from app.rag_chain import RAGChain
from app.vector_store import VectorStoreManager
router = APIRouter()
# 初始化组件
vector_store_manager = VectorStoreManager(
persist_directory=settings.CHROMA_PERSIST_DIR,
collection_name=settings.CHROMA_COLLECTION_NAME,
embedding_model=settings.EMBEDDING_MODEL
)
rag_chain = RAGChain(vector_store_manager)
class ChatRequest(BaseModel): # Pydantic BaseModel:自动数据验证和序列化
"""聊天请求"""
question: str
use_score: bool = False
class ChatResponse(BaseModel):
"""聊天响应"""
answer: str
sources: list[dict]
question: str
confidence: float | None = None
@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""
聊天接口
Args:
request: 聊天请求
Returns:
聊天响应
"""
try:
if request.use_score:
result = rag_chain.query_with_score(request.question)
else:
result = rag_chain.query(request.question)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
8. Streamlit前端 (frontend/app.py)¶
Python
"""
Streamlit前端应用
"""
import streamlit as st
import requests
# 配置页面
st.set_page_config(
page_title="RAG知识库问答系统",
page_icon="🤖",
layout="wide"
)
# API配置
API_BASE_URL = "http://localhost:8000/api/v1"
def main():
"""主函数"""
st.title("🤖 RAG知识库问答系统")
# 侧边栏
with st.sidebar:
st.header("文档管理")
# 上传文档
uploaded_file = st.file_uploader(
"上传文档",
type=["pdf", "docx", "txt", "md"],
help="支持PDF、Word、TXT、Markdown格式"
)
if uploaded_file:
if st.button("上传"):
with st.spinner("正在上传和处理文档..."):
try:
files = {"file": uploaded_file}
response = requests.post(
f"{API_BASE_URL}/documents/upload",
files=files
)
if response.status_code == 200:
st.success("文档上传成功!")
st.json(response.json())
else:
st.error(f"上传失败: {response.text}")
except Exception as e:
st.error(f"上传失败: {str(e)}")
st.divider()
# 文档统计
st.subheader("文档统计")
if st.button("刷新统计"):
try:
response = requests.get(f"{API_BASE_URL}/documents/stats")
if response.status_code == 200:
stats = response.json()
st.json(stats)
except Exception as e:
st.error(f"获取统计失败: {str(e)}")
st.divider()
# 删除所有文档
st.subheader("危险操作")
confirm_delete = st.checkbox("我确认要删除所有文档(不可恢复)")
if st.button("删除所有文档", type="primary", disabled=not confirm_delete):
try:
response = requests.delete(f"{API_BASE_URL}/documents")
if response.status_code == 200:
st.success("所有文档已删除")
except Exception as e:
st.error(f"删除失败: {str(e)}")
# 主区域 - 聊天界面
st.header("智能问答")
# 初始化聊天历史
if "messages" not in st.session_state:
st.session_state.messages = []
# 显示聊天历史
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# 用户输入
if prompt := st.chat_input("请输入您的问题..."): # 海象运算符:=:赋值并判断——获取用户输入的同时检查是否非空
# 显示用户消息
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# 获取AI回复
with st.chat_message("assistant"):
with st.spinner("正在思考..."):
try:
response = requests.post(
f"{API_BASE_URL}/chat",
json={"question": prompt, "use_score": True}
)
if response.status_code == 200:
result = response.json()
# 显示答案
st.markdown(result["answer"])
# 显示来源
if result["sources"]:
st.divider()
st.subheader("📚 参考来源")
for i, source in enumerate(result["sources"], 1):
with st.expander(f"来源 {i}"):
st.write(f"**相似度**: {source.get('similarity_score', 'N/A'):.2%}")
st.write(f"**文件名**: {source['metadata'].get('file_name', 'N/A')}")
st.write(f"**内容片段**:")
st.write(source["content"])
# 显示置信度
if "confidence" in result:
st.divider()
st.write(f"**置信度**: {result['confidence']:.2%}")
# 保存到历史
st.session_state.messages.append({
"role": "assistant",
"content": result["answer"]
})
else:
st.error(f"请求失败: {response.text}")
except Exception as e:
st.error(f"请求失败: {str(e)}")
if __name__ == "__main__":
main()
9. 依赖文件 (requirements.txt)¶
Text Only
fastapi==0.115.0
uvicorn[standard]==0.32.0
python-multipart==0.0.12
pydantic==2.10.0
pydantic-settings==2.6.0
langchain==0.3.7
openai==1.55.0
chromadb==0.5.20
pypdf2==3.0.1
python-docx==1.1.0
chardet==5.2.0
streamlit==1.40.0
requests==2.32.0
python-dotenv==1.0.1
10. 环境变量文件 (.env.example)¶
Text Only
# OpenAI API配置
OPENAI_API_KEY=your_openai_api_key_here
# 可选:使用其他LLM
# OPENAI_BASE_URL=https://api.deepseek.com/v1
# OPENAI_MODEL=deepseek-chat
🚀 部署说明¶
1. 本地部署¶
步骤1: 克隆项目¶
步骤2: 创建虚拟环境¶
Bash
# 创建虚拟环境
python -m venv venv
# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
步骤3: 安装依赖¶
步骤4: 配置环境变量¶
步骤5: 启动后端服务¶
Bash
# 启动FastAPI服务
python -m app.main
# 或使用uvicorn
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
步骤6: 启动前端服务¶
步骤7: 访问应用¶
- 前端界面: http://localhost:8501
- API文档: http://localhost:8000/docs
2. Docker部署¶
步骤1: 构建镜像¶
步骤2: 运行容器¶
Bash
docker run -d \
--name rag-qa \
-p 8000:8000 \
-p 8501:8501 \
-e OPENAI_API_KEY=your_api_key_here \
rag-qa-system
步骤3: 使用Docker Compose¶
3. 云部署¶
部署到阿里云¶
- 购买ECS实例
- 安装Docker
- 上传代码
- 运行Docker容器
部署到腾讯云¶
- 购买CVM实例
- 安装Docker
- 上传代码
- 运行Docker容器
部署到AWS¶
- 使用EC2服务
- 使用Elastic Beanstalk
- 或使用AWS App Runner
🔧 扩展方向¶
1. 功能扩展¶
- 多模态支持: 支持图片、视频等多媒体文档
- 多语言支持: 支持中英文等多语言问答
- 用户管理: 添加用户认证和权限管理
- 文档版本控制: 支持文档版本管理
- 高级检索: 支持布尔查询、范围查询等
2. 性能优化¶
- 缓存机制: 添加Redis缓存常用查询
- 异步处理: 使用异步IO提高性能
- 负载均衡: 使用Nginx做负载均衡
- 数据库优化: 优化向量数据库配置
3. 用户体验¶
- 实时反馈: 显示处理进度
- 导出功能: 支持导出对话记录
- 分享功能: 支持分享问答结果
- 移动端适配: 优化移动端体验
4. 企业功能¶
- API限流: 添加API访问限流
- 日志审计: 完整的操作日志
- 监控告警: 系统监控和告警
- 数据备份: 自动数据备份
📚 学习收获¶
完成本项目后,你将掌握:
- RAG技术原理: 理解检索增强生成的核心思想
- 向量数据库: 掌握ChromaDB等向量数据库的使用
- LangChain框架: 熟练使用LangChain构建LLM应用
- API开发: 掌握FastAPI开发RESTful API
- 前端开发: 使用Streamlit快速构建Web应用
- 文档处理: 掌握各种文档格式的处理方法
- 文本嵌入: 理解文本嵌入和相似度计算
- 系统设计: 学习完整的系统架构设计
🎉 开始学习¶
现在你已经了解了整个RAG系统的实现,开始动手构建你自己的知识库问答系统吧!
推荐学习顺序: 1. 先运行后端API,测试基本功能 2. 然后运行前端界面,体验完整流程 3. 修改代码,添加新功能 4. 部署到云服务器,分享给他人使用
祝你学习顺利! 💪