跳转至

并发与异步编程

🎯 学习目标

  • 深入理解 GIL 的原理及其对多线程的影响
  • 掌握 threading 模块的使用和线程同步原语
  • 掌握 multiprocessing 多进程编程和进程间通信
  • 理解协程演进历史,熟练使用 asyncio 进行异步编程
  • 能根据场景选择多线程/多进程/异步方案
  • 掌握性能基准测试和最佳实践

第一部分:多线程

1.1 GIL(全局解释器锁)详解

GIL(Global Interpreter Lock)是 CPython 解释器中的一把全局互斥锁。它确保在任意时刻,只有一个线程在执行 Python 字节码。

为什么存在 GIL:

  • CPython 的内存管理(引用计数)不是线程安全的
  • GIL 是一种简单粗暴但有效的方式来保护引用计数
  • 它让 CPython 的单线程性能更好,C 扩展开发更简单

GIL 的影响:

Python
import threading
import time

def cpu_bound_task():
    """CPU 密集型任务"""
    count = 0
    for _ in range(50_000_000):
        count += 1
    return count

# 单线程
start = time.time()
cpu_bound_task()
cpu_bound_task()
print(f"单线程: {time.time() - start:.2f}s")

# 多线程(并不会更快!因为 GIL)
start = time.time()
t1 = threading.Thread(target=cpu_bound_task)  # threading.Thread创建新线程执行并发任务
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多线程: {time.time() - start:.2f}s")

# 输出示例:
# 单线程: 4.20s
# 多线程: 4.35s  ← 甚至可能更慢(线程切换开销)

关键结论:

任务类型 多线程效果 原因
CPU 密集型 ❌ 无法加速(甚至更慢) GIL 阻止真正的并行执行
IO 密集型 ✅ 有效加速 IO 等待时 GIL 会释放

如何绕过 GIL:

  1. 使用多进程(multiprocessing)—— 每个进程都有自己的 GIL
  2. 使用 C 扩展(如 NumPy)—— 在 C 层面释放 GIL
  3. 使用其他 Python 实现(Jython、PyPy)
  4. 使用 asyncio 异步编程(IO 密集场景)
  5. Python 3.13+ 的 free-threaded 模式(实验性,需安装 python3.13t 构建版本或设置 PYTHON_GIL=0

1.2 threading 模块基础

Python
import threading
import time

# 方式一:传入函数
def worker(name, delay):
    print(f"线程 {name} 开始")
    time.sleep(delay)
    print(f"线程 {name} 完成")

t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1))

t1.start()          # 启动线程(非阻塞)
t2.start()

t1.join()           # 等待线程完成(阻塞)
t2.join()
print("所有线程完成")

# 方式二:继承 Thread 类
class DownloadThread(threading.Thread):
    def __init__(self, url):
        super().__init__()
        self.url = url
        self.result = None

    def run(self):
        # 模拟下载
        print(f"正在下载: {self.url}")
        time.sleep(1)
        self.result = f"{self.url} 的内容"

t = DownloadThread("https://example.com")
t.start()
t.join()
print(t.result)

# 守护线程(主线程结束时自动终止)
t = threading.Thread(target=worker, args=("daemon", 10), daemon=True)
t.start()
# 主线程结束时,daemon 线程会被强制终止

1.3 线程同步原语

Lock(互斥锁)

Python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(1_000_000):
        with lock:              # 推荐使用 with 语句(自动释放)
            counter += 1

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start(); t2.start()
t1.join(); t2.join()
print(f"counter = {counter}")   # 2000000(正确结果)
# 如果不加锁,结果可能小于 2000000

RLock(可重入锁)

Python
import threading

rlock = threading.RLock()

def recursive_function(n):
    with rlock:                 # 同一线程可以多次获取
        if n > 0:
            print(f"depth: {n}")
            recursive_function(n - 1)

# Lock 在同一线程重复获取会死锁,RLock 不会

Semaphore(信号量)

Python
import threading
import time

# 限制同时运行的线程数
semaphore = threading.Semaphore(3)  # 最多3个线程同时执行

def limited_task(name):
    with semaphore:
        print(f"任务 {name} 开始执行")
        time.sleep(2)
        print(f"任务 {name} 完成")

