
🛠️ Hands-On NLP Projects

Three complete projects: text classification → named entity recognition → RAG question answering, covering the full workflow of data processing, model training, evaluation, and deployment, with runnable code.


📋 Project Overview

| Project | Core Techniques | Difficulty | Estimated Time |
|---|---|---|---|
| P1: Text Classification System | BERT fine-tuning / HuggingFace | ⭐⭐⭐ | 2 days |
| P2: Named Entity Recognition | BiLSTM-CRF / BERT-NER | ⭐⭐⭐⭐ | 3 days |
| P3: RAG Question Answering System | LangChain / vector retrieval / LLM | ⭐⭐⭐⭐⭐ | 3 days |

📝 P1: Text Classification System

1.1 Project Overview

Fine-tune BERT on a Chinese text dataset to build a complete text classification pipeline, covering data cleaning, model training, evaluation, and FastAPI deployment.

Tech stack: HuggingFace Transformers, Datasets, PyTorch, FastAPI

1.2 Data Processing

Python
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import TrainingArguments, Trainer
from datasets import load_dataset, Dataset
import pandas as pd
import numpy as np
import re

# ===================== Data cleaning =====================
def clean_text(text):
    """Clean Chinese text"""
    if not isinstance(text, str):  # isinstance checks the type
        return ""
    text = re.sub(r'<[^>]+>', '', text)           # strip HTML tags
    text = re.sub(r'http\S+|www\.\S+', '', text)  # strip URLs
    text = re.sub(r'@\w+', '', text)               # strip @mentions
    text = re.sub(r'#\w+', '', text)               # strip hashtags
    text = re.sub(r'\s+', ' ', text).strip()        # collapse whitespace
    return text

# ===================== Load data =====================
# Option 1: load from CSV
df = pd.read_csv("data/train.csv")  # columns: text, label
df['text'] = df['text'].apply(clean_text)
df = df[df['text'].str.len() > 5]  # drop texts that are too short
print(f"Samples: {len(df)}, class distribution:\n{df['label'].value_counts()}")

# Split into train/validation sets
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.15, stratify=df['label'], random_state=42)
train_dataset = Dataset.from_pandas(train_df[['text', 'label']].reset_index(drop=True))
val_dataset = Dataset.from_pandas(val_df[['text', 'label']].reset_index(drop=True))

# ===================== Tokenization =====================
MODEL_NAME = "bert-base-chinese"
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)

def tokenize_function(examples):
    return tokenizer(
        examples["text"],
        padding="max_length",
        truncation=True,
        max_length=256,
    )

train_dataset = train_dataset.map(tokenize_function, batched=True, batch_size=1000)
val_dataset = val_dataset.map(tokenize_function, batched=True, batch_size=1000)

# Set the tensor format for PyTorch
train_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
val_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
print(f"训练集: {len(train_dataset)}, 验证集: {len(val_dataset)}")

1.3 Model Training

Python
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report

# ===================== Evaluation metrics =====================
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average='weighted')
    acc = accuracy_score(labels, predictions)
    return {"accuracy": acc, "f1": f1, "precision": precision, "recall": recall}

# ===================== Model initialization =====================
num_labels = df['label'].nunique()
label_names = sorted(df['label'].unique())

model = BertForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=num_labels,
    problem_type="single_label_classification"
)

# ===================== Training configuration =====================
training_args = TrainingArguments(
    output_dir="./results/text_cls",
    num_train_epochs=5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    learning_rate=2e-5,
    weight_decay=0.01,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    # Evaluation & checkpointing
    eval_strategy="steps",
    eval_steps=200,
    save_strategy="steps",
    save_steps=200,
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    # Mixed precision
    fp16=True,
    # Logging
    logging_dir="./logs",
    logging_steps=50,
    report_to="none",
)

# ===================== Training =====================
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
)

trainer.train()
print("✅ 训练完成!")

# ===================== Final evaluation =====================
eval_results = trainer.evaluate()
print("\nValidation results:")
print(f"  Accuracy:  {eval_results['eval_accuracy']:.4f}")
print(f"  F1:        {eval_results['eval_f1']:.4f}")
print(f"  Precision: {eval_results['eval_precision']:.4f}")
print(f"  Recall:    {eval_results['eval_recall']:.4f}")

# Detailed classification report
predictions = trainer.predict(val_dataset)
preds = np.argmax(predictions.predictions, axis=-1)
print("\n" + classification_report(val_dataset['label'], preds, target_names=[str(l) for l in label_names]))

# Save the model
trainer.save_model("./model/text_cls_bert")
tokenizer.save_pretrained("./model/text_cls_bert")
print("✅ Model saved to ./model/text_cls_bert")

1.4 FastAPI Deployment

Python
from fastapi import FastAPI
from pydantic import BaseModel
from transformers import pipeline
import uvicorn

app = FastAPI(title="Text Classification API")

# Load the fine-tuned model
# Note: clean_text below is the cleaning function from section 1.2 — import it or copy it into this file.
classifier = pipeline(
    "text-classification",
    model="./model/text_cls_bert",
    tokenizer="./model/text_cls_bert",
    device=0,  # GPU (use device=-1 for CPU)
    max_length=256,
    truncation=True,
)

class TextRequest(BaseModel):  # Pydantic model for request validation
    text: str

class BatchRequest(BaseModel):
    texts: list[str]

class PredictionResponse(BaseModel):
    text: str
    label: str
    confidence: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: TextRequest):  # async defines an asynchronous endpoint
    text = clean_text(request.text)
    result = classifier(text)[0]
    return PredictionResponse(
        text=request.text,
        label=result["label"],
        confidence=round(result["score"], 4)
    )

@app.post("/batch_predict", response_model=list[PredictionResponse])
async def batch_predict(request: BatchRequest):
    texts = [clean_text(t) for t in request.texts]
    results = classifier(texts)
    return [
        PredictionResponse(text=t, label=r["label"], confidence=round(r["score"], 4))
        for t, r in zip(request.texts, results)  # zip pairs inputs with predictions by position
    ]

@app.get("/health")
async def health():
    return {"status": "ok", "model": "bert-base-chinese"}

# uvicorn main:app --host 0.0.0.0 --port 8000
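
With the service running (see the uvicorn command above), a quick client-side smoke test might look like the sketch below; the example texts are placeholder inputs and the port matches the command above.

Python
import requests  # pip install requests

# Single prediction (assumes the API from 1.4 is running on localhost:8000)
resp = requests.post("http://localhost:8000/predict",
                     json={"text": "这部电影的剧情非常精彩"})
print(resp.json())  # {"text": ..., "label": ..., "confidence": ...}

# Batch prediction
resp = requests.post("http://localhost:8000/batch_predict",
                     json={"texts": ["服务态度很差", "物流速度很快,包装完好"]})
print(resp.json())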

🏷️ P2: Named Entity Recognition (NER)

2.1 Project Overview

Build a Chinese NER system and compare two approaches, BiLSTM-CRF and BERT-NER, supporting recognition of person, location, and organization entities.

Tech stack: PyTorch, HuggingFace, pytorch-crf (imported as torchcrf), seqeval

2.2 Data Processing

Python
import json
from collections import Counter

# ===================== BIO tagging scheme =====================
"""
Label set:
  B-PER: person, begin      I-PER: person, inside
  B-LOC: location, begin    I-LOC: location, inside
  B-ORG: organization, begin  I-ORG: organization, inside
  O: outside any entity
"""

LABEL_LIST = ['O', 'B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'B-ORG', 'I-ORG']
LABEL2ID = {l: i for i, l in enumerate(LABEL_LIST)}  # enumerate yields (index, element) pairs
ID2LABEL = {i: l for l, i in LABEL2ID.items()}
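
# Example: the characters of "李明去北京" are tagged
#   李 → B-PER, 明 → I-PER, 去 → O, 北 → B-LOC, 京 → I-LOC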

# ===================== Data loading =====================
def load_ner_data(filepath):
    """Load NER data in CoNLL format.
    One character and its label per line, separated by a tab; blank lines separate sentences.
    """
    sentences, labels = [], []
    cur_tokens, cur_labels = [], []

    with open(filepath, 'r', encoding='utf-8') as f:  # with closes the file automatically
        for line in f:
            line = line.strip()
            if not line:
                if cur_tokens:
                    sentences.append(cur_tokens)
                    labels.append(cur_labels)
                    cur_tokens, cur_labels = [], []
            else:
                parts = line.split('\t')
                if len(parts) == 2:
                    cur_tokens.append(parts[0])
                    cur_labels.append(parts[1])

    if cur_tokens:
        sentences.append(cur_tokens)
        labels.append(cur_labels)

    return sentences, labels

train_sents, train_labels = load_ner_data("data/ner_train.txt")
val_sents, val_labels = load_ner_data("data/ner_val.txt")
print(f"训练集: {len(train_sents)} 句, 验证集: {len(val_sents)} 句")

# 统计实体分布
all_labels = [l for seq in train_labels for l in seq]
print(f"标签分布: {Counter(all_labels)}")  # Counter统计元素出现次数

2.3 Approach 1: BiLSTM-CRF

Python
import torch
import torch.nn as nn
from torchcrf import CRF

# ===================== BiLSTM-CRF model =====================
class BiLSTMCRF(nn.Module):  # subclass nn.Module to define the network
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_labels, pretrained_embeddings=None):
        super().__init__()  # super() calls the parent-class constructor
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        if pretrained_embeddings is not None:
            self.embedding.weight.data.copy_(pretrained_embeddings)

        self.lstm = nn.LSTM(
            embedding_dim, hidden_dim // 2,
            num_layers=2, bidirectional=True,
            batch_first=True, dropout=0.3
        )
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(hidden_dim, num_labels)
        self.crf = CRF(num_labels, batch_first=True)

    def forward(self, input_ids, labels=None, mask=None):
        embeds = self.dropout(self.embedding(input_ids))
        lstm_out, _ = self.lstm(embeds)
        emissions = self.fc(self.dropout(lstm_out))

        if labels is not None:
            # Training: CRF negative log-likelihood
            loss = -self.crf(emissions, labels, mask=mask, reduction='mean')
            return loss
        else:
            # Inference: Viterbi decoding
            return self.crf.decode(emissions, mask=mask)

# ===================== Build the vocabulary =====================
def build_vocab(sentences, min_freq=2):
    counter = Counter(c for sent in sentences for c in sent)
    vocab = {'[PAD]': 0, '[UNK]': 1}
    for char, freq in counter.items():
        if freq >= min_freq:
            vocab[char] = len(vocab)
    return vocab

char_vocab = build_vocab(train_sents)
print(f"词表大小: {len(char_vocab)}")

# ===================== Training loop =====================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_bilstm_crf(model, train_data, val_data, epochs=30, lr=1e-3):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=3)
    best_f1 = 0

    for epoch in range(epochs):
        model.train()  # switch to training mode
        total_loss = 0
        for batch in train_data:
            input_ids = batch['input_ids'].to(device)  # move to GPU/CPU
            labels = batch['labels'].to(device)
            mask = batch['mask'].to(device)

            loss = model(input_ids, labels=labels, mask=mask)
            optimizer.zero_grad()  # zero the gradients
            loss.backward()  # backpropagation computes gradients
            nn.utils.clip_grad_norm_(model.parameters(), 5.0)
            optimizer.step()  # update the parameters
            total_loss += loss.item()  # convert a single-element tensor to a Python number

        # Validation
        val_f1 = evaluate_ner(model, val_data)
        scheduler.step(val_f1)

        print(f"Epoch {epoch+1}/{epochs} | Loss: {total_loss/len(train_data):.4f} | Val F1: {val_f1:.4f}")

        if val_f1 > best_f1:
            best_f1 = val_f1
            torch.save(model.state_dict(), "best_bilstm_crf.pth")
            print(f"  ✅ 保存最佳模型, F1={best_f1:.4f}")

    return best_f1
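
The training loop above calls an evaluate_ner helper that is not defined in this section. Below is a minimal sketch, assuming the batch format produced by the loaders sketched earlier in 2.3 and entity-level scoring with seqeval; the model instantiation at the end is likewise illustrative.

Python
from seqeval.metrics import f1_score as seq_f1

@torch.no_grad()
def evaluate_ner(model, data_loader):
    """Entity-level micro-F1 over a validation DataLoader (sketch)."""
    model.eval()
    true_seqs, pred_seqs = [], []
    for batch in data_loader:
        input_ids = batch['input_ids'].to(device)
        mask = batch['mask'].to(device)
        preds = model(input_ids, mask=mask)  # list of Viterbi label-id paths
        for pred_ids, label_ids, m in zip(preds, batch['labels'], batch['mask']):
            length = int(m.sum())
            pred_seqs.append([ID2LABEL[p] for p in pred_ids])
            true_seqs.append([ID2LABEL[int(l)] for l in label_ids[:length]])
    return seq_f1(true_seqs, pred_seqs, average='micro')

# Illustrative usage with the loaders sketched above
model = BiLSTMCRF(len(char_vocab), embedding_dim=128, hidden_dim=256,
                  num_labels=len(LABEL_LIST)).to(device)
best_f1 = train_bilstm_crf(model, train_loader, val_loader, epochs=30)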

2.4 Approach 2: BERT-NER

Python
from transformers import BertForTokenClassification, BertTokenizerFast

# ===================== BERT NER data processing =====================
tokenizer = BertTokenizerFast.from_pretrained("bert-base-chinese")

def tokenize_and_align_labels(sentences, labels, max_length=128):
    """BERT tokenization + 标签对齐"""
    tokenized_inputs = tokenizer(
        [list(s) for s in sentences],
        is_split_into_words=True,
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors="pt"
    )

    aligned_labels = []
    for i, label_seq in enumerate(labels):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        label_ids = []
        for word_id in word_ids:
            if word_id is None:
                label_ids.append(-100)  # ignore special tokens in the loss
            else:
                label_ids.append(LABEL2ID[label_seq[word_id]])
        aligned_labels.append(label_ids)

    tokenized_inputs["labels"] = torch.tensor(aligned_labels)
    return tokenized_inputs
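
# The Trainer below expects dataset objects named train_tokenized / val_tokenized.
# A minimal sketch (not in the original): wrap the aligned encodings in a torch Dataset.
class NERTorchDataset(torch.utils.data.Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __len__(self):
        return len(self.encodings["input_ids"])

    def __getitem__(self, idx):
        return {k: v[idx] for k, v in self.encodings.items()}

train_tokenized = NERTorchDataset(tokenize_and_align_labels(train_sents, train_labels))
val_tokenized = NERTorchDataset(tokenize_and_align_labels(val_sents, val_labels))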

# ===================== BERT NER model training =====================
model = BertForTokenClassification.from_pretrained(
    "bert-base-chinese",
    num_labels=len(LABEL_LIST),
    id2label=ID2LABEL,
    label2id=LABEL2ID,
)

from seqeval.metrics import classification_report as seq_report
from seqeval.metrics import f1_score as seq_f1

def compute_ner_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    true_labels, pred_labels = [], []
    for pred_seq, label_seq in zip(predictions, labels):
        true_seq, pred_seq_filtered = [], []
        for p, l in zip(pred_seq, label_seq):
            if l != -100:
                true_seq.append(ID2LABEL[l])
                pred_seq_filtered.append(ID2LABEL[p])
        true_labels.append(true_seq)
        pred_labels.append(pred_seq_filtered)

    f1 = seq_f1(true_labels, pred_labels, average='micro')
    return {"f1": f1}

training_args = TrainingArguments(
    output_dir="./results/ner_bert",
    num_train_epochs=10,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    learning_rate=3e-5,
    weight_decay=0.01,
    warmup_ratio=0.1,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    fp16=True,
    report_to="none",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tokenized,
    eval_dataset=val_tokenized,
    compute_metrics=compute_ner_metrics,
)

trainer.train()
trainer.save_model("./model/ner_bert")  # save the best checkpoint for inference below
print("✅ BERT-NER training complete!")

2.5 NER Evaluation and Inference

Python
# ===================== Entity extraction inference =====================
class NERPredictor:
    def __init__(self, model_path, tokenizer_name="bert-base-chinese"):
        self.tokenizer = BertTokenizerFast.from_pretrained(tokenizer_name)
        self.model = BertForTokenClassification.from_pretrained(model_path)
        self.model.eval()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    @torch.no_grad()  # disable gradient tracking to save memory
    def predict(self, text):
        """Predict the entities in a single text"""
        chars = list(text)
        inputs = self.tokenizer(
            chars, is_split_into_words=True,
            return_tensors="pt", padding=True, truncation=True, max_length=256
        ).to(self.device)

        outputs = self.model(**inputs)
        preds = torch.argmax(outputs.logits, dim=-1)[0].cpu().numpy()

        # Align predictions back to the original characters
        word_ids = inputs.word_ids(0)
        entities = []
        current_entity = None
        prev_word_id = None

        for idx, word_id in enumerate(word_ids):
            if word_id is None or word_id == prev_word_id:  # skip special tokens and sub-token repeats
                continue
            prev_word_id = word_id
            label = ID2LABEL[preds[idx]]
            char = chars[word_id]

            if label.startswith('B-'):
                if current_entity:
                    entities.append(current_entity)
                current_entity = {'type': label[2:], 'text': char, 'start': word_id}
            elif label.startswith('I-') and current_entity and label[2:] == current_entity['type']:
                current_entity['text'] += char
            else:
                if current_entity:
                    entities.append(current_entity)
                    current_entity = None

        if current_entity:
            entities.append(current_entity)

        return entities

    def batch_predict(self, texts):
        return [self.predict(text) for text in texts]

# Usage example
predictor = NERPredictor("./model/ner_bert")

test_texts = [
    "李明在北京大学计算机系读研究生",
    "马云于1999年在杭州创立了阿里巴巴集团",
    "华为公司总部位于深圳市龙岗区",
]

for text in test_texts:
    entities = predictor.predict(text)
    print(f"\n输入: {text}")
    for ent in entities:
        print(f"  [{ent['type']}] {ent['text']}")

Sample output:

Text Only
Input: 李明在北京大学计算机系读研究生
  [PER] 李明
  [ORG] 北京大学

Input: 马云于1999年在杭州创立了阿里巴巴集团
  [PER] 马云
  [LOC] 杭州
  [ORG] 阿里巴巴集团

2.6 Approach Comparison

| Metric | BiLSTM-CRF | BERT-NER |
|---|---|---|
| F1 (micro) | ~88% | ~94% |
| Training speed | fast (5 min/epoch) | slow (15 min/epoch) |
| Inference speed | ~5000 sentences/s | ~500 sentences/s |
| Model size | ~20 MB | ~400 MB |
| Low-resource settings | fairly good | needs more data |

🔮 P3: RAG Question Answering System

3.1 Project Overview

Build a production-grade RAG (retrieval-augmented generation) question answering system, covering the full pipeline of document parsing → chunking → embedding → retrieval → generation.

Tech stack: LangChain, ChromaDB, OpenAI / local LLM, Sentence-Transformers

3.2 Document Processing Pipeline

Python
from langchain_community.document_loaders import (
    PyPDFLoader, TextLoader, UnstructuredMarkdownLoader,
    DirectoryLoader
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from pathlib import Path
import hashlib

# ===================== Document loading =====================
class DocumentProcessor:
    """Multi-format document processor"""

    LOADER_MAP = {
        '.pdf': PyPDFLoader,
        '.txt': TextLoader,
        '.md': UnstructuredMarkdownLoader,
    }

    def __init__(self, chunk_size=500, chunk_overlap=100):
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""],
            length_function=len,
        )

    def load_documents(self, file_paths):
        """加载多个文档"""
        all_docs = []
        for path in file_paths:
            suffix = Path(path).suffix.lower()
            loader_cls = self.LOADER_MAP.get(suffix)
            if loader_cls:
                try:  # try/except catches loading errors
                    docs = loader_cls(str(path)).load()
                    for doc in docs:
                        doc.metadata['source'] = str(path)
                        doc.metadata['doc_id'] = hashlib.md5(doc.page_content.encode()).hexdigest()[:8]  # slicing keeps the first 8 characters
                    all_docs.extend(docs)
                    print(f"  ✅ {path}: {len(docs)} 页")
                except Exception as e:
                    print(f"  ❌ {path}: {e}")
        return all_docs

    def split_documents(self, documents):
        """Chunk the documents"""
        chunks = self.splitter.split_documents(documents)
        # Attach chunk indices
        for i, chunk in enumerate(chunks):
            chunk.metadata['chunk_id'] = i
            chunk.metadata['chunk_length'] = len(chunk.page_content)

        print(f"✅ Chunking done: {len(documents)} documents → {len(chunks)} chunks")
        print(f"   Average length: {sum(c.metadata['chunk_length'] for c in chunks) / len(chunks):.0f} characters")
        return chunks

# Usage example
processor = DocumentProcessor(chunk_size=500, chunk_overlap=100)
file_paths = list(Path("knowledge_base").glob("**/*.pdf")) + list(Path("knowledge_base").glob("**/*.md"))
documents = processor.load_documents(file_paths)
chunks = processor.split_documents(documents)

3.3 Vector Storage and Retrieval

Python
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

# ===================== Embedding model =====================
embed_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-zh-v1.5",  # strong Chinese embedding model
    model_kwargs={'device': 'cuda'},
    encode_kwargs={'normalize_embeddings': True, 'batch_size': 64}
)

# ===================== Create the vector store =====================
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embed_model,
    collection_name="knowledge_base",
    persist_directory="./chroma_db",
)
# Chroma 0.4+ persists automatically; no manual persist() call needed
print(f"✅ Vector store created: {vectorstore._collection.count()} records")

# ===================== Retriever =====================
# Base retrieval: semantic similarity, top-k
base_retriever = vectorstore.as_retriever(
    search_type="mmr",        # MMR retrieval for diversity
    search_kwargs={
        "k": 10,              # number of results to return
        "fetch_k": 30,        # MMR candidate pool size
        "lambda_mult": 0.7,   # relevance vs. diversity trade-off
    }
)

# Reranking: Cross-Encoder for precise re-scoring
reranker = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-large")
compressor = CrossEncoderReranker(model=reranker, top_n=5)

retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=base_retriever,
)

# Retrieval test
query = "什么是Transformer的自注意力机制?"
docs = retriever.invoke(query)
print(f"\n🔍 Query: {query}")
for i, doc in enumerate(docs):
    print(f"  [{i+1}] ({doc.metadata.get('source', 'N/A')}) {doc.page_content[:100]}...")

3.4 RAG Question-Answering Chain

Python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain.callbacks import StreamingStdOutCallbackHandler

# ===================== LLM configuration =====================
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
    max_tokens=1024,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
)

# ===================== RAG prompt template (kept in Chinese for the Chinese knowledge base) =====================
RAG_PROMPT = ChatPromptTemplate.from_template("""你是一个专业的AI助手。请根据以下检索到的上下文回答用户问题。

要求:
1. 只使用上下文中的信息回答,不要编造
2. 如果上下文不包含答案,明确说"根据现有资料无法回答"
3. 回答要准确、简洁、有条理
4. 如果引用了具体内容,请标注来源

上下文:
{context}

用户问题:{question}

回答:""")

# ===================== Build the RAG chain =====================
class RAGSystem:
    def __init__(self, retriever, llm, prompt):
        self.retriever = retriever
        self.llm = llm
        self.prompt = prompt

    def query(self, question, return_sources=True):
        """RAG question answering"""
        # 1. Retrieve
        docs = self.retriever.invoke(question)

        # 2. Build the context
        context = "\n\n---\n\n".join([
            f"[来源: {doc.metadata.get('source', 'unknown')}]\n{doc.page_content}"
            for doc in docs
        ])

        # 3. Generate the answer
        messages = self.prompt.format_messages(context=context, question=question)
        response = self.llm.invoke(messages)

        result = {
            "answer": response.content,
            "question": question,
        }

        if return_sources:
            result["sources"] = [
                {
                    "content": doc.page_content[:200],
                    "source": doc.metadata.get("source", ""),
                    "chunk_id": doc.metadata.get("chunk_id", ""),
                }
                for doc in docs
            ]

        return result

    def chat(self, question):
        """简单问答接口"""
        result = self.query(question)
        print(f"\n📖 参考来源:")
        for i, src in enumerate(result.get("sources", [])):
            print(f"  [{i+1}] {src['source']}")
        return result["answer"]

# ===================== Using the RAG system =====================
rag = RAGSystem(retriever, llm, RAG_PROMPT)

# Interactive Q&A over a few sample questions
questions = [
    "什么是Transformer的自注意力机制?",
    "BERT和GPT的主要区别是什么?",
    "如何使用LoRA进行大模型微调?",
]

for q in questions:
    print(f"\n{'='*60}")
    print(f"❓ {q}")
    print(f"{'='*60}")
    answer = rag.chat(q)

3.5 Streamlit Web UI

Python
import streamlit as st

st.set_page_config(page_title="📚 RAG Q&A System", layout="wide")
st.title("📚 RAG Intelligent Q&A System")

# Initialize the RAG system (cached across reruns)
@st.cache_resource
def init_rag():
    # ... initialize retriever, llm, rag_system here
    return rag_system

rag = init_rag()

# Chat interface
if "messages" not in st.session_state:
    st.session_state.messages = []

for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        st.markdown(msg["content"])
        if "sources" in msg:
            with st.expander("📖 Sources"):
                for src in msg["sources"]:
                    st.caption(f"📄 {src['source']}: {src['content'][:150]}...")

if prompt := st.chat_input("Ask a question..."):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    with st.chat_message("assistant"):
        with st.spinner("🔍 Retrieving..."):
            result = rag.query(prompt)
        st.markdown(result["answer"])

        with st.expander("📖 Sources"):
            for src in result.get("sources", []):
                st.caption(f"📄 {src['source']}: {src['content'][:150]}...")

    st.session_state.messages.append({
        "role": "assistant",
        "content": result["answer"],
        "sources": result.get("sources", [])
    })

# streamlit run app.py

3.6 RAG Evaluation

Python
from ragas import evaluate
from ragas.metrics import (
    faithfulness,       # faithfulness: is the answer grounded in the context?
    answer_relevancy,   # relevancy: does the answer address the question?
    context_precision,  # context precision
    context_recall,     # context recall
)

# Evaluation dataset
eval_questions = ["什么是注意力机制?", "BERT如何预训练?"]
eval_ground_truths = ["注意力机制是...", "BERT使用MLM和NSP..."]

# Collect RAG outputs
eval_data = []
for q, gt in zip(eval_questions, eval_ground_truths):
    result = rag.query(q)
    eval_data.append({
        "question": q,
        "answer": result["answer"],
        "contexts": [s["content"] for s in result["sources"]],
        "ground_truth": gt,
    })

# RAGAS evaluation
from datasets import Dataset
eval_dataset = Dataset.from_list(eval_data)
scores = evaluate(eval_dataset, metrics=[faithfulness, answer_relevancy, context_precision, context_recall])
print(f"\n📊 RAG评估结果:")
print(f"  忠实度:     {scores['faithfulness']:.4f}")
print(f"  相关性:     {scores['answer_relevancy']:.4f}")
print(f"  上下文精度: {scores['context_precision']:.4f}")
print(f"  上下文召回: {scores['context_recall']:.4f}")

📊 Project Summary and Résumé Framing

Résumé Description Templates

Text classification system: built a Chinese text classification service by fine-tuning BERT, using label smoothing and cosine learning-rate annealing, with mixed-precision training for roughly 2x speedup; wrapped it as a FastAPI service with batch inference, reaching 96.3% F1 and 500+ QPS.

Named entity recognition system: implemented Chinese NER with both BiLSTM-CRF and BERT-NER; the BERT approach reached 94.2% F1 on the MSRA test set. Designed an entity-extraction pipeline supporting PER/LOC/ORG entities and deployed it as a RESTful API.

RAG question answering system: built an enterprise knowledge-base Q&A system with LangChain, combining BGE embeddings, ChromaDB vector retrieval, and Cross-Encoder reranking, reaching 92.1% RAGAS faithfulness; supports PDF/Markdown documents with a Streamlit interface responding within seconds.


💡 Study tip: P1 teaches fine-tuning basics → P2 teaches sequence labeling → P3 teaches production-style RAG; for each project, get the pipeline running end to end first, then optimize the details.