跳转至

05 - 操作系统基础

目标: 理解操作系统核心概念,为系统编程和性能优化打下基础

时间: 2-3周

核心原则: 理解"为什么"比知道"怎么做"更重要


🎯 为什么要学操作系统?

实际应用场景

Text Only
1. 程序运行缓慢
   - 不了解进程调度 → 无法优化
   - 不了解内存管理 → 内存泄漏无法定位

2. 高并发系统设计
   - 不了解线程/进程 → 无法设计并发模型
   - 不了解锁机制 → 出现死锁无法解决

3. 系统调优
   - 不了解文件系统 → IO性能无法优化
   - 不了解网络栈 → 网络性能无法优化

📚 核心概念

1. 进程与线程

进程 (Process)

Text Only
定义: 程序的一次执行实例,是资源分配的基本单位

进程组成:
┌─────────────────────────┐
│ 代码段 (Text Segment)    │  程序代码
├─────────────────────────┤
│ 数据段 (Data Segment)    │  全局变量、静态变量
├─────────────────────────┤
│ 堆 (Heap)               │  动态内存分配
├─────────────────────────┤
│ 栈 (Stack)              │  局部变量、函数调用
├─────────────────────────┤
│ PCB (进程控制块)         │  进程状态、优先级等
└─────────────────────────┘
Python
# Python多进程示例
import multiprocessing
import os
import time

def worker(name):
    print(f"Worker {name} started, PID: {os.getpid()}")
    time.sleep(2)
    print(f"Worker {name} finished")

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    # 创建进程
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    print("All workers finished")

线程 (Thread)

Text Only
定义: 进程内的执行单元,是CPU调度的基本单位

线程组成:
┌─────────────────────────┐
│ 线程ID                   │
├─────────────────────────┤
│ 程序计数器 (PC)          │  下一条指令地址
├─────────────────────────┤
│ 寄存器组                 │
├─────────────────────────┤
│ 栈 (Stack)              │  独立的栈空间
└─────────────────────────┘

特点:
- 同一进程的线程共享代码段、数据段、堆
- 每个线程有独立的栈和寄存器
- 线程切换比进程切换开销小
Python
# Python多线程示例
import threading
import time

def worker(name):
    print(f"Thread {name} started")
    time.sleep(2)
    print(f"Thread {name} finished")

# 创建线程
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

print("All threads finished")

进程 vs 线程

特性 进程 线程
资源占用 独立地址空间,资源多 共享进程资源,资源少
通信方式 IPC(管道、消息队列等) 直接共享内存
切换开销 大(需要切换页表) 小(只需切换寄存器)
安全性 一个崩溃不影响其他 一个崩溃可能导致整个进程崩溃
创建速度
适用场景 CPU密集型 IO密集型

2. 内存管理

虚拟内存

Text Only
概念: 每个进程拥有独立的虚拟地址空间,操作系统负责映射到物理内存

虚拟地址空间布局 (64位Linux):
┌──────────────────────────┐ 高地址
│       内核空间            │  (128TB)
├──────────────────────────┤
│       栈 (向下增长)        │
│           ↓              │
├──────────────────────────┤
│       内存映射区域         │  (动态库、文件映射)
├──────────────────────────┤
│           ↑              │
│       堆 (向上增长)        │
├──────────────────────────┤
│       BSS段 (未初始化数据) │
├──────────────────────────┤
│       数据段 (已初始化数据) │
├──────────────────────────┤
│       代码段              │
└──────────────────────────┘ 低地址 (0x400000)

内存分配

Python
# Python内存管理示例
import sys

# 查看对象内存占用
a = [1, 2, 3, 4, 5]
print(f"List size: {sys.getsizeof(a)} bytes")

# 内存池机制
# Python使用pymalloc管理小内存(<512字节)
# 大内存直接调用C的malloc

# 垃圾回收
import gc

# 查看垃圾回收器状态
print(gc.get_count())  # (年轻代, 中年代, 老年代)

# 手动触发垃圾回收
gc.collect()

# 循环引用示例
class Node:
    def __init__(self):
        self.ref = None

a = Node()
b = Node()
a.ref = b
b.ref = a

# 即使删除引用,循环引用对象也不会被立即回收
del a
del b

# 需要垃圾回收器检测循环引用
gc.collect()

页面置换算法

Python
# 简单的LRU缓存实现(基于操作系统页面置换思想)
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        # 移动到末尾(最近使用)
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        # 超出容量,移除最久未使用的
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

# 使用示例
cache = LRUCache(2)
cache.put(1, 1)
cache.put(2, 2)
print(cache.get(1))    # 返回 1
cache.put(3, 3)        # 移除 key 2
print(cache.get(2))    # 返回 -1

3. 文件系统

文件IO模型

Python
import os
import mmap

# 1. 普通文件IO
with open('test.txt', 'w') as f:  # with自动管理资源,确保文件正确关闭
    f.write('Hello World')

# 2. 内存映射文件(mmap)
# 适合大文件随机访问
with open('large_file.bin', 'r+b') as f:
    # 内存映射
    mmapped = mmap.mmap(f.fileno(), 0)
    # 像操作内存一样操作文件
    print(mmapped[:10])
    mmapped.close()

# 3. 零拷贝(Zero-copy)
# 使用sendfile系统调用,数据不经过用户空间
import socket

sock = socket.socket()
# 在Linux上可以使用sendfile
# sock.sendfile(open('file.bin', 'rb'))

IO多路复用

Python
# select/poll/epoll示例
import select
import socket

