跳转至

🔄 Go并发编程

📋 学习目标

通过本章学习,你将能够:

  • 深入理解 Goroutine 调度模型(GMP)与栈增长机制
  • 掌握 Channel 的高级模式(fan-in/fan-out/pipeline/done)
  • 熟练使用 sync 包的各种同步原语
  • 运用 context 包进行超时控制和取消传播
  • 使用 errgroup 进行并发错误处理
  • 实现 Worker Pool 等经典并发模式
  • 掌握 select 多路复用的用法与技巧
  • 能用 -race 检测竞态条件
  • 具备并发相关面试题的解答能力

Go并发GMP与模式总览

上图先给出 GMP 调度模型,再映射到 channel、worker pool、context/errgroup 等工程实践模式。


1️⃣ Goroutine 深入理解

1.1 Goroutine 基础

Go
package main

import (
    "fmt"
    "sync"
    "time"
)

func sayHello(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Hello from goroutine %d\n", id)
    time.Sleep(100 * time.Millisecond)
}

func main() {
    var wg sync.WaitGroup  // WaitGroup 等待一组goroutine完成

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go sayHello(i, &wg)  // go 关键字启动goroutine并发执行
    }

    wg.Wait() // 等待所有 goroutine 完成
    fmt.Println("All goroutines finished")
}

1.2 GMP 调度模型

Text Only
Go 运行时采用 GMP 调度模型来管理 goroutine:

G (Goroutine) — 用户级协程,初始栈仅 2KB(可动态增长到 1GB)
M (Machine)   — 操作系统线程,实际执行 goroutine 的载体
P (Processor) — 逻辑处理器,默认数量 = GOMAXPROCS(通常等于 CPU 核数)

调度流程:
┌─────────┐    ┌─────────┐    ┌─────────┐
│  P (LRQ) │    │  P (LRQ) │    │  P (LRQ) │   LRQ = Local Run Queue
│  ┌─┬─┬─┐│    │  ┌─┬─┐  │    │  ┌─┐    │
│  │G│G│G││    │  │G│G│  │    │  │G│    │
│  └─┴─┴─┘│    │  └─┴─┘  │    │  └─┘    │
│    ↓     │    │    ↓     │    │    ↓     │
│    M     │    │    M     │    │    M     │   M = OS Thread
└─────────┘    └─────────┘    └─────────┘
      ↑               ↑              ↑
      └───────────────┼──────────────┘
              ┌───────────────┐
              │   GRQ (全局)   │  GRQ = Global Run Queue
              │  ┌─┬─┬─┬─┐   │
              │  │G│G│G│G│   │
              │  └─┴─┴─┴─┘   │
              └───────────────┘

关键机制:
• Work Stealing —— P 的本地队列为空时,从其他 P 或全局队列偷取 G
• Hand Off —— G 发生系统调用阻塞时,P 与当前 M 解绑,绑定新的 M 继续执行
• 抢占式调度 —— Go 1.14+ 基于信号的异步抢占,防止 goroutine 长期占用

1.3 栈增长机制

Go
package main

import (
    "fmt"
    "runtime"
)

// goroutine 初始栈仅 2KB,按需增长(连续栈/copy stack)
// 当栈空间不足时,运行时会分配一个更大的栈(通常 2 倍),
// 并将旧栈内容复制过去,然后更新所有指针

func deepRecursion(n int) int {
    if n <= 0 {
        // 打印当前 goroutine 的栈使用情况
        var m runtime.MemStats
        runtime.ReadMemStats(&m)
        fmt.Printf("Stack in use: %d KB\n", m.StackInuse/1024)
        return 0
    }
    return n + deepRecursion(n-1)
}

func main() {
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
    fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())

    result := deepRecursion(10000)
    fmt.Printf("Result: %d\n", result)
}

2️⃣ Channel 模式

2.1 Channel 基础

Go
package main

import "fmt"

