RabbitMQ实战教程(1)

RabbitMQ

一、RabbitMQ介绍

1.1 现存问题
  • 服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用SpringBoot提供的@Async注解实现异步调用,但是这种方式无法确保请求一定回访问到服务B的接口。那如何保证服务A的请求信息一定能送达到服务B去完成一些业务操作呢?| 如何实现异步调用1642517531404.png
  • 海量请求:在我们在做一些秒杀业务时,可能会在某个时间点突然出现大量的并发请求,这可能已经远远超过服务器的并发瓶颈,这时我们需要做一些削峰的操作,也就是将大量的请求缓冲到一个队列中,然后慢慢的消费掉。如何提供一个可以存储千万级别请求的队列呢?1642517747632.png
  • 在微服务架构下,可能一个业务会出现同时调用多个其他服务的场景,而且这些服务之间一般会用到Feign的方式进行轻量级的通讯,如果存在一个业务,用户创建订单成功后,还需要去给用户添加积分、通知商家、通知物流系统、扣减商品库存,而在执行这个操作时,如果任意一个服务出现了问题,都会导致整体的下单业务失败,并且会导致给用户反馈的时间延长。这时就造成了服务之间存在一个较高的耦合性的问题。如何可以降低服务之间的耦合性呢?1642517948196.png
1.2 处理问题

RabbitMQ就可以解决上述的全部问题

  • 服务之间如何想实现可靠的异步调用,可以通过RabbitMQ的方式实现,服务A只需要保证可以把消息发送到RabbitMQ的队列中,服务B就一定会消费到队列中的消息只不过会存在一定的延时。| 异步访问1642518013295.png
  • 忽然的海量请求可以存储在RabbitMQ的队列中,然后由消费者慢慢消费掉,RabbitMQ的队列本身就可以存储上千万条消息 1642518109219.png
  • 在调用其他服务时,如果允许延迟效果的出现,可以将消息发送到RabbitMQ中,再由消费者慢慢消费| 服务解耦
    1642518233825.png
1.3 RabbitMQ介绍

百度百科:

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

首先RabbitMQ基于AMQP协议开发,所以很多基于AMQP协议的功能RabbitMQ都是支持的,比如SpringCloud中的消息总线bus

其次RabbitMQ是基于Erlang编写,这是也是RabbitMQ天生的优势,Erlang被称为面向并发编程的语言,并发能力极强,在众多的MQ中,RabbitMQ的延迟特别低,在微秒级别,所以一般的业务处理RabbitMQ比Kafka和RocketMQ更有优势。

最后RabbitMQ提供自带了图形化界面,操作方便,还自带了多种集群模式,可以保证RabbitMQ的高可用,并且SpringBoot默认就整合RabbitMQ,使用简单方便。

二、RabbitMQ安装


2.1 安装RabbitMQ

这里推荐搭建采用Docker的方式在Linux中安装RabbitMQ,如果对Docker不了解,推荐去学习一下Docker的应用,不然学习其他的知识时,安装的成本都特别高,这里我们就采用Docker的方式安装RabbitMQ。

直接使用docker-compose.yml文件即可安装RabbitMQ服务

version: '3.1'
services:rabbitmq:restart: alwaysimage: daocloud.io/library/rabbitmq:3.8.8volumes:- ./data/:/var/lib/rabbitmq/- ./log/:/var/log/rabbitmq/log/ports:- 15672:15672- 5672:5672

执行 docker-compose up -d运行

测试效果:curl localhost:5672

查看效果
image20220121005749217.png
2.2 开启图形化界面

默认情况下,当前镜像的图形化界面默认没有开启,需要进入到容器内部开启图形化管理界面

启动图形化界面插件
image20220121005619975.png
image20220121005624253.png

通过浏览器访问15672,查看图形化界面

查看登录页面
image20220121005852818.png

默认用户和密码均为:guest,查看首页

查看首页
image20220121005930123.png

三、RabbitMQ构架

RabbitMQ的架构可以查看官方地址:https://rabbitmq.com/tutorials/amqp-concepts.html

官方简单架构
image20220121010054992.png