threads = [threading.Thread(target=limited_task, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# 任何时刻最多只有 3 个任务在执行

Event(事件)

Python
import threading
import time

event = threading.Event()

def waiter():
    print("等待信号...")
    event.wait()            # 阻塞,直到 event 被 set
    print("收到信号!开始工作")

def sender():
    time.sleep(2)
    print("发送信号")
    event.set()             # 唤醒所有等待的线程

threading.Thread(target=waiter).start()
threading.Thread(target=sender).start()

Condition(条件变量)

Python
import threading
import time

condition = threading.Condition()
items = []

def producer():
    for i in range(5):
        time.sleep(0.5)
        with condition:
            items.append(f"item-{i}")
            print(f"生产: item-{i}")
            condition.notify()      # 通知消费者

def consumer():
    while True:
        with condition:
            while not items:
                condition.wait()    # 等待通知
            item = items.pop(0)
            print(f"消费: {item}")

threading.Thread(target=producer).start()
threading.Thread(target=consumer, daemon=True).start()

1.4 线程安全的数据结构(Queue)

Python
import threading
import queue
import time

q = queue.Queue(maxsize=10)     # 线程安全的队列

def producer(q):
    for i in range(20):
        item = f"product-{i}"
        q.put(item)             # 如果队列满了会阻塞
        print(f"[生产者] 生产了: {item}")
        time.sleep(0.1)
    q.put(None)                 # 毒丸(Poison Pill)标记结束

def consumer(q):
    while True:
        item = q.get()          # 如果队列空了会阻塞
        if item is None:        # 收到结束信号
            break
        print(f"[消费者] 消费了: {item}")
        time.sleep(0.2)
        q.task_done()           # 标记任务完成

producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
print("全部完成")

Queue 的三种类型:

类型 说明
queue.Queue FIFO 先进先出
queue.LifoQueue LIFO 后进先出(栈)
queue.PriorityQueue 优先级队列

1.5 生产者-消费者模型完整实现

Python
import threading
import queue
import time
import random

class ProducerConsumer:
    def __init__(self, buffer_size=5, num_producers=2, num_consumers=3):
        self.q = queue.Queue(maxsize=buffer_size)
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        self.finished = threading.Event()

    def produce(self, producer_id):
        """生产者"""
        for i in range(5):
            item = f"P{producer_id}-Item{i}"
            self.q.put(item)
            print(f"[生产者-{producer_id}] 生产: {item} (队列大小: {self.q.qsize()})")
            time.sleep(random.uniform(0.1, 0.3))
        print(f"[生产者-{producer_id}] 完成生产")

    def consume(self, consumer_id):
        """消费者"""
        while not self.finished.is_set() or not self.q.empty():
            try:
                item = self.q.get(timeout=0.5)
                print(f"[消费者-{consumer_id}] 消费: {item}")
                time.sleep(random.uniform(0.1, 0.5))
                self.q.task_done()
            except queue.Empty:
                continue
        print(f"[消费者-{consumer_id}] 退出")

    def run(self):
        producers = [
            threading.Thread(target=self.produce, args=(i,))
            for i in range(self.num_producers)
        ]
        consumers = [
            threading.Thread(target=self.consume, args=(i,))
            for i in range(self.num_consumers)
        ]

        for t in producers + consumers:
            t.start()

        for t in producers:
            t.join()

        self.q.join()           # 等待所有任务被处理
        self.finished.set()     # 通知消费者退出

        for t in consumers:
            t.join()

        print("所有生产和消费完成")

if __name__ == "__main__":
    pc = ProducerConsumer()
    pc.run()

1.6 ThreadPoolExecutor 线程池

Python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def download(url):
    """模拟下载任务"""
    print(f"开始下载: {url}")
    time.sleep(1)
    return f"{url} 下载完成 ({len(url)} bytes)"

urls = [f"https://example.com/page/{i}" for i in range(10)]

# 使用线程池(推荐方式)
with ThreadPoolExecutor(max_workers=5) as executor:
    # 方式一:map(保持顺序)
    results = executor.map(download, urls)
    for result in results:
        print(result)

# 方式二:submit + as_completed(按完成顺序获取)
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(download, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f"{url} 出错: {e}")

第二部分:多进程

2.1 multiprocessing 模块

Python
import multiprocessing
import time
import os

def cpu_bound_task(n):
    """CPU 密集型任务"""
    print(f"进程 {os.getpid()} 开始计算")
    result = sum(i * i for i in range(n))
    print(f"进程 {os.getpid()} 完成")
    return result

if __name__ == "__main__":
    # 创建进程
    p1 = multiprocessing.Process(target=cpu_bound_task, args=(10_000_000,))
    p2 = multiprocessing.Process(target=cpu_bound_task, args=(10_000_000,))

    start = time.time()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"多进程耗时: {time.time() - start:.2f}s")

    # 对比单进程
    start = time.time()
    cpu_bound_task(10_000_000)
    cpu_bound_task(10_000_000)
    print(f"单进程耗时: {time.time() - start:.2f}s")

    # 多进程真的更快!因为每个进程有自己的 GIL

⚠️ 重要:Windows 上必须将多进程代码放在 if __name__ == "__main__": 中,否则会报错。

2.2 进程间通信

Queue(进程安全队列)

Python
from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(f"item-{i}")
    q.put(None)    # 结束信号

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消费: {item}")

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Pipe(管道)

Python
from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender!")
    conn.send([1, 2, 3])
    conn.close()

def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            print(f"收到: {msg}")
        except EOFError:
            break

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

共享内存(Value、Array)

Python
from multiprocessing import Process, Value, Array

def modify(shared_val, shared_arr):
    # ⚠️ 并发安全提示:Value 的 += 操作不是原子的(先读后写),多进程并发时会产生竞态条件。
    # 应使用 shared_val.get_lock() 加锁:
    #   with shared_val.get_lock():
    #       shared_val.value += 1
    shared_val.value += 1
    for i in range(len(shared_arr)):
        shared_arr[i] *= 2

if __name__ == "__main__":
    # Value:共享单个值('i' 表示整数,'d' 表示浮点数)
    val = Value('i', 0)
    # Array:共享数组
    arr = Array('i', [1, 2, 3, 4, 5])

    p = Process(target=modify, args=(val, arr))
    p.start()
    p.join()

    print(f"Value: {val.value}")     # 1
    print(f"Array: {list(arr)}")     # [2, 4, 6, 8, 10]

2.3 进程池

Python
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
import time

def heavy_computation(n):
    """模拟CPU密集计算"""
    return sum(i ** 2 for i in range(n))

if __name__ == "__main__":
    numbers = [5_000_000] * 8

    # 方式一:multiprocessing.Pool
    start = time.time()
    with Pool(processes=4) as pool:
        results = pool.map(heavy_computation, numbers)
    print(f"Pool: {time.time() - start:.2f}s")

    # 方式二:ProcessPoolExecutor(推荐,与 ThreadPoolExecutor 接口一致)
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(heavy_computation, numbers))
    print(f"ProcessPoolExecutor: {time.time() - start:.2f}s")

    # 对比单进程
    start = time.time()
    results = [heavy_computation(n) for n in numbers]
    print(f"单进程: {time.time() - start:.2f}s")

