Go语言实现生产者-消费者问题的多种方法

Go语言实现生产者-消费者问题的多种方法

生产者-消费者问题是并发编程中的经典问题,涉及多个生产者生成数据,多个消费者消费数据,二者通过缓冲区(队列)进行协调,保证数据的正确传递和同步。本文将从简单到复杂,使用不同的 Go 语言并发原语实现生产者-消费者模型,并详细介绍所用知识点。


目录

  1. 方法一:使用无缓冲 Channel(同步通信)
  2. 方法二:使用带缓冲 Channel(异步通信)
  3. 方法三:使用 sync.Mutex + 条件变量 sync.Cond 实现缓冲区
  4. 方法四:使用 Channel + select 实现多路复用和超时控制

方法一:使用无缓冲 Channel(同步通信)

知识点

  • 无缓冲 Channel:发送和接收必须同时准备好,适合严格同步的场景。
  • Goroutine:轻量级线程,使用 go 关键字启动。
  • sync.WaitGroup:等待所有 goroutine 完成。

代码示例

package mainimport ("fmt""sync""time"
)func producer(id int, ch chan<- int, wg *sync.WaitGroup) {defer wg.Done()for i := 0; i < 3; i++ {item := id*100 + ifmt.Printf("生产者 %d 生产了产品 %d\n", id, item)ch <- item                         // 发送数据,阻塞直到有消费者接收time.Sleep(100 * time.Millisecond) // 模拟生产时间}
}func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {defer wg.Done()for item := range ch {fmt.Printf("消费者 %d 消费了产品 %d\n", id, item)time.Sleep(150 * time.Millisecond) // 模拟消费时间}
}func main() {ch := make(chan int) // 无缓冲 channelvar wg sync.WaitGroup// 启动生产者for i := 1; i <= 2; i++ {wg.Add(1)go producer(i, ch, &wg)}// 启动消费者for i := 1; i <= 2; i++ {wg.Add(1)go consumer(i, ch, &wg)}// 等待生产者完成wg.Wait()// 关闭 channel,通知消费者结束close(ch)// 由于消费者在 range 中消费,关闭后会退出// 这里主 goroutine 退出,程序结束
}

说明

  • 生产者发送数据时会阻塞,直到消费者接收,保证同步。
  • 适合生产和消费速度相近的场景。
  • 关闭 channel 后,消费者会自动退出。

方法二:使用带缓冲 Channel(异步通信)

知识点

  • 带缓冲 Channel:允许生产者先发送一定数量数据,消费者稍后接收,提升并发效率。
  • 生产者和消费者速度不匹配时,缓冲区能暂存数据,减少阻塞。

代码示例

package mainimport ("fmt""math/rand""sync""time"
)func producer(id int, ch chan<- int, wg *sync.WaitGroup) {defer wg.Done()for i := 0; i < 5; i++ {item := id*100 + ifmt.Printf("生产者 %d 生产了产品 %d\n", id, item)ch <- itemtime.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)}
}func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {defer wg.Done()for item := range ch {fmt.Printf("消费者 %d 消费了产品 %d\n", id, item)time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)}
}func main() {rand.Seed(time.Now().UnixNano())ch := make(chan int, 3) // 带缓冲 channel,缓冲区大小为3var wgProducers sync.WaitGroupvar wgConsumers sync.WaitGroup// 启动生产者for i := 1; i <= 3; i++ {wgProducers.Add(1)go producer(i, ch, &wgProducers)}// 启动消费者for i := 1; i <= 2; i++ {wgConsumers.Add(1)go consumer(i, ch, &wgConsumers)}// 等待所有生产者完成wgProducers.Wait()// 关闭 channel,通知消费者没有更多数据close(ch)// 等待所有消费者完成wgConsumers.Wait()fmt.Println("所有生产者和消费者已完成工作,程序结束")
}

说明

  • 生产者可以先发送数据到缓冲区,不必等待消费者立即接收。
  • 缓冲区大小影响生产者和消费者的阻塞情况。
  • 关闭 channel 后,消费者会自动退出。

方法三:使用 sync.Mutex + sync.Cond 实现缓冲区(手动实现队列)

知识点

  • sync.Mutex:互斥锁,保护共享资源。
  • sync.Cond:条件变量,支持等待和通知机制。
  • 手动实现缓冲区:用切片模拟队列,生产者和消费者通过条件变量协调。

代码示例

