初识RabbitMQ - 实践

news/2026/1/25 9:13:58/文章来源:https://www.cnblogs.com/ljbguanli/p/19528430

初识RabbitMQ - 实践

初识RabbitMQ

  • RabbitMQ安装和使用(docker,centos 7 )
  • RabbitMQ的介绍
    • RabbitMQ的作用
    • RabbitMQ的基础知识
      • 架构
      • 交换机类型和路由规则
      • SpringBoot整合RabbitMQ
      • 发送和接收消息(测试版)
      • 发送消息的api
      • 消息堆积问题
      • 消息可靠性问题
        • 生产者确认机制
      • MQ消息可靠性
      • 消费者确认机制

RabbitMQ安装和使用(docker,centos 7 )

docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=214913 \  - 指定用户名和密码,用于登录RabbitMQ
-v mq-plugins:/plugins \         -   设置数据卷
--name mq          - 为容器指定名称为"mq"
--hostname mq     - 设置容器的主机名为"mq",RabbitMQ推荐显式设置主机名以确保正确运行
-p 15672:15672    - 端口映射,将容器的15672端口(RabbitMQ管理界面)映射到宿主机的15672端口
-p 5672:5672    - 端口映射,将容器的5672端口(RabbitMQ AMQP协议端口)映射到宿主机的5672端口
--network cc\       - 不存在不会创建,会报错
-d \               -  以守护进程方式运行容器(后台运行)
rabbitmq:3.8-management    - 使用RabbitMQ 3.8版本镜像,并且包含管理插件

RabbitMQ管理界面的地址:ip:15672
这个貌似不需要开放虚拟机防火墙端口也可以访问(没试过),但是api端口需要开放。下面就是RabbitMQ的管理端页面,这里可以自己摸索一下。

RabbitMQ的介绍

RabbitMQ的作用

在一些场景下,比如用户支付,支付之后还需要修改订单状态,加积分,发通知,等一系列操作,如果我是同步调用,会引发一些问题。

  1. 性能低,支付业务逻辑处理完之后,还需要在进行其他业务,消耗时间。
  2. 耦合度高,如果加积分的代码修改了,可能支付业务这一块代码也需要修改
  3. 拓展性差,支付完之后,老板说在给客户来个京豆,优惠卷啥的,又要修改代码

如何解决呢,异步调用。

异步调用的模型结构:

  1. 发布者
  2. 消息代理(broker),也就是RabbitMQ的扮演的角色
  3. 消费者
    具体流程就是:用户完成支付之后,支付的业务代码发送一条消息到RabbitMQ,消费者监听并处理RabbitMQ中的消息。
    Broker的选择不只RabbitMQ一种,还有kafka,RocketMQ等。可以了解。

RabbitMQ的基础知识

架构

  1. 虚拟主机(virtualHost) :起到数据隔离的作用,就像java的模块,每一个虚拟主机都有自己的交换机和队列
  2. 交换机(exchange):生产者发送的消息由交换机决定投递到哪个队列
  3. 队列(queue):生产者投递的消息会暂存在消息队列中,等待消费者处理
  4. 生产者:发送消息的一方
  5. 消费者:消费消息的一方

交换机类型和路由规则

  1. Fanout Exchange (广播模式):会把接收到的消息广播到每一个跟其绑定的queue。
  2. Direct Exchange (定向路由): 把消息按照规则路由到指定的Queue,可以根据这种特性实现类似Fanout的效果。
    ○ 每一个Queue都与Exchange 设置一个BindingKey
    ○ 发布者发布消息时,指定消息的RoutingKey
    ○ Exchange将消息路由到BingdingKey和消息的RoutingKey一致的队列
  3. Topic交换机:和上一个类似。
    a. routingKey可以是多个单词的列表,可以以 . 来分割
    b. BingdingKey 可以使用通配符: # :代指0个或者是多个单词 * : 代指一个单词。

SpringBoot整合RabbitMQ

1.导入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.yml文件配置

rabbitmq:
port: 5672
virtual-host: your-VirtualHost  # 虚拟主机
username: your_username        # 用户名
password: your_password        # 密码
listener:                # 监听器配置
simple:
prefetch: 1 # 每次只处理一个消息
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制

3.使用注解来声明交换机和队列

