跳转至

AI/ML中的算法应用 - 完全详解

重要性:⭐⭐⭐⭐⭐ 难度:⭐⭐⭐⭐ 学习时间:4-6天 前置知识:基础数据结构、动态规划、图算法


📚 目录

  1. AI/ML中的算法概述
  2. 数据预处理中的算法
  3. 动态规划在序列建模中的应用
  4. 图算法在深度学习中的应用
  5. 优化算法
  6. 实战案例

AI/ML中的算法概述

为什么算法对AI/ML很重要?

算法是AI/ML的基础: - 训练前:数据预处理需要排序、搜索、哈希 - 训练中:优化算法使用梯度下降、动态规划 - 推理时:搜索算法用于路径规划、解码 - 系统设计:数据结构影响模型性能

AI/ML中的算法分类

Text Only
算法应用层级:

┌─────────────────────────────────────┐
│   应用层:推荐系统、NLP、CV、RL      │
├─────────────────────────────────────┤
│   算法层:搜索、优化、图、DP         │
├─────────────────────────────────────┤
│   数据层:数组、哈希、树、堆         │
└─────────────────────────────────────┘

常见算法应用场景

AI/ML任务 使用的算法 时间复杂度
数据预处理 排序、去重、哈希 O(n log n)
特征工程 二分查找、滑动窗口 O(n), O(log n)
序列标注 动态规划(Viterbi) O(n²)
注意力机制 矩阵运算、Top-K搜索 O(n²), O(n log n)
图神经网络 BFS/DFS、最短路径 O(V+E)
强化学习 MCTS、搜索算法 O(b^d)

数据预处理中的算法

1. 排序算法在特征工程中的应用

应用场景:特征重要性排序

问题:有1000个特征,需要找出最重要的top-K个特征。

Python
import numpy as np
from collections import defaultdict

def select_top_k_features(X, y, k=10):
    """
    选择重要性最高的K个特征
    方法:基于方差的特征选择
    时间复杂度:O(n + m log m),m是特征数
    空间复杂度:O(m)
    """
    # 计算每个特征的方差
    feature_variances = np.var(X, axis=0)

    # 获取top-k特征的索引(使用堆排序优化)
    # 方法1:完全排序 O(m log m)
    top_k_indices = np.argsort(feature_variances)[-k:]

    # 方法2:使用堆 O(m + k log m) - 更适合m很大时
    import heapq
    top_k_indices_heap = heapq.nlargest(k,
                                       range(len(feature_variances)),
                                       feature_variances.__getitem__)

    return top_k_indices_heap

# 示例
X = np.random.rand(1000, 100)  # 1000个样本,100个特征
y = np.random.randint(0, 2, 1000)
top_features = select_top_k_features(X, y, k=10)
print(f"Top-10特征索引: {top_features}")

算法对比: - 完全排序:O(m log m),适合需要全部排序 - 堆排序(Top-K):O(m + k log m),适合k << m - 快速选择:O(m)平均,适合只需要中位数或特定分位数

应用场景:数据清洗(异常值检测)

Python
def detect_outliers_sorted(data, threshold=3):
    """
    使用排序检测异常值
    方法:基于IQR(四分位距)
    时间复杂度:O(n log n)
    """
    # 排序数据
    sorted_data = np.sort(data)
    n = len(sorted_data)

    # 计算四分位数(二分查找优化)
    def find_percentile(sorted_arr, percentile):
        """使用二分查找找百分位数"""
        index = int(n * percentile / 100)
        return sorted_arr[index]

    Q1 = find_percentile(sorted_data, 25)
    Q3 = find_percentile(sorted_data, 75)
    IQR = Q3 - Q1

    # 定义异常值边界
    lower_bound = Q1 - threshold * IQR
    upper_bound = Q3 + threshold * IQR

    # 找出异常值
    outliers = [x for x in data if x < lower_bound or x > upper_bound]

    return outliers

