goframe开发一个企业网站 rabbitmq队例15

RabbitMQ消息队列封装

在目录internal/pkg/rabbitmq/rabbitmq.go

# 消息队列配置
mq:# 消息队列类型: rocketmq 或 rabbitmqtype: "rabbitmq"# 是否启用消息队列enabled: truerocketmq:nameServer: "127.0.0.1:9876"producerGroup: "myProducerGroup"consumerGroup: "myConsumerGroup"brokerAddress: "127.0.0.1:10911"  # 添加 broker 地址rabbitmq:url: "amqp://wanghaibin:wanghaibin@127.0.0.1:5672/"exchange: "gf_exchange"dlx_exchange: "gf_dlx_exchange"    # 新增:死信交换机queue: "gf_queue"delay_queue: "gf_delay_queue"      # 新增:延迟队列routingKey: "gf_key"vhost: "/"
package rabbitmqimport ("context""fmt""time""github.com/gogf/gf/v2/frame/g"amqp "github.com/rabbitmq/amqp091-go"
)var (// conn RabbitMQ连接实例conn *amqp.Connection// channel RabbitMQ通道实例channel *amqp.Channel
)// Initialize 初始化 RabbitMQ 连接和通道
// 包括:建立连接、创建通道、声明交换机和队列、建立绑定关系
func Initialize() {var err errorctx := context.Background()// 从配置文件获取RabbitMQ连接URLurl := g.Cfg().MustGet(ctx, "rabbitmq.url").String()// 建立RabbitMQ连接conn, err = amqp.Dial(url)if err != nil {g.Log().Fatalf(ctx, "Failed to connect to RabbitMQ: %v", err)}// 创建通道channel, err = conn.Channel()if err != nil {g.Log().Fatalf(ctx, "Failed to open channel: %v", err)}// 1. 声明主交换机// 类型:direct,持久化:true,自动删除:false,内部的:false,非阻塞:falseerr = channel.ExchangeDeclare(g.Cfg().MustGet(ctx, "rabbitmq.exchange").String(),"direct", // 交换机类型true,     // 持久化false,    // 自动删除false,    // 内部的false,    // 非阻塞nil,      // 参数)if err != nil {g.Log().Fatalf(ctx, "Failed to declare main exchange: %v", err)}// 2. 声明死信交换机(DLX)// 用于处理无法被正常消费的消息err = channel.ExchangeDeclare(g.Cfg().MustGet(ctx, "rabbitmq.dlx_exchange").String(),"direct",true,false,false,false,nil,)if err != nil {g.Log().Fatalf(ctx, "Failed to declare DLX exchange: %v", err)}// 3. 声明主队列// 持久化:true,非自动删除,非排他,非阻塞_, err = channel.QueueDeclare(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),true,  // 持久化false, // 自动删除false, // 排他的false, // 非阻塞nil,   // 参数)if err != nil {g.Log().Fatalf(ctx, "Failed to declare main queue: %v", err)}// 4. 声明延迟队列// 配置死信交换机参数args := amqp.Table{"x-dead-letter-exchange":    g.Cfg().MustGet(ctx, "rabbitmq.dlx_exchange").String(),"x-dead-letter-routing-key": g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(),}_, err = channel.QueueDeclare(g.Cfg().MustGet(ctx, "rabbitmq.delay_queue").String(),true,false,false,false,args,)if err != nil {g.Log().Fatalf(ctx, "Failed to declare delay queue: %v", err)}// 5. 绑定主队列到主交换机err = channel.QueueBind(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),      // 队列名g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(), // 路由键g.Cfg().MustGet(ctx, "rabbitmq.exchange").String(),   // 交换机名false,nil,)if err != nil {g.Log().Fatalf(ctx, "Failed to bind main queue: %v", err)}// 6. 绑定主队列到死信交换机err = channel.QueueBind(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(),g.Cfg().MustGet(ctx, "rabbitmq.dlx_exchange").String(),false,nil,)if err != nil {g.Log().Fatalf(ctx, "Failed to bind queue to DLX: %v", err)}g.Log().Info(ctx, "RabbitMQ initialized successfully")
}// PublishMessage 发布消息到RabbitMQ
// 参数:
//   - ctx: 上下文
//   - message: 要发送的消息内容
// 返回:
//   - error: 发送错误,如果成功则为nil
func PublishMessage(ctx context.Context, message string) error {// 创建带超时的上下文ctxTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)defer cancel()// 发布消息到指定的交换机和路由err := channel.PublishWithContext(ctxTimeout,g.Cfg().MustGet(ctx, "rabbitmq.exchange").String(),g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(),false, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)if err != nil {return fmt.Errorf("failed to publish message: %v", err)}return nil
}// ConsumeMessages 消费队列中的消息
// 参数:
//   - ctx: 上下文
//   - handler: 消息处理函数
// 返回:
//   - error: 消费错误,如果成功则为nil
func ConsumeMessages(ctx context.Context, handler func(string) error) error {messages, err := channel.Consume(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),"",    // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil,   // args)if err != nil {return fmt.Errorf("failed to register a consumer: %v", err)}// 启动goroutine处理消息go func() {for msg := range messages {err := handler(string(msg.Body))if err != nil {g.Log().Errorf(ctx, "Error handling message: %v", err)msg.Nack(false, true) // 处理失败,消息重新入队} else {msg.Ack(false) // 处理成功,确认消息}}}()return nil
}// Cleanup 清理RabbitMQ连接和通道
func Cleanup() {if channel != nil {channel.Close()}if conn != nil {conn.Close()}
}// GetChannel 获取RabbitMQ通道实例
func GetChannel() *amqp.Channel {return channel
}// PurgeQueue 清空指定队列中的所有消息
// 参数:
//   - ctx: 上下文
// 返回:
//   - error: 清空错误,如果成功则为nil
func PurgeQueue(ctx context.Context) error {_, err := channel.QueuePurge(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),false, // no-wait)return err
}// PublishDelayMessage 发送延迟消息
// 参数:
//   - ctx: 上下文
//   - message: 消息内容
//   - delaySeconds: 延迟秒数
// 返回:
//   - error: 发送错误,如果成功则为nil
func PublishDelayMessage(ctx context.Context, message string, delaySeconds int) error {return channel.PublishWithContext(ctx,"",                                                    // 默认交换机g.Cfg().MustGet(ctx, "rabbitmq.delay_queue").String(), // 延迟队列false,false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),Expiration:  fmt.Sprintf("%d", delaySeconds*1000), // 转换为毫秒},)
}// GetQueueLength 获取队列中的消息数量
// 参数:
//   - ctx: 上下文
// 返回:
//   - int: 消息数量
//   - error: 获取错误,如果成功则为nil
func GetQueueLength(ctx context.Context) (int, error) {queue, err := channel.QueueInspect(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),)if err != nil {return 0, fmt.Errorf("failed to inspect queue: %v", err)}return queue.Messages, nil
}
logic逻辑的实现
package rabbitmqmsgimport ("context""fmt""gf_new_web/internal/pkg/rabbitmq""gf_new_web/internal/service""github.com/gogf/gf/v2/frame/g"
)// sRabbitmqMsg RabbitMQ消息服务结构体
type sRabbitmqMsg struct{}// New 创建新的RabbitMQ消息服务实例
func New() *sRabbitmqMsg {return &sRabbitmqMsg{}
}// init 初始化函数,在包加载时自动注册RabbitMQ消息服务
func init() {service.RegisterRabbitmqMsg(New())
}// SendMessage 发送普通消息到RabbitMQ
// 参数:
//   - ctx: 上下文信息
//   - message: 要发送的消息内容
// 返回:
//   - error: 发送错误,成功则为nil
func (s *sRabbitmqMsg) SendMessage(ctx context.Context, message string) error {return rabbitmq.PublishMessage(ctx, message)
}// SendDelayMessage 发送延迟消息到RabbitMQ
// 参数:
//   - ctx: 上下文信息
//   - message: 要发送的消息内容
//   - delaySeconds: 延迟时间(秒)
// 返回:
//   - error: 发送错误,成功则为nil
func (s *sRabbitmqMsg) SendDelayMessage(ctx context.Context, message string, delaySeconds int) error {return rabbitmq.PublishDelayMessage(ctx, message, delaySeconds)
}// SendBatchMessages 批量发送消息到RabbitMQ
// 参数:
//   - ctx: 上下文信息
//   - messages: 消息内容数组
// 返回:
//   - error: 发送错误,成功则为nil
// 注意:任一消息发送失败都会导致整个批次失败
func (s *sRabbitmqMsg) SendBatchMessages(ctx context.Context, messages []string) error {for _, msg := range messages {if err := rabbitmq.PublishMessage(ctx, msg); err != nil {return err}}return nil
}// GetQueueLength 获取队列当前的消息数量
// 参数:
//   - ctx: 上下文信息
// 返回:
//   - int: 队列中的消息数量
//   - error: 获取错误,成功则为nil
func (s *sRabbitmqMsg) GetQueueLength(ctx context.Context) (int, error) {queue, err := rabbitmq.GetChannel().QueueInspect(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),)if err != nil {return 0, fmt.Errorf("failed to inspect queue: %v", err)}return queue.Messages, nil
}// PurgeQueue 清空队列中的所有消息
// 参数:
//   - ctx: 上下文信息
// 返回:
//   - error: 清空错误,成功则为nil
func (s *sRabbitmqMsg) PurgeQueue(ctx context.Context) error {return rabbitmq.PurgeQueue(ctx)
}// handleMessage 处理接收到的单条消息
// 参数:
//   - message: 消息内容
// 返回:
//   - error: 处理错误,成功则为nil
// 注意:这是内部方法,实现具体的消息处理逻辑
func (s *sRabbitmqMsg) handleMessage(message string) error {// 记录接收到的消息g.Log().Info(context.Background(), "收到消息:", message)// TODO: 在这里添加实际的消息处理逻辑return nil
}// Initialize 初始化消息消费处理
// 参数:
//   - ctx: 上下文信息
// 返回:
//   - error: 初始化错误,成功则为nil
// 功能:启动消息消费者,并设置消息处理函数
func (s *sRabbitmqMsg) Initialize(ctx context.Context) error {return rabbitmq.ConsumeMessages(ctx, func(msg string) error {return s.handleMessage(msg)})
}

