# 🛠️ NLP实战项目
3个完整项目:文本分类 → 命名实体识别 → RAG问答系统,覆盖数据处理、模型训练、评估部署全流程,附可运行代码。
## 📋 项目总览
| 项目 | 核心技术 | 难度 | 预计耗时 |
|---|---|---|---|
| P1: 文本分类系统 | BERT微调/HuggingFace | ⭐⭐⭐ | 2天 |
| P2: 命名实体识别 | BiLSTM-CRF/BERT-NER | ⭐⭐⭐⭐ | 3天 |
| P3: RAG问答系统 | LangChain/向量检索/LLM | ⭐⭐⭐⭐⭐ | 3天 |
## 📝 P1: 文本分类系统
### 1.1 项目简介
使用BERT在中文文本数据集上微调,构建完整的文本分类Pipeline,包含数据清洗、模型训练、评估和FastAPI部署。
技术栈:HuggingFace Transformers、Datasets、PyTorch、FastAPI
### 1.2 数据处理
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import TrainingArguments, Trainer
from datasets import load_dataset, Dataset
import pandas as pd
import numpy as np
import re
# ===================== 数据清洗 =====================
def clean_text(text):
"""中文文本清洗"""
if not isinstance(text, str): # isinstance检查类型
return ""
text = re.sub(r'<[^>]+>', '', text) # 去HTML标签
text = re.sub(r'http\S+|www\.\S+', '', text) # 去URL
text = re.sub(r'@\w+', '', text) # 去@提及
text = re.sub(r'#\w+', '', text) # 去话题标签
text = re.sub(r'\s+', ' ', text).strip() # 合并空白
return text
# ===================== 加载数据 =====================
# 方法1: 从CSV加载
df = pd.read_csv("data/train.csv") # columns: text, label
df['text'] = df['text'].apply(clean_text)
df = df[df['text'].str.len() > 5] # 过滤太短的文本
print(f"数据量: {len(df)}, 类别分布:\n{df['label'].value_counts()}")
# 划分训练/验证集
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.15, stratify=df['label'], random_state=42)
train_dataset = Dataset.from_pandas(train_df[['text', 'label']].reset_index(drop=True))
val_dataset = Dataset.from_pandas(val_df[['text', 'label']].reset_index(drop=True))
# ===================== Tokenization =====================
MODEL_NAME = "bert-base-chinese"
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
def tokenize_function(examples):
return tokenizer(
examples["text"],
padding="max_length",
truncation=True,
max_length=256,
return_tensors="pt"
)
train_dataset = train_dataset.map(tokenize_function, batched=True, batch_size=1000)
val_dataset = val_dataset.map(tokenize_function, batched=True, batch_size=1000)
# 设置格式
train_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
val_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
print(f"训练集: {len(train_dataset)}, 验证集: {len(val_dataset)}")
### 1.3 模型训练
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report
# ===================== 评估指标 =====================
def compute_metrics(eval_pred):
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average='weighted')
acc = accuracy_score(labels, predictions)
return {"accuracy": acc, "f1": f1, "precision": precision, "recall": recall}
# ===================== 模型初始化 =====================
num_labels = df['label'].nunique()
label_names = sorted(df['label'].unique())
model = BertForSequenceClassification.from_pretrained(
MODEL_NAME,
num_labels=num_labels,
problem_type="single_label_classification"
)
# ===================== 训练配置 =====================
training_args = TrainingArguments(
output_dir="./results/text_cls",
num_train_epochs=5,
per_device_train_batch_size=32,
per_device_eval_batch_size=64,
learning_rate=2e-5,
    weight_decay=0.01,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    label_smoothing_factor=0.1,  # 标签平滑,与文末简历描述中的Label Smoothing对应
# 评估与保存
eval_strategy="steps",
eval_steps=200,
save_strategy="steps",
save_steps=200,
save_total_limit=3,
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
# 混合精度
fp16=True,
# 日志
logging_dir="./logs",
logging_steps=50,
report_to="none",
)
# ===================== 训练 =====================
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
trainer.train()
print("✅ 训练完成!")
# ===================== 最终评估 =====================
eval_results = trainer.evaluate()
print(f"\n验证集结果:")
print(f" Accuracy: {eval_results['eval_accuracy']:.4f}")
print(f" F1: {eval_results['eval_f1']:.4f}")
print(f" Precision:{eval_results['eval_precision']:.4f}")
print(f" Recall: {eval_results['eval_recall']:.4f}")
# 详细分类报告
predictions = trainer.predict(val_dataset)
preds = np.argmax(predictions.predictions, axis=-1)
print("\n" + classification_report(val_dataset['label'], preds, target_names=[str(l) for l in label_names]))
# 保存模型
trainer.save_model("./model/text_cls_bert")
tokenizer.save_pretrained("./model/text_cls_bert")
print("✅ 模型已保存到 ./model/text_cls_bert")
### 1.4 FastAPI部署
from fastapi import FastAPI
from pydantic import BaseModel
from transformers import pipeline
import uvicorn
app = FastAPI(title="文本分类API")
# 加载模型
classifier = pipeline(
"text-classification",
model="./model/text_cls_bert",
tokenizer="./model/text_cls_bert",
device=0, # GPU
max_length=256,
truncation=True,
)
class TextRequest(BaseModel): # BaseModel Pydantic数据验证模型
text: str
class BatchRequest(BaseModel):
texts: list[str]
class PredictionResponse(BaseModel):
text: str
label: str
confidence: float
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: TextRequest): # async定义异步函数
text = clean_text(request.text)
result = classifier(text)[0]
return PredictionResponse(
text=request.text,
label=result["label"],
confidence=round(result["score"], 4)
)
@app.post("/batch_predict", response_model=list[PredictionResponse])
async def batch_predict(request: BatchRequest):
texts = [clean_text(t) for t in request.texts]
results = classifier(texts)
return [
PredictionResponse(text=t, label=r["label"], confidence=round(r["score"], 4))
for t, r in zip(request.texts, results) # zip按位置配对
]
@app.get("/health")
async def health():
return {"status": "ok", "model": "bert-base-chinese"}
# uvicorn main:app --host 0.0.0.0 --port 8000
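# 服务启动后,可用下面的requests脚本做冒烟测试(端口、示例文本均为假设)
import requests

resp = requests.post("http://localhost:8000/predict", json={"text": "这部电影的剧情和配乐都很出色"})
print(resp.json())  # {'text': ..., 'label': ..., 'confidence': ...}

resp = requests.post("http://localhost:8000/batch_predict", json={"texts": ["画面精美", "物流太慢,体验差"]})
for item in resp.json():
    print(item["label"], item["confidence"])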
## 🏷️ P2: 命名实体识别(NER)
### 2.1 项目简介
实现中文NER系统,对比BiLSTM-CRF和BERT-NER两种方案,支持人名、地名、组织名等实体识别。
技术栈:PyTorch、HuggingFace、torchcrf、seqeval
### 2.2 数据处理
import json
from collections import Counter
# ===================== BIO标签体系 =====================
"""
标签说明:
B-PER: 人名开始 I-PER: 人名内部
B-LOC: 地名开始 I-LOC: 地名内部
B-ORG: 组织名开始 I-ORG: 组织名内部
O: 非实体
"""
LABEL_LIST = ['O', 'B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'B-ORG', 'I-ORG']
LABEL2ID = {l: i for i, l in enumerate(LABEL_LIST)} # enumerate同时获取索引和元素
ID2LABEL = {i: l for l, i in LABEL2ID.items()}
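# 一个直观的BIO标注示例(示例句为假设):"李明在北京大学读书"
demo_chars  = ["李", "明", "在", "北", "京", "大", "学", "读", "书"]
demo_labels = ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG", "O", "O"]
print(list(zip(demo_chars, demo_labels)))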
# ===================== 数据加载 =====================
def load_ner_data(filepath):
"""加载CONLL格式NER数据
每行: 字\t标签, 空行分隔句子
"""
sentences, labels = [], []
cur_tokens, cur_labels = [], []
with open(filepath, 'r', encoding='utf-8') as f: # with自动管理文件关闭
for line in f:
line = line.strip()
if not line:
if cur_tokens:
sentences.append(cur_tokens)
labels.append(cur_labels)
cur_tokens, cur_labels = [], []
else:
parts = line.split('\t')
if len(parts) == 2:
cur_tokens.append(parts[0])
cur_labels.append(parts[1])
if cur_tokens:
sentences.append(cur_tokens)
labels.append(cur_labels)
return sentences, labels
train_sents, train_labels = load_ner_data("data/ner_train.txt")
val_sents, val_labels = load_ner_data("data/ner_val.txt")
print(f"训练集: {len(train_sents)} 句, 验证集: {len(val_sents)} 句")
# 统计实体分布
all_labels = [l for seq in train_labels for l in seq]
print(f"标签分布: {Counter(all_labels)}") # Counter统计元素出现次数
### 2.3 方案一:BiLSTM-CRF
import torch
import torch.nn as nn
from torchcrf import CRF
# ===================== BiLSTM-CRF模型 =====================
class BiLSTMCRF(nn.Module): # 继承nn.Module定义网络层
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_labels, pretrained_embeddings=None):
super().__init__() # super()调用父类方法
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
if pretrained_embeddings is not None:
self.embedding.weight.data.copy_(pretrained_embeddings)
self.lstm = nn.LSTM(
embedding_dim, hidden_dim // 2,
num_layers=2, bidirectional=True,
batch_first=True, dropout=0.3
)
self.dropout = nn.Dropout(0.5)
self.fc = nn.Linear(hidden_dim, num_labels)
self.crf = CRF(num_labels, batch_first=True)
def forward(self, input_ids, labels=None, mask=None):
embeds = self.dropout(self.embedding(input_ids))
lstm_out, _ = self.lstm(embeds)
emissions = self.fc(self.dropout(lstm_out))
if labels is not None:
# 训练: 计算CRF负对数似然
loss = -self.crf(emissions, labels, mask=mask, reduction='mean')
return loss
else:
# 推理: Viterbi解码
return self.crf.decode(emissions, mask=mask)
# ===================== 构建词表 =====================
def build_vocab(sentences, min_freq=2):
counter = Counter(c for sent in sentences for c in sent)
vocab = {'[PAD]': 0, '[UNK]': 1}
for char, freq in counter.items():
if freq >= min_freq:
vocab[char] = len(vocab)
return vocab
char_vocab = build_vocab(train_sents)
print(f"词表大小: {len(char_vocab)}")
# ===================== 训练循环 =====================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_bilstm_crf(model, train_data, val_data, epochs=30, lr=1e-3):
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=3)
best_f1 = 0
for epoch in range(epochs):
model.train() # train()训练模式
total_loss = 0
for batch in train_data:
input_ids = batch['input_ids'].to(device) # 移至GPU/CPU
labels = batch['labels'].to(device)
mask = batch['mask'].to(device)
loss = model(input_ids, labels=labels, mask=mask)
optimizer.zero_grad() # 清零梯度
loss.backward() # 反向传播计算梯度
nn.utils.clip_grad_norm_(model.parameters(), 5.0)
optimizer.step() # 更新参数
total_loss += loss.item() # 将单元素张量转为Python数值
# 验证
val_f1 = evaluate_ner(model, val_data)
scheduler.step(val_f1)
print(f"Epoch {epoch+1}/{epochs} | Loss: {total_loss/len(train_data):.4f} | Val F1: {val_f1:.4f}")
if val_f1 > best_f1:
best_f1 = val_f1
torch.save(model.state_dict(), "best_bilstm_crf.pth")
print(f" ✅ 保存最佳模型, F1={best_f1:.4f}")
return best_f1
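# 上文用到的evaluate_ner正文未给出,这里补一个基于seqeval的参考实现草稿
# (假设val_data的batch结构与前面collate_fn产生的字段一致)
from seqeval.metrics import f1_score as seqeval_f1

@torch.no_grad()
def evaluate_ner(model, val_data):
    """实体级micro F1评估(参考实现)"""
    model.eval()
    true_seqs, pred_seqs = [], []
    for batch in val_data:
        input_ids = batch['input_ids'].to(device)
        mask = batch['mask'].to(device)
        pred_ids = model(input_ids, mask=mask)  # CRF解码,返回变长标签id列表
        for preds, golds, m in zip(pred_ids, batch['labels'], batch['mask']):
            length = int(m.sum())
            pred_seqs.append([ID2LABEL[p] for p in preds])
            true_seqs.append([ID2LABEL[int(g)] for g in golds[:length]])
    return seqeval_f1(true_seqs, pred_seqs)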
### 2.4 方案二:BERT-NER
from transformers import BertForTokenClassification, BertTokenizerFast
# ===================== BERT NER数据处理 =====================
tokenizer = BertTokenizerFast.from_pretrained("bert-base-chinese")
def tokenize_and_align_labels(sentences, labels, max_length=128):
"""BERT tokenization + 标签对齐"""
tokenized_inputs = tokenizer(
[list(s) for s in sentences],
is_split_into_words=True,
padding="max_length",
truncation=True,
max_length=max_length,
return_tensors="pt"
)
aligned_labels = []
for i, label_seq in enumerate(labels):
word_ids = tokenized_inputs.word_ids(batch_index=i)
label_ids = []
for word_id in word_ids:
if word_id is None:
label_ids.append(-100) # 特殊token忽略
else:
label_ids.append(LABEL2ID[label_seq[word_id]])
aligned_labels.append(label_ids)
tokenized_inputs["labels"] = torch.tensor(aligned_labels)
return tokenized_inputs
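# Trainer需要可索引的Dataset对象;下面把上面函数的输出简单包装成数据集
# (类名NERTensorDataset为本文假设,train_tokenized/val_tokenized供后文Trainer使用)
class NERTensorDataset(torch.utils.data.Dataset):
    def __init__(self, encodings):
        self.encodings = encodings
    def __len__(self):
        return len(self.encodings["input_ids"])
    def __getitem__(self, idx):
        return {k: v[idx] for k, v in self.encodings.items()}

train_tokenized = NERTensorDataset(tokenize_and_align_labels(train_sents, train_labels))
val_tokenized = NERTensorDataset(tokenize_and_align_labels(val_sents, val_labels))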
# ===================== BERT NER模型训练 =====================
model = BertForTokenClassification.from_pretrained(
"bert-base-chinese",
num_labels=len(LABEL_LIST),
id2label=ID2LABEL,
label2id=LABEL2ID,
)
from seqeval.metrics import classification_report as seq_report
from seqeval.metrics import f1_score as seq_f1
def compute_ner_metrics(eval_pred):
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
true_labels, pred_labels = [], []
for pred_seq, label_seq in zip(predictions, labels):
true_seq, pred_seq_filtered = [], []
for p, l in zip(pred_seq, label_seq):
if l != -100:
true_seq.append(ID2LABEL[l])
pred_seq_filtered.append(ID2LABEL[p])
true_labels.append(true_seq)
pred_labels.append(pred_seq_filtered)
f1 = seq_f1(true_labels, pred_labels, average='micro')
return {"f1": f1}
training_args = TrainingArguments(
output_dir="./results/ner_bert",
num_train_epochs=10,
per_device_train_batch_size=32,
per_device_eval_batch_size=64,
learning_rate=3e-5,
weight_decay=0.01,
warmup_ratio=0.1,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
metric_for_best_model="f1",
fp16=True,
report_to="none",
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_tokenized,
eval_dataset=val_tokenized,
compute_metrics=compute_ner_metrics,
)
trainer.train()
print("✅ BERT-NER训练完成!")
### 2.5 NER评估与推理
# ===================== 实体提取推理 =====================
class NERPredictor:
def __init__(self, model_path, tokenizer_name="bert-base-chinese"):
self.tokenizer = BertTokenizerFast.from_pretrained(tokenizer_name)
self.model = BertForTokenClassification.from_pretrained(model_path)
self.model.eval()
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(self.device)
@torch.no_grad() # 禁用梯度计算,节省内存
def predict(self, text):
"""预测单条文本的实体"""
chars = list(text)
inputs = self.tokenizer(
chars, is_split_into_words=True,
return_tensors="pt", padding=True, truncation=True, max_length=256
).to(self.device)
outputs = self.model(**inputs)
preds = torch.argmax(outputs.logits, dim=-1)[0].cpu().numpy()
# 对齐到原始字符
word_ids = inputs.word_ids(0)
entities = []
current_entity = None
for idx, word_id in enumerate(word_ids):
if word_id is None:
continue
label = ID2LABEL[preds[idx]]
char = chars[word_id]
if label.startswith('B-'):
if current_entity:
entities.append(current_entity)
current_entity = {'type': label[2:], 'text': char, 'start': word_id}
elif label.startswith('I-') and current_entity and label[2:] == current_entity['type']:
current_entity['text'] += char
else:
if current_entity:
entities.append(current_entity)
current_entity = None
if current_entity:
entities.append(current_entity)
return entities
def batch_predict(self, texts):
return [self.predict(text) for text in texts]
# 使用示例
predictor = NERPredictor("./results/ner_bert/best")
test_texts = [
"李明在北京大学计算机系读研究生",
"马云于1999年在杭州创立了阿里巴巴集团",
"华为公司总部位于深圳市龙岗区",
]
for text in test_texts:
entities = predictor.predict(text)
print(f"\n输入: {text}")
for ent in entities:
print(f" [{ent['type']}] {ent['text']}")
输出示例:
输入: 李明在北京大学计算机系读研究生
[PER] 李明
[ORG] 北京大学
输入: 马云于1999年在杭州创立了阿里巴巴集团
[PER] 马云
[LOC] 杭州
[ORG] 阿里巴巴集团
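如果需要像P1一样对外提供服务,可以直接复用NERPredictor再包一层FastAPI接口,下面是一个最小草稿(路由名、端口均为示例假设):
from fastapi import FastAPI
from pydantic import BaseModel

ner_app = FastAPI(title="NER API")

class NERRequest(BaseModel):
    text: str

@ner_app.post("/ner")
async def extract_entities(request: NERRequest):
    # 复用上文的predictor,返回实体列表
    return {"text": request.text, "entities": predictor.predict(request.text)}

# uvicorn ner_api:ner_app --host 0.0.0.0 --port 8001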
### 2.6 方案对比
| 指标 | BiLSTM-CRF | BERT-NER |
|---|---|---|
| F1 (micro) | ~88% | ~94% |
| 训练速度 | 快 (5min/epoch) | 慢 (15min/epoch) |
| 推理速度 | ~5000 sent/s | ~500 sent/s |
| 模型大小 | ~20MB | ~400MB |
| 低资源场景 | 较好 | 需较多数据 |
## 🔮 P3: RAG问答系统
### 3.1 项目简介
构建生产级RAG(检索增强生成)问答系统,完整覆盖文档解析 → 分块 → 向量化 → 检索 → 生成全流程。
技术栈:LangChain、ChromaDB、OpenAI/本地LLM、Sentence-Transformers
### 3.2 文档处理Pipeline
from langchain_community.document_loaders import (
PyPDFLoader, TextLoader, UnstructuredMarkdownLoader,
DirectoryLoader
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
import hashlib
from pathlib import Path
# ===================== 文档加载 =====================
class DocumentProcessor:
"""多格式文档处理器"""
LOADER_MAP = {
'.pdf': PyPDFLoader,
'.txt': TextLoader,
'.md': UnstructuredMarkdownLoader,
}
def __init__(self, chunk_size=500, chunk_overlap=100):
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""],
length_function=len,
)
def load_documents(self, file_paths):
"""加载多个文档"""
all_docs = []
for path in file_paths:
suffix = Path(path).suffix.lower()
loader_cls = self.LOADER_MAP.get(suffix)
if loader_cls:
try: # try/except捕获异常
docs = loader_cls(str(path)).load()
for doc in docs:
doc.metadata['source'] = str(path)
doc.metadata['doc_id'] = hashlib.md5(doc.page_content.encode()).hexdigest()[:8] # 切片操作,取前n个元素
all_docs.extend(docs)
print(f" ✅ {path}: {len(docs)} 页")
except Exception as e:
print(f" ❌ {path}: {e}")
return all_docs
def split_documents(self, documents):
"""智能分块"""
chunks = self.splitter.split_documents(documents)
# 添加chunk索引
for i, chunk in enumerate(chunks):
chunk.metadata['chunk_id'] = i
chunk.metadata['chunk_length'] = len(chunk.page_content)
print(f"✅ 分块完成: {len(documents)} 文档 → {len(chunks)} 块")
print(f" 平均长度: {sum(c.metadata['chunk_length'] for c in chunks) / len(chunks):.0f} 字符")
return chunks
# 使用示例
processor = DocumentProcessor(chunk_size=500, chunk_overlap=100)
file_paths = list(Path("knowledge_base").glob("**/*.pdf")) + list(Path("knowledge_base").glob("**/*.md"))
documents = processor.load_documents(file_paths)
chunks = processor.split_documents(documents)
### 3.3 向量存储与检索
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
# ===================== Embedding模型 =====================
embed_model = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-zh-v1.5", # 中文SOTA embedding
model_kwargs={'device': 'cuda'},
encode_kwargs={'normalize_embeddings': True, 'batch_size': 64}
)
# ===================== 创建向量数据库 =====================
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embed_model,
collection_name="knowledge_base",
persist_directory="./chroma_db",
)
# Chroma 0.4+ 自动持久化,无需手动调用 persist()
print(f"✅ 向量数据库已创建: {vectorstore._collection.count()} 条记录")
# ===================== 检索器 =====================
# 基础检索: 语义相似度 Top-K
base_retriever = vectorstore.as_retriever(
search_type="mmr", # MMR多样性检索
search_kwargs={
"k": 10, # 返回数量
"fetch_k": 30, # MMR候选池
"lambda_mult": 0.7, # 相关性 vs 多样性权重
}
)
# 重排序: Cross-Encoder精排
reranker = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-large")
compressor = CrossEncoderReranker(model=reranker, top_n=5)
retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever,
)
# 检索测试
query = "什么是Transformer的自注意力机制?"
docs = retriever.invoke(query)
print(f"\n🔍 查询: {query}")
for i, doc in enumerate(docs):
print(f" [{i+1}] ({doc.metadata.get('source', 'N/A')}) {doc.page_content[:100]}...")
### 3.4 RAG问答Chain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain.callbacks import StreamingStdOutCallbackHandler
# ===================== LLM配置 =====================
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.1,
max_tokens=1024,
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()],
)
# ===================== RAG Prompt模板 =====================
RAG_PROMPT = ChatPromptTemplate.from_template("""你是一个专业的AI助手。请根据以下检索到的上下文回答用户问题。
要求:
1. 只使用上下文中的信息回答,不要编造
2. 如果上下文不包含答案,明确说"根据现有资料无法回答"
3. 回答要准确、简洁、有条理
4. 如果引用了具体内容,请标注来源
上下文:
{context}
用户问题:{question}
回答:""")
# ===================== 构建RAG Chain =====================
class RAGSystem:
def __init__(self, retriever, llm, prompt):
self.retriever = retriever
self.llm = llm
self.prompt = prompt
def query(self, question, return_sources=True):
"""RAG问答"""
# 1. 检索
docs = self.retriever.invoke(question)
# 2. 构建上下文
context = "\n\n---\n\n".join([
f"[来源: {doc.metadata.get('source', 'unknown')}]\n{doc.page_content}"
for doc in docs
])
# 3. 生成回答
messages = self.prompt.format_messages(context=context, question=question)
response = self.llm.invoke(messages)
result = {
"answer": response.content,
"question": question,
}
if return_sources:
result["sources"] = [
{
"content": doc.page_content[:200],
"source": doc.metadata.get("source", ""),
"chunk_id": doc.metadata.get("chunk_id", ""),
}
for doc in docs
]
return result
def chat(self, question):
"""简单问答接口"""
result = self.query(question)
print(f"\n📖 参考来源:")
for i, src in enumerate(result.get("sources", [])):
print(f" [{i+1}] {src['source']}")
return result["answer"]
# ===================== 使用RAG系统 =====================
rag = RAGSystem(retriever, llm, RAG_PROMPT)
# 交互式问答
questions = [
"什么是Transformer的自注意力机制?",
"BERT和GPT的主要区别是什么?",
"如何使用LoRA进行大模型微调?",
]
for q in questions:
print(f"\n{'='*60}")
print(f"❓ {q}")
print(f"{'='*60}")
answer = rag.chat(q)
### 3.5 Streamlit Web界面
import streamlit as st
st.set_page_config(page_title="📚 RAG问答系统", layout="wide")
st.title("📚 RAG智能问答系统")
# 初始化RAG (缓存)
@st.cache_resource
def init_rag():
# ... 初始化retriever, llm, rag_system
return rag_system
rag = init_rag()
# 聊天界面
if "messages" not in st.session_state:
st.session_state.messages = []
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"])
if "sources" in msg:
with st.expander("📖 参考来源"):
for src in msg["sources"]:
st.caption(f"📄 {src['source']}: {src['content'][:150]}...")
if prompt := st.chat_input("输入你的问题..."):
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
with st.spinner("🔍 检索中..."):
result = rag.query(prompt)
st.markdown(result["answer"])
with st.expander("📖 参考来源"):
for src in result.get("sources", []):
st.caption(f"📄 {src['source']}: {src['content'][:150]}...")
st.session_state.messages.append({
"role": "assistant",
"content": result["answer"],
"sources": result.get("sources", [])
})
# streamlit run app.py
### 3.6 RAG评估
from ragas import evaluate
from ragas.metrics import (
faithfulness, # 忠实度:回答是否基于上下文
answer_relevancy, # 相关性:回答是否切题
context_precision, # 上下文精度
context_recall, # 上下文召回
)
# 评估数据集
eval_questions = ["什么是注意力机制?", "BERT如何预训练?"]
eval_ground_truths = ["注意力机制是...", "BERT使用MLM和NSP..."]
# 收集RAG输出
eval_data = []
for q, gt in zip(eval_questions, eval_ground_truths):
result = rag.query(q)
eval_data.append({
"question": q,
"answer": result["answer"],
"contexts": [s["content"] for s in result["sources"]],
"ground_truth": gt,
})
# RAGAS评估
from datasets import Dataset
eval_dataset = Dataset.from_list(eval_data)
scores = evaluate(eval_dataset, metrics=[faithfulness, answer_relevancy, context_precision, context_recall])
print(f"\n📊 RAG评估结果:")
print(f" 忠实度: {scores['faithfulness']:.4f}")
print(f" 相关性: {scores['answer_relevancy']:.4f}")
print(f" 上下文精度: {scores['context_precision']:.4f}")
print(f" 上下文召回: {scores['context_recall']:.4f}")
## 📊 项目总结与简历包装
### 简历描述模板
文本分类系统:基于BERT微调构建中文文本分类服务,采用Label Smoothing + Cosine退火策略,混合精度训练加速2倍;封装FastAPI服务支持批量推理,F1达96.3%,QPS达500+。
命名实体识别系统:实现BiLSTM-CRF和BERT-NER双方案中文NER,BERT方案在MSRA测试集F1达94.2%;设计实体提取Pipeline支持PER/LOC/ORG三类实体,部署为RESTful API。
RAG智能问答系统:基于LangChain构建企业知识库问答系统,BGE Embedding + ChromaDB向量检索 + Cross-Encoder重排序,RAGAS忠实度达92.1%;支持PDF/Markdown多格式文档,Streamlit交互界面秒级响应。
💡 学习建议:P1学微调基础 → P2学序列标注 → P3学工程化RAG,每个项目先跑通pipeline再优化细节。