package mainimport ("fmt""sync""time"
)type Buffer struct {items    []intsize     intlock     sync.MutexnotEmpty *sync.CondnotFull  *sync.Cond
}func NewBuffer(size int) *Buffer {b := &Buffer{items: make([]int, 0, size),size:  size,}b.notEmpty = sync.NewCond(&b.lock)b.notFull = sync.NewCond(&b.lock)return b
}func (b *Buffer) Put(item int) {b.lock.Lock()defer b.lock.Unlock()// 如果缓冲区满,等待 notFull 信号for len(b.items) == b.size {b.notFull.Wait()}b.items = append(b.items, item)fmt.Printf("生产了产品 %d,缓冲区大小: %d\n", item, len(b.items))// 通知消费者缓冲区非空b.notEmpty.Signal()
}func (b *Buffer) Get() int {b.lock.Lock()defer b.lock.Unlock()// 如果缓冲区空,等待 notEmpty 信号for len(b.items) == 0 {b.notEmpty.Wait()}item := b.items[0]b.items = b.items[1:]fmt.Printf("消费了产品 %d,缓冲区大小: %d\n", item, len(b.items))// 通知生产者缓冲区非满b.notFull.Signal()return item
}func producer(id int, b *Buffer, count int, wg *sync.WaitGroup) {defer wg.Done()for i := 0; i < count; i++ {item := id*100 + ib.Put(item)time.Sleep(100 * time.Millisecond)}
}func consumer(id int, b *Buffer, wg *sync.WaitGroup, done <-chan struct{}) {defer wg.Done()for {select {case <-done:returndefault:item := b.Get()time.Sleep(150 * time.Millisecond)fmt.Printf("消费者 %d 处理了产品 %d\n", id, item)}}
}func main() {bufferSize := 5b := NewBuffer(bufferSize)var wgProducers sync.WaitGroupvar wgConsumers sync.WaitGroupdone := make(chan struct{})// 启动生产者numProducers := 2produceCount := 10for i := 1; i <= numProducers; i++ {wgProducers.Add(1)go producer(i, b, produceCount, &wgProducers)}// 启动消费者numConsumers := 3for i := 1; i <= numConsumers; i++ {wgConsumers.Add(1)go consumer(i, b, &wgConsumers, done)}// 等待生产者完成wgProducers.Wait()// 生产结束,等待缓冲区清空for {b.lock.Lock()empty := len(b.items) == 0b.lock.Unlock()if empty {break}time.Sleep(100 * time.Millisecond)}// 通知消费者退出close(done)// 等待消费者退出wgConsumers.Wait()fmt.Println("所有生产者和消费者已完成工作,程序结束")
}

说明

  • 手动实现缓冲区,生产者和消费者通过条件变量等待和通知。
  • 适合需要自定义缓冲区行为的场景。
  • 需要额外处理消费者退出逻辑。

方法四:使用 Channel + select 实现多路复用和超时控制

知识点

  • select:Go 语言中用于监听多个 channel 的操作,支持超时和默认分支。
  • 超时控制:防止 goroutine 永久阻塞。
  • 多路复用:同时监听多个事件。

代码示例

package mainimport ("fmt""math/rand""time"
)func producer(id int, ch chan<- int, done <-chan struct{}) {for i := 0; i < 10; i++ {item := id*100 + iselect {case ch <- item:fmt.Printf("生产者 %d 生产了产品 %d\n", id, item)case <-done:fmt.Printf("生产者 %d 收到退出信号\n", id)return}time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)}
}func consumer(id int, ch <-chan int, done <-chan struct{}) {for {select {case item, ok := <-ch:if !ok {fmt.Printf("消费者 %d 发现通道关闭,退出\n", id)return}fmt.Printf("消费者 %d 消费了产品 %d\n", id, item)time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)case <-done:fmt.Printf("消费者 %d 收到退出信号\n", id)returncase <-time.After(2 * time.Second):fmt.Printf("消费者 %d 超时退出\n", id)return}}
}func main() {rand.Seed(time.Now().UnixNano())ch := make(chan int, 5)done := make(chan struct{})// 启动生产者for i := 1; i <= 3; i++ {go producer(i, ch, done)}// 启动消费者for i := 1; i <= 2; i++ {go consumer(i, ch, done)}// 运行一段时间后关闭生产者time.Sleep(5 * time.Second)close(done) // 通知所有 goroutine 退出// 关闭 channel,通知消费者没有更多数据close(ch)// 主 goroutine 等待一段时间让所有 goroutine 退出time.Sleep(3 * time.Second)fmt.Println("程序结束")
}

说明

  • 使用 select 监听多个 channel,支持超时和退出信号。
  • 生产者和消费者都能响应退出通知,优雅结束。
  • 适合复杂场景下的生产者-消费者模型。

总结