func main() {
    // 无缓冲 channel(同步通信)
    ch := make(chan int)  // chan 创建通道,用于goroutine间通信
    go func() {
        ch <- 42 // 发送(阻塞直到接收方就绪)
    }()
    val := <-ch // 接收(阻塞直到发送方就绪)
    fmt.Println(val) // 42

    // 有缓冲 channel(异步通信)
    bufCh := make(chan string, 3)
    bufCh <- "A"
    bufCh <- "B"
    bufCh <- "C"
    // bufCh <- "D"  // 缓冲满,会阻塞
    fmt.Println(<-bufCh) // A(FIFO)

    // 单向 channel
    // chan<- int  只发送
    // <-chan int  只接收

    // 关闭 channel
    dataCh := make(chan int, 5)
    for i := 0; i < 5; i++ {
        dataCh <- i
    }
    close(dataCh)

    // range 遍历已关闭的 channel
    for v := range dataCh {
        fmt.Print(v, " ") // 0 1 2 3 4
    }
    fmt.Println()
}

2.2 Pipeline 模式

Go
package main

import "fmt"

// 生成器:产生数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// 平方:对数据变换
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// 过滤:只保留偶数
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

func main() {
    // pipeline: generate → square → filterEven
    nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(nums)
    evens := filterEven(squared)

    for v := range evens {
        fmt.Print(v, " ") // 4 16 36 64 100
    }
    fmt.Println()
}

2.3 Fan-Out / Fan-In 模式

Go
package main

import (
    "fmt"
    "sync"
)

// Fan-Out: 一个 channel 的数据分发给多个 worker
// Fan-In:  多个 channel 的结果合并到一个 channel

// channels ...<-chan int: 可变参数,每个都是只读 channel; 返回值 <-chan int 也是只读 channel
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    // 每个输入 channel 启动一个 goroutine
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch) // 立即调用匿名函数并传入ch,避免闭包捕获循环变量(Go经典模式)
    }

    // 所有输入完成后关闭输出
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func worker(id int, jobs <-chan int) <-chan int {
    results := make(chan int)
    go func() {
        defer close(results)
        for j := range jobs {
            result := j * j // 模拟处理
            fmt.Printf("Worker %d processed %d → %d\n", id, j, result)
            results <- result
        }
    }()
    return results
}

func main() {
    jobs := make(chan int, 10)

    // 发送任务
    go func() {
        for i := 1; i <= 9; i++ {
            jobs <- i
        }
        close(jobs)
    }()

    // Fan-Out: 启动 3 个 worker
    w1 := worker(1, jobs)
    w2 := worker(2, jobs)
    w3 := worker(3, jobs)

    // Fan-In: 合并结果
    for result := range fanIn(w1, w2, w3) {
        fmt.Println("Result:", result)
    }
}

2.4 Done Channel 模式(优雅取消)

Go
package main

import (
    "fmt"
    "time"
)

// done channel 用于通知 goroutine 停止工作
func doWork(done <-chan struct{}, id int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        i := 0
        for {
            select {  // select 多路复用,监听多个channel操作
            case <-done:
                fmt.Printf("Worker %d: cancelled\n", id)
                return
            case out <- i:
                i++
                time.Sleep(50 * time.Millisecond)
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})

    ch := doWork(done, 1)

    // 收集一些数据
    for i := 0; i < 5; i++ {
        fmt.Println("Received:", <-ch)
    }

    // 通知取消
    close(done) // 关闭 done channel 会让所有读取操作立即返回
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Main: done")
}

3️⃣ sync 包

3.1 WaitGroup

Go
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    urls := []string{
        "https://golang.org",
        "https://google.com",
        "https://github.com",
    }

    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            fmt.Printf("Fetching %s\n", u)
            // 模拟 HTTP 请求...
        }(url)
    }

    wg.Wait()
    fmt.Println("All fetches complete")
}

3.2 Mutex / RWMutex

Go
package main

import (
    "fmt"
    "sync"
)

// 线程安全的计数器
// ⚠️ 优化提示:读操作 Value() 使用了排他锁 Lock(),会阻塞其他读者。
// 在读多写少场景下,建议使用 sync.RWMutex,读操作用 RLock() 以允许并发读取。
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

// 读写锁:读多写少场景
type RWConfig struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *RWConfig) Get(key string) string {
    c.mu.RLock()         // 读锁(多个读者可并发)
    defer c.mu.RUnlock()
    return c.data[key]
}

func (c *RWConfig) Set(key, value string) {
    c.mu.Lock()          // 写锁(独占)
    defer c.mu.Unlock()
    c.data[key] = value
}

func main() {
    counter := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }

    wg.Wait()
    fmt.Println("Counter:", counter.Value("key")) // 1000
}

