
📖 Chapter 5: Sequence Labeling

Sequence Labeling

Study time: 8 hours · Difficulty: ⭐⭐⭐⭐ · Prerequisites: text classification, the CRF concept, BiLSTM · Goals: understand NER and POS tagging; implement BiLSTM-CRF and BERT-NER




1. Sequence Labeling Overview

1.1 What Is Sequence Labeling

Sequence labeling is the task of assigning one label to every element of an input sequence.

Text Only
Input:  小    明    在    北    京    大    学    学    习
Labels: B-PER E-PER O     B-ORG I-ORG I-ORG E-ORG O     O

Input:  I     love  New   York  City
Labels: O     O     B-LOC I-LOC E-LOC

1.2 Core Tasks

Python
sequence_labeling_tasks = {
    "Named Entity Recognition (NER)": {
        "goal": "identify entities in text and their types",
        "input": "乔布斯在加利福尼亚创立了苹果公司",
        "output": "[乔布斯/PER] 在 [加利福尼亚/LOC] 创立了 [苹果公司/ORG]",
        "entity_types": ["person PER", "location LOC", "organization ORG", "time TIME"],
    },
    "Part-of-Speech Tagging (POS)": {
        "goal": "assign a part of speech to every word",
        "input": "小明在学校学习",
        "output": "小明/NR 在/P 学校/NN 学习/VV",
        "tag_set": ["noun NN", "verb VV", "adjective JJ", "adverb RB"],
    },
    "Chinese Word Segmentation": {
        "goal": "split continuous text into words",
        "input": "自然语言处理",
        "output": "B I I I I E (BIES tags)",
    },
}

2. Tagging Schemes

2.1 BIO Tagging

Text Only
B-X: beginning of entity X (Begin)
I-X: inside entity X (Inside)
O:   not part of any entity (Outside)

Example:
小  明  在  北  京  大  学  读  书
B-PER I-PER O B-ORG I-ORG I-ORG I-ORG O O

2.2 BIOES Tagging (more common)

Text Only
B-X: beginning of entity X (Begin)
I-X: inside entity X (Inside)
O:   not part of any entity (Outside)
E-X: end of entity X (End)
S-X: single-token entity X (Single)

Example:
小  明  在  北  京  大  学  读  书
B-PER E-PER O B-ORG I-ORG I-ORG E-ORG O O
Python
def bio_to_entities(tokens, tags):
    """Extract entities from a BIO tag sequence."""
    entities = []
    current_entity = None
    current_type = None
    start = None

    for i, (token, tag) in enumerate(zip(tokens, tags)):  # enumerate yields index and element; zip pairs by position
        if tag.startswith('B-'):
            # flush the previous entity, if any
            if current_entity is not None:
                entities.append({
                    'entity': ''.join(current_entity),
                    'type': current_type,
                    'start': start,
                    'end': i
                })
            current_entity = [token]
            current_type = tag[2:]
            start = i
        elif tag.startswith('I-') and current_entity is not None:
            current_entity.append(token)
        else:
            if current_entity is not None:
                entities.append({
                    'entity': ''.join(current_entity),
                    'type': current_type,
                    'start': start,
                    'end': i
                })
                current_entity = None

    if current_entity is not None:
        entities.append({
            'entity': ''.join(current_entity),
            'type': current_type,
            'start': start,
            'end': len(tokens)
        })

    return entities

# Test
tokens = list("小明在北京大学读书")
tags = ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG", "O", "O"]

entities = bio_to_entities(tokens, tags)
print("Extracted entities:")
for e in entities:
    print(f"  [{e['type']}] {e['entity']} (span: {e['start']}-{e['end']})")

3. The HMM

3.1 How HMMs Work

A Hidden Markov Model (HMM) treats sequence labeling as inferring a hidden state sequence.

Three core components:

- Initial probabilities \(\pi\): the distribution over the first state
- Transition probabilities \(A\): the probability of moving from one state to the next
- Emission probabilities \(B\): the probability that a state generates a given observation

\[P(Y, X) = \pi(y_1) \prod_{t=2}^{T} A(y_t|y_{t-1}) \prod_{t=1}^{T} B(x_t|y_t)\]
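The factorization above can be checked numerically on a toy model. All the probabilities below are invented for illustration only:

```python
# Toy HMM with two states, illustrating
# P(Y, X) = pi(y1) * prod A(y_t | y_{t-1}) * prod B(x_t | y_t).
pi = {"O": 0.8, "B-PER": 0.2}
A = {("O", "O"): 0.7, ("O", "B-PER"): 0.3,
     ("B-PER", "O"): 0.6, ("B-PER", "B-PER"): 0.4}
B = {("O", "在"): 0.5, ("O", "小"): 0.1,
     ("B-PER", "小"): 0.6, ("B-PER", "在"): 0.05}

def joint_prob(states, obs):
    """P(Y, X) for one state/observation sequence."""
    p = pi[states[0]] * B[(states[0], obs[0])]
    for t in range(1, len(states)):
        p *= A[(states[t-1], states[t])] * B[(states[t], obs[t])]
    return p

# pi(B-PER) * B(小|B-PER) * A(O|B-PER) * B(在|O) = 0.2 * 0.6 * 0.6 * 0.5 = 0.036
p = joint_prob(["B-PER", "O"], ["小", "在"])
print(p)
```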

3.2 HMM Implementation

Python
import numpy as np
from collections import defaultdict

class HMM:
    """A Hidden Markov Model for sequence labeling."""

    def __init__(self):
        self.states = []
        self.observations = []
        self.start_prob = {}      # initial probabilities
        self.trans_prob = {}      # transition probabilities
        self.emit_prob = {}       # emission probabilities

    def train(self, sequences):
        """
        Estimate HMM parameters from labeled data.
        sequences: [[(obs1, state1), (obs2, state2), ...], ...]
        """
        state_counts = defaultdict(int)  # defaultdict returns a default value for missing keys
        start_counts = defaultdict(int)
        trans_counts = defaultdict(lambda: defaultdict(int))
        emit_counts = defaultdict(lambda: defaultdict(int))

        for seq in sequences:
            # initial state
            start_counts[seq[0][1]] += 1

            for i, (obs, state) in enumerate(seq):
                state_counts[state] += 1
                emit_counts[state][obs] += 1

                if i > 0:
                    prev_state = seq[i-1][1]
                    trans_counts[prev_state][state] += 1

        # turn counts into probabilities (with add-one smoothing)
        self.states = list(state_counts.keys())
        total_starts = sum(start_counts.values())

        for state in self.states:
            self.start_prob[state] = (start_counts.get(state, 0) + 1) / (total_starts + len(self.states))

        for state in self.states:
            total = sum(trans_counts[state].values()) + len(self.states)
            self.trans_prob[state] = {}
            for next_state in self.states:
                self.trans_prob[state][next_state] = (trans_counts[state].get(next_state, 0) + 1) / total

        for state in self.states:
            total = sum(emit_counts[state].values())
            self.emit_prob[state] = defaultdict(lambda: 1e-10)
            for obs, count in emit_counts[state].items():
                self.emit_prob[state][obs] = count / total

    def viterbi(self, observations):
        """Decode with the Viterbi algorithm."""
        T = len(observations)

        # initialization
        viterbi_prob = [{} for _ in range(T)]
        backpointer = [{} for _ in range(T)]

        for state in self.states:
            viterbi_prob[0][state] = np.log(self.start_prob.get(state, 1e-10)) + \
                                      np.log(self.emit_prob[state].get(observations[0], 1e-10))
            backpointer[0][state] = None

        # recursion
        for t in range(1, T):
            for state in self.states:
                max_prob = float('-inf')
                max_state = None

                for prev_state in self.states:
                    prob = viterbi_prob[t-1][prev_state] + \
                           np.log(self.trans_prob[prev_state].get(state, 1e-10)) + \
                           np.log(self.emit_prob[state].get(observations[t], 1e-10))

                    if prob > max_prob:
                        max_prob = prob
                        max_state = prev_state

                viterbi_prob[t][state] = max_prob
                backpointer[t][state] = max_state

        # backtrace
        best_path = [None] * T
        best_state = max(viterbi_prob[T-1], key=viterbi_prob[T-1].get)
        best_path[T-1] = best_state

        for t in range(T-2, -1, -1):
            best_path[t] = backpointer[t+1][best_path[t+1]]

        return best_path

# Train an HMM on a toy NER task
train_sequences = [
    [("小", "B-PER"), ("明", "I-PER"), ("在", "O"), ("北", "B-LOC"), ("京", "I-LOC"), ("工", "O"), ("作", "O")],
    [("张", "B-PER"), ("三", "I-PER"), ("去", "O"), ("上", "B-LOC"), ("海", "I-LOC"), ("出", "O"), ("差", "O")],
    [("李", "B-PER"), ("四", "I-PER"), ("住", "O"), ("在", "O"), ("广", "B-LOC"), ("州", "I-LOC")],
    [("王", "B-PER"), ("五", "I-PER"), ("是", "O"), ("北", "B-LOC"), ("京", "I-LOC"), ("人", "O")],
] * 10

hmm = HMM()
hmm.train(train_sequences)

# Test
test_obs = list("小明住在上海")
pred_tags = hmm.viterbi(test_obs)
print("HMM NER result:")
for char, tag in zip(test_obs, pred_tags):
    print(f"  {char}{tag}")

entities = bio_to_entities(test_obs, pred_tags)
print(f"\nRecognized entities: {entities}")

4. The CRF

4.1 How CRFs Work

The Conditional Random Field (CRF) is the classic sequence labeling model. It models the conditional probability \(P(Y|X)\) directly:

\[P(Y|X) = \frac{1}{Z(X)} \exp\left(\sum_{t=1}^{T} \sum_{k} \lambda_k f_k(y_{t-1}, y_t, X, t)\right)\]

CRF vs. HMM

| Aspect                  | HMM                      | CRF                       |
| ----------------------- | ------------------------ | ------------------------- |
| Model type              | generative               | discriminative            |
| Models                  | P(X, Y)                  | P(Y|X)                    |
| Independence assumption | observations independent | none required             |
| Features                | current observation only | arbitrary global features |
| Accuracy                | lower                    | higher                    |
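For toy sizes, the partition function Z(X) in the formula above can be enumerated directly, which makes the definition concrete. The score tables below (per-position emission and transition scores) are invented for illustration:

```python
from itertools import product
import math

# 2 tags, 3 positions; scores play the role of sum_k lambda_k f_k(...)
tags = ["O", "B"]
emit = {0: {"O": 1.0, "B": 0.5}, 1: {"O": 0.2, "B": 1.5}, 2: {"O": 1.0, "B": 0.1}}
trans = {("O", "O"): 0.4, ("O", "B"): 0.1, ("B", "O"): 0.3, ("B", "B"): 0.2}

def path_score(path):
    """Total (unnormalized) score of one label sequence."""
    s = emit[0][path[0]]
    for t in range(1, len(path)):
        s += trans[(path[t-1], path[t])] + emit[t][path[t]]
    return s

# Z(X) sums exp(score) over all K^T = 8 label sequences; P(Y|X) = exp(score) / Z
Z = sum(math.exp(path_score(p)) for p in product(tags, repeat=3))
best = max(product(tags, repeat=3), key=path_score)
print(best, math.exp(path_score(best)) / Z)
```

Enumeration is exponential in the sequence length; the forward algorithm later in this chapter computes the same Z(X) in O(T × K²).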

4.2 CRF Feature Functions

Python
# Example CRF feature functions
def crf_features(words, prev_tag, current_tag, position):
    """Generate CRF features for one position."""
    features = []
    word = words[position]

    # transition feature: previous tag -> current tag
    features.append(f"trans:{prev_tag}->{current_tag}")

    # emission feature: current word + current tag
    features.append(f"emit:{word}+{current_tag}")

    # context features
    if position > 0:
        features.append(f"prev_word:{words[position-1]}+{current_tag}")
    if position < len(words) - 1:
        features.append(f"next_word:{words[position+1]}+{current_tag}")

    # word-shape feature
    if word.isdigit():
        features.append(f"is_digit+{current_tag}")

    # character-type feature (common in Chinese NER)
    if '\u4e00' <= word <= '\u9fff':
        features.append(f"is_chinese+{current_tag}")

    return features

# Example
words = list("小明在北京工作")
tags = ["B-PER", "I-PER", "O", "B-LOC", "I-LOC", "O", "O"]

print("Sample CRF features:")
for i in range(1, len(words)):
    feats = crf_features(words, tags[i-1], tags[i], i)
    print(f"  position {i} '{words[i]}': {feats}")

4.3 Using sklearn-crfsuite

Python
# pip install sklearn-crfsuite

def word2features(sent, i):
    """Extract features for sklearn-crfsuite."""
    word = sent[i]
    features = {
        'word': word,
        'word.isdigit': word.isdigit(),
    }

    if i > 0:
        features['prev_word'] = sent[i-1]
        features['prev_word+word'] = sent[i-1] + word
    else:
        features['BOS'] = True

    if i < len(sent) - 1:
        features['next_word'] = sent[i+1]
        features['word+next_word'] = word + sent[i+1]
    else:
        features['EOS'] = True

    if i > 1:
        features['prev2_word'] = sent[i-2]

    if i < len(sent) - 2:
        features['next2_word'] = sent[i+2]

    return features

def sent2features(sent):
    return [word2features(sent, i) for i in range(len(sent))]

# Training data
train_sents = [
    list("小明在北京大学读书"),
    list("张三去了上海交通大学"),
    list("李四在广州华南理工工作"),
]
train_labels = [
    ["B-PER", "E-PER", "O", "B-ORG", "I-ORG", "I-ORG", "E-ORG", "O", "O"],
    ["B-PER", "E-PER", "O", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG", "I-ORG", "E-ORG"],
    ["B-PER", "E-PER", "O", "B-LOC", "E-LOC", "B-ORG", "I-ORG", "I-ORG", "E-ORG", "O", "O"],
]

X_train = [sent2features(s) for s in train_sents]
y_train = train_labels

try:  # try/except catches the missing-dependency case
    import sklearn_crfsuite

    crf = sklearn_crfsuite.CRF(
        algorithm='lbfgs',
        c1=0.1, c2=0.1,
        max_iterations=100,
    )
    crf.fit(X_train, y_train)

    # Prediction
    test_sent = list("王五在清华大学工作")
    X_test = [sent2features(test_sent)]
    pred = crf.predict(X_test)[0]

    print("CRF NER result:")
    for char, tag in zip(test_sent, pred):
        print(f"  {char}{tag}")
except ImportError:
    print("Please install sklearn-crfsuite: pip install sklearn-crfsuite")

5. BiLSTM-CRF

5.1 Architecture

Text Only
BiLSTM-CRF architecture:

Input:  小  明  在  北  京  大  学  读  书
        │   │   │   │   │   │   │   │   │
        ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼
┌─────────────────────────────────────────────┐
│              Embedding Layer                │
└─────────────────────────────────────────────┘
        │   │   │   │   │   │   │   │   │
        ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼
┌─────────────────────────────────────────────┐
│              BiLSTM Layer                   │
│   → LSTM →→→→→→→→→→→→→→→→→→→→→→→→→→      │
│   ← LSTM ←←←←←←←←←←←←←←←←←←←←←←←←←      │
└─────────────────────────────────────────────┘
        │   │   │   │   │   │   │   │   │
        ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼
┌─────────────────────────────────────────────┐
│         Emission Scores Layer (Linear)      │
└─────────────────────────────────────────────┘
        │   │   │   │   │   │   │   │   │
        ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼
┌─────────────────────────────────────────────┐
│              CRF Layer                      │
│   learns transition constraints between     │
│   labels, e.g. B-PER → I-PER ✓,             │
│        B-PER → I-LOC ✗                      │
└─────────────────────────────────────────────┘
        │   │   │   │   │   │   │   │   │
        ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼   ▼
Output: B-PER E-PER O B-ORG I-ORG I-ORG E-ORG O  O

5.2 What the CRF Layer Adds

Text Only
Why add a CRF layer?

Without CRF (BiLSTM + softmax only):
- each position's label is predicted independently
- the output can be an illegal sequence, e.g. "I-PER O I-PER" (an I tag cannot directly follow O)
- types can be inconsistent, e.g. "B-PER I-LOC"

With CRF:
- a label transition matrix is learned
- the output sequence is guaranteed to respect the constraints
- Viterbi decoding finds the globally optimal sequence
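The constraints the CRF learns can also be written down by hand. A minimal sketch of a BIO validity check (a hypothetical helper, not part of any library):

```python
def is_valid_transition(prev_tag, tag):
    """Check whether tag may follow prev_tag under the BIO scheme.
    I-X is legal only after B-X or I-X of the same entity type."""
    if tag.startswith("I-"):
        return prev_tag in (f"B-{tag[2:]}", f"I-{tag[2:]}")
    return True  # O and B-X may follow any tag

print(is_valid_transition("B-PER", "I-PER"))  # True
print(is_valid_transition("O", "I-PER"))      # False
print(is_valid_transition("B-PER", "I-LOC"))  # False
```

Some implementations bake these rules into the transition matrix as -inf entries instead of learning them from data.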

5.3 BiLSTM-CRF in PyTorch

Python
import torch
import torch.nn as nn

class CRFLayer(nn.Module):  # subclass nn.Module to define a layer
    """CRF layer."""

    def __init__(self, num_tags):
        super().__init__()  # super() calls the parent class
        self.num_tags = num_tags
        # transition matrix: transitions[i][j] = score of moving from tag i to tag j
        self.transitions = nn.Parameter(torch.randn(num_tags, num_tags))
        # start and end constraints
        self.start_transitions = nn.Parameter(torch.randn(num_tags))
        self.end_transitions = nn.Parameter(torch.randn(num_tags))

    def forward_algorithm(self, emissions):
        """Forward algorithm: compute log Z(x)."""
        # emissions: (seq_len, num_tags)
        seq_len = emissions.size(0)

        # initialization
        score = self.start_transitions + emissions[0]

        for i in range(1, seq_len):
            # score: (num_tags,) → (num_tags, 1)
            # transitions: (num_tags, num_tags); transitions[i][j] = tag i → tag j
            # emissions[i]: (num_tags,) → (1, num_tags)
            broadcast_score = score.unsqueeze(1)  # unsqueeze adds a dimension
            broadcast_emissions = emissions[i].unsqueeze(0)

            # next_score[j] = logsumexp over i of (score[i] + trans[i,j] + emit[j])
            next_score = broadcast_score + self.transitions + broadcast_emissions
            score = torch.logsumexp(next_score, dim=0)

        score = score + self.end_transitions
        return torch.logsumexp(score, dim=0)

    def score_sentence(self, emissions, tags):
        """Score a given tag sequence."""
        seq_len = emissions.size(0)

        score = self.start_transitions[tags[0]] + emissions[0, tags[0]]

        for i in range(1, seq_len):
            score += self.transitions[tags[i-1], tags[i]]
            score += emissions[i, tags[i]]

        score += self.end_transitions[tags[-1]]  # [-1] indexes the last element
        return score

    def viterbi_decode(self, emissions):
        """
        Viterbi decoding: dynamic programming for the best tag sequence.

        Steps:
        1. Initialize: score at the first position = start transition + emission
        2. Recurse: at each position, take the max score over all previous tags
        3. Terminate: pick the highest-scoring tag at the last position
        4. Backtrace: follow the pointers from the end to the start

        Time complexity: O(T × K²), T = sequence length, K = number of tags
        """
        seq_len = emissions.size(0)

        # initialization: scores at the first time step
        score = self.start_transitions + emissions[0]
        history = []  # best predecessor tag for each position

        # recursion: max score at each time step
        for i in range(1, seq_len):
            broadcast_score = score.unsqueeze(1)
            broadcast_emissions = emissions[i].unsqueeze(0)
            next_score = broadcast_score + self.transitions + broadcast_emissions

            # for each current tag, find the best predecessor tag
            next_score, indices = next_score.max(dim=0)
            score = next_score
            history.append(indices)

        # termination: add the end transition scores
        score += self.end_transitions
        _, best_last_tag = score.max(dim=0)

        # backtrace: follow the best path from the last position
        best_path = [best_last_tag.item()]  # .item() converts a one-element tensor to a Python number
        for hist in reversed(history):
            best_last_tag = hist[best_last_tag]
            best_path.insert(0, best_last_tag.item())

        return best_path

    def neg_log_likelihood(self, emissions, tags):
        """
        Negative log-likelihood (the CRF training objective).

        Loss = log Z(x) - score(x, y)
        where:
        - Z(x) is the partition function (sum over all possible tag sequences)
        - score(x, y) is the score of the gold tag sequence
        """
        # forward algorithm for the partition function
        log_partition = self.forward_algorithm(emissions)
        # score of the gold sequence
        score = self.score_sentence(emissions, tags)
        # negative log-likelihood
        return log_partition - score

class BiLSTMCRF(nn.Module):
    """BiLSTM-CRF sequence labeling model."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags,
                 dropout=0.5):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            embedding_dim, hidden_dim // 2,
            num_layers=1, bidirectional=True, batch_first=True,
        )
        self.dropout = nn.Dropout(dropout)
        self.hidden2tag = nn.Linear(hidden_dim, num_tags)
        self.crf = CRFLayer(num_tags)
        self.num_tags = num_tags

    def _get_emissions(self, x):
        """Compute emission scores."""
        embedded = self.embedding(x)
        embedded = self.dropout(embedded)
        lstm_out, _ = self.lstm(embedded)
        lstm_out = self.dropout(lstm_out)
        emissions = self.hidden2tag(lstm_out)
        return emissions

    def forward(self, x, tags):
        """Negative log-likelihood (for training)."""
        # x: (batch_size, seq_len), tags: (batch_size, seq_len)
        emissions = self._get_emissions(x)

        # simplification: loop over the batch one sequence at a time (no padding mask)
        loss = 0
        for i in range(x.size(0)):
            emit = emissions[i]  # (seq_len, num_tags)
            tag = tags[i]        # (seq_len,)

            forward_score = self.crf.forward_algorithm(emit)
            gold_score = self.crf.score_sentence(emit, tag)
            loss += forward_score - gold_score

        return loss / x.size(0)

    def predict(self, x):
        """Viterbi decoding (for inference)."""
        emissions = self._get_emissions(x)

        predictions = []
        for i in range(x.size(0)):
            path = self.crf.viterbi_decode(emissions[i])
            predictions.append(path)

        return predictions

# ==================
# Train the BiLSTM-CRF
# ==================

# Tag set
tag2idx = {"O": 0, "B-PER": 1, "I-PER": 2, "B-LOC": 3, "I-LOC": 4,
           "B-ORG": 5, "I-ORG": 6}
idx2tag = {i: t for t, i in tag2idx.items()}

# Vocabulary
chars = set()
train_data_ner = [
    (list("小明在北京工作"), ["B-PER", "I-PER", "O", "B-LOC", "I-LOC", "O", "O"]),
    (list("张三去了上海"), ["B-PER", "I-PER", "O", "O", "B-LOC", "I-LOC"]),
    (list("李四在清华大学"), ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG"]),
] * 20

for sent, _ in train_data_ner:
    chars.update(sent)

char2idx = {"<PAD>": 0, "<UNK>": 1}
for c in sorted(chars):
    char2idx[c] = len(char2idx)

vocab_size = len(char2idx)

# Build the model
model = BiLSTMCRF(
    vocab_size=vocab_size,
    embedding_dim=64,
    hidden_dim=128,
    num_tags=len(tag2idx),
    dropout=0.3,
)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Training
model.train()  # train() switches to training mode
for epoch in range(30):
    total_loss = 0
    for sent, tags in train_data_ner:
        x = torch.LongTensor([[char2idx.get(c, 1) for c in sent]])
        y = torch.LongTensor([[tag2idx[t] for t in tags]])

        loss = model(x, y)
        total_loss += loss.item()

        optimizer.zero_grad()  # reset gradients
        loss.backward()  # backpropagation
        nn.utils.clip_grad_norm_(model.parameters(), 5.0)
        optimizer.step()  # update parameters

    if (epoch + 1) % 10 == 0:
        avg_loss = total_loss / len(train_data_ner)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

# Inference
model.eval()
test_sent = list("小明在北京工作")
x = torch.LongTensor([[char2idx.get(c, 1) for c in test_sent]])
with torch.no_grad():  # disable gradient tracking to save memory
    pred = model.predict(x)[0]
    pred_tags = [idx2tag[i] for i in pred]

print("\nBiLSTM-CRF prediction:")
for char, tag in zip(test_sent, pred_tags):
    print(f"  {char}{tag}")

entities = bio_to_entities(test_sent, pred_tags)
print(f"\nRecognized entities: {entities}")

6. BERT-NER

6.1 BERT for NER

Python
# BERT-NER performs sequence labeling by classifying each token's output

"""
BERT-NER architecture:

Input:  [CLS] 小 明 在 北 京 大 学 [SEP]

BERT:    h₀  h₁ h₂ h₃ h₄ h₅ h₆ h₇  h₈

Linear:       ↓  ↓  ↓  ↓  ↓  ↓  ↓

Output:      B-P I-P  O B-O I-O I-O I-O
"""

# Hugging Face implementation
from transformers import BertTokenizerFast, BertForTokenClassification
import torch

# Label set
label_list = ["O", "B-PER", "I-PER", "B-LOC", "I-LOC", "B-ORG", "I-ORG"]
label2id = {l: i for i, l in enumerate(label_list)}
id2label = {i: l for l, i in label2id.items()}

# Load the model
model_name = "bert-base-chinese"
tokenizer = BertTokenizerFast.from_pretrained(model_name)
model = BertForTokenClassification.from_pretrained(
    model_name,
    num_labels=len(label_list),
    id2label=id2label,
    label2id=label2id,
)

# Tokenize and align labels
def tokenize_and_align_labels(text, labels):
    """Tokenize and align the labels to the resulting tokens."""
    tokenized = tokenizer(
        list(text),
        is_split_into_words=True,
        return_tensors='pt',
        padding=True,
        truncation=True,
        max_length=128,
    )

    # align labels
    word_ids = tokenized.word_ids()
    aligned_labels = []
    for word_id in word_ids:
        if word_id is None:
            aligned_labels.append(-100)  # ignore [CLS] and [SEP]
        else:
            aligned_labels.append(label2id[labels[word_id]])

    return tokenized, torch.tensor([aligned_labels])

# Training example
text = "小明在北京大学读书"
labels = ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "I-ORG", "I-ORG", "O", "O"]

tokenized, aligned = tokenize_and_align_labels(text, labels)
print(f"Tokens: {tokenizer.convert_ids_to_tokens(tokenized['input_ids'][0])}")
print(f"Labels: {aligned}")

# Prediction (note: the classification head is randomly initialized here,
# so the predicted labels are essentially random until the model is fine-tuned)
model.eval()
with torch.no_grad():
    outputs = model(**tokenized)
    predictions = outputs.logits.argmax(dim=-1)[0]

print("\nBERT-NER prediction:")
tokens = tokenizer.convert_ids_to_tokens(tokenized['input_ids'][0])
for token, pred_id in zip(tokens, predictions):
    if token not in ['[CLS]', '[SEP]', '[PAD]']:
        pred_label = id2label[pred_id.item()]
        print(f"  {token}{pred_label}")

7. Hands-On with spaCy

Python
# pip install spacy
# python -m spacy download zh_core_web_sm

try:
    import spacy

    nlp = spacy.load("zh_core_web_sm")

    text = "小明在北京大学学习计算机科学,他的导师是张教授"
    doc = nlp(text)

    print("spaCy NER result:")
    for ent in doc.ents:
        print(f"  [{ent.label_}] {ent.text} (span: {ent.start_char}-{ent.end_char})")

    print("\nPOS tags:")
    for token in doc:
        print(f"  {token.text}/{token.pos_}", end=" ")
    print()

except OSError:
    print("Please install the Chinese model: python -m spacy download zh_core_web_sm")
except ImportError:
    print("Please install spacy: pip install spacy")

8. A Complete NER Project

Python
"""
Complete NER project: Chinese named entity recognition
"""
import re

class NERSystem:
    """Chinese NER system."""

    def __init__(self, model_type="rule"):
        self.model_type = model_type
        self.patterns = self._build_patterns()

    def _build_patterns(self):
        """Compile the rule patterns."""
        return {
            "DATE": re.compile(r'\d{4}[年/-]\d{1,2}[月/-]\d{1,2}[日号]?'),
            "TIME": re.compile(r'\d{1,2}[点时:]\d{0,2}分?\d{0,2}秒?'),
            "MONEY": re.compile(r'\d+(?:\.\d+)?[万亿]?(?:美元|元|金|币)'),
            "PERCENT": re.compile(r'\d+(?:\.\d+)?%'),
            "PHONE": re.compile(r'1[3-9]\d{9}'),
            "EMAIL": re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+'),
        }

    def rule_based_ner(self, text):
        """Rule-based NER."""
        entities = []
        for etype, pattern in self.patterns.items():
            for match in pattern.finditer(text):
                entities.append({
                    "text": match.group(),
                    "type": etype,
                    "start": match.start(),
                    "end": match.end(),
                })
        return sorted(entities, key=lambda x: x["start"])  # lambda: anonymous function

    def predict(self, text):
        """Combined prediction."""
        return self.rule_based_ner(text)

    def evaluate(self, predictions, gold_entities):
        """Evaluate NER output."""
        pred_set = set((e["text"], e["type"]) for e in predictions)
        gold_set = set((e["text"], e["type"]) for e in gold_entities)

        tp = len(pred_set & gold_set)
        precision = tp / len(pred_set) if pred_set else 0
        recall = tp / len(gold_set) if gold_set else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        return {"precision": precision, "recall": recall, "f1": f1}

# Test
ner = NERSystem()
text = "2024年3月15日,张三在北京大学参加了一场价值50万元的项目研讨会,联系电话13812345678"
entities = ner.predict(text)

print("NER results:")
for e in entities:
    print(f"  [{e['type']}] {e['text']} (span: {e['start']}-{e['end']})")

9. Interview Notes

🔑 Frequently Asked Interview Questions

Question 1: Why does BiLSTM-CRF need the CRF layer?

Text Only
✅ Key points:
1. BiLSTM predicts each position's label independently, so it can emit illegal sequences
2. The CRF layer learns transition constraints between labels (a transition matrix)
3. For example, B-PER may be followed by I-PER or O, but never by I-LOC
4. The CRF uses Viterbi decoding to find the globally optimal tag sequence
5. Empirically, the CRF layer reliably adds about 1-2 F1 points

Question 2: How is a CRF trained, and how does it run inference?

Text Only
✅ Key points:
Training:
- loss = log Z(x) - Score(x, y*)
- the forward algorithm computes log Z(x) (the partition function)
- the score of the gold path, Score(x, y*), is computed directly
- optimized with gradient descent

Inference:
- Viterbi decoding for the optimal tag sequence
- time complexity O(T × K²), T = sequence length, K = number of tags
- dynamic programming, sweeping left to right

Question 3: How is NER evaluated?

Text Only
✅ Key points:
- entity-level P/R/F1 (not token-level!)
- an entity counts as correct only if both its boundary and its type match exactly
- Micro and Macro F1 are both common
- strict matching vs. partial matching (lenient evaluation)
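A minimal sketch of entity-level strict-match scoring, where an entity is a (start, end, type) tuple and only exact matches count (a hypothetical helper, not a library function):

```python
def entity_f1(pred_entities, gold_entities):
    """Entity-level P/R/F1: a prediction counts only if
    (start, end, type) all match a gold entity exactly."""
    pred, gold = set(pred_entities), set(gold_entities)
    tp = len(pred & gold)
    p = tp / len(pred) if pred else 0.0
    r = tp / len(gold) if gold else 0.0
    f1 = 2 * p * r / (p + r) if p + r else 0.0
    return p, r, f1

gold = [(0, 2, "PER"), (3, 7, "ORG")]
pred = [(0, 2, "PER"), (3, 6, "ORG")]  # wrong right boundary -> not counted
print(entity_f1(pred, gold))  # (0.5, 0.5, 0.5)
```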

10. Exercises

📝 Basics

  1. Explain the difference between the BIO and BIOES tagging schemes, and the pros and cons of each.

Answer: BIO uses three label classes: B (entity begin), I (entity inside), and O (outside). BIOES adds E (entity end) and S (single-token entity), for five classes. BIO's advantages are a small label set and simple annotation; its drawback is fuzzier entity boundaries (two adjacent entities of the same type are hard to separate). BIOES encodes richer boundary information, so models can locate entity starts and ends more precisely and usually score slightly better; the costs are a larger label set, higher annotation effort, and worse data sparsity. In practice BIOES tends to edge out BIO on accuracy.
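The relationship between the two schemes is mechanical; a small sketch of a BIO-to-BIOES converter (a hypothetical helper) makes it explicit:

```python
def bio_to_bioes(tags):
    """Convert BIO tags to BIOES: a B/I tag that closes an entity becomes S/E."""
    out = []
    for i, tag in enumerate(tags):
        if tag == "O":
            out.append("O")
            continue
        nxt = tags[i + 1] if i + 1 < len(tags) else "O"
        continues = nxt.startswith("I-") and nxt[2:] == tag[2:]  # entity keeps going?
        if tag.startswith("B-"):
            out.append(("B-" if continues else "S-") + tag[2:])
        else:  # an I- tag
            out.append(("I-" if continues else "E-") + tag[2:])
    return out

print(bio_to_bioes(["B-PER", "I-PER", "O", "B-LOC"]))
# ['B-PER', 'E-PER', 'O', 'S-LOC']
```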

  2. Work through Viterbi decoding by hand (given emission scores and a transition matrix).

Answer: Viterbi is a dynamic programming algorithm: (1) Initialize: the score at the first step = initial probability + emission score. (2) Recurse: for each step t and each state j, compute "best score at t-1 for state i + transition score i→j + emission score of j" over all predecessors i, keep the maximum, and record a backpointer. (3) Terminate: pick the highest-scoring state at the last step. (4) Backtrace: follow the pointers backwards to recover the optimal tag sequence. The time complexity drops from \(O(N^T)\) for brute-force enumeration to \(O(T \cdot N^2)\) (T = sequence length, N = number of tags).
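The hand procedure above can be mirrored in a few lines of NumPy; all the scores below are invented for a 2-tag, 3-step example:

```python
import numpy as np

start = np.array([0.6, 0.1])
trans = np.array([[0.4, 0.1],   # trans[i, j]: score of tag i -> tag j
                  [0.3, 0.2]])
emit = np.array([[1.0, 0.5],    # emit[t, j]: score of tag j at step t
                 [0.2, 1.5],
                 [1.0, 0.1]])

score = start + emit[0]          # step 1: initialization
back = []
for t in range(1, len(emit)):    # step 2: recursion with backpointers
    cand = score[:, None] + trans + emit[t][None, :]   # shape (prev, cur)
    back.append(cand.argmax(axis=0))
    score = cand.max(axis=0)

path = [int(score.argmax())]     # step 3: termination
for ptr in reversed(back):       # step 4: backtrace
    path.insert(0, int(ptr[path[0]]))
print(path)
```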

💻 Programming

  1. Implement a complete BiLSTM-CRF model and train it on a Chinese NER dataset.
  2. Perform NER with a Hugging Face BERT model.
  3. Implement an NER evaluation function that computes entity-level P/R/F1.

🔬 Discussion

  1. How can nested entities be handled (e.g. in "北京大学计算机系", the entity "北京大学" is nested inside the larger organization name)?

Answer: Plain BIO sequence labeling can only mark flat entities. Approaches for nested entities: (1) layered sequence labeling: one tagging layer per entity type, so the same token can carry different labels in different layers; (2) span-based methods: enumerate all candidate text spans and classify each span as entity-or-not, which supports nesting naturally; (3) the MRC framing: cast NER as reading comprehension (e.g. "find all organization names") and predict answer spans; (4) sequence-to-set: generate all entities directly with a Seq2Seq model; (5) cascaded CRFs: recognize outer entities first, then inner ones. Span-based and MRC methods currently perform best.
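The span-based idea can be sketched in a few lines: enumerate candidate spans, then let a classifier score each one. Because overlapping spans are scored independently, nesting comes for free (the classifier itself is omitted here):

```python
def enumerate_spans(tokens, max_len=4):
    """Enumerate all candidate (start, end, text) spans up to max_len tokens.
    A span classifier would then score each candidate independently."""
    spans = []
    for i in range(len(tokens)):
        for j in range(i + 1, min(i + 1 + max_len, len(tokens) + 1)):
            spans.append((i, j, "".join(tokens[i:j])))
    return spans

spans = enumerate_spans(list("北京大学"), max_len=2)
print(spans)
```

The number of candidates is O(T × max_len), so `max_len` is the usual lever for keeping span classification tractable.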


✅ Self-Check List

Text Only
□ I understand the BIO/BIOES tagging schemes
□ I can explain the three core components of an HMM
□ I understand the difference between CRF and HMM
□ I can write BiLSTM-CRF in PyTorch from scratch
□ I know why the CRF layer matters
□ I know how BERT-NER is implemented
□ I completed at least 3 exercises

📚 Further Reading

  1. BiLSTM-CRF paper: Bidirectional LSTM-CRF Models for Sequence Tagging
  2. BERT-NER best practices
  3. Surveys of Chinese NER datasets

Next: 06 Text Generation — text generation from language models to GPT