2.4 多进程 vs 多线程选择策略

场景 推荐方案 原因
CPU 密集计算 多进程 绕过 GIL,真正并行
IO 密集(文件/网络) 多线程 / 异步 IO 等待时 GIL 释放
需要共享大量数据 多线程 线程共享内存空间
需要隔离性/稳定性 多进程 进程间互不影响
大量并发连接 异步(asyncio) 内存开销小

第三部分:异步编程

3.1 协程概念——从生成器到 async/await 的演进

Python
# 阶段一:生成器(Python 2.5+)
def simple_generator():
    yield 1
    yield 2
    yield 3

# 阶段二:yield from(Python 3.3+)
def sub_gen():
    yield "a"
    yield "b"

def main_gen():
    yield from sub_gen()    # 委托给子生成器
    yield "c"

# 阶段三:@asyncio.coroutine + yield from(Python 3.4)
import asyncio

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)
    return "done"

# 阶段四:async/await(Python 3.5+)✨ 现代方式
async def modern_coroutine():
    await asyncio.sleep(1)
    return "done"

核心区别:

  • 线程:OS 管理调度,抢占式切换,开销较大
  • 协程:用户代码控制切换(await),协作式,开销极小
  • 一个线程可以运行数千上万个协程

3.2 asyncio 事件循环