3.3 Once(只执行一次)

Go
package main

import (
    "fmt"
    "sync"
)

type Database struct {
    name string
}

var (
    dbOnce     sync.Once
    dbInstance *Database
)

func GetDB() *Database {
    dbOnce.Do(func() {
        fmt.Println("Initializing database connection...")
        dbInstance = &Database{name: "production"}
    })
    return dbInstance
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            db := GetDB()
            fmt.Printf("Goroutine %d got DB: %s\n", id, db.name)
        }(i)
    }
    wg.Wait()
    // "Initializing database connection..." 只打印一次
}

3.4 sync.Map(并发安全 Map)

Go
package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map

    // Store
    m.Store("lang", "Go")
    m.Store("version", 1.21)

    // Load
    if val, ok := m.Load("lang"); ok {
        fmt.Println("lang:", val) // lang: Go
    }

    // LoadOrStore(不存在则存储,存在则返回已有值)
    actual, loaded := m.LoadOrStore("lang", "Rust")
    fmt.Println(actual, loaded) // Go true

    // Range 遍历
    m.Range(func(key, value any) bool {
        fmt.Printf("%v = %v\n", key, value)
        return true // 返回 false 停止遍历
    })

    // Delete
    m.Delete("version")

    // ⚠️ sync.Map 适用场景:
    // - key 相对稳定,读多写少
    // - 多个 goroutine 读写不同的 key
    // - 不适合大量写入或需要遍历计数的场景
}

3.5 sync.Pool(对象池)

Go
package main

import (
    "bytes"
    "fmt"
    "sync"
)

// sync.Pool 用于复用临时对象,减少 GC 压力
var bufPool = sync.Pool{
    New: func() any {
        return new(bytes.Buffer)
    },
}

func process(data string) string {
    buf := bufPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()
        bufPool.Put(buf) // 用完归还
    }()

    buf.WriteString("processed: ")
    buf.WriteString(data)
    return buf.String()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := process(fmt.Sprintf("data-%d", id))
            fmt.Println(result)
        }(i)
    }
    wg.Wait()
}

4️⃣ context 包

4.1 WithCancel

Go
package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {  // context.Context 控制goroutine生命周期
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: stopped (%v)\n", id, ctx.Err())
            return
        default:
            fmt.Printf("Worker %d: working...\n", id)
            time.Sleep(200 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }

    time.Sleep(600 * time.Millisecond)
    cancel() // 取消所有 worker
    time.Sleep(100 * time.Millisecond)
    fmt.Println("All workers cancelled")
}

4.2 WithTimeout / WithDeadline

Go
package main

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(2 * time.Second): // 模拟慢操作
        return "completed", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // WithTimeout: 500ms 超时
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel() // 总是调用 cancel 释放资源

    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("Error:", err) // context deadline exceeded
        return
    }
    fmt.Println("Result:", result)
}

4.3 WithValue(链式传播)

Go
package main

import (
    "context"
    "fmt"
)

type contextKey string

const (
    keyRequestID contextKey = "requestID"
    keyUserID    contextKey = "userID"
)

func middleware(ctx context.Context) context.Context {
    ctx = context.WithValue(ctx, keyRequestID, "req-abc-123")
    ctx = context.WithValue(ctx, keyUserID, 42)
    return ctx
}

func handler(ctx context.Context) {
    reqID := ctx.Value(keyRequestID).(string)
    userID := ctx.Value(keyUserID).(int)
    fmt.Printf("Request: %s, User: %d\n", reqID, userID)
}

func main() {
    // 链式传播:Background → WithValue(requestID) → WithValue(userID)
    ctx := context.Background()
    ctx = middleware(ctx)
    handler(ctx)

    // ⚠️ context.WithValue 最佳实践:
    // - 使用自定义类型作 key(避免冲突)
    // - 仅传递请求级别的数据(如 traceID、auth token)
    // - 不要用来传递函数参数或业务逻辑数据
}

4.4 Context 树与取消传播

