爱站网使用体验wordpress页脚二维码

news/2025/9/27 1:48:55/文章来源:
爱站网使用体验,wordpress页脚二维码,电子公司网站设计,百度网站建设费用#x1f4a2;欢迎来到张胤尘的开源技术站 #x1f4a5;开源如江河#xff0c;汇聚众志成。代码似星辰#xff0c;照亮行征程。开源精神长#xff0c;传承永不忘。携手共前行#xff0c;未来更辉煌#x1f4a5; 文章目录 通道通道声明初始化缓冲机制无缓冲通道代码示例 带… 欢迎来到张胤尘的开源技术站 开源如江河汇聚众志成。代码似星辰照亮行征程。开源精神长传承永不忘。携手共前行未来更辉煌 文章目录 通道通道声明初始化缓冲机制无缓冲通道代码示例 带缓冲通道代码示例 通道操作发送数据接收数据单向通道单向通道的用途代码示例 多路复用代码示例通道复用器的实现 超时机制time.Aftercontext 关闭通道检查通道是否关闭获取通道长度获取通道容量 源码解析通道结构体创建通道函数原型函数内容 发送数据函数原型函数内容 接收数据函数原型函数内容 关闭通道函数原型函数内容 通道 在传统的并发编程中多个线程或进程之间通常通过共享内存来通信。这种模型虽然高效但容易引发 竞争条件 和 死锁 等问题。为了避免这些问题程序员在开发时需要使用复杂的同步机制例如锁、信号量等来保护共享数据。但是 golang 采用了不同的思路避免共享内存通过通信来实现并发。 示意图所下所示 通道就是 golang 中实现 Goroutine 之间通信的机制。它是一种 类型化的通道允许多个 Goroutine 之间安全地传递数据。通道是 Golang 并发模型的核心它解决了传统并发编程中共享内存带来的复杂性和风险。 本篇文章主要介绍的是通道有关于协程、并发编程等相关知识点请关注后续文章《Golang 协程》、《Golang 并发编程》。 通道声明 使用 var 关键字声明通道变量。如下所示 var ch chan int声明后ch 是一个未初始化的通道其默认值为 nil。 初始化 使用 make 函数初始化通道如下所示 ch make(chan int)也可以使用 make 函数创建一个带缓冲区的通道如下所示 ch make(chan int, 10)缓冲机制 通道分为 无缓冲通道 和 带缓冲通道它们在行为和使用场景上有一些关键区别。 无缓冲通道 在初始化的小结中使用 make 函数进行初始化通道当没有指定通道的容量或者说通道的容量大小为 0 时称为无缓冲通道。 无缓冲通道在发送数据和接收数据时存在如下的特点 发送操作发送数据时发送方会阻塞直到有接收方准备接收数据。接收操作接收方会阻塞直到有发送方发送数据。无缓冲通道的发送和接收操作是同步的必须有发送方和接收方同时准备好才能完成通信。 代码示例 package mainimport (fmtsynctime )func main() {unbufferedChan : make(chan int) // 创建无缓冲通道var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()fmt.Println(receiver is waiting...) // receiver is waiting...time.Sleep(time.Second * 3) // 模拟3秒的准备时间data : -unbufferedChan // 在这行函数执行之前发送方处于阻塞状态fmt.Println(received:, data)}()unbufferedChan - 42 // 发送方阻塞直到接收方准备好close(unbufferedChan) // 关闭通道wg.Wait() }带缓冲通道 当指定了通道的容量例如 make(chan int, 10)则称为带缓冲通道。 带缓冲通道在发送数据和接收数据时存在如下的特点 发送操作发送数据时如果缓冲区未满数据会被放入缓冲区发送方不会阻塞如果缓冲区已满发送方会阻塞直到缓冲区有空间。接收操作接收方从缓冲区中取出数据如果缓冲区为空接收方会阻塞直到有数据可用。带缓冲通道的发送和接收操作是异步的发送方和接收方不需要同时准备好。缓冲区的存在允许数据在发送方和接收方之间暂时存储。 代码示例 package mainimport (fmtsynctime )func main() {bufferedChan : make(chan int, 2) // 创建带缓冲通道容量大小为2var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()fmt.Println(receiver is waiting...)// for {// select {// case data, ok : -bufferedChan:// if ok {// fmt.Println(received:, data)// } else {// fmt.Println(bufferedChan closed)// return// }// }// }for {time.Sleep(time.Second * 5) // 模拟接收方不从缓冲通道中接收数据}}()bufferedChan - 42bufferedChan - 43bufferedChan - 44 // 如果接收方一直未接收数据则在此处会发送阻塞fmt.Println(send data over ...)close(bufferedChan) // 关闭通道wg.Wait() }通道操作 发送数据 使用 - 操作符将数据发送到通道如下所示 package mainfunc main() {ch : make(chan int)ch - 10 // 向通道中发送一个 10close(ch) // 关闭通道 }接收数据 使用 - 操作符从通道中读取数据如下所示 package mainimport fmtfunc main() {ch : make(chan int, 1)ch - 10 // 向通道中发送一个 10data : -ch // 通道中读取一个数据fmt.Println(data) // 10close(ch) // 关闭通道 }单向通道 golang 中提供了单向通道这么一种特殊的通道类型它只能用于发送或接收数据而不能同时进行发送和接收操作。单向通道在类型声明上与普通通道双向通道有所不同主要用于限制通道的使用方式从而提高代码的可读性和安全性。 单向通道有如下两种类型 只发送通道用于发送数据但不能接收数据。 chan- Type只接收通道用于接收数据但不能发送数据。 -chan Type单向通道的用途 单向通道的主要用途是限制通道的使用范围避免在函数或方法中滥用通道的发送和接收功能。例如 当一个函数只需要从通道中读取数据时使用只接收通道可以明确表示该函数的意图。当一个函数只需要向通道中写入数据时使用只发送通道可以避免意外读取通道中的数据。 代码示例 package mainimport (fmtsync )func producer(ch chan- int, wg *sync.WaitGroup) { // ch 参数一个只发送通道defer wg.Done()for i : 0; i 5; i {ch - i // 向通道发送数据fmt.Printf(sent: %d\n, i)}close(ch) // 关闭通道 }func consumer(ch -chan int, wg *sync.WaitGroup) { // ch 参数一个只接收通道defer wg.Done()for {select {case data, ok : -ch: // 读取通道数据if ok {fmt.Printf(received: %d\n, data)} else {fmt.Println(chan closed ...)return}}} }func main() {var wg sync.WaitGroupch : make(chan int) // 创建一个双向通道wg.Add(2)// received: 0// sent: 0// sent: 1// received: 1// received: 2// sent: 2// sent: 3// received: 3// received: 4// sent: 4// chan closed ...go consumer(ch, wg) // 接收者go producer(ch, wg) // 发送者wg.Wait() } 多路复用 通道的多路复用机制是一种用于处理多个通道操作的技术它允许程序同时等待多个通道的读写操作。这种机制的核心是 select...case 语句它提供了对多个通道操作的并发处理能力。 select { case -ch1:// 处理 ch1 的读操作 case data : -ch2:// 处理 ch2 的读操作并将读取到的数据赋值给 data case ch3 - value:// 向 ch3 中发送值 default:// 如果没有通道准备好则执行默认逻辑 }另外需要强调的是如果多个通道同时准备好select...case 会随机选择一个通道执行操作如果没有任何一个通道准备好则 select 会陷入阻塞除非有 default 分支执行。 需要注意的是上面提到的 通道准备好 是个口语对于接收操作来说需要满足以下两个条件中的任意一个 通道中有数据可读如果通道的缓冲区中有数据或者有发送操作正在等待发送数据到通道中那么接收操作就准备好。通道已关闭即使通道中没有数据如果通道已经被关闭接收操作也会立即返回零值并且 ok 为 false如果使用了 data, ok : -ch 的形式。 对发送操作来说也需要满足以下两个条件中的任意一个 通道中有空间可写如果通道是无缓冲的并且有等待接收的协程或者通道是缓冲的且缓冲区中有空闲位置那么发送操作就准备好。通道已关闭如果通道已经被关闭发送操作会触发 panic(send on closed channel)。 代码示例 通过 select 同时处理多个通道如下所示 package mainimport (fmttime )func main() {ch1 : make(chan string) // 创建通道 ch1ch2 : make(chan string) // 创建通道 ch2go func() { // 每间隔 1 秒向通道 ch1 中发送数据一共发送 10 条数据defer close(ch1)for i : 1; i 10; i {ch1 - message from ch1time.Sleep(1 * time.Second)}}()go func() { // 每间隔 2 秒向通道 ch2 中发送数据一共发送 10 条数据defer close(ch2)for i : 1; i 10; i {ch2 - message from ch2time.Sleep(2 * time.Second)}}()// no data reception ...// recived: message from ch2// recived: message from ch1// no data reception ...// no data reception ...// recived: message from ch1// no data reception ...// recived: message from ch2// recived: message from ch1// no data reception ...// recived: message from ch1// no data reception ...// no data reception ...// recived: message from ch1// recived: message from ch2// no data reception ...// recived: message from ch1// no data reception ...// recived: message from ch1// recived: message from ch2// no data reception ...// recived: message from ch1// no data reception ...// no data reception ...// recived: message from ch1// recived: message from ch2// no data reception ...// recived: message from ch1// no data reception ...// recived: message from ch2// ch1 chan closed ...// recived: message from ch2// recived: message from ch2// recived: message from ch2// recived: message from ch2// ch2 chan closed ...// both channels closed. exiting...ch1_s, ch2_s : true, truefor ch1_s || ch2_s {select {case data, ok : -ch1:if ok {fmt.Println(recived: , data)} else if ch1_s {fmt.Println(ch1 chan closed ...)ch1_s false}case data, ok : -ch2:if ok {fmt.Println(recived: , data)} else if ch2_s {fmt.Println(ch2 chan closed ...)ch2_s false}default:fmt.Println(no data reception ...)time.Sleep(time.Second)}}fmt.Println(both channels closed. exiting...) } 通道复用器的实现 在某些复杂场景中可能需要手动实现通道复用器。例如可以将多个通道的输出合并到一个通道中从而简化后续的处理逻辑。 下面是一个简单的通道复用器实现思路 package mainimport (fmtsynctime )func send(ch chan- int) { // 模拟数据发送每秒发送一个数字defer close(ch)for i : 1; i 10; i {ch - itime.Sleep(time.Second)} }func multiplexer(channels ...-chan int) -chan int { // 合并多个通道数据函数返回最终的一个只读通道out : make(chan int)go func() {defer close(out)var wg sync.WaitGroupfor _, ch : range channels {wg.Add(1)go func(ch -chan int) {defer wg.Done()for {select {case data, ok : -ch:if ok {out - data // 数据发送到最终的只读通道} else {return}}}}(ch)}wg.Wait()}()return out }func main() {ch1 : make(chan int) // 创建通道 ch1ch2 : make(chan int) // 创建通道 ch2ch3 : make(chan int) // 创建通道 ch3go send(ch1)go send(ch2)go send(ch3)ret_ch : multiplexer(ch1, ch2, ch3) // 接收返回的通道for data : range ret_ch { // 从只读通道中获取合并后的数据fmt.Printf(ret received: %d\n, data)}fmt.Println(ret chan closed ...) }超时机制 在 golang 中通道的超时机制可以通过 time.After 或 context 两种方式来实现。 time.After time.After 是一个返回通道的函数会在指定的超时时间后向通道发送一个时间值。结合 select...case 语句可以实现超时逻辑。如下所示 package mainimport (fmttime )func main() {ch : make(chan string) // 创建通道go func() {time.Sleep(3 * time.Second) // 模拟耗时操作ch - 任务完成}()select {case res : -ch:fmt.Println(res)case -time.After(2 * time.Second): // 设置超时为2秒fmt.Println(超时退出) // 最终打印 超时退出} }time.After 是一种非常简洁且易于理解的超时机制特别适合简单的超时场景。但是需要注意的是time.After 会在后台启动一个定时器即使 select 提前退出定时器也不会立刻回收可能导致轻微的资源泄漏。 context context 是 golang 中用于传递可取消信号、超时时间等的工具。通过 context.WithTimeout 创建一个带有超时功能的上下文其 Done() 方法返回一个通道用于超时控制如下所示 package mainimport (contextfmttime )func main() {ch : make(chan string) // 创建通道ctx, cancel : context.WithTimeout(context.Background(), 2*time.Second)defer cancel() // cancel() 会释放与上下文相关的资源避免内存泄漏go func() {time.Sleep(3 * time.Second) // 模拟耗时操作ch - 任务完成}()select {case res : -ch:fmt.Println(res)case -ctx.Done(): // 监听超时信号fmt.Println(超时退出, ctx.Err())} }在上面的这个代码中 context.WithTimeout 创建了一个带有超时的上下文并设置超时时间为 2 秒。defer cancel() 确保在函数返回时调用 cancel()释放资源。如果任务在超时前完成cancel() 会被调用终止所有监听 ctx.Done() 的协程。 总的来说context 更适用于复杂的并发场景例如多个任务的超时控制、任务取消等time.After 更适用于简单的超时控制例如单个任务的超时。 关闭通道 使用 close 函数关闭通道如下所示 package mainfunc main() {ch : make(chan int, 5)close(ch) // 关闭通道 }需要注意的是当关闭通道后不能再向通道发送数据但可以继续从通道中接收数据。 检查通道是否关闭 接收数据时会有两个返回值一个是数据另一个是布尔值用于判断通道是否关闭如下所示 package mainimport (fmtsync )func main() {ch : make(chan int, 5)var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for {select {case data, ok : -ch: // data 表示数据ok 表示通道是否关闭if ok {// received: 11// received: 10// received: 15fmt.Println(received:, data)} else {fmt.Println(ch closed) // ch closedreturn}}}}()ch - 11ch - 10ch - 15close(ch) // 关闭通道wg.Wait() }获取通道长度 使用 len 函数获取通道的当前长度如下所示 package mainimport fmtfunc main() {ch : make(chan int, 5)fmt.Println(len(ch)) // 0ch - 10fmt.Println(len(ch)) // 1close(ch) // 关闭通道 }获取通道容量 使用 cap 函数获取通道的当前容量如下所示 package mainimport fmtfunc main() {ch : make(chan int, 5)fmt.Println(cap(ch)) // 5ch1 : make(chan int)fmt.Println(cap(ch1)) // 0close(ch) // 关闭通道 chclose(ch1) // 关闭通道 ch1 } 源码解析 针对通道的源代码进行解析从以下几个方面 创建通道发送数据接收数据关闭通道 通道结构体 源码位置src/runtime/chan.go type hchan struct {qcount uint // total data in the queuedataqsiz uint // size of the circular queuebuf unsafe.Pointer // points to an array of dataqsiz elementselemsize uint16closed uint32timer *timer // timer feeding this chanelemtype *_type // element typesendx uint // send indexrecvx uint // receive indexrecvq waitq // list of recv waiterssendq waitq // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another Gs status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex }qcount当前通道中存储的数据数量对于无缓冲通道qcount 的值通常为 0 或 1。dataqsiz对于缓冲通道dataqsiz 表示缓冲区可以存储的最大元素数量。对于无缓冲通道dataqsiz 为 0。buf指向缓冲区的指针。缓冲区是一个数组用于存储通道中的数据。仅对缓冲通道有效。无缓冲通道的 buf 为 nil。elemsize通道中每个元素的大小以字节为单位。closed标记通道是否已关闭。关闭的通道不能再次发送数据但可以继续接收数据直到缓冲区为空。timer指向一个定时器该定时器与通道相关联例如用于超时操作。elemtype指向通道中元素的类型信息用于在运行时检查通道中存储的数据类型是否正确。sendx用于管理缓冲区的环形队列记录下一次发送操作在缓冲区中的索引。recvx用于管理缓冲区的环形队列记录下一次接收操作在缓冲区中的索引。recvq存储等待接收的协程队列在发送操作中如果缓冲区已满且没有等待接收的协程则发送协程会被加入到 sendq。sendq存储等待发送的协程队列在接收操作中如果缓冲区为空且没有等待发送的协程则接收协程会被加入到 recvq。lock保护 hchan 结构体中所有字段的互斥锁确保通道操作的线程安全性。在发送、接收和关闭操作中lock 用于防止多个协程同时修改通道的状态。 创建通道 在 golang 的运行时中创建通道的代码会被编译为对 makechan 的调用。如下所示 package mainfunc main() {ch : make(chan int, 1) }编译成汇编代码如下所示 0x001a 00026 CALL runtime.makechan(SB)以上汇编代码只是部分截取请注意甄别。 makechan 函数是运行时中用于创建通道的核心函数。它初始化了一个 hchan 结构体并根据指定的类型和缓冲区大小分配内存。 源码位置src/runtime/chan.go 函数原型 func makechan(t *chantype, size int) *hchan {// ... }t *chantype通道的类型信息包含通道中元素的类型。size int通道的缓冲区大小。如果为 0则创建无缓冲通道如果大于 0则创建有缓冲通道。返回一个初始化后的 hchan 结构体指针。 函数内容 func makechan(t *chantype, size int) *hchan {elem : t.Elem // 从通道类型 t 中获取通道中元素的类型信息// compiler checks this but be safe.// 检查通道中元素的大小是否超过 64KB如果超过抛出异常if elem.Size_ 116 {throw(makechan: invalid channel element type)}// 确保 hchan 的大小是最大对齐单位的倍数并且元素的对齐要求不超过最大对齐单位// maxAlign 8if hchanSize%maxAlign ! 0 || elem.Align_ maxAlign {throw(makechan: bad alignment)}// 计算缓冲区的总大小elem.Size_ * sizemem, overflow : math.MulUintptr(elem.Size_, uintptr(size))// 检查是否发生溢出或者缓冲区大小超过最大分配限制或者缓冲区大小小于0抛出异常if overflow || mem maxAlloc-hchanSize || size 0 {panic(plainError(makechan: size out of range))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoGs are referenced from their owning thread so they cant be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.// 内存分配var c *hchanswitch {case mem 0:// 如果缓冲区大小为 0无缓冲通道或元素大小为 0仅分配 hchan 的内存// Queue or element size is zero.c (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.// 初始化通道的缓冲区的指针// buf 指向 hchan 结构体本身表示没有独立的缓冲区c.buf c.raceaddr()case !elem.Pointers():// 如果元素类型不包含指针如基本类型或数组将 hchan 和缓冲区分配在同一块内存中// Elements do not contain pointers.// Allocate hchan and buf in one call.c (*hchan)(mallocgc(hchanSizemem, nil, true))// 初始化通道的缓冲区的指针// 将 c.buf 指向 hchan 结构体之后的内存区域该区域用于存储缓冲区数据c.buf add(unsafe.Pointer(c), hchanSize)default:// 如果元素类型包含指针如结构体或切片分别分配 hchan 和缓冲区的内存// 因为 gc 需要跟踪指针类型的内存分配// Elements contain pointers.c new(hchan)// 初始化通道的缓冲区的指针c.buf mallocgc(mem, elem, true)}// 设置通道的元素大小c.elemsize uint16(elem.Size_)// 设置通道的元素类型c.elemtype elem// 设置通道的缓冲区大小c.dataqsiz uint(size)// 初始化通道的互斥锁lockInit(c.lock, lockRankHchan)// 如果启用了通道调试模式打印调试信息if debugChan {print(makechan: chan, c, ; elemsize, elem.Size_, ; dataqsiz, size, \n)}// 返回通道指针return c }具体分配内存 mallocgc 函数原型如下所示 func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {// ... }size分配的内存大小。typ分配内存的类型信息。needzero是否需要将分配的内存清零。 更多关于 mallocgc 函数的内容本文章不再赘述感兴趣的同学请关注后续文章《Golang 内存模型》。 发送数据 在 golang 的运行时中发送操作的代码会被编译为对 chansend1 的调用。如下所示 package mainfunc main() {ch : make(chan int, 1)ch - 1 }编译成汇编代码如下所示 0x001a 00026 CALL runtime.makechan(SB) # ... 0x0026 00038 CALL runtime.chansend1(SB)以上汇编代码只是部分截取请注意甄别。 而在 chansend1 的函数内部又是对 chansend 的调用如下所示 源码位置src/runtime/chan.go // entry point for c - x from compiled code. // //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc()) }函数原型 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// ... }c *hchan指向通道的指针。ep unsafe.Pointer指向数据的指针。block bool是否阻塞发送。 如果 block true发送操作会在通道缓冲区满或没有接收方时阻塞直到可以发送数据。如果 block false发送操作是非阻塞的。如果当前无法发送数据缓冲区满或没有接收方函数会立即返回 false。 callerpc uintptr程序计数器PC的值主要用于调试和竞态检测。发送操作是否成功。 函数内容 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 检查通道指针 c 是否为 nilif c nil {if !block {// 如果通道为 nil 且是非阻塞发送直接返回 falsereturn false}// 如果是阻塞发送则协程会阻塞并抛出异常gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)throw(unreachable)}// 如果启用了通道调试模式打印调试信息if debugChan {print(chansend: chan, c, \n)}// 如果启用了竞态检测器记录对通道的读操作if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second full()).// Because a closed channel cannot transition from ready for sending to// not ready for sending, even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasnt closed during the first observation. However, nothing here// guarantees forward progress. We rely on the side effects of lock release in// chanrecv() and closechan() to update this threads view of c.closed and full().// 如果是非阻塞发送并且通道未关闭但缓冲区已满直接返回 false// 前置判断避免进入锁的开销if !block c.closed 0 full(c) {return false}// 如果启用了阻塞剖析记录当前时间戳var t0 int64if blockprofilerate 0 {t0 cputicks()}// 锁定通道的互斥锁确保操作的原子性lock(c.lock)// 如果通道已关闭解锁并抛出异常// 阅读到此处时考虑是否改为使用原子操作代替 lock/unlock// 可不可// 记录吧if c.closed ! 0 {unlock(c.lock)panic(plainError(send on closed channel))}// 如果有等待接收的协程直接将数据发送给接收协程跳过缓冲区if sg : c.recvq.dequeue(); sg ! nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).// 调用 send 函数完成数据传递并解锁send(c, sg, ep, func() { unlock(c.lock) }, 3)return true}// 如果没有等待接收的协程则判断缓冲区是否有空间if c.qcount c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.// 计算缓冲区中下一个写入位置的指针qp : chanbuf(c, c.sendx)if raceenabled {// 通知竞态检测器当前操作的上下文信息对缓冲区的写入操作被正确跟踪racenotify(c, c.sendx, nil)}// 将要发送的数据从 ep 复制到缓冲区的 qp 位置typedmemmove(c.elemtype, qp, ep)// 更新发送索引为下一个写入位置c.sendx// 如果 c.sendx 达到缓冲区大小则将其重置为 0实现环形缓冲区的效果if c.sendx c.dataqsiz {c.sendx 0}// 每次成功写入数据后c.qcount 增加 1c.qcount// 解锁通道的互斥锁unlock(c.lock)// 写入成功返回 truereturn true}// 如果缓冲区已满且是非阻塞发送解锁并返回 falseif !block {unlock(c.lock)return false}// 下面是阻塞发送的代码逻辑// Block on the channel. Some receiver will complete our operation for us.// 获取当前正在运行的协程的指针gp : getg()// 分配一个 sudog 对象sudog 是 golang 运行时中用于表示协程在通道操作中等待的结构体mysg : acquireSudog()mysg.releasetime 0if t0 ! 0 {mysg.releasetime -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 下面这些是初始化 sudog 对象属性mysg.elem ep // 指向要发送的数据指针mysg.waitlink nilmysg.g gp // 指向当前协程mysg.isSelect false // 标记是否是 select 操作mysg.c c // 指向当前通道gp.waiting mysg // 将当前协程的 sudog 对象设置为等待状态gp.param nil// 将 sudog 对象加入通道的发送等待队列c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that were about// to park on a channel. The window between when this Gs status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.// 标记当前协程即将阻塞在通道操作gp.parkingOnChan.Store(true)// 阻塞当前协程直到被接收操作唤醒gopark(chanparkcommit, unsafe.Pointer(c.lock), waitReasonChanSend, traceBlockChanSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs arent considered as roots of the// stack tracer.// 确保发送的数据在接收协程拷贝之前不会被回收因为 ep 指向的数据有可能是栈上的数据// 而栈上的数据可能在协程阻塞后被回收KeepAlive(ep)// 下面是协程被唤醒后的处理代码// someone woke us up.if mysg ! gp.waiting {throw(G waiting list is corrupted)}// 清空协程的等待状态gp.waiting nil // 标记协程不再阻塞在通道操作上gp.activeStackChans false// 发送操作是否成功mysg.success 表示 true 成功false 失败closed : !mysg.successgp.param nilif mysg.releasetime 0 {// 记录协程阻塞的时间blockevent(mysg.releasetime-t0, 2)}// 清空 sudog 对象的通道指针mysg.c nilreleaseSudog(mysg)if closed {// 如果通道未关闭但协程被唤醒抛出异常if c.closed 0 {throw(chansend: spurious wakeup)}// 通道在发送操作完成前被关闭报错 // 这也就是为什么说向一个已经关闭通道写数据会报错原因就在这里panic(plainError(send on closed channel))}// 发送操作成功完成return true }send 函数的作用是将数据直接发送给等待接收的协程并唤醒等待接收的协程。 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// 竞态检测if raceenabled {// 无缓冲通道if c.dataqsiz 0 {// 通知竞态检测器当前操作的上下文racesync(c, sg)} else {// 有缓冲通道// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.// 即使数据是直接发送的竞态检测器也会模拟数据通过缓冲区的流程// 调用 racenotify记录接收索引c.recvx的变化。racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)// 更新接收索引 c.recvx 和发送索引 c.sendx模拟环形缓冲区的行为c.recvxif c.recvx c.dataqsiz {c.recvx 0}c.sendx c.recvx // c.sendx (c.sendx1) % c.dataqsiz}}// 如果 sg.elem 不为 nil调用 sendDirect 将数据从发送方 ep 直接发送到接收方 sg.elemif sg.elem ! nil {sendDirect(c.elemtype, sg, ep)// 清空 sg.elem表示数据已发送sg.elem nil}// 获取等待接收的协程gp : sg.g// 调用解锁函数释放通道的锁unlockf()// 将 sudog 对象传递给接收协程用于后续处理gp.param unsafe.Pointer(sg)// 标记发送操作成功sg.success trueif sg.releasetime ! 0 {sg.releasetime cputicks()}// 将 gp 协程标记为可运行状态并将其加入调度队列中等待执行goready(gp, skip1) }sendDirect 函数的作用是将数据直接从发送方的内存位置发送到等待接收的协程 sudog 对象 。 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {// src is on our stack, dst is a slot on another stack.// Once we read sg.elem out of sg, it will no longer// be updated if the destinations stack gets copied (shrunk).// So make sure that no preemption points can happen between read use.// 获取接收方的内存位置以便将数据直接发送到该位置dst : sg.elem// 触发写屏障// 确保垃圾回收器能够正确追踪目标内存区域中的指针typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)// No need for cgo write barrier checks because dst is always// Go memory.// 这是一个底层函数用于在内存中安全地移动数据// dst 目标地址src 源地址memmove(dst, src, t.Size_) }关于更多内存管理的知识点请关注后续文章《Golang 内存模型》。 接收数据 在 golang 的运行时中接收数据操作的代码会被编译为对 chanrecv1 的调用。如下所示 package mainimport fmtfunc main() {ch : make(chan int, 1)ch - 1data : -chfmt.Println(data) }编译成汇编代码如下所示 0x001a 00026 CALL runtime.makechan(SB) # ... 0x0043 00067 CALL runtime.chanrecv1(SB)以上汇编代码只是部分截取请注意甄别。 而在 chanrecv1 的函数内部又是对 chanrecv 的调用如下所示 源码位置src/runtime/chan.go // entry points for - c from compiled code. // //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true) }函数原型 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// ... }c *hchan指向目标通道的指针。ep unsafe.Pointer指向接收数据的内存位置。如果为 nil表示仅检查通道状态而不接收数据。block bool是否允许阻塞接收。如果为 false则在无法立即接收数据时返回。selected bool表示是否成功接收数据。在 select 语句中用于标记是否选择了当前通道。received bool表示是否实际接收到数据。如果通道关闭且缓冲区为空received 为 false。 函数内容 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: dont need to check ep, as it is always on the stack// or is new memory allocated by reflect.// 如果启用了通道调试模式打印调试信息if debugChan {print(chanrecv: chan, c, \n)}// 如果通道为 nilif c nil {if !block {// 非阻塞接收直接返回return}// 如果是阻塞接收协程会阻塞并抛出异常gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)throw(unreachable)}// 如果通道关联了一个定时器,则调用 maybeRunChan 来处理定时器事件if c.timer ! nil {c.timer.maybeRunChan()}// 如果是非阻塞接收且通道为空// Fast path: check for failed non-blocking operation without acquiring the lock.if !block empty(c) {// After observing that the channel is not ready for receiving, we observe whether the// channel is closed.//// Reordering of these checks could lead to incorrect behavior when racing with a close.// For example, if the channel was open and not empty, was closed, and then drained,// reordered reads could incorrectly indicate open and empty. To prevent reordering,// we use atomic loads for both checks, and rely on emptying and closing to happen in// separate critical sections under the same lock. This assumption fails when closing// an unbuffered channel with a blocked send, but that is an error condition anyway.// 检查通道是否关闭if atomic.Load(c.closed) 0 {// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.// 如果通道未关闭直接返回return}// The channel is irreversibly closed. Re-check whether the channel has any pending data// to receive, which could have arrived between the empty and closed checks above.// Sequential consistency is also required here, when racing with such a send.// 如果通道已关闭并且为非阻塞并且缓冲区为空if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}// ep 不是空则清空目标内存 epif ep ! nil {typedmemclr(c.elemtype, ep)}// 返回成功接收数据(true)未实际接收到数据(false)return true, false}}var t0 int64if blockprofilerate 0 {t0 cputicks()}// 锁定通道的互斥锁确保操作的原子性lock(c.lock)// 如果通道已关闭if c.closed ! 0 {// 缓冲区为空if c.qcount 0 {if raceenabled {raceacquire(c.raceaddr())}// 这里是通道已经关闭了而且缓冲区已经为空// 考虑是否可以采用原子操作来代替这里的 lock/unlock 操作的操作// ...// 和通道关闭时发送者的检测同理// 虽然理论上可以通过原子操作来避免加锁但在实际实现中锁的使用是为了确保线程安全和一致性另外虽然原子操作避免了锁的开销但它们仍然有一定的性能开销// 即使使用原子操作也需要确保在检查 c.closed 和 c.qcount 时不会出现竞态条件。例如如果一个协程在检查 c.closed 后修改了 c.qcount可能会导致不一致的行为// 记录吧// 解锁unlock(c.lock)// 清空目标内存 epif ep ! nil {typedmemclr(c.elemtype, ep)}// 返回成功接收数据(true)未实际接收到数据(false)return true, false}// The channel has been closed, but the channels buffer have data.} else {// 通道未关闭并且有等待发送的协程// Just found waiting sender with not closed.if sg : c.sendq.dequeue(); sg ! nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add senders value to the tail of the queue (both map to// the same buffer slot because the queue is full).// 调用 recv 函数直接从发送协程接收数据跳过缓冲区// 解锁通道recv(c, sg, ep, func() { unlock(c.lock) }, 3)// 返回成功接收数据(true)实际接收到数据(true)return true, true}}// 如果缓冲区中有数据if c.qcount 0 {// Receive directly from queue// 得到缓冲区地址qp : chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}// 从缓冲区中读取数据到目标内存if ep ! nil {typedmemmove(c.elemtype, ep, qp)}// 清空缓冲区中的数据typedmemclr(c.elemtype, qp)// 更新接收索引 c.recvxc.recvx// 如果接收索引 缓冲区大小则从 0 重新开始if c.recvx c.dataqsiz {c.recvx 0}// 更新缓冲区计数 c.qcountc.qcount--// 解锁unlock(c.lock)// 返回成功接收数据(true)实际接收到数据(true)return true, true}// 如果是非阻塞接收且缓冲区为空解锁通道并返回未成功接收数据(false)未实际接收到数据(false)if !block {unlock(c.lock)return false, false}// 下面是进行阻塞接收数据代码逻辑// no sender available: block on this channel.// 获取当前协程gp : getg()// 分配一个 sudog 对象mysg : acquireSudog()// 下面是针对 mysg 对象属性的初始化mysg.releasetime 0if t0 ! 0 {mysg.releasetime -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem epmysg.waitlink nilgp.waiting mysgmysg.g gpmysg.isSelect falsemysg.c cgp.param nil// 将协程加入接收等待队列c.recvq.enqueue(mysg)// 如果通道关联了定时器调用 blockTimerChanif c.timer ! nil {blockTimerChan(c)}// Signal to anyone trying to shrink our stack that were about// to park on a channel. The window between when this Gs status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.// 标记协程即将阻塞在通道操作上gp.parkingOnChan.Store(true)// 调用 gopark 阻塞当前协程直到被发送操作唤醒gopark(chanparkcommit, unsafe.Pointer(c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)// 下面是协程被唤醒之后的操作流程// someone woke us up// 检查协程是否被正确唤醒if mysg ! gp.waiting {throw(G waiting list is corrupted)}// 如果通道关联了定时器调用 unblockTimerChanif c.timer ! nil {unblockTimerChan(c)}// 清理协程状态gp.waiting nilgp.activeStackChans falseif mysg.releasetime 0 {blockevent(mysg.releasetime-t0, 2)}// 获取接收操作的结果success : mysg.successgp.param nil// 释放 sudog 对象mysg.c nilreleaseSudog(mysg)// 返回接收操作结果return true, success }recv 函数用于从通道中读取数据并将其传递给等待接收的协程。 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// 如果通道无缓冲区if c.dataqsiz 0 {if raceenabled {racesync(c, sg)}// 并且目标内存位置不是 nil 则直接拷贝到目标位置if ep ! nil {// copy data from senderrecvDirect(c.elemtype, sg, ep)}} else {// 通道有缓冲区// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.// 计算缓冲区中接收位置的指针qp : chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)}// copy data from queue to receiverif ep ! nil {// 如果目标内存位置不是 nil 将数据从缓冲区复制到接收方的内存位置typedmemmove(c.elemtype, ep, qp)}// copy data from sender to queue// 将发送方的数据复制到缓冲区的尾部typedmemmove(c.elemtype, qp, sg.elem)// 增加接收索引并处理环形缓冲区的边界条件c.recvxif c.recvx c.dataqsiz {c.recvx 0}// 更新发送索引使其指向下一个可用位置c.sendx c.recvx // c.sendx (c.sendx1) % c.dataqsiz}// 清空 sudog 对象中的数据指针sg.elem nil// 获取发送协程的指针gp : sg.g// 调用解锁函数释放通道的锁unlockf()gp.param unsafe.Pointer(sg)// 标记接收操作成功sg.success trueif sg.releasetime ! 0 {sg.releasetime cputicks()}// 将 gp 协程标记为可运行状态并将其加入调度队列中等待执行goready(gp, skip1) }recvDirect 函数的作用是从发送方直接接收数据并将其复制到接收方的目标内存位置。 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {// dst is on our stack or the heap, src is on another stack.// The channel is locked, so src will not move during this// operation.// 指向发送方的内存位置src : sg.elem// 写屏障// 确保目标内存位置中的指针被垃圾回收器正确追踪typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)// 将数据从发送方的内存位置复制到接收方的内存位置memmove(dst, src, t.Size_) }关于更多内存管理的知识点请关注后续文章《Golang 内存模型》。 关闭通道 在 golang 的运行时中关闭通道操作的代码会被编译为对 closechan 的调用。如下所示 package mainimport fmtfunc main() {ch : make(chan int, 1)close(ch) }编译成汇编代码如下所示 0x001a 00026 CALL runtime.makechan(SB) # ... 0x0020 00032 CALL runtime.closechan(SB)以上汇编代码只是部分截取请注意甄别。 函数原型 源码位置src/runtime/chan.go func closechan(c *hchan) {// ... }c *hchan指向要关闭的通道的指针。 函数内容 func closechan(c *hchan) {// 如果通道为 nil直接抛出异常if c nil {panic(plainError(close of nil channel))}// 锁定通道的互斥锁确保关闭操作的原子性lock(c.lock)if c.closed ! 0 {// 如果通道已经关闭解锁抛出异常unlock(c.lock)panic(plainError(close of closed channel))}if raceenabled {callerpc : getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}// 将通道的 closed 标志设置为 1表示通道已关闭c.closed 1// 需要被唤醒的协程集合var glist gList// release all readersfor {// 遍历通道的接收等待队列释放所有等待接收的协程sg : c.recvq.dequeue()if sg nil {break}// 如果 sg.elem 不为 nilif sg.elem ! nil {// 清空接收方的内存位置typedmemclr(c.elemtype, sg.elem)sg.elem nil}if sg.releasetime ! 0 {sg.releasetime cputicks()}// 获取到接收者协程gp : sg.ggp.param unsafe.Pointer(sg)// 标记获取操作失败sg.success falseif raceenabled {raceacquireg(gp, c.raceaddr())}// 将协程加入到 glist 中稍后唤醒glist.push(gp)}// release all writers (they will panic)for {// 遍历通道的发送等待队列释放所有等待发送的协程sg : c.sendq.dequeue()if sg nil {break}// 清空 sg.elem避免内存泄漏sg.elem nilif sg.releasetime ! 0 {sg.releasetime cputicks()}// 获取到发送者协程gp : sg.ggp.param unsafe.Pointer(sg)// 标记发送操作失败sg.success falseif raceenabled {raceacquireg(gp, c.raceaddr())}// 将协程加入到 glist 中稍后唤醒glist.push(gp)}// 解锁通道的互斥锁unlock(c.lock)// Ready all Gs now that weve dropped the channel lock.// 遍历 glist唤醒所有等待的协程for !glist.empty() {gp : glist.pop()gp.schedlink 0// 调用 goready 将协程标记为可运行状态goready(gp, 3)} }撒花 如果本文对你有帮助就点关注或者留个 如果您有任何技术问题或者需要更多其他的内容请随时向我提问。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/919001.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