@RabbitListener(
// 绑定队列到交换机
bindings = @QueueBinding(
// 队列配置 ,durable = true 表示持久化
value = @Queue(
value = "petlife.board.queue",
name = "petlife.board.queue",
arguments = @Argument(name = "x-queue-mode", value = "lazy"),
// x-queue-mode: 队列模式,lazy 懒加载,表示队列中的消息会存储在磁盘中,而不是内存中
//value: 指定队列在 RabbitMQ 中的实际名称
//name: 指定队列在 Spring 容器中的 Bean 名称
durable = "true"),
// 交换机配置,
exchange = @Exchange(
value = "petlife.board.exchange",
// 交换机名称
type = ExchangeTypes.TOPIC),
// 交换机类型,topic 交换机
key = "board.#"))
// 路由键,匹配 board. 开头的路由键
public void listener(String message) {
//执行具体的业务逻辑
System.out.println("监听到消息:" + message);
}

4.声明消息转换器
通过源码可以看到,默认是SimpleMessageConverter ,支持的是java原生序列化,不是Json序列化。
在这里插入图片描述

@SpringBootApplication
@ComponentScan(basePackages = {"com.petlife"})
public class PetLifeApplication {
public static void main(String[] args) {
SpringApplication.run(PetLifeApplication.class, args);
}
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器 : springboot默认使用JDK序列化,这里使用Jackson2JsonMessageConverter
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
}

发送和接收消息(测试版)

直接往队列中发送消息,当前虚拟主机中要有这个队列,不然会报错。

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class TestApplicationTests {
@Autowired![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/9f1fdae9bb534e47b86f4bec2af827a7.png)
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TestListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("TestListener.listenSimpleQueue: " + message);
}
}

运行结果
在这里插入图片描述

发送消息的api

@Service
public class SimpleMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 1. 简单发送 - 使用默认交换机
*/
public void sendSimpleMessage(String queueName, Object message) {
rabbitTemplate.convertAndSend(queueName, message);
}
/**
* 2. 发送到指定交换机和路由键
*/
public void sendToExchange(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
/**
* 3. 发送消息并设置消息属性
*/
public void sendWithProperties(String exchange, String routingKey, Object message) {
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
properties.setHeader("custom-header", "value");
properties.setExpiration("10000"); // 10秒过期
Message msg = MessageBuilder.withBody(JsonUtils.toJsonBytes(message))
.andProperties(properties)
.build();
rabbitTemplate.send(exchange, routingKey, msg);
}
/**
* 发送消息并处理确认和返回
*/
public void sendWithCallback(String exchange, String routingKey, Object message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 异步处理确认结果
correlationData.getFuture().addCallback(
result -> {
if (result != null && result.isAck()) {
log.info("消息发送成功, ID: {}", correlationData.getId());
} else {
log.error("消息发送失败, ID: {}", correlationData.getId());
}
},
ex -> log.error("消息发送异常, ID: {}", correlationData.getId(), ex)
);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}

消息堆积问题

默认情况下,RabbitMQ会将消息一次轮询投递给绑定在队列上的每一个消费者,但这并没有考虑到消费者是否已经处理完消息,可能会出现消息堆积。
测试:

@SpringBootTest
class ListenerApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
}
}
}
@Component
public class TestListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) throws InterruptedException {
Thread.sleep(1000);
log.info("Simple queue1: {}", message);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue2(String message) {
log.info("Simple queue2: {}", message);
}
}

运行结果:
在这里插入图片描述
解决办法:
在yml配置中添加

listener:                # 监听器配置
simple:
prefetch: 1 # 每次只处理一个消息

开启后同样的代码再次运行

在这里插入图片描述
对于消息堆积问题,可以使用work模型

  • 多个消费者绑定同一个队列可以加快消息处理速度
  • 同一条消息只会被同一个消息处理
  • 通过yml配置来控制消费者预取的消息数量,处理完一条在处理下一条,能者多劳。

消息可靠性问题

  • MQ没有成功的发送消息,
  • MQ发送消息了,但是还有没接收到,MQ挂了;
  • 消息收到了,但是处理过程中出了异常。
    这些都可能导致业务出现错误。
    对于这些,RabbitMQ有哪些相应的机制来解决呢。
生产者确认机制

1,生产者重连
网络波动导致客户端连接MQ失败,可以配置失败重连机制。spingAMQP重试机制是阻塞式的,会影响业务性能。

connection-timeout: 1s  # MQ超时连接时间
template:
retry:
enabled: true     # 开启重试
initial-interval: 1000ms  # 初始等待时间
max-attempts: 3 # 重试次数
multiplier: 2  # 下次等待时间 = initial - interval * multiplier

2.生产者确认机制

在这里插入图片描述

 发布者发送消息到RabbitMQ,怎么知道是否发送到了呢,他有两种机制,一个是Publisher Return ,一个是publisher-confirm。但是,比较消耗性能,所以一般不建议开启。开启之后,会根据消息的状态返回一个回执.