方法复杂度关键知识点适用场景
方法一简单无缓冲 channel,阻塞同步生产消费速度相近,简单同步
方法二中等带缓冲 channel,异步通信生产消费速度不匹配,提升效率
方法三较复杂sync.Mutex + sync.Cond,手动缓冲区需要自定义缓冲区行为,复杂同步
方法四复杂select 多路复用,超时控制,退出通知复杂场景,需多事件监听和优雅退出

Go 语言提供了丰富的并发原语,能够灵活实现生产者-消费者模型。根据实际需求和复杂度选择合适的方法,能让程序更高效、健壮。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/905762.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Opencv】canny边缘检测提取中心坐标

采用opencv 对图像中的小球通过canny边缘检测的方式进行提取坐标 本文介绍了如何使用OpenCV对图像中的小球进行Canny边缘检测&#xff0c;并通过Zernike矩进行亚像素边缘检测&#xff0c;最终拟合椭圆以获取小球的精确坐标。首先&#xff0c;图像被转换为灰度图并进行高斯平滑…

蓝桥杯12届国B 123

题目描述 小蓝发现了一个有趣的数列&#xff0c;这个数列的前几项如下&#xff1a; 1,1,2,1,2,3,1,2,3,4,⋯ 小蓝发现&#xff0c;这个数列前 1 项是整数 1&#xff0c;接下来 2 项是整数 1 至 2&#xff0c;接下来 3 项是整数 1 至 3&#xff0c;接下来 4 项是整数 1 至 4&…

鸿蒙OSUniApp 制作动态加载的瀑布流布局#三方框架 #Uniapp

使用 UniApp 制作动态加载的瀑布流布局 前言 最近在开发一个小程序项目时&#xff0c;遇到了需要实现瀑布流布局的需求。众所周知&#xff0c;瀑布流布局在展示不规则尺寸内容&#xff08;如图片、商品卡片等&#xff09;时非常美观和实用。但在实际开发过程中&#xff0c;我…

ThinkStation图形工作站进入BIOS方法

首先视频线需要接在独立显卡上&#xff0c;重新开机&#xff0c;持续按F1&#xff0c;或者显示器出来lenovo的logo的时候按F1&#xff0c;这样就进到bios里了。联*想*坑&#xff0c;戴尔贵。靠。

【源码级开发】Qwen3接入MCP,企业级智能体开发实战!

Qwen3接入MCP智能体开发实战&#xff08;上&#xff09; 一、MCP技术与Qwen3原生MCP能力介绍 1.智能体开发核心技术—MCP 1.1 Function calling技术回顾 如何快速开发一款智能体应用&#xff0c;最关键的技术难点就在于如何让大模型高效稳定的接入一些外部工具。而在MCP技术…

Linux下载与安装

一、YUM 1.1 什么是YUM 在CentOS系统中&#xff0c;软件管理方式通常有三种方式&#xff1a;rpm安装、yum安装以及编译&#xff08;源码&#xff09;安装。 编译安装&#xff0c;从过程上来讲比较麻烦&#xff0c;包需要用户自行下载&#xff0c;下载的是源码包&#xff0c;需…

PostgreSQL中的全页写

一、概述 在PGSQL数据库中&#xff0c;默认的页面大小为8KB&#xff0c;但是磁盘buffer的大小为4KB&#xff0c;扇区大小为512B。这就导致在操作系统的角度看数据库的写操作&#xff0c;其实并不是一种原子操作。如果操作系统发生了系统级别的故障&#xff0c;此时正好操作系统…

WEB安全--Java安全--shiro550反序列化漏洞

一、前言 什么是shiro&#xff1f; shiro是一个Apache的Java安全框架 它的作用是什么&#xff1f; Apache Shiro 是一个强大且灵活的 Java 安全框架&#xff0c;用于处理身份验证、授权、密码管理以及会话管理等功能 二、shiro550反序列化原理 1、用户首次登录并勾选记住密码…

2024 睿抗机器人开发者大赛CAIP-编程技能赛-专科组(国赛)解题报告 | 珂学家

前言 题解 2024 睿抗机器人开发者大赛CAIP-编程技能赛-专科组&#xff08;国赛&#xff09;&#xff0c;陈越姐姐出题。 国赛比省赛&#xff0c;难度增强了不少&#xff0c;题目就剩下4个题了。 涉及堆栈&#xff0c;hash表&#xff0c;优先队列等高阶数据结构的使用&#x…

15 C 语言字符类型详解:转义字符、格式化输出、字符类型本质、ASCII 码编程实战、最值宏汇总

