4、RabbitMQ的七种工作模式介绍

目录

一、Simple(简单模式)

1.1 概念

1.2 代码实现

消费者

运行结果

二、Work Queue(工作队列)

2.1 概念

1.2 代码实现

生产者

消费者

 运行结果

三、Publish/Subscribe(发布/订阅模式)

3.1 概念

3.2 代码实现

生产者

消费者

运行结果

四、Routing(路由模式)

4.1 概念

4.2 代码实现

Constants类

生产者

消费者

运行结果

五、Topics(通配符模式)

5.1 概念

5.2 代码实现

生产者

消费者

运行结果

六、RPC(RPC通信)了解

6.1 概念

6.2 代码实现

客户端代码编写

编写服务器代码

运行结果:

七、Publish Confirms(发布确认模式)​​​​​​​

publishing Messages Individually(单独确认)

Publishing Messages in Batches(批量确认)

Handling Publisher Confirms Asynchronously(异步确认)


一、Simple(简单模式)

1.1 概念

P: 生产者,也就是要发送消息的程序

C: 消费者,消息的接受者

Queue:消息队列,图中Queue类似提个邮箱,可以缓存消息;生产者向其中投递消息,消费者从中取出消息。

特点:一个生产者P,一个消费者C,消息只能被消费一次,也称为点对点(Point-to-Point)模式

适用场景:消息只能被单个消费者处理

1.2 代码实现

消费者

package rabbitmq.simple;import com.rabbitmq.client.*;
import java.io.IOException;public class ConsumerDemo {public static void main(String[] args) throws Exception {//1.创建连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost("8.136.108.248");// 设置RabbitMQ服务器的端口号connectionFactory.setPort(5672);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername("pinkboy");// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword("123456");// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost("/");// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();/***3.声明一个队列** @param channel RabbitMQ的通道,用于执行队列操作** 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:* 1. 队列名称("hello"):指定要声明的队列的名称* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享* 4. false:表示该队列不会在使用后自动删除,需要手动删除* 5. null:表示不设置额外的参数** 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息*/channel.queueDeclare("hello", true, false, false, null);// 4.开始从名为"hello"的队列中消费消息channel.basicConsume("hello", true, new DefaultConsumer(channel) {/*** 处理接收到的消息** @param consumerTag 消费者标签,用于标识消费者* @param envelope 包含消息路由信息的信封* @param properties 消息的属性,如内容类型、内容编码等* @param body 消息的主体内容,以字节数组形式表示* @throws IOException 如果处理消息时发生I/O错误*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});Thread.sleep(2000);//5.关闭资源channel.close();connection.close();}
}

运行结果

启动生产者代码:

观察消息队列

 启动消费者代码:

观察消息队列 

二、Work Queue(工作队列)

2.1 概念

一个生产者P,多个消费者C1,C2 在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息

特点:消息不会重复,分配各不同的消费者

适用场景:集群环境中做异步处理

举个例子:12306短息通知服务,订票成功后,订单消息会发送到RabbitMQ,短信服务从RabbitMQ中获取订单信息,并发送通知信息(在短信服务之间进行任务分配)

1.2 代码实现

工作模式就是简单模式的增强版 和简单模式的区别就是 简单模式就一个消费者,工作模式支持多个消费者接收消息,消费者之间是竟争关系 每个消息只能被一个消费者接收

和简单模式代码差不多 为了展示多个消费者竞争的关系 生产者一次生产10条消息

常量类

package rabbitmq.constant;public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//工作队列模式public static final String WORK_QUEUE = "work_queue";}

生产者

package rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2、创建通道Channel channel = connection.createChannel();//3、声明交换机//4、声明队列/*** 声明一个队列** @param channel RabbitMQ的通道,用于执行队列操作** 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:* 1. 队列名称("hello"):指定要声明的队列的名称* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享* 4. false:表示该队列不会在使用后自动删除,需要手动删除* 5. null:表示不设置额外的参数** 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息*/channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//5、发送消息// 循环发送消息到 RabbitMQ 的 "hello" 队列中for (int i = 0; i < 10; i++) {// 构造消息内容String msg = "hello work queue ..." + i;/*** 参数1 表示交换机名称,因为使用默认交换机,所以为空字符串* 参数2 表示队列名称* 参数3 :消息的属性* 参数4:消息内容*/channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功!");//6、释放资源channel.close();connection.close();}
}

