1 协程基础
1.1 协程定义(Goroutine)
- 概念:Go 语言特有的轻量级线程,由 Go 运行时(runtime)管理,相比系统线程(Thread),创建和销毁成本极低,占用内存小(初始 2KB)。协程是 Go 程序中最基本的并发执行单元。
-
创建方式:使用
go
关键字启动一个协程func main() {// 匿名函数直接启动协程go func() {fmt.Println("Hello from goroutine!")}() // 调用已定义函数启动协程go func1()go func2()time.Sleep(time.Second) // 等待协程执行,否则主协程退出导致所有协程终止fmt.Println("主协程退出") }
1.2 协程调度模型GMP
Go 调度器采用 Goroutine-Machine-Processor (GMP) 模型,核心组件包括:
- G (Goroutine):协程的抽象,包含执行栈、程序计数器等信息。
- M (Machine): 对应操作系统线程,实际执行代码的实体。
- P (Processor):逻辑处理器,持有运行队列(Local Queue)和 G 上下文,必须绑定 M 才能执行 G。
M 是 “执行任务的实体”,是唯一能运行 Go 代码的载体。G(任务)本身只是一段代码逻辑,必须依赖 M(操作系统线程)才能在 CPU 上执行
M和P是绑定关系,必须成对出现
1.2.1 协程创建
- 当调用
go func()
时,创建一个新的 G 对象,放入当前 P 的 Local Queue。 - 若 Local Queue 已满(默认 256 个 G),将一半 G 转移到全局队列(Global Queue)。
1.2.2 协程执行
- M 从绑定的 P 的 Local Queue 获取 G 执行。
- 若 Local Queue 为空,从 Global Queue 批量获取 G(通常为 P 的 GOMAXPROCS/2)。
- 若 Global Queue 也为空,从其他 P 的 Local Queue 窃取(Work Stealing) 一半 G。
1.2.3 协程阻塞 / 唤醒
- 当 G 执行系统调用(如 I/O)时,M 与 P 解绑,P 可被其他 M 接管继续执行队列中的 G。
如果 M 因系统调用被阻塞时,P 继续绑定 M,会导致以下问题:
- P 无法工作:P 的本地队列中可能有大量就绪的 G,但由于 M 被阻塞,这些 G 无法执行。
- CPU 核心浪费:如果 P 对应一个 CPU 核心,该核心将处于闲置状态,即使还有其他任务可执行。
- 因此,当 G 执行系统调用时,调度器会 解绑 M 和 P,允许 P 继续工作,避免 CPU 资源浪费
- 系统调用返回后,G 重新加入某个 P 的队列等待执行。
2 并发模式
2.1 共享内存并发
多个协程通过共享变量访问数据,需使用同步原语(如sync.Mutex
、sync.RWMutex
)保护临界区
var (counter intmu sync.Mutex
)func increment() {mu.Lock()defer mu.Unlock()counter++
}func main() {var wg sync.WaitGroupfor i := 0; i < 1000; i++ {wg.Add(1)go func() {defer wg.Done()increment()}()}wg.Wait()fmt.Println("Counter:", counter) // 输出1000,无竞争
}
2.2 CSP 并发(通过通道通信)
使用channel
实现协程间通信和同步,遵循 “不要通过共享内存来通信,而要通过通信来共享内存” 原则
func producer(ch chan<- int) {for i := 0; i < 5; i++ {ch <- i}close(ch)
}func consumer(ch <-chan int) {for num := range ch {fmt.Println("Received:", num)}
}func main() {ch := make(chan int)go producer(ch)consumer(ch)
}
2.3 并发任务控制
// 普通的协程创建方法:
go func() {// your code1
}()
go func() {// your code2
}()
// go on
这段 Go 代码的执行顺序如下:
-
启动 goroutine 1:主协程创建并启动第一个匿名函数(
// your code1
),该函数在后台异步执行。 -
启动 goroutine 2:主协程紧接着创建并启动第二个匿名函数(
// your code2
),同样在后台异步执行。 -
主协程继续执行:主协程不会等待这两个 goroutine 完成,而是立即继续执行
// go on
之后的代码。 -
并行执行 goroutine:
// your code1
和// your code2
的执行顺序取决于调度器,可能并行或交替执行,但它们的完成顺序不确定。由于主协程未等待它们,若主协程提前结束(例如程序退出),这两个 goroutine 可能被强制终止。
2.3.1 sync.WaitGroup
wg.Wait()会阻塞直到2个协程执行完后
go func() {
// func1wg.Done()
}()
go func() {
// func2wg.Done()
}()
wg.Wait()
// go on
这段 Go 代码的执行顺序如下:
-
启动 goroutine 1:主协程创建并启动第一个匿名函数(
func1
),该函数在后台异步执行。 -
启动 goroutine 2:主协程紧接着创建并启动第二个匿名函数(
func2
),同样在后台异步执行。 -
主协程阻塞:主协程执行
wg.Wait()
,进入阻塞状态,等待所有被等待的 goroutine 完成。 -
并行执行 goroutine:
func1
和func2
的执行顺序取决于调度器,可能并行或交替执行,但它们的完成顺序不确定。每个 goroutine 在完成任务后调用wg.Done()
通知等待组。 -
恢复主协程:当所有被等待的 goroutine(即
func1
和func2
)都调用了wg.Done()
后,wg.Wait()
返回,主协程继续执行后续代码(// go on
)。
主协程 | goroutine 1 | goroutine 2
---------------------------------------------------------------
wg.Add(2) | |
启动func1 | 开始执行func1 |
启动func2 | | 开始执行func2
wg.Wait()阻塞 | ... | ...
| 执行完毕 |
| wg.Done() |
| | 执行完毕
| | wg.Done()
wg.Wait()返回 | |
继续执行后续代码
2.3.2 errgroup.Group
var g errgroup.Group
g.Go(func() error {// 任务1:可能返回错误return nil
})
g.Go(func() error {// 任务2:可能返回错误return errors.New("task failed")
})
if err := g.Wait(); err != nil {// 处理首个错误(如任务2失败)
}
执行顺序:
-
主协程启动两个 goroutine 并行执行
-
若其中一个 goroutine 返回非 nil 错误:
-
自动调用内置的
context.CancelFunc
-
向其他 goroutine 发送取消信号(通过 context)
-
g.Wait()
立即返回首个错误
-
-
所有 goroutine(包括未出错的)需主动检查 context 状态并提前退出
2.3.3 对比
特性 | errgroup.Group | sync.WaitGroup |
---|---|---|
错误处理 | 自动捕获首个非 nil 错误并终止所有 goroutine | 不处理错误 |
执行控制 | 首个错误发生后,其他 goroutine 会被 CancelFunc 终止 | 所有 goroutine 独立运行至完成 |
结果聚合 | 可返回首个错误,用于统一错误处理 | 无内置错误传递机制 |
取消机制 | 支持通过 context 传播取消信号 | 无内置取消机制 |
3. 并发panic处理
协程中发生 panic 若未被捕获,仅会导致该协程崩溃,不会影响其他协程和主程序,但可能导致资源泄漏
3.1 普通goroutine的panic处理
对于普通的goroutine,可以在协程函数内部使用defer和recover组合来捕获panic。defer语句会将函数推迟到外层函数返回之前执行,而recover函数用于捕获panic,它只能在defer修饰的函数中有效
func worker() {defer func() {if r := recover(); r != nil {fmt.Println("Recovered in worker:", r)}}()// 可能触发panic的代码var data map[string]intdata["key"] = 1 // 触发panic: assignment to entry in nil map
}func main() {go worker()time.Sleep(time.Second)fmt.Println("Main continues")
}
3.2 使用 sync.WaitGroup 时的 panic 处理
sync.WaitGroup常用于等待一组goroutine完成任务。在这种场景下,每个goroutine内部仍需使用defer和recover捕获panic,并且可以通过额外的机制将panic信息传递给主协程。
import ("fmt""sync"
)type Result struct {Err errorData interface{}
}func worker(id int, wg *sync.WaitGroup, resultChan chan<- Result) {defer func() {if r := recover(); r != nil {resultChan <- Result{Err: fmt.Errorf("panic in worker %d: %v", id, r)}}}()// 模拟可能触发panic的任务if id == 2 {panic("simulated panic")}resultChan <- Result{Data: fmt.Sprintf("Worker %d finished", id)}wg.Done()
}func main() {var wg sync.WaitGroupresultChan := make(chan Result)numWorkers := 3for i := 1; i <= numWorkers; i++ {wg.Add(1)go worker(i, &wg, resultChan)}go func() {wg.Wait()close(resultChan)}()for result := range resultChan {if result.Err != nil {fmt.Println(result.Err)} else {fmt.Println(result.Data)}}
}
worker函数通过defer和recover捕获panic,并将错误信息封装成Result结构体发送到resultChan通道。主协程从通道中接收结果,判断是否存在错误并进行相应处理,确保即使有goroutine发生panic,也能及时获取信息并继续执行后续逻辑。
3.3 使用 errgroup.Group 时的 panic 处理
errgroup.Group可以方便地并行执行多个任务,并在其中一个任务出错时快速返回错误。然而,它只能处理函数返回的错误,无法自动捕获goroutine内部的panic。因此,需要手动在每个任务函数中添加panic捕获逻辑,并将panic转换为错误返回给errgroup.Group。
3.3.1 方法一:手动封装panic捕获
import ("fmt""golang.org/x/sync/errgroup"
)func safeGo(g *errgroup.Group, fn func() error) {g.Go(func() error {defer func() {if r := recover(); r != nil {return fmt.Errorf("panic occurred: %v", r)}}()return fn()})
}func main() {var g errgroup.GroupsafeGo(&g, func() error {// 可能触发panic的任务panic("unexpected error")return nil})if err := g.Wait(); err != nil {fmt.Println("Error:", err) // 输出 panic occurred: unexpected error}
}
3.3.2 封装增强版errgroup
import ("fmt""golang.org/x/sync/errgroup""sync"
)type SafeGroup struct {g errgroup.Groupmu sync.Mutexpanics []interface{}
}func (sg *SafeGroup) Go(fn func() error) {sg.g.Go(func() error {defer func() {if r := recover(); r != nil {sg.mu.Lock()sg.panics = append(sg.panics, r)sg.mu.Unlock()}}()return fn()})
}func (sg *SafeGroup) Wait() error {if err := sg.g.Wait(); err != nil {return err}if len(sg.panics) > 0 {return fmt.Errorf("panics occurred: %v", sg.panics)}return nil
}func main() {var sg SafeGroupsg.Go(func() error {panic("panic in goroutine")return nil})if err := sg.Wait(); err != nil {fmt.Println("Error:", err) // 输出: panics occurred: [panic in goroutine]}
}
这两种方案都能有效地在errgroup.Group中处理panic,方案一通过简单的函数封装,在每个任务中添加panic捕获;方案二则通过自定义结构体,将panic信息集中管理,在Wait方法中统一返回错误,方便在复杂场景下对panic进行更灵活的处理。