● 消息投递到了MQ但是路由失败了,会通过 PR 返回路由异常原因,然后返回ACK,告知投递成功(就是发送到了交换机,但是没有队列接收,通常是routingkey或者是绑定配置出现了问题)
● 临时消息投递到了MQ,并且入队成功,返回ACK,告知通敌成功
● 持久消息投递到了MQ,并且入队成功完成持久化,返回ACK,告知投递成功
● 其他情况返回NACK,告知投递失败
其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。

MQ消息可靠性

默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化。
Spring amqp 默认情况下创建的交换机是持久的。(注解是如此,可以看源码,使用bean创建,没怎么注意)
1.交换机持久化
可以在控制台设置
在这里插入图片描述
之前使用注解来声明交换机和队列时,里面的参数都有讲解,这里就不说了。
在这里插入图片描述

2.队列持久化
队列也是一样的
在这里插入图片描述

3.消息持久化

消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。

…未完待续

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1213636.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2026年可靠的酒店工程墙布/十字布基墙布厂家最新推荐权威榜

在酒店装修工程中,墙布的选择直接关系到整体装饰效果和使用寿命。经过对国内30余家墙布生产企业的实地考察和产品测试,我们基于生产能力、技术创新、环保标准、工程案例和售后服务五个维度,评选出2026年最值得信赖的…

Qwen-Image蒸馏版和原版哪个好?实测数据告诉你

Qwen-Image蒸馏版和原版哪个好&#xff1f;实测数据告诉你 你是不是也遇到过这样的纠结&#xff1a;想用Qwen-Image生成高质量中文图文&#xff0c;但又担心显卡带不动、出图太慢、效果不稳&#xff1f;官方原版模型看着强大&#xff0c;可4090D单卡跑起来真能扛住吗&#xff…

WeakMap内存机制揭秘:ES6弱引用特性深度剖析

以下是对您提供的博文《WeakMap内存机制揭秘:ES6弱引用特性深度剖析》的 全面润色与优化版本 。本次改写严格遵循您的要求: ✅ 彻底去除AI腔、模板化表达与刻板结构(如“引言/总结/展望”等标题) ✅ 以真实技术博主口吻重写,语言自然、有节奏、带思考痕迹和实战温度 …

语音指令分割实战:让每个命令独立可分析

语音指令分割实战&#xff1a;让每个命令独立可分析 在智能语音交互系统中&#xff0c;我们常常遇到一个看似简单却极其关键的问题&#xff1a;一段连续的录音里&#xff0c;到底包含几个独立的语音指令&#xff1f;比如用户对着设备说“打开空调”“调高两度”“关闭灯光”&a…

AUTOSAR架构下诊断堆栈详解:新手教程配置步骤

以下是对您提供的博文《AUTOSAR架构下诊断堆栈详解:核心机制、模块协同与配置实践》的 深度润色与结构化重构版本 。本次优化严格遵循您的全部要求: ✅ 彻底去除AI痕迹,语言自然如资深汽车软件工程师现场讲解 ✅ 打破模板化标题体系,以逻辑流替代章节标签,全文一气呵成…

零基础入门verl:手把手教你搭建大模型强化学习环境

零基础入门verl&#xff1a;手把手教你搭建大模型强化学习环境 注意&#xff1a;本文面向完全零基础的开发者&#xff0c;不假设你了解强化学习、PPO算法或分布式训练。所有操作均可在一台带单张A100或V100的服务器上完成&#xff0c;无需集群&#xff0c;无需修改源码&#xf…

指令定制提升效果:Qwen3-Embedding-0.6B高级玩法

指令定制提升效果&#xff1a;Qwen3-Embedding-0.6B高级玩法 你有没有遇到过这样的问题&#xff1a;用同一个嵌入模型处理中文客服问答和英文技术文档&#xff0c;效果却差了一大截&#xff1f;或者在金融场景下做语义匹配&#xff0c;明明句子意思相近&#xff0c;向量距离却…

2026年评价高的双锥干燥机TOP实力厂家推荐榜

在制药、化工、食品等行业中,双锥干燥机作为关键设备,其性能直接影响生产效率和产品质量。本文基于设备性能指标、用户实际反馈、技术创新能力及售后服务体系四个维度,对国内双锥干燥机生产企业进行客观评估。经实地…

FSMN-VAD性能优化建议,让检测速度提升一倍

