RabbitMQ Topic RPC

Topics(通配符模式)

Topics 和Routing模式的区别是:

  1. topics 模式使⽤的交换机类型为topic(Routing模式使⽤的交换机类型为direct)
  2. topic 类型的交换机在匹配规则上进⾏了扩展, Binding Key⽀持通配符匹配(direct类型的交换机路 由规则是BindingKey和RoutingKey完全匹配)

在topic类型的交换机在匹配规则上, 有些要求:

  1. RoutingKey 是⼀系列由点( . )分隔的单词, ⽐如 " stock.usd.nyse ", " nyse.vmw ", " quick.orange.rabbit "
  2. BindingKey 和RoutingKey⼀样, 也是点( . )分割的字符串
  3. Binding Key中可以存在两种特殊字符串, ⽤于模糊匹配
  • * 表⽰⼀个单词
  • # 表⽰多个单词(0-N个)

⽐如:

  • Binding Key 为"d.a.b" 会同时路由到Q1 和Q2
  • Binding Key 为"d.a.f" 会路由到Q1
  • Binding Key 为"c.e.f" 会路由到Q2
  • Binding Key 为"d.b.f" 会被丢弃, 或者返回给⽣产者(需要设置mandatory参数)

引⼊依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

编写⽣产者代码 和路由模式, 发布订阅模式的区别是:

交换机类型不同, 绑定队列的RoutingKey不同

创建交换机

定义交换机类型为BuiltinExchangeType.TOPIC

channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, false, null);

声明队列

channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, null);

绑定交换机和队列

//队列1绑定error, 仅接收error信息
channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
"*.error");
//队列2绑定info, error: error,info信息都接收
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"#.info");
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"*.error");

发送消息

String msg = "hello topic, I'm order.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBy
tes());
String msg_black = "hello topic, I'm order.pay.info";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_bl
ack.getBytes());
String msg_green= "hello topic, I'm pay.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.g
etBytes());

完整代码:

public static String TOPIC_EXCHANGE_NAME = "test_topic";
public static String TOPIC_QUEUE_NAME1 = "topic_queue1";
public static String TOPIC_QUEUE_NAME2 = "topic_queue2";import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;
public class TopicRabbitProducer {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guestfactory.setPassword(Constants.PASSWORD);//密码, 默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 创建交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, false, null);//3. 声明队列//如果没有⼀个这样的⼀个队列, 会⾃动创建, 如果有, 则不创建channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false,
null);channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false,
null);//4. 绑定队列和交换机//队列1绑定error, 仅接收error信息channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
"*.error");//队列2绑定info, error: error,info信息都接收channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"#.info");channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"*.error");//5. 发送消息String msg = "hello topic, I'm order.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBy
tes());String msg_black = "hello topic, I'm order.pay.info";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_bl
ack.getBytes());String msg_green= "hello topic, I'm pay.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.g
etBytes());//6.释放资源channel.close();connection.close();}
}

编写消费者代码

Routing模式的消费者代码和Routing模式代码⼀样, 修改消费的队列名称即可

同样复制出来两份

消费者1:TopicRabbitmqConsumer1

消费者2: TopicRabbitmqConsumer2

完整代码:

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class TopicRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guestfactory.setPassword(Constants.PASSWORD);//密码, 默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 接收消息, 并消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);}
}

运⾏程序, 观察结果

运⾏⽣产者, 可以看到队列的消息数

运⾏消费者

RPC(RPC通信)

RPC(Remote Procedure Call), 即远程过程调⽤. 它是⼀种通过⽹络从远程计算机上请求服务, ⽽不需要 了解底层⽹络的技术. 类似于Http远程调⽤

RabbitMQ实现RPC通信的过程, ⼤概是通过两个队列实现⼀个可回调的过程

⼤概流程如下:

  1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队 列, 服务端处理后, 会把响应结果发送到这个队列
  2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确 保它是所期望的响应

引⼊依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

编写客⼾端代码 客⼾端代码主要流程如下:

  1. 声明两个队列, 包含回调队列replyQueueName, 声明本次请求的唯⼀标志corrId
  2. 将replyQueueName和corrId配置到要发送的消息队列中
  3. 使⽤阻塞队列来阻塞当前进程, 监听回调队列中的消息, 把请求放到阻塞队列中
  4. 阻塞队列有消息后, 主线程被唤醒,打印返回内容

声明队列

//2. 声明队列, 发送消息
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, false,
null);

定义回调队列

// 定义临时队列,并返回⽣成的队列名称
String replyQueueName = channel.queueDeclare().getQueue();

使⽤内置交换机发送消息

// 本次请求唯⼀标志
String corrId = UUID.randomUUID().toString();
// ⽣成发送消息的属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 唯⼀标志本次请求.replyTo(replyQueueName) // 设置回调队列.build();
// 通过内置交换机, 发送消息
String message = "hello rpc...";
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props,
message.getBytes());

