rabbitmq-发布订阅模式

【README】

本文po出 mq的发布订阅模式,及代码示例;

 

【1】intro

1) 角色: 有4个角色, 包括 生产者,消费者, 交换机 exchange(X), 队列;

2)交换机: 一方面,接收生产者的消息,另一方面,处理消息,如发送给队列,或丢弃;这取决于 exchange类型;
3)exchange类型有如下3种:
fanout 广播, 把消费转发给所有 绑定到该交换机的所有队列;
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
topic 通配符, 把消息交给 routing pattern(路由模式)的队列;
4)exchange 交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;

5)发布订阅模式:
5.1-每个消费者监听自己的队列;
5.2-生产者把消息发送给 broker, 由交换机把消息转发到绑定此交换机的所有队列;

6)交换机需要与队列绑定, 绑定之后,一个消息可以被多个消费者收到;

【2】代码(生产者1个,交换机exchange1个,但对应到2个队列,即消息有2个replication)

生产者


/*** 发布订阅模式生产者* 本文发布订阅模式使用的交换机类型为广播 fanout * @author tang rong */
public class PSProduer {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";/** 队列名称1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接Channel channel = conn.createChannel();  // 创建频道/*** 声明交换机* 参数1-交换机名称 * 参数2-交换机类型(fanout, topic, direct, headers)*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);/*** 创建队列* @param1 队列名称* @param2  是否持久化队列* @param3 是否独占本次连接 * @param4 是否在不使用的时候自动删除队列 * @param5 队列其他参数  */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 队列绑定交换机 */channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, 	"");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, 	"");/*** 发送消息 */long temp = 1; for (int i = 0; i < 1000; i++) { String msg = "发布订阅模式消息,序号=" + (temp+i) + "时间=" + MyDateUtil.getNow();/*** 参数1 交换机名称,没有指定则使用默认交换机 Default change * 参数2 路由key,简单模式可以传递队列名称 * 参数3 消息其他属性 * 参数4 消息内容 */channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8")); System.out.println("生产者发送消息" + msg);  }  System.out.println("=== 生产者消息发送完成");/* 关闭资源 */channel.close();conn.close(); }
}

消费者1


/*** 发布订阅模式消费者1* @author tang rong */
public class PSConsumer1 {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接 Channel channel = conn.createChannel();  // 创建队列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机/*** 创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);/*** 队列绑定交换机*/channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组  */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息* 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(FANOUT_QUEUE_1, true, consumer); }}

消费者2

/*** 发布订阅模式消费者* @author tang rong */
public class PSConsumer2 {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接 Channel channel = conn.createChannel();  // 创建队列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机/*** 创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 队列绑定交换机*/channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组  */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者2 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者2 end ===\n"); } };/*** 监听消息* 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(FANOUT_QUEUE_2, true, consumer); }}

 

【3】小结

1)发布订阅模式与工作模式的区别;
区别1)工作队列模式不需要定义交换机, 发布订阅模式需要;
区别2)工作队列模式的生产者向队列发送消息(底层使用默认交换机),  发布订阅模式的生产者向交换机发送消息;
区别3)工作队列模式的队列不需要与交换机绑定(底层与默认交换机绑定), 发布订阅模式中的队列需要与交换机绑定;

2)默认交换机

AMQP default

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330126.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入Java类型信息:RTTI和反射

转载自 「深入Java」类型信息&#xff1a;RTTI和反射 1.RTTI Run-Time Type Infomation 运行时类型信息 为什么需要RTTI&#xff1f; 越是优秀的面向对象设计&#xff0c;越是强调高内聚低耦合&#xff0c;正如依赖倒转原则所说&#xff1a;“无论是高层模块还是低层模块&#…

设计费收费标准2002修订_@设计师,2019景观园林设计收费标准,注意查收!

