【README】
本文介绍 通配符模式,及代码示例
【1】intro to rabbitmq通配符模式
0)通配符模式-交换机类型为 Topic;
1)与路由模式相比,相同点是 两者都可以通过 routingkey 把消息转发到不同的队列;
不同点是通配符模式-topic类型的exchange可以让队列在绑定routing key的时候使用通配符;
2)通配符模式的routingkey 通常使用多个单词并用点号连接,如 item.insert ;
3)通配符规则:
# 匹配一个或多个词;
* 匹配不多不少一个词;
荔枝:
item.# 能够匹配 item.insert.abc 或 item.insert ; (可以多层)
item.* 能够匹配 item.insert ; (只能一层)
refers2 https://www.rabbitmq.com/tutorials/tutorial-five-java.html
4)新建队列
5)把队列绑定到交换机
6)生产者发送消息到队列,路由key 分别是 item.insert , item.update, item.delete ; 如下:
【2】代码
生产者
/*** 通配符模式-交换机类型为TOPIC*/
public class WildProducer {/* 交换机名称 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*队列名称1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";/*队列名称2*/static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*获取连接*/Connection conn = RBConnectionUtil.getConn();// 创建频道 Channel channel = conn.createChannel();/*** 声明交换机* 参数1-交换机名称 * 参数2-交换机类型(fanout, topic, direct, headers)*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); /*** routingkey-路由键 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/* 发送消息-insert */ /*** 参数1 交换机名称 如果没有指定则使用默认 default exchange * 参数2 routingkey-路由key, 简单模式可以传递队列名称 * 参数3 消息其他属性* 参数4 消息内容 */String insertMsg = "我是消息,通配符模式,routingkey=" + itemInsertRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemInsertRoutingKey, null, insertMsg.getBytes());System.out.println("已发送消息=" + insertMsg); String updMsg = "我是消息,通配符模式,routingkey=" + itemUpdateRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemUpdateRoutingKey, null, updMsg.getBytes());System.out.println("已发送消息=" + updMsg);String deleteMsg = "我是消息,通配符模式,routingkey=" + itemDeleteRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemDeleteRoutingKey, null, deleteMsg.getBytes());System.out.println("已发送消息=" + deleteMsg);/* 关闭连接和信道 */ channel.close();conn.close(); }
}
消费者1 topic_queue_1
/*** 通配符模式消费者-routingkey */
public class RouteConsumerWild1 {/* 交换机名称 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*队列名称1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";public static void main(String[] args) throws Exception {/*创建连接 */Connection conn = RBConnectionUtil.getConn();/*创建队列*/Channel channel = conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由键 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */
// channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null); // ui界面可以创建队列 /*** 队列绑定交换机* 参数1 队列名称* 参数2 交换机* 参数3 routingkey-路由键 */
// channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.#"); // ui界面可以把队列绑定到交换机 /* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息 * 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(TOPIC_QUEUE_1, true, consumer); }
}
消费者2 topic_queue_2
/*** 通配符模式消费者-routingkey */
public class RouteConsumerWild2 {/* 交换机名称 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*队列名称1*/ static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*创建连接 */Connection conn = RBConnectionUtil.getConn();/*创建队列*/Channel channel = conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由键 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */
// channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null); // ui界面可以创建队列 /*** 队列绑定交换机* 参数1 队列名称 * 参数2 交换机* 参数3 routingkey-路由键 */
// channel.queueBind(TOPIC_QUEUE_2 TOPIC_EXCHANGE, "*.delete"); // ui界面可以把队列绑定到交换机 /* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/** * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息 * 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(TOPIC_QUEUE_2, true, consumer); }
}
【3】 rabbitmq 模式总结
8.1)模式1 简单模式 helloworld
一个生产者,一个消费者,不需要设置交换机,使用默认交换机;
8.2)模式2 工作队列模式 work queue
一个生产者,多个消费者(竞争关系),不需要设置交换机(使用默认交换机);
8.3)发布订阅模式 publish/subscribe
需要设置类型为 fanout-广播的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列;
8.4)路由模式 routing
需要设置类型为 direct的交换机, 交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key 将消息发送到对应队列;
8.5)通配符模式 topic
需要设置类型为 topic的交换机, 交换机和队列进行绑定, 并且指定通配符方式的routing key, 当发送消息到交换机后,交换机会根据 routing key将消息发送到对应的队列;