使⽤阻塞队列, 来存储回调结果

// 阻塞队列,⽤于存储回调结果
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
//接收服务端的响应
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息:"+ new String(body));//如果唯⼀标识正确, 放到阻塞队列中if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}
};
channel.basicConsume(replyQueueName, true, consumer);

获取回调结果

// 获取回调的结果
String result = response.take();
System.out.println(" [RPCClient] Result:" + result);

完整代码

public static String RPC_REQUEST_QUEUE_NAME = "rpc_request_queue";import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class RPCClient {public static void main(String[] args) throws Exception {//1. 创建Channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guestfactory.setPassword(Constants.PASSWORD);//密码, 默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false,
false, null);// 唯⼀标志本次请求String corrId = UUID.randomUUID().toString();// 定义临时队列,并返回⽣成的队列名称String replyQueueName = channel.queueDeclare().getQueue();// ⽣成发送消息的属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 唯⼀标志本次请求.replyTo(replyQueueName) // 设置回调队列.build();// 通过内置交换机, 发送消息String message = "hello rpc...";channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props,
message.getBytes());// 阻塞队列,⽤于存储回调结果final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服务端的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息:"+ new String(body));if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}};channel.basicConsume(replyQueueName, true, consumer);// 获取回调的结果String result = response.take();System.out.println(" [RPCClient] Result:" + result);//释放资源channel.close();connection.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/81737.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

服务器死机了需要检查哪些问题

在这个数字化的时代&#xff0c;服务器就像是我们信息世界的“大管家”&#xff0c;可要是它突然死机了&#xff0c;那可真是让人头疼。今天咱们就来聊聊&#xff0c;服务器死机了&#xff0c;到底需要检查哪些问题。 一、硬件问题 电源供应&#xff1a;检查电源是否稳定&…

【MySQL成神之路】运算符总结

MySQL运算符总结 MySQL提供了丰富的运算符&#xff0c;用于在SQL语句中进行各种计算和比较操作。这些运算符可以分为算术运算符、比较运算符、逻辑运算符、位运算符等几大类。合理使用这些运算符可以构建复杂的查询条件和计算表达式。 一、算术运算符 MySQL支持基本的算术运…

自用Vscode 配置c++ debug环境

前言 使用vscode配置c debug环境的好处 1、可以借助vscode方便轻量的扩展和功能 2、避免了传统使用gdb 复杂按键以及不够直观的可视化 3、方便一次运行&#xff0c;断点处查看变量&#xff0c;降低找bug难度 4、某大公司项目采用类似配置&#xff0c;经过实践检验 配置c运行环…

创建一个使用 GPT-4o 和 SERP 数据的 RAG 聊天机器人

亮数据-网络IP代理及全网数据一站式服务商屡获殊荣的代理网络、强大的数据挖掘工具和现成可用的数据集。亮数据&#xff1a;网络数据平台领航者https://www.bright.cn/?promogithub15?utm_sourceorganic-social-cn&utm_campaigncsdn 本指南将解释如何使用 Python、GPT-4…

吴恩达 Deep Learning(1-36)ppt逐行理解

课程地址&#xff1a;(超爽中英!) 2024公认最好的【吴恩达深度学习】教程&#xff01;附课件代码 Professionalization of Deep Learning_哔哩哔哩_bilibili 1.目录 2.什么是神经网络 3.用神经网络进行监督学习 4.为什么深度学习会兴起 7.二分分类 适用于二元分类问题的函数&…

三维点云的处理

1 点云原理 https://zh.wikipedia.org/wiki/%E9%BB%9E%E9%9B%B2 点云&#xff08;英语&#xff1a;point cloud&#xff09;是空间中点的数据集&#xff0c;可以表示三维形状或对象&#xff0c;通常由三维扫描仪获取。点云中每个点的位置都由一组笛卡尔坐标(X,Y,Z)描述[1]&…

鸿蒙HarmonyOS多设备流转:分布式的智能协同技术介绍

随着物联网和智能设备的普及&#xff0c;多设备间的无缝协作变得越来越重要。鸿蒙&#xff08;HarmonyOS&#xff09;作为华为推出的新一代操作系统&#xff0c;其分布式技术为实现多设备流转提供了强大的支持。本文将详细介绍鸿蒙多设备流转的技术原理、实现方式和应用场景。 …

Spring Boot- 2 (数万字入门教程 ):数据交互篇

JDBC交互框架: Spring的JDBC操作工具: 依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> JDBC的模版类:JdbcTemplate 引入Mysql的依赖 <depe…

在 Kotlin 中,什么是内联函数?有什么作用?

在 Kotlin 中&#xff0c;内联函数是一种通过 inline 关键字声明的函数&#xff0c;其主要目的是优化高阶函数&#xff08;即以函数作为参数或返回值的函数&#xff09;的性能。 内联函数的函数体会在编译时直接插入到调用处&#xff0c;从而避免函数调用的开销&#xff0c;并…

