目录
一、Simple(简单模式)
1.1 概念
1.2 代码实现
消费者
运行结果
二、Work Queue(工作队列)
2.1 概念
1.2 代码实现
生产者
消费者
运行结果
三、Publish/Subscribe(发布/订阅模式)
3.1 概念
3.2 代码实现
生产者
消费者
运行结果
四、Routing(路由模式)
4.1 概念
4.2 代码实现
Constants类
生产者
消费者
运行结果
五、Topics(通配符模式)
5.1 概念
5.2 代码实现
生产者
消费者
运行结果
六、RPC(RPC通信)了解
6.1 概念
6.2 代码实现
客户端代码编写
编写服务器代码
运行结果:
七、Publish Confirms(发布确认模式)
publishing Messages Individually(单独确认)
Publishing Messages in Batches(批量确认)
Handling Publisher Confirms Asynchronously(异步确认)
一、Simple(简单模式)
1.1 概念
P: 生产者,也就是要发送消息的程序
C: 消费者,消息的接受者
Queue:消息队列,图中Queue类似提个邮箱,可以缓存消息;生产者向其中投递消息,消费者从中取出消息。
特点:一个生产者P,一个消费者C,消息只能被消费一次,也称为点对点(Point-to-Point)模式
适用场景:消息只能被单个消费者处理
1.2 代码实现
消费者
package rabbitmq.simple;import com.rabbitmq.client.*;
import java.io.IOException;public class ConsumerDemo {public static void main(String[] args) throws Exception {//1.创建连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost("8.136.108.248");// 设置RabbitMQ服务器的端口号connectionFactory.setPort(5672);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername("pinkboy");// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword("123456");// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost("/");// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();/***3.声明一个队列** @param channel RabbitMQ的通道,用于执行队列操作** 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:* 1. 队列名称("hello"):指定要声明的队列的名称* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享* 4. false:表示该队列不会在使用后自动删除,需要手动删除* 5. null:表示不设置额外的参数** 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息*/channel.queueDeclare("hello", true, false, false, null);// 4.开始从名为"hello"的队列中消费消息channel.basicConsume("hello", true, new DefaultConsumer(channel) {/*** 处理接收到的消息** @param consumerTag 消费者标签,用于标识消费者* @param envelope 包含消息路由信息的信封* @param properties 消息的属性,如内容类型、内容编码等* @param body 消息的主体内容,以字节数组形式表示* @throws IOException 如果处理消息时发生I/O错误*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});Thread.sleep(2000);//5.关闭资源channel.close();connection.close();}
}
运行结果
启动生产者代码:
观察消息队列
启动消费者代码:
观察消息队列
二、Work Queue(工作队列)
2.1 概念
一个生产者P,多个消费者C1,C2 在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息
特点:消息不会重复,分配各不同的消费者
适用场景:集群环境中做异步处理
举个例子:12306短息通知服务,订票成功后,订单消息会发送到RabbitMQ,短信服务从RabbitMQ中获取订单信息,并发送通知信息(在短信服务之间进行任务分配)
1.2 代码实现
工作模式就是简单模式的增强版 和简单模式的区别就是 简单模式就一个消费者,工作模式支持多个消费者接收消息,消费者之间是竟争关系 每个消息只能被一个消费者接收
和简单模式代码差不多 为了展示多个消费者竞争的关系 生产者一次生产10条消息
常量类
package rabbitmq.constant;public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//工作队列模式public static final String WORK_QUEUE = "work_queue";}
生产者
package rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2、创建通道Channel channel = connection.createChannel();//3、声明交换机//4、声明队列/*** 声明一个队列** @param channel RabbitMQ的通道,用于执行队列操作** 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:* 1. 队列名称("hello"):指定要声明的队列的名称* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享* 4. false:表示该队列不会在使用后自动删除,需要手动删除* 5. null:表示不设置额外的参数** 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息*/channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//5、发送消息// 循环发送消息到 RabbitMQ 的 "hello" 队列中for (int i = 0; i < 10; i++) {// 构造消息内容String msg = "hello work queue ..." + i;/*** 参数1 表示交换机名称,因为使用默认交换机,所以为空字符串* 参数2 表示队列名称* 参数3 :消息的属性* 参数4:消息内容*/channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功!");//6、释放资源channel.close();connection.close();}
}
消费者
两个消费者的代码是一样的
消费者1
package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");//5.关闭资源
// channel.close();
// connection.close();}
}
消费者2
package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 启动成功!");//5.关闭资源
// channel.close();
// connection.close();}
}
运行结果
生产者
生产者消息队列
消费者
为了避免第一个启动的消费者会将10条消息消费掉 需要先启动两个消费者,再去启动生产者
消费者1
消费者2
观察消息队列
可以看到管理页面中有两个消费者被显示
三、Publish/Subscribe(发布/订阅模式)
3.1 概念
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者
适合场景:消息需要被多个消费者同时接收的场景.如:实时通知或者广播消息
3.2 代码实现
常量类
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//发布订阅模式public static final String FANOUT_EXCHANGE = "fanout_exchange";public static final String FANOUT_QUEUE1 = "fanout_queue1";public static final String FANOUT_QUEUE2 = "fanout_queue2";
}
这个模式需要创建交换机,并绑定队列和交换机
//3、声明交换机 /*** 参数1:交换机名称* 参数2:交换机类型 Fanout类型 -> 广播机制* 参数3:是否持久化*/ channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
声明队列
//4、声明队列 /*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除* 参数5:其他参数*/ channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
绑定队列和交换机
//5、交换机和队列绑定 /*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则 如果交换机类型为fanout类型,routingKey设置为空字符串*/ channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, ""); channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
生产者
package rabbitmq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、打开信道Channel channel = connection.createChannel();//3、声明交换机/*** 参数1:交换机名称* 参数2:交换机类型 Fanout类型 -> 广播机制* 参数3:是否持久化*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//4、声明队列/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除* 参数5:其他参数*/channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//5、交换机和队列绑定/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则 如果交换机类型为fanout类型,routingKey设置为空字符串*/channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");//6、发布消息String msg = "hello fanout ...";channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());System.out.println("消息发送成功!");//7、释放资源channel.close();connection.close();}
}
消费者
消费者1
package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);//4、消费消息channel.basicConsume(Constants.FANOUT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
消费者2
package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//4、消费消息channel.basicConsume(Constants.FANOUT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 启动成功!");}
}
运行结果
启动生产者
观察消息队列
fanout_queue1 和 fanout_queue2 分别有了1条消息
Exchange中多了队列的绑定关系
启动两个消费者
观察消息队列
四、Routing(路由模式)
4.1 概念
路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key
发布订阅模式是无条件的将所有消息分发给所有的消费者,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列
适合场景:需要根据特定规则分发消息的场景
队列和交换机的绑定,不能是任意的绑定了,而是要制定了一个BindKey(RoutingKey的一种)消息的发送方在向Exchange发送消息时也需要指定消息的RoutingKey
Exchange也不再把消息交给每一个绑定的key,而是根据消息的RountingKey进行判断,只要队列的BindingKey和发送消息的RoutingKey完全一致,才会接收到消息
创建交换机,定义交换机类型为BuiltinExchangeType.DIRECT
4.2 代码实现
Constants类
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";
}
生产者
package rabbitmq.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机/*** 参数1:交换机名称* 参数2:交换机类型* 参数3:是否持久化* 参数4:是否自动删除* 参数5:其他参数*/channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, "direct", true, false, null);//4. 声明队列/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,队列不再使用时是否自动删除此队列,如果将此参数和参数2设置为true就可以实现临时队列(队列不用了就自动删除)* 参数5:其他参数*/channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5、交换机和队列绑定/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则*/channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6、发布消息String msg_a = "hello direct my routingKey is a...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg_a.getBytes());String msg_b = "hello direct my routingKey is b...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());String msg_c = "hello direct my routingKey is c...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());System.out.println("消息发送成功!");//7、释放资源channel.close();connection.close();}
}
消费者
package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//4、消费消息channel.basicConsume(Constants.DIRECT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//4、消费消息channel.basicConsume(Constants.DIRECT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
运行结果
启动生产者
观察消息队列界面
exchange下队列和Routing Key的绑定关系
启动消费者
观察消息队列界面
五、Topics(通配符模式)
5.1 概念
在路由模式上进行了升级,在routingKey的基础上,增加可通配符的功能,适之更加灵活
Topic和Routing的基本原同,即:生产者将消息发送给交换机,交换机根据RoutingKey将消息转发给RoutingKey匹配的队列,类似于正则表达式的方式来定义RoutingKey的模式.
适合场景:需要灵活匹配和过滤消息的场景
Topic和Routing模式的区别:
1、topic模式使用的交换机类型为topic(Rounting模式使用的交换机类型为direct)
2、topic类型的交换机在匹配规则上进行了扩展,BingingKey支持通配符匹配(direct类型的将换季路由规则是BingKey和RoutingKey完全匹配)
在topic类型的交换机在匹配规则上有些要求:
1.RoutingKey是一系列由点(.)分隔的单词,比如“stock.usd.nyse”,"nyse.vmw","quick.organge.rabbit"
2. BingdingKey和RountingKey,也是点(.)分隔的字符串
3. BingdingKey中可以存在两种特殊的字符串,用于模糊匹配
* 表示一个单词
# 表示0-N个单词
举个例子:
5.2 代码实现
Constants类
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";}
生产者
创建交换机类型为BuiltinExchangeType.TOPIC
//3.声明交换机
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
声明队列
//4.声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
绑定交换机和队列
//5.绑定交换机和队列 channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*"); channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b"); channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
package rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);//4.声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5.绑定交换机和队列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6、发布消息String msg_a = "hello topic my routingKey is ae.a.f...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes());String msg_b = "hello topic my routingKey is ef.a.b...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());String msg_c = "hello topic my routingKey is c...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());System.out.println("消息发送成功!");}
}
消费者
package rabbitmq.topic;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
运行结果
生产者
观察消息队列界面
消费者
六、RPC(RPC通信)了解
6.1 概念
RPC(Remote Procedure Call) 即远程调用 它是一种通过网络从远程计算机上请求服务,而不是需要了解底层网络的技术,类似HTTP远程调用
RabbitMQ实现RPC通信的过程,大概率是通过两个队列实现一个可回调的过程
大概流程
1.客户端发消息到一个指定的队列,并在消息属性中设置replyTo字段,这个字段指定一个回调队列,服务端处理后,会把响应结果发送到这个队列
2.服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
3.客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应
客户端:
1 发送请求(携带replyTo,CorrelationID)
2 接收响应(校验correlationID)
服务端:
1 接受请求,进行响应
2 发送响应(按照客户端指定的replyTo,设置correlationID)
6.2 代码实现
客户端代码编写
1、声明两个队列 RPC_REQUEST_QUEUE和RPC_RESPONSE_QUEUE,声明本次请求的唯一标志correlationID
2、将RPC_RESPONSE_QUEUE和correlationID配置到要发送的消息队列中
3、使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
4、阻塞队列有消息后,主线程被唤醒,打印返回内容
Constants类
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";}
客户端代码
package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;/*** rpc 客户端* 1. 发送请求* 2. 等待响应*/
public class RpcClient {public static void main(String[] args) throws Exception { //1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、开启信道Channel channel = connection.createChannel();channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//3、发送请求String msg = "hello rpc...";//设置请求唯一标识//设置请求的相关属性// 生成一个唯一的关联ID,用于跟踪请求和响应String correlationID = UUID.randomUUID().toString();// 创建并配置AMQP基本属性,设置消息的关联ID和回复队列AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();// 发布消息到指定的请求队列,携带配置的属性和消息体channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, prop, msg.getBytes());//4、接收响应//使用阻塞队列,存储响应信息final ArrayBlockingQueue<String> response = new ArrayBlockingQueue<>(1);channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回调消息:" + respMsg);if (correlationID.equals(properties.getCorrelationId())) {//如果correlationID校验一致,则将响应信息保存在response中response.offer(respMsg);}}});/*** 阻塞等待响应*/String take = response.take();System.out.println("[RPC Client 响应结果]:" + take);}
}
编写服务器代码
package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;/*** 1、接受请求* 2、发送响应*/
public class RpcServer {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、开启信道Channel channel = connection.createChannel();//3、接受请求channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body);System.out.println("接收到请求:" + request);String response = "针对request:" + request + ",响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
运行结果:
启动客户端
观察消息队列界面
启动服务器端
客户端输出消息
观察消息队列界面
七、Publish Confirms(发布确认模式)
作为消息中间件,都会面临消息丢失的问题
消息丢失大概分为三种情况
1、生产者问题 因为应用程序故、障网络抖动等原因,生产者没有成功想broker发送消息
2、消息中间件自身问题,生产者成功发送给Broker 但是Broker没有把消息保存好,导致消息丢失
3、消费者问题,Broker发送到消费者,消费者在消费时,因没处理好,导致消费者失败的消息从队列中删除了
针对问题1 可以采用发确认(Publisher Cofirms)机制实现
生产者将信道设置成confirm(确认)模式,一但信道进入confirm模式,所有在该信道上面发布的消息都是会被指派一个唯一的ID(从1开始),一但消息被投递到所有匹配的队列之后,RabbitMq就会发送一个确认给生产者(包括消息的唯一ID)这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么消息确认会在将消息写入到磁盘之后发出,broker回传给生产者的确认消息中
deliveryTag包包含了消息的序号,此外broker也可以设置channel basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到处理
发送方确认机制最大的好处是他是异步的,生产者可以同时发布消息和等待信道返回确认消息
1、当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息
2、如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该nack命令
使用发送确认机制,必须要将信道设置成confirm(确认)模式
package rabbitmq.comfirms;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 200;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {// 发布确认publishingMessagesIndividually();// 批量发布确认publishingMessagesInBatchs();// 异步发布确认publishingMessagesAsynchronously();}private static void publishingMessagesAsynchronously() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, false, null);//4、监听confirm消息//集合中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理,比如重发}});//5.发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;long seqNO = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNO);}while (!confirmSeqNo.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesInBatchs() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, false, null);//4、发送消息long start = System.currentTimeMillis();int batchSize = 100;int outStandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, msg.getBytes());outStandingMessageCount++;if (outStandingMessageCount == batchSize) {//6、等待确认channel.waitForConfirms(5000);outStandingMessageCount = 0;}}if (outStandingMessageCount > 0) {channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, false, null);//4、发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, msg.getBytes());//5、等待确认channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException | InterruptedException e) {throw new RuntimeException(e);}}
}
publishing Messages Individually(单独确认)
Publishing Messages in Batches(批量确认)