# 创建服务器socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8080))
server.listen(5)
server.setblocking(False)

# 使用select实现IO多路复用
inputs = [server]
outputs = []

while inputs:
    # 等待IO事件
    readable, writable, exceptional = select.select(inputs, outputs, inputs)

    for s in readable:
        if s is server:
            # 新连接
            connection, client_address = s.accept()
            connection.setblocking(False)
            inputs.append(connection)
        else:
            # 接收数据
            data = s.recv(1024)
            if data:
                # 处理数据
                pass
            else:
                # 连接关闭
                inputs.remove(s)
                s.close()

4. 进程间通信 (IPC)

Python
# 1. 管道 (Pipe)
import multiprocessing

def sender(conn):
    conn.send("Hello from sender")
    conn.close()

def receiver(conn):
    msg = conn.recv()
    print(f"Received: {msg}")

parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(child_conn,))
p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

p1.start()
p2.start()
p1.join()
p2.join()

# 2. 共享内存
from multiprocessing import Process, Value, Array

def modify_shared(n, arr):
    n.value = 3.14159
    for i in range(len(arr)):
        arr[i] = -arr[i]

num = Value('d', 0.0)  # 共享双精度浮点数
arr = Array('i', range(10))  # 共享整数数组

p = Process(target=modify_shared, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

# 3. 消息队列
from multiprocessing import Queue

def worker(q):
    q.put("Message from worker")

q = Queue()
p = Process(target=worker, args=(q,))
p.start()
print(q.get())  # 阻塞等待消息
p.join()

5. 同步机制

Python
import threading  # 线程池/多线程:并发执行任务
import time

# 1. 锁 (Lock)
lock = threading.Lock()
counter = 0

def increment():
    global counter
    for _ in range(100000):
        with lock:  # 获取锁
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # 应该是1000000

# 2. 信号量 (Semaphore)
# 控制同时访问的线程数量
semaphore = threading.Semaphore(3)  # 最多3个线程同时访问

def limited_resource():
    with semaphore:
        print(f"{threading.current_thread().name} accessing resource")
        time.sleep(1)

# 3. 条件变量 (Condition)
# 用于线程间协调
condition = threading.Condition()
shared_data = []

def producer():
    with condition:
        shared_data.append("data")
        condition.notify_all()  # 通知等待的线程

def consumer():
    with condition:
        while not shared_data:
            condition.wait()  # 等待数据
        data = shared_data.pop()
        print(f"Consumed: {data}")

# 4. 读写锁 (RLock)
# 允许多个读线程,但写线程独占
rw_lock = threading.RLock()

🛠️ 实践练习

练习1: 实现简单的进程池

Python
import multiprocessing
import time
from queue import Empty

class ProcessPool:
    """简单的进程池实现"""

    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.task_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        self.workers = []

    def worker_func(self):
        """工作进程函数"""
        while True:
            try:  # try/except捕获异常
                task = self.task_queue.get(timeout=1)
                if task is None:  # 结束信号
                    break
                func, args = task
                result = func(*args)  # *args接收任意位置参数;**kwargs接收任意关键字参数
                self.result_queue.put(result)
            except Empty:
                continue

    def start(self):
        """启动工作进程"""
        for _ in range(self.num_workers):
            p = multiprocessing.Process(target=self.worker_func)
            p.start()
            self.workers.append(p)

    def submit(self, func, args=()):
        """提交任务"""
        self.task_queue.put((func, args))

    def get_result(self, timeout=None):
        """获取结果"""
        return self.result_queue.get(timeout=timeout)

    def stop(self):
        """停止所有工作进程"""
        for _ in self.workers:
            self.task_queue.put(None)
        for w in self.workers:
            w.join()

# 使用示例
def square(n):
    time.sleep(1)
    return n * n

pool = ProcessPool(4)
pool.start()

for i in range(10):
    pool.submit(square, (i,))

for _ in range(10):
    print(pool.get_result())

pool.stop()

练习2: 内存分析工具

Python
import tracemalloc
import sys

def analyze_memory():
    """分析内存使用情况"""
    # 开始跟踪内存分配
    tracemalloc.start()

    # 执行一些操作
    data = [i for i in range(100000)]

    # 获取内存分配快照
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')

    print("Top 10 memory allocations:")
    for stat in top_stats[:10]:  # 切片操作:[start:end:step]提取子序列
        print(stat)

    tracemalloc.stop()

# 对象引用计数分析
import gc

def analyze_references():
    """分析对象引用关系"""
    class Node:
        def __init__(self, name):
            self.name = name
            self.ref = None

    a = Node("A")
    b = Node("B")
    a.ref = b
    b.ref = a

    # 获取引用对象
    print(f"Referrers of a: {gc.get_referrers(a)}")
    print(f"Referents of a: {gc.get_referents(a)}")

✅ 学习检查点

完成本阶段后,你应该能够:

  • 解释进程和线程的区别及适用场景
  • 使用Python实现多进程和多线程程序
  • 理解虚拟内存的概念和作用
  • 使用同步机制(锁、信号量、条件变量)解决并发问题
  • 理解不同IO模型的特点和适用场景
  • 实现基本的进程间通信

📚 推荐资源

书籍

  • 《操作系统导论》(Operating Systems: Three Easy Pieces)
  • 《深入理解计算机系统》(CSAPP)
  • 《Linux内核设计与实现》

在线资源

  • MIT 6.828: Operating System Engineering
  • 清华操作系统课程(B站)
  • Linux内核文档

记住:操作系统是计算机科学的基石,理解它才能写出高效、稳定的程序! 🖥️