Go
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // 父 context
    parentCtx, parentCancel := context.WithCancel(context.Background())

    // 子 context(继承父 context 的取消信号)
    // ⚠️ 即使父 cancel 会连带取消子 context,子 cancel 仍必须调用
    //    以释放关联的资源(如 WithTimeout 的内部定时器 goroutine)
    childCtx1, childCancel1 := context.WithTimeout(parentCtx, 5*time.Second)
    defer childCancel1() // 必须调用!否则定时器 goroutine 会泄漏
    childCtx2, childCancel2 := context.WithCancel(parentCtx)
    defer childCancel2()

    go func() {
        <-childCtx1.Done()
        fmt.Println("Child1 cancelled:", childCtx1.Err())
    }()
    go func() {
        <-childCtx2.Done()
        fmt.Println("Child2 cancelled:", childCtx2.Err())
    }()

    // 取消父 context → 所有子 context 都会被取消
    time.Sleep(100 * time.Millisecond)
    parentCancel()
    time.Sleep(100 * time.Millisecond)
    // 输出:
    // Child1 cancelled: context canceled
    // Child2 cancelled: context canceled
}

5️⃣ errgroup 并发错误处理

Go
package main

import (
    "context"
    "fmt"
    "errors"
    "time"

    "golang.org/x/sync/errgroup"
)

func fetchURL(ctx context.Context, url string) error {
    // 模拟 HTTP 请求
    select {
    case <-time.After(100 * time.Millisecond):
        if url == "https://bad.example.com" {
            return fmt.Errorf("failed to fetch %s: 404", url)
        }
        fmt.Printf("Fetched: %s\n", url)
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    urls := []string{
        "https://golang.org",
        "https://google.com",
        "https://bad.example.com",
        "https://github.com",
    }

    // errgroup.WithContext:
    // - 任何一个 goroutine 返回 error → 自动 cancel context
    // - Wait() 返回第一个非 nil error
    g, ctx := errgroup.WithContext(context.Background())

    for _, url := range urls {
        url := url // 捕获循环变量(Go < 1.22)
        g.Go(func() error {
            return fetchURL(ctx, url)
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Println("Error:", err)
    } else {
        fmt.Println("All fetches succeeded")
    }
}

// errgroup 还支持限制并发数:
// g.SetLimit(3)  // 最多 3 个 goroutine 并发执行

6️⃣ Worker Pool 模式

Go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID      int
    Payload string
}

type Result struct {
    JobID  int
    Output string
}

func workerPool(numWorkers int, jobs <-chan Job, results chan<- Result) {
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                // 模拟处理
                time.Sleep(50 * time.Millisecond)
                output := fmt.Sprintf("Worker %d processed job %d: %s",
                    workerID, job.ID, job.Payload)
                results <- Result{JobID: job.ID, Output: output}
            }
        }(i)
    }

    // 所有 worker 完成后关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()
}

func main() {
    const numJobs = 20
    const numWorkers = 4

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    // 启动 worker pool
    workerPool(numWorkers, jobs, results)

    // 发送任务
    for i := 0; i < numJobs; i++ {
        jobs <- Job{ID: i, Payload: fmt.Sprintf("task-%d", i)}
    }
    close(jobs)

    // 收集结果
    for r := range results {
        fmt.Println(r.Output)
    }
}

带速率限制的 Worker Pool

Go
package main

import (
    "fmt"
    "sync"
    "time"
)

func rateLimitedPool(numWorkers int, rateLimit time.Duration) {
    jobs := make(chan int, 100)
    var wg sync.WaitGroup

    // 速率限制器
    limiter := time.NewTicker(rateLimit)
    defer limiter.Stop()

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                <-limiter.C // 等待限速令牌
                fmt.Printf("[%s] Worker %d: job %d\n",
                    time.Now().Format("15:04:05.000"), id, job)
            }
        }(i)
    }

    // 提交任务
    for i := 0; i < 10; i++ {
        jobs <- i
    }
    close(jobs)
    wg.Wait()
}

func main() {
    // 每 200ms 最多处理一个任务
    rateLimitedPool(3, 200*time.Millisecond)
}

7️⃣ select 多路复用详解