事件循环是 asyncio 的核心,它管理和调度所有协程的执行。

Python
import asyncio

async def say_hello(name, delay):
    await asyncio.sleep(delay)
    print(f"Hello, {name}!")

# Python 3.7+ 推荐方式
async def main():
    # 这三个协程会并发执行
    await asyncio.gather(
        say_hello("Alice", 2),
        say_hello("Bob", 1),
        say_hello("Charlie", 3),
    )

asyncio.run(main())  # asyncio.run()创建事件循环并运行顶层协程,是异步程序的入口
# 输出(按完成顺序):
# Hello, Bob!        (1秒后)
# Hello, Alice!      (2秒后)
# Hello, Charlie!    (3秒后)
# 总耗时约3秒,而非6秒!

3.3 async def / await 语法

Python
import asyncio

# async def 定义协程函数
async def fetch_data(url):
    print(f"开始获取 {url}")
    await asyncio.sleep(1)          # 模拟网络请求(非阻塞)
    return f"{url} 的数据"

# await 只能在 async 函数中使用
async def process():
    # await 暂停当前协程,等待结果
    data = await fetch_data("https://api.example.com")
    print(f"处理: {data}")

asyncio.run(process())

重要概念:

  • async def 定义的函数返回一个协程对象,不会立即执行
  • await 暂停当前协程的执行,等待被 await 的对象完成
  • await 只能用于 awaitable 对象(协程、Task、Future)

3.4 asyncio 核心 API

Python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} 完成(耗时{delay}s)"

async def main():
    # 1. create_task:创建任务(立即调度执行)
    t1 = asyncio.create_task(task("A", 2))
    t2 = asyncio.create_task(task("B", 1))
    result1 = await t1
    result2 = await t2
    print(result1, result2)

    # 2. gather:并发运行多个协程,收集所有结果
    results = await asyncio.gather(
        task("C", 2),
        task("D", 1),
        task("E", 3),
    )
    print(results)  # ['C 完成(耗时2s)', 'D 完成(耗时1s)', 'E 完成(耗时3s)']

    # 3. wait:并发运行,更灵活的控制
    tasks = [asyncio.create_task(task(f"T{i}", i)) for i in range(1, 4)]
    done, pending = await asyncio.wait(tasks, timeout=2.5)
    for t in done:
        print(f"完成: {t.result()}")
    for t in pending:
        print(f"未完成: {t.get_name()}")
        t.cancel()

    # 4. sleep:非阻塞等待
    await asyncio.sleep(1)

    # 5. wait_for:设置超时
    try:
        result = await asyncio.wait_for(task("slow", 10), timeout=3)
    except asyncio.TimeoutError:
        print("任务超时!")

    # 6. as_completed:按完成顺序获取结果
    coros = [task(f"X{i}", 3 - i) for i in range(3)]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print(f"按完成顺序: {result}")

asyncio.run(main())

3.5 异步上下文管理器(async with)

Python
import asyncio

class AsyncResource:
    async def __aenter__(self):  # 异步上下文管理器入口,对应 async with 语句
        print("获取资源")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):  # 异步退出:exc_type=异常类型, exc_val=异常值, exc_tb=回溯信息
        print("释放资源")
        await asyncio.sleep(0.1)

    async def do_something(self):
        print("使用资源")

async def main():
    async with AsyncResource() as resource:  # async with异步上下文管理器,自动管理异步资源的获取和释放
        await resource.do_something()

asyncio.run(main())
# 获取资源
# 使用资源
# 释放资源

3.6 异步迭代器(async for)

Python
import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):  # 返回异步迭代器自身,使对象可用于 async for
        return self

    async def __anext__(self):  # 异步迭代器的"取下一个"方法
        if self.current >= self.stop:
            raise StopAsyncIteration  # 抛出此异常表示异步迭代结束(类似同步的StopIteration)
        self.current += 1
        await asyncio.sleep(0.1)
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(num)

asyncio.run(main())

# 异步生成器(Python 3.6+)
async def async_range(start, stop):
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        yield i

