【README】
本文po出 mq的发布订阅模式,及代码示例;
【1】intro
1) 角色: 有4个角色, 包括 生产者,消费者, 交换机 exchange(X), 队列;
2)交换机: 一方面,接收生产者的消息,另一方面,处理消息,如发送给队列,或丢弃;这取决于 exchange类型;
3)exchange类型有如下3种:
fanout 广播, 把消费转发给所有 绑定到该交换机的所有队列;
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
topic 通配符, 把消息交给 routing pattern(路由模式)的队列;
4)exchange 交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;
5)发布订阅模式:
5.1-每个消费者监听自己的队列;
5.2-生产者把消息发送给 broker, 由交换机把消息转发到绑定此交换机的所有队列;
6)交换机需要与队列绑定, 绑定之后,一个消息可以被多个消费者收到;
【2】代码(生产者1个,交换机exchange1个,但对应到2个队列,即消息有2个replication)
生产者
/*** 发布订阅模式生产者* 本文发布订阅模式使用的交换机类型为广播 fanout * @author tang rong */
public class PSProduer {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";/** 队列名称1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接Channel channel = conn.createChannel(); // 创建频道/*** 声明交换机* 参数1-交换机名称 * 参数2-交换机类型(fanout, topic, direct, headers)*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);/*** 创建队列* @param1 队列名称* @param2 是否持久化队列* @param3 是否独占本次连接 * @param4 是否在不使用的时候自动删除队列 * @param5 队列其他参数 */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 队列绑定交换机 */channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/*** 发送消息 */long temp = 1; for (int i = 0; i < 1000; i++) { String msg = "发布订阅模式消息,序号=" + (temp+i) + "时间=" + MyDateUtil.getNow();/*** 参数1 交换机名称,没有指定则使用默认交换机 Default change * 参数2 路由key,简单模式可以传递队列名称 * 参数3 消息其他属性 * 参数4 消息内容 */channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8")); System.out.println("生产者发送消息" + msg); } System.out.println("=== 生产者消息发送完成");/* 关闭资源 */channel.close();conn.close(); }
}
消费者1
/*** 发布订阅模式消费者1* @author tang rong */
public class PSConsumer1 {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接 Channel channel = conn.createChannel(); // 创建队列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机/*** 创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);/*** 队列绑定交换机*/channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息* 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(FANOUT_QUEUE_1, true, consumer); }}
消费者2
/*** 发布订阅模式消费者* @author tang rong */
public class PSConsumer2 {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接 Channel channel = conn.createChannel(); // 创建队列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机/*** 创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 队列绑定交换机*/channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者2 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者2 end ===\n"); } };/*** 监听消息* 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(FANOUT_QUEUE_2, true, consumer); }}
【3】小结
1)发布订阅模式与工作模式的区别;
区别1)工作队列模式不需要定义交换机, 发布订阅模式需要;
区别2)工作队列模式的生产者向队列发送消息(底层使用默认交换机), 发布订阅模式的生产者向交换机发送消息;
区别3)工作队列模式的队列不需要与交换机绑定(底层与默认交换机绑定), 发布订阅模式中的队列需要与交换机绑定;
2)默认交换机
AMQP default