rabbitmq-通配符模式

【README】

本文介绍 通配符模式,及代码示例

【1】intro to rabbitmq通配符模式

0)通配符模式-交换机类型为 Topic
1)与路由模式相比,相同点是 两者都可以通过 routingkey 把消息转发到不同的队列;
不同点是通配符模式-topic类型的exchange可以让队列在绑定routing key的时候使用通配符;
2)通配符模式的routingkey 通常使用多个单词并用点号连接,如 item.insert ;
3)通配符规则:
# 匹配一个或多个词;
* 匹配不多不少一个词;  
荔枝:
item.# 能够匹配 item.insert.abc 或 item.insert  ; (可以多层)
item.* 能够匹配 item.insert ;  (只能一层)

refers2 https://www.rabbitmq.com/tutorials/tutorial-five-java.html 

4)新建队列

5)把队列绑定到交换机 

6)生产者发送消息到队列,路由key 分别是 item.insert , item.update, item.delete ; 如下:

【2】代码

生产者

/*** 通配符模式-交换机类型为TOPIC*/
public class WildProducer {/* 交换机名称 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*队列名称1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";/*队列名称2*/static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*获取连接*/Connection conn = RBConnectionUtil.getConn();// 创建频道 Channel channel = conn.createChannel();/*** 声明交换机* 参数1-交换机名称  * 参数2-交换机类型(fanout, topic, direct, headers)*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);  /*** routingkey-路由键 */String itemInsertRoutingKey = "item.insert";  String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/* 发送消息-insert */  /*** 参数1 交换机名称 如果没有指定则使用默认 default exchange * 参数2 routingkey-路由key, 简单模式可以传递队列名称 * 参数3 消息其他属性* 参数4 消息内容 */String insertMsg = "我是消息,通配符模式,routingkey=" + itemInsertRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemInsertRoutingKey, null, insertMsg.getBytes());System.out.println("已发送消息=" + insertMsg); String updMsg = "我是消息,通配符模式,routingkey=" + itemUpdateRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemUpdateRoutingKey, null, updMsg.getBytes());System.out.println("已发送消息=" + updMsg);String deleteMsg = "我是消息,通配符模式,routingkey=" + itemDeleteRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemDeleteRoutingKey, null, deleteMsg.getBytes());System.out.println("已发送消息=" + deleteMsg);/* 关闭连接和信道 */ channel.close();conn.close(); }
}

消费者1  topic_queue_1

/*** 通配符模式消费者-routingkey */
public class RouteConsumerWild1 {/* 交换机名称 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*队列名称1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";public static void main(String[] args) throws Exception {/*创建连接 */Connection conn = RBConnectionUtil.getConn();/*创建队列*/Channel channel = conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由键 */String itemInsertRoutingKey = "item.insert";  String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */
//		channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);  // ui界面可以创建队列 /*** 队列绑定交换机* 参数1 队列名称* 参数2 交换机* 参数3 routingkey-路由键 */
//		channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.#"); // ui界面可以把队列绑定到交换机 /* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组  */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息  * 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(TOPIC_QUEUE_1, true, consumer); }
}

消费者2 topic_queue_2

/*** 通配符模式消费者-routingkey */
public class RouteConsumerWild2 {/* 交换机名称 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*队列名称1*/ static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*创建连接 */Connection conn = RBConnectionUtil.getConn();/*创建队列*/Channel channel = conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由键 */String itemInsertRoutingKey = "item.insert";  String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */
//		channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);  // ui界面可以创建队列 /*** 队列绑定交换机* 参数1 队列名称 * 参数2 交换机* 参数3 routingkey-路由键  */
//		channel.queueBind(TOPIC_QUEUE_2 TOPIC_EXCHANGE, "*.delete"); // ui界面可以把队列绑定到交换机  /* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/** * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组  */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息  * 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(TOPIC_QUEUE_2, true, consumer); }
}

【3】 rabbitmq 模式总结  

8.1)模式1 简单模式 helloworld

一个生产者,一个消费者,不需要设置交换机,使用默认交换机;

8.2)模式2 工作队列模式 work queue

一个生产者,多个消费者(竞争关系),不需要设置交换机(使用默认交换机);

