📖 第2章:文本预处理¶
学习时间:6小时 | 难度星级:⭐⭐ | 前置知识:Python基础、正则表达式基础 | 学习目标:掌握中英文文本预处理的全流程,能独立构建数据清洗Pipeline
📋 目录¶
- 1. 文本预处理概述
- 2. 中文分词
- 3. 英文分词与词形处理
- 4. 停用词处理
- 5. 正则表达式文本清洗
- 6. 文本规范化
- 7. 中文特殊处理
- 8. 完整预处理Pipeline
- 9. 面试要点
- 10. 练习题
1. 文本预处理概述¶
1.1 为什么需要文本预处理?¶
原始文本通常是"脏"的——包含HTML标签、特殊字符、拼写错误、不一致的格式等。文本预处理的目标是将原始文本转化为干净、标准、适合模型处理的格式。
# 现实世界的"脏"文本示例
dirty_texts = [
    "<p>这个产品<b>超级好用</b>!!!推荐给大家👍👍👍</p>",
    "今天买了iPhone15,花了5999元,感觉还行吧~\n\n\n@小红 你觉得呢?",
    "YYDS!!!这剧太好看了啊啊啊啊啊\t\t#追剧日常#",
    " 服务态度差的一批...再也不来了 ",
    "这家diàn的niúròu miàn很好吃",
]

for text in dirty_texts:
    print(f"原始: {repr(text)}")
    print()
1.2 预处理流程全景¶
原始文本
      │
      ▼
┌──────────────┐
│   文本获取   │ 从数据库/文件/API获取
└──────────────┘
      │
      ▼
┌──────────────┐
│   编码处理   │ 统一为UTF-8
└──────────────┘
      │
      ▼
┌──────────────┐
│HTML/噪声清洗 │ 去除标签、特殊字符
└──────────────┘
      │
      ▼
┌──────────────┐
│  文本规范化  │ 大小写、全半角、繁简体
└──────────────┘
      │
      ▼
┌──────────────┐
│  分词/分句   │ jieba分词/NLTK分句
└──────────────┘
      │
      ▼
┌──────────────┐
│  停用词过滤  │ 去除无意义的词
└──────────────┘
      │
      ▼
┌──────────────┐
│   词形处理   │ 词干提取/词形还原
└──────────────┘
      │
      ▼
干净文本(可供模型使用)
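上面的流程可以组织成一个按顺序调用的函数链。下面是一个最小示意(只实现其中"HTML/噪声清洗"和"空白规范化"两步,函数名是示例性的假设,其余步骤同理追加):

```python
import re

def remove_html(text):
    # 去除HTML标签(对应流程中的"HTML/噪声清洗"一步)
    return re.sub(r'<[^>]+>', '', text)

def normalize_ws(text):
    # 合并多余空白(对应"文本规范化"中的一部分)
    return re.sub(r'\s+', ' ', text).strip()

def run_pipeline(text, steps):
    # 依次应用每个预处理步骤,前一步的输出是后一步的输入
    for step in steps:
        text = step(text)
    return text

print(run_pipeline("<p>你好   世界</p>", [remove_html, normalize_ws]))  # 你好 世界
```

后续步骤(分词、去停用词等)只要约定好各步骤的输入输出接口,就可以同样追加到 steps 列表中。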
2. 中文分词¶
2.1 中文分词的挑战¶
中文没有天然的空格分隔词语,这使得分词成为中文NLP的第一步,也是特有的难题。
# 分词歧义示例
ambiguous_sentences = [
    "结婚的和尚未结婚的",      # 和尚/未 vs 和/尚未
    "研究生命的起源",          # 研究生/命 vs 研究/生命
    "南京市长江大桥",          # 南京市长/江大桥 vs 南京市/长江大桥
    "乒乓球拍卖完了",          # 球拍/卖完了 vs 拍卖/完了
    "下雨天留客天留我不留",    # 多种断句方式
]

for sent in ambiguous_sentences:
    print(f"歧义句: {sent}")
2.2 分词算法分类¶
中文分词算法
├── 基于规则的方法
│ ├── 正向最大匹配 (FMM)
│ ├── 逆向最大匹配 (BMM)
│ └── 双向最大匹配 (Bi-MM)
├── 基于统计的方法
│ ├── HMM (隐马尔可夫模型)
│ ├── CRF (条件随机场)
│ └── N-gram 语言模型
├── 基于深度学习的方法
│ ├── BiLSTM-CRF
│ ├── BERT分词
│ └── 字符级CNN
└── 混合方法
└── jieba (DAG + 动态规划 + HMM)
2.3 最大匹配算法实现¶
class MaxMatchTokenizer:
    """最大匹配分词算法"""

    def __init__(self, dictionary):
        self.dictionary = set(dictionary)
        self.max_len = max(len(w) for w in dictionary) if dictionary else 1

    def forward_max_match(self, text):
        """正向最大匹配(FMM)"""
        words = []
        i = 0
        while i < len(text):
            matched = False
            for length in range(min(self.max_len, len(text) - i), 0, -1):
                candidate = text[i:i + length]
                if candidate in self.dictionary:
                    words.append(candidate)
                    i += length
                    matched = True
                    break
            if not matched:
                words.append(text[i])
                i += 1
        return words

    def backward_max_match(self, text):
        """逆向最大匹配(BMM)"""
        words = []
        i = len(text)
        while i > 0:
            matched = False
            for length in range(min(self.max_len, i), 0, -1):
                candidate = text[i - length:i]
                if candidate in self.dictionary:
                    words.insert(0, candidate)
                    i -= length
                    matched = True
                    break
            if not matched:
                words.insert(0, text[i - 1])
                i -= 1
        return words

    def bidirectional_max_match(self, text):
        """双向最大匹配"""
        fmm_result = self.forward_max_match(text)
        bmm_result = self.backward_max_match(text)
        # 选择词数较少的结果
        if len(fmm_result) != len(bmm_result):
            return fmm_result if len(fmm_result) < len(bmm_result) else bmm_result
        else:
            # 词数相同时,选择单字词较少的
            fmm_singles = sum(1 for w in fmm_result if len(w) == 1)
            bmm_singles = sum(1 for w in bmm_result if len(w) == 1)
            return fmm_result if fmm_singles <= bmm_singles else bmm_result
# 建立一个简单的词典
dictionary = [
    "自然语言", "语言", "处理", "自然", "自然语言处理",
    "是", "人工智能", "人工", "智能", "领域", "的",
    "重要", "研究", "方向", "研究生", "南京市", "长江大桥",
    "南京", "市长", "江大桥",
]

tokenizer = MaxMatchTokenizer(dictionary)
texts = [
    "自然语言处理是人工智能领域的重要研究方向",
    "南京市长江大桥",
]

for text in texts:
    print(f"\n原文: {text}")
    print(f"FMM:   {'/'.join(tokenizer.forward_max_match(text))}")
    print(f"BMM:   {'/'.join(tokenizer.backward_max_match(text))}")
    print(f"Bi-MM: {'/'.join(tokenizer.bidirectional_max_match(text))}")
2.4 jieba分词详解¶
jieba是最流行的Python中文分词工具,采用DAG(有向无环图)+ 动态规划 + HMM的方法。
import jieba
import jieba.posseg as pseg
# ==================
# 1. 三种分词模式
# ==================
text = "自然语言处理是人工智能领域中最重要的研究方向之一"
# 精确模式(默认)- 适合文本分析
words_precise = jieba.lcut(text)
print(f"精确模式: {'/'.join(words_precise)}")
# 全模式 - 扫描所有可能的词
words_full = jieba.lcut(text, cut_all=True)
print(f"全模式: {'/'.join(words_full)}")
# 搜索引擎模式 - 在精确的基础上对长词再切分
words_search = jieba.lcut_for_search(text)
print(f"搜索模式: {'/'.join(words_search)}")
# ==================
# 2. 自定义词典
# ==================
# 添加自定义词
jieba.add_word("自然语言处理", freq=20000, tag="n")
jieba.add_word("人工智能", freq=20000, tag="n")
# 也可以从文件加载词典
# jieba.load_userdict("user_dict.txt")
# 文件格式: 词语 词频(可选) 词性(可选)
# 测试新词
text2 = "我在学习深度强化学习和联邦学习"
jieba.add_word("深度强化学习")
jieba.add_word("联邦学习")
print(f"\n自定义词典: {'/'.join(jieba.lcut(text2))}")
# ==================
# 3. 调节词频
# ==================
text3 = "如果放到post中将出错"
print(f"默认分词: {'/'.join(jieba.lcut(text3))}")
jieba.suggest_freq(("中", "将"), True) # 提高"中"和"将"分开的频率
print(f"调频后: {'/'.join(jieba.lcut(text3))}")
# ==================
# 4. 词性标注
# ==================
text4 = "小明毕业于北京大学计算机专业"
words_with_pos = pseg.lcut(text4)
print(f"\n词性标注:")
for word, flag in words_with_pos:
    print(f" {word}/{flag}", end="")
print()
# ==================
# 5. 关键词提取
# ==================
import jieba.analyse
text5 = """
自然语言处理是计算机科学领域与人工智能领域中的一个重要方向。
它研究能实现人与计算机之间用自然语言进行有效通信的各种理论和方法。
自然语言处理是一门融语言学、计算机科学、数学于一体的科学。
因此,这一领域的研究将涉及自然语言,即人们日常使用的语言,
所以它与语言学的研究有着密切的联系。
"""
# TF-IDF关键词提取
keywords_tfidf = jieba.analyse.extract_tags(text5, topK=10, withWeight=True)
print("\nTF-IDF关键词:")
for word, weight in keywords_tfidf:
    print(f" {word}: {weight:.4f}")
# TextRank关键词提取
keywords_textrank = jieba.analyse.textrank(text5, topK=10, withWeight=True)
print("\nTextRank关键词:")
for word, weight in keywords_textrank:
    print(f" {word}: {weight:.4f}")
2.5 其他中文分词工具对比¶
# 各分词工具的特点对比
tokenizer_comparison = {
    "jieba": {
        "优点": ["安装简单", "速度快", "社区活跃", "自定义词典方便"],
        "缺点": ["准确率中等", "新词发现能力有限(依赖HMM,只能部分解决)"],
        "适用": "一般NLP任务、快速原型开发",
    },
    "pkuseg": {
        "优点": ["准确率高", "支持多领域", "支持词性标注"],
        "缺点": ["速度较慢", "安装有时有问题"],
        "适用": "需要高准确率的场景",
    },
    "HanLP": {
        "优点": ["功能全面", "多语言支持", "准确率高"],
        "缺点": ["学习成本高", "依赖Java"],
        "适用": "企业级NLP项目",
    },
    "LAC(百度)": {
        "优点": ["准确率高", "词性标注准", "实体识别"],
        "缺点": ["依赖PaddlePaddle"],
        "适用": "百度生态项目",
    },
    "LTP(哈工大)": {
        "优点": ["学术认可度高", "功能齐全"],
        "缺点": ["速度偏慢"],
        "适用": "学术研究",
    },
}

for name, info in tokenizer_comparison.items():
    print(f"\n📦 {name}:")
    print(f" 优点: {', '.join(info['优点'])}")
    print(f" 缺点: {', '.join(info['缺点'])}")
    print(f" 适用: {info['适用']}")
3. 英文分词与词形处理¶
3.1 英文分词(Tokenization)¶
# 英文分词相对简单,但也有细节需要处理

# 方法1:简单split
text = "I don't think this is a good idea."
print(f"Simple split: {text.split()}")
# 问题:don't没有被正确处理

# 方法2:模拟NLTK的word_tokenize行为
import re

def simple_word_tokenize(text):
    """简化版英文分词"""
    # 处理缩写
    text = re.sub(r"n't", " n't", text)
    text = re.sub(r"'re", " 're", text)
    text = re.sub(r"'s", " 's", text)
    text = re.sub(r"'ve", " 've", text)
    text = re.sub(r"'ll", " 'll", text)
    text = re.sub(r"'m", " 'm", text)
    text = re.sub(r"'d", " 'd", text)
    # 分离标点
    text = re.sub(r'([.,!?;:])', r' \1', text)
    return text.split()

text = "I don't think it's a good idea. What're you doing?"
tokens = simple_word_tokenize(text)
print(f"Tokenized: {tokens}")
3.2 子词分词(Subword Tokenization)¶
现代NLP中,子词分词是主流方法:
# ==================
# BPE (Byte-Pair Encoding) 简化实现
# ==================
from collections import Counter, defaultdict

def get_stats(vocab):
    """统计所有相邻符号对的频率"""
    pairs = defaultdict(int)  # defaultdict访问不存在的键时返回默认值
    for word, freq in vocab.items():
        symbols = word.split()
        for i in range(len(symbols) - 1):
            pairs[(symbols[i], symbols[i + 1])] += freq
    return pairs

def merge_vocab(pair, vocab):
    """合并最频繁的符号对"""
    new_vocab = {}
    bigram = ' '.join(pair)
    replacement = ''.join(pair)
    for word, freq in vocab.items():
        new_word = word.replace(bigram, replacement)
        new_vocab[new_word] = freq
    return new_vocab

# 示例训练数据
corpus = ["low", "low", "low", "low", "low",
          "lower", "lower",
          "newest", "newest", "newest", "newest",
          "newest", "newest",
          "widest", "widest", "widest"]

# 初始化词汇表(每个字符加空格分隔,末尾加</w>标记)
vocab = Counter()  # Counter统计元素出现次数
for word in corpus:
    word_repr = ' '.join(list(word)) + ' </w>'
    vocab[word_repr] += 1

print("初始词汇表:")
for word, freq in vocab.items():
    print(f" {word}: {freq}")

# BPE迭代合并
num_merges = 10
print(f"\n执行 {num_merges} 次BPE合并:")
for i in range(num_merges):
    pairs = get_stats(vocab)
    if not pairs:
        break
    best_pair = max(pairs, key=pairs.get)
    vocab = merge_vocab(best_pair, vocab)
    print(f" Merge {i+1}: {best_pair} → {''.join(best_pair)} (频率: {pairs[best_pair]})")

print(f"\n最终词汇表:")
for word, freq in vocab.items():
    print(f" {word}: {freq}")
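训练阶段得到的是一个按产生顺序排列的合并规则表;对新词分词(推断)时,按同样顺序把规则应用到字符序列上即可。下面是一个示意实现(merges 列表为假设的几条已学规则,并非上面训练的完整输出):

```python
def bpe_encode(word, merges):
    """按训练顺序应用合并规则,对单个词做BPE切分"""
    symbols = list(word) + ['</w>']
    for pair in merges:
        merged = ''.join(pair)
        out, i = [], 0
        # 从左到右扫描,遇到当前规则的符号对就合并
        while i < len(symbols):
            if i < len(symbols) - 1 and (symbols[i], symbols[i + 1]) == pair:
                out.append(merged)
                i += 2
            else:
                out.append(symbols[i])
                i += 1
        symbols = out
    return symbols

# 假设训练学到的前几条合并规则(按学习顺序)
merges = [('e', 's'), ('es', 't'), ('est', '</w>'), ('l', 'o'), ('lo', 'w')]
print(bpe_encode('lowest', merges))  # ['low', 'est</w>']
```

注意 'lowest' 本身不在训练语料中,但被切成了两个已见过的子词,这正是BPE处理OOV的方式。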
3.3 词干提取(Stemming)¶
# Porter词干提取算法简化实现
class SimplePorterStemmer:
    """简化版Porter词干提取"""

    def __init__(self):
        self.suffix_rules = [
            # (后缀, 替换, 最小词干长度)
            ("ational", "ate", 3),
            ("tional", "tion", 3),
            ("ization", "ize", 3),
            ("fulness", "ful", 3),
            ("ousness", "ous", 3),
            ("iveness", "ive", 3),
            ("ingly", "", 3),
            ("edly", "", 3),
            ("tion", "t", 3),
            ("ing", "", 3),
            ("ness", "", 3),
            ("ment", "", 3),
            ("able", "", 3),
            ("ible", "", 3),
            ("ies", "i", 2),
            ("ied", "i", 2),
            ("sses", "ss", 2),
            ("es", "", 3),
            ("ed", "", 3),
            ("ly", "", 3),
            ("s", "", 3),
        ]

    def stem(self, word):
        word = word.lower()
        for suffix, replacement, min_len in self.suffix_rules:
            if word.endswith(suffix):
                stem = word[:-len(suffix)]
                if len(stem) >= min_len:
                    return stem + replacement
        return word

stemmer = SimplePorterStemmer()
words = ["running", "runs", "runner", "easily", "happiness",
         "organization", "nationalization", "playing", "played"]

print("词干提取结果:")
for word in words:
    print(f" {word} → {stemmer.stem(word)}")
3.4 词形还原(Lemmatization)¶
# 词形还原:将词还原为词典中的基本形式(词元)
# 比词干提取更准确,但速度稍慢
class SimpleLemmatizer:
    """基于词典的简化版词形还原"""

    def __init__(self):
        self.lemma_dict = {
            # 动词
            "running": "run", "ran": "run", "runs": "run",
            "swimming": "swim", "swam": "swim", "swims": "swim",
            "playing": "play", "played": "play", "plays": "play",
            "going": "go", "went": "go", "gone": "go", "goes": "go",
            "eating": "eat", "ate": "eat", "eaten": "eat",
            "is": "be", "am": "be", "are": "be", "was": "be", "were": "be",
            "has": "have", "had": "have", "having": "have",
            # 名词
            "children": "child", "mice": "mouse", "teeth": "tooth",
            "men": "man", "women": "woman", "feet": "foot",
            "geese": "goose", "oxen": "ox",
            # 形容词
            "better": "good", "best": "good",
            "worse": "bad", "worst": "bad",
        }

    def lemmatize(self, word):
        word_lower = word.lower()
        return self.lemma_dict.get(word_lower, word_lower)

lemmatizer = SimpleLemmatizer()
words = ["running", "ran", "better", "children", "went", "mice"]

print("词形还原结果:")
for word in words:
    print(f" {word} → {lemmatizer.lemmatize(word)}")

# 词干提取 vs 词形还原 对比
print("\n对比:词干提取 vs 词形还原")
print("-" * 40)
for word in ["running", "better", "children"]:
    print(f" {word}:")
    print(f"   词干提取: {stemmer.stem(word)}")
    print(f"   词形还原: {lemmatizer.lemmatize(word)}")
4. 停用词处理¶
4.1 什么是停用词¶
停用词(Stop Words)是在文本中频繁出现但通常不携带太多语义信息的词,如"的"、"是"、"在"(中文),"the"、"is"、"at"(英文)。
4.2 中文停用词表¶
# 常用中文停用词(整合多个来源)
chinese_stopwords = set([
    # 助词
    "的", "地", "得", "了", "着", "过",
    # 代词
    "我", "你", "他", "她", "它", "我们", "你们", "他们",
    "这", "那", "这个", "那个", "这些", "那些",
    # 连词
    "和", "与", "及", "或", "而", "但", "但是", "然而",
    "虽然", "虽", "即使", "如果", "因为", "所以",
    # 介词
    "在", "从", "到", "向", "对", "以", "为", "把", "被",
    # 副词
    "不", "没", "没有", "很", "非常", "十分", "太",
    "已", "已经", "就", "都", "也", "还", "又", "才",
    # 动词
    "是", "有", "可以", "能", "会", "要",
    # 量词
    "个", "些", "种",
    # 方位词
    "上", "下", "前", "后", "里", "中", "内", "外",
    # 标点
    ",", "。", "!", "?", "、", ";", ":", "“", "”",
    "‘", "’", "(", ")", "【", "】", "《", "》",
    # 其他
    "等", "等等", "之", "之一", "之类", "以及",
    "并", "并且", "及其", "其", "其中",
])

def remove_stopwords(words, stopwords=chinese_stopwords):
    """去除停用词"""
    return [w for w in words if w not in stopwords]

# 测试
import jieba

text = "自然语言处理是人工智能领域中的一个非常重要的研究方向"
words = jieba.lcut(text)
filtered = remove_stopwords(words)
print(f"原始分词: {'/'.join(words)}")
print(f"去停用词: {'/'.join(filtered)}")
4.3 自定义停用词策略¶
def build_stopwords(base_file=None, custom_words=None, min_freq_ratio=None, corpus=None):
    """
    构建停用词表的多种策略
    1. 加载基础停用词表
    2. 添加自定义停用词
    3. 基于词频自动发现停用词
    """
    stopwords = set()
    # 策略1:加载基础停用词文件
    if base_file:
        try:  # try/except捕获异常
            with open(base_file, 'r', encoding='utf-8') as f:  # with自动管理文件关闭
                for line in f:
                    word = line.strip()
                    if word:
                        stopwords.add(word)
        except FileNotFoundError:
            print(f"文件 {base_file} 不存在")
    # 策略2:添加自定义停用词
    if custom_words:
        stopwords.update(custom_words)
    # 策略3:基于词频自动发现(文档频率超过阈值的词)
    if min_freq_ratio and corpus:
        from collections import Counter
        word_doc_freq = Counter()
        total_docs = len(corpus)
        for doc in corpus:
            unique_words = set(doc) if isinstance(doc, list) else set(jieba.lcut(doc))  # isinstance检查类型
            for word in unique_words:
                word_doc_freq[word] += 1
        for word, freq in word_doc_freq.items():
            if freq / total_docs > min_freq_ratio:
                stopwords.add(word)
    return stopwords

# 示例:基于词频发现停用词
corpus = [
    "今天天气很好适合出去游玩",
    "这个产品的质量很好推荐购买",
    "我觉得这家餐厅的服务很好",
    "这本书很好值得一读",
]

auto_stopwords = build_stopwords(
    custom_words=chinese_stopwords,
    min_freq_ratio=0.6,
    corpus=corpus,
)
print(f"自动发现的高频词(出现在>60%文档中):")
new_stops = auto_stopwords - chinese_stopwords
print(f" {new_stops}")
4.4 何时不应该去停用词¶
# 重要提示:停用词处理并非总是必要的
不应去停用词的场景 = {
    "情感分析": "「不好」中的「不」很关键,去掉后语义完全改变",
    "问答系统": "「什么时候」「在哪里」中的停用词是关键信息",
    "机器翻译": "停用词是翻译的一部分,不能去除",
    "BERT等预训练模型": "模型已经学习了停用词的语义,去除反而有害",
    "序列标注": "需要保留所有token的位置信息",
}

for scenario, reason in 不应去停用词的场景.items():
    print(f"❌ {scenario}: {reason}")
5. 正则表达式文本清洗¶
5.1 正则表达式基础¶
import re

# ==================
# 常用正则表达式模式
# ==================
patterns = {
    "中文字符": r'[\u4e00-\u9fa5]',
    "英文字母": r'[a-zA-Z]',
    "数字": r'\d+',
    "URL": r'https?://[^\s<>"{}|\\^`\[\]]+',
    "邮箱": r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
    "手机号": r'1[3-9]\d{9}',
    "HTML标签": r'<[^>]+>',
    "多余空白": r'\s+',
    "emoji": r'[\U00010000-\U0010ffff]',
    "@用户": r'@[\w\u4e00-\u9fa5]+',
    "#话题#": r'#[^#]+#',
    "连续标点": r'[!!??。.]{2,}',
}

# 测试文本
text = "看看这个链接 https://example.com 联系我 test@email.com 手机13812345678 @张三 #好剧推荐#"

for name, pattern in patterns.items():
    matches = re.findall(pattern, text)  # re.findall返回所有匹配项列表
    if matches:
        print(f"{name}: {matches}")
5.2 完整的文本清洗工具¶
import re
from typing import List, Optional

class TextCleaner:
    """文本清洗工具类"""

    def __init__(self):
        # 编译正则表达式(提高性能)
        self.url_pattern = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
        self.email_pattern = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
        self.phone_pattern = re.compile(r'1[3-9]\d{9}')
        self.html_pattern = re.compile(r'<[^>]+>')
        self.emoji_pattern = re.compile(
            "["
            "\U0001F600-\U0001F64F"  # 表情符号
            "\U0001F300-\U0001F5FF"  # 符号和象形文字
            "\U0001F680-\U0001F6FF"  # 交通和地图
            "\U0001F1E0-\U0001F1FF"  # 旗帜
            "\U00002702-\U000027B0"
            "\U000024C2-\U0001F251"
            "]+", flags=re.UNICODE
        )
        self.mention_pattern = re.compile(r'@[\w\u4e00-\u9fa5]+')
        self.hashtag_pattern = re.compile(r'#[^#]+#')
        self.whitespace_pattern = re.compile(r'\s+')
        self.repeated_punct_pattern = re.compile(r'([!!??。.,,])\1+')

    def remove_html(self, text: str) -> str:
        """去除HTML标签"""
        return self.html_pattern.sub('', text)

    def remove_urls(self, text: str) -> str:
        """去除URL"""
        return self.url_pattern.sub('', text)

    def remove_emails(self, text: str) -> str:
        """去除邮箱"""
        return self.email_pattern.sub('', text)

    def remove_phone_numbers(self, text: str) -> str:
        """去除手机号"""
        return self.phone_pattern.sub('', text)

    def remove_emojis(self, text: str) -> str:
        """去除emoji"""
        return self.emoji_pattern.sub('', text)

    def remove_mentions(self, text: str) -> str:
        """去除@用户"""
        return self.mention_pattern.sub('', text)

    def remove_hashtags(self, text: str) -> str:
        """去除#话题#"""
        return self.hashtag_pattern.sub('', text)

    def normalize_whitespace(self, text: str) -> str:
        """规范化空白字符"""
        return self.whitespace_pattern.sub(' ', text).strip()

    def normalize_punctuation(self, text: str) -> str:
        """规范化连续标点"""
        return self.repeated_punct_pattern.sub(r'\1', text)

    def remove_special_chars(self, text: str, keep_chinese=True, keep_english=True,
                             keep_numbers=True, keep_punctuation=False) -> str:
        """去除特殊字符,保留指定类型"""
        pattern_parts = []
        if keep_chinese:
            pattern_parts.append(r'\u4e00-\u9fa5')
        if keep_english:
            pattern_parts.append(r'a-zA-Z')
        if keep_numbers:
            pattern_parts.append(r'0-9')
        if keep_punctuation:
            pattern_parts.append(r',。!?、;:“”‘’()\[\]【】.,!?;:\'"()\s')
        if pattern_parts:
            pattern = f"[^{''.join(pattern_parts)}]"
            return re.sub(pattern, ' ', text)
        return text

    def clean(self, text: str,
              remove_html_tags=True,
              remove_url=True,
              remove_email=True,
              remove_phone=True,
              remove_emoji=True,
              remove_mention=True,
              remove_hashtag=True,
              normalize_ws=True,
              normalize_punct=True) -> str:
        """完整的清洗流程"""
        if remove_html_tags:
            text = self.remove_html(text)
        if remove_url:
            text = self.remove_urls(text)
        if remove_email:
            text = self.remove_emails(text)
        if remove_phone:
            text = self.remove_phone_numbers(text)
        if remove_emoji:
            text = self.remove_emojis(text)
        if remove_mention:
            text = self.remove_mentions(text)
        if remove_hashtag:
            text = self.remove_hashtags(text)
        if normalize_punct:
            text = self.normalize_punctuation(text)
        if normalize_ws:
            text = self.normalize_whitespace(text)
        return text

# 测试
cleaner = TextCleaner()
dirty_texts = [
    "<p>这个产品<b>超级好用</b>!!!推荐👍👍👍</p>",
    "今天买了iPhone15,花了5999元 @小红 你觉得呢? #购物分享#",
    "详情访问 https://example.com/product 或发邮件到 info@test.com",
    " 很多 空格 和\n\n换行\t\t制表符 ",
]

print("文本清洗结果:")
print("=" * 60)
for text in dirty_texts:
    cleaned = cleaner.clean(text)
    print(f"原始: {repr(text)}")
    print(f"清洗: {repr(cleaned)}")
    print("-" * 60)
6. 文本规范化¶
6.1 大小写转换¶
def normalize_case(text, mode='lower'):
    """
    大小写规范化
    mode: 'lower', 'upper', 'title', 'smart'
    """
    if mode == 'lower':
        return text.lower()
    elif mode == 'upper':
        return text.upper()
    elif mode == 'title':
        return text.title()
    elif mode == 'smart':
        # 智能处理:按缩写词表保留专有缩写的大写,其余转小写
        words = text.split()
        result = []
        abbreviations = {'NLP', 'AI', 'GPU', 'CPU', 'API', 'URL', 'HTML', 'BERT', 'GPT'}
        for word in words:
            if word.upper() in abbreviations:
                result.append(word.upper())
            else:
                result.append(word.lower())
        return ' '.join(result)
    return text  # 未知mode时原样返回

texts = [
    "Natural Language Processing WITH BERT and GPT",
    "THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG",
]

for text in texts:
    print(f"原始: {text}")
    print(f"小写: {normalize_case(text, 'lower')}")
    print(f"智能: {normalize_case(text, 'smart')}")
    print()
6.2 全角半角转换¶
def full_to_half(text):
    """全角转半角"""
    result = []
    for char in text:
        code = ord(char)
        if 0xFF01 <= code <= 0xFF5E:
            # 全角ASCII字符转半角
            result.append(chr(code - 0xFEE0))
        elif code == 0x3000:
            # 全角空格
            result.append(' ')
        else:
            result.append(char)
    return ''.join(result)

def half_to_full(text):
    """半角转全角"""
    result = []
    for char in text:
        code = ord(char)
        if 0x21 <= code <= 0x7E:
            result.append(chr(code + 0xFEE0))
        elif code == 0x20:
            result.append('\u3000')
        else:
            result.append(char)
    return ''.join(result)

# 测试
text = "Hello,World!123"
print(f"全角: {text}")
print(f"半角: {full_to_half(text)}")

text2 = "Hello, World! 123"
print(f"半角: {text2}")
print(f"全角: {half_to_full(text2)}")
6.3 繁简体转换¶
# pip install opencc-python-reimplemented
# 简化版繁简转换(实际项目推荐使用opencc)
simple_t2s_map = {
    "國": "国", "學": "学", "語": "语", "處": "处", "計": "计",
    "機": "机", "書": "书", "車": "车", "開": "开", "門": "门",
    "個": "个", "時": "时", "電": "电", "長": "长", "東": "东",
    "動": "动", "點": "点", "現": "现", "場": "场", "問": "问",
    "還": "还", "進": "进", "經": "经", "發": "发", "關": "关",
    "對": "对", "應": "应", "認": "认", "義": "义", "實": "实",
    "產": "产", "無": "无", "從": "从", "師": "师", "數": "数",
}

def traditional_to_simplified(text):
    """繁体转简体(简化版,实际使用opencc)"""
    return ''.join(simple_t2s_map.get(c, c) for c in text)

text = "自然語言處理是計算機科學的重要領域"
print(f"繁体: {text}")
print(f"简体: {traditional_to_simplified(text)}")

# 使用opencc的标准写法(需安装opencc-python-reimplemented)
# from opencc import OpenCC
# cc = OpenCC('t2s')  # 繁体到简体
# simplified = cc.convert(text)
6.4 数字规范化¶
def normalize_numbers(text, mode='digit'):
    """
    数字规范化
    mode: 'digit'       - 中文数字转阿拉伯数字(此简化版未实现,原样返回)
          'chinese'     - 阿拉伯数字转中文(此简化版未实现,原样返回)
          'remove'      - 移除所有数字
          'placeholder' - 替换为占位符
    """
    if mode == 'placeholder':
        return re.sub(r'\d+\.?\d*', '<NUM>', text)
    elif mode == 'remove':
        return re.sub(r'\d+\.?\d*', '', text)
    else:
        return text

texts = [
    "他在2024年3月15日花了299.9元买了3本书",
    "公司有1500名员工,年营收超过10亿元",
]

for text in texts:
    print(f"原始: {text}")
    print(f"占位符: {normalize_numbers(text, 'placeholder')}")
    print(f"移除: {normalize_numbers(text, 'remove')}")
    print()
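如果确实需要把中文数字转成阿拉伯数字,可以按"数字位×单位"累加的思路实现。下面是一个只覆盖整数、未处理"零"的各种位置和小数的简化示意:

```python
cn_digits = {'零': 0, '一': 1, '二': 2, '三': 3, '四': 4,
             '五': 5, '六': 6, '七': 7, '八': 8, '九': 9}
cn_units = {'十': 10, '百': 100, '千': 1000, '万': 10000}

def cn_to_arabic(s):
    """将中文整数(含"万")转为阿拉伯数字,简化实现"""
    total, current = 0, 0
    for ch in s:
        if ch in cn_digits:
            current = cn_digits[ch]
        elif ch in cn_units:
            unit = cn_units[ch]
            if unit == 10000:
                # "万"作用于前面累积的全部数值
                total = (total + current) * unit
            else:
                # "十"前省略"一"的情况(如"十五")按1处理
                total += max(current, 1) * unit
            current = 0
    return total + current

print(cn_to_arabic("三百二十五"))  # 325
print(cn_to_arabic("一万三千"))    # 13000
```

工程上更稳妥的做法是使用现成库(如cn2an)而不是自己维护这类规则。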
7. 中文特殊处理¶
7.1 中文编码处理¶
def detect_and_convert_encoding(text_bytes):
    """检测并转换文本编码"""
    encodings_to_try = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5', 'utf-16']
    for encoding in encodings_to_try:
        try:
            decoded = text_bytes.decode(encoding)
            return decoded, encoding
        except (UnicodeDecodeError, UnicodeError):
            continue
    return None, None

# 测试
text = "自然语言处理"
for encoding in ['utf-8', 'gbk', 'gb2312']:
    encoded = text.encode(encoding)
    print(f"{encoding}: {encoded} ({len(encoded)} bytes)")

# 实际项目中推荐使用chardet
# import chardet
# result = chardet.detect(raw_bytes)
# print(result)  # {'encoding': 'utf-8', 'confidence': 0.99, 'language': ''}
7.2 中文标点处理¶
def normalize_chinese_punctuation(text, to_chinese=True):
    """中英文标点互转"""
    if to_chinese:
        # 英文标点转中文标点
        mapping = {
            ',': ',', '.': '。', '!': '!', '?': '?',
            ';': ';', ':': ':', '(': '(', ')': ')',
            '[': '【', ']': '】',
        }
    else:
        # 中文标点转英文标点
        mapping = {
            ',': ',', '。': '.', '!': '!', '?': '?',
            ';': ';', ':': ':', '(': '(', ')': ')',
            '【': '[', '】': ']',
        }
    for old, new in mapping.items():
        text = text.replace(old, new)
    return text

text = "你好,世界!这是一个测试。"
print(f"中文标点: {text}")
print(f"英文标点: {normalize_chinese_punctuation(text, to_chinese=False)}")
7.3 中文分句¶
def split_sentences(text):
    """中文分句"""
    # 使用句末标点进行分句
    sentences = re.split(r'([。!?!?\n])', text)
    result = []
    for i in range(0, len(sentences) - 1, 2):
        sent = sentences[i].strip()
        if sent:
            punct = sentences[i + 1] if i + 1 < len(sentences) else ""
            result.append(sent + punct)
    # 处理最后可能没有标点的句子
    if len(sentences) % 2 == 1 and sentences[-1].strip():  # [-1]负索引取最后元素
        result.append(sentences[-1].strip())
    return result

text = "自然语言处理是一个很有前景的方向。它在搜索引擎、智能客服等场景中广泛应用!你对NLP感兴趣吗?如果感兴趣的话可以学习一下。"
sentences = split_sentences(text)
print("分句结果:")
for i, sent in enumerate(sentences):  # enumerate同时获取索引和元素
    print(f" [{i}] {sent}")
7.4 新词发现¶
from collections import defaultdict
import math

class NewWordDiscovery:
    """基于统计的新词发现"""

    def __init__(self, min_freq=5, min_pmi=3.0, max_word_len=5):
        self.min_freq = min_freq
        self.min_pmi = min_pmi
        self.max_word_len = max_word_len

    def discover(self, corpus):
        """从语料中发现新词"""
        # Step 1: 统计n-gram频率
        ngram_freq = defaultdict(int)
        char_freq = defaultdict(int)
        total_chars = 0
        for text in corpus:
            total_chars += len(text)
            for char in text:
                char_freq[char] += 1
            for n in range(2, self.max_word_len + 1):
                for i in range(len(text) - n + 1):
                    ngram = text[i:i+n]
                    ngram_freq[ngram] += 1
        # Step 2: 计算内部凝聚度(PMI)
        candidates = {}
        for ngram, freq in ngram_freq.items():
            if freq < self.min_freq:
                continue
            if len(ngram) < 2:
                continue
            # 计算PMI(简化:只取所有二分切割中最小的互信息)
            min_pmi = float('inf')
            for split_pos in range(1, len(ngram)):
                left = ngram[:split_pos]
                right = ngram[split_pos:]
                left_freq = ngram_freq.get(left, char_freq.get(left, 1))
                right_freq = ngram_freq.get(right, char_freq.get(right, 1))
                p_ngram = freq / total_chars
                p_left = left_freq / total_chars
                p_right = right_freq / total_chars
                if p_left > 0 and p_right > 0:
                    pmi = math.log2(p_ngram / (p_left * p_right))
                    min_pmi = min(min_pmi, pmi)
            if min_pmi > self.min_pmi:
                candidates[ngram] = {
                    'freq': freq,
                    'pmi': min_pmi,
                }
        # 按频率排序
        sorted_candidates = sorted(candidates.items(),
                                   key=lambda x: x[1]['freq'],  # lambda匿名函数
                                   reverse=True)
        return sorted_candidates

# 测试
corpus = [
    "今天的自然语言处理课程真有意思",
    "自然语言处理是人工智能的重要分支",
    "我们学了自然语言处理的基础知识",
    "深度学习在自然语言处理中取得了很大进展",
    "机器学习和自然语言处理密切相关",
    "人工智能包括计算机视觉和自然语言处理",
    "自然语言处理技术在搜索引擎中广泛应用",
    "很多公司都在招聘自然语言处理工程师",
] * 5  # 重复以增加频率

nwd = NewWordDiscovery(min_freq=3, min_pmi=1.0)
new_words = nwd.discover(corpus)
print("发现的新词候选:")
for word, info in new_words[:20]:  # 切片操作,取前n个元素
    print(f" {word}: 频率={info['freq']}, PMI={info['pmi']:.2f}")
8. 完整预处理Pipeline¶
8.1 中文文本预处理Pipeline¶
import re
import jieba
from collections import Counter
from typing import List, Dict, Optional

class ChineseTextPreprocessor:
    """中文文本预处理完整Pipeline"""

    def __init__(self,
                 user_dict_path: Optional[str] = None,
                 stopwords_path: Optional[str] = None,
                 custom_stopwords: Optional[set] = None):
        # 加载自定义词典
        if user_dict_path:
            jieba.load_userdict(user_dict_path)
        # 加载停用词
        self.stopwords = set()
        if stopwords_path:
            with open(stopwords_path, 'r', encoding='utf-8') as f:
                self.stopwords = set(line.strip() for line in f)
        if custom_stopwords:
            self.stopwords.update(custom_stopwords)
        # 如果没有提供停用词,使用默认集合
        if not self.stopwords:
            self.stopwords = {
                "的", "了", "在", "是", "我", "有", "和", "就",
                "不", "人", "都", "一", "一个", "上", "也", "很",
                "到", "说", "要", "去", "你", "会", "着", "没有",
                "看", "好", "自己", "这", "他", "她", "它",
                "把", "被", "从", "对", "而", "但", "以", "之",
                "给", "让", "与", "及", "个", "等", "或", "又",
            }
        # 编译正则表达式
        self._compile_patterns()

    def _compile_patterns(self):
        """编译常用正则表达式"""
        self.url_re = re.compile(r'https?://\S+')
        self.email_re = re.compile(r'\S+@\S+\.\S+')
        self.phone_re = re.compile(r'1[3-9]\d{9}')
        self.html_re = re.compile(r'<[^>]+>')
        self.emoji_re = re.compile(
            "["
            "\U0001F600-\U0001F64F"
            "\U0001F300-\U0001F5FF"
            "\U0001F680-\U0001F6FF"
            "\U0001F1E0-\U0001F1FF"
            "]+", flags=re.UNICODE
        )
        self.whitespace_re = re.compile(r'\s+')
        self.repeated_punct_re = re.compile(r'([!!??。.,,])\1{2,}')

    def clean(self, text: str) -> str:
        """文本清洗"""
        # 1. HTML标签
        text = self.html_re.sub('', text)
        # 2. URL
        text = self.url_re.sub('', text)
        # 3. 邮箱
        text = self.email_re.sub('', text)
        # 4. 手机号
        text = self.phone_re.sub('', text)
        # 5. Emoji
        text = self.emoji_re.sub('', text)
        # 6. 全角转半角
        text = self._full_to_half(text)
        # 7. 连续标点规范化
        text = self.repeated_punct_re.sub(r'\1', text)
        # 8. 空白规范化
        text = self.whitespace_re.sub(' ', text).strip()
        return text

    def _full_to_half(self, text: str) -> str:
        """全角转半角"""
        result = []
        for char in text:
            code = ord(char)
            if 0xFF01 <= code <= 0xFF5E:
                result.append(chr(code - 0xFEE0))
            elif code == 0x3000:
                result.append(' ')
            else:
                result.append(char)
        return ''.join(result)

    def tokenize(self, text: str, mode: str = 'precise') -> List[str]:
        """分词"""
        if mode == 'search':
            return jieba.lcut_for_search(text)
        elif mode == 'full':
            return jieba.lcut(text, cut_all=True)
        # 'precise' 及其他取值均使用精确模式
        return jieba.lcut(text)

    def remove_stopwords(self, words: List[str]) -> List[str]:
        """去停用词"""
        return [w for w in words if w not in self.stopwords and w.strip()]

    def process(self, text: str,
                do_clean: bool = True,
                do_tokenize: bool = True,
                do_remove_stopwords: bool = True,
                tokenize_mode: str = 'precise',
                return_string: bool = False) -> object:
        """
        完整预处理流程
        Returns:
            不分词时返回清洗后的字符串;
            分词时返回词列表(是否去停用词由参数控制);
            return_string=True 时返回空格连接的字符串
        """
        # Step 1: 清洗
        if do_clean:
            text = self.clean(text)
        # 如果不需要分词,直接返回清洗后的文本
        if not do_tokenize:
            return text
        # Step 2: 分词
        words = self.tokenize(text, mode=tokenize_mode)
        # Step 3: 去停用词
        if do_remove_stopwords:
            words = self.remove_stopwords(words)
        # 返回格式
        if return_string:
            return ' '.join(words)
        return words

    def batch_process(self, texts: List[str], **kwargs) -> List:  # **kwargs接收任意关键字参数并透传给process
        """批量处理"""
        return [self.process(text, **kwargs) for text in texts]

# ==================
# 使用示例
# ==================
preprocessor = ChineseTextPreprocessor()

# 单条处理
text = "<p>这个产品超级好用!!!推荐给大家👍 访问 https://example.com 了解详情</p>"
result = preprocessor.process(text)
print(f"原始: {text}")
print(f"处理: {result}")
print()

# 批量处理
texts = [
    "今天天气真好!!!适合出去游玩 @小红",
    "<b>自然语言处理</b>是人工智能的重要方向",
    "这部电影太好看了,强烈推荐!!!!!",
]
results = preprocessor.batch_process(texts)
for original, processed in zip(texts, results):  # zip按位置配对
    print(f"原始: {original}")
    print(f"处理: {processed}")
    print("-" * 40)
8.2 英文文本预处理Pipeline¶
import re
from typing import List

class EnglishTextPreprocessor:
    """英文文本预处理Pipeline"""

    def __init__(self):
        self.stopwords = set([
            "i", "me", "my", "myself", "we", "our", "ours", "ourselves",
            "you", "your", "yours", "yourself", "yourselves",
            "he", "him", "his", "himself", "she", "her", "hers",
            "it", "its", "itself", "they", "them", "their",
            "what", "which", "who", "whom", "this", "that",
            "am", "is", "are", "was", "were", "be", "been", "being",
            "have", "has", "had", "having", "do", "does", "did",
            "a", "an", "the", "and", "but", "if", "or", "as",
            "of", "at", "by", "for", "with", "about", "between",
            "to", "from", "in", "on", "up", "out", "off",
            "not", "no", "nor", "so", "too", "very",
            "can", "will", "just", "should",
        ])
        self.contraction_map = {
            "ain't": "am not", "aren't": "are not",
            "can't": "cannot", "couldn't": "could not",
            "didn't": "did not", "doesn't": "does not",
            "don't": "do not", "hadn't": "had not",
            "hasn't": "has not", "haven't": "have not",
            "isn't": "is not", "shouldn't": "should not",
            "wasn't": "was not", "weren't": "were not",
            "won't": "will not", "wouldn't": "would not",
            "i'm": "i am", "you're": "you are",
            "he's": "he is", "she's": "she is",
            "it's": "it is", "we're": "we are",
            "they're": "they are", "i've": "i have",
            "you've": "you have", "we've": "we have",
            "they've": "they have", "i'd": "i would",
            "you'd": "you would", "he'd": "he would",
            "she'd": "she would", "we'd": "we would",
            "they'd": "they would", "i'll": "i will",
            "you'll": "you will", "he'll": "he will",
            "she'll": "she will", "we'll": "we will",
            "they'll": "they will",
        }

    def to_lowercase(self, text: str) -> str:
        return text.lower()

    def expand_contractions(self, text: str) -> str:
        """展开缩写"""
        for contraction, expansion in self.contraction_map.items():
            text = re.sub(re.escape(contraction), expansion, text, flags=re.IGNORECASE)
        return text

    def remove_urls(self, text: str) -> str:
        return re.sub(r'https?://\S+', '', text)

    def remove_special_chars(self, text: str) -> str:
        return re.sub(r'[^a-zA-Z0-9\s]', ' ', text)

    def tokenize(self, text: str) -> List[str]:
        return text.split()

    def remove_stopwords(self, words: List[str]) -> List[str]:
        return [w for w in words if w.lower() not in self.stopwords]

    def process(self, text: str) -> List[str]:
        text = self.to_lowercase(text)
        text = self.expand_contractions(text)
        text = self.remove_urls(text)
        text = self.remove_special_chars(text)
        text = re.sub(r'\s+', ' ', text).strip()
        words = self.tokenize(text)
        words = self.remove_stopwords(words)
        return words

# 测试
en_preprocessor = EnglishTextPreprocessor()
text = "I don't think this product is good! Check https://example.com for more info. It's really terrible."
result = en_preprocessor.process(text)
print(f"原始: {text}")
print(f"处理: {result}")
9. 面试要点¶
🔑 面试高频考点
考点1:中文分词的方法有哪些?各有什么优缺点?¶
✅ 标准答案要点:
1. 基于词典的方法(最大匹配):
- 优点: 实现简单,速度快
- 缺点: 依赖词典质量,无法处理未登录词
2. 基于统计的方法(HMM/CRF):
- 优点: 可以处理未登录词
- 缺点: 需要标注数据训练
3. 基于深度学习的方法(BiLSTM-CRF/BERT):
- 优点: 效果最好,能处理歧义
- 缺点: 计算量大,需要GPU
4. 混合方法(jieba):
- 使用DAG+动态规划做已知词切分
- 使用HMM处理未登录词
- 兼顾了速度和效果
考点2:jieba的原理是什么?¶
✅ 标准答案要点:
1. 基于前缀词典实现高效的词图扫描(DAG)
2. 对DAG使用动态规划查找最大概率路径
3. 对未登录词使用HMM模型进行识别(基于字的标注)
4. 支持全模式、精确模式和搜索引擎模式
关键概念:
- DAG: 有向无环图,记录所有可能的分词方式
- 动态规划: 选择概率最大的分词路径
- HMM: B/M/E/S四标签,处理新词
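DAG + 动态规划这两步可以用一个极简的示意实现来体会(词典与词频均为虚构的假设值,与jieba真实词典无关,也省略了HMM处理未登录词的部分):

```python
import math

# 假设的词频词典(虚构数值,仅作演示)
word_freq = {'南京': 8, '市长': 6, '南京市': 10, '长江': 12,
             '大桥': 10, '长江大桥': 6, '江': 2, '市': 3}
total = sum(word_freq.values())

def build_dag(text):
    """DAG:记录每个起点能到达的所有词终点(单字兜底)"""
    dag = {}
    for i in range(len(text)):
        ends = [i + 1]  # 单字兜底,保证总有路径
        for j in range(i + 2, len(text) + 1):
            if text[i:j] in word_freq:
                ends.append(j)
        dag[i] = ends
    return dag

def max_prob_cut(text):
    """从右向左动态规划,求对数概率最大的切分路径"""
    dag = build_dag(text)
    n = len(text)
    route = {n: (0.0, n)}
    for i in range(n - 1, -1, -1):
        route[i] = max(
            (math.log(word_freq.get(text[i:j], 1) / total) + route[j][0], j)
            for j in dag[i]
        )
    # 沿最优路径回溯切分结果
    words, i = [], 0
    while i < n:
        j = route[i][1]
        words.append(text[i:j])
        i = j
    return words

print(max_prob_cut('南京市长江大桥'))  # ['南京市', '长江大桥']
```

即使词典里同时有"南京/市长"等干扰词,整体概率最大的路径仍会选出"南京市/长江大桥"。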
考点3:词干提取和词形还原的区别?¶
✅ 标准答案要点:
- 词干提取(Stemming): 基于规则删除后缀,结果可能不是有效单词
例: running → run, studies → studi
- 词形还原(Lemmatization): 基于词典还原为标准形式,结果是有效单词
例: running → run, better → good, mice → mouse
- 词形还原更准确但更慢,词干提取更快但可能产生无效词
- 现代NLP(BERT等)使用子词分词(BPE/WordPiece),不需要这些处理
考点4:BPE子词分词的原理?¶
✅ 标准答案要点:
1. 初始化:将所有字符作为初始词汇表
2. 统计:统计所有相邻符号对的出现频率
3. 合并:将最高频的符号对合并为新符号
4. 重复:重复步骤2-3直到达到目标词汇表大小
优点:
- 自动平衡词的粒度(常见词保持完整,罕见词拆分)
- 有效处理OOV(Out-Of-Vocabulary)问题
- BERT使用的WordPiece和GPT使用的BPE是同族算法
考点5:文本预处理的一般步骤?¶
✅ 标准答案要点:
1. 编码统一(UTF-8)
2. HTML/噪声清洗(去除标签、特殊字符)
3. 文本规范化(大小写、全半角、繁简体)
4. 分词/分句
5. 停用词过滤
6. 词形处理(词干提取/词形还原)
注意点:
- 不是所有任务都需要完整的预处理步骤
- BERT等预训练模型通常只需要简单清洗+子词分词
- 情感分析不应去掉否定词
- 预处理步骤和顺序应根据具体任务调整
10. 练习题¶
📝 基础题¶
- 使用jieba对以下文本进行分词,并统计词频Top20:
text = """
近年来,随着深度学习技术的快速发展,自然语言处理领域取得了显著进步。
特别是Transformer架构的提出和BERT等预训练语言模型的出现,
彻底改变了NLP的研究范式。从传统的特征工程方法到端到端的深度学习方法,
NLP技术经历了革命性的变化。如今,以ChatGPT为代表的大语言模型
更是将NLP推向了新的高度,让人们看到了通用人工智能的曙光。
"""
# 你的代码
- 实现一个完整的中文文本清洗函数,能处理:HTML标签、URL、邮箱、emoji、多余空白、连续标点。
💻 编程题¶
- 实现逆向最大匹配(BMM)算法,并与正向最大匹配(FMM)的结果进行比较。
- 实现一个简单的BPE子词分词算法,在给定语料上训练后,能对新文本进行分词。
- 构建一个TextPreprocessor类,支持以下可配置的预处理步骤:
  - 文本清洗(可选)
  - 分词方式选择(jieba精确/搜索/字符级)
  - 停用词过滤(可选,支持自定义停用词表)
  - 最小/最大词长过滤
  - 词频过滤
🔬 思考题¶
- 在BERT时代,传统的文本预处理(如去停用词、词干提取)还有多大意义?请分析。
答案:意义降低但未完全消失。①BERT等预训练模型使用子词分词(WordPiece/BPE),传统的分词、词干提取不再是必须步骤;②去停用词可能反而有害,因为BERT依赖完整上下文理解语义(如"不好"中的"不"不能去掉);③但文本清洗(去HTML标签、处理编码、去除噪声)仍然必要;④在传统ML管道(TF-IDF + SVM等)中,停用词过滤和词干提取仍然有效。总结:预处理从"特征工程"转向"数据清洗"。
- 如果要处理社交媒体文本(包含大量emoji、网络用语、缩写),预处理策略应该如何调整?
答案:①Emoji处理:不要简单删除,应转换为文本描述(如😊→"开心表情"),因为emoji包含情感信息;②网络用语归一化:建立词典映射(如"yyds"→"永远的神");③缩写展开:将常见缩写还原;④@/# 标签处理:保留或提取为特征而非删除;⑤重复字符处理:如"太好吃吃吃吃了"归一化为"太好吃了";⑥自定义分词词典:将网络热词加入jieba词典;⑦放宽清洗规则:不要过度清洗,保留社交媒体特有的表达方式。
✅ 自我检查清单¶
□ 我能用jieba完成中文分词并处理自定义词典
□ 我理解最大匹配算法的原理并能手写代码
□ 我知道jieba的三种分词模式及其区别
□ 我能实现完整的文本清洗Pipeline
□ 我理解BPE子词分词的原理
□ 我知道什么时候应该/不应该去停用词
□ 我能处理全角半角、繁简体转换
□ 我完成了至少4道练习题
📚 延伸阅读¶
- jieba分词GitHub
- BPE原始论文: Neural Machine Translation of Rare Words with Subword Units
- 中文分词入门之资源
- SentencePiece: 无监督分词工具
- Hugging Face Tokenizers库
下一篇:03-文本表示方法 — 如何将文本转化为计算机能理解的数学表示