消费者

两个消费者的代码是一样的

消费者1

package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");//5.关闭资源
//        channel.close();
//        connection.close();}
}

消费者2

package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 启动成功!");//5.关闭资源
//        channel.close();
//        connection.close();}
}

 运行结果

生产者

生产者消息队列

 

消费者

为了避免第一个启动的消费者会将10条消息消费掉 需要先启动两个消费者,再去启动生产者

消费者1

消费者2

观察消息队列 

可以看到管理页面中有两个消费者被显示

三、Publish/Subscribe(发布/订阅模式)

3.1 概念

Exchange: 交换机 (X).
作⽤: 生产者将消息发送到Exchange, 由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产 者将消息投递到队列中, 实际上这个在RabbitMQ中不会发生. )
RabbitMQ交换机有四种类型: fanout,direct, topic, headers, 不同类型有着不同的路由策略. AMQP协议里还有另外两种类型, System和⾃定义, 此处不再描述.
Fanout:广播,将消息交给所有绑定到交换机的队列( Publish/Subscribe模式)
一个生产者P,多个消费者C1,C2,X代表交换机消息复制多份,每个消费者接收相同的消息
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者
适合场景:消息需要被多个消费者同时接收的场景.如:实时通知或者广播消息

3.2 代码实现

常量类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//发布订阅模式public static final String FANOUT_EXCHANGE = "fanout_exchange";public static final String FANOUT_QUEUE1 = "fanout_queue1";public static final String FANOUT_QUEUE2 = "fanout_queue2";
}

这个模式需要创建交换机,并绑定队列和交换机 

//3、声明交换机
/*** 参数1:交换机名称* 参数2:交换机类型 Fanout类型 -> 广播机制* 参数3:是否持久化*/
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

声明队列

//4、声明队列
/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除* 参数5:其他参数*/
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

绑定队列和交换机

//5、交换机和队列绑定
/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则 如果交换机类型为fanout类型,routingKey设置为空字符串*/
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

生产者

package rabbitmq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、打开信道Channel channel = connection.createChannel();//3、声明交换机/*** 参数1:交换机名称* 参数2:交换机类型 Fanout类型 -> 广播机制* 参数3:是否持久化*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//4、声明队列/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除* 参数5:其他参数*/channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//5、交换机和队列绑定/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则 如果交换机类型为fanout类型,routingKey设置为空字符串*/channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");//6、发布消息String msg = "hello fanout ...";channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());System.out.println("消息发送成功!");//7、释放资源channel.close();connection.close();}
}

消费者

消费者1

package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);//4、消费消息channel.basicConsume(Constants.FANOUT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}

消费者2

package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//4、消费消息channel.basicConsume(Constants.FANOUT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 启动成功!");}
}

运行结果

启动生产者

观察消息队列

fanout_queue1 和 fanout_queue2 分别有了1条消息

 Exchange中多了队列的绑定关系

启动两个消费者

 

观察消息队列

四、Routing(路由模式)

4.1 概念

路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key

发布订阅模式是无条件的将所有消息分发给所有的消费者,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列

适合场景:需要根据特定规则分发消息的场景

比如系统打印日志, 日志等级分为error, warning, info,debug, 就可以通过这种模式,把不同的日志发
送到不同的队列, 最终输出到不同的⽂件
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
 
RoutingKey:路由键.生产者将消息发给交换器时,指定的一个字符串,用来告诉交换机应该如何处理这个消息.
 
BindingKey:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候一般会指定一个BindingKey,这样RabbitMQ就知道如何正确地将消息路由到队列了.

队列和交换机的绑定,不能是任意的绑定了,而是要制定了一个BindKey(RoutingKey的一种)消息的发送方在向Exchange发送消息时也需要指定消息的RoutingKey

Exchange也不再把消息交给每一个绑定的key,而是根据消息的RountingKey进行判断,只要队列的BindingKey和发送消息的RoutingKey完全一致,才会接收到消息

创建交换机,定义交换机类型为BuiltinExchangeType.DIRECT

4.2 代码实现

Constants类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";
}

生产者