做个网站app吗南昌网站建设基本流程

2020/11/3操作记录搭建好Python的数据环境之后,接下来就是在Python代码中访问数据库我先在Navicat图形化界面创建一个数据库命名为pythontest,再在数据库中创建了一个表studentinfo有nid,nname,ngrade,nage四个字段,nid为主键递增。通过查询编…

wordpress国外主题网站wordpress的数据库名

作者主页:源码空间codegym 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 管理员:首页、个人中心、公告信息管理、班级管理、学生管理、教师管理、课程类型管理、课程信息管理、学生选课管理、作业布置管理…

合肥网站关键词张槎手机网站建设

今天天气很好,早上看了肺炎的病情如下图这个是丁香医生的统计数据,腾讯也出了一份统计数据,截止到写这篇文章的时候,已经确诊人数为 7766 人,但是我觉得丁香医生里面的截图内容比较多,也没有一些诱导二维码…

集团门户网站建设公司无锡seo关键词排名

文章目录 前言1.安装erlang 语言2.安装rabbitMQ3. 内网穿透3.1 安装cpolar内网穿透(支持一键自动安装脚本)3.2 创建HTTP隧道 4. 公网远程连接5.固定公网TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址 前言 RabbitMQ是一个在 AMQP(高级消息队列协议)基…