可以看出RabbitMQ中主要分为三个角色:

  • Publisher:消息的发布者,将消息发布到RabbitMQ中的Exchange
  • RabbitMQ服务:Exchange接收Publisher的消息,并且根据Routes策略将消息转发到Queue中
  • Consumer:消息的消费者,监听Queue中的消息并进行消费

官方提供的架构图相对简洁,我们可以自己画一份相对完整一些的架构图:

RabbitMQ架构图
image20220121011000157.png

可以看出Publisher和Consumer都是单独和RabbitMQ服务中某一个Virtual Host建立Connection的客户端

后续通过Connection可以构建Channel通道,用来发布、接收消息

一个Virtual Host中可以有多个Exchange和Queue,Exchange可以同时绑定多个Queue

在基于架构图查看图形化界面,会更加清晰

图形化界面信息
image20220121011418076.png

四、RabbitMQ通讯方式


RabbitMQ提供了很多中通讯方式,依然可以去官方查看:https://rabbitmq.com/getstarted.html

七种通讯方式
image20220121011637076.png
4.1 RabbitMQ提供的通讯方式
  • Hello World!:为了入门操作!
  • Work queues:一个队列被多个消费者消费
  • Publish/Subscribe:手动创建Exchange(FANOUT)
  • Routing:手动创建Exchange(DIRECT)
  • Topics:手动创建Exchange(TOPIC)
  • RPC:RPC方式
  • Publisher Confirms:保证消息可靠性
4.2 构建Connection工具类
  • 导入依赖:amqp-client,junit

    <dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
    </dependencies>
    
  • 构建工具类:

    package com.mashibing.util;import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
    import java.util.concurrent.TimeoutException;/*** @author zjw* @description*/
    public class RabbitMQConnectionUtil {public static final String RABBITMQ_HOST = "192.168.11.32";public static final int RABBITMQ_PORT = 5672;public static final String RABBITMQ_USERNAME = "guest";public static final String RABBITMQ_PASSWORD = "guest";public static final String RABBITMQ_VIRTUAL_HOST = "/";/*** 构建RabbitMQ的连接对象* @return*/public static Connection getConnection() throws Exception {//1. 创建Connection工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置RabbitMQ的连接信息factory.setHost(RABBITMQ_HOST);factory.setPort(RABBITMQ_PORT);factory.setUsername(RABBITMQ_USERNAME);factory.setPassword(RABBITMQ_PASSWORD);factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);//3. 返回连接对象Connection connection = factory.newConnection();return connection;}}
    
4.3 Hello World
通讯方式
image.png

生产者:

package com.mashibing.helloworld;import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;/*** @author zjw* @description* @date 2022/1/24 22:54*/
public class Publisher {public static final String QUEUE_NAME = "hello";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息String message = "Hello World!";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功!");}
}

消费者:

package com.mashibing.helloworld;import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;import java.io.IOException;/*** @author zjw* @description* @date 2022/1/24 23:02*/
public class Consumer {@Testpublic void consume() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);System.out.println("开始监听队列");System.in.read();}
}
4.4 Work Queues
WorkQueues需要学习的内容
image.png
  • 生产者:生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。

  • 消费者:让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息

    package com.mashibing.workqueues;import com.mashibing.util.RabbitMQConnectionUtil;
    import com.rabbitmq.client.*;
    import org.junit.Test;import java.io.IOException;/*** @author zjw* @description* @date 2022/1/25 19:52*/
    public class Consumer {@Testpublic void consume1() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.5 设置消息的流控channel.basicQos(3);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1号-获取到消息:" + new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听队列");System.in.read();}@Testpublic void consume2() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);channel.basicQos(3);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2号-获取到消息:" + new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听队列");System.in.read();}
    }
    
4.5 Publish/Subscribe
自定义一个交换机
image.png

生产者:自行构建Exchange并绑定指定队列(FANOUT类型)

