AI/ML中的算法应用 - 完全详解¶
重要性:⭐⭐⭐⭐⭐ 难度:⭐⭐⭐⭐ 学习时间:4-6天 前置知识:基础数据结构、动态规划、图算法
📚 目录¶
AI/ML中的算法概述¶
为什么算法对AI/ML很重要?¶
算法是AI/ML的基础: - 训练前:数据预处理需要排序、搜索、哈希 - 训练中:优化算法使用梯度下降、动态规划 - 推理时:搜索算法用于路径规划、解码 - 系统设计:数据结构影响模型性能
AI/ML中的算法分类¶
算法应用层级:
┌─────────────────────────────────────┐
│ 应用层:推荐系统、NLP、CV、RL │
├─────────────────────────────────────┤
│ 算法层:搜索、优化、图、DP │
├─────────────────────────────────────┤
│ 数据层:数组、哈希、树、堆 │
└─────────────────────────────────────┘
常见算法应用场景¶
| AI/ML任务 | 使用的算法 | 时间复杂度 |
|---|---|---|
| 数据预处理 | 排序、去重、哈希 | O(n log n) |
| 特征工程 | 二分查找、滑动窗口 | O(n), O(log n) |
| 序列标注 | 动态规划(Viterbi) | O(n²) |
| 注意力机制 | 矩阵运算、Top-K搜索 | O(n²), O(n log n) |
| 图神经网络 | BFS/DFS、最短路径 | O(V+E) |
| 强化学习 | MCTS、搜索算法 | O(b^d) |
数据预处理中的算法¶
1. 排序算法在特征工程中的应用¶
应用场景:特征重要性排序¶
问题:有1000个特征,需要找出最重要的top-K个特征。
import numpy as np
from collections import defaultdict
def select_top_k_features(X, y, k=10):
"""
选择重要性最高的K个特征
方法:基于方差的特征选择
时间复杂度:O(n + m log m),m是特征数
空间复杂度:O(m)
"""
# 计算每个特征的方差
feature_variances = np.var(X, axis=0)
# 获取top-k特征的索引(使用堆排序优化)
# 方法1:完全排序 O(m log m)
top_k_indices = np.argsort(feature_variances)[-k:]
# 方法2:使用堆 O(m + k log m) - 更适合m很大时
import heapq
top_k_indices_heap = heapq.nlargest(k,
range(len(feature_variances)),
feature_variances.__getitem__)
return top_k_indices_heap
# 示例
X = np.random.rand(1000, 100) # 1000个样本,100个特征
y = np.random.randint(0, 2, 1000)
top_features = select_top_k_features(X, y, k=10)
print(f"Top-10特征索引: {top_features}")
算法对比: - 完全排序:O(m log m),适合需要全部排序 - 堆排序(Top-K):O(m + k log m),适合k << m - 快速选择:O(m)平均,适合只需要中位数或特定分位数
应用场景:数据清洗(异常值检测)¶
def detect_outliers_sorted(data, threshold=3):
"""
使用排序检测异常值
方法:基于IQR(四分位距)
时间复杂度:O(n log n)
"""
# 排序数据
sorted_data = np.sort(data)
n = len(sorted_data)
# 计算四分位数(二分查找优化)
def find_percentile(sorted_arr, percentile):
"""使用二分查找找百分位数"""
index = int(n * percentile / 100)
return sorted_arr[index]
Q1 = find_percentile(sorted_data, 25)
Q3 = find_percentile(sorted_data, 75)
IQR = Q3 - Q1
# 定义异常值边界
lower_bound = Q1 - threshold * IQR
upper_bound = Q3 + threshold * IQR
# 找出异常值
outliers = [x for x in data if x < lower_bound or x > upper_bound]
return outliers
# 示例
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100] # 100是异常值
outliers = detect_outliers_sorted(data)
print(f"异常值: {outliers}") # [100]
2. 哈希表在数据处理中的应用¶
应用场景:数据去重¶
def remove_duplicates_hash(data):
"""
使用哈希表去重
时间复杂度:O(n)
空间复杂度:O(n)
"""
seen = set()
unique_data = []
for item in data:
if item not in seen:
seen.add(item)
unique_data.append(item)
return unique_data
# NLP应用:去重文档
documents = [
"This is a document.",
"Another document.",
"This is a document.", # 重复
"Third document."
]
unique_docs = remove_duplicates_hash(documents)
print(f"去重后文档数: {len(unique_docs)}") # 3
应用场景:词汇表构建¶
def build_vocabulary(corpus):
"""
构建词汇表(NLP基础操作)
时间复杂度:O(n),n是所有文档的总词数
空间复杂度:O(v),v是词汇量
"""
word_to_id = {}
id_to_word = {}
word_freq = defaultdict(int) # defaultdict带默认值的字典,避免KeyError
# 第一遍扫描:统计词频
for doc in corpus:
words = doc.lower().split()
for word in words:
word_freq[word] += 1
# 第二遍扫描:构建词汇表(按词频排序)
sorted_words = sorted(word_freq.items(),
key=lambda x: x[1], # lambda匿名函数:简洁的单行函数
reverse=True)
for idx, (word, freq) in enumerate(sorted_words): # enumerate同时获取索引和值
word_to_id[word] = idx
id_to_word[idx] = word
return word_to_id, id_to_word, word_freq
# 示例
corpus = [
"the cat sat on the mat",
"the dog sat on the log",
"cats and dogs are pets"
]
word_to_id, id_to_word, word_freq = build_vocabulary(corpus)
print(f"词汇表大小: {len(word_to_id)}")
print(f"词频最高的5个词: {sorted(word_freq.items(), key=lambda x: -x[1])[:5]}")
应用场景:特征哈希(降维技术)¶
``python def feature_hashing(features, dim=1000): """ 特征哈希(哈希技巧) 用于处理高维稀疏特征(如文本) 时间复杂度:O(n) 空间复杂度:O(dim),dim是哈希表维度 """ hashed = np.zeros(dim)
for feature in features:
# 使用哈希函数将特征映射到固定维度
idx = hash(feature) % dim
hashed[idx] += 1
return hashed
文本分类应用¶
text = "machine learning is awesome" words = text.split() hashed_features = feature_hashing(words, dim=1000) print(f"哈希特征向量: {hashed_features}")
### 3. 滑动窗口在时序数据中的应用
#### 应用场景:时间序列平滑
```python
def moving_average(data, window_size):
"""
移动平均(时间序列平滑)
时间复杂度:O(n)
空间复杂度:O(n)
"""
smoothed = []
for i in range(len(data)):
# 窗口范围
start = max(0, i - window_size // 2)
end = min(len(data), i + window_size // 2 + 1)
# 计算窗口内平均值
window = data[start:end]
avg = sum(window) / len(window)
smoothed.append(avg)
return smoothed
# 示例:传感器数据平滑
sensor_data = [10, 12, 13, 11, 10, 9, 10, 11, 12, 13]
smoothed = moving_average(sensor_data, window_size=3)
print(f"原始数据: {sensor_data}")
print(f"平滑后: {[round(x, 2) for x in smoothed]}")
应用场景:文本的N-gram提取¶
def extract_ngrams(text, n=3):
"""
提取文本的n-gram(滑动窗口应用)
时间复杂度:O(L),L是文本长度
空间复杂度:O(L)
"""
words = text.split()
ngrams = []
# 滑动窗口
for i in range(len(words) - n + 1):
ngram = ' '.join(words[i:i+n])
ngrams.append(ngram)
return ngrams
# 示例
text = "natural language processing is fun"
trigrams = extract_ngrams(text, n=3)
print(f"三元组: {trigrams}")
# ['natural language processing', 'language processing is', 'processing is fun']
应用场景:最大池化(CNN)¶
def max_pooling(feature_map, pool_size=2, stride=2):
"""
最大池化(CNN中的降采样操作)
使用滑动窗口实现
时间复杂度:O(HW),H和W是特征图的高和宽
"""
h, w = feature_map.shape
pooled_h = (h - pool_size) // stride + 1
pooled_w = (w - pool_size) // stride + 1
pooled = np.zeros((pooled_h, pooled_w))
# 滑动窗口
for i in range(0, h - pool_size + 1, stride):
for j in range(0, w - pool_size + 1, stride):
# 提取窗口
window = feature_map[i:i+pool_size, j:j+pool_size]
# 最大池化
pooled[i//stride, j//stride] = np.max(window)
return pooled
# 示例
feature_map = np.array([
[1, 3, 2, 4],
[5, 6, 7, 8],
[9, 10, 11, 12],
[13, 14, 15, 16]
])
pooled = max_pooling(feature_map, pool_size=2, stride=2)
print(f"池化结果:\n{pooled}")
4. 搜索算法在数据查找中的应用¶
应用场景:二分查找找阈值¶
def find_best_threshold(y_true, y_scores):
"""
使用二分查找找到最佳分类阈值
目标:最大化F1分数
时间复杂度:O(n log m),n是样本数,m是候选阈值数
"""
# 排序所有可能的预测分数
sorted_scores = np.sort(y_scores)
def compute_f1(threshold):
"""计算给定阈值的F1分数"""
y_pred = (y_scores >= threshold).astype(int)
tp = np.sum((y_pred == 1) & (y_true == 1))
fp = np.sum((y_pred == 1) & (y_true == 0))
fn = np.sum((y_pred == 0) & (y_true == 1))
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
if precision + recall == 0:
return 0
return 2 * precision * recall / (precision + recall)
# 二分查找最佳阈值
left, right = 0, len(sorted_scores) - 1
best_threshold = sorted_scores[0]
best_f1 = 0
while left <= right:
mid = (left + right) // 2
threshold = sorted_scores[mid]
f1 = compute_f1(threshold)
if f1 > best_f1:
best_f1 = f1
best_threshold = threshold
# 根据F1分数调整搜索范围
# 这里简化处理,实际可能需要更复杂的策略
if f1 < 0.5:
right = mid - 1
else:
left = mid + 1
return best_threshold, best_f1
# 示例
y_true = np.array([0, 1, 1, 0, 1])
y_scores = np.array([0.3, 0.7, 0.8, 0.4, 0.6])
threshold, f1 = find_best_threshold(y_true, y_scores)
print(f"最佳阈值: {threshold:.2f}, F1分数: {f1:.2f}")
动态规划在序列建模中的应用¶
1. 序列标注问题¶
Viterbi算法:HMM中的解码¶
问题:给定观测序列,找出最可能的隐含状态序列。
def viterbi(observations, states, start_p, trans_p, emit_p):
"""
Viterbi算法:寻找最可能的隐含状态序列
时间复杂度:O(T * S²),T是观测序列长度,S是状态数
空间复杂度:O(T * S)
参数:
- observations: 观测序列
- states: 状态列表
- start_p: 初始概率
- trans_p: 转移概率矩阵
- emit_p: 发射概率矩阵
"""
T = len(observations)
S = len(states)
# V[i][s]:时刻t到达状态s的最大概率
V = np.zeros((T, S))
# path[i][s]:时刻t到达状态s的路径
path = np.zeros((T, S), dtype=int)
# 初始化
for s in range(S):
V[0][s] = start_p[s] * emit_p[s][observations[0]]
path[0][s] = s
# 递推
for t in range(1, T):
for s in range(S):
# 找到前一时刻的最佳状态
max_prob = 0
best_prev_s = 0
for prev_s in range(S):
prob = V[t-1][prev_s] * trans_p[prev_s][s] * emit_p[s][observations[t]]
if prob > max_prob:
max_prob = prob
best_prev_s = prev_s
V[t][s] = max_prob
path[t][s] = best_prev_s
# 回溯找最优路径
best_path = []
best_last_state = np.argmax(V[T-1])
best_path.append(states[best_last_state])
for t in range(T-1, 0, -1):
best_last_state = path[t][best_last_state]
best_path.append(states[best_last_state])
return best_path[::-1] # 反转
# 示例:词性标注
states = ['名词', '动词']
observations = ['我', '吃', '苹果'] # 简化:用词本身代替观测
start_p = [0.6, 0.4] # 初始概率
trans_p = [[0.7, 0.3], [0.4, 0.6]] # 转移概率
emit_p = [[0.9, 0.1, 0.3], [0.1, 0.9, 0.7]] # 发射概率
best_path = viterbi(observations, states, start_p, trans_p, emit_p)
print(f"最可能的词性序列: {best_path}")
Viterbi vs 普通递归: - 递归:O(S^T),指数级爆炸 - Viterbi(DP):O(T * S²),多项式时间
2. 编辑距离算法在NLP中的应用¶
Levenshtein距离:拼写纠错¶
def levenshtein_distance(word1, word2):
"""
计算编辑距离(Levenshtein距离)
时间复杂度:O(m * n),m和n是两个词的长度
空间复杂度:O(m * n)
应用:拼写纠错、模糊匹配
"""
m, n = len(word1), len(word2)
# dp[i][j]:word1[:i]和word2[:j]的编辑距离
dp = np.zeros((m+1, n+1))
# 初始化边界条件
for i in range(m+1):
dp[i][0] = i # 删除i个字符
for j in range(n+1):
dp[0][j] = j # 插入j个字符
# 填充DP表
for i in range(1, m+1):
for j in range(1, n+1):
if word1[i-1] == word2[j-1]:
dp[i][j] = dp[i-1][j-1] # 字符相同,无需操作
else:
dp[i][j] = 1 + min(
dp[i-1][j], # 删除
dp[i][j-1], # 插入
dp[i-1][j-1] # 替换
)
return dp[m][n]
# 示例:拼写纠错
correct_words = ["apple", "banana", "orange", "grape"]
typo = "aple"
best_match = None
min_dist = float('inf')
for word in correct_words:
dist = levenshtein_distance(typo, word)
if dist < min_dist:
min_dist = dist
best_match = word
print(f"输入: {typo}")
print(f"纠正: {best_match} (编辑距离: {min_dist})")
空间优化版本:
def levenshtein_distance_optimized(word1, word2):
"""
空间优化的编辑距离算法
时间复杂度:O(m * n)
空间复杂度:O(min(m, n))
"""
if len(word1) < len(word2):
word1, word2 = word2, word1
m, n = len(word1), len(word2)
# 只保留两行
prev = np.arange(n + 1)
curr = np.zeros(n + 1)
for i in range(1, m + 1):
curr[0] = i
for j in range(1, n + 1):
if word1[i-1] == word2[j-1]:
curr[j] = prev[j-1]
else:
curr[j] = 1 + min(prev[j], curr[j-1], prev[j-1])
prev, curr = curr, prev
return prev[n]
3. 最长公共子序列(LCS)¶
应用:DNA序列比对、文档相似度¶
def longest_common_subsequence(text1, text2):
"""
最长公共子序列
时间复杂度:O(m * n)
空间复杂度:O(m * n)
应用:版本控制、DNA比对、抄袭检测
"""
m, n = len(text1), len(text2)
# dp[i][j]:text1[:i]和text2[:j]的最长公共子序列长度
dp = np.zeros((m+1, n+1))
for i in range(1, m+1):
for j in range(1, n+1):
if text1[i-1] == text2[j-1]:
dp[i][j] = dp[i-1][j-1] + 1
else:
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
# 回溯找LCS
lcs = []
i, j = m, n
while i > 0 and j > 0:
if text1[i-1] == text2[j-1]:
lcs.append(text1[i-1])
i -= 1
j -= 1
elif dp[i-1][j] > dp[i][j-1]:
i -= 1
else:
j -= 1
return lcs[::-1]
# 示例:代码相似度检测
code1 = "def add(a, b): return a + b"
code2 = "def add(x, y): return x + y"
lcs = longest_common_subsequence(code1, code2)
similarity = len(lcs) / (max(len(code1), len(code2)))
print(f"公共子序列: {''.join(lcs)}")
print(f"相似度: {similarity:.2%}")
图算法在深度学习中的应用¶
1. 图神经网络(GNN)基础¶
消息传递:BFS的应用¶
def message_passing_gnn(adj_matrix, node_features, num_layers=2):
"""
简化的图神经网络(消息传递)
使用BFS思想聚合邻居信息
时间复杂度:O(L * (V + E)),L是层数,V是节点数,E是边数
空间复杂度:O(V * F),F是特征维度
"""
num_nodes = adj_matrix.shape[0]
feature_dim = node_features.shape[1]
# 初始化隐藏状态
hidden_states = node_features.copy()
for layer in range(num_layers):
new_states = np.zeros_like(hidden_states)
# 对每个节点聚合邻居信息
for node in range(num_nodes):
# 找到所有邻居(包括自己)
neighbors = np.where(adj_matrix[node] == 1)[0]
# 聚合:求和(也可以用平均、最大值等)
aggregated = np.sum(hidden_states[neighbors], axis=0)
# 非线性激活(简化版,实际会用神经网络)
new_states[node] = np.maximum(0, aggregated) # ReLU
hidden_states = new_states
return hidden_states
# 示例:引文网络
# 论文之间的引用关系
adj_matrix = np.array([
[0, 1, 1, 0], # 论文0被1、2引用
[0, 0, 0, 1], # 论文1被3引用
[0, 0, 0, 1], # 论文2被3引用
[0, 0, 0, 0] # 论文3没有被引用
])
# 初始特征(词袋向量)
node_features = np.array([
[1, 0, 1], # 论文0的特征
[0, 1, 0], # 论文1的特征
[1, 1, 0], # 论文2的特征
[0, 0, 1] # 论文3的特征
])
final_features = message_passing_gnn(adj_matrix, node_features, num_layers=2)
print(f"最终节点特征:\n{final_features}")
2. 最短路径在强化学习中的应用¶
MCTS(蒙特卡洛树搜索)¶
import math
import random
class MCTSNode:
"""蒙特卡洛树搜索节点"""
def __init__(self, state, parent=None):
self.state = state
self.parent = parent
self.children = []
self.visits = 0
self.value = 0
self.untried_actions = self.get_untried_actions()
def get_untried_actions(self):
"""获取未尝试的动作"""
# 简化:返回所有可能的动作
return list(range(4)) # 假设有4个动作
def uct_value(self, c=1.414):
"""计算UCT值(用于选择节点)"""
if self.visits == 0:
return float('inf')
return (self.value / self.visits) + c * math.sqrt(math.log(self.parent.visits) / self.visits)
def mcts(root_state, num_simulations=1000):
"""
蒙特卡洛树搜索
时间复杂度:O(N * D),N是模拟次数,D是搜索深度
空间复杂度:O(B^D),B是分支因子
应用:AlphaGo、游戏AI、规划
"""
root = MCTSNode(root_state)
for _ in range(num_simulations):
# 1. 选择(使用树策略)
node = root
while node.untried_actions == [] and node.children:
node = max(node.children, key=lambda n: n.uct_value())
# 2. 扩展
if node.untried_actions:
action = random.choice(node.untried_actions)
node.untried_actions.remove(action)
new_state = node.state # 简化:实际需要应用动作
node = MCTSNode(new_state, parent=node)
node.parent.children.append(node)
# 3. 模拟(随机策略)
reward = simulate(node.state)
# 4. 回溯
while node:
node.visits += 1
node.value += reward
node = node.parent
# 返回最优动作
best_child = max(root.children, key=lambda n: n.visits)
return best_child
def simulate(state):
"""随机模拟(简化版)"""
return random.random() # 返回0-1之间的随机奖励
3. 拓扑排序在任务调度中的应用¶
训练流水线调度¶
from collections import deque
def topological_sort_tasks(tasks, dependencies):
"""
拓扑排序:确定任务执行顺序
时间复杂度:O(V + E)
空间复杂度:O(V + E)
应用:ML流水线调度、数据预处理流程
"""
# 构建图
in_degree = {task: 0 for task in tasks}
graph = {task: [] for task in tasks}
for task, dep in dependencies:
graph[dep].append(task)
in_degree[task] += 1
# 找到所有入度为0的节点
queue = deque([task for task in tasks if in_degree[task] == 0])
result = []
while queue:
task = queue.popleft()
result.append(task)
# 移除该节点的所有出边
for neighbor in graph[task]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(result) != len(tasks):
raise ValueError("存在循环依赖!")
return result
# 示例:ML训练流水线
tasks = ["数据收集", "数据清洗", "特征工程", "模型训练", "模型评估", "模型部署"]
dependencies = [
("数据清洗", "数据收集"), # 清洗依赖收集
("特征工程", "数据清洗"), # 特征依赖清洗
("模型训练", "特征工程"), # 训练依赖特征
("模型评估", "模型训练"), # 评估依赖训练
("模型部署", "模型评估") # 部署依赖评估
]
execution_order = topological_sort_tasks(tasks, dependencies)
print(f"任务执行顺序: {' -> '.join(execution_order)}")
优化算法¶
1. 梯度下降:搜索算法的应用¶
一维搜索(线搜索)¶
def line_search(f, df, x, direction, alpha=0.3, beta=0.8):
"""
回溯线搜索(Backtracking Line Search)
时间复杂度:O(log(1/ε)),ε是精度
空间复杂度:O(1)
应用:梯度下降中的步长选择
"""
step_size = 1.0
while f(x + step_size * direction) > f(x) + alpha * step_size * df(x) * direction:
step_size *= beta
return step_size
# 示例:优化二次函数
def f(x):
"""目标函数:f(x) = x²"""
return x ** 2
def df(x):
"""导数:f'(x) = 2x"""
return 2 * x
x = 10 # 初始点
for i in range(10):
direction = -df(x) # 负梯度方向
step = line_search(f, df, x, direction)
x = x + step * direction
print(f"迭代{i+1}: x = {x:.4f}, f(x) = {f(x):.4f}")
2. 动态规划在强化学习中的应用¶
价值迭代(Value Iteration)¶
def value_iteration(states, actions, transitions, rewards, gamma=0.9, theta=1e-6):
"""
价值迭代算法
时间复杂度:O(S² * A),S是状态数,A是动作数
空间复杂度:O(S)
应用:马尔可夫决策过程(MDP)求解
"""
V = {s: 0 for s in states}
while True:
delta = 0
for s in states:
v = V[s]
# Bellman最优方程
max_value = float('-inf')
for a in actions:
expected_value = 0
for s_prime in states:
prob = transitions.get((s, a, s_prime), 0)
reward = rewards.get((s, a, s_prime), 0)
expected_value += prob * (reward + gamma * V[s_prime])
max_value = max(max_value, expected_value)
V[s] = max_value
delta = max(delta, abs(v - V[s]))
# 收敛检查
if delta < theta:
break
return V
# 示例:简单的网格世界
states = [(0, 0), (0, 1), (1, 0), (1, 1)]
actions = ['up', 'down', 'left', 'right']
# 简化的转移概率(确定性)
transitions = {}
rewards = {}
V = value_iteration(states, actions, transitions, rewards)
print(f"最优价值函数: {V}")
实战案例¶
案例1:构建推荐系统¶
协同过滤中的相似度计算¶
def collaborative_filtering(user_item_matrix, k=5):
"""
协同过滤推荐(基于用户)
时间复杂度:O(U² * I + U * I * log I)
U是用户数,I是物品数
步骤:
1. 计算用户相似度
2. 找到K个最相似用户
3. 生成推荐
"""
num_users = user_item_matrix.shape[0]
# 1. 计算用户相似度矩阵(余弦相似度)
def cosine_similarity(user1, user2):
"""余弦相似度"""
dot_product = np.dot(user1, user2)
norm1 = np.linalg.norm(user1)
norm2 = np.linalg.norm(user2)
return dot_product / (norm1 * norm2 + 1e-10)
similarity_matrix = np.zeros((num_users, num_users))
for i in range(num_users):
for j in range(i+1, num_users):
sim = cosine_similarity(user_item_matrix[i], user_item_matrix[j])
similarity_matrix[i][j] = sim
similarity_matrix[j][i] = sim
# 2. 为每个用户找到K个最相似用户
recommendations = {}
for user_id in range(num_users):
# 找到最相似的K个用户(排除自己)
similarities = similarity_matrix[user_id]
similarities[user_id] = -1 # 排除自己
# 堆排序找top-k
import heapq
top_k_users = heapq.nlargest(k,
range(num_users),
similarities.__getitem__)
# 3. 生成推荐
user_ratings = user_item_matrix[user_id]
predicted_ratings = {}
for item_id in range(user_item_matrix.shape[1]):
if user_ratings[item_id] == 0: # 未评分的物品
# 加权平均
weighted_sum = 0
sim_sum = 0
for similar_user in top_k_users:
if user_item_matrix[similar_user][item_id] > 0:
weighted_sum += (similarities[similar_user] *
user_item_matrix[similar_user][item_id])
sim_sum += abs(similarities[similar_user])
if sim_sum > 0:
predicted_ratings[item_id] = weighted_sum / sim_sum
# 推荐评分最高的物品
recommended_items = sorted(predicted_ratings.items(),
key=lambda x: -x[1])[:k]
recommendations[user_id] = recommended_items
return recommendations
# 示例:用户-物品评分矩阵
user_item_matrix = np.array([
[5, 3, 0, 1], # 用户0
[4, 0, 0, 1], # 用户1
[1, 1, 0, 5], # 用户2
[1, 0, 0, 4], # 用户3
[0, 1, 5, 4] # 用户4
])
recommendations = collaborative_filtering(user_item_matrix, k=2)
for user_id, recs in recommendations.items():
print(f"用户{user_id}的推荐: {recs}")
案例2:图像处理中的算法应用¶
边缘检测:卷积与滑动窗口¶
def edge_detection(image, kernel):
"""
边缘检测(使用卷积)
时间复杂度:O(H * W * K * K)
H和W是图像高和宽,K是卷积核大小
应用:Sobel算子、Canny边缘检测
"""
h, w = image.shape
kh, kw = kernel.shape
pad_h, pad_w = kh // 2, kw // 2
# 填充图像
padded = np.pad(image, ((pad_h, pad_h), (pad_w, pad_w)), mode='constant')
# 卷积
result = np.zeros((h, w))
for i in range(h):
for j in range(w):
# 提取窗口
window = padded[i:i+kh, j:j+kw]
# 卷积运算
result[i, j] = np.sum(window * kernel)
return result
# Sobel算子
sobel_x = np.array([
[-1, 0, 1],
[-2, 0, 2],
[-1, 0, 1]
])
sobel_y = np.array([
[-1, -2, -1],
[ 0, 0, 0],
[ 1, 2, 1]
])
# 示例
image = np.random.rand(100, 100) # 随机图像
edges_x = edge_detection(image, sobel_x)
edges_y = edge_detection(image, sobel_y)
magnitude = np.sqrt(edges_x**2 + edges_y**2)
案例3:自然语言处理中的动态规划¶
分词算法(最短路径)¶
def word_segmentation(text, word_freq):
"""
基于动态规划的分词
目标:找到最高频的词组合
时间复杂度:O(n²)
空间复杂度:O(n)
"""
n = len(text)
# dp[i]:text[:i]的最高频分词
dp = [float('-inf')] * (n + 1)
dp[0] = 0
# 记录路径
path = [-1] * (n + 1)
for i in range(1, n + 1):
for j in range(i):
word = text[j:i]
freq = word_freq.get(word, 0)
if dp[j] + freq > dp[i]:
dp[i] = dp[j] + freq
path[i] = j
# 回溯
result = []
i = n
while i > 0:
j = path[i]
result.append(text[j:i])
i = j
return result[::-1]
# 示例
word_freq = {
"我": 1000,
"爱": 500,
"自然": 300,
"语言": 300,
"处理": 200,
"自然语言": 400
}
text = "我爱自然语言处理"
segments = word_segmentation(text, word_freq)
print(f"分词结果: {segments}")
# 可能输出: ['我', '爱', '自然语言', '处理']
📝 总结¶
核心要点¶
✅ 数据预处理: - 排序:特征选择、异常值检测 - 哈希:去重、词汇表构建 - 滑动窗口:时序平滑、N-gram、池化 - 搜索:阈值优化、最近邻查找
✅ 动态规划: - Viterbi算法:序列标注 - 编辑距离:拼写纠错 - LCS:相似度计算 - 分词:最短路径
✅ 图算法: - 消息传递:GNN - MCTS:强化学习 - 拓扑排序:任务调度
✅ 优化算法: - 梯度下降:线搜索 - 价值迭代:强化学习
性能优化建议¶
- 时间复杂度优化:
- 预处理:使用哈希表加速查找
- 批处理:向量化操作
-
并行化:多进程/多线程
-
空间复杂度优化:
- 滚动数组:DP空间优化
- 稀疏表示:大规模图
-
生成器:流式处理
-
实际工程考虑:
- 使用sklearn、PyTorch等库的优化实现
- GPU加速矩阵运算
- 分布式计算
🎯 下一步学习¶
继续深入: - LeetCode 100+题详解 - 巩固算法基础 - 大厂面试真题 - AI岗位面试 - 实际项目:实现一个完整的推荐系统
掌握这些算法,让你的AI/ML技能更上一层楼! 🚀