async def main2():
    async for i in async_range(1, 6):
        print(i)

asyncio.run(main2())

3.7 aiohttp 异步 HTTP 请求实战

Python
import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """异步获取单个URL"""
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]

    start = time.time()

    # 使用 aiohttp 的 ClientSession
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"异步请求 {len(urls)} 个URL,耗时: {elapsed:.2f}s")
    # 5个延迟1秒的请求,同步需要5秒,异步只需约1秒

    # 限制并发数量
    semaphore = asyncio.Semaphore(3)

    async def limited_fetch(session, url):
        async with semaphore:
            return await fetch_url(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

# pip install aiohttp
asyncio.run(main())

3.8 异步文件操作(aiofiles)

Python
import asyncio
import aiofiles

async def async_file_operations():
    # 异步写文件
    async with aiofiles.open("output.txt", mode="w") as f:
        await f.write("Hello, async world!\n")
        await f.write("Line 2\n")

    # 异步读文件
    async with aiofiles.open("output.txt", mode="r") as f:
        content = await f.read()
        print(content)

    # 异步逐行读取
    async with aiofiles.open("output.txt", mode="r") as f:
        async for line in f:
            print(line.strip())

# pip install aiofiles
asyncio.run(async_file_operations())

3.9 异步数据库操作简介

Python
import asyncio

# 示例:使用 asyncpg(PostgreSQL 异步驱动)
# pip install asyncpg

async def db_operations():
    import asyncpg

    # 创建连接池
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20,
    )

    async with pool.acquire() as conn:
        # 查询
        rows = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
        for row in rows:
            print(dict(row))

        # 插入
        await conn.execute(
            "INSERT INTO users(name, email) VALUES($1, $2)",
            "Alice", "alice@example.com"
        )

        # 事务
        async with conn.transaction():
            await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = $1", 1)
            await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = $1", 2)

    await pool.close()

# 常见异步数据库驱动:
# PostgreSQL: asyncpg, aiopg
# MySQL: aiomysql
# SQLite: aiosqlite
# ORM: SQLAlchemy 2.0 (支持 async), Tortoise ORM

第四部分:实战与比较

4.1 IO 密集 vs CPU 密集任务的选择策略

特性 多线程 (threading) 多进程 (multiprocessing) 异步 (asyncio)
适用场景 IO 密集型 CPU 密集型 高并发 IO
并行能力 受 GIL 限制 真正并行 单线程并发
内存开销 中等(~8MB/线程) 大(独立内存空间) 极小(~KB/协程)
数据共享 共享内存(需加锁) 需 IPC 机制 自然共享(单线程)
切换开销 较大(OS 调度) 大(OS 调度) 极小(用户态切换)
编程复杂度 中等 中等 较高(async 传染性)
可扩展性 数百 数十 数万+
调试难度 困难(竞态条件) 中等 中等(但栈追踪复杂)
典型用例 文件读写、简单网络请求 数据处理、图像处理 Web 服务器、爬虫

决策流程图:

Text Only
你的任务是什么类型?
├── CPU 密集型 → 用多进程(multiprocessing / ProcessPoolExecutor)
└── IO 密集型
    ├── 并发量 < 100 → 多线程(ThreadPoolExecutor)
    ├── 并发量 100~10000 → asyncio
    └── 已有异步生态(aiohttp等) → asyncio

4.2 性能基准测试对比

Python
import time
import threading
import multiprocessing
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests  # pip install requests

# ============================================
# IO 密集型任务对比
# ============================================

URL = "https://httpbin.org/delay/1"  # 每个请求延迟1秒
NUM_REQUESTS = 10

# 1. 同步方式
def sync_fetch():
    results = []
    start = time.time()
    for _ in range(NUM_REQUESTS):
        resp = requests.get(URL)
        results.append(resp.status_code)
    elapsed = time.time() - start
    print(f"同步: {elapsed:.2f}s")

# 2. 多线程
def threaded_fetch():
    start = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(lambda _: requests.get(URL).status_code, range(NUM_REQUESTS)))
    elapsed = time.time() - start
    print(f"多线程: {elapsed:.2f}s")