package com.mashibing.pubsub;import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;/*** @author zjw* @description* @date 2022/1/25 20:08*/
public class Publisher {public static final String EXCHANGE_NAME = "pubsub";public static final String QUEUE_NAME1 = "pubsub-one";public static final String QUEUE_NAME2 = "pubsub-two";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1,false,false,false,null);channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");//6. 发消息到交换机channel.basicPublish(EXCHANGE_NAME,"45jk6h645jk",null,"publish/subscribe!".getBytes());System.out.println("消息成功发送!");}
}
4.6 Routing
DIRECT类型Exchange
image.png

生产者:在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue

package com.mashibing.routing;import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;/*** @author zjw* @description* @date 2022/1/25 20:20*/
public class Publisher {public static final String EXCHANGE_NAME = "routing";public static final String QUEUE_NAME1 = "routing-one";public static final String QUEUE_NAME2 = "routing-two";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1,false,false,false,null);channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");//6. 发消息到交换机channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林大狸子".getBytes());channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());System.out.println("消息成功发送!");}}
4.7 Topic
Topic模式
image.png

生产者:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式

package com.mashibing.topics;import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;/*** @author zjw* @description* @date 2022/1/25 20:28*/
public class Publisher {public static final String EXCHANGE_NAME = "topic";public static final String QUEUE_NAME1 = "topic-one";public static final String QUEUE_NAME2 = "topic-two";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//4. 构建队列channel.queueDeclare(QUEUE_NAME1,false,false,false,null);channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列,// TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey// 其中有两个特殊字符:*(相当于占位符),#(相当通配符)channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");//6. 发消息到交换机channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子!".getBytes());channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog.dog.dog",null,"懒狗狗狗狗狗狗".getBytes());System.out.println("消息成功发送!");}
}
4.8 RPC(了解)

因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作

需要让Client发送消息时,携带两个属性:

  • replyTo告知Server将相应信息放到哪个队列
  • correlationId告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息
RPC方式
image.png

客户端:

package com.mashibing.rpc;import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;import java.io.IOException;
import java.util.UUID;/*** @author zjw* @description* @date 2022/2/8 20:03*/
public class Publisher {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 发布消息String message = "Hello RPC!";String uuid = UUID.randomUUID().toString();AMQP.BasicProperties props = new AMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();channel.basicPublish("",QUEUE_PUBLISHER,props,message.getBytes());channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String id = properties.getCorrelationId();if(id != null && id.equalsIgnoreCase(uuid)){System.out.println("接收到服务端的响应:" + new String(body,"UTF-8"));}channel.basicAck(envelope.getDeliveryTag(),false);}});System.out.println("消息发送成功!");System.in.read();}}

服务端:

package com.mashibing.rpc;import com.mashibing.helloworld.Publisher;
import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;import java.io.IOException;/*** @author zjw* @description* @date 2022/1/24 23:02*/
public class Consumer {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void consume() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));String resp = "获取到了client发出的请求,这里是响应的信息";String respQueueName = properties.getReplyTo();String uuid = properties.getCorrelationId();AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(uuid).build();channel.basicPublish("",respQueueName,props,resp.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE_PUBLISHER,false,callback);System.out.println("开始监听队列");System.in.read();}
}

五、SpringBoot操作RabbitMQ