Go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "from ch1"
    }()
    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "from ch2"
    }()

    // select: 同时监听多个 channel,哪个先就绪执行哪个
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Println(msg)
        case msg := <-ch2:
            fmt.Println(msg)
        }
    }

    // ──── select 的常见模式 ────

    // 1. 超时控制
    select {
    case msg := <-ch1:
        fmt.Println(msg)
    case <-time.After(500 * time.Millisecond):
        fmt.Println("Timeout!")
    }

    // 2. 非阻塞操作
    select {
    case msg := <-ch1:
        fmt.Println(msg)
    default:
        fmt.Println("No message available (non-blocking)")
    }

    // 3. 定时器循环
    ticker := time.NewTicker(300 * time.Millisecond)
    defer ticker.Stop()
    done := make(chan struct{})

    go func() {
        time.Sleep(1 * time.Second)
        close(done)
    }()

    for {
        select {
        case t := <-ticker.C:
            fmt.Println("Tick at", t.Format("15:04:05.000"))
        case <-done:
            fmt.Println("Done!")
            return
        }
    }
}

select 与 nil channel 技巧

Go
package main

import "fmt"

func main() {
    ch1 := make(chan int, 2)
    ch2 := make(chan int, 2)
    ch1 <- 1
    ch1 <- 2
    ch2 <- 3
    ch2 <- 4
    close(ch1)
    close(ch2)

    // 技巧:读完一个 channel 后将其设为 nil,
    // nil channel 在 select 中永远阻塞(被跳过)
    var c1, c2 <-chan int = ch1, ch2
    for c1 != nil || c2 != nil {
        select {
        case v, ok := <-c1:
            if !ok {
                c1 = nil
                continue
            }
            fmt.Println("ch1:", v)
        case v, ok := <-c2:
            if !ok {
                c2 = nil
                continue
            }
            fmt.Println("ch2:", v)
        }
    }
    fmt.Println("Both channels drained")
}

8️⃣ 竞态检测(-race flag)

Go
package main

import (
    "fmt"
    "sync"
)

// ❌ 有数据竞争的代码
var unsafeCounter int

func unsafeIncrement(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        unsafeCounter++ // DATA RACE!
    }
}

// ✅ 安全版本
var (
    safeCounter int
    mu          sync.Mutex
)

func safeIncrement(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        mu.Lock()
        safeCounter++
        mu.Unlock()
    }
}

func main() {
    var wg sync.WaitGroup

    // 运行命令: go run -race main.go
    // 会检测并报告数据竞争

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go unsafeIncrement(&wg) // ← -race 会报告这里
    }
    wg.Wait()
    fmt.Println("Unsafe:", unsafeCounter) // 结果不确定

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go safeIncrement(&wg)
    }
    wg.Wait()
    fmt.Println("Safe:", safeCounter) // 始终 10000
}

/*
使用方式:
  go run -race main.go     # 运行时检测
  go test -race ./...       # 测试时检测
  go build -race            # 构建带竞态检测的二进制

-race 的限制:
  - 只能检测运行时实际发生的竞态(非静态分析)
  - 会显著降低性能(10x 左右)
  - 建议在 CI 中对所有测试开启 -race
*/

9️⃣ 并发安全的设计模式

9.1 不可变数据

Go
package main

import "fmt"

// 不可变数据天然并发安全——创建后不再修改
type ImmutableConfig struct {
    host string
    port int
}

func NewConfig(host string, port int) ImmutableConfig {
    return ImmutableConfig{host: host, port: port}
}

func (c ImmutableConfig) Host() string { return c.host }
func (c ImmutableConfig) Port() int    { return c.port }

// 需要"修改"时,返回新实例
func (c ImmutableConfig) WithPort(port int) ImmutableConfig {
    return ImmutableConfig{host: c.host, port: port}
}

func main() {
    cfg := NewConfig("localhost", 8080)
    newCfg := cfg.WithPort(9090)
    fmt.Println(cfg.Port())    // 8080(原始不变)
    fmt.Println(newCfg.Port()) // 9090(新实例)
}

9.2 Channel 封装并发(CSP 风格)

Go
package main

import "fmt"

// 用 channel 代替共享内存 + mutex
// "Do not communicate by sharing memory; share memory by communicating."

type BankAccount struct {
    balance  int
    deposits chan int
    withdrawals chan withdrawReq
    balanceReqs chan chan int
    done     chan struct{}
}

type withdrawReq struct {
    amount int
    result chan bool
}

func NewBankAccount(initial int) *BankAccount {
    ba := &BankAccount{
        balance:     initial,
        deposits:    make(chan int),
        withdrawals: make(chan withdrawReq),
        balanceReqs: make(chan chan int),
        done:        make(chan struct{}),
    }
    go ba.run()
    return ba
}