生成service ,不再写上

controller代码

package frontimport ("fmt""gf_new_web/internal/service""time""github.com/gogf/gf/v2/frame/g""github.com/gogf/gf/v2/net/ghttp"
)var (RabbitMsg = cRabbitMsg{}
)type cRabbitMsg struct{}// SendMessage 处理发送普通消息的HTTP请求
// 请求参数:
//   - message: 消息内容
// 响应格式:
//   成功:{"code": 0, "msg": "消息发送成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) SendMessage(r *ghttp.Request) {message := r.Get("message").String()err := service.RabbitmqMsg().SendMessage(r.GetCtx(), message)if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "消息发送成功",})
}// SendDelayMessage 处理发送延迟消息的HTTP请求
// 请求参数:
//   - message: 消息内容
//   - delay: 延迟时间(秒)
// 响应格式:
//   成功:{"code": 0, "msg": "延迟消息发送成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) SendDelayMessage(r *ghttp.Request) {message := r.Get("message").String()delaySeconds := r.Get("delay").Int()err := service.RabbitmqMsg().SendDelayMessage(r.GetCtx(), message, delaySeconds)if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "延迟消息发送成功",})
}// SendBatchMessages 处理批量发送消息的HTTP请求
// 请求参数:
//   - messages: 消息内容数组
// 响应格式:
//   成功:{"code": 0, "msg": "批量消息发送成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) SendBatchMessages(r *ghttp.Request) {messages := r.Get("messages").Strings()err := service.RabbitmqMsg().SendBatchMessages(r.GetCtx(), messages)if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "批量消息发送成功",})
}// GetQueueLength 处理获取队列长度的HTTP请求
// 响应格式:
//   成功:{"code": 0, "msg": "获取队列长度成功", "data": 队列长度}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) GetQueueLength(r *ghttp.Request) {length, err := service.RabbitmqMsg().GetQueueLength(r.GetCtx())if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "获取队列长度成功","data": length,})
}// PurgeQueue 处理清空队列的HTTP请求
// 响应格式:
//   成功:{"code": 0, "msg": "清空队列成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) PurgeQueue(r *ghttp.Request) {err := service.RabbitmqMsg().PurgeQueue(r.GetCtx())if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "清空队列成功",})
}// ConsumeMessages 处理消费消息的HTTP请求
// 特点:异步处理,非阻塞
// 响应格式:
//   成功:{"code": 0, "msg": "消息消费已开始,请查看服务器日志获取消费详情"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) ConsumeMessages(r *ghttp.Request) {g.Log().Info(r.GetCtx(), "开始消费消息...")done := make(chan bool)go func() {err := service.RabbitmqMsg().Initialize(r.GetCtx())if err != nil {g.Log().Error(r.GetCtx(), "消费消息出错:", err)r.Response.WriteJson(g.Map{"code": -1,"msg":  fmt.Sprintf("消费消息失败: %v", err),})done <- truereturn}}()select {case <-done:returncase <-time.After(5 * time.Second):g.Log().Info(r.GetCtx(), "消息消费进行中...")r.Response.WriteJson(g.Map{"code": 0,"msg":  "消息消费已开始,请查看服务器日志获取消费详情",})}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60304.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第8章 利用CSS制作导航菜单