8.3)发布订阅模式  publish/subscribe

需要设置类型为 fanout-广播的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列;

8.4)路由模式 routing

需要设置类型为 direct的交换机, 交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key 将消息发送到对应队列;

8.5)通配符模式 topic

需要设置类型为 topic的交换机, 交换机和队列进行绑定, 并且指定通配符方式的routing key, 当发送消息到交换机后,交换机会根据 routing key将消息发送到对应的队列;

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330120.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中海达手部链接电脑安装软件_山东水文局:较大含沙量条件下中海达ADCP外接测深仪测流系统试验成功...

近日,黄委山东水文水资源局与中海达海洋公司联合,在黄河泺口水文站较大含沙量环境下进行了ADCP外接测深仪、GNSS测流试验,试验取得了圆满成功。泺口水文站简介1泺口水文站概况泺口水文站位于山东省济南市天桥区黄河泺口浮桥南岸,隶…

一文搞懂ThreadLocal及相关的内存泄露问题

首先,看一张整体的结构图,来帮助理解 什么是ThreadLocal ThreadLocal用于创建线程局部变量,如果创建一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,…

resnet50加入fpn_FPN+SSD同时兼顾速度和精度的检测器(二)

本文首发于知乎专栏“人工智能从入门到逆天杀神”,本文以及本专栏所有算法源代码都可以在神力AI平台获取,如果你没有GPU但需要预训练模型或者你想获取更多开箱即用的AI算法,欢迎加入我们的会员,一杯咖啡即可带你入门AI&#xff0c…

Java秒杀系统实战系列~RabbitMQ死信队列处理超时未支付的订单(转)

转自: https://juejin.cn/post/6844903903130042376 文末有源代码,非常棒 摘要: 本篇博文是“Java秒杀系统实战系列文章”的第十篇,本篇博文我们将采用RabbitMQ的死信队列的方式处理“用户秒杀成功生成订单后,却迟…

主流Java数据库连接池比较及前瞻

转载自 主流Java数据库连接池比较及前瞻主流数据库连接池 常用的主流开源数据库连接池有C3P0、DBCP、Tomcat Jdbc Pool、BoneCP、Druid等 C3p0: 开源的JDBC连接池,实现了数据源和JNDI绑定,支持JDBC3规范和JDBC2的标准扩展。目前使用它的开源项目有Hibern…

有序数组中查找第一个比target大的数

思路&#xff1a;二分法 时间复杂度&#xff1a;O(logn) 空间复杂度&#xff1a;O(1) 代码&#xff1a; public class Solution{public int search(int[] nums, int target){int lf0, rtn.length-1;while(lf<rt){int mid lf(rt-lf)/2;if(n[mid]>t){rt mid-1;}else{lf…

2020最新Java线程池入门(超详细)

转 https://blog.csdn.net/weixin_43893397/article/details/104361154 【1】代码示例 /*** 线程池测试-自定义线程池创建方式* since 2021/03/23 */ public class ThreadPoolMain2 {public static void main(String[] args) throws Exception {newMethod();}public static…

HDFS的诞生

转载自 HDFS的诞生 1牛刀小试 张大胖找了个实习的工作&#xff0c; 第一天上班Bill师傅给他分了个活儿&#xff1a;日志分析。张大胖拿到了师傅给的日志文件&#xff0c;大概有几十兆&#xff0c;打开一看&#xff0c; 每一行都长得差不多&#xff0c;类似这样&#xff1a;212.…

项目背景怎么描述_产品经理写简历,如何让「项目经验」更出众?

项目经验怎么写更出众&#xff1f;时间长但效果一般的项目经验要不要写&#xff1f;没有项目经验怎么办&#xff1f;本文凭借作者自己长期招聘产品、阅读大量简历所积累的经验解答了这三个问题&#xff0c;希望对你有所帮助。产品经理写简历时&#xff0c;都会通过项目案例来证…

一致性Hash算法原理

前言 当在需要将数据分发到多个数据库/缓存&#xff0c;或将请求分发给多个服务节点时&#xff0c;不可避免的会遇到以下问题&#xff1a; 如何将数据均匀的分散到各个节点中&#xff0c;并且尽量的在加减节点时能使受影响的数据最少。 选择节点的方法 随机放置 从多个节点…

