🔄 Go并发编程¶
📋 学习目标¶
通过本章学习,你将能够:
- 深入理解 Goroutine 调度模型(GMP)与栈增长机制
- 掌握 Channel 的高级模式(fan-in/fan-out/pipeline/done)
- 熟练使用 sync 包的各种同步原语
- 运用 context 包进行超时控制和取消传播
- 使用 errgroup 进行并发错误处理
- 实现 Worker Pool 等经典并发模式
- 掌握 select 多路复用的用法与技巧
- 能用
-race检测竞态条件 - 具备并发相关面试题的解答能力
上图先给出 GMP 调度模型,再映射到 channel、worker pool、context/errgroup 等工程实践模式。
1️⃣ Goroutine 深入理解¶
1.1 Goroutine 基础¶
Go
package main
import (
"fmt"
"sync"
"time"
)
func sayHello(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Hello from goroutine %d\n", id)
time.Sleep(100 * time.Millisecond)
}
func main() {
var wg sync.WaitGroup // WaitGroup 等待一组goroutine完成
for i := 0; i < 5; i++ {
wg.Add(1)
go sayHello(i, &wg) // go 关键字启动goroutine并发执行
}
wg.Wait() // 等待所有 goroutine 完成
fmt.Println("All goroutines finished")
}
1.2 GMP 调度模型¶
Text Only
Go 运行时采用 GMP 调度模型来管理 goroutine:
G (Goroutine) — 用户级协程,初始栈仅 2KB(可动态增长到 1GB)
M (Machine) — 操作系统线程,实际执行 goroutine 的载体
P (Processor) — 逻辑处理器,默认数量 = GOMAXPROCS(通常等于 CPU 核数)
调度流程:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ P (LRQ) │ │ P (LRQ) │ │ P (LRQ) │ LRQ = Local Run Queue
│ ┌─┬─┬─┐│ │ ┌─┬─┐ │ │ ┌─┐ │
│ │G│G│G││ │ │G│G│ │ │ │G│ │
│ └─┴─┴─┘│ │ └─┴─┘ │ │ └─┘ │
│ ↓ │ │ ↓ │ │ ↓ │
│ M │ │ M │ │ M │ M = OS Thread
└─────────┘ └─────────┘ └─────────┘
↑ ↑ ↑
└───────────────┼──────────────┘
│
┌───────────────┐
│ GRQ (全局) │ GRQ = Global Run Queue
│ ┌─┬─┬─┬─┐ │
│ │G│G│G│G│ │
│ └─┴─┴─┴─┘ │
└───────────────┘
关键机制:
• Work Stealing —— P 的本地队列为空时,从其他 P 或全局队列偷取 G
• Hand Off —— G 发生系统调用阻塞时,P 与当前 M 解绑,绑定新的 M 继续执行
• 抢占式调度 —— Go 1.14+ 基于信号的异步抢占,防止 goroutine 长期占用
1.3 栈增长机制¶
Go
package main
import (
"fmt"
"runtime"
)
// goroutine 初始栈仅 2KB,按需增长(连续栈/copy stack)
// 当栈空间不足时,运行时会分配一个更大的栈(通常 2 倍),
// 并将旧栈内容复制过去,然后更新所有指针
func deepRecursion(n int) int {
if n <= 0 {
// 打印当前 goroutine 的栈使用情况
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Stack in use: %d KB\n", m.StackInuse/1024)
return 0
}
return n + deepRecursion(n-1)
}
func main() {
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
result := deepRecursion(10000)
fmt.Printf("Result: %d\n", result)
}
2️⃣ Channel 模式¶
2.1 Channel 基础¶
Go
package main
import "fmt"
func main() {
// 无缓冲 channel(同步通信)
ch := make(chan int) // chan 创建通道,用于goroutine间通信
go func() {
ch <- 42 // 发送(阻塞直到接收方就绪)
}()
val := <-ch // 接收(阻塞直到发送方就绪)
fmt.Println(val) // 42
// 有缓冲 channel(异步通信)
bufCh := make(chan string, 3)
bufCh <- "A"
bufCh <- "B"
bufCh <- "C"
// bufCh <- "D" // 缓冲满,会阻塞
fmt.Println(<-bufCh) // A(FIFO)
// 单向 channel
// chan<- int 只发送
// <-chan int 只接收
// 关闭 channel
dataCh := make(chan int, 5)
for i := 0; i < 5; i++ {
dataCh <- i
}
close(dataCh)
// range 遍历已关闭的 channel
for v := range dataCh {
fmt.Print(v, " ") // 0 1 2 3 4
}
fmt.Println()
}
2.2 Pipeline 模式¶
Go
package main
import "fmt"
// 生成器:产生数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// 平方:对数据变换
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// 过滤:只保留偶数
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
func main() {
// pipeline: generate → square → filterEven
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(nums)
evens := filterEven(squared)
for v := range evens {
fmt.Print(v, " ") // 4 16 36 64 100
}
fmt.Println()
}
2.3 Fan-Out / Fan-In 模式¶
Go
package main
import (
"fmt"
"sync"
)
// Fan-Out: 一个 channel 的数据分发给多个 worker
// Fan-In: 多个 channel 的结果合并到一个 channel
// channels ...<-chan int: 可变参数,每个都是只读 channel; 返回值 <-chan int 也是只读 channel
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
// 每个输入 channel 启动一个 goroutine
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch) // 立即调用匿名函数并传入ch,避免闭包捕获循环变量(Go经典模式)
}
// 所有输入完成后关闭输出
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func worker(id int, jobs <-chan int) <-chan int {
results := make(chan int)
go func() {
defer close(results)
for j := range jobs {
result := j * j // 模拟处理
fmt.Printf("Worker %d processed %d → %d\n", id, j, result)
results <- result
}
}()
return results
}
func main() {
jobs := make(chan int, 10)
// 发送任务
go func() {
for i := 1; i <= 9; i++ {
jobs <- i
}
close(jobs)
}()
// Fan-Out: 启动 3 个 worker
w1 := worker(1, jobs)
w2 := worker(2, jobs)
w3 := worker(3, jobs)
// Fan-In: 合并结果
for result := range fanIn(w1, w2, w3) {
fmt.Println("Result:", result)
}
}
2.4 Done Channel 模式(优雅取消)¶
Go
package main
import (
"fmt"
"time"
)
// done channel 用于通知 goroutine 停止工作
func doWork(done <-chan struct{}, id int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
i := 0
for {
select { // select 多路复用,监听多个channel操作
case <-done:
fmt.Printf("Worker %d: cancelled\n", id)
return
case out <- i:
i++
time.Sleep(50 * time.Millisecond)
}
}
}()
return out
}
func main() {
done := make(chan struct{})
ch := doWork(done, 1)
// 收集一些数据
for i := 0; i < 5; i++ {
fmt.Println("Received:", <-ch)
}
// 通知取消
close(done) // 关闭 done channel 会让所有读取操作立即返回
time.Sleep(100 * time.Millisecond)
fmt.Println("Main: done")
}
3️⃣ sync 包¶
3.1 WaitGroup¶
Go
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
urls := []string{
"https://golang.org",
"https://google.com",
"https://github.com",
}
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
fmt.Printf("Fetching %s\n", u)
// 模拟 HTTP 请求...
}(url)
}
wg.Wait()
fmt.Println("All fetches complete")
}
3.2 Mutex / RWMutex¶
Go
package main
import (
"fmt"
"sync"
)
// 线程安全的计数器
// ⚠️ 优化提示:读操作 Value() 使用了排他锁 Lock(),会阻塞其他读者。
// 在读多写少场景下,建议使用 sync.RWMutex,读操作用 RLock() 以允许并发读取。
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.v[key]++
}
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
defer c.mu.Unlock()
return c.v[key]
}
// 读写锁:读多写少场景
type RWConfig struct {
mu sync.RWMutex
data map[string]string
}
func (c *RWConfig) Get(key string) string {
c.mu.RLock() // 读锁(多个读者可并发)
defer c.mu.RUnlock()
return c.data[key]
}
func (c *RWConfig) Set(key, value string) {
c.mu.Lock() // 写锁(独占)
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
counter := SafeCounter{v: make(map[string]int)}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc("key")
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Value("key")) // 1000
}
3.3 Once(只执行一次)¶
Go
package main
import (
"fmt"
"sync"
)
type Database struct {
name string
}
var (
dbOnce sync.Once
dbInstance *Database
)
func GetDB() *Database {
dbOnce.Do(func() {
fmt.Println("Initializing database connection...")
dbInstance = &Database{name: "production"}
})
return dbInstance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
db := GetDB()
fmt.Printf("Goroutine %d got DB: %s\n", id, db.name)
}(i)
}
wg.Wait()
// "Initializing database connection..." 只打印一次
}
3.4 sync.Map(并发安全 Map)¶
Go
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// Store
m.Store("lang", "Go")
m.Store("version", 1.21)
// Load
if val, ok := m.Load("lang"); ok {
fmt.Println("lang:", val) // lang: Go
}
// LoadOrStore(不存在则存储,存在则返回已有值)
actual, loaded := m.LoadOrStore("lang", "Rust")
fmt.Println(actual, loaded) // Go true
// Range 遍历
m.Range(func(key, value any) bool {
fmt.Printf("%v = %v\n", key, value)
return true // 返回 false 停止遍历
})
// Delete
m.Delete("version")
// ⚠️ sync.Map 适用场景:
// - key 相对稳定,读多写少
// - 多个 goroutine 读写不同的 key
// - 不适合大量写入或需要遍历计数的场景
}
3.5 sync.Pool(对象池)¶
Go
package main
import (
"bytes"
"fmt"
"sync"
)
// sync.Pool 用于复用临时对象,减少 GC 压力
var bufPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
func process(data string) string {
buf := bufPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufPool.Put(buf) // 用完归还
}()
buf.WriteString("processed: ")
buf.WriteString(data)
return buf.String()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := process(fmt.Sprintf("data-%d", id))
fmt.Println(result)
}(i)
}
wg.Wait()
}
4️⃣ context 包¶
4.1 WithCancel¶
Go
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) { // context.Context 控制goroutine生命周期
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: stopped (%v)\n", id, ctx.Err())
return
default:
fmt.Printf("Worker %d: working...\n", id)
time.Sleep(200 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
time.Sleep(600 * time.Millisecond)
cancel() // 取消所有 worker
time.Sleep(100 * time.Millisecond)
fmt.Println("All workers cancelled")
}
4.2 WithTimeout / WithDeadline¶
Go
package main
import (
"context"
"fmt"
"time"
)
func slowOperation(ctx context.Context) (string, error) {
select {
case <-time.After(2 * time.Second): // 模拟慢操作
return "completed", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
// WithTimeout: 500ms 超时
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel() // 总是调用 cancel 释放资源
result, err := slowOperation(ctx)
if err != nil {
fmt.Println("Error:", err) // context deadline exceeded
return
}
fmt.Println("Result:", result)
}
4.3 WithValue(链式传播)¶
Go
package main
import (
"context"
"fmt"
)
type contextKey string
const (
keyRequestID contextKey = "requestID"
keyUserID contextKey = "userID"
)
func middleware(ctx context.Context) context.Context {
ctx = context.WithValue(ctx, keyRequestID, "req-abc-123")
ctx = context.WithValue(ctx, keyUserID, 42)
return ctx
}
func handler(ctx context.Context) {
reqID := ctx.Value(keyRequestID).(string)
userID := ctx.Value(keyUserID).(int)
fmt.Printf("Request: %s, User: %d\n", reqID, userID)
}
func main() {
// 链式传播:Background → WithValue(requestID) → WithValue(userID)
ctx := context.Background()
ctx = middleware(ctx)
handler(ctx)
// ⚠️ context.WithValue 最佳实践:
// - 使用自定义类型作 key(避免冲突)
// - 仅传递请求级别的数据(如 traceID、auth token)
// - 不要用来传递函数参数或业务逻辑数据
}
4.4 Context 树与取消传播¶
Go
package main
import (
"context"
"fmt"
"time"
)
func main() {
// 父 context
parentCtx, parentCancel := context.WithCancel(context.Background())
// 子 context(继承父 context 的取消信号)
// ⚠️ 即使父 cancel 会连带取消子 context,子 cancel 仍必须调用
// 以释放关联的资源(如 WithTimeout 的内部定时器 goroutine)
childCtx1, childCancel1 := context.WithTimeout(parentCtx, 5*time.Second)
defer childCancel1() // 必须调用!否则定时器 goroutine 会泄漏
childCtx2, childCancel2 := context.WithCancel(parentCtx)
defer childCancel2()
go func() {
<-childCtx1.Done()
fmt.Println("Child1 cancelled:", childCtx1.Err())
}()
go func() {
<-childCtx2.Done()
fmt.Println("Child2 cancelled:", childCtx2.Err())
}()
// 取消父 context → 所有子 context 都会被取消
time.Sleep(100 * time.Millisecond)
parentCancel()
time.Sleep(100 * time.Millisecond)
// 输出:
// Child1 cancelled: context canceled
// Child2 cancelled: context canceled
}
5️⃣ errgroup 并发错误处理¶
Go
package main
import (
"context"
"fmt"
"errors"
"time"
"golang.org/x/sync/errgroup"
)
func fetchURL(ctx context.Context, url string) error {
// 模拟 HTTP 请求
select {
case <-time.After(100 * time.Millisecond):
if url == "https://bad.example.com" {
return fmt.Errorf("failed to fetch %s: 404", url)
}
fmt.Printf("Fetched: %s\n", url)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
urls := []string{
"https://golang.org",
"https://google.com",
"https://bad.example.com",
"https://github.com",
}
// errgroup.WithContext:
// - 任何一个 goroutine 返回 error → 自动 cancel context
// - Wait() 返回第一个非 nil error
g, ctx := errgroup.WithContext(context.Background())
for _, url := range urls {
url := url // 捕获循环变量(Go < 1.22)
g.Go(func() error {
return fetchURL(ctx, url)
})
}
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("All fetches succeeded")
}
}
// errgroup 还支持限制并发数:
// g.SetLimit(3) // 最多 3 个 goroutine 并发执行
6️⃣ Worker Pool 模式¶
Go
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Payload string
}
type Result struct {
JobID int
Output string
}
func workerPool(numWorkers int, jobs <-chan Job, results chan<- Result) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs {
// 模拟处理
time.Sleep(50 * time.Millisecond)
output := fmt.Sprintf("Worker %d processed job %d: %s",
workerID, job.ID, job.Payload)
results <- Result{JobID: job.ID, Output: output}
}
}(i)
}
// 所有 worker 完成后关闭 results
go func() {
wg.Wait()
close(results)
}()
}
func main() {
const numJobs = 20
const numWorkers = 4
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
// 启动 worker pool
workerPool(numWorkers, jobs, results)
// 发送任务
for i := 0; i < numJobs; i++ {
jobs <- Job{ID: i, Payload: fmt.Sprintf("task-%d", i)}
}
close(jobs)
// 收集结果
for r := range results {
fmt.Println(r.Output)
}
}
带速率限制的 Worker Pool¶
Go
package main
import (
"fmt"
"sync"
"time"
)
func rateLimitedPool(numWorkers int, rateLimit time.Duration) {
jobs := make(chan int, 100)
var wg sync.WaitGroup
// 速率限制器
limiter := time.NewTicker(rateLimit)
defer limiter.Stop()
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
<-limiter.C // 等待限速令牌
fmt.Printf("[%s] Worker %d: job %d\n",
time.Now().Format("15:04:05.000"), id, job)
}
}(i)
}
// 提交任务
for i := 0; i < 10; i++ {
jobs <- i
}
close(jobs)
wg.Wait()
}
func main() {
// 每 200ms 最多处理一个任务
rateLimitedPool(3, 200*time.Millisecond)
}
7️⃣ select 多路复用详解¶
Go
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "from ch2"
}()
// select: 同时监听多个 channel,哪个先就绪执行哪个
for i := 0; i < 2; i++ {
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
}
}
// ──── select 的常见模式 ────
// 1. 超时控制
select {
case msg := <-ch1:
fmt.Println(msg)
case <-time.After(500 * time.Millisecond):
fmt.Println("Timeout!")
}
// 2. 非阻塞操作
select {
case msg := <-ch1:
fmt.Println(msg)
default:
fmt.Println("No message available (non-blocking)")
}
// 3. 定时器循环
ticker := time.NewTicker(300 * time.Millisecond)
defer ticker.Stop()
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
close(done)
}()
for {
select {
case t := <-ticker.C:
fmt.Println("Tick at", t.Format("15:04:05.000"))
case <-done:
fmt.Println("Done!")
return
}
}
}
select 与 nil channel 技巧¶
Go
package main
import "fmt"
func main() {
ch1 := make(chan int, 2)
ch2 := make(chan int, 2)
ch1 <- 1
ch1 <- 2
ch2 <- 3
ch2 <- 4
close(ch1)
close(ch2)
// 技巧:读完一个 channel 后将其设为 nil,
// nil channel 在 select 中永远阻塞(被跳过)
var c1, c2 <-chan int = ch1, ch2
for c1 != nil || c2 != nil {
select {
case v, ok := <-c1:
if !ok {
c1 = nil
continue
}
fmt.Println("ch1:", v)
case v, ok := <-c2:
if !ok {
c2 = nil
continue
}
fmt.Println("ch2:", v)
}
}
fmt.Println("Both channels drained")
}
8️⃣ 竞态检测(-race flag)¶
Go
package main
import (
"fmt"
"sync"
)
// ❌ 有数据竞争的代码
var unsafeCounter int
func unsafeIncrement(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 1000; i++ {
unsafeCounter++ // DATA RACE!
}
}
// ✅ 安全版本
var (
safeCounter int
mu sync.Mutex
)
func safeIncrement(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 1000; i++ {
mu.Lock()
safeCounter++
mu.Unlock()
}
}
func main() {
var wg sync.WaitGroup
// 运行命令: go run -race main.go
// 会检测并报告数据竞争
for i := 0; i < 10; i++ {
wg.Add(1)
go unsafeIncrement(&wg) // ← -race 会报告这里
}
wg.Wait()
fmt.Println("Unsafe:", unsafeCounter) // 结果不确定
for i := 0; i < 10; i++ {
wg.Add(1)
go safeIncrement(&wg)
}
wg.Wait()
fmt.Println("Safe:", safeCounter) // 始终 10000
}
/*
使用方式:
go run -race main.go # 运行时检测
go test -race ./... # 测试时检测
go build -race # 构建带竞态检测的二进制
-race 的限制:
- 只能检测运行时实际发生的竞态(非静态分析)
- 会显著降低性能(10x 左右)
- 建议在 CI 中对所有测试开启 -race
*/
9️⃣ 并发安全的设计模式¶
9.1 不可变数据¶
Go
package main
import "fmt"
// 不可变数据天然并发安全——创建后不再修改
type ImmutableConfig struct {
host string
port int
}
func NewConfig(host string, port int) ImmutableConfig {
return ImmutableConfig{host: host, port: port}
}
func (c ImmutableConfig) Host() string { return c.host }
func (c ImmutableConfig) Port() int { return c.port }
// 需要"修改"时,返回新实例
func (c ImmutableConfig) WithPort(port int) ImmutableConfig {
return ImmutableConfig{host: c.host, port: port}
}
func main() {
cfg := NewConfig("localhost", 8080)
newCfg := cfg.WithPort(9090)
fmt.Println(cfg.Port()) // 8080(原始不变)
fmt.Println(newCfg.Port()) // 9090(新实例)
}
9.2 Channel 封装并发(CSP 风格)¶
Go
package main
import "fmt"
// 用 channel 代替共享内存 + mutex
// "Do not communicate by sharing memory; share memory by communicating."
type BankAccount struct {
balance int
deposits chan int
withdrawals chan withdrawReq
balanceReqs chan chan int
done chan struct{}
}
type withdrawReq struct {
amount int
result chan bool
}
func NewBankAccount(initial int) *BankAccount {
ba := &BankAccount{
balance: initial,
deposits: make(chan int),
withdrawals: make(chan withdrawReq),
balanceReqs: make(chan chan int),
done: make(chan struct{}),
}
go ba.run()
return ba
}
func (ba *BankAccount) run() {
for {
select {
case amount := <-ba.deposits:
ba.balance += amount
case req := <-ba.withdrawals:
if ba.balance >= req.amount {
ba.balance -= req.amount
req.result <- true
} else {
req.result <- false
}
case reply := <-ba.balanceReqs:
reply <- ba.balance
case <-ba.done:
return
}
}
}
func (ba *BankAccount) Deposit(amount int) { ba.deposits <- amount }
func (ba *BankAccount) Withdraw(amount int) bool {
req := withdrawReq{amount: amount, result: make(chan bool)}
ba.withdrawals <- req
return <-req.result
}
func (ba *BankAccount) Balance() int {
reply := make(chan int)
ba.balanceReqs <- reply
return <-reply
}
func (ba *BankAccount) Close() { close(ba.done) }
func main() {
acct := NewBankAccount(1000)
defer acct.Close()
acct.Deposit(500)
ok := acct.Withdraw(200)
fmt.Println("Withdraw 200:", ok) // true
fmt.Println("Balance:", acct.Balance()) // 1300
}
9.3 选择 Channel 还是 Mutex?¶
Text Only
Channel Mutex
─────────────────────────────────────────────────────────────
模型 CSP(通信顺序进程) 共享内存
适用 数据在 goroutine 间流动 保护共享状态的原地修改
类比 管道 / 消息传递 锁 / 临界区
优势 天然防止共享、解耦好 简单直接、性能好
选用指南:
✅ 用 Channel 当:
- 传递数据所有权(一个 goroutine 产生,另一个消费)
- 协调多个 goroutine 的启停
- Pipeline / Fan-out Fan-in 模式
✅ 用 Mutex 当:
- 保护一个被多次读写的共享变量
- 实现缓存、计数器等简单共享状态
- 性能敏感,临界区非常短
🎯 面试题精选¶
Q1: Goroutine 泄漏如何排查?¶
Text Only
常见泄漏场景:
1. channel 无人读/写导致 goroutine 永远阻塞
2. 忘记关闭 channel,range 循环永远等待
3. context 没有正确传递取消信号
4. select 里缺少 done/timeout 分支
排查方法:
✅ runtime.NumGoroutine() —— 监控 goroutine 数量变化
✅ pprof goroutine profile:
import _ "net/http/pprof"
go tool pprof http://localhost:6060/debug/pprof/goroutine
✅ go test -count=1 -run TestXxx 后检查 goroutine 数是否回到基线
✅ goleak 库:go.uber.org/goleak
预防措施:
- 总是为 channel 操作配 context/done/timeout
- defer cancel() 确保 context 被取消
- 总是关闭(或让发送方关闭)channel
Q2: Channel 和 Mutex 怎么选?¶
Text Only
见上方 9.3 节的详细对比。简短回答:
- 数据需要在 goroutine 间"流动" → Channel
- 数据在一个地方被多个 goroutine "共享访问" → Mutex
- 不确定时,先用 Channel(更符合 Go 哲学),
如果发现代码变复杂了再改用 Mutex
Q3: Buffered Channel 和 Unbuffered Channel 有什么区别?¶
Text Only
Unbuffered (make(chan T)):
- 发送和接收同步阻塞(rendezvous)
- 保证发送方执行在接收方之前(happens-before)
- 适合需要严格同步的场景
Buffered (make(chan T, n)):
- 缓冲未满时发送不阻塞
- 缓冲非空时接收不阻塞
- 适合生产者-消费者解耦、限制并发度
常见用法:
sem := make(chan struct{}, N) // 信号量,限制 N 个并发
Q4: sync.WaitGroup 使用有哪些注意事项?¶
Text Only
1. Add() 必须在 go func() 之前调用,否则 Wait() 可能提前返回
2. Add() 的数量必须与 Done() 匹配,多了死锁,少了 panic
3. WaitGroup 不可拷贝(传参用指针)
4. 嵌套使用时注意死锁:Wait() 阻塞的 goroutine 不能再 Add()
正确模式:
wg.Add(1) // 先 Add
go func() {
defer wg.Done() // 用 defer 确保 Done 被调用
// ...
}()
Q5: Context 包的设计理念是什么?¶
Text Only
Context 解决的问题:
- 请求级别的取消传播(一个请求取消,所有关联操作都取消)
- 超时控制(防止慢请求占用资源)
- 请求级元数据传递(如 traceID、认证信息)
核心规则:
1. 函数的第一个参数应为 ctx context.Context
2. 不要在 struct 中存储 context
3. 不要传递 nil context,不确定时用 context.TODO()
4. context.Value 仅传递请求级数据,不传业务参数
5. 总是 defer cancel() 释放资源
Context 树:取消父节点 → 所有子节点递归取消
Q6: 什么是 GMP 模型?GOMAXPROCS 设多少?¶
Text Only
GMP = Goroutine + Machine(OS Thread) + Processor
G: goroutine,用户态协程
M: 操作系统线程,执行 goroutine 的载体
P: 逻辑处理器,持有本地 goroutine 队列
GOMAXPROCS 默认值 = runtime.NumCPU()
- CPU 密集型:GOMAXPROCS = CPU 核数(默认即可)
- IO 密集型:GOMAXPROCS = CPU 核数 或适当增大
- 容器环境:Go 1.19+ 自动识别 cgroup 限制,
旧版本可用 go.uber.org/automaxprocs
Q7: 如何优雅关闭一组 goroutine?¶
Go
// 模式1: Done Channel
done := make(chan struct{})
// 启动 goroutine,内部 select case <-done
close(done) // 通知所有 goroutine 退出
// 模式2: Context
ctx, cancel := context.WithCancel(context.Background())
// 启动 goroutine,内部 select case <-ctx.Done()
cancel() // 取消所有 goroutine
// 模式3: errgroup(任一出错自动取消)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { ... })
err := g.Wait()
// 关键原则:
// - 谁创建 goroutine,谁负责确保其能退出
// - 使用 WaitGroup 等待所有 goroutine 完成
// - 清理顺序:发送停止信号 → 等待完成 → 关闭资源
Q8: 解释 Go 的 happens-before 规则¶
Text Only
happens-before 定义了操作之间的可见性保证:
如果事件 A happens-before 事件 B,那么 A 的效果对 B 可见。
Go 中关键的 happens-before 关系:
1. 同一 goroutine 中:代码顺序 = happens-before 顺序
2. Channel:
- 发送 happens-before 对应的接收完成
- 关闭 happens-before 接收到零值
- 无缓冲:接收 happens-before 发送完成
3. sync.Mutex:
- Unlock happens-before 下一次 Lock
4. sync.Once:
- f() 完成 happens-before 任何 Do(f) 返回
5. go 语句:
- go f() 中的 go 语句 happens-before f() 开始执行
没有 happens-before 关系的操作,编译器和 CPU 可能重排,
这就是为什么需要同步原语。
Q9: select 的随机性与应用¶
Text Only
当 select 多个 case 同时就绪时,Go 会随机选择一个执行。
这不是 bug,而是故意的设计——防止饥饿。
应用场景:
1. 负载均衡:多个相同的 worker channel,select 随机分配
2. 超时控制:case + time.After
3. 非阻塞尝试:case + default
4. 优先级模式(需要嵌套 select 或额外逻辑):
// 伪代码:优先处理高优先级
select {
case msg := <-highPriority:
handle(msg)
default:
select {
case msg := <-highPriority:
handle(msg)
case msg := <-lowPriority:
handle(msg)
}
}
✅ 学习检查清单¶
| 主题 | 掌握情况 |
|---|---|
| Goroutine 创建与管理 | ⬜ |
| GMP 调度模型原理 | ⬜ |
| Channel 基础(有/无缓冲、关闭、range) | ⬜ |
| Pipeline 模式 | ⬜ |
| Fan-In / Fan-Out 模式 | ⬜ |
| Done Channel 取消模式 | ⬜ |
| sync.WaitGroup | ⬜ |
| sync.Mutex / sync.RWMutex | ⬜ |
| sync.Once / sync.Map / sync.Pool | ⬜ |
| context.WithCancel / WithTimeout | ⬜ |
| context.WithValue 与链式传播 | ⬜ |
| errgroup 并发错误处理 | ⬜ |
| Worker Pool 模式 | ⬜ |
| select 多路复用 | ⬜ |
| nil channel 技巧 | ⬜ |
-race 竞态检测 | ⬜ |
| 并发安全设计(不可变/CSP/锁) | ⬜ |
| Channel vs Mutex 的选择 | ⬜ |
| 能回答 8 道以上并发面试题 | ⬜ |
📚 推荐阅读¶
- Effective Go: Concurrency
- Go Memory Model
- Go Blog: Pipelines and cancellation
- Go Blog: Context
- 《Concurrency in Go》(Katherine Cox-Buday)
- Go Race Detector