From&#xff1a;景观邦(ID&#xff1a;LA_bang)一、设计费量的定制&#xff1a;设计费在10万以上的(包括10万)省外工程方能承接。其工作内容有:景观概念规划设计、方案设计、初步设计、施工图设计、效果图绘制及多媒体制作等。二、设计费的取费标准&#xff1a;居住区、道路、…

rabbitmq-路由模式-routingkey

【README】 本文po出 rabbitmq路由模式&#xff1b; 【1】intro to 路由模式 特点1&#xff09;队列与交换机的绑定&#xff0c;不能是任意绑定&#xff0c; 而是指定一个路由key-routingkey&#xff1b; 特点2&#xff09;消息的发送方向在向 exchange-交换机发送消息时&…

一文理清HashMap的实现及细节

前言 最近阅读了许多HashMap实现及源码分析的文章&#xff0c;特意此文记录HashMap的知识点。 HashMap 底层由 数组 链表 组成&#xff0c;在 jdk1.7 和 1.8 中具体略有不同。 JDK1.7的HashMap 数据结构&#xff1a;图片来源 核心成员变量 图片来源 初始化桶大小&…

java提高篇之详解内部类

转载自 java提高篇之详解内部类内部类是一个非常有用的特性但又比较难理解使用的特性(鄙人到现在都没有怎么使用过内部类&#xff0c;对内部类也只是略知一二)。第一次见面内部类我们从外面看是非常容易理解的&#xff0c;无非就是在一个类的内部在定义一个类。123456789101112…

rabbitmq-通配符模式

【README】 本文介绍 通配符模式&#xff0c;及代码示例 【1】intro to rabbitmq通配符模式 0&#xff09;通配符模式-交换机类型为 Topic&#xff1b; 1&#xff09;与路由模式相比&#xff0c;相同点是 两者都可以通过 routingkey 把消息转发到不同的队列&#xff1b; 不同…

中海达手部链接电脑安装软件_山东水文局:较大含沙量条件下中海达ADCP外接测深仪测流系统试验成功...

近日&#xff0c;黄委山东水文水资源局与中海达海洋公司联合&#xff0c;在黄河泺口水文站较大含沙量环境下进行了ADCP外接测深仪、GNSS测流试验&#xff0c;试验取得了圆满成功。泺口水文站简介1泺口水文站概况泺口水文站位于山东省济南市天桥区黄河泺口浮桥南岸&#xff0c;隶…

一文搞懂ThreadLocal及相关的内存泄露问题

首先&#xff0c;看一张整体的结构图&#xff0c;来帮助理解 什么是ThreadLocal ThreadLocal用于创建线程局部变量&#xff0c;如果创建一个ThreadLocal变量&#xff0c;那么访问这个变量的每个线程都会有这个变量的一个副本&#xff0c;在实际多线程操作的时候&#xff0c;…

resnet50加入fpn_FPN+SSD同时兼顾速度和精度的检测器(二)

本文首发于知乎专栏“人工智能从入门到逆天杀神”&#xff0c;本文以及本专栏所有算法源代码都可以在神力AI平台获取&#xff0c;如果你没有GPU但需要预训练模型或者你想获取更多开箱即用的AI算法&#xff0c;欢迎加入我们的会员&#xff0c;一杯咖啡即可带你入门AI&#xff0c…

Java秒杀系统实战系列~RabbitMQ死信队列处理超时未支付的订单(转)

转自&#xff1a; https://juejin.cn/post/6844903903130042376 文末有源代码&#xff0c;非常棒 摘要&#xff1a; 本篇博文是“Java秒杀系统实战系列文章”的第十篇&#xff0c;本篇博文我们将采用RabbitMQ的死信队列的方式处理“用户秒杀成功生成订单后&#xff0c;却迟…

主流Java数据库连接池比较及前瞻

转载自 主流Java数据库连接池比较及前瞻主流数据库连接池 常用的主流开源数据库连接池有C3P0、DBCP、Tomcat Jdbc Pool、BoneCP、Druid等 C3p0: 开源的JDBC连接池&#xff0c;实现了数据源和JNDI绑定&#xff0c;支持JDBC3规范和JDBC2的标准扩展。目前使用它的开源项目有Hibern…

有序数组中查找第一个比target大的数

思路&#xff1a;二分法 时间复杂度&#xff1a;O(logn) 空间复杂度&#xff1a;O(1) 代码&#xff1a; public class Solution{public int search(int[] nums, int target){int lf0, rtn.length-1;while(lf<rt){int mid lf(rt-lf)/2;if(n[mid]>t){rt mid-1;}else{lf…

2020最新Java线程池入门(超详细)

转 https://blog.csdn.net/weixin_43893397/article/details/104361154 【1】代码示例 /*** 线程池测试-自定义线程池创建方式* since 2021/03/23 */ public class ThreadPoolMain2 {public static void main(String[] args) throws Exception {newMethod();}public static…

HDFS的诞生

转载自 HDFS的诞生 1牛刀小试 张大胖找了个实习的工作&#xff0c; 第一天上班Bill师傅给他分了个活儿&#xff1a;日志分析。张大胖拿到了师傅给的日志文件&#xff0c;大概有几十兆&#xff0c;打开一看&#xff0c; 每一行都长得差不多&#xff0c;类似这样&#xff1a;212.…

项目背景怎么描述_产品经理写简历,如何让「项目经验」更出众?

项目经验怎么写更出众&#xff1f;时间长但效果一般的项目经验要不要写&#xff1f;没有项目经验怎么办&#xff1f;本文凭借作者自己长期招聘产品、阅读大量简历所积累的经验解答了这三个问题&#xff0c;希望对你有所帮助。产品经理写简历时&#xff0c;都会通过项目案例来证…

一致性Hash算法原理

前言 当在需要将数据分发到多个数据库/缓存&#xff0c;或将请求分发给多个服务节点时&#xff0c;不可避免的会遇到以下问题&#xff1a; 如何将数据均匀的分散到各个节点中&#xff0c;并且尽量的在加减节点时能使受影响的数据最少。 选择节点的方法 随机放置 从多个节点…

转: java多线程-ThreadPoolExecutor的拒绝策略RejectedExecutionHandler

转自&#xff1a; https://blog.csdn.net/qq_25806863/article/details/71172823 概述 原文地址 http://blog.csdn.net/qq_25806863/article/details/71172823 在分析ThreadPoolExecutor的构造参数时&#xff0c;有一个RejectedExecutionHandler参数。 RejectedExecutionH…

已知两点坐标如何快速增加其他坐标_「测绘精选」坐标转换概述

引言&#xff1a;这篇“坐标转换概述”献给各位&#xff0c;可以对坐标转换有一个大致地、整体地了解。文中有些名词是为了便于表达而自创的&#xff0c;大家不用考据、较真。一、静态坐标和动态坐标(1)静态坐标传统大地测量没有考虑板块运动对坐标的影响。虽然板块运动客观存在…

什么是G1垃圾回收算法

转载自 什么是G1垃圾回收算法为解决CMS算法产生空间碎片和其它一系列的问题缺陷&#xff0c;HotSpot提供了另外一种垃圾回收策略&#xff0c;G1&#xff08;Garbage First&#xff09;算法&#xff0c;通过参数 -XX:UseG1GC来启用&#xff0c;该算法在JDK 7u4版本被正式推出&am…

一文理清RocketMQ顺序消费、重复消费、消息丢失问题

前言 在使用消息队列时不可避免的会遇到顺序消费、重复消费、消息丢失三个问题。在一次面试字节的时候&#xff0c;面试官问到如何保证顺序消费&#xff0c;当时回答不太准确&#xff0c;特意此文回顾如何解决顺序消费、重复消费、消息丢失三个问题。 重复消费 解决重复消费…