package rabbitmq.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.  创建信道Channel channel = connection.createChannel();//3.  声明交换机/*** 参数1:交换机名称* 参数2:交换机类型* 参数3:是否持久化* 参数4:是否自动删除* 参数5:其他参数*/channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, "direct", true, false, null);//4.  声明队列/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,队列不再使用时是否自动删除此队列,如果将此参数和参数2设置为true就可以实现临时队列(队列不用了就自动删除)* 参数5:其他参数*/channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5、交换机和队列绑定/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则*/channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6、发布消息String msg_a = "hello direct my routingKey is a...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg_a.getBytes());String msg_b = "hello direct my routingKey is b...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());String msg_c = "hello direct my routingKey is c...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());System.out.println("消息发送成功!");//7、释放资源channel.close();connection.close();}
}

消费者

package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//4、消费消息channel.basicConsume(Constants.DIRECT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//4、消费消息channel.basicConsume(Constants.DIRECT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}

运行结果

启动生产者

观察消息队列界面

exchange下队列和Routing Key的绑定关系

启动消费者

观察消息队列界面

五、Topics(通配符模式)

5.1 概念

在路由模式上进行了升级,在routingKey的基础上,增加可通配符的功能,适之更加灵活

Topic和Routing的基本原同,即:生产者将消息发送给交换机,交换机根据RoutingKey将消息转发给RoutingKey匹配的队列,类似于正则表达式的方式来定义RoutingKey的模式. 

适合场景:需要灵活匹配和过滤消息的场景

Topic和Routing模式的区别:

1、topic模式使用的交换机类型为topic(Rounting模式使用的交换机类型为direct)

2、topic类型的交换机在匹配规则上进行了扩展,BingingKey支持通配符匹配(direct类型的将换季路由规则是BingKey和RoutingKey完全匹配)

在topic类型的交换机在匹配规则上有些要求:

1.RoutingKey是一系列由点(.)分隔的单词,比如“stock.usd.nyse”,"nyse.vmw","quick.organge.rabbit"

2. BingdingKey和RountingKey,也是点(.)分隔的字符串

3. BingdingKey中可以存在两种特殊的字符串,用于模糊匹配

  * 表示一个单词

  # 表示0-N个单词

举个例子:

Binding Key 为"d.a.b" 会同时路由到Q1 和Q2
Binding Key 为"d.a.f" 会路由到Q1
Binding Key 为"c.e.f" 会路由到Q2
Binding Key 为"d.b.f" 会被丢弃, 或者返回给⽣产者(需要设置mandatory参数)

5.2 代码实现

Constants类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";}

生产者

创建交换机类型为BuiltinExchangeType.TOPIC

 //3.声明交换机

channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
 

声明队列

//4.声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

 绑定交换机和队列

//5.绑定交换机和队列
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
package rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);//4.声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5.绑定交换机和队列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6、发布消息String msg_a = "hello topic my routingKey is ae.a.f...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes());String msg_b = "hello topic my routingKey is ef.a.b...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());String msg_c = "hello topic my routingKey is c...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());System.out.println("消息发送成功!");}
}

消费者

package rabbitmq.topic;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}

运行结果

生产者

观察消息队列界面

消费者

六、RPC(RPC通信)了解

6.1 概念

RPC(Remote Procedure Call) 即远程调用 它是一种通过网络从远程计算机上请求服务,而不是需要了解底层网络的技术,类似HTTP远程调用

RabbitMQ实现RPC通信的过程,大概率是通过两个队列实现一个可回调的过程

大概流程

1.客户端发消息到一个指定的队列,并在消息属性中设置replyTo字段,这个字段指定一个回调队列,服务端处理后,会把响应结果发送到这个队列

2.服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列

3.客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应

客户端:

1 发送请求(携带replyTo,CorrelationID)

2 接收响应(校验correlationID)

服务端:

1 接受请求,进行响应

2 发送响应(按照客户端指定的replyTo,设置correlationID)

6.2 代码实现

客户端代码编写

1、声明两个队列 RPC_REQUEST_QUEUE和RPC_RESPONSE_QUEUE,声明本次请求的唯一标志correlationID

2、将RPC_RESPONSE_QUEUE和correlationID配置到要发送的消息队列中

3、使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中

4、阻塞队列有消息后,主线程被唤醒,打印返回内容

Constants类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";}

客户端代码