# 示例
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100]  # 100是异常值
outliers = detect_outliers_sorted(data)
print(f"异常值: {outliers}")  # [100]

2. 哈希表在数据处理中的应用

应用场景:数据去重

Python
def remove_duplicates_hash(data):
    """
    使用哈希表去重
    时间复杂度:O(n)
    空间复杂度:O(n)
    """
    seen = set()
    unique_data = []

    for item in data:
        if item not in seen:
            seen.add(item)
            unique_data.append(item)

    return unique_data

# NLP应用:去重文档
documents = [
    "This is a document.",
    "Another document.",
    "This is a document.",  # 重复
    "Third document."
]

unique_docs = remove_duplicates_hash(documents)
print(f"去重后文档数: {len(unique_docs)}")  # 3

应用场景:词汇表构建

Python
def build_vocabulary(corpus):
    """
    构建词汇表(NLP基础操作)
    时间复杂度:O(n),n是所有文档的总词数
    空间复杂度:O(v),v是词汇量
    """
    word_to_id = {}
    id_to_word = {}
    word_freq = defaultdict(int)  # defaultdict带默认值的字典,避免KeyError

    # 第一遍扫描:统计词频
    for doc in corpus:
        words = doc.lower().split()
        for word in words:
            word_freq[word] += 1

    # 第二遍扫描:构建词汇表(按词频排序)
    sorted_words = sorted(word_freq.items(),
                         key=lambda x: x[1],  # lambda匿名函数:简洁的单行函数
                         reverse=True)

    for idx, (word, freq) in enumerate(sorted_words):  # enumerate同时获取索引和值
        word_to_id[word] = idx
        id_to_word[idx] = word

    return word_to_id, id_to_word, word_freq

# 示例
corpus = [
    "the cat sat on the mat",
    "the dog sat on the log",
    "cats and dogs are pets"
]

word_to_id, id_to_word, word_freq = build_vocabulary(corpus)
print(f"词汇表大小: {len(word_to_id)}")
print(f"词频最高的5个词: {sorted(word_freq.items(), key=lambda x: -x[1])[:5]}")

应用场景:特征哈希(降维技术)

``python def feature_hashing(features, dim=1000): """ 特征哈希(哈希技巧) 用于处理高维稀疏特征(如文本) 时间复杂度:O(n) 空间复杂度:O(dim),dim是哈希表维度 """ hashed = np.zeros(dim)

Text Only
for feature in features:
    # 使用哈希函数将特征映射到固定维度
    idx = hash(feature) % dim
    hashed[idx] += 1

return hashed

文本分类应用

text = "machine learning is awesome" words = text.split() hashed_features = feature_hashing(words, dim=1000) print(f"哈希特征向量: {hashed_features}")

Text Only
### 3. 滑动窗口在时序数据中的应用

#### 应用场景:时间序列平滑