5.1 SpringBoot声明信息
  • 创建项目

  • 导入依赖

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 配置RabbitMQ信息

    spring:rabbitmq:host: 192.168.11.32port: 5672username: guestpassword: guestvirtual-host: /
  • 声明交换机&队列

    package com.mashibing.rabbitmqboot.config;import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;/*** @author zjw* @description* @date 2022/2/8 20:25*/
    @Configuration
    public class RabbitMQConfig {public static final String EXCHANGE = "boot-exchange";public static final String QUEUE = "boot-queue";public static final String ROUTING_KEY = "*.black.*";@Beanpublic Exchange bootExchange(){// channel.DeclareExchangereturn ExchangeBuilder.topicExchange(EXCHANGE).build();}@Beanpublic Queue bootQueue(){return QueueBuilder.durable(QUEUE).build();}@Beanpublic Binding bootBinding(Exchange bootExchange,Queue bootQueue){return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();}
    }
    
5.2 生产者操作
package com.mashibing.rabbitmqboot;import com.mashibing.rabbitmqboot.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;/*** @author zjw* @description* @date 2022/2/8 21:05*/
@SpringBootTest
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void publish(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");}@Testpublic void publishWithProps(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProps", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setCorrelationId("123");return message;}});System.out.println("消息发送成功");}
}
5.3 消费者操作
package com.mashibing.rabbitmqboot;import com.mashibing.rabbitmqboot.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author zjw* @description* @date 2022/2/8 21:11*/
@Component
public class ConsumeListener {@RabbitListener(queues = RabbitMQConfig.QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("队列的消息为:" + msg);String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("唯一标识为:" + correlationId);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

六、RabbitMQ保证消息可靠性


6.1 保证消息一定送达到Exchange

Confirm机制

可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果

//4. 开启confirms
channel.confirmSelect();//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息成功的发送到Exchange!");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");}
});
6.2 保证消息可以路由到Queue

Return机制

为了保证Exchange上的消息一定可以送达到Queue

//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");}
});
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
6.3 保证Queue可以持久化消息

DeliveryMode设置消息持久化

DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。

//7. 设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());
6.4 保证消费者可以正常消费消息

详情看WorkQueue模式

6.5 SpringBoot实现上述操作
6.5.1 Confirm
  • 编写配置文件开启Confirm机制

    spring:rabbitmq:publisher-confirm-type: correlated  # 新版本publisher-confirms: true  # 老版本 
    
  • 在发送消息时,配置RabbitTemplate

    @Test
    public void publishWithConfirms() throws IOException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到交换机!!");}else{System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");}}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();
    }
    
6.5.2 Return
  • 编写配置文件开启Return机制

    spring:rabbitmq:publisher-returns: true # 开启Return机制
    
  • 在发送消息时,配置RabbitTemplate

    @Test
    public void publishWithReturn() throws IOException {// 新版本用 setReturnsCallback ,老版本用setReturnCallbackrabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {String msg = new String(returned.getMessage().getBody());System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();
    }
    
6.5.3 消息持久化
@Test
public void publishWithBasicProperties() throws IOException {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息的持久化!message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});System.out.println("消息发送成功");
}

七、RabbitMQ死信队列&延迟交换机

7.1 什么是死信
死信&死信队列
1644476424544.png

死信队列的应用:

  • 基于死信队列在队列消息已满的情况下,消息也不会丢失
  • 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间
7.2 实现死信队列
7.2.1 准备Exchange&Queue
                                             package com.mashibing.rabbitmqboot.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author zjw* @description* @date 2022/2/10 15:04*/
@Configuration
public class DeadLetterConfig {public static final String NORMAL_EXCHANGE = "normal-exchange";public static final String NORMAL_QUEUE = "normal-queue";public static final String NORMAL_ROUTING_KEY = "normal.#";public static final String DEAD_EXCHANGE = "dead-exchange";public static final String DEAD_QUEUE = "dead-queue";public static final String DEAD_ROUTING_KEY = "dead.#";@Beanpublic Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}@Beanpublic Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();}@Beanpublic Binding normalBinding(Queue normalQueue,Exchange normalExchange){return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();}@Beanpublic Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}@Beanpublic Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}@Beanpublic Binding deadBinding(Queue deadQueue,Exchange deadExchange){return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}}
7.2.2 实现效果
  • 基于消费者进行reject或者nack实现死信效果

    package com.mashibing.rabbitmqboot;import com.mashibing.rabbitmqboot.config.DeadLetterConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;import java.io.IOException;/*** @author zjw* @description* @date 2022/2/10 15:17*/
    @Component
    public class DeadListener {@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到normal队列的消息:" + msg);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}
    }
    
  • 消息的生存时间

    • 给消息设置生存时间

      @Test
      public void publishExpire(){String msg = "dead letter expire";rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");return message;}});
      }
      
    • 给队列设置消息的生存时间

      @Bean
      public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").ttl(10000).build();
      }
      
  • 设置Queue中的消息最大长度

    @Bean
    public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").maxLength(1).build();
    }
    

    只要Queue中已经有一个消息,如果再次发送一个消息,这个消息会变为死信!