package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;/*** rpc 客户端* 1. 发送请求* 2. 等待响应*/
public class RpcClient {public static void main(String[] args) throws Exception { //1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、开启信道Channel channel = connection.createChannel();channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//3、发送请求String msg = "hello rpc...";//设置请求唯一标识//设置请求的相关属性// 生成一个唯一的关联ID,用于跟踪请求和响应String correlationID = UUID.randomUUID().toString();// 创建并配置AMQP基本属性,设置消息的关联ID和回复队列AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();// 发布消息到指定的请求队列,携带配置的属性和消息体channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, prop, msg.getBytes());//4、接收响应//使用阻塞队列,存储响应信息final ArrayBlockingQueue<String> response = new ArrayBlockingQueue<>(1);channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回调消息:" + respMsg);if (correlationID.equals(properties.getCorrelationId())) {//如果correlationID校验一致,则将响应信息保存在response中response.offer(respMsg);}}});/*** 阻塞等待响应*/String take = response.take();System.out.println("[RPC Client 响应结果]:" + take);}
}

编写服务器代码

package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;/*** 1、接受请求* 2、发送响应*/
public class RpcServer {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、开启信道Channel channel = connection.createChannel();//3、接受请求channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body);System.out.println("接收到请求:" + request);String response = "针对request:" + request + ",响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

运行结果:

启动客户端

观察消息队列界面

启动服务器端

客户端输出消息 

