📖 第5章:序列标注¶
学习时间:8小时 难度星级:⭐⭐⭐⭐ 前置知识:文本分类、CRF概念、BiLSTM 学习目标:掌握NER和POS任务,能实现BiLSTM-CRF和BERT-NER
📋 目录¶
1. 序列标注概述¶
1.1 什么是序列标注¶
序列标注(Sequence Labeling)是为输入序列中的每个元素分配一个标签的任务。
输入: 小 明 在 北 京 大 学 学 习
标签: B-PER E-PER O B-ORG I-ORG I-ORG E-ORG O O (BIOES体系,详见第2节)
输入: I love New York City
标签: O O B-LOC I-LOC E-LOC
1.2 核心任务¶
sequence_labeling_tasks = {
"命名实体识别(NER)": {
"目标": "识别文本中的实体及其类型",
"输入": "乔布斯在加利福尼亚创立了苹果公司",
"输出": "[乔布斯/PER] 在 [加利福尼亚/LOC] 创立了 [苹果公司/ORG]",
"实体类型": ["人名PER", "地名LOC", "机构名ORG", "时间TIME"],
},
"词性标注(POS)": {
"目标": "为每个词标注词性",
"输入": "小明在学校学习",
"输出": "小明/NR 在/P 学校/NN 学习/VV",
"标签集": ["名词NN", "动词VV", "形容词JJ", "副词RB"],
},
"中文分词": {
"目标": "将连续文本切分为词",
"输入": "自然语言处理",
"输出": "B I I I E (BIES标注)",
},
}
2. 标注体系¶
2.1 BIO标注¶
B-X: 实体X的开始 (Begin)
I-X: 实体X的内部 (Inside)
O: 非实体 (Outside)
示例:
小 明 在 北 京 大 学 读 书
B-PER I-PER O B-ORG I-ORG I-ORG I-ORG O O
2.2 BIOES标注(边界信息更丰富)¶
B-X: 实体X的开始 (Begin)
I-X: 实体X的内部 (Inside)
O: 非实体 (Outside)
E-X: 实体X的结束 (End)
S-X: 单字实体X (Single)
示例:
小 明 在 北 京 大 学 读 书
B-PER E-PER O B-ORG I-ORG I-ORG E-ORG O O
def bio_to_entities(tokens, tags):
"""从BIO标签序列中提取实体"""
entities = []
current_entity = None
current_type = None
start = None
    for i, (token, tag) in enumerate(zip(tokens, tags)):  # enumerate同时取索引和元素,zip按位置配对
if tag.startswith('B-'):
# 保存上一个实体
if current_entity is not None:
entities.append({
'entity': ''.join(current_entity),
'type': current_type,
'start': start,
'end': i
})
current_entity = [token]
current_type = tag[2:]
start = i
        elif tag.startswith('I-') and current_entity is not None and tag[2:] == current_type:  # 类型一致才延续当前实体,避免把"B-PER I-LOC"误并
current_entity.append(token)
else:
if current_entity is not None:
entities.append({
'entity': ''.join(current_entity),
'type': current_type,
'start': start,
'end': i
})
current_entity = None
if current_entity is not None:
entities.append({
'entity': ''.join(current_entity),
'type': current_type,
'start': start,
'end': len(tokens)
})
return entities
# 测试
tokens = list("小明在北京大学读书")
tags = ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG", "O", "O"]
entities = bio_to_entities(tokens, tags)
print("提取的实体:")
for e in entities:
print(f" [{e['type']}] {e['entity']} (位置: {e['start']}-{e['end']})")
3. HMM模型¶
3.1 HMM原理¶
隐马尔可夫模型将序列标注看作隐状态序列推断问题。
三个核心要素:
- 初始概率 \(\pi\):初始状态的概率分布
- 转移概率 \(A\):从一个状态转移到另一个状态的概率
- 发射概率 \(B\):某个状态生成某个观测的概率
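基于这三个要素,HMM对观测序列 \(X=(x_1,\dots,x_T)\) 和状态序列 \(Y=(y_1,\dots,y_T)\) 的联合概率建模为:
\[ P(X, Y) = \pi_{y_1} B_{y_1}(x_1) \prod_{t=2}^{T} A_{y_{t-1}, y_t} B_{y_t}(x_t) \]
解码时用Viterbi算法求 \(\arg\max_Y P(X, Y)\)。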
3.2 HMM实现¶
import numpy as np
from collections import defaultdict
class HMM:
"""隐马尔可夫模型用于序列标注"""
def __init__(self):
self.states = []
self.observations = []
self.start_prob = {} # 初始概率
self.trans_prob = {} # 转移概率
self.emit_prob = {} # 发射概率
def train(self, sequences):
"""
从标注数据中学习HMM参数
sequences: [[(观测1, 状态1), (观测2, 状态2), ...], ...]
"""
state_counts = defaultdict(int) # defaultdict访问不存在的键时返回默认值
start_counts = defaultdict(int)
trans_counts = defaultdict(lambda: defaultdict(int))
emit_counts = defaultdict(lambda: defaultdict(int))
for seq in sequences:
# 初始状态
start_counts[seq[0][1]] += 1
for i, (obs, state) in enumerate(seq):
state_counts[state] += 1
emit_counts[state][obs] += 1
if i > 0:
prev_state = seq[i-1][1]
trans_counts[prev_state][state] += 1
# 计算概率(加平滑)
self.states = list(state_counts.keys())
total_starts = sum(start_counts.values())
for state in self.states:
self.start_prob[state] = (start_counts.get(state, 0) + 1) / (total_starts + len(self.states))
for state in self.states:
total = sum(trans_counts[state].values()) + len(self.states)
self.trans_prob[state] = {}
for next_state in self.states:
self.trans_prob[state][next_state] = (trans_counts[state].get(next_state, 0) + 1) / total
for state in self.states:
total = sum(emit_counts[state].values())
self.emit_prob[state] = defaultdict(lambda: 1e-10)
for obs, count in emit_counts[state].items():
self.emit_prob[state][obs] = count / total
def viterbi(self, observations):
"""Viterbi算法解码"""
T = len(observations)
# 初始化
viterbi_prob = [{} for _ in range(T)]
backpointer = [{} for _ in range(T)]
for state in self.states:
viterbi_prob[0][state] = np.log(self.start_prob.get(state, 1e-10)) + \
np.log(self.emit_prob[state].get(observations[0], 1e-10))
backpointer[0][state] = None
# 递推
for t in range(1, T):
for state in self.states:
max_prob = float('-inf')
max_state = None
for prev_state in self.states:
prob = viterbi_prob[t-1][prev_state] + \
np.log(self.trans_prob[prev_state].get(state, 1e-10)) + \
np.log(self.emit_prob[state].get(observations[t], 1e-10))
if prob > max_prob:
max_prob = prob
max_state = prev_state
viterbi_prob[t][state] = max_prob
backpointer[t][state] = max_state
# 回溯
best_path = [None] * T
best_state = max(viterbi_prob[T-1], key=viterbi_prob[T-1].get)
best_path[T-1] = best_state
for t in range(T-2, -1, -1):
best_path[t] = backpointer[t+1][best_path[t+1]]
return best_path
# 训练HMM进行简单的NER
train_sequences = [
[("小", "B-PER"), ("明", "I-PER"), ("在", "O"), ("北", "B-LOC"), ("京", "I-LOC"), ("工", "O"), ("作", "O")],
[("张", "B-PER"), ("三", "I-PER"), ("去", "O"), ("上", "B-LOC"), ("海", "I-LOC"), ("出", "O"), ("差", "O")],
[("李", "B-PER"), ("四", "I-PER"), ("住", "O"), ("在", "O"), ("广", "B-LOC"), ("州", "I-LOC")],
[("王", "B-PER"), ("五", "I-PER"), ("是", "O"), ("北", "B-LOC"), ("京", "I-LOC"), ("人", "O")],
] * 10
hmm = HMM()
hmm.train(train_sequences)
# 测试
test_obs = list("小明住在上海")
pred_tags = hmm.viterbi(test_obs)
print("HMM NER结果:")
for char, tag in zip(test_obs, pred_tags):
print(f" {char} → {tag}")
entities = bio_to_entities(test_obs, pred_tags)
print(f"\n识别的实体: {entities}")
4. CRF模型¶
4.1 CRF原理¶
条件随机场(CRF)是序列标注的经典模型,它直接建模条件概率 \(P(Y|X)\)。
CRF vs HMM:
| 特性 | HMM | CRF |
|---|---|---|
| 模型类型 | 生成式 | 判别式 |
| 建模目标 | P(X,Y) | P(Y|X) |
| 独立性假设 | 观测独立 | 无需此假设 |
| 特征 | 只能用当前观测 | 可用全局特征 |
| 标注效果 | 通常较低 | 通常较高 |
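线性链CRF将条件概率定义为特征函数的加权指数形式:
\[ P(Y|X) = \frac{1}{Z(X)} \exp\left( \sum_{t=1}^{T} \sum_{k} \lambda_k f_k(y_{t-1}, y_t, X, t) \right) \]
其中 \(Z(X)\) 是对所有可能标签序列求和的归一化因子(配分函数),\(f_k\) 是特征函数,\(\lambda_k\) 是其权重。下一小节给出特征函数的具体示例。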
4.2 CRF特征函数¶
# CRF的特征函数示例
def crf_features(words, prev_tag, current_tag, position):
"""CRF特征函数"""
features = []
word = words[position]
# 转移特征:前一个标签 → 当前标签
features.append(f"trans:{prev_tag}→{current_tag}")
# 发射特征:当前词+当前标签
features.append(f"emit:{word}+{current_tag}")
# 上下文特征
if position > 0:
features.append(f"prev_word:{words[position-1]}+{current_tag}")
if position < len(words) - 1:
features.append(f"next_word:{words[position+1]}+{current_tag}")
# 词形特征
if word.isdigit():
features.append(f"is_digit+{current_tag}")
# 字符类型特征(中文NER常用)
if '\u4e00' <= word <= '\u9fff':
features.append(f"is_chinese+{current_tag}")
return features
# 示例
words = list("小明在北京工作")
tags = ["B-PER", "I-PER", "O", "B-LOC", "I-LOC", "O", "O"]
print("CRF特征示例:")
for i in range(1, len(words)):
feats = crf_features(words, tags[i-1], tags[i], i)
print(f" 位置{i} '{words[i]}': {feats}")
4.3 使用sklearn-crfsuite¶
# pip install sklearn-crfsuite
def word2features(sent, i):
"""为CRF提取特征"""
word = sent[i]
features = {
'word': word,
'word.isdigit': word.isdigit(),
}
if i > 0:
features['prev_word'] = sent[i-1]
features['prev_word+word'] = sent[i-1] + word
else:
features['BOS'] = True
if i < len(sent) - 1:
features['next_word'] = sent[i+1]
features['word+next_word'] = word + sent[i+1]
else:
features['EOS'] = True
if i > 1:
features['prev2_word'] = sent[i-2]
if i < len(sent) - 2:
features['next2_word'] = sent[i+2]
return features
def sent2features(sent):
return [word2features(sent, i) for i in range(len(sent))]
# 准备训练数据
train_sents = [
list("小明在北京大学读书"),
list("张三去了上海交通大学"),
list("李四在广州华南理工工作"),
]
train_labels = [
["B-PER", "E-PER", "O", "B-ORG", "I-ORG", "I-ORG", "E-ORG", "O", "O"],
["B-PER", "E-PER", "O", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG", "E-ORG"],
["B-PER", "E-PER", "O", "B-LOC", "E-LOC", "B-ORG", "I-ORG", "I-ORG", "E-ORG", "O", "O"],
]
X_train = [sent2features(s) for s in train_sents]
y_train = train_labels
try: # try/except捕获异常
import sklearn_crfsuite
crf = sklearn_crfsuite.CRF(
algorithm='lbfgs',
c1=0.1, c2=0.1,
max_iterations=100,
)
crf.fit(X_train, y_train)
# 预测
test_sent = list("王五在清华大学工作")
X_test = [sent2features(test_sent)]
pred = crf.predict(X_test)[0]
print("CRF NER结果:")
for char, tag in zip(test_sent, pred):
print(f" {char} → {tag}")
except ImportError:
print("请安装 sklearn-crfsuite: pip install sklearn-crfsuite")
5. BiLSTM-CRF¶
5.1 架构¶
BiLSTM-CRF 架构:
输入: 小 明 在 北 京 大 学 读 书
│ │ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ Embedding Layer │
└─────────────────────────────────────────────┘
│ │ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ BiLSTM Layer │
│ → LSTM →→→→→→→→→→→→→→→→→→→→→→→→→→ │
│ ← LSTM ←←←←←←←←←←←←←←←←←←←←←←←←← │
└─────────────────────────────────────────────┘
│ │ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ Emission Scores Layer (Linear) │
└─────────────────────────────────────────────┘
│ │ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ CRF Layer │
│ 学习标签之间的转移约束 │
│ 如 B-PER → I-PER ✓, B-PER → I-LOC ✗ │
└─────────────────────────────────────────────┘
│ │ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
输出: B-PER E-PER O B-ORG I-ORG I-ORG E-ORG O O
5.2 CRF层的作用¶
为什么需要CRF层?
没有CRF(只用BiLSTM + Softmax):
- 每个位置独立预测标签
- 可能产生非法序列,如 "O I-PER"(I-标签不能直接跟在O后面,也不能出现在句首)
- 可能产生 "B-PER I-LOC"(类型不一致)
有CRF:
- 学习标签转移矩阵
- 保证输出的标签序列满足约束
- 使用Viterbi算法找到全局最优序列
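除了让模型从数据中学习转移矩阵,也可以显式地禁止非法转移:把不允许的转移在转移矩阵上置为一个很大的负数。下面是一个构建BIO约束掩码的简单示意(函数名为自拟):
def build_transition_mask(tag_list):
    """allowed[i][j] 表示标签i后面是否允许紧跟标签j(BIO约束)"""
    n = len(tag_list)
    allowed = [[True] * n for _ in range(n)]
    for i, prev in enumerate(tag_list):
        for j, curr in enumerate(tag_list):
            # I-X 只能跟在 B-X 或 I-X 后面
            if curr.startswith('I-') and prev not in ('B-' + curr[2:], 'I-' + curr[2:]):
                allowed[i][j] = False
    return allowed
tag_list = ["O", "B-PER", "I-PER", "B-LOC", "I-LOC"]
mask = build_transition_mask(tag_list)
print("B-PER → I-PER:", mask[1][2])  # True,合法
print("B-PER → I-LOC:", mask[1][4])  # False,非法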
5.3 PyTorch实现BiLSTM-CRF¶
import torch
import torch.nn as nn
class CRFLayer(nn.Module): # 继承nn.Module定义网络层
"""CRF层"""
def __init__(self, num_tags):
super().__init__() # super()调用父类方法
self.num_tags = num_tags
# 转移矩阵 transitions[i][j] = 从标签i转移到标签j的分数
self.transitions = nn.Parameter(torch.randn(num_tags, num_tags))
# 起始和结束约束
self.start_transitions = nn.Parameter(torch.randn(num_tags))
self.end_transitions = nn.Parameter(torch.randn(num_tags))
def forward_algorithm(self, emissions):
"""前向算法,计算log Z(x)"""
# emissions: (seq_len, num_tags)
seq_len = emissions.size(0)
# 初始化
score = self.start_transitions + emissions[0]
for i in range(1, seq_len):
# score: (num_tags,) → (num_tags, 1)
# transitions: (num_tags, num_tags),transitions[i][j] = 从tag i到tag j
# emissions[i]: (num_tags,) → (1, num_tags)
broadcast_score = score.unsqueeze(1) # unsqueeze增加一个维度
broadcast_emissions = emissions[i].unsqueeze(0)
# next_score[j] = logsumexp over i of (score[i] + trans[i,j] + emit[j])
next_score = broadcast_score + self.transitions + broadcast_emissions
score = torch.logsumexp(next_score, dim=0)
score = score + self.end_transitions
return torch.logsumexp(score, dim=0)
def score_sentence(self, emissions, tags):
"""计算给定标签序列的得分"""
seq_len = emissions.size(0)
score = self.start_transitions[tags[0]] + emissions[0, tags[0]]
for i in range(1, seq_len):
score += self.transitions[tags[i-1], tags[i]]
score += emissions[i, tags[i]]
score += self.end_transitions[tags[-1]] # [-1]负索引取最后元素
return score
def viterbi_decode(self, emissions):
"""
Viterbi解码 - 使用动态规划找到最优标签序列
算法步骤:
1. 初始化:第一个位置的分数 = 起始转移分数 + 发射分数
2. 递推:对每个位置,计算从所有标签转移过来的最大分数
3. 终止:找到最后一个位置的最高分标签
4. 回溯:从后向前追溯最优路径
时间复杂度:O(T × K²),T=序列长度,K=标签数
"""
seq_len = emissions.size(0)
# 初始化:第一个时间步的分数
score = self.start_transitions + emissions[0]
history = [] # 记录每个位置的最优前驱标签
# 递推:计算每个时间步的最大分数
for i in range(1, seq_len):
broadcast_score = score.unsqueeze(1)
broadcast_emissions = emissions[i].unsqueeze(0)
next_score = broadcast_score + self.transitions + broadcast_emissions
# 对每个当前标签,找到最优的前驱标签
next_score, indices = next_score.max(dim=0)
score = next_score
history.append(indices)
# 终止:加上结束转移分数
score += self.end_transitions
_, best_last_tag = score.max(dim=0)
# 回溯:从最后一个位置向前追溯最优路径
best_path = [best_last_tag.item()] # 将单元素张量转为Python数值
for hist in reversed(history):
best_last_tag = hist[best_last_tag]
best_path.insert(0, best_last_tag.item())
return best_path
def neg_log_likelihood(self, emissions, tags):
"""
计算负对数似然损失(CRF训练目标)
Loss = log Z(x) - score(x, y)
其中:
- Z(x) 是归一化因子(所有可能标签序列的分数之和)
- score(x, y) 是真实标签序列的分数
"""
# 前向算法计算归一化因子
log_partition = self.forward_algorithm(emissions)
# 计算真实序列的分数
score = self.score_sentence(emissions, tags)
# 返回负对数似然
return log_partition - score
class BiLSTMCRF(nn.Module):
"""BiLSTM-CRF序列标注模型"""
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags,
dropout=0.5):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
self.lstm = nn.LSTM(
embedding_dim, hidden_dim // 2,
num_layers=1, bidirectional=True, batch_first=True,
)
self.dropout = nn.Dropout(dropout)
self.hidden2tag = nn.Linear(hidden_dim, num_tags)
self.crf = CRFLayer(num_tags)
self.num_tags = num_tags
def _get_emissions(self, x):
"""获取发射分数"""
embedded = self.embedding(x)
embedded = self.dropout(embedded)
lstm_out, _ = self.lstm(embedded)
lstm_out = self.dropout(lstm_out)
emissions = self.hidden2tag(lstm_out)
return emissions
def forward(self, x, tags):
"""计算负对数似然(训练用)"""
# x: (batch_size, seq_len), tags: (batch_size, seq_len)
emissions = self._get_emissions(x)
        # 简化实现:逐样本循环计算损失,未处理padding mask(本章示例为逐句训练)
        loss = 0
for i in range(x.size(0)):
emit = emissions[i] # (seq_len, num_tags)
tag = tags[i] # (seq_len,)
forward_score = self.crf.forward_algorithm(emit)
gold_score = self.crf.score_sentence(emit, tag)
loss += forward_score - gold_score
return loss / x.size(0)
def predict(self, x):
"""Viterbi解码(预测用)"""
emissions = self._get_emissions(x)
predictions = []
for i in range(x.size(0)):
path = self.crf.viterbi_decode(emissions[i])
predictions.append(path)
return predictions
# ==================
# 训练BiLSTM-CRF
# ==================
# 标签定义
tag2idx = {"O": 0, "B-PER": 1, "I-PER": 2, "B-LOC": 3, "I-LOC": 4,
"B-ORG": 5, "I-ORG": 6}
idx2tag = {i: t for t, i in tag2idx.items()}
# 词汇表
chars = set()
train_data_ner = [
(list("小明在北京工作"), ["B-PER", "I-PER", "O", "B-LOC", "I-LOC", "O", "O"]),
(list("张三去了上海"), ["B-PER", "I-PER", "O", "O", "B-LOC", "I-LOC"]),
(list("李四在清华大学"), ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG"]),
] * 20
for sent, _ in train_data_ner:
chars.update(sent)
char2idx = {"<PAD>": 0, "<UNK>": 1}
for c in sorted(chars):
char2idx[c] = len(char2idx)
vocab_size = len(char2idx)
# 创建模型
model = BiLSTMCRF(
vocab_size=vocab_size,
embedding_dim=64,
hidden_dim=128,
num_tags=len(tag2idx),
dropout=0.3,
)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# 训练
model.train() # train()训练模式
for epoch in range(30):
total_loss = 0
for sent, tags in train_data_ner:
x = torch.LongTensor([[char2idx.get(c, 1) for c in sent]])
y = torch.LongTensor([[tag2idx[t] for t in tags]])
loss = model(x, y)
total_loss += loss.item()
optimizer.zero_grad() # 清零梯度
loss.backward() # 反向传播计算梯度
nn.utils.clip_grad_norm_(model.parameters(), 5.0)
optimizer.step() # 更新参数
if (epoch + 1) % 10 == 0:
avg_loss = total_loss / len(train_data_ner)
print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")
# 预测
model.eval()
test_sent = list("小明在北京工作")
x = torch.LongTensor([[char2idx.get(c, 1) for c in test_sent]])
with torch.no_grad(): # 禁用梯度计算,节省内存
pred = model.predict(x)[0]
pred_tags = [idx2tag[i] for i in pred]
print("\nBiLSTM-CRF预测结果:")
for char, tag in zip(test_sent, pred_tags):
print(f" {char} → {tag}")
entities = bio_to_entities(test_sent, pred_tags)
print(f"\n识别的实体: {entities}")
6. BERT-NER¶
6.1 BERT用于NER¶
# BERT-NER通过对每个token的输出做分类来实现序列标注
"""
BERT-NER架构:
输入: [CLS] 小 明 在 北 京 大 学 [SEP]
BERT: h₀ h₁ h₂ h₃ h₄ h₅ h₆ h₇ h₈
Linear: ↓ ↓ ↓ ↓ ↓ ↓ ↓
输出: B-P I-P O B-O I-O I-O I-O
"""
# 使用Hugging Face实现
from transformers import BertTokenizerFast, BertForTokenClassification
import torch
# 定义标签
label_list = ["O", "B-PER", "I-PER", "B-LOC", "I-LOC", "B-ORG", "I-ORG"]
label2id = {l: i for i, l in enumerate(label_list)}
id2label = {i: l for l, i in label2id.items()}
# 加载模型
model_name = "bert-base-chinese"
tokenizer = BertTokenizerFast.from_pretrained(model_name)
model = BertForTokenClassification.from_pretrained(
model_name,
num_labels=len(label_list),
id2label=id2label,
label2id=label2id,
)
# 分词并对齐标签
def tokenize_and_align_labels(text, labels):
"""分词并对齐标签"""
tokenized = tokenizer(
list(text),
is_split_into_words=True,
return_tensors='pt',
padding=True,
truncation=True,
max_length=128,
)
# 对齐标签
word_ids = tokenized.word_ids()
aligned_labels = []
for word_id in word_ids:
if word_id is None:
aligned_labels.append(-100) # 忽略[CLS]和[SEP]
else:
aligned_labels.append(label2id[labels[word_id]])
return tokenized, torch.tensor([aligned_labels])
# 训练示例
text = "小明在北京大学读书"
labels = ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG", "O", "O"]
tokenized, aligned = tokenize_and_align_labels(text, labels)
print(f"Tokens: {tokenizer.convert_ids_to_tokens(tokenized['input_ids'][0])}")
print(f"Labels: {aligned}")
# 预测(注意:分类头是随机初始化的,未经微调时预测结果不可靠,此处仅演示推理流程)
model.eval()
with torch.no_grad():
outputs = model(**tokenized)
predictions = outputs.logits.argmax(dim=-1)[0]
print("\nBERT-NER预测:")
tokens = tokenizer.convert_ids_to_tokens(tokenized['input_ids'][0])
for token, pred_id in zip(tokens, predictions):
if token not in ['[CLS]', '[SEP]', '[PAD]']:
pred_label = id2label[pred_id.item()]
print(f" {token} → {pred_label}")
7. spaCy实战¶
# pip install spacy
# python -m spacy download zh_core_web_sm
try:
import spacy
nlp = spacy.load("zh_core_web_sm")
text = "小明在北京大学学习计算机科学,他的导师是张教授"
doc = nlp(text)
print("SpaCy NER结果:")
for ent in doc.ents:
print(f" [{ent.label_}] {ent.text} (位置: {ent.start_char}-{ent.end_char})")
print("\n词性标注:")
for token in doc:
print(f" {token.text}/{token.pos_}", end=" ")
print()
except OSError:
print("请安装中文模型: python -m spacy download zh_core_web_sm")
except ImportError:
print("请安装spacy: pip install spacy")
8. 完整NER项目¶
"""
完整NER项目:中文命名实体识别
"""
class NERSystem:
"""中文NER系统"""
def __init__(self, model_type="rule"):
self.model_type = model_type
self.patterns = self._build_patterns()
def _build_patterns(self):
"""构建规则模式"""
import re
return {
"DATE": re.compile(r'\d{4}[年/-]\d{1,2}[月/-]\d{1,2}[日号]?'),
"TIME": re.compile(r'\d{1,2}[点时:]\d{0,2}[分]?\d{0,2}[秒]?'),
"MONEY": re.compile(r'\d+(?:\.\d+)?[万亿]?[元美]?[元金币]'),
"PERCENT": re.compile(r'\d+(?:\.\d+)?%'),
"PHONE": re.compile(r'1[3-9]\d{9}'),
"EMAIL": re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+'),
}
def rule_based_ner(self, text):
"""基于规则的NER"""
entities = []
for etype, pattern in self.patterns.items():
for match in pattern.finditer(text):
entities.append({
"text": match.group(),
"type": etype,
"start": match.start(),
"end": match.end(),
})
return sorted(entities, key=lambda x: x["start"]) # lambda匿名函数
def predict(self, text):
"""综合预测"""
return self.rule_based_ner(text)
def evaluate(self, predictions, gold_entities):
"""评估NER结果"""
pred_set = set((e["text"], e["type"]) for e in predictions)
gold_set = set((e["text"], e["type"]) for e in gold_entities)
tp = len(pred_set & gold_set)
precision = tp / len(pred_set) if pred_set else 0
recall = tp / len(gold_set) if gold_set else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
return {"precision": precision, "recall": recall, "f1": f1}
# 测试
ner = NERSystem()
text = "2024年3月15日,张三在北京大学参加了一场价值50万元的项目研讨会,联系电话13812345678"
entities = ner.predict(text)
print("NER识别结果:")
for e in entities:
print(f" [{e['type']}] {e['text']} (位置: {e['start']}-{e['end']})")
9. 面试要点¶
🔑 面试高频考点
考点1:为什么BiLSTM-CRF中需要CRF层?¶
✅ 标准答案要点:
1. BiLSTM独立预测每个位置的标签,可能产生非法序列
2. CRF层学习标签之间的转移约束(转移矩阵)
3. 例如:B-PER后面只能接I-PER或O,不能接I-LOC
4. CRF使用Viterbi算法找到全局最优标签序列
5. 在多数任务上,CRF层通常能带来约1~2个F1点的提升(幅度视数据集和模型而定)
考点2:CRF的训练和推理过程?¶
✅ 标准答案要点:
训练:
- 损失函数 = log Z(x) - Score(x, y*)
- 前向算法计算log Z(x)(配分函数)
- 直接计算正确路径得分Score(x, y*)
- 梯度下降优化
推理:
- 使用Viterbi算法找最优标签序列
- 时间复杂度O(T × K²),T=序列长度,K=标签数
- 动态规划思想,从左到右递推
考点3:NER的评估指标?¶
✅ 标准答案要点:
- 实体级别的P/R/F1(不是token级别!)
- 实体的边界和类型都要完全匹配才算正确
- 常用Micro和Macro F1
- 严格匹配 vs 部分匹配(宽松评估)
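实体级别的指标可以用seqeval库直接计算(pip install seqeval),它按"边界+类型完全匹配"统计P/R/F1,下面是一个小例子:
from seqeval.metrics import classification_report, f1_score
y_true = [["B-PER", "I-PER", "O", "B-LOC", "I-LOC", "O"]]
y_pred = [["B-PER", "I-PER", "O", "B-LOC", "O", "O"]]  # LOC边界预测错,整个实体记为错误
print(f"实体级F1: {f1_score(y_true, y_pred):.4f}")  # 0.5000
print(classification_report(y_true, y_pred))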
10. 练习题¶
📝 基础题¶
- 解释BIO和BIOES标注体系的区别,各有什么优缺点。
答案:BIO使用3类标签:B(实体开始)、I(实体内部)、O(非实体)。BIOES增加了E(实体结尾)和S(单字实体)共5类标签。BIO优点是标签集小、标注简单;缺点是实体边界不够清晰(两个相邻同类实体难以区分)。BIOES优点是边界信息更丰富,模型能更精确地识别实体起止位置,效果通常更好;缺点是标签集更大,标注成本更高,数据稀疏问题更明显。实践中BIOES在精度上略优于BIO。
- 手动进行Viterbi解码过程(给定发射分数和转移矩阵)。
答案:Viterbi是动态规划算法:①初始化:第一个时间步的分数=初始概率+发射分数;②递推:对每个时间步t的每个状态j,计算所有前驱状态i的"前一步最优分数+转移分数i→j+发射分数j",取最大值并记录回溯指针;③终止:在最后一步选分数最高的状态;④回溯:沿指针从后往前得到最优标签序列。时间复杂度从穷举的 \(O(N^T)\) 降为 \(O(T \cdot N^2)\)(T为序列长度,N为标签数)。
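配合一个可以手算验证的小例子(2个标签、3个时间步,分数均为假设的对数分数):
import numpy as np
emissions = np.array([[2.0, 0.5],   # t=0 各标签的发射分数
                      [0.5, 2.0],   # t=1
                      [2.0, 0.5]])  # t=2
transitions = np.array([[1.0, 0.2],  # transitions[i][j]: 标签i→标签j
                        [0.2, 1.0]])
score = emissions[0].copy()  # 初始化
history = []
for t in range(1, len(emissions)):
    total = score[:, None] + transitions + emissions[t][None, :]
    history.append(total.argmax(axis=0))  # 记录每个当前标签的最优前驱
    score = total.max(axis=0)
best = [int(score.argmax())]
for h in reversed(history):  # 回溯
    best.insert(0, int(h[best[0]]))
print("最优路径:", best)  # [0, 0, 0]
# 注意:逐位置贪心会得到[0, 1, 0](总分6.4),而全局最优是[0, 0, 0](总分6.5)
# ——这正是需要Viterbi/CRF做全局解码的原因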
💻 编程题¶
- 实现一个完整的BiLSTM-CRF模型,在中文NER数据集上训练。
- 使用Hugging Face的BERT进行NER任务。
- 实现NER的评估函数,支持实体级别的P/R/F1计算。
🔬 思考题¶
- 嵌套实体(如"北京大学计算机系"中"北京大学"和"计算机系"嵌套)如何处理?
答案:传统BIO序列标注只能标注扁平实体。处理嵌套实体的方法:①多层序列标注:每层标注一种实体类型,允许同一token在不同层有不同标签;②Span-based方法:枚举所有可能文本片段,对每个span分类是否为实体,天然支持嵌套;③MRC框架:将NER转化为阅读理解问题(如"找出组织名"),预测答案span;④序列到集合:用Seq2Seq直接生成所有实体;⑤层叠CRF:先识别外层再识别内层。目前Span-based和MRC方法效果最好。
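下面用几行代码示意其中Span-based方法的第一步——枚举候选片段(分类器部分省略;max_len等参数为自拟):
def enumerate_spans(tokens, max_len=8):
    """枚举所有长度不超过max_len的候选片段"""
    spans = []
    for start in range(len(tokens)):
        for end in range(start + 1, min(start + max_len, len(tokens)) + 1):
            spans.append((start, end, ''.join(tokens[start:end])))
    return spans
tokens = list("北京大学计算机系")
spans = enumerate_spans(tokens)
print(f"共{len(spans)}个候选片段")  # 36个
# 分类器可以同时把"北京大学"(0,4)和"计算机系"(4,8)判为实体,天然支持嵌套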
✅ 自我检查清单¶
□ 我理解BIO/BIOES标注体系
□ 我能解释HMM的三个核心要素
□ 我理解CRF和HMM的区别
□ 我能手写BiLSTM-CRF的PyTorch代码
□ 我知道CRF层为什么重要
□ 我了解BERT-NER的实现方式
□ 我完成了至少3道练习题
📚 延伸阅读¶
下一篇:06-文本生成 — 从语言模型到GPT的文本生成技术