# 3. 异步
async def async_fetch():
    import aiohttp
    start = time.time()
    async with aiohttp.ClientSession() as session:
        async def _fetch(url):
            async with session.get(url) as resp:  # async with 确保响应被正确释放
                return resp.status
        tasks = [_fetch(URL) for _ in range(NUM_REQUESTS)]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print(f"异步: {elapsed:.2f}s")

# 预期结果(10个延迟1秒的请求):
# 同步:   ~10.0s
# 多线程: ~2.0s  (5个worker)
# 异步:   ~1.0s  (全部并发)

# ============================================
# CPU 密集型任务对比
# ============================================

def cpu_task(n):
    """计算密集"""
    return sum(i ** 2 for i in range(n))

def compare_cpu():
    N = 5_000_000
    COUNT = 4

    # 单进程
    start = time.time()
    for _ in range(COUNT):
        cpu_task(N)
    print(f"单进程 CPU 密集: {time.time() - start:.2f}s")

    # 多线程(不会更快)
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_task, [N] * COUNT))
    print(f"多线程 CPU 密集: {time.time() - start:.2f}s")

    # 多进程(真正加速)
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_task, [N] * COUNT))
    print(f"多进程 CPU 密集: {time.time() - start:.2f}s")

4.3 综合实战:爬虫性能对比

Python
"""
三种爬虫实现方式对比:同步 vs 多线程 vs 异步
"""

import time
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
import asyncio

# ==========================================
# 版本一:同步爬虫
# ==========================================
def sync_crawler(urls):
    """同步爬虫"""
    results = []
    start = time.time()
    for url in urls:
        try:
            resp = requests.get(url, timeout=10)
            results.append({
                "url": url,
                "status": resp.status_code,
                "length": len(resp.text),
            })
        except Exception as e:
            results.append({"url": url, "error": str(e)})
    elapsed = time.time() - start
    print(f"同步爬虫: {elapsed:.2f}s, 成功: {sum(1 for r in results if 'status' in r)}/{len(urls)}")
    return results

# ==========================================
# 版本二:多线程爬虫
# ==========================================
def threaded_crawler(urls, max_workers=10):
    """多线程爬虫"""
    results = []
    lock = threading.Lock()

    def fetch(url):
        try:
            resp = requests.get(url, timeout=10)
            result = {"url": url, "status": resp.status_code, "length": len(resp.text)}
        except Exception as e:
            result = {"url": url, "error": str(e)}
        with lock:
            results.append(result)

    start = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(fetch, urls)
    elapsed = time.time() - start
    print(f"多线程爬虫: {elapsed:.2f}s, 成功: {sum(1 for r in results if 'status' in r)}/{len(urls)}")
    return results

# ==========================================
# 版本三:异步爬虫
# ==========================================
async def async_crawler(urls, max_concurrent=20):
    """异步爬虫"""
    import aiohttp

    results = []
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(session, url):
        async with semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    text = await resp.text()
                    return {"url": url, "status": resp.status, "length": len(text)}
            except Exception as e:
                return {"url": url, "error": str(e)}

    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print(f"异步爬虫: {elapsed:.2f}s, 成功: {sum(1 for r in results if 'status' in r)}/{len(urls)}")
    return results

# ==========================================
# 对比运行
# ==========================================
if __name__ == "__main__":
    # 准备测试 URL 列表
    test_urls = [f"https://httpbin.org/get?id={i}" for i in range(20)]

    print("=" * 50)
    print(f"爬取 {len(test_urls)} 个 URL")
    print("=" * 50)

    # 同步
    sync_crawler(test_urls)

    # 多线程
    threaded_crawler(test_urls, max_workers=10)

    # 异步(需要 pip install aiohttp)
    asyncio.run(async_crawler(test_urls, max_concurrent=20))

4.4 concurrent.futures 统一接口

concurrent.futures 提供了统一的高级接口,可以在线程池和进程池之间无缝切换。

Python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def task(n):
    """通用任务"""
    time.sleep(0.1)
    return n ** 2

def run_with_executor(executor_class, max_workers, data):
    """通用执行器"""
    with executor_class(max_workers=max_workers) as executor:
        # submit:提交单个任务
        future = executor.submit(task, 42)
        print(f"单个结果: {future.result()}")

        # map:批量提交(保持顺序)
        results = list(executor.map(task, data))
        print(f"批量结果: {results[:5]}...")