7.3 延迟交换机

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

  • 构建延迟交换机

    package com.mashibing.rabbitmqboot.config;import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;import java.util.HashMap;
    import java.util.Map;/*** @author zjw* @description*/
    @Configuration
    public class DelayedConfig {public static final String DELAYED_EXCHANGE = "delayed-exchange";public static final String DELAYED_QUEUE = "delayed-queue";public static final String DELAYED_ROUTING_KEY = "delayed.#";@Beanpublic Exchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","topic");Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);return exchange;}@Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE).build();}@Beanpublic Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
    }
    
  • 发送消息

    package com.mashibing.rabbitmqboot;import com.mashibing.rabbitmqboot.config.DelayedConfig;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;/*** @author zjw* @description*/
    @SpringBootTest
    public class DelayedPublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void publish(){rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(30000);return message;}});}
    }
    

八、RabbitMQ的集群

RabbitMQ的镜像模式

RabbitMQ的集群
1644926959251.png

高可用

提升RabbitMQ的效率

搭建RabbitMQ集群

  • 准备两台虚拟机(克隆)

  • 准备RabbitMQ的yml文件

    rabbitmq1:

    version: '3.1'
    services:rabbitmq1:image: rabbitmq:3.8.5-management-alpinecontainer_name: rabbitmq1hostname: rabbitmq1extra_hosts:- "rabbitmq1:192.168.11.32"- "rabbitmq2:192.168.11.33"environment: - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFSports:- 5672:5672- 15672:15672- 4369:4369- 25672:25672
    

    rabbitmq2:

    version: '3.1'
    services:rabbitmq2:image: rabbitmq:3.8.5-management-alpinecontainer_name: rabbitmq2hostname: rabbitmq2extra_hosts:- "rabbitmq1:192.168.11.32"- "rabbitmq2:192.168.11.33"environment: - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFSports:- 5672:5672- 15672:15672- 4369:4369- 25672:25672
    

    准备完毕之后,启动两台RabbitMQ

    启动效果
    1644924815935.png
  • 让RabbitMQ服务实现join操作

    需要四个命令完成join操作

    让rabbitmq2 join rabbitmq1,需要进入到rabbitmq2的容器内部,去执行下述命令

    rabbitmqctl stop_app
    rabbitmqctl reset 
    rabbitmqctl join_cluster rabbit@rabbitmq1
    rabbitmqctl start_app
    

    执行成功后:

    执行成功后
    1644925359203.png
  • 设置镜像模式

    在指定的RabbitMQ服务中设置好镜像策略即可

    镜像模式
    1644925812667.png

九、RabbitMQ其他内容

9.1 Headers类型Exchange

headers就是一个基于key-value的方式,让Exchange和Queue绑定的到一起的一种规则

相比Topic形式,可以采用的类型更丰富。

headers绑定方式
1645705080465.png

具体实现方式