 观察消息队列界面

七、Publish Confirms(发布确认模式)

作为消息中间件,都会面临消息丢失的问题

消息丢失大概分为三种情况

1、生产者问题 因为应用程序故、障网络抖动等原因,生产者没有成功想broker发送消息

2、消息中间件自身问题,生产者成功发送给Broker 但是Broker没有把消息保存好,导致消息丢失

3、消费者问题,Broker发送到消费者,消费者在消费时,因没处理好,导致消费者失败的消息从队列中删除了

针对问题1 可以采用发确认(Publisher Cofirms)机制实现

生产者将信道设置成confirm(确认)模式,一但信道进入confirm模式,所有在该信道上面发布的消息都是会被指派一个唯一的ID(从1开始),一但消息被投递到所有匹配的队列之后,RabbitMq就会发送一个确认给生产者(包括消息的唯一ID)这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么消息确认会在将消息写入到磁盘之后发出,broker回传给生产者的确认消息中

deliveryTag包包含了消息的序号,此外broker也可以设置channel basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到处理

发送方确认机制最大的好处是他是异步的,生产者可以同时发布消息和等待信道返回确认消息

1、当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息

2、如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该nack命令

使用发送确认机制,必须要将信道设置成confirm(确认)模式

package rabbitmq.comfirms;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 200;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {// 发布确认publishingMessagesIndividually();// 批量发布确认publishingMessagesInBatchs();// 异步发布确认publishingMessagesAsynchronously();}private static void publishingMessagesAsynchronously() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, false, null);//4、监听confirm消息//集合中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理,比如重发}});//5.发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;long seqNO = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNO);}while (!confirmSeqNo.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesInBatchs() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, false, null);//4、发送消息long start = System.currentTimeMillis();int batchSize = 100;int outStandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, msg.getBytes());outStandingMessageCount++;if (outStandingMessageCount == batchSize) {//6、等待确认channel.waitForConfirms(5000);outStandingMessageCount = 0;}}if (outStandingMessageCount > 0) {channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, false, null);//4、发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, msg.getBytes());//5、等待确认channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException | InterruptedException e) {throw new RuntimeException(e);}}
}

publishing Messages Individually(单独确认)

观察上面代码, 会发现这种策略是每发送⼀条消息后就调用channel.waitForConfirms方法,之后等待服务端的确认, 这实际上是⼀种串行同步等待的方式. 尤其对于持久化的消息来说, 需要等待消息确
认存储在磁盘之后才会返回(调用Linux内核的fsync方法). 但是发布确认机制是⽀持异步的. 可以⼀边发送消息, ⼀边等待消息确认。

Publishing Messages in Batches(批量确认)

 

相比于单独确认策略, 批量确认极大地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们 不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量.
当消息经常丢失时,批量确认的性能应该是不升反降的.

Handling Publisher Confirms Asynchronously(异步确认)

异步confirm方法的编程实现最为复杂. Channel 接⼝提供了⼀个方法addConfirmListener. 这个方法
可以添加ConfirmListener 回调接口.
ConfirmListener 接口中包含两个方法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理 RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表示 发送消息的序号. multiple 表示 是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集
合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的
deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.
1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表示小于等于当前序号
deliveryTag的消息都收到了, 则清除对应集合
2. 当收到nack时, 处理逻辑类似, 不过 需要结合具体的业务情况, 进型消息重发等操作.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/903433.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

厚铜PCB钻孔工艺全解析:从参数设置到孔壁质量的关键控制点

在现代电子设备中&#xff0c;厚铜PCB&#xff08;印刷电路板&#xff09;扮演着至关重要的角色。它们不仅为电子元件提供了支撑&#xff0c;还实现了电路之间的连接。然而&#xff0c;在生产厚铜PCB时&#xff0c;钻孔是一个关键环节。本文将为您介绍厚铜PCB生产中钻孔的科普知…

缺口拼图,非线性坐标关联

继上一篇文章&#xff0c; 欢迎一起交流探讨 https://t.zsxq.com/GEIze

OTA(Over-The-Air)升级

简介&#xff1a; OTA&#xff08;Over-the-Air&#xff09;是一种通过无线方式进行数据传输和更新的技术&#xff0c;通常用于电子设备&#xff08;如智能手机、汽车、物联网设备等&#xff09;的软件、固件或配置更新。OTA可以在设备与服务器之间进行远程传输&#xff0c;用户…

fastapi和flaskapi有什么区别

FastAPI 和 Flask 都是 Python 的 Web 框架&#xff0c;但设计目标和功能特性有显著差异。以下是它们的核心区别&#xff1a; 1. ‌性能与异步支持‌ ‌FastAPI‌ 基于 ‌Starlette‌&#xff08;高性能异步框架&#xff09;和 ‌Pydantic‌&#xff08;数据校验库&#xff09;…

RCS认证是什么?RCS认证的好处?RCS认证所需要的资料

1. RCS&#xff08;Recycled Claim Standard&#xff09;认证 定义&#xff1a;由 Textile Exchange&#xff08;纺织品交易所&#xff09; 制定的国际标准&#xff0c;用于验证产品中回收材料&#xff08;如再生纤维、塑料、金属等&#xff09;的含量和供应链的可追溯性&…

10 基于Gazebo和Rviz实现导航仿真,包括SLAM建图,地图服务,机器人定位,路径规划

在9中我们已经实现了机器人的模块仿真&#xff0c;现在要在这个基础上实现SLAM建图&#xff0c;地图服务&#xff0c;机器人定位&#xff0c;路径规划 1. 还是在上述机器人的工作空间下&#xff0c;新建功能包&#xff08;nav&#xff09;&#xff0c;导入依赖 gmapping ma…

OpenGL----OpenGL纹理与纹理缓存区

在现代计算机图形学中,纹理(Texture)是一个至关重要的概念。它不仅可以为几何体表面添加细节和真实感,还可以用于实现各种复杂的视觉效果和数据处理。在OpenGL中,纹理的应用范围非常广泛,从基本的颜色映射到高级的阴影映射、环境映射等。本文将深入探讨OpenGL纹理与纹理缓…

Scikit-learn工具介绍与数据集

一、Scikit-learn简介与安装 Scikit-learn是Python中最流行的机器学习库之一&#xff0c;它提供了简单高效的数据挖掘和数据分析工具。 Python语言机器学习工具 Scikit-learn包括许多智能的机器学习算法的实现 Scikit-learn文档完善&#xff0c;容易上手&#xff0c;丰富的A…

Byte-Buddy系列 - 第4讲 byte-buddy无法读取到SpringBoot Jar中的类

目录 一、问题描述二、原因分析三、解决方案1&#xff08;推荐&#xff09;&#xff1a;获取线程上下文中的类加载器扩展 四、解决方案2&#xff1a;自定义SpringBoot类加载器 一、问题描述 在使用Byte-Buddy中的TypePool对类进行扩展后&#xff0c;在本地开发集成环境&#x…

AutogenStudio使用

官网介绍&#xff1a;https://microsoft.github.io/autogen/stable/ Autogen是什么&#xff1f; AutoGen 是由微软开发的一个开源框架&#xff0c;旨在通过 多智能体协作&#xff08;Multi-Agent Collaboration&#xff09; 实现复杂的任务自动化。它的核心思想是让多个 AI 代…

Vue3 Echarts 3D圆形柱状图实现教程以及封装一个可复用的组件

文章目录 前言一、实现原理二、series ——type: "pictorialBar" 简介2.1 常用属性 三、代码实战3.1 封装一个echarts通用组件 echarts.vue3.2 首先实现一个基础柱状图3.3 添加上下2个椭圆面3.4 进阶封装一个可复用的3D圆形柱状图组件 总结 前言 在前端开发的数据可视…

yolov8中train、test、val

说明yolov8中train、test、val是什么意思&#xff0c;是什么作用呢&#xff1f;详细介绍使用yolov8进行实例分割&#xff0c;我应该如何制作我的数据集呢&#xff1f; 1. YOLOv8中的train、val、test是什么意思&#xff1f;作用是什么&#xff1f; 在YOLOv8&#xff08;由Ultr…

借助Spring AI实现智能体代理模式:从理论到实践

借助Spring AI实现智能体代理模式&#xff1a;从理论到实践 前言 在人工智能领域&#xff0c;大语言模型&#xff08;LLM&#xff09;的应用愈发广泛&#xff0c;如何高效构建基于LLM的系统成为众多开发者关注的焦点。Anthropic的研究报告《构建高效代理》为我们提供了新的思…

【学习笔记】计算机操作系统(二)—— 进程的描述与控制

第二章 进程的描述与控制 文章目录 第二章 进程的描述与控制2.1 前趋图和程序执行2.1.1 前趋图2.1.2 程序顺序执行2.1.3 程序并发执行 2.2 进程的描述2.2.1 进程的定义和特征2.2.2 进程的基本状态及转换2.2.3 挂起操作和进程状态的转换2.2.4 进程管理中的数据结构 2.3 进程控制…

具身智能之强化学习

在具身智能&#xff08;Embodied AI&#xff09;中&#xff0c;强化学习&#xff08;Reinforcement Learning&#xff0c;RL&#xff09;是一种非常核心的学习方法。它让智能体&#xff08;agent&#xff09;通过与环境交互&#xff0c;不断试错&#xff0c;学习完成任务的策略…

go打印金字塔

需求 打印空心金字塔 解析 // * // * * // * * * // * * * *// 看成由星号、空格组成的矩形&#xff1a; // 1 1 1 0 // 2 3 2 1 // 3 5 3 2 // 4 7 4 3// 层数&#xff1a;n // 每层总元素数&#xff1a;2n-1 // 每星号数&#xff1a;n // 每层空格数&am…

C语言教程(二十二):C 语言头文件详解

一、头文件的定义与形式 头文件一般具有 .h 扩展名&#xff0c;它主要用来存放函数声明、宏定义、结构体和共用体的定义、全局变量的声明等内容。在C语言程序里&#xff0c;可借助 #include 预处理指令把这些头文件包含到源文件中。 二、头文件的作用 2.1 函数声明 头文件可对…

数据库day-08

一、实验名称和性质 删除修改数据 验证 设计 二、实验目的 1&#xff0e;掌握数据操作-- 删除、修改&#xff1b; 三、实验的软硬件环境要求 硬件环境要求&#xff1a; PC机&#xff08;单机&#xff09; 使用的软件名称、版本号以及模块&#xff1a; Windows 10&#x…

JAVA中Spring全局异常处理@ControllerAdvice解析

一、ControllerAdvice基础概念 1. 什么是ControllerAdvice&#xff1f; ControllerAdvice是Spring 3.2引入的注解&#xff0c;用于定义全局控制器增强组件&#xff0c;主要功能包括&#xff1a; 全局异常处理&#xff08;最常用&#xff09;全局数据绑定全局数据预处理 2. …

开放平台架构方案- GraphQL 详细解释

GraphQL 详细解释 GraphQL 是一种用于 API 的查询语言&#xff0c;由 Facebook 开发并开源&#xff0c;旨在提供一种更高效、灵活且强大的数据获取和操作方式。它与传统的 REST API 有显著不同&#xff0c;通过类型系统和灵活的查询能力&#xff0c;解决了 REST 中常见的过度获…