8.1 水平顶部导航栏 水平莱单导航栏是网站设计中应用范围最广的导航设计&#xff0c;一般放置在页面的顶部。水平 导航适用性强&#xff0c;几乎所有类型的网站都可以使用&#xff0c;设计难度较低。 如果导航过于普通&#xff0c;无法容纳复杂的信息结构&#xff0c;就需要在…

JavaScript Cookie 与 服务器生成的 Cookie 的区别与应用

JavaScript Cookie 与 服务器生成的 Cookie 的区别与应用 Cookie是一种甜点&#xff0c;同时也是web前端开发中一种非常常见且重要的技术&#xff0c;它用于在客户端和服务器之间存储和传递信息。用户身份验证、会话管理&#xff0c;还是用户个性化设置&#xff0c;都离不开Coo…

【C#/C++】C++/CL中String^的含义和举例,C++层需要调用C#层对象时...

示例&#xff1a; String^ IDataServer::GetParam(String^ aParamName){ /// }在 C/CLI 中&#xff0c;String^ 和 IDataServer::GetParam(String^ aParamName) 这种写法是一种混合了 C 和 .NET 的语法&#xff0c;用于在 C 中操作 .NET 对象。C/CLI 是微软扩展的 C 语言&…

创客节小学组C++模拟题

