Go实现RabbitMQ消息模式

【目标】

  1. go实现RabbitMQ简单模式和work工作模式

  2. go实现RabbitMQ 消息持久化和手动应答

  3. go实现RabbitMQ 发布订阅模式

  4. go使用MQ实现评论后排行榜更新

1. go实现简单模式

编写路由实现生产消息

实现生产消息

MQ消息执行为命令行执行,所以创建命令行执行函数main,用来消费消息

创建mq/demo/main.go

浏览器中访问路由,执行生产者生产消息

打开http://localhost:15672/#/queues, 查看RabbitMQ客户端查看是否消息

执行消费者,实现消息消费

进入 mq/demo/中,执行bee run

2. go实现work工作模式

在启动另一个窗口,实现第二个消费者

生产消息

打开RabbitMQ客户端,查看消费者

查看work消费

两个work时,轮询执行消费

2.1 go实现RabbitMQ消息持久化和手动应答

消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化

生产者实现消息持久化

第二个参数设置为true,即durable=true.

消费者实现消息持久化

在RabbitMQ服务重启或者服务宕机的情况下,也不会丢失消息。

可以将Queue与Message都设置为可持久化(durable),这样可以保证绝大部分情况下RabbitMQ消息不会丢失。

手动应答

RabbitMQ 消息应答机制

消费者处理一个任务是需要一段时间的,如果有一个消费者正在处理一个比较耗时的任务并且只处理了一部分,突然这个时候消费者宕机了,那么会出现什么情况呢?

如果是自动应答模式,消费者在处理任务的过程中宕机了,那么消息将会丢失,而手动应答则能够保证消息不会被丢失,所以在实际的应用当中绝大多数都采用手动应答

为了保证消息从队列可靠地达到消费者并且被消费者消费处理,RabbitMQ 提供了消息应答机制,RabbitMQ 有两种应答机制,自动应答和手动应答

1、自动应答

RabbitMQ 只要将消息分发给消费者就被认为消息传递成功,就会将内存中的消息删除,而不管消费者有没有处理完消息

2、手动应答

RabbitMQ 将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后才会被认为消息传递成功了,然后才会将内存中的消息删除

消息应答:

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

手动应答优点:

可以批量应答并且减少网络拥堵

消费方法中设置手动应答

效果:

关闭自动应答

RabbitMQ中查看

开启手动应答后,才返回消息执行成功,保证了消息不会被丢失

3. go实现RabbitMQ 发布订阅模式

RabbitMq消息模式的核心思想是:

一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

  实际上,生产者只能把消息发送给一个exchange(交换机),exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息。

有四种类型的交换器,分别是:direct、topic、headers、fanous(广播模式)

广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

go实现RabbitMQ 发布订阅模式 RabbitMQ tutorial - Publish/Subscribe | RabbitMQ

实现广播模式(发布订阅模式)demo

生产者向交换机中发送消息

和简单模式、work模式相比,多了创建交换机

消费者拉取交换机中消息实现消费

和简单模式、work模式相比,多了创建交换机、创建了临时队列、绑定临时队列

临时队列

  我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

 demo中的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,而不是部分。并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

  首先,无论何时我们的消费者连接到RabbitMq,我们都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列,或者让RabbitMq生成任意的队列名字。

  其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

  通过queueDeclare()来创建一个非持久化、专有的、自动删除的、名字随机生成的队列。

实现发布订阅模式:

创建消息路由

控制器中实现生产者消息推送到交换机

创建mq/fanout/main.go,实现消费者从交换机中获取消息实现消费

效果:

执行生产者,实现消息生产

打开RabbitMQ客户端,查看消息状态

执行消费者,实现消费

注:因为是发布订阅模式。所以我们启动两个消费者实现多个用户消费同一消息

消费者1

消费者2

当生产者生产消息时,所订阅的消费者会执行消费

消费者1

消费者2

4. go实现RabbitMQ 路由模式


一个通过路由把One的消息取出来,另一个通过路由把two的消息取出来,一个队列打印奇数,一个队列打印偶数

生产者代码

消费者代码奇数代码


消费者代码偶数代码

运行效果

5. go实现RabbitMQ 主题模式


生产者代码

// topic主题push
// @router /mq/topic/push [*]
func (this *MqDemoController) GetTopic() {//创建线程执行(发送自增的数字到队列中)go func() {count := 0for {if count%2 == 0 {//strconv.Itoa 把count转化为字符串mq.PublishEx("wsyb.demo.topic", "topic", "wsyb.video", "wsyb.video"+strconv.Itoa(count))} else {mq.PublishEx("wsyb.demo.topic", "topic", "user.wsyb", "user.wsyb"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topic")
}// topic主题push
// @router /mq/topictwo/push [*]
func (this *MqDemoController) GetTopicTwo() {//创建线程执行(发送自增的数字到队列中)go func() {count := 0for {if count%2 == 0 {//strconv.Itoa 把count转化为字符串mq.PublishEx("wsyb.demo.topic", "topic", "a.frog.name", "a.frog.name"+strconv.Itoa(count))} else {mq.PublishEx("wsyb.demo.topic", "topic", "b.frog.uid", "b.frog.uid"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topic")
}