func (ba *BankAccount) run() {
    for {
        select {
        case amount := <-ba.deposits:
            ba.balance += amount
        case req := <-ba.withdrawals:
            if ba.balance >= req.amount {
                ba.balance -= req.amount
                req.result <- true
            } else {
                req.result <- false
            }
        case reply := <-ba.balanceReqs:
            reply <- ba.balance
        case <-ba.done:
            return
        }
    }
}

func (ba *BankAccount) Deposit(amount int)  { ba.deposits <- amount }
func (ba *BankAccount) Withdraw(amount int) bool {
    req := withdrawReq{amount: amount, result: make(chan bool)}
    ba.withdrawals <- req
    return <-req.result
}
func (ba *BankAccount) Balance() int {
    reply := make(chan int)
    ba.balanceReqs <- reply
    return <-reply
}
func (ba *BankAccount) Close() { close(ba.done) }

func main() {
    acct := NewBankAccount(1000)
    defer acct.Close()

    acct.Deposit(500)
    ok := acct.Withdraw(200)
    fmt.Println("Withdraw 200:", ok)       // true
    fmt.Println("Balance:", acct.Balance()) // 1300
}

9.3 选择 Channel 还是 Mutex?

Text Only
                  Channel                      Mutex
─────────────────────────────────────────────────────────────
模型           CSP(通信顺序进程)            共享内存
适用           数据在 goroutine 间流动        保护共享状态的原地修改
类比           管道 / 消息传递                锁 / 临界区
优势           天然防止共享、解耦好            简单直接、性能好

选用指南:
✅ 用 Channel 当:
  - 传递数据所有权(一个 goroutine 产生,另一个消费)
  - 协调多个 goroutine 的启停
  - Pipeline / Fan-out Fan-in 模式

✅ 用 Mutex 当:
  - 保护一个被多次读写的共享变量
  - 实现缓存、计数器等简单共享状态
  - 性能敏感,临界区非常短

🎯 面试题精选

Q1: Goroutine 泄漏如何排查?

Text Only
常见泄漏场景:
1. channel 无人读/写导致 goroutine 永远阻塞
2. 忘记关闭 channel,range 循环永远等待
3. context 没有正确传递取消信号
4. select 里缺少 done/timeout 分支

排查方法:
✅ runtime.NumGoroutine() —— 监控 goroutine 数量变化
✅ pprof goroutine profile:
   import _ "net/http/pprof"
   go tool pprof http://localhost:6060/debug/pprof/goroutine
✅ go test -count=1 -run TestXxx 后检查 goroutine 数是否回到基线
✅ goleak 库:go.uber.org/goleak

预防措施:
- 总是为 channel 操作配 context/done/timeout
- defer cancel() 确保 context 被取消
- 总是关闭(或让发送方关闭)channel

Q2: Channel 和 Mutex 怎么选?

Text Only
见上方 9.3 节的详细对比。简短回答:

- 数据需要在 goroutine 间"流动" → Channel
- 数据在一个地方被多个 goroutine "共享访问" → Mutex
- 不确定时,先用 Channel(更符合 Go 哲学),
  如果发现代码变复杂了再改用 Mutex

Q3: Buffered Channel 和 Unbuffered Channel 有什么区别?

Text Only
Unbuffered (make(chan T)):
  - 发送和接收同步阻塞(rendezvous)
  - 保证发送方执行在接收方之前(happens-before)
  - 适合需要严格同步的场景

Buffered (make(chan T, n)):
  - 缓冲未满时发送不阻塞
  - 缓冲非空时接收不阻塞
  - 适合生产者-消费者解耦、限制并发度

常见用法:
  sem := make(chan struct{}, N)  // 信号量,限制 N 个并发

Q4: sync.WaitGroup 使用有哪些注意事项?

Text Only
1. Add() 必须在 go func() 之前调用,否则 Wait() 可能提前返回
2. Add() 的数量必须与 Done() 匹配,多了死锁,少了 panic
3. WaitGroup 不可拷贝(传参用指针)
4. 嵌套使用时注意死锁:Wait() 阻塞的 goroutine 不能再 Add()

正确模式:
  wg.Add(1)          // 先 Add
  go func() {
      defer wg.Done() // 用 defer 确保 Done 被调用
      // ...
  }()