来源:加码未来2024年深圳罗湖区创客节模拟题(小学组) 第一题 题目描述 给你n个数,找出出现次数超过一半的数。题目保证这样的数一定存在。 输入格式 第一行一个整数n,(n<=1000) 第二行n个整数(<1000000) 输出格式 输出一个整数 样例输入 5 1 2 3 3 3 样例输…

将数据上传至hdfs的两种方式:java代码上传、将数据放入kafka中,通过flume抽取

目录 1、 生成一条&#xff0c;使用 java 代码将数据放入hdfs上传。 2、 生成一条&#xff0c;编写kafka生产者&#xff0c;将数据放入kafka。kafka source-->flume -->hdfs sink 场景题&#xff1a; 使用 java 代码随机生成学生信息&#xff0c;学生的学号从 0001 开…

微信小程序原生 canvas画布截取视频帧保存为图片并进行裁剪

html页面&#xff1a; 视频尺寸过大会画布会撑开屏幕&#xff0c;要下滑 尺寸和视频链接是从上个页面点击传过来的&#xff0c;可自行定义 <canvas id"cvs1" type"2d" style"width: {{videoWidth}}px;height: {{videoHeight}}px;"><…

【Linux】软件安装目录的选择

根据 FHS Referenced Specifications /home 是用来放用户文档和个人文件的&#xff0c;不是应用。/opt 是用来放发行版的扩展应用&#xff0c;比如应用商店、计算器之类的&#xff08;有的不会放在这儿&#xff0c;例如 Ubuntu&#xff09;。/usr/lib 则是用于编程和包的库&am…

【vue】echarts地图添加蒙版图片,多图层地图实现天气信息展示

实现原理&#xff1a;多层图层叠加实现复杂的信息展示。 <template><div class"wrapper"><el-drawertitle"天气信息":modal"iszz":visible.sync"weatherinfo":direction"direction"><drawer:labelnam…

100+SCI科研绘图系列教程(R和python)