网站备案现场浏览器官网入口

目录 一、前言:误差与拟合 (一)经验误差 (二)过拟合、欠拟合 二、评估方法 (一)评估总体的思路 (二)如何划分训练集和测试集 1.留出法 2.k折交叉验证 3.自助法 …

个人网站变现如何查网站是哪家公司做的

任务:将asterisk1.8.24.0移植到rt5350上面 交叉工具链:mipsel-linux(3.4.2) 1. 配置asterisk,执行./configure --hostmipsel-linux 找不到termcap和libxml2,分别对他们交叉编译,然后将生成的库拷贝到交叉工具链的lib…

成都 网站建设 公司汉中做网站

开源项目推荐 Reloader Reloader 是一个 Kubernetes 控制器,用于监控 ConfigMap 和 Secrets 中的变化,并对 Pod 及其相关部署、StatefulSet、DaemonSet 和 DeploymentConfig 进行滚动升级! Spegel Spegel 在瑞典语中意为镜像,…

Unity渲染时的排序规则

按照我们的常规理解,渲染顺序应该是: 1.不透明物体按照相机空间进行Z轴排序,从前往后排,先渲染前面再渲染后面,避免造成过多的OverDraw 2.绘制天空盒,天空盒在要不透明物体之后渲染,避免造成OverDraw 3.绘制透明…

