文章目录
- 1、mq(消息队列)概述
- 2、RabbitMQ环境搭建
- 3、java基于AMQP协议操作RabbitMQ
- 4、基于Spring AMQP操作RabbitMQ
- 5、代码中创建队列与交换机
- ①、配置类创建
- ②、基于@RabbitListener注解创建
- 6、RabbitMQ详解
- ①、work模型
- ②、交换机
- 1、Fanout(广播)交换机
- 2、Direct(定向)交换机
- 3、Topic(话题)交换机
- 7、消息转换器
- 总结
1、mq(消息队列)概述
MQ 是 Message Queue(消息队列)的简称,是一种用于异步通信和解耦的中间件技术。它的核心功能是通过队列结构存储和传输消息,允许生产者(发送消息的一方)和消费者(接收消息的一方)在不同时间进行数据交换,而无需直接连接
MQ作用:
①、异步调用:
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息代理:管理、管理存储、转发消息的中间件
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
优点:
- 异步调用,无需等待,性能好
- 故障隔离,下游服务故障不影响上游业务
缺点:
- 不能立即得到调用结果,时效性差
- 确定下游业务执行是否成功
- 业务安全依赖于Broker的可靠性
②、削峰/降流
在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:
③、降低系统耦合性
对于发送方来说,只需要将自己的消息发送到消息队列就ok了,而对于接收方来说,只需要接收消息即可,而无需关注谁发的,极大降低了发送接收方的耦合性。
④、顺序保证
消息队列保证数据按照特定的顺序被处理,适用于那些对数据顺序有严格要求的场景。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持顺序消息。
⑤、延时/定时处理
消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持定时/延时消息。
⑥、即时通讯
MQTT(消息队列遥测传输协议)是一种轻量级的通讯协议,采用发布/订阅模式,非常适合于物联网(IoT)等需要在低带宽、高延迟或不可靠网络环境下工作的应用。它支持即时消息传递,即使在网络条件较差的情况下也能保持通信的稳定性。RabbitMQ 内置了 MQTT 插件用于实现 MQTT 功能(默认不启用,需要手动开启)
四大mq产品对比:
2、RabbitMQ环境搭建
我们在docker环境下通过docker pull来快速获取RabbitMQ的镜像。
拉取:
docker pull rabbitmq:3.8-management
运行:
docker run -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8-management
其中,15672端口是图形化界面的端口,而5672是发送接收消息的端口。
登录之后,界面如下:
-
publisher: 消息发送者
-
consumer: 消息的消费者
-
queue: 队列,存储消息
-
exchange: 交换机,负责路由消息
-
connectors: 生产者或者消费者和消息队列建立连接的情况
-
channels: 消息通道,生产者消费者进行通信需要建立一个通道。
-
Admin: 管理虚拟主机,添加和查看已有的用户
RabbitMQ架构:
3、java基于AMQP协议操作RabbitMQ
AMQP(Advanced Message Queuing Protocol),是用于在应用程序之间传递消息的开放标准协议。协议以语言和平台无关,更符合互联网的要求。
官网文档教程:
java中操作RabbitMQ
新建java的maven项目,添加依赖:
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.17</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.17</version></dependency></dependencies>
这几个依赖必须导入。
发送方(Send.java) :
public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.138.133");factory.setPort(5672);factory.setUsername("root");factory.setPassword("123456");// 建立连接,创建管道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");} catch (Exception e) {throw new RuntimeException(e);}}}
接收方(Recv.java) :
public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.138.133");factory.setPort(5672);factory.setUsername("root");factory.setPassword("123456");// 建立连接,创建管道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
host和用户、密码等依据自己情况修改
先运行Recv,此时接收方会处于等待接收的状态,随后Send发送消息,接收成功。
运行结果:
[x] Sent 'Hello RabbitMQ!'[x] Received 'Hello RabbitMQ!'
4、基于Spring AMQP操作RabbitMQ
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。因此RabbitMQ中我们可以通过Spring进行操作。
<dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
消息发送者
@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend() {// 队列名称String queueName = "hello";// 消息String message = "hello RabbitMQ!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
消息消费者(其实就是通过监听队列来获取信息):
@RabbitListener()中的queues参数里面的值就是队列的名字。
@Slf4j
@Component
public class Consumer {@RabbitListener(queues = {"hello"}) //这里参数是队列的名字,填写的时候按自己情况来。public void testConsumer(String msg) {log.info("消费者收到消息:" + msg);}}
如果想要在代码中创建队列的话,可以在config类中定义:
@Configuration
public class MQConfig {//代表创建一个叫queue的队列@Beanpublic Queue queue() {return new Queue("queue");}
}
5、代码中创建队列与交换机
①、配置类创建
@Configuration
public class MQConfig {// 声明交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("test.fanout");}// 声明队列1@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}// 声明队列2@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}// 绑定队列1与交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 绑定队列2与交换机@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
如上就是通过在一个config类中定义bean实现注入。
②、基于@RabbitListener注解创建
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
该注解用配置类等价于:
@Configuration
public class MQConfig {// 声明交换机@Beanpublic DirectExchange fanoutExchange() {return new DirectExchange("test.direct");}// 声明队列1@Beanpublic Queue fanoutQueue1() {return new Queue("direct.queue1");}// 绑定队列1与交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, DirectExchange directExchange) {return BindingBuilder.bind(fanoutQueue1).to(directExchange).with("red");}@Beanpublic Binding bindingQueue2(Queue fanoutQueue1, DirectExchange directExchange) {return BindingBuilder.bind(fanoutQueue1).to(directExchange).with("blue");}
}
可以说极大简化了开发。
6、RabbitMQ详解
①、work模型
work模型就是多个消费者绑定到一个队列,加快消息处理速度,通过设置prefech来控制消费者领取消息的数量。
而prefetch默认值为250,这个可以自己来控制调整。
发送方代码:
@Testpublic void testWorkQueue() throws Exception {String queueName = "work.queue";for (int i = 0; i < 50; i++) {String message = "hello, worker, message_" + i;rabbitTemplate.convertAndSend(queueName, message);Thread.sleep(50);}}
接收方代码:
@RabbitListener(queues = {"work.queue"}) //这里参数是队列的名字,填写的时候按自己情况来。public void workConsumer(String msg) throws InterruptedException {System.out.println("work.queue队列1收到消息:" + msg);Thread.sleep(20);}@RabbitListener(queues = {"work.queue"}) //这里参数是队列的名字,填写的时候按自己情况来。public void workConsumer2(String msg) throws InterruptedException {System.err.println("work.queue队列2收到消息:" + msg);Thread.sleep(200);}
最后发现两个接收方无论处理快还是慢,最后每个都只能处理25个消息,而我们一共发了50条消息,这样会导致处理效率很低。
解决方法,在properties中加一个配置:
spring:listener:simple:prefetch: 1
运行结果:
这样处理的话,效率就高多了,基本就是发一条消息,谁有空谁来处理即可。
②、交换机
1、Fanout(广播)交换机
Fanout交换机会将接收到的消息广播到每一个与其绑定的queue,也叫广播模式。
这里我们在队列中声名两个queue,分别叫fanout.queue1和fanout.queue2。
交换机使用amq.fanout,为fanout类型
记得先将交换机与队列进行绑定:
接收方代码:
@RabbitListener(queues = {"fanout.queue1"}) //这里参数是队列的名字,填写的时候按自己情况来。public void fanoutConsumer(String msg) throws InterruptedException {System.out.println("fanout.queue队列1收到消息:" + msg);}@RabbitListener(queues = {"fanout.queue2"}) //这里参数是队列的名字,填写的时候按自己情况来。public void fanoutConsumer2(String msg) throws InterruptedException {System.err.println("fanout.queue队列2收到消息:" + msg);}
发送方代码:
@Testpublic void testFanoutQueue() throws Exception {String exchange = "amq.fanout";String message = "hello, everyone";rabbitTemplate.convertAndSend(exchange, "", message);}
运行结果:
fanout.queue队列2收到消息:hello, everyone
fanout.queue队列1收到消息:hello, everyone
2、Direct(定向)交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
每一个Queue都与Exchange设置一个BindingKey,发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
当交换机中的key对应的值和queue中的bingdingKey值相同时,消息就发送到对应的消费者手中,同时不同queue的bingdingKey值可以是相同的,同一个bindingKey可以有多个值。
创建两个direct.queue:
绑定到amq.direct上:
接收消息代码:
@RabbitListener(queues = {"direct.queue1"}) //这里参数是队列的名字,填写的时候按自己情况来。public void DirectConsumer(String msg) throws InterruptedException {System.out.println("fanout.queue队列1收到消息:" + msg);}@RabbitListener(queues = {"direct.queue2"}) //这里参数是队列的名字,填写的时候按自己情况来。public void DirectConsumer2(String msg) throws InterruptedException {System.err.println("fanout.queue队列2收到消息:" + msg);}
发送消息代码:
@Testpublic void testDirectQueue() throws Exception {String exchange = "amq.direct";String message1 = "hello, Red";String message2 = "hello, Blue";String message3 = "hello, Yellow";rabbitTemplate.convertAndSend(exchange, "red", message1);rabbitTemplate.convertAndSend(exchange, "blue", message2);rabbitTemplate.convertAndSend(exchange, "yellow", message3);}
运行结果:
direct.queue队列1收到消息:hello, Red
direct.queue队列1收到消息:hello, Blue
direct.queue队列2收到消息:hello, Red
direct.queue队列2收到消息:hello, Yellow
3、Topic(话题)交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以“.”分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#: 代指0个或多个单词
*: 代指一个单词
新建两个队列:
使用amq.topic绑定:
接收方代码:
@RabbitListener(queues = {"topic.queue1"}) //这里参数是队列的名字,填写的时候按自己情况来。public void TopicConsumer(String msg) throws InterruptedException {System.out.println("topic.queue队列1收到消息:" + msg);}@RabbitListener(queues = {"topic.queue2"}) //这里参数是队列的名字,填写的时候按自己情况来。public void TopicConsumer2(String msg) throws InterruptedException {System.err.println("topic.queue队列2收到消息:" + msg);}
发送方代码:
@Testpublic void testTopicQueue() throws Exception {String exchange = "amq.topic";String message = "Japan's news";String message2 = "China's news";String message3 = "China's weather";String message4 = "Japan's weather";rabbitTemplate.convertAndSend(exchange, "Japan.news", message);rabbitTemplate.convertAndSend(exchange, "China.news", message2);rabbitTemplate.convertAndSend(exchange, "China.weather", message3);rabbitTemplate.convertAndSend(exchange, "Japan.weather", message4);}
运行结果:
topic.queue队列2收到消息:Japan's news
topic.queue队列2收到消息:China's news
topic.queue队列1收到消息:China's news
topic.queue队列1收到消息:China's weather
7、消息转换器
发送java对象代码:
@Testpublic void testSendObject() {Map<String, Object> msg = new HashMap<>();msg.put("name", "jack");msg.put("age", 18);rabbitTemplate.convertAndSend("object.queue", msg);}
接收消息为乱码:
解决方法:
我们引入消息转换器,使用json来处理消息。
引入对应依赖:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
接收方和消费方都配置消息转换器:
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;@Configuration
public class messageConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
总结
重要点:
- 搭建环境,熟悉RabbitMQ面板与配置
- 使用SpringBoot集成开发配置
- 重点学会@RabbitListener的使用
- 熟悉常见交换机和队列
- 配置使用消息转换器