转: java多线程-ThreadPoolExecutor的拒绝策略RejectedExecutionHandler

转自&#xff1a; https://blog.csdn.net/qq_25806863/article/details/71172823 概述 原文地址 http://blog.csdn.net/qq_25806863/article/details/71172823 在分析ThreadPoolExecutor的构造参数时&#xff0c;有一个RejectedExecutionHandler参数。 RejectedExecutionH…

已知两点坐标如何快速增加其他坐标_「测绘精选」坐标转换概述

引言&#xff1a;这篇“坐标转换概述”献给各位&#xff0c;可以对坐标转换有一个大致地、整体地了解。文中有些名词是为了便于表达而自创的&#xff0c;大家不用考据、较真。一、静态坐标和动态坐标(1)静态坐标传统大地测量没有考虑板块运动对坐标的影响。虽然板块运动客观存在…

什么是G1垃圾回收算法

转载自 什么是G1垃圾回收算法为解决CMS算法产生空间碎片和其它一系列的问题缺陷&#xff0c;HotSpot提供了另外一种垃圾回收策略&#xff0c;G1&#xff08;Garbage First&#xff09;算法&#xff0c;通过参数 -XX:UseG1GC来启用&#xff0c;该算法在JDK 7u4版本被正式推出&am…

一文理清RocketMQ顺序消费、重复消费、消息丢失问题

前言 在使用消息队列时不可避免的会遇到顺序消费、重复消费、消息丢失三个问题。在一次面试字节的时候&#xff0c;面试官问到如何保证顺序消费&#xff0c;当时回答不太准确&#xff0c;特意此文回顾如何解决顺序消费、重复消费、消息丢失三个问题。 重复消费 解决重复消费…

一道丧心病狂的java面试题

转载自 一道丧心病狂的java面试题无意中了解到如下题目&#xff0c;觉得蛮好。 题目如下&#xff1a; public class TestSync2 implements Runnable {int b 100; synchronized void m1() throws InterruptedException {b 1000;Thread.sleep(500); //6System.out.pri…

水晶报表图形位置_看了我用Excel做的年度报表,老板直夸好

2020年前5个月&#xff0c;最火爆的莫过于口罩。口罩的整条产业链都变得炙手可热&#xff0c;口罩、口罩机、炒熔喷布、聚丙烯等等相关企业的业务数据往往都是去年的几倍。那我们现在作为一家“表姐牌”的口罩厂的员工&#xff0c;老板叫我用Excel做一个既酷炫又简洁的年度报表…

Mysql优化(三):优化order by

MySQL中的两种排序方式 .通过有序索引顺序扫描直接返回有序数据 因为索引的结构是B树&#xff0c;索引中的数据是按照一定顺序进行排列的&#xff0c;所以在排序查询中如果能利用索引&#xff0c;就能避免额外的排序操作。EXPLAIN分析查询时&#xff0c;Extra显示为Using inde…

漫画:什么是服务熔断

转载自 漫画&#xff1a;什么是服务熔断什么是服务熔断&#xff1f;熔断这一概念来源于电子工程中的断路器&#xff08;Circuit Breaker&#xff09;。在互联网系统中&#xff0c;当下游服务因访问压力过大而响应变慢或失败&#xff0c;上游服务为了保护系统整体的可用性&#…

rabbitmq手动确认ack

【README】 参考 https://blog.csdn.net/u012943767/article/details/79300673 &#xff1b; 【0】声明交换机&#xff0c;队列 与绑定 /*** 交换机&#xff0c;队列声明与绑定 */ public class AckDeclarer {/** 确认交换机 */public static final String ACK_EXCHANGE2 &q…

python图片保存_python读取和保存图片5种方法对比

python读取和保存图片5种方法对比 python中对象之间的赋值是按引用传递的&#xff0c;如果需要拷贝对象&#xff0c;需要用到标准库中的copy模块 方法一&#xff1a;利用 PIL 中的 Image 函数 这个函数读取出来不是 array 格式&#xff0c;这时候需要用 np.asarray(im) 或者 np…