Q5: Context 包的设计理念是什么?

Text Only
Context 解决的问题:
- 请求级别的取消传播(一个请求取消,所有关联操作都取消)
- 超时控制(防止慢请求占用资源)
- 请求级元数据传递(如 traceID、认证信息)

核心规则:
1. 函数的第一个参数应为 ctx context.Context
2. 不要在 struct 中存储 context
3. 不要传递 nil context,不确定时用 context.TODO()
4. context.Value 仅传递请求级数据,不传业务参数
5. 总是 defer cancel() 释放资源

Context 树:取消父节点 → 所有子节点递归取消

Q6: 什么是 GMP 模型?GOMAXPROCS 设多少?

Text Only
GMP = Goroutine + Machine(OS Thread) + Processor

G: goroutine,用户态协程
M: 操作系统线程,执行 goroutine 的载体
P: 逻辑处理器,持有本地 goroutine 队列

GOMAXPROCS 默认值 = runtime.NumCPU()
- CPU 密集型:GOMAXPROCS = CPU 核数(默认即可)
- IO 密集型:GOMAXPROCS = CPU 核数 或适当增大
- 容器环境:Go 1.19+ 自动识别 cgroup 限制,
  旧版本可用 go.uber.org/automaxprocs

Q7: 如何优雅关闭一组 goroutine?

Go
// 模式1: Done Channel
done := make(chan struct{})
// 启动 goroutine,内部 select case <-done
close(done) // 通知所有 goroutine 退出

// 模式2: Context
ctx, cancel := context.WithCancel(context.Background())
// 启动 goroutine,内部 select case <-ctx.Done()
cancel() // 取消所有 goroutine

// 模式3: errgroup(任一出错自动取消)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { ... })
err := g.Wait()

// 关键原则:
// - 谁创建 goroutine,谁负责确保其能退出
// - 使用 WaitGroup 等待所有 goroutine 完成
// - 清理顺序:发送停止信号 → 等待完成 → 关闭资源

Q8: 解释 Go 的 happens-before 规则

Text Only
happens-before 定义了操作之间的可见性保证:
如果事件 A happens-before 事件 B,那么 A 的效果对 B 可见。

Go 中关键的 happens-before 关系:
1. 同一 goroutine 中:代码顺序 = happens-before 顺序
2. Channel:
   - 发送 happens-before 对应的接收完成
   - 关闭 happens-before 接收到零值
   - 无缓冲:接收 happens-before 发送完成
3. sync.Mutex:
   - Unlock happens-before 下一次 Lock
4. sync.Once:
   - f() 完成 happens-before 任何 Do(f) 返回
5. go 语句:
   - go f() 中的 go 语句 happens-before f() 开始执行

没有 happens-before 关系的操作,编译器和 CPU 可能重排,
这就是为什么需要同步原语。

Q9: select 的随机性与应用

Text Only
当 select 多个 case 同时就绪时,Go 会随机选择一个执行。
这不是 bug,而是故意的设计——防止饥饿。

应用场景:
1. 负载均衡:多个相同的 worker channel,select 随机分配
2. 超时控制:case + time.After
3. 非阻塞尝试:case + default
4. 优先级模式(需要嵌套 select 或额外逻辑):

// 伪代码:优先处理高优先级
select {
case msg := <-highPriority:
    handle(msg)
default:
    select {
    case msg := <-highPriority:
        handle(msg)
    case msg := <-lowPriority:
        handle(msg)
    }
}

✅ 学习检查清单

主题 掌握情况
Goroutine 创建与管理
GMP 调度模型原理
Channel 基础(有/无缓冲、关闭、range)
Pipeline 模式
Fan-In / Fan-Out 模式
Done Channel 取消模式
sync.WaitGroup
sync.Mutex / sync.RWMutex
sync.Once / sync.Map / sync.Pool
context.WithCancel / WithTimeout
context.WithValue 与链式传播
errgroup 并发错误处理
Worker Pool 模式
select 多路复用
nil channel 技巧
-race 竞态检测
并发安全设计(不可变/CSP/锁)
Channel vs Mutex 的选择
能回答 8 道以上并发面试题

📚 推荐阅读


上一章: 错误处理 | 下一章: 接口与类型系统