if __name__ == "__main__":
    data = list(range(20))

    print("--- ThreadPoolExecutor ---")
    run_with_executor(ThreadPoolExecutor, 4, data)

    print("--- ProcessPoolExecutor ---")
    run_with_executor(ProcessPoolExecutor, 4, data)

    # 只需要改一个类名!接口完全一致

4.5 常见陷阱与最佳实践

陷阱一:async 传染性

Python
# ❌ 不能在同步函数中直接调用异步函数
def sync_function():
    result = await async_function()  # SyntaxError!

# ✅ 正确方式
async def correct_way():
    result = await async_function()

# 或者在同步上下文中运行
import asyncio
result = asyncio.run(async_function())

陷阱二:忘记 await

Python
# ❌ 忘记 await,协程不会执行
async def main():
    asyncio.sleep(1)        # 没有 await!这只是创建了协程对象
    print("立刻打印了")

# ✅ 正确
async def main():
    await asyncio.sleep(1)
    print("等待1秒后打印")

陷阱三:在异步代码中使用阻塞操作

Python
# ❌ time.sleep 会阻塞整个事件循环
async def bad_example():
    time.sleep(5)           # 阻塞!其他协程也无法执行

# ✅ 使用异步版本
async def good_example():
    await asyncio.sleep(5)  # 非阻塞

# ✅ 如果必须使用阻塞操作,放到线程池中
async def necessary_blocking():
    loop = asyncio.get_running_loop()  # Python 3.10+推荐,替代已弃用的get_event_loop()
    result = await loop.run_in_executor(None, requests.get, "https://example.com")

陷阱四:多线程竞态条件

Python
# ❌ 非线程安全
counter = 0
def increment():
    global counter
    for _ in range(1_000_000):
        counter += 1       # 读-改-写不是原子操作!

# ✅ 使用锁
lock = threading.Lock()
def safe_increment():
    global counter
    for _ in range(1_000_000):
        with lock:
            counter += 1

最佳实践

  1. IO 密集优先用 asyncio:性能最优、资源消耗最小
  2. CPU 密集用 multiprocessing:绕过 GIL
  3. 简单并发用 ThreadPoolExecutor:API 简单、易理解
  4. 控制并发数量:使用 Semaphore 避免资源耗尽
  5. 优雅关闭:正确处理 KeyboardInterrupt 和资源清理
  6. 使用 concurrent.futures:统一的接口方便切换策略
  7. 避免共享可变状态:尽量用消息传递(Queue)而非共享内存

✏️ 练习题

基础练习

  1. 编写一个多线程程序,同时下载5个网页内容并打印每个网页的大小。

  2. 使用 threading.Lock 实现一个线程安全的计数器类。

  3. 使用 asyncio.gather 并发执行三个模拟IO任务,对比总耗时与顺序执行的差异。

  4. 实现生产者-消费者模型,2个生产者、3个消费者的多线程版本。

进阶练习

  1. 编写同步、多线程、异步三个版本的文件批量读取程序,比较性能差异。

  2. 使用 ProcessPoolExecutor 实现并行计算 1 到 N 的素数,对比多进程与单进程的加速比。

  3. asyncio + aiohttp 实现一个简单的异步爬虫,能并发爬取100个页面,控制最大并发数为20。

  4. 实现一个异步生产者-消费者模型,使用 asyncio.Queue


📋 面试高频题(15道)

Q1: GIL 是什么?为什么存在?如何绕过?

GIL(全局解释器锁)是 CPython 中的互斥锁,确保同一时刻只有一个线程执行 Python 字节码。存在原因是保护 CPython 的引用计数机制。绕过方式:使用多进程、C扩展(NumPy)、其他 Python 实现、asyncio。


Q2: 进程 vs 线程 vs 协程的区别?

特性 进程 线程 协程
调度者 OS OS 用户代码
切换方式 OS抢占式 OS抢占式 协作式(await)
内存 独立地址空间 共享进程内存 共享线程内存
开销 大(MB级) 中(MB级) 极小(KB级)
通信 IPC(队列、管道) 共享内存(需锁) 直接访问
并行 真正并行 受GIL限制 单线程并发