LLM笔记(五)概率论

1. 随机变量与概率分布&#xff1a;模型输出的基础 在LLM中&#xff0c;随机变量最直观的体现就是模型预测的下一个token。每个时刻&#xff0c;模型都会输出一个概率分布&#xff0c;表示词汇表中每个token可能是"下一个词"的概率。 直观理解 想象模型在处理句子…

LeetCode-滑动窗口-找到字符串中所有字母异位词

LeetCode-滑动窗口-找到字符串中所有字母异位词 ✏️ 关于专栏&#xff1a;专栏用于记录 prepare for the coding test。 文章目录 LeetCode-滑动窗口-找到字符串中所有字母异位词&#x1f4dd; 找到字符串中所有字母异位词&#x1f3af;题目描述&#x1f50d; 输入输出示例&am…

PostgreSQL 初体验

目录 一、PostgreSQL 1. 简介 2. 特点 &#xff08;1&#xff09; 开源免费&#xff08;Open Source&#xff09; &#xff08;2&#xff09;标准兼容&#xff08;SQL Compliance&#xff09; &#xff08;3&#xff09; 丰富的数据类型&#xff08;Data Types&#xff09…

05_核支持向量机

描述 核支持向量机&#xff08;通常简称为SVM&#xff09;可以推广到更复杂模型的扩展&#xff0c;这些模型无法被输入空间的超平面定义。 SVM 的核心思想是找到一个最优的超平面&#xff0c;将不同类别的数据分开。这个超平面不仅要能够正确分类数据&#xff0c;还要使得两个…

Java + 鸿蒙双引擎:ZKmall开源商城如何定义下一代B2C商城技术标准?

在 B2C 电商领域持续革新的当下&#xff0c;技术架构的优劣成为决定商城竞争力的核心要素。ZKmall开源商城以其创新融合的 Java 与鸿蒙双引擎&#xff0c;为下一代 B2C 商城技术标准勾勒出全新蓝图&#xff0c;在性能、兼容性、拓展性等关键维度实现了重大突破。 一、Java 技术…

关于 Web 漏洞原理与利用:3. CSRF(跨站请求伪造)

一、原理&#xff1a; 利用用户登录态伪造操作 CSRF&#xff08;Cross-Site Request Forgery&#xff0c;跨站请求伪造&#xff09;是攻击者“借刀杀人”&#xff0c;借用用户浏览器中已有的登录状态&#xff0c;诱导用户完成攻击者指定的操作。 1. 基本机制分解 1&#xf…

【HTML5】【AJAX的几种封装方法详解】

【HTML5】【AJAX的几种封装方法详解】 AJAX (Asynchronous JavaScript and XML) 封装是为了简化重复的异步请求代码&#xff0c;提高开发效率和代码复用性。下面我将介绍几种常见的 AJAX 封装方式。 方法1. 基于原生 XMLHttpRequest 的封装 XMLHttpRequest。其主要特点如下…

C++ - 网络编程之初始连接(Winsock2 概述、初始连接案例、初始连接案例解读)

一、Winsock2 概述 Winsock2&#xff08;Windows Sockets 2&#xff09;是微软提供的 Windows 平台网络编程库 二、初始连接案例 1、Server #include <winsock2.h> #include <ws2tcpip.h> #include <iostream>#pragma comment(lib, "ws2_32.lib&quo…

Spring Cloud Gateway深度解析:原理、架构与生产实践

文章目录 前言一、概述二、核心架构设计及设计原理2.1 分层架构模型网络层&#xff08;I/O模型&#xff09;核心处理层 2.2 核心组件协作流程路由定位阶段过滤器执行阶段 2.3 响应式编程模型实现Reactor上下文传递背压处理机制 2.4 动态路由设计原理2.5 异常处理体系2.6 关键路…

游戏开发实战(一):Python复刻「崩坏星穹铁道」嗷呜嗷呜事务所---源码级解析该小游戏背后的算法与设计模式【纯原创】

文章目录 奇美拉项目游戏规则奇美拉(Chimeras)档案领队成员 结果展示&#xff1a; 奇美拉项目 由于项目工程较大&#xff0c;并且我打算把我的思考过程和实现过程中踩过的坑都分享一下&#xff0c;因此会分3-4篇博文详细讲解本项目。本文首先介绍下游戏规则并给出奇美拉档案。…

说一下响应状态码有哪些?

HTTP响应状态码分类(RFC 7231标准) 1. 1xx(信息类) 临时响应,表示请求已被接收,需要继续处理 100 Continue:客户端应继续发送请求体 101 Switching Protocols:服务器同意升级协议(如WebSocket) 102 Processing(WebDAV):服务器正在处理但未完成 2. 2xx(成功类)…