```python
def moving_average(data, window_size):
    """
    移动平均(时间序列平滑)
    时间复杂度:O(n)
    空间复杂度:O(n)
    """
    smoothed = []

    for i in range(len(data)):
        # 窗口范围
        start = max(0, i - window_size // 2)
        end = min(len(data), i + window_size // 2 + 1)

        # 计算窗口内平均值
        window = data[start:end]
        avg = sum(window) / len(window)
        smoothed.append(avg)

    return smoothed

# 示例:传感器数据平滑
sensor_data = [10, 12, 13, 11, 10, 9, 10, 11, 12, 13]
smoothed = moving_average(sensor_data, window_size=3)
print(f"原始数据: {sensor_data}")
print(f"平滑后: {[round(x, 2) for x in smoothed]}")

应用场景:文本的N-gram提取

Python
def extract_ngrams(text, n=3):
    """
    提取文本的n-gram(滑动窗口应用)
    时间复杂度:O(L),L是文本长度
    空间复杂度:O(L)
    """
    words = text.split()
    ngrams = []

    # 滑动窗口
    for i in range(len(words) - n + 1):
        ngram = ' '.join(words[i:i+n])
        ngrams.append(ngram)

    return ngrams

# 示例
text = "natural language processing is fun"
trigrams = extract_ngrams(text, n=3)
print(f"三元组: {trigrams}")
# ['natural language processing', 'language processing is', 'processing is fun']

应用场景:最大池化(CNN)

Python
def max_pooling(feature_map, pool_size=2, stride=2):
    """
    最大池化(CNN中的降采样操作)
    使用滑动窗口实现
    时间复杂度:O(HW),H和W是特征图的高和宽
    """
    h, w = feature_map.shape
    pooled_h = (h - pool_size) // stride + 1
    pooled_w = (w - pool_size) // stride + 1
    pooled = np.zeros((pooled_h, pooled_w))

    # 滑动窗口
    for i in range(0, h - pool_size + 1, stride):
        for j in range(0, w - pool_size + 1, stride):
            # 提取窗口
            window = feature_map[i:i+pool_size, j:j+pool_size]
            # 最大池化
            pooled[i//stride, j//stride] = np.max(window)

    return pooled

# 示例
feature_map = np.array([
    [1, 3, 2, 4],
    [5, 6, 7, 8],
    [9, 10, 11, 12],
    [13, 14, 15, 16]
])
pooled = max_pooling(feature_map, pool_size=2, stride=2)
print(f"池化结果:\n{pooled}")

4. 搜索算法在数据查找中的应用

应用场景:二分查找找阈值

Python
def find_best_threshold(y_true, y_scores):
    """
    使用二分查找找到最佳分类阈值
    目标:最大化F1分数
    时间复杂度:O(n log m),n是样本数,m是候选阈值数
    """
    # 排序所有可能的预测分数
    sorted_scores = np.sort(y_scores)

    def compute_f1(threshold):
        """计算给定阈值的F1分数"""
        y_pred = (y_scores >= threshold).astype(int)
        tp = np.sum((y_pred == 1) & (y_true == 1))
        fp = np.sum((y_pred == 1) & (y_true == 0))
        fn = np.sum((y_pred == 0) & (y_true == 1))

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0

        if precision + recall == 0:
            return 0
        return 2 * precision * recall / (precision + recall)

    # 二分查找最佳阈值
    left, right = 0, len(sorted_scores) - 1
    best_threshold = sorted_scores[0]
    best_f1 = 0

    while left <= right:
        mid = (left + right) // 2
        threshold = sorted_scores[mid]
        f1 = compute_f1(threshold)

        if f1 > best_f1:
            best_f1 = f1
            best_threshold = threshold

        # 根据F1分数调整搜索范围
        # 这里简化处理,实际可能需要更复杂的策略
        if f1 < 0.5:
            right = mid - 1
        else:
            left = mid + 1

    return best_threshold, best_f1

# 示例
y_true = np.array([0, 1, 1, 0, 1])
y_scores = np.array([0.3, 0.7, 0.8, 0.4, 0.6])
threshold, f1 = find_best_threshold(y_true, y_scores)
print(f"最佳阈值: {threshold:.2f}, F1分数: {f1:.2f}")

动态规划在序列建模中的应用

1. 序列标注问题

Viterbi算法:HMM中的解码

问题:给定观测序列,找出最可能的隐含状态序列。

Python
def viterbi(observations, states, start_p, trans_p, emit_p):
    """
    Viterbi算法:寻找最可能的隐含状态序列
    时间复杂度:O(T * S²),T是观测序列长度,S是状态数
    空间复杂度:O(T * S)

    参数:
    - observations: 观测序列
    - states: 状态列表
    - start_p: 初始概率
    - trans_p: 转移概率矩阵
    - emit_p: 发射概率矩阵
    """
    T = len(observations)
    S = len(states)

    # V[i][s]:时刻t到达状态s的最大概率
    V = np.zeros((T, S))
    # path[i][s]:时刻t到达状态s的路径
    path = np.zeros((T, S), dtype=int)

    # 初始化
    for s in range(S):
        V[0][s] = start_p[s] * emit_p[s][observations[0]]
        path[0][s] = s

    # 递推
    for t in range(1, T):
        for s in range(S):
            # 找到前一时刻的最佳状态
            max_prob = 0
            best_prev_s = 0

            for prev_s in range(S):
                prob = V[t-1][prev_s] * trans_p[prev_s][s] * emit_p[s][observations[t]]
                if prob > max_prob:
                    max_prob = prob
                    best_prev_s = prev_s

            V[t][s] = max_prob
            path[t][s] = best_prev_s

    # 回溯找最优路径
    best_path = []
    best_last_state = np.argmax(V[T-1])
    best_path.append(states[best_last_state])

    for t in range(T-1, 0, -1):
        best_last_state = path[t][best_last_state]
        best_path.append(states[best_last_state])

    return best_path[::-1]  # 反转

# 示例:词性标注
states = ['名词', '动词']
observations = ['我', '吃', '苹果']  # 简化:用词本身代替观测
start_p = [0.6, 0.4]  # 初始概率
trans_p = [[0.7, 0.3], [0.4, 0.6]]  # 转移概率
emit_p = [[0.9, 0.1, 0.3], [0.1, 0.9, 0.7]]  # 发射概率

best_path = viterbi(observations, states, start_p, trans_p, emit_p)
print(f"最可能的词性序列: {best_path}")

Viterbi vs 普通递归: - 递归:O(S^T),指数级爆炸 - Viterbi(DP):O(T * S²),多项式时间

2. 编辑距离算法在NLP中的应用

Levenshtein距离:拼写纠错

Python
def levenshtein_distance(word1, word2):
    """
    计算编辑距离(Levenshtein距离)
    时间复杂度:O(m * n),m和n是两个词的长度
    空间复杂度:O(m * n)

    应用:拼写纠错、模糊匹配
    """
    m, n = len(word1), len(word2)

    # dp[i][j]:word1[:i]和word2[:j]的编辑距离
    dp = np.zeros((m+1, n+1))

    # 初始化边界条件
    for i in range(m+1):
        dp[i][0] = i  # 删除i个字符
    for j in range(n+1):
        dp[0][j] = j  # 插入j个字符

    # 填充DP表
    for i in range(1, m+1):
        for j in range(1, n+1):
            if word1[i-1] == word2[j-1]:
                dp[i][j] = dp[i-1][j-1]  # 字符相同,无需操作
            else:
                dp[i][j] = 1 + min(
                    dp[i-1][j],    # 删除
                    dp[i][j-1],    # 插入
                    dp[i-1][j-1]   # 替换
                )

    return dp[m][n]

# 示例:拼写纠错
correct_words = ["apple", "banana", "orange", "grape"]
typo = "aple"

best_match = None
min_dist = float('inf')

for word in correct_words:
    dist = levenshtein_distance(typo, word)
    if dist < min_dist:
        min_dist = dist
        best_match = word

print(f"输入: {typo}")
print(f"纠正: {best_match} (编辑距离: {min_dist})")

空间优化版本

Python
def levenshtein_distance_optimized(word1, word2):
    """
    空间优化的编辑距离算法
    时间复杂度:O(m * n)
    空间复杂度:O(min(m, n))
    """
    if len(word1) < len(word2):
        word1, word2 = word2, word1

    m, n = len(word1), len(word2)

    # 只保留两行
    prev = np.arange(n + 1)
    curr = np.zeros(n + 1)

    for i in range(1, m + 1):
        curr[0] = i
        for j in range(1, n + 1):
            if word1[i-1] == word2[j-1]:
                curr[j] = prev[j-1]
            else:
                curr[j] = 1 + min(prev[j], curr[j-1], prev[j-1])
        prev, curr = curr, prev

    return prev[n]

3. 最长公共子序列(LCS)

应用:DNA序列比对、文档相似度

Python
def longest_common_subsequence(text1, text2):
    """
    最长公共子序列
    时间复杂度:O(m * n)
    空间复杂度:O(m * n)

    应用:版本控制、DNA比对、抄袭检测
    """
    m, n = len(text1), len(text2)

    # dp[i][j]:text1[:i]和text2[:j]的最长公共子序列长度
    dp = np.zeros((m+1, n+1))

    for i in range(1, m+1):
        for j in range(1, n+1):
            if text1[i-1] == text2[j-1]:
                dp[i][j] = dp[i-1][j-1] + 1
            else:
                dp[i][j] = max(dp[i-1][j], dp[i][j-1])

    # 回溯找LCS
    lcs = []
    i, j = m, n
    while i > 0 and j > 0:
        if text1[i-1] == text2[j-1]:
            lcs.append(text1[i-1])
            i -= 1
            j -= 1
        elif dp[i-1][j] > dp[i][j-1]:
            i -= 1
        else:
            j -= 1

    return lcs[::-1]

# 示例:代码相似度检测
code1 = "def add(a, b): return a + b"
code2 = "def add(x, y): return x + y"

lcs = longest_common_subsequence(code1, code2)
similarity = len(lcs) / (max(len(code1), len(code2)))
print(f"公共子序列: {''.join(lcs)}")
print(f"相似度: {similarity:.2%}")

图算法在深度学习中的应用

1. 图神经网络(GNN)基础

消息传递:BFS的应用

Python
def message_passing_gnn(adj_matrix, node_features, num_layers=2):
    """
    简化的图神经网络(消息传递)
    使用BFS思想聚合邻居信息
    时间复杂度:O(L * (V + E)),L是层数,V是节点数,E是边数
    空间复杂度:O(V * F),F是特征维度
    """
    num_nodes = adj_matrix.shape[0]
    feature_dim = node_features.shape[1]

    # 初始化隐藏状态
    hidden_states = node_features.copy()

    for layer in range(num_layers):
        new_states = np.zeros_like(hidden_states)

        # 对每个节点聚合邻居信息
        for node in range(num_nodes):
            # 找到所有邻居(包括自己)
            neighbors = np.where(adj_matrix[node] == 1)[0]

            # 聚合:求和(也可以用平均、最大值等)
            aggregated = np.sum(hidden_states[neighbors], axis=0)

            # 非线性激活(简化版,实际会用神经网络)
            new_states[node] = np.maximum(0, aggregated)  # ReLU

        hidden_states = new_states

    return hidden_states

# 示例:引文网络
# 论文之间的引用关系
adj_matrix = np.array([
    [0, 1, 1, 0],  # 论文0被1、2引用
    [0, 0, 0, 1],  # 论文1被3引用
    [0, 0, 0, 1],  # 论文2被3引用
    [0, 0, 0, 0]   # 论文3没有被引用
])

# 初始特征(词袋向量)
node_features = np.array([
    [1, 0, 1],  # 论文0的特征
    [0, 1, 0],  # 论文1的特征
    [1, 1, 0],  # 论文2的特征
    [0, 0, 1]   # 论文3的特征
])

final_features = message_passing_gnn(adj_matrix, node_features, num_layers=2)
print(f"最终节点特征:\n{final_features}")

2. 最短路径在强化学习中的应用

MCTS(蒙特卡洛树搜索)

Python
import math
import random

class MCTSNode:
    """蒙特卡洛树搜索节点"""

    def __init__(self, state, parent=None):
        self.state = state
        self.parent = parent
        self.children = []
        self.visits = 0
        self.value = 0
        self.untried_actions = self.get_untried_actions()

    def get_untried_actions(self):
        """获取未尝试的动作"""
        # 简化:返回所有可能的动作
        return list(range(4))  # 假设有4个动作

    def uct_value(self, c=1.414):
        """计算UCT值(用于选择节点)"""
        if self.visits == 0:
            return float('inf')
        return (self.value / self.visits) + c * math.sqrt(math.log(self.parent.visits) / self.visits)

def mcts(root_state, num_simulations=1000):
    """
    蒙特卡洛树搜索
    时间复杂度:O(N * D),N是模拟次数,D是搜索深度
    空间复杂度:O(B^D),B是分支因子

    应用:AlphaGo、游戏AI、规划
    """
    root = MCTSNode(root_state)

    for _ in range(num_simulations):
        # 1. 选择(使用树策略)
        node = root
        while node.untried_actions == [] and node.children:
            node = max(node.children, key=lambda n: n.uct_value())

        # 2. 扩展
        if node.untried_actions:
            action = random.choice(node.untried_actions)
            node.untried_actions.remove(action)
            new_state = node.state  # 简化:实际需要应用动作
            node = MCTSNode(new_state, parent=node)
            node.parent.children.append(node)

        # 3. 模拟(随机策略)
        reward = simulate(node.state)

        # 4. 回溯
        while node:
            node.visits += 1
            node.value += reward
            node = node.parent

    # 返回最优动作
    best_child = max(root.children, key=lambda n: n.visits)
    return best_child

def simulate(state):
    """随机模拟(简化版)"""
    return random.random()  # 返回0-1之间的随机奖励

3. 拓扑排序在任务调度中的应用

训练流水线调度

Python
from collections import deque

def topological_sort_tasks(tasks, dependencies):
    """
    拓扑排序:确定任务执行顺序
    时间复杂度:O(V + E)
    空间复杂度:O(V + E)

    应用:ML流水线调度、数据预处理流程
    """
    # 构建图
    in_degree = {task: 0 for task in tasks}
    graph = {task: [] for task in tasks}

    for task, dep in dependencies:
        graph[dep].append(task)
        in_degree[task] += 1

    # 找到所有入度为0的节点
    queue = deque([task for task in tasks if in_degree[task] == 0])
    result = []

    while queue:
        task = queue.popleft()
        result.append(task)

        # 移除该节点的所有出边
        for neighbor in graph[task]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    if len(result) != len(tasks):
        raise ValueError("存在循环依赖!")

    return result

# 示例:ML训练流水线
tasks = ["数据收集", "数据清洗", "特征工程", "模型训练", "模型评估", "模型部署"]
dependencies = [
    ("数据清洗", "数据收集"),      # 清洗依赖收集
    ("特征工程", "数据清洗"),      # 特征依赖清洗
    ("模型训练", "特征工程"),      # 训练依赖特征
    ("模型评估", "模型训练"),      # 评估依赖训练
    ("模型部署", "模型评估")       # 部署依赖评估
]

execution_order = topological_sort_tasks(tasks, dependencies)
print(f"任务执行顺序: {' -> '.join(execution_order)}")

优化算法

1. 梯度下降:搜索算法的应用

一维搜索(线搜索)

Python
def line_search(f, df, x, direction, alpha=0.3, beta=0.8):
    """
    回溯线搜索(Backtracking Line Search)
    时间复杂度:O(log(1/ε)),ε是精度
    空间复杂度:O(1)

    应用:梯度下降中的步长选择
    """
    step_size = 1.0

    while f(x + step_size * direction) > f(x) + alpha * step_size * df(x) * direction:
        step_size *= beta

    return step_size

# 示例:优化二次函数
def f(x):
    """目标函数:f(x) = x²"""
    return x ** 2

def df(x):
    """导数:f'(x) = 2x"""
    return 2 * x

x = 10  # 初始点
for i in range(10):
    direction = -df(x)  # 负梯度方向
    step = line_search(f, df, x, direction)
    x = x + step * direction
    print(f"迭代{i+1}: x = {x:.4f}, f(x) = {f(x):.4f}")

2. 动态规划在强化学习中的应用

价值迭代(Value Iteration)

Python
def value_iteration(states, actions, transitions, rewards, gamma=0.9, theta=1e-6):
    """
    价值迭代算法
    时间复杂度:O(S² * A),S是状态数,A是动作数
    空间复杂度:O(S)

    应用:马尔可夫决策过程(MDP)求解
    """
    V = {s: 0 for s in states}

    while True:
        delta = 0

        for s in states:
            v = V[s]

            # Bellman最优方程
            max_value = float('-inf')
            for a in actions:
                expected_value = 0
                for s_prime in states:
                    prob = transitions.get((s, a, s_prime), 0)
                    reward = rewards.get((s, a, s_prime), 0)
                    expected_value += prob * (reward + gamma * V[s_prime])

                max_value = max(max_value, expected_value)

            V[s] = max_value
            delta = max(delta, abs(v - V[s]))

        # 收敛检查
        if delta < theta:
            break

    return V

# 示例:简单的网格世界
states = [(0, 0), (0, 1), (1, 0), (1, 1)]
actions = ['up', 'down', 'left', 'right']

# 简化的转移概率(确定性)
transitions = {}
rewards = {}

V = value_iteration(states, actions, transitions, rewards)
print(f"最优价值函数: {V}")

实战案例

案例1:构建推荐系统

协同过滤中的相似度计算

Python
def collaborative_filtering(user_item_matrix, k=5):
    """
    协同过滤推荐(基于用户)
    时间复杂度:O(U² * I + U * I * log I)
    U是用户数,I是物品数

    步骤:
    1. 计算用户相似度
    2. 找到K个最相似用户
    3. 生成推荐
    """
    num_users = user_item_matrix.shape[0]

    # 1. 计算用户相似度矩阵(余弦相似度)
    def cosine_similarity(user1, user2):
        """余弦相似度"""
        dot_product = np.dot(user1, user2)
        norm1 = np.linalg.norm(user1)
        norm2 = np.linalg.norm(user2)
        return dot_product / (norm1 * norm2 + 1e-10)

    similarity_matrix = np.zeros((num_users, num_users))
    for i in range(num_users):
        for j in range(i+1, num_users):
            sim = cosine_similarity(user_item_matrix[i], user_item_matrix[j])
            similarity_matrix[i][j] = sim
            similarity_matrix[j][i] = sim

    # 2. 为每个用户找到K个最相似用户
    recommendations = {}
    for user_id in range(num_users):
        # 找到最相似的K个用户(排除自己)
        similarities = similarity_matrix[user_id]
        similarities[user_id] = -1  # 排除自己

        # 堆排序找top-k
        import heapq
        top_k_users = heapq.nlargest(k,
                                     range(num_users),
                                     similarities.__getitem__)

        # 3. 生成推荐
        user_ratings = user_item_matrix[user_id]
        predicted_ratings = {}

        for item_id in range(user_item_matrix.shape[1]):
            if user_ratings[item_id] == 0:  # 未评分的物品
                # 加权平均
                weighted_sum = 0
                sim_sum = 0
                for similar_user in top_k_users:
                    if user_item_matrix[similar_user][item_id] > 0:
                        weighted_sum += (similarities[similar_user] *
                                       user_item_matrix[similar_user][item_id])
                        sim_sum += abs(similarities[similar_user])

                if sim_sum > 0:
                    predicted_ratings[item_id] = weighted_sum / sim_sum

        # 推荐评分最高的物品
        recommended_items = sorted(predicted_ratings.items(),
                                  key=lambda x: -x[1])[:k]
        recommendations[user_id] = recommended_items

    return recommendations

# 示例:用户-物品评分矩阵
user_item_matrix = np.array([
    [5, 3, 0, 1],  # 用户0
    [4, 0, 0, 1],  # 用户1
    [1, 1, 0, 5],  # 用户2
    [1, 0, 0, 4],  # 用户3
    [0, 1, 5, 4]   # 用户4
])

recommendations = collaborative_filtering(user_item_matrix, k=2)
for user_id, recs in recommendations.items():
    print(f"用户{user_id}的推荐: {recs}")

案例2:图像处理中的算法应用

边缘检测:卷积与滑动窗口

Python
def edge_detection(image, kernel):
    """
    边缘检测(使用卷积)
    时间复杂度:O(H * W * K * K)
    H和W是图像高和宽,K是卷积核大小

    应用:Sobel算子、Canny边缘检测
    """
    h, w = image.shape
    kh, kw = kernel.shape
    pad_h, pad_w = kh // 2, kw // 2

    # 填充图像
    padded = np.pad(image, ((pad_h, pad_h), (pad_w, pad_w)), mode='constant')

    # 卷积
    result = np.zeros((h, w))
    for i in range(h):
        for j in range(w):
            # 提取窗口
            window = padded[i:i+kh, j:j+kw]
            # 卷积运算
            result[i, j] = np.sum(window * kernel)

    return result

# Sobel算子
sobel_x = np.array([
    [-1, 0, 1],
    [-2, 0, 2],
    [-1, 0, 1]
])

sobel_y = np.array([
    [-1, -2, -1],
    [ 0,  0,  0],
    [ 1,  2,  1]
])

# 示例
image = np.random.rand(100, 100)  # 随机图像
edges_x = edge_detection(image, sobel_x)
edges_y = edge_detection(image, sobel_y)
magnitude = np.sqrt(edges_x**2 + edges_y**2)

案例3:自然语言处理中的动态规划

分词算法(最短路径)

Python
def word_segmentation(text, word_freq):
    """
    基于动态规划的分词
    目标:找到最高频的词组合
    时间复杂度:O(n²)
    空间复杂度:O(n)
    """
    n = len(text)

    # dp[i]:text[:i]的最高频分词
    dp = [float('-inf')] * (n + 1)
    dp[0] = 0

    # 记录路径
    path = [-1] * (n + 1)

    for i in range(1, n + 1):
        for j in range(i):
            word = text[j:i]
            freq = word_freq.get(word, 0)

            if dp[j] + freq > dp[i]:
                dp[i] = dp[j] + freq
                path[i] = j

    # 回溯
    result = []
    i = n
    while i > 0:
        j = path[i]
        result.append(text[j:i])
        i = j

    return result[::-1]

# 示例
word_freq = {
    "我": 1000,
    "爱": 500,
    "自然": 300,
    "语言": 300,
    "处理": 200,
    "自然语言": 400
}

text = "我爱自然语言处理"
segments = word_segmentation(text, word_freq)
print(f"分词结果: {segments}")
# 可能输出: ['我', '爱', '自然语言', '处理']

📝 总结

核心要点

数据预处理: - 排序:特征选择、异常值检测 - 哈希:去重、词汇表构建 - 滑动窗口:时序平滑、N-gram、池化 - 搜索:阈值优化、最近邻查找

动态规划: - Viterbi算法:序列标注 - 编辑距离:拼写纠错 - LCS:相似度计算 - 分词:最短路径

图算法: - 消息传递:GNN - MCTS:强化学习 - 拓扑排序:任务调度

优化算法: - 梯度下降:线搜索 - 价值迭代:强化学习

性能优化建议

  1. 时间复杂度优化
  2. 预处理:使用哈希表加速查找
  3. 批处理:向量化操作
  4. 并行化:多进程/多线程

  5. 空间复杂度优化

  6. 滚动数组:DP空间优化
  7. 稀疏表示:大规模图
  8. 生成器:流式处理

  9. 实际工程考虑

  10. 使用sklearn、PyTorch等库的优化实现
  11. GPU加速矩阵运算
  12. 分布式计算

🎯 下一步学习

继续深入: - LeetCode 100+题详解 - 巩固算法基础 - 大厂面试真题 - AI岗位面试 - 实际项目:实现一个完整的推荐系统


掌握这些算法,让你的AI/ML技能更上一层楼! 🚀