package com.mashibing.headers;import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;import java.util.HashMap;
import java.util.Map;/*** @author zjw* @description*/
public class Publisher {public static final String HEADER_EXCHANGE = "header_exchange";public static final String HEADER_QUEUE = "header_queue";@Testpublic void publish()throws  Exception{//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机和队列并基于header的方式绑定channel.exchangeDeclare(HEADER_EXCHANGE, BuiltinExchangeType.HEADERS);channel.queueDeclare(HEADER_QUEUE,true,false,false,null);Map<String,Object> args = new HashMap<>();// 多个header的key-value只要可以匹配上一个就可以// args.put("x-match","any");// 多个header的key-value要求全部匹配上!args.put("x-match","all");args.put("name","jack");args.put("age","23");channel.queueBind(HEADER_QUEUE,HEADER_EXCHANGE,"",args);//4. 发送消息String msg = "header测试消息!";Map<String, Object> headers = new HashMap<>();headers.put("name","jac");headers.put("age","2");AMQP.BasicProperties props = new AMQP.BasicProperties().builder().headers(headers).build();channel.basicPublish(HEADER_EXCHANGE,"",props,msg.getBytes());System.out.println("发送消息成功,header = " + headers);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/819646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ue不消耗輸入

1、介紹 我們都知道ue裏面使用輸入時&#xff0c;都是在PlayerController裏面進行獲取&#xff0c; 使用官方的操作映射&#xff0c;軸映射&#xff0c;以及目前最新的增强型輸入 但是我們發現了一個問題 那就是輸入會被消耗 就是儅我鼠標按在一個按鈕上時 你另一個地方接受…

Elastic安装后 postman对elasticsearch进行测试

一、创建索引和mapping //id 字段自增id //good_sn 商品SKU //good_name 商品名称 //good_introduction 商品简介 //good_descript 商品详情 PUT http://IP:9200/shop { "mappings":{ "good":{ "properties":{ …

LabVIEW光学探测器板级检测系统

LabVIEW光学探测器板级检测系统 特种车辆乘员舱的灭火抑爆系统广泛采用光学探测技术来探测火情。光学探测器作为系统的关键部件&#xff0c;其探测灵敏度、响应速度和准确性直接关系到整个系统的运行效率和安全性。然而&#xff0c;光学探测器在长期使用过程中可能会因为灰尘污…

牛客Linux高并发服务器开发学习第一天

Linux开发环境搭建 安装Xshell 7远程连接虚拟机的Ubuntu 安装Xftp 7可以传输文件(暂时还没使用) 安装VMware Tools可以直接从Windows系统向虚拟机Linux系统拖拽文件实现文件交互。 安装CScode可以远程连接Linux系统进行代码的编写。&#xff08;Windows系统与Linxu系统公钥…

ActiveMQ 任意文件上传漏洞复现

一、使用弱口令登陆 ​ 访问 http://ip:8161/admin/ 进入admin登陆页面&#xff0c;使用弱口令登陆&#xff0c;账号密码皆为 admin&#xff0c;登陆成功后&#xff0c;headers中会出现验证信息 ​ 如&#xff1a; Authorization: Basic YWRtaW46YWRtaW4 # 二、利用PUT协议上…

STL体系结构与各容器基本介绍

STL体系结构与各容器基本介绍 STL体系结构基本容器序列式关联式&#xff08;查找更快&#xff09;其他&#xff08;不常用&#xff09;使用分配器 STL体系结构 六大模块 容器算法迭代器适配器仿函数分配器 基本容器 序列式 array c11新标准array<类型&#xff0c;大小&…

Linux-管道

目录 无名管道关闭未使用的管道文件描述符 管道对应的内存大小与shell命令进行通信&#xff08;popen&#xff09;命名管道FIFO创建FIFO文件打开FIFO文件 无名管道 管道是最早出现的进程间通信的手段。 管道的作用是在有亲缘关系的进程之间传递消息。所谓有亲缘关系&#xff…

MySQL中的SQL高级语句[二]

使用语言 MySQL 使用工具 Navicat Premium 16 代码能力快速提升小方法&#xff0c;看完代码自己敲一遍&#xff0c;十分有用 拖动表名到查询文件中就可以直接把名字拉进来以下是使用脚本方法&#xff0c;也可以直接进行修改中括号&#xff0c;就代表可写可不写 有些地方的代…

IO——标准IO

1.1概念 标准IO&#xff1a;是在C库中定义的一组专门用于输入输出的函数。 1.2特点 &#xff08;1&#xff09;通过缓冲机制减少系统调用&#xff0c;提高效率 &#xff08;2&#xff09;围绕流操作&#xff0c;用FILE*描述 &#xff08;3&#xff09;标准IO默认打开三个流&a…

PCIe错误定义与分类

前言&#xff1a; PCI总线中定义两个边带信号&#xff08;PERR#和SERR#&#xff09;来处理总线错误。其中PERR#主要对应的是普通数据奇偶校检错误&#xff08;Parity Error&#xff09;&#xff0c;而SERR#主要对应的是系统错误&#xff08;System Error&#xff09;。具体如下…

数据结构复习指导之绪论(算法的概念以及效率的度量)

文章目录 绪论&#xff1a; 2.算法和算法评价 知识总览 2.1算法的基本概念 知识点回顾与重要考点 2.2算法效率的度量 知识总览 1.时间复杂度 2.空间复杂度 知识点回顾与重要考点 归纳总结 绪论&#xff1a; 2.算法和算法评价 知识总览 2.1算法的基本概念 算法( Al…

【现代C++】模块的使用

C20引入了模块的概念&#xff0c;这是一个重要的新特性&#xff0c;旨在替代传统的预处理器和头文件机制。模块旨在提高编译速度、改善代码封装性、减少名称污染&#xff0c;并提供更好的工具支持。下面详细介绍模块的关键概念和使用方法&#xff1a; 1. 模块的基本概念 模块…

openGauss学习笔记-263 openGauss性能调优-TPCC性能调优测试指导-前置软件安装

文章目录 openGauss学习笔记-263 openGauss性能调优-TPCC性能调优测试指导-前置软件安装263.1 安装jdk263.2 安装numactl263.3 安装ant263.4 安装htop工具 openGauss学习笔记-263 openGauss性能调优-TPCC性能调优测试指导-前置软件安装 本章节主要介绍openGauss数据库内核基于…

谷歌浏览器的开发者插件vue-devtools

在这里我留下一个git地址用来下载插件包&#xff0c;首先在自己喜欢的位置创建一个新的文件夹&#xff0c;起一个自己喜欢的文件夹名字&#xff0c;下载到包后&#xff0c;然后点进文件夹里下载依赖&#xff0c;npm install,下载后如下面这个样子 git clone https://gitee.com…

【投稿优惠-EI稳定检索】2024年人工智能、自然语言处理与机器学习国际会议(ICAINLPML 2024)

2024 International Conference on Artificial Intelligence, Natural Language Processing and Machine Learning (ICAINLPML 2024) 网址&#xff1a;www.icainlpml.com 邮箱: ainlpmlsub-conf.com ●会议简介 2024年人工智能、自然语言处理与机器学习国际会议将邀请全球人…

Jackson 2.x 系列【24】Spring Web 集成

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Jackson 版本 2.17.0 源码地址&#xff1a;https://gitee.com/pearl-organization/study-jaskson-demo 文章目录 1. 前言2. Spring Web3. Jackson2ObjectMapperBuilder4. Jackson2ObjectMapperFa…

比例控制器H5773282、H8135950、H3390627、H6079948

名称&#xff1a;BEUEC数字比例放大器、伺服比例控制器、伺服比例阀放大板&#xff0c;订货代号&#xff1a;H5773282、H8135950、H3390627、H6079948、H6108848、H6700353、H8851035、H1688388、H9549313、H3264103、H1182967&#xff0c;输入指令可选10V、4-20mA&#xff0c;…

Session缓存、Hibernate处理对象的状态了解

Session接口 Session接口是Hibernate向应用程序提供的操纵数据库的最主要的接口&#xff0c;它提供了基本的保存&#xff0c;更新&#xff0c;删除和查询的方法。 Session是有一个缓存, 又叫Hibernate的一级缓存 session缓存是由一系列的Java集合构成的。当一个对象被加入到…

[大模型]Atom-7B-Chat 接入langchain搭建知识库助手

Atom-7B-Chat 接入langchain搭建知识库助手 环境准备 在autodl平台中租一个3090等24G显存的显卡机器&#xff0c;如下图所示镜像选择PyTorch–>2.0.0–>3.8(ubuntu20.04)–>11.8 接下来打开刚刚租用服务器的JupyterLab&#xff0c;并且打开其中的终端开始环境配置…

Linux 网络测速

1.开发背景 网络测速&#xff0c;为了测试开发板的网络速度是否达标的通用测试方法 2.开发需求 搭建 iperf3 &#xff0c;在 ubuntu 下安装服务端&#xff0c;在板卡上安装客户端&#xff0c;服务端和客户端互发 3.开发环境 ubuntu20.04 嵌入式开发板&#xff08;debian 千…