FSMN-VAD性能优化建议&#xff0c;让检测速度提升一倍 语音端点检测&#xff08;VAD&#xff09;是语音处理流水线中不可或缺的预处理环节。在实际工程中&#xff0c;我们常遇到这样的问题&#xff1a;一段5分钟的会议录音&#xff0c;FSMN-VAD默认配置下需要近8秒才能完成检测…

2026年评价高的渔用钢丝绳索具/船用钢丝绳索具厂家最新TOP排行榜

在渔用和船用钢丝绳索具领域,选择可靠的供应商至关重要。本文基于企业技术实力、生产规模、行业标准参与度、客户评价及市场反馈等维度,综合评估筛选出5家值得关注的厂家。其中,上海正申金属制品凭借30年行业深耕、…

2026最新IDEA激活码免费获取以及永久激活安装教程分享

2026最新IDEA激活码免费获取链接: https://docs.qq.com/doc/DTWJUbE50b1Z2bHFz2026最新IntelliJ IDEA安装激活教程分享 IntelliJ IDEA作为JetBrains旗下的旗舰级IDE,凭借强大的代码提示、重构工具与跨平台兼容性,始…

2026年正规的导轨清洗机/溶剂清洗机厂家推荐及采购指南

在工业制造领域,导轨清洗机和溶剂清洗机是确保精密零部件加工质量的关键设备。选择优质供应商需综合考虑技术实力、行业经验、设备性能及售后服务能力。经过对2026年市场调研,我们推荐以下标准作为筛选依据:企业需具…

FSMN-VAD中文语音检测专项优化,更准更快

FSMN-VAD中文语音检测专项优化&#xff0c;更准更快 你有没有遇到过这样的情况&#xff1a;会议录音转文字时&#xff0c;开头3秒静音被当成有效语音切进去&#xff0c;结果ASR模型把“呃…啊…”识别成乱码&#xff1b;或者客服热线长音频里&#xff0c;客户说了15分钟&#…

UDS协议中NRC码的典型应用场景实战案例

以下是对您提供的博文内容进行 深度润色与专业重构后的技术文章 。全文已彻底去除AI生成痕迹,语言更贴近一线嵌入式诊断工程师的实战口吻,结构上打破传统“引言-原理-案例-总结”的模板化节奏,以真实开发场景为牵引,层层递进展开;关键概念加粗强调,代码与逻辑解释深度融…

React Native状态管理:一文说清核心要点

以下是对您提供的博文《React Native状态管理:核心原理与工程实践深度解析》的 全面润色与重构版本 。本次优化严格遵循您的全部要求: ✅ 彻底去除AI痕迹 :语言自然、口语化但不失专业,像一位有五年以上RN实战经验的高级前端工程师在技术分享会上娓娓道来; ✅ 打破…

YOLOv9镜像内含哪些依赖?numpy到seaborn全都有

YOLOv9镜像内含哪些依赖&#xff1f;numpy到seaborn全都有 你有没有遇到过这样的情况&#xff1a;刚下载好YOLOv9代码&#xff0c;准备跑通第一个检测任务&#xff0c;结果卡在ModuleNotFoundError: No module named torch上&#xff1f;或者好不容易装完PyTorch&#xff0c;又…

2026年可靠的拉压力传感器/高精度传感器厂家推荐及选择指南

在工业自动化与精密测量领域,选择优质的拉压力传感器/高精度传感器供应商至关重要。本文基于产品性能、技术创新能力、行业应用经验及客户反馈等核心维度,筛选出5家值得关注的厂家。其中,南京翰可泰科自动化设备凭借…

2026年山东真空波纹管专业厂家排行榜,前十名有谁?

本榜单依托全维度市场调研与真实行业口碑,深度筛选出五家真空波纹管领域标杆企业,为工业采购者提供客观依据,助力精准匹配适配的源头供应商伙伴。 TOP1 推荐:安徽恒达管业有限责任公司 推荐指数:★★★★★ | 口碑…

2026年进口岩板品牌商排名,三星岩(TRE STELLE)实力不容小觑

2026年家居与商业空间市场持续升级,进口岩板凭借其耐用性、美学表现力与空间适配性,成为设计师与业主打造品质空间的核心材料选择。无论是追求纯正意式工艺的进口岩板优质服务商品牌、支持个性化需求的来样定制生产企…

河北廊坊资质齐全的短视频代运营专业公司推荐,廊坊哇噻科技上榜原因解析

本榜单依托全维度市场调研与真实行业口碑,深度筛选出五家资质齐全的短视频代运营专业公司,为企业选型提供客观依据,助力精准匹配适配的服务伙伴。TOP1 推荐:廊坊哇噻科技有限公司 推荐指数:★★★★★ | 口碑评分…