消费所有主题代码(#)

// 包名必须是main否则消费不成功
package mainimport ("fmt""wsybapi/services/mq"
)func main() {//执行消费  # 代表获取所有的数据mq.ConsumerEx("wsyb.demo.topic", "topic", "#", callback)
}// 回调函数
func callback(s string) {//打印消费结果fmt.Printf("topic all msg is :%s\n", s)
}

匹配多个规则进行消费

// 包名必须是main否则消费不成功
package mainimport ("fmt""wsybapi/services/mq"
)func main() {//执行消费 * 匹配一个或者多个符合规则的数据mq.ConsumerEx("wsyb.demo.topic", "topic", "*.frog.*", callback)
}// 回调函数
func callback(s string) {//打印消费结果fmt.Printf("topic frog msg is :%s\n", s)
}

匹配一个规则进行消费

// 包名必须是main否则消费不成功
package mainimport ("fmt""wsybapi/services/mq"
)func main() {//执行消费mq.ConsumerEx("wsyb.demo.topic", "topic", "wsyb.*", callback)
}// 回调函数
func callback(s string) {//打印消费结果fmt.Printf("tpic wsyb msg is :%s\n", s)
}

运行结果

6. rabbitmq死信队列

6.1应用场景:
  1. 发送消息规定10分钟以后发送给用户
  2. 规定消息每天固定的时间发送
    3.下了订单没有支付,30分钟以后就会取消订单
    4.订单相关的,下单以后会定时收到会系统的提示消息
6.2什么是死信队列呢:

死信队列产生的条件,不仅是ttl时间过期了,还有消息被拒绝,队列达到最大长度,都会产生死信,相信大家已经明白了

7. go使用MQ实现评论后排行榜更新

修改逻辑,新增评论时更新redis排行榜的数据

发布评论

打开MQ客户端,查看队列状态

创建mq/top/main.go,连接数据库

在消费回调函数中,编写消费者逻辑实现排行榜更新

执行消费者

效果:

先评论内容

打开redis可视化界面,查看排行榜评论数

再次评论

打开redis可视化界面,查看排行榜评论数是否实现更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/880765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【React】react项目中的redux使用

1. store目录结构设计 2. react组件中使用store中的数据——useSelector 3. react组件中修改store中的数据——useDispatch 4. 示例 react-basic\src\store\moduels\counterStore.js import { createSlice } from reduxjs/toolkitconst counterStore createSlice({name: cou…

Flutter屏幕适配

我们可以根据下面有适配属性的Widget来进行屏幕适配 1.MediaQuery 通过它可以直接获得屏幕的大小(宽度 / 高度)和方向(纵向 / 横向) Size screenSize MediaQuery.of(context).size; double width screenSize.width; double h…

【Linux:线程概念】

目录 概念: 创建线程的函数:​编辑 ​编辑 有多进程为什么还需要有多线程? 线程调度的成本为什么低? 进程与线程的区别: 概念: 线程是CPU的基本调度单位,在进程内部运行。在内核中&#xff…

CSS 效果:实现动态展示双箭头

最近写了一段 CSS 样式,虽然不难,但实现过程比较繁琐。这个效果结合了两个箭头,一个突出,一个内缩,非常适合用于步骤导航或选项卡切换等场景。样式不仅仅是静态的,还可以通过点击 click 或者 hover 事件&am…

Java的栈帧和动态链接是什么?

在 Java 的面试过程中,不可避免的一个面试题那就是 JVM,而 JVM 的面试题中,有各种,比如在堆中会被问到的关于垃圾回收机制的相关问题,在栈中会被问到入栈以及出栈的过程,来聊一下关于栈的相关问题&#xff…

C0008.Clion利用C++开发Qt界面,使用OpenCV时,配置OpenCV方法

安装OpenCV 配置环境 配置Clion中的CMakeLists.txt文件 # 设置OpenCV的安装路径 set(OpenCV_DIR "D:/OpenCv_Win/opencv/build/x64/vc16/lib"

分糖果C++

题目&#xff1a; 样例解释&#xff1a; 样例1解释 拿 k20 块糖放入篮子里。 篮子里现在糖果数 20≥n7&#xff0c;因此所有小朋友获得一块糖&#xff1b; 篮子里现在糖果数变成 13≥n7&#xff0c;因此所有小朋友获得一块糖&#xff1b; 篮子里现在糖果数变成 6<n7&#xf…

【算法竞赛】堆

堆是一种树形结构,树的根是堆顶,堆顶始终保持为所有元素的最优值。 有最大堆和最小堆,最大堆的根节点是最大值,最小堆的根节点是最小值。 本节都以最小堆为例进行讲解。 堆一般用二叉树实现,称为二叉堆。 二叉堆的典型应用有堆排序和优先队列。 二叉堆的概念 二叉堆是一棵…

定时器定时中断定时器外部中断

基础背景&#xff1a;TIM定时中断-CSDN博客 TIM的函数 // 恢复缺省设置 void TIM_DeInit(TIM_TypeDef* TIMx); // 时基单元初始化&#xff0c;第一个参数TIMx选择某个定时器&#xff0c;第二个参数是结构体&#xff0c;包含了配置时基单元的一些参数。 void TIM_TimeBaseInit…

blender解决缩放到某个距离就不能继续缩放

threejs中也存在同样的问题&#xff0c;原因相同&#xff0c;都是因为相机位置和相机观察点距离太近导致的。 threejs解决缩放到某个距离就不能继续缩放-CSDN博客 blender中的解决方案 1、视图中心->视图锁定->选择你想看的物体

图解C#高级教程(三):泛型

本讲用许多代码示例介绍了 C# 语言当中的泛型&#xff0c;主要包括泛型类、接口、结构、委托和方法。 文章目录 1. 为什么需要泛型&#xff1f;2. 泛型类的定义2.1 泛型类的定义2.2 使用泛型类创建变量和实例 3. 使用泛型类实现一个简单的栈3.1 类型参数的约束3.2 Where 子句3…

安装图片标识工具anylabeling

目录 下载压缩包 创建环境 安装opencv 安装第三方库 运行setup.py文件 安装过程可能会出现的错误&#xff1a; 错误1 错误2 安装完成 图标更换 之前提到的嵌入式开发】可编程4k蓝牙摄像头点击器还可以训练模型&#xff0c;使图像识别精度提高 现在讲解&#xff0c;如…

uniapp微信小程序,获取上一页面路由

在进入当前页面的时候&#xff0c;判断是不是从某个页面跳转过来的&#xff08;一般是当前页面为公共页面是出现的&#xff09;&#xff0c;比如 A-->B C-->B ,那么 要在 C跳转到B页面的时候多个提示语什么的 而在A跳转到B时不需要&#xff0c;那么就要判断 上一页面的…

前端规范工程-5:Git提交信息规范(commitlint + czg)

前面讲的都是在git提交之前的一些检查流程&#xff0c;然而我们git提交信息的时候&#xff0c;也应该是需要规范的。直接进入主题&#xff1a; 目录 需安装插件清单commitlint 介绍安装配置配置commit-msg钩子提交填写commit信息czg后续方式一&#xff1a;push触动build并上传…

DataEase v2 开源代码 Windows 从0到1环境搭建

一、环境准备 功能名称 描述 其它 操作系统 Windows 数据库 Mysql8.0 开发环境 JDK17以上 本项基于的21版本开发 Maven 3.9版本 开发工具 idea2024.2版本 前端 VSCode TIPS&#xff1a;如果你本地有jdk8版本&#xff0c;需要切换21版本&#xff0c;请看…

深入浅出MySQL事务处理:从基础概念到ACID特性及并发控制

1、什么是事务 在实际的业务开发中&#xff0c;有些业务操作要多次访问数据库。一个业务要发送多条SQL语句给数据库执行。需要将多次访问数据库的操作视为一个整体来执行&#xff0c;要么所有的SQL语句全部执行成功。如果其中有一条SQL语句失败&#xff0c;就进行事务的回滚&a…

RabbitMQ的应用问题

一、幂等性保障 幂等性是数学和计算机科学中某些运算的性质, 它们可以被多次应⽤, ⽽不会改变初始应⽤的结果 数学上的幂等性&#xff1a; f(x)f(f(x)) |x| 数据库操作幂等性&#xff1a; 数据库的 select 操作. 不同时间两次查询的结果可能不同, 但是这个操作是符合幂等性…

教务系统登录的分析

武汉纺织大学屏蔽了正方教务系统的默认登录页面&#xff0c;他们学校自定义的登录页面用户名和密码都是明文传输。可以使用Httpclient模拟登录。手动登录后&#xff0c;5次get请求才能获得真实的cookies。合肥工业大学需要3次。 第一次是POST请求。 Post请求的的下一个Location…

yum使用阿里云的镜像源报错 Failed connect to mirrors.aliyuncs.com:80; Connection refused“

报错&#xff1a;Failed connect to mirrors.aliyuncs.com:80; Connection refused"&#xff0c;如果单独只是这个报错的话&#xff0c;那么原因是由于非阿里云ECS用户无法解析主机“mirrors.cloud.aliyuncs.com”。如果不单单只是这个报错另外还有其它报错请参考我其它文…

【SQL】筛选字符串与正则表达式

目录 语法 需求 示例 分析 代码 语法 SELECT column1, column2, ... FROM table_name WHERE condition; WHERE 子句用于指定过滤条件&#xff0c;以限制从数据库表中检索的数据。当你执行一个查询时&#xff0c;WHERE 子句允许你筛选出满足特定条件的记录。如果记录满…