商业网站开发的实训小结怎么写策划公司属于什么行业

微信搜一搜舒适11今天这篇文章,小壹就向大家科普一下空调和新风系统,告诉大家为什么装了空调还要装新风机。1、空调是什么? 对此大家都能够脱口而出:空调就是用来制冷或制热的机器,能够改变室内温度,让我们…

单位门户网站wordpress本地utc

官方文档地址:Index lifecycle actions | Elasticsearch Guide [7.12] | Elastic 索引生命周期操作(index lifecycle actions) Allocate 将分片移动到具有不同性能特征的节点并减少副本数量 Delete 永久删除索引。 Force merge 减少索…

建平县网站建设wordpress访问量统计

Python优化算法篇 scipy.optimize Scipy是一个用于数学、科学和工程的开源库,它建立在NumPy的基础上,提供了一系列强大的科学计算工具。在Scipy中,优化模块(scipy.optimize)提供了多种优化算法,用于求解最小…

专门做行业分析的网站ps做的网站

目录 1 创建工程3 配置文件4 静态资源 之前我们已经学习的Spring、SpringMVC、Mabatis、Maven,详细讲解了Spring、SpringMVC、Mabatis整合SSM的方案和案例,上一节我们学习了SpringBoot的开发步骤、工程构建方法以及工程的快速启动,从这一节开…