1 字符类型概述 在 C 语言中&#xff0c;字符类型 char 用于表示单个字符&#xff0c;例如一个数字、一个字母或一个符号。 char 类型的字面量是用单引号括起来的单个字符&#xff0c;例如 A、5 或 #。 当需要表示多个字符组成的序列时&#xff0c;就涉及到了字符串。在 C 语言…

操作系统-锁/内存/中断/IO

文章目录 锁自旋锁互斥锁悲观锁和乐观锁 内存管理物理/虚拟内存页表段表虚拟内存布局写时复制copy on writebrk&#xff0c;mmap页面置换算法 中断中断分类中断流程 网络I/OI/O模型服务器处理并发请求 锁 自旋锁 自旋锁是一种基于忙等待&#xff08;Busy-Waiting&#xff09;…

割点与其例题

割点 定义&#xff1a; 若一个点在图中被去掉后&#xff0c;图的连通块个数增加&#xff0c;那么这个点就被称为“割点”。如下图所示红点。 定义说白了就是若去掉一个点&#xff0c;图被“断开”的点称为割点。 朴素算法&#xff1a; 枚举每个点 u。遍历图&#xff0c;如果…

图卷积神经网络(Graph Convolutional Network, GCN)

最近看论文看到了图卷积神经网络的内容&#xff0c;之前整理过图神经网络的内容&#xff0c;这里再补充一下&#xff0c;方便以后查阅。 图卷积神经网络&#xff08;Graph Convolutional Network, GCN&#xff09; 图卷积神经网络1. 什么是图卷积神经网络&#xff08;GCN&#…

安装win11硬盘分区MBR还是GPT_装win11系统分区及安装教程

最近有网友问我,装win11系统分区有什么要求装win11系统硬盘分区用mbr还是GPT&#xff1f;我们知道现在的引导模式有uefi和legacy两种引导模式&#xff0c;如果采用的是uefi引导模式&#xff0c;分区类型对应的就是gpt分区(guid)&#xff0c;如果引导模式采用的是legacy&#xf…

服务培训QDA 的安装调试方法,硬件模块的讲解和软件控制台使用及系统测试

#服务培训##质谱仪##软件控制##硬件模块# 以下是关于Waters QDa单杆液质质谱仪的安装调试、硬件模块讲解以及软件控制台使用培训的相关内容&#xff1a; 安装调试 场地准备&#xff1a;用户需要提前准备好实验室&#xff0c;确保实验室环境符合仪器的要求&#xff0c;如温度、…

在K8S集群中部署EFK日志收集

目录 引言环境准备安装自定义资源部署ElasticsearchMaster 节点与 Data 节点的区别生产优化建议安装好以后测试ES是否正常部署Fluentd测试filebeat是否正常推送日志部署Kibana获取账号密码&#xff0c;账号是&#xff1a;elastic集群测试 引言 系统版本为 Centos7.9内核版本为…

polarctf-web-[rce1]

考点&#xff1a; (1)RCE(exec函数) (2)空格绕过 (3)执行函数(exec函数) (4)闭合(ping命令闭合) 题目来源&#xff1a;Polarctf-web-[rce1] 解题&#xff1a; 这段代码实现了一个简单的 Ping 测试工具&#xff0c;用户可以通过表单提交一个 IP 地址&#xff0c;服务器会执…

【串流VR手势】Pico 4 Ultra Enterprise 在 SteamVR 企业串流中无法识别手势的问题排查与解决过程(Pico4UE串流手势问题)

写在前面的话 此前&#xff08;用Pico 4U&#xff09;接入了MRTK3&#xff0c;现项目落地需要部署&#xff0c;发现串流场景中&#xff0c;Pico4UE的企业串流无法正常识别手势。&#xff08;一体机方式部署使用无问题&#xff09; 花了半小时解决&#xff0c;怕忘&#xff0c;…

ES(Elasticsearch)的应用与代码示例

Elasticsearch应用与代码示例技术文章大纲 一、引言 Elasticsearch在现代化应用中的核心作用典型应用场景分析&#xff08;日志分析/全文检索/数据聚合&#xff09; 二、环境准备(前提条件) Elasticsearch 8.x集群部署要点IK中文分词插件配置指南Ingest Attachment插件安装…

临床决策支持系统的提示工程优化路径深度解析

引言 随着人工智能技术在医疗领域的迅猛发展,临床决策支持系统(CDSS)正经历从传统规则引擎向智能提示工程的范式转变。在这一背景下,如何构建既符合循证医学原则又能适应个体化医疗需求的CDSS成为医学人工智能领域的核心挑战。本报告深入剖析了临床决策支持系统中提示工程的…