Q3: asyncio 事件循环的工作原理?

事件循环维护一个待执行的协程队列。当协程遇到 await 时,它会暂停并让出控制权给事件循环。事件循环检查是否有其他可执行的协程或已完成的IO操作,调度下一个协程运行。整个过程在单线程中完成,通过非阻塞IO和协作式调度实现并发。


Q4: 什么时候用多线程、多进程、异步?

  • 多线程:IO密集且并发量不大(<100)、需要共享内存、已有同步库无异步替代
  • 多进程:CPU密集型任务、需要利用多核、需要进程隔离
  • 异步:高并发IO(>100)、网络服务器、爬虫、已有异步生态库

Q5: threading.Lockthreading.RLock 有什么区别?

Lock 同一线程不可重复获取(会死锁),RLock(可重入锁)允许同一线程多次获取。RLock 内部维护一个计数器,获取几次就需要释放几次。


Q6: asyncio.gatherasyncio.wait 有什么区别?

  • gather:等待所有协程完成,按输入顺序返回结果列表,可设置 return_exceptions=True
  • wait:返回两个集合(done, pending),可设置 timeoutreturn_when 参数,更灵活

Q7: 为什么 time.sleep 在异步代码中是有害的?

time.sleep 是阻塞调用,会阻塞整个事件循环,导致所有其他协程都无法执行。应该使用 await asyncio.sleep() 替代,它会让出控制权给事件循环。


Q8: 什么是"生产者-消费者"模式?如何实现?

一种经典的并发设计模式:生产者负责生成数据放入缓冲区,消费者从缓冲区取数据处理。解耦了生产和消费的速度差异。Python 中可用 queue.Queue(多线程)、multiprocessing.Queue(多进程)、asyncio.Queue(异步)实现。


Q9: concurrent.futures 模块的优势?

提供统一的高级接口(ThreadPoolExecutorProcessPoolExecutor),只需更换一个类名就能在线程和进程之间切换。支持 submit(单任务)、map(批量任务)、as_completed(按完成顺序获取)。


Q10: 如何限制异步任务的并发数量?

使用 asyncio.Semaphore

Python
sem = asyncio.Semaphore(10)
async def limited_task():
    async with sem:
        await do_something()


Q11: 多进程间如何通信?各种方式的优缺点?

  • Queue:线程/进程安全,使用简单,性能适中
  • Pipe:两个进程间通信,速度快,但只能一对一
  • 共享内存(Value/Array):速度快,但需要手动加锁
  • Manager:支持复杂数据结构(dict/list),但性能较低

Q12: async withasync for 分别用于什么场景?

  • async with:异步上下文管理器,用于异步的资源获取和释放(如数据库连接、HTTP会话)
  • async for:异步迭代器,用于逐个获取异步生成的数据(如流式读取、WebSocket消息)

Q13: Python 3.12/3.13 在并发方面有什么新特性?

  • Python 3.12:改进了 asyncio性能,新增 TaskGroup
  • Python 3.13:实验性的 free-threaded 模式(需安装 python3.13t 构建版本或设置 PYTHON_GIL=0),可以真正实现多线程并行
  • TaskGroup 提供结构化并发,类似 trio 的 nursery
Python
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(coro1())
        task2 = tg.create_task(coro2())
    # 所有任务完成后才会离开 with 块

Q14: 什么是 async 传染性?如何处理?

一旦函数变成 async,调用它的函数也必须是 async,像传染一样扩散到整个调用链。处理方法: 1. 尽早在代码架构中引入 async 2. 用 asyncio.run() 在同步入口点运行异步代码 3. 用 loop.run_in_executor() 在异步代码中调用同步函数


Q15: 如何在异步代码中调用同步的阻塞函数?

使用 run_in_executor 将阻塞操作放到线程池中执行:

Python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    # 在默认线程池中运行阻塞函数
    result = await loop.run_in_executor(None, blocking_function, arg1, arg2)

    # 或指定线程池
    from concurrent.futures import ThreadPoolExecutor
    executor = ThreadPoolExecutor(max_workers=5)
    result = await loop.run_in_executor(executor, blocking_function, arg1, arg2)

← 上一章:装饰器与生成器 | 返回目录