网站开发和企业级开发有什么区别wordpress languages

简介: 上海致拓软件有限公司利用云钉低代码应用构建平台——钉钉宜搭为合安建筑快速、低成本地搭建了个性化的项目管理系统,着力帮助合安建筑解决业务在线场景,形成场景化的工程项目管理数字化解决方案。 一封由工程公司发给项目管理数字化实…

US$79 Latest Version SBB Key Programmer V46.02 Multi-language

Latest Version SBB Key Programmer V46.02 Multi-languageSBB Key Programmer Features:1.Software Version: 46.022.Language: English,Italiano,Deutsch,Francais,Espaniol,Portugues,Turkish3.Support Toyota G Ch…

AI智慧的三重跃升:从「数理魔兽」到「悬荡悟空」的文明协作者

AI智慧的三重跃升:从「数理魔兽」到「悬荡悟空」的文明协作者 我们正站在一个岔路口。一边,是追求极致效率、基于数理逻辑的“AI魔兽”,它精准迅猛,却在复杂的人类价值困境前显得笨拙而冷漠;另一边,是多条人迹罕…

平面设计做网站的步骤凡科网建站模板

原文链接:http://blog.csdn.net/ivan_pig/article/details/8257365 -------------------------------------------------- 4 数组相关操作 4.1 编写一段代码,将a设置为一个n个随机整数的数组,要求随机数介于0(包含)和n(不包含)之间 random和…

织梦移动端网站模板下载地址网页素材及网站架构制作

ip addr和ifconfig都是用于配置和管理网络接口的工具 1. ifconfig ifconfig是较旧的网络配置工具,属于net-tools套件的一部分。 该命令主要用于配置、显示和控制网络接口的参数,如IP地址、子网掩码、广播地址等。 ifconfig命令的功能相对有限&#xff…

新学期每日总结(第 5天)

今日 相较昨日复习了Java相关知识

网站怎么做实名认证视觉设计包括哪些

一、摘要在很多业务的系统中,我们常常需要定时的执行一些任务,例如定时发短信、定时变更数据、定时发起促销活动等等。在上篇文章中,我们简单的介绍了定时任务的使用方式,不同的架构对应的解决方案也有所不同,总结起来…

苏州新区网站建设企业解决方案平台

文章目录 文件菜单子部件开关 Python绘图系统: 前置源码: Python打造动态绘图系统📈一 三维绘图系统 📈二 多图绘制系统📈三 坐 标 轴 定 制📈四 定制绘图风格 📈五 数据生成导入📈…