文章目录
- Go语言并发编程
- **简单介绍**
- goroutine channel 实现并发和并行
- for循环开启多个协程
- Channel管道
- goroutine 结合 channel 管道
- **goroutine 结合 channel打印素数**
- 单向管道
- Select多路复用
- Goroutine Recover解决协程中出现的Panic
- Go中的并发安全和互斥锁
Go语言并发编程
简单介绍
Go语言并发编程是通过goroutine(轻量级线程)和channel(通道)实现的高效并发模型,其核心特点是"不要通过共享内存来通信,而要通过通信来共享内存"。主要优势包括:1) 轻量级(goroutine开销极小,可轻松创建数万个);2) 内置CSP模型(通过channel安全传递数据,避免锁竞争);3) 简单易用(go关键字即可启动并发,相比线程/回调更简洁)。作用体现在提升CPU/IO密集型任务效率(如网络服务、并行计算),通过GPM调度器智能利用多核,同时保持代码可读性,典型应用如高并发服务器、爬虫、数据处理等。
goroutine channel 实现并发和并行
为什么要使用goroutine呢
需求:要统计1-10000000的数字中那些是素数,并打印这些素数?
素数:就是除了1和它本身不能被其他数整除的数
实现方法:
- 传统方法,通过一个for循环判断各个数是不是素数
- 使用并发或者并行的方式,将统计素数的任务分配给多个goroutine去完成,这个时候就用到了goroutine
- goroutine 结合 channel
进程、线程以及并行、并发
进程
进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空间。一个进程至少有5种基本状态,它们是:初始态,执行态,等待状态,就绪状态,终止状态。
通俗的讲进程就是一个正在执行的程序。
线程
线程是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位
一个进程可以创建多个线程,同一个进程中多个线程可以并发执行 ,一个线程要运行的话,至少有一个进程
并发和并行
并发:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执行。
并行:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。
通俗的讲多线程程序在单核CPU上面运行就是并发,多线程程序在多核CUP上运行就是并行,如果线程数大于CPU核数,则多线程程序在多个CPU上面运行既有并行又有并发
Golang中协程(goroutine)以及主线程
golang中的主线程:(可以理解为线程/也可以理解为进程),在一个Golang程序的主线程上可以起多个协程。Golang中多协程可以实现并行或者并发。
协程:可以理解为用户级线程,这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的。Golang的一大特色就是从语言层面原生持协程,在函数或者方法前面加go关键字就可创建一个协程。可以说Golang中的协程就是goroutine。
Golang中的多协程有点类似于Java中的多线程
多协程和多线程
多协程和多线程:Golang中每个goroutine(协程)默认占用内存远比Java、C的线程少。
OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB左右),一个goroutine(协程)占用内存非常小,只有2KB左右,多协程goroutine切换调度开销方面远比线程要少。
这也是为什么越来越多的大公司使用Golang的原因之一。
goroutine的使用以及sync.WaitGroup
并行执行需求
在主线程(可以理解成进程)中,开启一个goroutine,该协程每隔50毫秒秒输出“你好golang"
在主线程中也每隔50毫秒输出“你好golang",输出10次后,退出程序,要求主线程和goroutine同时执行。
这是时候,我们就可以开启协程来了,通过 go关键字开启
package mainimport ("fmt""time"
)// 协程需要运行的方法
func test() {for i := 0; i < 5; i++ {fmt.Println("test 你好Golang")time.Sleep(time.Millisecond * 100)}
}func main() {// 通过go关键字,就可以直接开启一个协程go test()// 这是主进程执行的for i := 0; i < 5; i++ {fmt.Println("main 你好Goland")time.Sleep(time.Millisecond * 100)}
}
上述的代码其实还有问题的,也就是说当主进程执行完毕后,不管协程有没有执行完成,都会退出
输出结果:
PS D:\Microsoft VS Code\GOproject\src\go_code\chapter3\goroutine> go run .\goroutine.go
main 你好Goland
test 你好Golang
test 你好Golang
main 你好Goland
main 你好Goland
test 你好Golang
test 你好Golang
main 你好Goland
main 你好Goland
test 你好Golang
这时使用我们就需要用到 sync.WaitGroup等待协程
首先我们需要创建一个协程计数器:
// 定义一个协程计数器
var wg sync.WaitGroup
然后当我们开启协程的时候,我们要让计数器加1
// 开启协程,协程计数器加1
wg.Add(1)
go tes
代码修改:
package mainimport ("fmt""sync"
)// 定义一个协程计数器,初始化
var wg sync.WaitGroup// 协程需要运行的方法
func test() {// 这是主进程执行的for i := 0; i < 100; i++ {fmt.Println("test 你好Golang", i)// time.Sleep(time.Millisecond * 100)}// 协程计数器减1wg.Done()
}func test2() {//这是主进程执行的for i := 0; i < 100; i++ {fmt.Println("test2 你好Golang", i)}// 协程计数器减1wg.Done()
}func main() {// 通过go关键字,就可以直接开启一个协程wg.Add(1)go test()//协程计数器加1wg.Add(1)go test2()//并发执行//与主Goroutinue并行运行//这是主进程执行的,主 Goroutine 执行循环for i := 0; i < 100; i++ {fmt.Println("main 你好Golang")}// 等待所有的协程执行完毕wg.Wait()fmt.Println("主线程退出")
}
使用 sync.WaitGroup
作为协程同步工具,用于等待所有 Goroutine 执行完毕。
两个 Goroutine 函数:
test()
:循环 100 次,每次打印并休眠 100ms。test2()
:循环 100 次,仅打印。
主函数 main()
:
- 启动两个 Goroutine(
test
和test2
)。 - 主 Goroutine 执行自己的循环 100 次。
- 最后通过
wg.Wait()
等待所有子 Goroutine 完成。
测试输出:
//主 Goroutine 的循环不会等待子 Goroutine,直接开始执行。输出 main 你好Golang 的速度最快(无任何阻塞)。//并发输出顺序:
//test 和 test2 的输出会交替出现:
//test 和 test2 各自循环 100 次,均无阻塞。
//它们的输出 test 你好Golang X 和 test2 你好Golang X 会随机交错//主 Goroutine 完成自己的循环后,执行 wg.Wait(),阻塞等待 test 和 test2 完成。
//此时 test 和 test2 可能仍在执行,也可能已结束(取决于调度)。//test 和 test2 在完成各自的 100 次循环后,调用 wg.Done(),将计数器分别减 1。
//当计数器归零时,wg.Wait() 解除阻塞。//最终输出 主线程退出,程序终止。
流程示例:
启动阶段:
主 Goroutine 调用 wg.Add(1) 两次,启动 test 和 test2。
两个子 Goroutine 开始并发执行。
并发执行阶段:
主 Goroutine:快速执行 100 次 fmt.Println("main 你好Golang")(无阻塞,几乎瞬间完成)。
子 Goroutine test:执行 100 次 fmt.Println("test 你好Golang", i)。
子 Goroutine test2:执行 100 次 fmt.Println("test2 你好Golang", i)。
输出顺序随机,例如:
main 你好Golang
test2 你好Golang 0
test 你好Golang 0
main 你好Golang
test2 你好Golang 1
...
同步等待阶段:
主 Goroutine 完成自己的循环后,调用 wg.Wait(),进入阻塞状态。
子 Goroutine test 和 test2 继续执行,直到完成各自的循环并调用 wg.Done()。
当计数器归零时,主 Goroutine 解除阻塞,输出 主线程退出。
练习例子1:
// 生产者-消费者模型
// 假设一个程序需要生成数据并处理数据,可以用两个协程分工合作:
package mainimport ("fmt""time"
)func producer(ch chan<- int) {for i := 0; i < 5; i++ {ch <- i // 发送数据到通道fmt.Printf("生产了数据:%d\n", i)time.Sleep(time.Second) // 模拟耗时}close(ch) // 生产完毕,关闭通道
}func consumer(ch <-chan int) {for num := range ch { // 从通道接收数据fmt.Printf("消费了数据: %d\n", num)}
}func main() {ch := make(chan int)go producer(ch) // 启动生产者协程consumer(ch) // 消费者在主协程运行
}// 生产者每秒生成一个数字,通过通道发送给消费者。
// 消费者实时接收并处理数据。
// 通道保证了生产者和消费者的同步,避免数据混乱
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
消费了数据: 0
生产了数据:0
生产了数据:1
消费了数据: 1
消费了数据: 2
生产了数据:2
生产了数据:3
消费了数据: 3
生产了数据:4
消费了数据: 4
练习例子2:
// 例子2:多任务并行下载
// 用多个协程同时下载多个文件:
package mainimport ("fmt""sync""time"
)func download(url string, wg *sync.WaitGroup) {defer wg.Done() // 协程结束时通知WaitGroupfmt.Printf("开始下载: %s\n", url)time.Sleep(2 * time.Second) //模拟耗时fmt.Printf("下载完成: %s\n", url)
}// Add(n):计数器增加 n(每个任务启动前+1)
// Done():计数器减1(每个协程结束时调用)
// Wait():阻塞主协程,直到计数器归零。func main() {//初始化 sync.WaitGroup(计数器初始为0)var wg sync.WaitGroup//定义下载数据urls := []string{"file1.txt", "file2.png", "file3.jpg"}for _, url := range urls {wg.Add(1)go download(url, &wg) //启动多个下载协程}wg.Wait() //等待所有协程完成fmt.Println("所有文件下载完成")
}// 每个文件下载任务由一个协程独立执行。
// sync.WaitGroup 确保主协程等待所有下载完成后再结束。
// 协程的并行执行显著缩短了总耗时
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
开始下载: file3.jpg
开始下载: file2.png
开始下载: file1.txt
下载完成: file1.txt
下载完成: file2.png
下载完成: file3.jpg
所有文件下载完成[Done] exited with code=0 in 8.174 seconds
设置Go并行运行的时候占用的cpu数量
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个oS线程上。
Go 语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。
package mainimport ("fmt""runtime"
)func main() {// 获取cpu个数npmCpu := runtime.NumCPU()fmt.Println("cup的个数:", npmCpu)// 设置允许使用的CPU数量runtime.GOMAXPROCS(runtime.NumCPU() - 1)
}
测试输出:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
cup的个数: 16[Done] exited with code=0 in 1.312 seconds
for循环开启多个协程
类似于Java里面开启多个线程,同时执行
// 启动10个协程并发执行,每个协程打印10次自己的编号(0到9)。
// 主线程等待所有协程结束后退出。
package mainimport ("fmt""sync"
)// 初始化全局变量
var vg sync.WaitGroupfunc test(num int) {for i := 0; i < 10; i++ {fmt.Printf("协程 (%v) 打印的第 %v 条数据 \n", num, i)}vg.Done()
}func main() {for i := 0; i < 10; i++ {vg.Add(1)go test(i) //启动协程}vg.Wait()fmt.Println("主线程退出")
}
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
协程 (2) 打印的第 0 条数据
协程 (2) 打印的第 1 条数据
协程 (2) 打印的第 2 条数据
协程 (2) 打印的第 3 条数据
协程 (2) 打印的第 4 条数据
协程 (2) 打印的第 5 条数据
协程 (2) 打印的第 6 条数据
协程 (2) 打印的第 7 条数据
协程 (2) 打印的第 8 条数据
协程 (2) 打印的第 9 条数据
协程 (8) 打印的第 0 条数据
协程 (1) 打印的第 0 条数据
协程 (1) 打印的第 1 条数据
协程 (1) 打印的第 2 条数据
协程 (1) 打印的第 3 条数据
协程 (1) 打印的第 4 条数据
协程 (1) 打印的第 5 条数据
协程 (1) 打印的第 6 条数据
协程 (1) 打印的第 7 条数据
协程 (1) 打印的第 8 条数据
协程 (1) 打印的第 9 条数据
协程 (7) 打印的第 0 条数据
协程 (5) 打印的第 0 条数据
协程 (5) 打印的第 1 条数据
协程 (5) 打印的第 2 条数据
协程 (5) 打印的第 3 条数据
协程 (0) 打印的第 0 条数据
协程 (0) 打印的第 1 条数据
协程 (0) 打印的第 2 条数据
协程 (0) 打印的第 3 条数据
协程 (0) 打印的第 4 条数据
协程 (0) 打印的第 5 条数据
协程 (0) 打印的第 6 条数据
协程 (0) 打印的第 7 条数据
协程 (0) 打印的第 8 条数据
协程 (0) 打印的第 9 条数据
协程 (4) 打印的第 0 条数据
协程 (4) 打印的第 1 条数据
协程 (4) 打印的第 2 条数据
协程 (4) 打印的第 3 条数据
协程 (4) 打印的第 4 条数据
协程 (4) 打印的第 5 条数据
协程 (4) 打印的第 6 条数据
协程 (4) 打印的第 7 条数据
协程 (4) 打印的第 8 条数据
协程 (4) 打印的第 9 条数据
协程 (6) 打印的第 0 条数据
协程 (6) 打印的第 1 条数据
协程 (6) 打印的第 2 条数据
协程 (6) 打印的第 3 条数据
协程 (6) 打印的第 4 条数据
协程 (6) 打印的第 5 条数据
协程 (6) 打印的第 6 条数据
协程 (6) 打印的第 7 条数据
协程 (6) 打印的第 8 条数据
协程 (6) 打印的第 9 条数据
协程 (7) 打印的第 1 条数据
协程 (7) 打印的第 2 条数据
协程 (7) 打印的第 3 条数据
协程 (7) 打印的第 4 条数据
协程 (9) 打印的第 0 条数据
协程 (9) 打印的第 1 条数据
协程 (9) 打印的第 2 条数据
协程 (9) 打印的第 3 条数据
协程 (9) 打印的第 4 条数据
协程 (9) 打印的第 5 条数据
协程 (9) 打印的第 6 条数据
协程 (9) 打印的第 7 条数据
协程 (9) 打印的第 8 条数据
协程 (9) 打印的第 9 条数据
协程 (3) 打印的第 0 条数据
协程 (3) 打印的第 1 条数据
协程 (3) 打印的第 2 条数据
协程 (3) 打印的第 3 条数据
协程 (3) 打印的第 4 条数据
协程 (3) 打印的第 5 条数据
协程 (3) 打印的第 6 条数据
协程 (3) 打印的第 7 条数据
协程 (3) 打印的第 8 条数据
协程 (3) 打印的第 9 条数据
协程 (7) 打印的第 5 条数据
协程 (7) 打印的第 6 条数据
协程 (8) 打印的第 1 条数据
协程 (8) 打印的第 2 条数据
协程 (8) 打印的第 3 条数据
协程 (8) 打印的第 4 条数据
协程 (8) 打印的第 5 条数据
协程 (8) 打印的第 6 条数据
协程 (8) 打印的第 7 条数据
协程 (8) 打印的第 8 条数据
协程 (8) 打印的第 9 条数据
协程 (5) 打印的第 4 条数据
协程 (5) 打印的第 5 条数据
协程 (5) 打印的第 6 条数据
协程 (5) 打印的第 7 条数据
协程 (5) 打印的第 8 条数据
协程 (5) 打印的第 9 条数据
协程 (7) 打印的第 7 条数据
协程 (7) 打印的第 8 条数据
协程 (7) 打印的第 9 条数据
主线程退出[Done] exited with code=0 in 1.205 seconds
Channel管道
管道是Golang在语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Golang的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
Go语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel类型
channel是一种类型,一种引用类型。声明管道类型的格式如下:
// 声明一个传递整型的管道
var ch1 chan int
// 声明一个传递布尔类型的管道
var ch2 chan bool
// 声明一个传递int切片的管道
var ch3 chan []int
创建channel
声明管道后,需要使用make函数初始化之后才能使用
make(chan 元素类型, 容量)
举例如下:
// 创建一个能存储10个int类型的数据管道
ch1 = make(chan int, 10)
// 创建一个能存储4个bool类型的数据管道
ch2 = make(chan bool, 4)
// 创建一个能存储3个[]int切片类型的管道
ch3 = make(chan []int, 3)
channel操作
管道有发送,接收和关闭的三个功能
发送和接收 都使用 <- 符号
现在我们先使用以下语句
定义一个管道:
ch := make(chan int, 3)
发送
将数据放到管道内,将一个值发送到管道内
// 把10发送到ch中
ch <- 10
取操作
x := <- ch
关闭管道
通过调用内置的close函数来关闭管道
close(ch)
完整例子
package mainimport ("fmt""time"
)func main() {//创建管道,容量为2ch := make(chan int, 2)//启动子协程持续读取go func() {for num := range ch {fmt.Printf("读取到数据:%d (剩余长度: %d)", num, len(ch))time.Sleep(time.Second)}}()//主协程写入数据ch <- 1fmt.Println("写入1,当前长度为:", len(ch))ch <- 2fmt.Println("写入2,当前长度为:", len(ch))//主协程等待3秒后关闭管道time.Sleep(3 * time.Second)close(ch)for num := range ch { // 尝试读取所有剩余数据fmt.Println("关闭后读取:", num)}fmt.Println("主协程退出")
}
输出结果:
写入1,当前长度为: 1 // 主协程顺序输出
读取到数据:1 (剩余长度: 0) // 子协程第一次读取,立即抢占执行
写入2,当前长度为: 1 // 主协程继续写入,此时子协程还未读2,所以长度暂时为1
读取到数据:2 (剩余长度: 0) // 子协程在下一轮循环读取2
主协程退出 // 管道关闭后无残留数据,循环不执行
疑问解答
1. 为什么 写入2
时的长度是1?
- 主协程写入1
- 管道长度变为
1
。 fmt.Println
立即打印这个值。- 子协程被调度,读取数据1,长度变为
0
。
- 管道长度变为
- 主协程写入2
- 管道长度再次变为
1
。 fmt.Println
打印时的长度是基于写入后的当前状态(后续被子协程读取才会减为0)。
- 管道长度再次变为
2. 为什么关闭管道后没有输出?
- 子协程在
time.Sleep
之前已经读取所有数据- 写入1和2后,主协程休眠3秒。
- 子协程期间完成了两次读取(两次
time.Sleep
总和为2秒),剩余1秒还在休眠。 - 主协程休眠结束后关闭管道,此时管道已空。
for num := range ch
直接跳过,不输出任何内容。
for range从管道循环取值
当向管道中发送完数据时,我们可以通过close函数来关闭管道,当管道被关闭时,再往该管道发送值会引发panic,从该管道取值的操作会去完管道中的值,再然后取到的值一直都是对应类型的零值。那如何判断一个管道是否被关闭的呢?
未关闭管道——导致死锁
package mainimport ("fmt"
)func main() {//定义管道,容量为10ch := make(chan int, 10)// 存入10个值(管道缓冲填满)for i := 0; i < 10; i++ {ch <- i}// 尝试从管道中读取数据(无其他协程写入)for value := range ch {// 问题所在:未关闭管道,循环会持续等待新数据fmt.Println(value)}// 所有数据读完后,主协程阻塞在此处,触发死锁
}
测试结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:
main.main()d:/Microsoft VS Code/GOproject/src/go_code/goroutine/test1.go:12 +0xb9
exit status 2[Done] exited with code=1 in 1.355 seconds
原因:管道中有10个值,for-range
会循环读取完所有值后继续等待新的数据。由于管道未关闭,且没有其他协程向管道写数据,主协程会永久阻塞,触发死锁。
正确关闭管道——避免死锁
package mainimport ("fmt"
)func main() {//定义管道,容量为10ch := make(chan int, 10)// 存入10个值(管道缓冲填满)for i := 0; i < 10; i++ {ch <- i}// 关键操作:告知接收方管道已不再写入close(ch)for value := range ch {// 读取现存数据后自动退出循环fmt.Println(value)}fmt.Println("管道已关闭且数据读取完毕,程序正常退出")
}
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
0
1
2
3
4
5
6
7
8
9
管道已关闭且数据读取完毕,程序正常退出[Done] exited with code=0 in 1.308 seconds
原因:管道关闭后,for-range
会遍历所有已存储的数据,并在读取完最后一个数据后自动退出循环,不会陷入无限等待。
核心总结
操作 | 是否关闭管道 | 结果 | 原理 |
---|---|---|---|
for-range 读取 | 关闭 | ✅ 正常退出 | 关闭后,读完已有的数据即退出循环,不会等待新数据。 |
for-range 读取 | 未关闭 | ❌ 死锁 | 持续等待新数据,但无其他协程写入,主协程永久阻塞。 |
goroutine 结合 channel 管道
需求1:定义两个方法,一个方法给管道里面写数据,一个给管道里面读取数据。要求同步进行。
- 开启一个fn1的的协程给向管道inChan中写入10条数据
- 开启一个fn2的协程读取inChan中写入的数据
- 注意:fn1和fn2同时操作一个管道
- 主线程必须等待操作完成后才可以退出
管道是安全的,是一边写入,一边读取,当读取比较快的时候,会等待写入
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroup // 同步原语:等待组func write(ch chan int) {// 生产者:循环向通道写入10个数据for i := 0; i < 10; i++ {fmt.Println("写入:", i)ch <- i// 控制写入速度time.Sleep(time.Millisecond * 10)}close(ch)// 完成一个任务-计数器减1wg.Done()
}func read(ch chan int) {// 通道关闭后自动退出循环for v := range ch {fmt.Println("读取:", v)time.Sleep(time.Millisecond * 10)}wg.Done()
}func main() {// 初始化一个容量为10的缓冲通道ch := make(chan int, 10)// 启动写协程(生产者)wg.Add(1)go write(ch)// 启动读协程(消费者)wg.Add(1)go read(ch)// 等待所有协程完成(计数器归零)wg.Wait()fmt.Println("主线程执行完毕")
}
该程序实现了一个生产者-消费者模型,使用带缓冲的通道(channel)、协程(goroutine)和等待组(sync.WaitGroup)来实现并发控制。
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
写入: 0
读取: 0
写入: 1
读取: 1
写入: 2
读取: 2
写入: 3
读取: 3
写入: 4
读取: 4
写入: 5
读取: 5
写入: 6
读取: 6
写入: 7
读取: 7
写入: 8
读取: 8
写入: 9
读取: 9
主线程执行完毕[Done] exited with code=0 in 6.345 seconds
goroutine 结合 channel打印素数
package mainimport ("fmt""math""sync"
)var wg sync.WaitGroup// 向intChan中放入 1~ 120000个数
func putNum(inChan chan int) {for i := 2; i < 12000; i++ { // 从2开始,0和1不是素数inChan <- i}close(inChan)wg.Done()
}// 从intChan取出数据,并判断是否为素数,
// 如果是的话,就把得到的素数放到primeChan中
func primeNum(inChan chan int, primeChan chan int, exitChan chan bool) {for value := range inChan {var flag = true// 不是素数的情况if value <= 1 {flag = false} else {for i := 2; i <= int(math.Sqrt(float64(value))); i++ {if value%i == 0 { // 修正这里flag = falsebreak}}}if flag {// 是素数的话primeChan <- value}}exitChan <- truewg.Done()
}// 打印素数
func printPrime(primeChan chan int) {for value := range primeChan {fmt.Println(value)}wg.Done()
}func main() {// 写入数字intChan := make(chan int, 1000)// 存放素数primeChan := make(chan int, 1000)// 存放 primeChan退出状态exitChan := make(chan bool, 10) // 改为10,与primeNum协程数量一致// 开启写的协程wg.Add(1)go putNum(intChan)// 开启计算素数的协程for i := 0; i < 10; i++ {wg.Add(1)go primeNum(intChan, primeChan, exitChan)}// 开启打印协程wg.Add(1)go printPrime(primeChan)// 等待所有primeNum协程完成wg.Add(1)go func() {for i := 0; i < 10; i++ { // 改为10,与primeNum协程数量一致<-exitChan}close(primeChan)wg.Done()}()wg.Wait()fmt.Println("主线程执行完毕")
}
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
2
3
5
7
11
13
17
19
23
29
31
37
41
43
47
...
...
11941
11953
11959
11117
11969
11971
11987
11981
主线程执行完毕[Done] exited with code=0 in 1.263 seconds
代码详细讲解:
通道定义
go
复制
intChan := make(chan int, 1000) // 缓冲数字(生产协程 -> 计算协程)
primeChan := make(chan int, 1000) // 缓冲素数结果(计算协程 -> 打印协程)
exitChan := make(chan bool, 10) // 记录计算协程完成状态(协助关闭primeChan)
核心函数
函数名 | 作用 | 协程类型 |
---|---|---|
putNum | 向 intChan 中写入2~11999的数字 | 生产协程 |
primeNum | 从 intChan 读取数字并判断是否为素数,结果写入 primeChan | 计算协程 |
printPrime | 从 primeChan 读取并打印所有素数 | 打印协程 |
匿名函数 | 监控所有计算协程的完成状态,随后关闭 primeChan | 协调协程 |
协程启动顺序
生产协程 (putNum
):首先启动,填充 intChan
。
10个计算协程 (primeNum
):并发启动,消费 intChan
。
打印协程 (printPrime
):持续消费 primeChan
,实时输出素数。
协调协程:等待所有 primeNum
协程完成后关闭 primeChan
。
wg.Add(1)
go putNum(intChan) // 启动生产协程for i := 0; i < 10; i++ { // 启动10个计算协程wg.Add(1)go primeNum(...)
}wg.Add(1)
go printPrime(primeChan) // 启动打印协程wg.Add(1)
go func() { ... }() // 启动协调协程
同步机制
-
sync.WaitGroup
:总计数器为
1(putNum) + 10(primeNum) + 1(printPrime) + 1(协调协程) = 13
- 每个协程结束后调用
wg.Done()
,主线程通过wg.Wait()
等待所有协程退出。
- 每个协程结束后调用
-
exitChan
的设计- 每个计算协程 (
primeNum
) 结束后发送一个true
到exitChan
。 - 协调协程读取10次
exitChan
后触发primeChan
关闭,终止printPrime
协程。
- 每个计算协程 (
func primeNum(...) {for value := range inChan {flag := trueif value <= 1 { flag = false } else {// 计算范围优化:只需检查到 sqrt(value)sqrtVal := int(math.Sqrt(float64(value)))for i := 2; i <= sqrtVal; i++ {if value%i == 0 {flag = falsebreak}}}if flag {primeChan <- value // 素数进入结果通道}}
}
单向管道
有时候我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中,使用管道都会对其进行限制,比如限制管道在函数中只能发送或者只能接受
默认的管道是 可读可写
单向管道的作用:
chan<- int
:表示只能向该管道写入数据(如producer
函数)。<-chan int
:表示只能从该管道读取数据(如consumer
函数)。- Go编译器会检查单向管道的违规操作(如尝试在只写管道中读取数据会直接报错)
练习一下:
package mainimport ("fmt""time"
)// 生产函数
func producer(writeChan chan<- int) {for i := 0; i < 5; i++ {fmt.Printf("生产者发送:%d\n", i)writeChan <- itime.Sleep(time.Second)}close(writeChan)
}// 消费函数
func consumer(readChan <-chan int) {for num := range readChan {fmt.Printf("消费者收到: %d\n", num)}
}func main() {//创建一个双向管道,容量为3ch := make(chan int, 3)// 启动生产者协程(将双向管道转为只写管道传入)go producer(ch)consumer(ch)// 主协程作为消费者(将双向管道转为只读管道传入)fmt.Println("程序结束")
}
测试输出:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
生产者发送:0
消费者收到: 0
生产者发送:1
消费者收到: 1
生产者发送:2
消费者收到: 2
生产者发送:3
消费者收到: 3
生产者发送:4
消费者收到: 4
程序结束[Done] exited with code=0 in 11.354 seconds
Select多路复用
在某些场景下我们需要同时从多个通道接收数据。这个时候就可以用到golang中给我们提供的select多路复用。
通常情况通道在接收数据时,如果没有数据可以接收将会发生阻塞。
package mainimport ("fmt""math/rand""time"
)func main() {// 创建通道:数据通道(无缓冲)、超时通道(3秒后触发)dataChan := make(chan int)// timeoutChan 在3秒后会收到一个时间对象timeoutChan := time.After(3 * time.Second)//启动生产者协程:数据通道(无缓冲)、超时通道(3秒后超时)go func() {// 生成随机延迟时间(0~4秒区间)sleepTime := time.Duration(rand.Intn(5)) * time.Second// 模拟耗时操作(如网络请求、IO操作)time.Sleep(sleepTime)// 向数据通道发送计算结果dataChan <- 42}()fmt.Println("等待数据或超时(最多3秒)...")//select多路复用机制,优先级:同时就绪时随机选择select {case data := <-dataChan: // 接收到数据时的处理fmt.Printf("收到数据: %d\n", data)// 可以在该分支补充其他业务逻辑,如数据处理case <-timeoutChan:fmt.Println("超时,未收到数据!")// 可在此设置重试/写日志/返回错误状态等}fmt.Println("程序结束")
}
多路复用机制:
select
同时监听多个case
操作(如dataChan
和timeoutChan
)。- 当 任一通道就绪(有数据可读或可写)时,对应
case
执行。 - 如果 多个通道同时就绪,Go 会随机选择一个执行。
测试输出:
//测试1
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
等待数据或超时(最多3秒)...
超时,未收到数据!
程序结束[Done] exited with code=0 in 9.378 seconds//测试2
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
等待数据或超时(最多3秒)...
收到数据: 42
程序结束[Done] exited with code=0 in 8.286 seconds
tip:使用select来获取数据的时候,不需要关闭chan,不然会出现问题
Goroutine Recover解决协程中出现的Panic
Go语言中,如果一个协程发生未捕获的 panic
,会导致 整个程序崩溃退出(即使其他协程正常)。如果不处理这一问题,在处理关键任务的系统中,单点故障可能引发级联崩溃。
核心机制
Go语言通过两个层级处理协程外的错误隔离:
协程间隔离:默认情况下,协程之间的Panic相互独立
全局进程级防护:任意未恢复的Panic都会导致整个程序终止
无Recover的危害案例
假设以下业务场景:
- 服务器处理用户请求 → 分发到10个协程处理
- 金融数处理 → 其中一个协程触发除零错误(division by zero)
- 系统后果 → 所有请求处理中断,服务完全崩溃
灾难性后果:单点破坏全局稳定,LOGO投资人血本无归
Recover机制生效原理
必须满足 三个核心条件:
条件项 | 作用描述 | 典型错误应用场景 |
---|---|---|
位于defer函数内部 | Go要求Recover只能在延迟函数中触发 | 直接在主流程调用recover() 无效 |
正确捕获层级 | 只能在发生Panic的函数栈内 | 跨协程直接调用Recover无效 |
主动触发机制 | 需通过逻辑主动触犯Panic事件 | – |
通过defer结合recover函数,可以在发生异常时进行恢复操作,并获取异常信息进行处理。这有助于程序的健壮性和错误处理。
package mainimport ("fmt""math/rand""strings""sync""time"
)// 情况:需要并发处理10个文件,某个文件处理可能引发Panic
// 目标:单个文件处理失败不能影响其他文件,需记录错误信息// 文件处理函数(随机触发Panic)
func ProcessFile(filename string, wg *sync.WaitGroup) {defer wg.Done() // 确保资源释放//核心:每个协程必须包含自己的recover机制//通过 defer 和 recover 实现协程级别的错误恢复机制defer func() {if err := recover(); err != nil {fmt.Printf("[Error]文件 %s 处理失败: %v\n", filename, err)}}()//如果当前协程中发生任何未捕获的 panic(如空指针、索引越界等错误)//这段代码会 拦截异常 并打印错误信息,而非直接退出整个程序。// 模拟文件处理时随机出现异常if rand.Intn(10) < 3 { // 30%概率触发异常panic("文件格式解析失败:非标准文件结构")}//正常处理逻辑fmt.Printf("正在处理文件: %s\n", filename)time.Sleep(500 * time.Millisecond)fmt.Printf("完成处理文件:%s\n", filename)
}// 打印分割线的工具函数
func PrintDivider() {fmt.Println("\n" + strings.Repeat("=", 50) + "\n")
}func main() {rand.Intn(1000)var wg sync.WaitGroupfiles := []string{"用户数据.xlsx","机密文档.zip","图片1.jpg","损坏文件.bin","备份.tar.gz",}PrintDivider()fmt.Println("开始并发文件处理流程(安全模式)")// 批量启动文件处理协程for _, f := range files {wg.Add(1)go ProcessFile(f, &wg)}wg.Wait()PrintDivider()fmt.Println("所有文件处理流程结束(包含成功和失败的任务)")
}
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"==================================================开始并发文件处理流程(安全模式)
正在处理文件: 用户数据.xlsx
正在处理文件: 备份.tar.gz
正在处理文件: 图片1.jpg
[Error]文件 机密文档.zip 处理失败: 文件格式解析失败:非标准文件结构
正在处理文件: 损坏文件.bin
完成处理文件:损坏文件.bin
完成处理文件:图片1.jpg
完成处理文件:用户数据.xlsx
完成处理文件:备份.tar.gz==================================================所有文件处理流程结束(包含成功和失败的任务)[Done] exited with code=0 in 7.048 seconds
┌────────────────┐│ 协程开始运行 │└───┬───────────┬┘│ 业务代码 │▼ ║ 发生错误(panic) ║ ║ ▼ 触发流程中断 正常结束╚═════════▶┌─────────┐│ │ 正常结束 │┌─────▼─────┐ └─────────┘│ 执行defer链 │┌───────┤尝试调用recover├───┐▼ └─────┬───┬───┘ ▼
无错误 捕获到错误 → 记录并恢复执行▼ 显示错误后步骤继续正常结束
通过在协程内部实现 panic
恢复,确保了程序的健壮性和业务的连续性,是现代高并发系统设计的重要实践。
Go中的并发安全和互斥锁
如下面一段代码,我们在并发环境下进行操作,就会出现并发访问的问题
var count = 0
var wg sync.WaitGroupfunc test() {count++fmt.Println("the count is : ", count)time.Sleep(time.Millisecond)wg.Done()
}
func main() {for i := 0; i < 20; i++ {wg.Add(1)go test()}time.Sleep(time.Second * 10)
}
输出结果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
the count is : 1
the count is : 7
the count is : 2
the count is : 3
the count is : 4
the count is : 6
the count is : 9
the count is : 10
the count is : 11
the count is : 17
the count is : 12
the count is : 13
the count is : 8
the count is : 14
the count is : 15
the count is : 5
the count is : 18
the count is : 19
the count is : 20
the count is : 16[Done] exited with code=0 in 11.013 seconds
最终结果不稳定。
互斥锁
互斥锁是传统并发编程中对共享资源进行访问控制的主要手段,它由标准库sync中的Mutex结构体类型表示。sync.Mutex类型只有两个公开的指针方法,Lock和Unlock。Lock锁定当前的共享资源,Unlock 进行解锁
// 定义一个锁
var mutex sync.Mutex
// 加锁
mutex.Lock()
// 解锁
mutex.Unlock()
使用互斥锁:
package mainimport ("fmt""sync""time"
)// 共享资源
var count = 0// 同步控制组合
var (wg sync.WaitGroup // 等待组用于等待所有goroutine完成mutex sync.Mutex // 互斥锁用于保护count的原子操作
)func safeIncrement() {// 确保无论是否panic都会标记完成defer wg.Done()// Step 1. 加锁保护临界区mutex.Lock()// 延迟解锁确保即使在panic情况下也能解锁defer mutex.Unlock()// Step 2. 安全操作资源temp := counttemp++ // 模拟操作耗时(这里只是示例,实际应直接count++)time.Sleep(1 * time.Millisecond) // 故意放大并发问题效果count = temp// Step 3. 安全读取资源(仍处于锁保护中)fmt.Printf("安全更新后的值:%d\n", count)// 注意:所有对共享资源count的操作都在锁的保护范围内
}func main() {//启动20个并发操作for i := 0; i < 20; i++ {wg.Add(1)go safeIncrement()}// 正确等待方式(替换原来的Sleep猜测等待)wg.Wait()// 验证最终结果fmt.Println("\n最终结果:", count) // 现在一定会输出20
}
关键点
互斥锁的作用:确保同一时间只有一个goroutine能访问count
,防止数据竞争
等待组的作用:确保主goroutine等待所有工作goroutine完成
defer的使用:确保锁一定会被释放,即使发生panic
测试结果:
main()函数
启动20个并发goroutine调用safeIncrement()
使用wg.Wait()等待所有goroutine完成
打印最终结果[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
安全更新后的值:1
安全更新后的值:2
安全更新后的值:3
安全更新后的值:4
安全更新后的值:5
安全更新后的值:6
安全更新后的值:7
安全更新后的值:8
安全更新后的值:9
安全更新后的值:10
安全更新后的值:11
安全更新后的值:12
安全更新后的值:13
安全更新后的值:14
安全更新后的值:15
安全更新后的值:16
安全更新后的值:17
安全更新后的值:18
安全更新后的值:19
安全更新后的值:20最终结果: 20[Done] exited with code=0 in 1.003 seconds
safeIncrement()
函数
- 使用
defer wg.Done()
确保函数退出时通知等待组 - 获取互斥锁
mutex.Lock()
并defer mutex.Unlock()
确保锁会被释放 - 安全地读取、修改并写回
count
值 - 打印当前计数值
main()
函数
- 启动20个并发goroutine调用
safeIncrement()
- 使用
wg.Wait()
等待所有goroutine完成 - 打印最终结果
输出流程
初始化阶段:
count
初始化为0- 创建20个goroutine,每个都调用
safeIncrement()
并发执行阶段:
- 每个goroutine尝试获取互斥锁
- 只有一个goroutine能成功获取锁,其他被阻塞
- 获取锁的goroutine:
- 读取
count
值到临时变量 - 增加临时变量值
- 短暂睡眠(1ms)
- 将新值写回
count
- 打印当前值
- 释放锁
- 读取
- 下一个goroutine获取锁并重复上述过程
完成阶段:
- 所有goroutine完成后,
wg.Wait()
解除阻塞 - 打印最终结果
20
读写互斥锁
互斥锁的本质是当一个goroutine访问的时候,其他goroutine都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能。程序由原来的并行执行变成了串行执行。
其实,当我们对一个不会变化的数据只做“读”操作的话,是不存在资源竞争的问题的。因为数据是不变的,不管怎么读取,多少goroutine同时读取,都是可以的。
所以问题不是出在“读”上,主要是修改,也就是“写”。修改的数据要同步,这样其他goroutine才可以感知到。所以真正的互斥应该是读取和修改、修改和修改之间,读和读是没有互斥操作的必要的。
因此,衍生出另外一种锁,叫做读写锁。
读写锁可以让多个读操作并发,同时读取,但是对于写操作是完全互斥的。也就是说,当一个goroutine进行写操作的时候,其他goroutine既不能进行读操作,也不能进行写操作。
会输出20
}
**关键点****互斥锁的作用**:确保同一时间只有一个goroutine能访问`count`,防止数据竞争**等待组的作用**:确保主goroutine等待所有工作goroutine完成**defer的使用**:确保锁一定会被释放,即使发生panic测试结果:~~~go
main()函数
启动20个并发goroutine调用safeIncrement()
使用wg.Wait()等待所有goroutine完成
打印最终结果[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
安全更新后的值:1
安全更新后的值:2
安全更新后的值:3
安全更新后的值:4
安全更新后的值:5
安全更新后的值:6
安全更新后的值:7
安全更新后的值:8
安全更新后的值:9
安全更新后的值:10
安全更新后的值:11
安全更新后的值:12
安全更新后的值:13
安全更新后的值:14
安全更新后的值:15
安全更新后的值:16
安全更新后的值:17
安全更新后的值:18
安全更新后的值:19
安全更新后的值:20最终结果: 20[Done] exited with code=0 in 1.003 seconds
safeIncrement()
函数
- 使用
defer wg.Done()
确保函数退出时通知等待组 - 获取互斥锁
mutex.Lock()
并defer mutex.Unlock()
确保锁会被释放 - 安全地读取、修改并写回
count
值 - 打印当前计数值
main()
函数
- 启动20个并发goroutine调用
safeIncrement()
- 使用
wg.Wait()
等待所有goroutine完成 - 打印最终结果
输出流程
初始化阶段:
count
初始化为0- 创建20个goroutine,每个都调用
safeIncrement()
并发执行阶段:
- 每个goroutine尝试获取互斥锁
- 只有一个goroutine能成功获取锁,其他被阻塞
- 获取锁的goroutine:
- 读取
count
值到临时变量 - 增加临时变量值
- 短暂睡眠(1ms)
- 将新值写回
count
- 打印当前值
- 释放锁
- 读取
- 下一个goroutine获取锁并重复上述过程
完成阶段:
- 所有goroutine完成后,
wg.Wait()
解除阻塞 - 打印最终结果
20