并发与异步编程¶
🎯 学习目标¶
- 深入理解 GIL 的原理及其对多线程的影响
- 掌握
threading模块的使用和线程同步原语 - 掌握
multiprocessing多进程编程和进程间通信 - 理解协程演进历史,熟练使用
asyncio进行异步编程 - 能根据场景选择多线程/多进程/异步方案
- 掌握性能基准测试和最佳实践
第一部分:多线程¶
1.1 GIL(全局解释器锁)详解¶
GIL(Global Interpreter Lock)是 CPython 解释器中的一把全局互斥锁。它确保在任意时刻,只有一个线程在执行 Python 字节码。
为什么存在 GIL:
- CPython 的内存管理(引用计数)不是线程安全的
- GIL 是一种简单粗暴但有效的方式来保护引用计数
- 它让 CPython 的单线程性能更好,C 扩展开发更简单
GIL 的影响:
import threading
import time
def cpu_bound_task():
"""CPU 密集型任务"""
count = 0
for _ in range(50_000_000):
count += 1
return count
# 单线程
start = time.time()
cpu_bound_task()
cpu_bound_task()
print(f"单线程: {time.time() - start:.2f}s")
# 多线程(并不会更快!因为 GIL)
start = time.time()
t1 = threading.Thread(target=cpu_bound_task) # threading.Thread创建新线程执行并发任务
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多线程: {time.time() - start:.2f}s")
# 输出示例:
# 单线程: 4.20s
# 多线程: 4.35s ← 甚至可能更慢(线程切换开销)
关键结论:
| 任务类型 | 多线程效果 | 原因 |
|---|---|---|
| CPU 密集型 | ❌ 无法加速(甚至更慢) | GIL 阻止真正的并行执行 |
| IO 密集型 | ✅ 有效加速 | IO 等待时 GIL 会释放 |
如何绕过 GIL:
- 使用多进程(
multiprocessing)—— 每个进程都有自己的 GIL - 使用 C 扩展(如 NumPy)—— 在 C 层面释放 GIL
- 使用其他 Python 实现(Jython、PyPy)
- 使用
asyncio异步编程(IO 密集场景) - Python 3.13+ 的 free-threaded 模式(实验性,需安装
python3.13t构建版本或设置PYTHON_GIL=0)
1.2 threading 模块基础¶
import threading
import time
# 方式一:传入函数
def worker(name, delay):
print(f"线程 {name} 开始")
time.sleep(delay)
print(f"线程 {name} 完成")
t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1))
t1.start() # 启动线程(非阻塞)
t2.start()
t1.join() # 等待线程完成(阻塞)
t2.join()
print("所有线程完成")
# 方式二:继承 Thread 类
class DownloadThread(threading.Thread):
def __init__(self, url):
super().__init__()
self.url = url
self.result = None
def run(self):
# 模拟下载
print(f"正在下载: {self.url}")
time.sleep(1)
self.result = f"{self.url} 的内容"
t = DownloadThread("https://example.com")
t.start()
t.join()
print(t.result)
# 守护线程(主线程结束时自动终止)
t = threading.Thread(target=worker, args=("daemon", 10), daemon=True)
t.start()
# 主线程结束时,daemon 线程会被强制终止
1.3 线程同步原语¶
Lock(互斥锁)¶
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(1_000_000):
with lock: # 推荐使用 with 语句(自动释放)
counter += 1
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start(); t2.start()
t1.join(); t2.join()
print(f"counter = {counter}") # 2000000(正确结果)
# 如果不加锁,结果可能小于 2000000
RLock(可重入锁)¶
import threading
rlock = threading.RLock()
def recursive_function(n):
with rlock: # 同一线程可以多次获取
if n > 0:
print(f"depth: {n}")
recursive_function(n - 1)
# Lock 在同一线程重复获取会死锁,RLock 不会
Semaphore(信号量)¶
import threading
import time
# 限制同时运行的线程数
semaphore = threading.Semaphore(3) # 最多3个线程同时执行
def limited_task(name):
with semaphore:
print(f"任务 {name} 开始执行")
time.sleep(2)
print(f"任务 {name} 完成")
threads = [threading.Thread(target=limited_task, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# 任何时刻最多只有 3 个任务在执行
Event(事件)¶
import threading
import time
event = threading.Event()
def waiter():
print("等待信号...")
event.wait() # 阻塞,直到 event 被 set
print("收到信号!开始工作")
def sender():
time.sleep(2)
print("发送信号")
event.set() # 唤醒所有等待的线程
threading.Thread(target=waiter).start()
threading.Thread(target=sender).start()
Condition(条件变量)¶
import threading
import time
condition = threading.Condition()
items = []
def producer():
for i in range(5):
time.sleep(0.5)
with condition:
items.append(f"item-{i}")
print(f"生产: item-{i}")
condition.notify() # 通知消费者
def consumer():
while True:
with condition:
while not items:
condition.wait() # 等待通知
item = items.pop(0)
print(f"消费: {item}")
threading.Thread(target=producer).start()
threading.Thread(target=consumer, daemon=True).start()
1.4 线程安全的数据结构(Queue)¶
import threading
import queue
import time
q = queue.Queue(maxsize=10) # 线程安全的队列
def producer(q):
for i in range(20):
item = f"product-{i}"
q.put(item) # 如果队列满了会阻塞
print(f"[生产者] 生产了: {item}")
time.sleep(0.1)
q.put(None) # 毒丸(Poison Pill)标记结束
def consumer(q):
while True:
item = q.get() # 如果队列空了会阻塞
if item is None: # 收到结束信号
break
print(f"[消费者] 消费了: {item}")
time.sleep(0.2)
q.task_done() # 标记任务完成
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
print("全部完成")
Queue 的三种类型:
| 类型 | 说明 |
|---|---|
queue.Queue | FIFO 先进先出 |
queue.LifoQueue | LIFO 后进先出(栈) |
queue.PriorityQueue | 优先级队列 |
1.5 生产者-消费者模型完整实现¶
import threading
import queue
import time
import random
class ProducerConsumer:
def __init__(self, buffer_size=5, num_producers=2, num_consumers=3):
self.q = queue.Queue(maxsize=buffer_size)
self.num_producers = num_producers
self.num_consumers = num_consumers
self.finished = threading.Event()
def produce(self, producer_id):
"""生产者"""
for i in range(5):
item = f"P{producer_id}-Item{i}"
self.q.put(item)
print(f"[生产者-{producer_id}] 生产: {item} (队列大小: {self.q.qsize()})")
time.sleep(random.uniform(0.1, 0.3))
print(f"[生产者-{producer_id}] 完成生产")
def consume(self, consumer_id):
"""消费者"""
while not self.finished.is_set() or not self.q.empty():
try:
item = self.q.get(timeout=0.5)
print(f"[消费者-{consumer_id}] 消费: {item}")
time.sleep(random.uniform(0.1, 0.5))
self.q.task_done()
except queue.Empty:
continue
print(f"[消费者-{consumer_id}] 退出")
def run(self):
producers = [
threading.Thread(target=self.produce, args=(i,))
for i in range(self.num_producers)
]
consumers = [
threading.Thread(target=self.consume, args=(i,))
for i in range(self.num_consumers)
]
for t in producers + consumers:
t.start()
for t in producers:
t.join()
self.q.join() # 等待所有任务被处理
self.finished.set() # 通知消费者退出
for t in consumers:
t.join()
print("所有生产和消费完成")
if __name__ == "__main__":
pc = ProducerConsumer()
pc.run()
1.6 ThreadPoolExecutor 线程池¶
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def download(url):
"""模拟下载任务"""
print(f"开始下载: {url}")
time.sleep(1)
return f"{url} 下载完成 ({len(url)} bytes)"
urls = [f"https://example.com/page/{i}" for i in range(10)]
# 使用线程池(推荐方式)
with ThreadPoolExecutor(max_workers=5) as executor:
# 方式一:map(保持顺序)
results = executor.map(download, urls)
for result in results:
print(result)
# 方式二:submit + as_completed(按完成顺序获取)
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(download, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
result = future.result()
print(result)
except Exception as e:
print(f"{url} 出错: {e}")
第二部分:多进程¶
2.1 multiprocessing 模块¶
import multiprocessing
import time
import os
def cpu_bound_task(n):
"""CPU 密集型任务"""
print(f"进程 {os.getpid()} 开始计算")
result = sum(i * i for i in range(n))
print(f"进程 {os.getpid()} 完成")
return result
if __name__ == "__main__":
# 创建进程
p1 = multiprocessing.Process(target=cpu_bound_task, args=(10_000_000,))
p2 = multiprocessing.Process(target=cpu_bound_task, args=(10_000_000,))
start = time.time()
p1.start()
p2.start()
p1.join()
p2.join()
print(f"多进程耗时: {time.time() - start:.2f}s")
# 对比单进程
start = time.time()
cpu_bound_task(10_000_000)
cpu_bound_task(10_000_000)
print(f"单进程耗时: {time.time() - start:.2f}s")
# 多进程真的更快!因为每个进程有自己的 GIL
⚠️ 重要:Windows 上必须将多进程代码放在
if __name__ == "__main__":中,否则会报错。
2.2 进程间通信¶
Queue(进程安全队列)¶
from multiprocessing import Process, Queue
def producer(q):
for i in range(5):
q.put(f"item-{i}")
q.put(None) # 结束信号
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"消费: {item}")
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
Pipe(管道)¶
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello from sender!")
conn.send([1, 2, 3])
conn.close()
def receiver(conn):
while True:
try:
msg = conn.recv()
print(f"收到: {msg}")
except EOFError:
break
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
共享内存(Value、Array)¶
from multiprocessing import Process, Value, Array
def modify(shared_val, shared_arr):
# ⚠️ 并发安全提示:Value 的 += 操作不是原子的(先读后写),多进程并发时会产生竞态条件。
# 应使用 shared_val.get_lock() 加锁:
# with shared_val.get_lock():
# shared_val.value += 1
shared_val.value += 1
for i in range(len(shared_arr)):
shared_arr[i] *= 2
if __name__ == "__main__":
# Value:共享单个值('i' 表示整数,'d' 表示浮点数)
val = Value('i', 0)
# Array:共享数组
arr = Array('i', [1, 2, 3, 4, 5])
p = Process(target=modify, args=(val, arr))
p.start()
p.join()
print(f"Value: {val.value}") # 1
print(f"Array: {list(arr)}") # [2, 4, 6, 8, 10]
2.3 进程池¶
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
import time
def heavy_computation(n):
"""模拟CPU密集计算"""
return sum(i ** 2 for i in range(n))
if __name__ == "__main__":
numbers = [5_000_000] * 8
# 方式一:multiprocessing.Pool
start = time.time()
with Pool(processes=4) as pool:
results = pool.map(heavy_computation, numbers)
print(f"Pool: {time.time() - start:.2f}s")
# 方式二:ProcessPoolExecutor(推荐,与 ThreadPoolExecutor 接口一致)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(heavy_computation, numbers))
print(f"ProcessPoolExecutor: {time.time() - start:.2f}s")
# 对比单进程
start = time.time()
results = [heavy_computation(n) for n in numbers]
print(f"单进程: {time.time() - start:.2f}s")
2.4 多进程 vs 多线程选择策略¶
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| CPU 密集计算 | 多进程 | 绕过 GIL,真正并行 |
| IO 密集(文件/网络) | 多线程 / 异步 | IO 等待时 GIL 释放 |
| 需要共享大量数据 | 多线程 | 线程共享内存空间 |
| 需要隔离性/稳定性 | 多进程 | 进程间互不影响 |
| 大量并发连接 | 异步(asyncio) | 内存开销小 |
第三部分:异步编程¶
3.1 协程概念——从生成器到 async/await 的演进¶
# 阶段一:生成器(Python 2.5+)
def simple_generator():
yield 1
yield 2
yield 3
# 阶段二:yield from(Python 3.3+)
def sub_gen():
yield "a"
yield "b"
def main_gen():
yield from sub_gen() # 委托给子生成器
yield "c"
# 阶段三:@asyncio.coroutine + yield from(Python 3.4)
import asyncio
@asyncio.coroutine
def old_style_coroutine():
yield from asyncio.sleep(1)
return "done"
# 阶段四:async/await(Python 3.5+)✨ 现代方式
async def modern_coroutine():
await asyncio.sleep(1)
return "done"
核心区别:
- 线程:OS 管理调度,抢占式切换,开销较大
- 协程:用户代码控制切换(
await),协作式,开销极小 - 一个线程可以运行数千上万个协程
3.2 asyncio 事件循环¶
事件循环是 asyncio 的核心,它管理和调度所有协程的执行。
import asyncio
async def say_hello(name, delay):
await asyncio.sleep(delay)
print(f"Hello, {name}!")
# Python 3.7+ 推荐方式
async def main():
# 这三个协程会并发执行
await asyncio.gather(
say_hello("Alice", 2),
say_hello("Bob", 1),
say_hello("Charlie", 3),
)
asyncio.run(main()) # asyncio.run()创建事件循环并运行顶层协程,是异步程序的入口
# 输出(按完成顺序):
# Hello, Bob! (1秒后)
# Hello, Alice! (2秒后)
# Hello, Charlie! (3秒后)
# 总耗时约3秒,而非6秒!
3.3 async def / await 语法¶
import asyncio
# async def 定义协程函数
async def fetch_data(url):
print(f"开始获取 {url}")
await asyncio.sleep(1) # 模拟网络请求(非阻塞)
return f"{url} 的数据"
# await 只能在 async 函数中使用
async def process():
# await 暂停当前协程,等待结果
data = await fetch_data("https://api.example.com")
print(f"处理: {data}")
asyncio.run(process())
重要概念:
async def定义的函数返回一个协程对象,不会立即执行await暂停当前协程的执行,等待被 await 的对象完成await只能用于 awaitable 对象(协程、Task、Future)
3.4 asyncio 核心 API¶
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} 完成(耗时{delay}s)"
async def main():
# 1. create_task:创建任务(立即调度执行)
t1 = asyncio.create_task(task("A", 2))
t2 = asyncio.create_task(task("B", 1))
result1 = await t1
result2 = await t2
print(result1, result2)
# 2. gather:并发运行多个协程,收集所有结果
results = await asyncio.gather(
task("C", 2),
task("D", 1),
task("E", 3),
)
print(results) # ['C 完成(耗时2s)', 'D 完成(耗时1s)', 'E 完成(耗时3s)']
# 3. wait:并发运行,更灵活的控制
tasks = [asyncio.create_task(task(f"T{i}", i)) for i in range(1, 4)]
done, pending = await asyncio.wait(tasks, timeout=2.5)
for t in done:
print(f"完成: {t.result()}")
for t in pending:
print(f"未完成: {t.get_name()}")
t.cancel()
# 4. sleep:非阻塞等待
await asyncio.sleep(1)
# 5. wait_for:设置超时
try:
result = await asyncio.wait_for(task("slow", 10), timeout=3)
except asyncio.TimeoutError:
print("任务超时!")
# 6. as_completed:按完成顺序获取结果
coros = [task(f"X{i}", 3 - i) for i in range(3)]
for coro in asyncio.as_completed(coros):
result = await coro
print(f"按完成顺序: {result}")
asyncio.run(main())
3.5 异步上下文管理器(async with)¶
import asyncio
class AsyncResource:
async def __aenter__(self): # 异步上下文管理器入口,对应 async with 语句
print("获取资源")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb): # 异步退出:exc_type=异常类型, exc_val=异常值, exc_tb=回溯信息
print("释放资源")
await asyncio.sleep(0.1)
async def do_something(self):
print("使用资源")
async def main():
async with AsyncResource() as resource: # async with异步上下文管理器,自动管理异步资源的获取和释放
await resource.do_something()
asyncio.run(main())
# 获取资源
# 使用资源
# 释放资源
3.6 异步迭代器(async for)¶
import asyncio
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self): # 返回异步迭代器自身,使对象可用于 async for
return self
async def __anext__(self): # 异步迭代器的"取下一个"方法
if self.current >= self.stop:
raise StopAsyncIteration # 抛出此异常表示异步迭代结束(类似同步的StopIteration)
self.current += 1
await asyncio.sleep(0.1)
return self.current
async def main():
async for num in AsyncCounter(5):
print(num)
asyncio.run(main())
# 异步生成器(Python 3.6+)
async def async_range(start, stop):
for i in range(start, stop):
await asyncio.sleep(0.1)
yield i
async def main2():
async for i in async_range(1, 6):
print(i)
asyncio.run(main2())
3.7 aiohttp 异步 HTTP 请求实战¶
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""异步获取单个URL"""
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
start = time.time()
# 使用 aiohttp 的 ClientSession
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"异步请求 {len(urls)} 个URL,耗时: {elapsed:.2f}s")
# 5个延迟1秒的请求,同步需要5秒,异步只需约1秒
# 限制并发数量
semaphore = asyncio.Semaphore(3)
async def limited_fetch(session, url):
async with semaphore:
return await fetch_url(session, url)
async with aiohttp.ClientSession() as session:
tasks = [limited_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
# pip install aiohttp
asyncio.run(main())
3.8 异步文件操作(aiofiles)¶
import asyncio
import aiofiles
async def async_file_operations():
# 异步写文件
async with aiofiles.open("output.txt", mode="w") as f:
await f.write("Hello, async world!\n")
await f.write("Line 2\n")
# 异步读文件
async with aiofiles.open("output.txt", mode="r") as f:
content = await f.read()
print(content)
# 异步逐行读取
async with aiofiles.open("output.txt", mode="r") as f:
async for line in f:
print(line.strip())
# pip install aiofiles
asyncio.run(async_file_operations())
3.9 异步数据库操作简介¶
import asyncio
# 示例:使用 asyncpg(PostgreSQL 异步驱动)
# pip install asyncpg
async def db_operations():
import asyncpg
# 创建连接池
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="user",
password="password",
min_size=5,
max_size=20,
)
async with pool.acquire() as conn:
# 查询
rows = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
for row in rows:
print(dict(row))
# 插入
await conn.execute(
"INSERT INTO users(name, email) VALUES($1, $2)",
"Alice", "alice@example.com"
)
# 事务
async with conn.transaction():
await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = $1", 1)
await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = $1", 2)
await pool.close()
# 常见异步数据库驱动:
# PostgreSQL: asyncpg, aiopg
# MySQL: aiomysql
# SQLite: aiosqlite
# ORM: SQLAlchemy 2.0 (支持 async), Tortoise ORM
第四部分:实战与比较¶
4.1 IO 密集 vs CPU 密集任务的选择策略¶
| 特性 | 多线程 (threading) | 多进程 (multiprocessing) | 异步 (asyncio) |
|---|---|---|---|
| 适用场景 | IO 密集型 | CPU 密集型 | 高并发 IO |
| 并行能力 | 受 GIL 限制 | 真正并行 | 单线程并发 |
| 内存开销 | 中等(~8MB/线程) | 大(独立内存空间) | 极小(~KB/协程) |
| 数据共享 | 共享内存(需加锁) | 需 IPC 机制 | 自然共享(单线程) |
| 切换开销 | 较大(OS 调度) | 大(OS 调度) | 极小(用户态切换) |
| 编程复杂度 | 中等 | 中等 | 较高(async 传染性) |
| 可扩展性 | 数百 | 数十 | 数万+ |
| 调试难度 | 困难(竞态条件) | 中等 | 中等(但栈追踪复杂) |
| 典型用例 | 文件读写、简单网络请求 | 数据处理、图像处理 | Web 服务器、爬虫 |
决策流程图:
你的任务是什么类型?
├── CPU 密集型 → 用多进程(multiprocessing / ProcessPoolExecutor)
└── IO 密集型
├── 并发量 < 100 → 多线程(ThreadPoolExecutor)
├── 并发量 100~10000 → asyncio
└── 已有异步生态(aiohttp等) → asyncio
4.2 性能基准测试对比¶
import time
import threading
import multiprocessing
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests # pip install requests
# ============================================
# IO 密集型任务对比
# ============================================
URL = "https://httpbin.org/delay/1" # 每个请求延迟1秒
NUM_REQUESTS = 10
# 1. 同步方式
def sync_fetch():
results = []
start = time.time()
for _ in range(NUM_REQUESTS):
resp = requests.get(URL)
results.append(resp.status_code)
elapsed = time.time() - start
print(f"同步: {elapsed:.2f}s")
# 2. 多线程
def threaded_fetch():
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(lambda _: requests.get(URL).status_code, range(NUM_REQUESTS)))
elapsed = time.time() - start
print(f"多线程: {elapsed:.2f}s")
# 3. 异步
async def async_fetch():
import aiohttp
start = time.time()
async with aiohttp.ClientSession() as session:
async def _fetch(url):
async with session.get(url) as resp: # async with 确保响应被正确释放
return resp.status
tasks = [_fetch(URL) for _ in range(NUM_REQUESTS)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"异步: {elapsed:.2f}s")
# 预期结果(10个延迟1秒的请求):
# 同步: ~10.0s
# 多线程: ~2.0s (5个worker)
# 异步: ~1.0s (全部并发)
# ============================================
# CPU 密集型任务对比
# ============================================
def cpu_task(n):
"""计算密集"""
return sum(i ** 2 for i in range(n))
def compare_cpu():
N = 5_000_000
COUNT = 4
# 单进程
start = time.time()
for _ in range(COUNT):
cpu_task(N)
print(f"单进程 CPU 密集: {time.time() - start:.2f}s")
# 多线程(不会更快)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
list(executor.map(cpu_task, [N] * COUNT))
print(f"多线程 CPU 密集: {time.time() - start:.2f}s")
# 多进程(真正加速)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
list(executor.map(cpu_task, [N] * COUNT))
print(f"多进程 CPU 密集: {time.time() - start:.2f}s")
4.3 综合实战:爬虫性能对比¶
"""
三种爬虫实现方式对比:同步 vs 多线程 vs 异步
"""
import time
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
import asyncio
# ==========================================
# 版本一:同步爬虫
# ==========================================
def sync_crawler(urls):
"""同步爬虫"""
results = []
start = time.time()
for url in urls:
try:
resp = requests.get(url, timeout=10)
results.append({
"url": url,
"status": resp.status_code,
"length": len(resp.text),
})
except Exception as e:
results.append({"url": url, "error": str(e)})
elapsed = time.time() - start
print(f"同步爬虫: {elapsed:.2f}s, 成功: {sum(1 for r in results if 'status' in r)}/{len(urls)}")
return results
# ==========================================
# 版本二:多线程爬虫
# ==========================================
def threaded_crawler(urls, max_workers=10):
"""多线程爬虫"""
results = []
lock = threading.Lock()
def fetch(url):
try:
resp = requests.get(url, timeout=10)
result = {"url": url, "status": resp.status_code, "length": len(resp.text)}
except Exception as e:
result = {"url": url, "error": str(e)}
with lock:
results.append(result)
start = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(fetch, urls)
elapsed = time.time() - start
print(f"多线程爬虫: {elapsed:.2f}s, 成功: {sum(1 for r in results if 'status' in r)}/{len(urls)}")
return results
# ==========================================
# 版本三:异步爬虫
# ==========================================
async def async_crawler(urls, max_concurrent=20):
"""异步爬虫"""
import aiohttp
results = []
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch(session, url):
async with semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
text = await resp.text()
return {"url": url, "status": resp.status, "length": len(text)}
except Exception as e:
return {"url": url, "error": str(e)}
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"异步爬虫: {elapsed:.2f}s, 成功: {sum(1 for r in results if 'status' in r)}/{len(urls)}")
return results
# ==========================================
# 对比运行
# ==========================================
if __name__ == "__main__":
# 准备测试 URL 列表
test_urls = [f"https://httpbin.org/get?id={i}" for i in range(20)]
print("=" * 50)
print(f"爬取 {len(test_urls)} 个 URL")
print("=" * 50)
# 同步
sync_crawler(test_urls)
# 多线程
threaded_crawler(test_urls, max_workers=10)
# 异步(需要 pip install aiohttp)
asyncio.run(async_crawler(test_urls, max_concurrent=20))
4.4 concurrent.futures 统一接口¶
concurrent.futures 提供了统一的高级接口,可以在线程池和进程池之间无缝切换。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def task(n):
"""通用任务"""
time.sleep(0.1)
return n ** 2
def run_with_executor(executor_class, max_workers, data):
"""通用执行器"""
with executor_class(max_workers=max_workers) as executor:
# submit:提交单个任务
future = executor.submit(task, 42)
print(f"单个结果: {future.result()}")
# map:批量提交(保持顺序)
results = list(executor.map(task, data))
print(f"批量结果: {results[:5]}...")
if __name__ == "__main__":
data = list(range(20))
print("--- ThreadPoolExecutor ---")
run_with_executor(ThreadPoolExecutor, 4, data)
print("--- ProcessPoolExecutor ---")
run_with_executor(ProcessPoolExecutor, 4, data)
# 只需要改一个类名!接口完全一致
4.5 常见陷阱与最佳实践¶
陷阱一:async 传染性¶
# ❌ 不能在同步函数中直接调用异步函数
def sync_function():
result = await async_function() # SyntaxError!
# ✅ 正确方式
async def correct_way():
result = await async_function()
# 或者在同步上下文中运行
import asyncio
result = asyncio.run(async_function())
陷阱二:忘记 await¶
# ❌ 忘记 await,协程不会执行
async def main():
asyncio.sleep(1) # 没有 await!这只是创建了协程对象
print("立刻打印了")
# ✅ 正确
async def main():
await asyncio.sleep(1)
print("等待1秒后打印")
陷阱三:在异步代码中使用阻塞操作¶
# ❌ time.sleep 会阻塞整个事件循环
async def bad_example():
time.sleep(5) # 阻塞!其他协程也无法执行
# ✅ 使用异步版本
async def good_example():
await asyncio.sleep(5) # 非阻塞
# ✅ 如果必须使用阻塞操作,放到线程池中
async def necessary_blocking():
loop = asyncio.get_running_loop() # Python 3.10+推荐,替代已弃用的get_event_loop()
result = await loop.run_in_executor(None, requests.get, "https://example.com")
陷阱四:多线程竞态条件¶
# ❌ 非线程安全
counter = 0
def increment():
global counter
for _ in range(1_000_000):
counter += 1 # 读-改-写不是原子操作!
# ✅ 使用锁
lock = threading.Lock()
def safe_increment():
global counter
for _ in range(1_000_000):
with lock:
counter += 1
最佳实践¶
- IO 密集优先用 asyncio:性能最优、资源消耗最小
- CPU 密集用 multiprocessing:绕过 GIL
- 简单并发用 ThreadPoolExecutor:API 简单、易理解
- 控制并发数量:使用 Semaphore 避免资源耗尽
- 优雅关闭:正确处理 KeyboardInterrupt 和资源清理
- 使用
concurrent.futures:统一的接口方便切换策略 - 避免共享可变状态:尽量用消息传递(Queue)而非共享内存
✏️ 练习题¶
基础练习¶
-
编写一个多线程程序,同时下载5个网页内容并打印每个网页的大小。
-
使用
threading.Lock实现一个线程安全的计数器类。 -
使用
asyncio.gather并发执行三个模拟IO任务,对比总耗时与顺序执行的差异。 -
实现生产者-消费者模型,2个生产者、3个消费者的多线程版本。
进阶练习¶
-
编写同步、多线程、异步三个版本的文件批量读取程序,比较性能差异。
-
使用
ProcessPoolExecutor实现并行计算 1 到 N 的素数,对比多进程与单进程的加速比。 -
用
asyncio+aiohttp实现一个简单的异步爬虫,能并发爬取100个页面,控制最大并发数为20。 -
实现一个异步生产者-消费者模型,使用
asyncio.Queue。
📋 面试高频题(15道)¶
Q1: GIL 是什么?为什么存在?如何绕过?
GIL(全局解释器锁)是 CPython 中的互斥锁,确保同一时刻只有一个线程执行 Python 字节码。存在原因是保护 CPython 的引用计数机制。绕过方式:使用多进程、C扩展(NumPy)、其他 Python 实现、asyncio。
Q2: 进程 vs 线程 vs 协程的区别?
| 特性 | 进程 | 线程 | 协程 |
|---|---|---|---|
| 调度者 | OS | OS | 用户代码 |
| 切换方式 | OS抢占式 | OS抢占式 | 协作式(await) |
| 内存 | 独立地址空间 | 共享进程内存 | 共享线程内存 |
| 开销 | 大(MB级) | 中(MB级) | 极小(KB级) |
| 通信 | IPC(队列、管道) | 共享内存(需锁) | 直接访问 |
| 并行 | 真正并行 | 受GIL限制 | 单线程并发 |
Q3: asyncio 事件循环的工作原理?
事件循环维护一个待执行的协程队列。当协程遇到 await 时,它会暂停并让出控制权给事件循环。事件循环检查是否有其他可执行的协程或已完成的IO操作,调度下一个协程运行。整个过程在单线程中完成,通过非阻塞IO和协作式调度实现并发。
Q4: 什么时候用多线程、多进程、异步?
- 多线程:IO密集且并发量不大(<100)、需要共享内存、已有同步库无异步替代
- 多进程:CPU密集型任务、需要利用多核、需要进程隔离
- 异步:高并发IO(>100)、网络服务器、爬虫、已有异步生态库
Q5: threading.Lock 和 threading.RLock 有什么区别?
Lock 同一线程不可重复获取(会死锁),RLock(可重入锁)允许同一线程多次获取。RLock 内部维护一个计数器,获取几次就需要释放几次。
Q6: asyncio.gather 和 asyncio.wait 有什么区别?
gather:等待所有协程完成,按输入顺序返回结果列表,可设置return_exceptions=Truewait:返回两个集合(done, pending),可设置timeout和return_when参数,更灵活
Q7: 为什么 time.sleep 在异步代码中是有害的?
time.sleep 是阻塞调用,会阻塞整个事件循环,导致所有其他协程都无法执行。应该使用 await asyncio.sleep() 替代,它会让出控制权给事件循环。
Q8: 什么是"生产者-消费者"模式?如何实现?
一种经典的并发设计模式:生产者负责生成数据放入缓冲区,消费者从缓冲区取数据处理。解耦了生产和消费的速度差异。Python 中可用 queue.Queue(多线程)、multiprocessing.Queue(多进程)、asyncio.Queue(异步)实现。
Q9: concurrent.futures 模块的优势?
提供统一的高级接口(ThreadPoolExecutor 和 ProcessPoolExecutor),只需更换一个类名就能在线程和进程之间切换。支持 submit(单任务)、map(批量任务)、as_completed(按完成顺序获取)。
Q10: 如何限制异步任务的并发数量?
使用 asyncio.Semaphore:
Q11: 多进程间如何通信?各种方式的优缺点?
- Queue:线程/进程安全,使用简单,性能适中
- Pipe:两个进程间通信,速度快,但只能一对一
- 共享内存(Value/Array):速度快,但需要手动加锁
- Manager:支持复杂数据结构(dict/list),但性能较低
Q12: async with 和 async for 分别用于什么场景?
async with:异步上下文管理器,用于异步的资源获取和释放(如数据库连接、HTTP会话)async for:异步迭代器,用于逐个获取异步生成的数据(如流式读取、WebSocket消息)
Q13: Python 3.12/3.13 在并发方面有什么新特性?
- Python 3.12:改进了
asyncio性能,新增TaskGroup - Python 3.13:实验性的 free-threaded 模式(需安装
python3.13t构建版本或设置PYTHON_GIL=0),可以真正实现多线程并行 TaskGroup提供结构化并发,类似 trio 的 nursery
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(coro1())
task2 = tg.create_task(coro2())
# 所有任务完成后才会离开 with 块
Q14: 什么是 async 传染性?如何处理?
一旦函数变成 async,调用它的函数也必须是 async,像传染一样扩散到整个调用链。处理方法: 1. 尽早在代码架构中引入 async 2. 用 asyncio.run() 在同步入口点运行异步代码 3. 用 loop.run_in_executor() 在异步代码中调用同步函数
Q15: 如何在异步代码中调用同步的阻塞函数?
使用 run_in_executor 将阻塞操作放到线程池中执行:
import asyncio
async def main():
loop = asyncio.get_running_loop()
# 在默认线程池中运行阻塞函数
result = await loop.run_in_executor(None, blocking_function, arg1, arg2)
# 或指定线程池
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=5)
result = await loop.run_in_executor(executor, blocking_function, arg1, arg2)