科研绘图系列&#xff1a;箱线图加百分比点图展示组间差异-CSDN博客科研绘图系列&#xff1a;箱线图加蜜蜂图展示组间数据分布-CSDN博客科研绘图系列&#xff1a;小提琴图和双侧小提琴图展示组间差异-CSDN博客科研绘图系列&#xff1a;组间差异的STAMP图的ggplot2实现-CSDN博客…

QT鼠标事件

QT鼠标事件 1.概述 这篇文章介绍如何使用事件和获取事件的信号 2.创建项目 创建一个widget类型项目&#xff0c;在widget.ui文件中添加一个label控件 然后在项目名称上右键选择Add new... 添加文件&#xff0c;选择 C Class 自定义类名Mylabel&#xff0c;选择基类Base …

“双十一”电商狂欢进行时,在AI的加持下看网易云信IM、RTC如何助力商家!

作为一年一度的消费盛会&#xff0c;2024年“双十一”购物狂欢节早已拉开帷幕。蹲守直播间、在主播热情介绍中点开链接并加购&#xff0c;也已成为大多数人打开“双11”的重要方式。然而&#xff0c;在这火热的购物氛围背后&#xff0c;主播频频“翻车”、优质主播稀缺、客服响…

深入浅出rust内存对齐

在 Rust 中&#xff0c;内存对齐是一个重要的概念&#xff0c;它涉及到数据在内存中的存储方式&#xff0c;以及如何优化内存访问的效率。往往一门语言的内存布局以及对齐方式决定了一门语言的性能&#xff0c;因此学会并深入理解rust中内存布局会让我们写出高性能的rust代码&a…

C++的起源与发展

一、C的起源与初期发展 C的起源可以追溯到1979年&#xff0c;当时丹麦计算机科学家比雅尼斯特劳斯特鲁普&#xff08;Bjarne Stroustrup&#xff09;在贝尔实验室从事计算机科学和软件工程的研究工作。面对项目中复杂的软件开发任务&#xff0c;特别是模拟和操作系统的开发工作…

SDL渲染器和纹理

文章目录 渲染器 (SDL_Renderer)纹理 (SDL_Texture)代码 渲染器 (SDL_Renderer) &#xff1a;它是渲染内容的接口&#xff0c;负责将内容绘制到窗口中。通过SDL_CreateRenderer创建&#xff0c;可以设置渲染器的背景颜色、绘图颜色、透明度等。所有绘图操作&#xff08;如绘制…

题目练习之二叉树那些事儿(续集)

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨个人…

go语言 分布式一致

flowchart TDStart([接收 key]) --> CheckCache{检查是否被缓存}CheckCache -->|是| ReturnCache1[返回缓存值 ⑴]CheckCache -->|否| CheckRemote{是否应当从远程节点获取}CheckRemote -->|是| HashSelect[使用一致性哈希选择节点]HashSelect --> IsRemote{是否…

【STL栈和队列】:高效数据结构的应用秘籍

前言&#xff1a; C 标准模板库&#xff08;STL&#xff09;为我们提供了多种容器&#xff0c;其中 stack&#xff08;栈&#xff09;和 queue&#xff08;队列&#xff09;是非常常用的两种容器。 根据之前C语言实现的栈和队列&#xff0c;&#xff08;如有遗忘&#xff0c;…

Zabbix 7 最新版本安装 Rocky Linux 8

前言 本实验主要在Rocky Linux 中安装Zabbix&#xff0c;其他centos8、Debian、Ubuntu、Alma Linux都可以安装&#xff0c;就是在中间件有点不同。Nginx就要配置一下&#xff0c;官网给的教程也算是很规范的&#xff0c;就是在MySQL上要自己安装&#xff0c;他没有告诉我们&am…

docker里rtsp推流+同一个docker接受流进行部署

1.参考&#xff1a; https://blog.csdn.net/m0_57609406/article/details/140323327 2.dockerfile命令 # 使用官方 Python 基础镜像 FROM python:3.8.18-slim# 设置工作目录 WORKDIR /usr/src/app# 安装必要的软件包&#xff08;FFmpeg、OpenCV、lsof、RTSP工具&#xff09;…

主进程main.js打印中文时终端显示乱码解决方案

{"name": "aaa","version": "1.0.0","description": "first electron app","main": "main.js","scripts": {// 解决乱码的问题"start": "chcp 65001 && no…