RabbitMQ ②-工作模式

在这里插入图片描述

RabbitMQ 工作模式

官方提供了七种工作模式

Simple(简单模式)在这里插入图片描述

  • P:生产者,发布消息到队列
  • C:消费者,从队列中获取消息并消费
  • Queue:消息队列,存储消息。

一个生产者,一个消费者,消息只能被消费一次,也被称为点对点(Point-to-Point)模式。

Work-Queues(工作队列模式)

在这里插入图片描述

  • P:生产者,发布消息到队列
  • C1C2:消费者,从队列中获取消息并消费
  • Queue:消息队列,存储消息。

Queue 存储多个消息时,就会分配给不同的消费者,每个消费者接收到不同的消息。

Publish/Subscribe(发布/订阅模式)

在这里插入图片描述

  • P:生产者,发布消息到队列
  • C1C2:消费者,从队列中获取消息并消费
  • Q1Q2:消息队列,存储消息。
  • X:即为 Exchange,交换机,交换机可以根据一定的规则将生产者发布的消息路由到指定的队列中。

RabbitMQ 的交换机有四种类型,不同的类型有着不同的策略:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。(Publish/Subscribe 模式)
  • Direct:定向,将消息路由到符合 routing key(路由键)的队列(Routing 模式)。
  • Topic:通配符,将消息路由到符合 routing pattern(路由匹配规则)的队列(Topics 模式)。
  • HeadersHeaders 类型的交换机不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配,这种类型的交换机性能很差,一般不会使用。

routing key 和 binding key:

  • routing key:路由键,生产者将消息发给交换机时,指定的一个字符串,用来告诉交换机应该如何将消息路由到指定队列。
  • binding key:绑定,将队列和交换机绑定时,指定的一个字符串,这样 RabbitMQ 就可以知道如何正确地将消息路由到指定的队列。
    在这里插入图片描述

Routing(路由模式)

在这里插入图片描述

  • X:交换机,交换机根据 routing key 进行消息路由。

Topics(通配符模式)在这里插入图片描述

  • X:交换机,交换机根据 routing pattern 进行消息路由。
  • *:匹配一个单词。
  • #:匹配多个单词。

RPC(远程过程调用模式)

在这里插入图片描述
在这里插入图片描述

可以把该模式理解有客户端和服务端间的通信,客户端发送请求,服务端处理请求并返回结果。

客户端发送请求时,指定 correlation _idreply_to,将该请求发送到 rpc_queue 里。

服务端从 rpc_queue 里取出请求,处理请求后,将结果发送到 reply_to 里。

客户端根据 correlation _id 取出结果。

Publisher Confirms(发布确认模式)

在这里插入图片描述
在这里插入图片描述

该模式是 RabbitMQ 服务器也就是 Broker 向生产者发送确认消息,生产者接收到确认消息后才认为消息发送成功。

如果 RabbitMQ 服务器因为某种原因没有接收到确认消息,需要根据业务情况决定是否重新发送消息。

工作模式使用案例

创建普通 Maven 项目,引入依赖:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

定义 Constants 类

package mq.Constants;public class Constants {public static final String HOST = "47.94.9.33";public static final int PORT = 5672;public static final String USER_NAME = "admin";public static final String PASSWORD = "admin";public static final String VIRTUAL_HOST = "/";// * 工作队列模式public static final String WORK_QUEUE = "work.queue";// * 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";// * 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";// * 通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";// * RPC 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";// * Publisher Confirms 模式public static final String P_CONFIRMS_QUEUE1 = "p.confirms.queue1";public static final String P_CONFIRMS_QUEUE2 = "p.confirms.queue2";public static final String P_CONFIRMS_QUEUE3 = "p.confirms.queue3";
}

Simple(简单模式)

生产者

package mq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST); // ? 公网 IPconnectionFactory.setPort(Constants.PORT); // ? 端口connectionFactory.setUsername(Constants.USER_NAME); // ? 用户名connectionFactory.setPassword(Constants.PASSWORD); // ? 密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // ? 虚拟主机Connection connection = connectionFactory.newConnection();//  TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明交换机(使用内置的交换机)// TODO 4. 声明队列/*** Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,*                                  Map<String, Object> arguments)* queue:队列名称* durable:是否持久化* exclusive:是否独占* autoDelete:是否自动删除* arguments:参数*/channel.queueDeclare("hello", true, false, false, null);// TODO 5. 发送消息/*** void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:交换机名称* routingKey:使用内置交换机,routingKey 和队列名保持一致* props:属性配置* body:消息体*/String msg = "hello rabbitMQ~";for (int i = 0; i < 1000; i++) {channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息发送成功," + msg);// TODO 6. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

消费者

package mq.simple;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST); // ? 公网 IPconnectionFactory.setPort(Constants.PORT); // ? 端口connectionFactory.setUsername(Constants.USER_NAME); // ? 用户名connectionFactory.setPassword(Constants.PASSWORD); // ? 密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // ? 虚拟主机Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare("hello", true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){/*** 从队列中,收到消息就会执行该方法* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message 封包的消息,比如交换机,队列名称等...* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array)* @throws IOException IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume("hello", true, consumer);Thread.sleep(1000);// TODO 5. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

Work-Queues(工作队列模式)

生产者

package mq.workQueues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//  TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明交换机(使用内置的交换机)// TODO 4. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// TODO 5. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue mode~:" + i;channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息发送成功");// TODO 6. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

消费者1

package mq.workQueues;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

消费者2

package mq.workQueues;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

Publish/Subscribe(发布/订阅模式)

生产者

package mq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//  TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明交换机/*** Exchange.DeclareOk exchangeDeclare(String exchange, the name of the exchange*         BuiltinExchangeType type, the exchange type*         boolean durable, true if we are declaring a durable exchange (the exchange will survive a server restart)*         boolean autoDelete, true if the server should delete the exchange when it is no longer in use*         boolean internal, true if the exchange is internal, it can't be directly published to by a client.*         Map<String, Object> arguments), other properties (construction arguments) for the exchange*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);// TODO 4. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);/*** Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)*  queue: the name of the queue*  exchange: the name of the exchange*  routingKey: the routing key to use for the binding*  arguments: other properties (binding parameters)*/// TODO 5. 绑定交换机和队列channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");// TODO 6. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue mode~:" + i;channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息发送成功");// TODO 7. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

消费者1

package mq.fanout;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

消费者2

package mq.fanout;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

Routing(路由模式)

生产者

package mq.direct;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启隧道Channel channel = connection.createChannel();// TODO 3. 声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);// TODO 5. 将队列和交换机绑定channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");// TODO 6. 发送消息List<String> list = List.of("a", "b", "c", "d");Random random = new Random();for (int i = 0; i < 10; i++) {String routingKey = list.get(random.nextInt(3));String msg = "hello routing mode~:" + routingKey;System.out.println(msg);channel.basicPublish(Constants.DIRECT_EXCHANGE, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息发送成功");// TODO 7. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

消费者1

package mq.direct;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

消费者2

package mq.direct;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

Topics(通配符模式)

生产者

package mq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启隧道Channel channel = connection.createChannel();// TODO 3. 声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);// TODO 4. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);// TODO 5. 将队列和交换机绑定channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");// TODO 6. 发送消息String msg1 = "hello topic mode~:ef.a.c";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.c", null, msg1.getBytes(StandardCharsets.UTF_8)); // ? 转发到 Q1String msg2 = "hello topic mode~:rr.a.b";channel.basicPublish(Constants.TOPIC_EXCHANGE, "rr.a.b", null, msg2.getBytes(StandardCharsets.UTF_8)); // ? 转发到 Q1,Q2String msg3 = "hello topic mode~:c.com.ljh";channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.com.ljh", null, msg3.getBytes(StandardCharsets.UTF_8)); // ? 转发到 Q2System.out.println("消息发送成功");// TODO 7. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

消费者1

package mq.topic;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

消费者2

package mq.topic;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);// TODO 5. 释放资源
//        channel.close(); // ! 先关闭 channel
//        connection.close();}
}

RPC(远程过程调用模式)

服务端

package mq.rpc;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启隧道Channel channel = connection.createChannel();// TODO 2.1 声明队列channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body);System.out.println("[RPC Server 接收到请求]:" + request);String response = "针对请求:" + request + " 做出响应:" + "🫡";// TODO 4. 发送响应AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes(StandardCharsets.UTF_8));// TODO 5. 确认收到channel.basicAck(envelope.getDeliveryTag(), false);}};// TODO 3. 接收请求channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, defaultConsumer);}
}

客户端

package mq.rpc;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 开启隧道Channel channel = connection.createChannel();// TODO 2.1 声明队列channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);// TODO 3. 发送请求String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId) // ? 唯一标识,标识接收该 ID 的响应.replyTo(Constants.RPC_RESPONSE_QUEUE).build();String msg = "hello RPC mode~:" + correlationId;channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, msg.getBytes(StandardCharsets.UTF_8));// TODO 4. 接收响应BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String resp = new String(body);System.out.println("接收到回调消息:" + resp);if (properties.getCorrelationId().equals(correlationId)) {queue.offer(resp);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);String res = queue.take();// ! 若没有对应的消息,程序会在这里阻塞System.out.println("[RPC Client接收到符合 ID 的消息]:" + res);}
}

Publisher Confirms(发布确认模式)

发布确认

package mq.publisher.confirms;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class PublisherConfirms {private static final int MAX_MESSAGE = 10000;static Connection createConnection() throws Exception {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {// * Strategy #1: Publishing Messages Individually
//        publishingMessagesIndividually();// * Strategy #2: Publishing Messages in BatchespublishingMessagesInBatches();// * Strategy #3: Handling Publisher Confirms AsynchronouslyhandlingPublisherConfirmsAsynchronously();}private static void handlingPublisherConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 开启发布确认机制channel.confirmSelect();// TODO 4. 声明队列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE3, true, false, false, null);SortedSet<Long> sortedSet = Collections.synchronizedSortedSet(new TreeSet<>());// TODO 5. 监听来自 Broker 的 ack 或 nackchannel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {sortedSet.headSet(deliveryTag + 1).clear();} else {sortedSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {sortedSet.headSet(deliveryTag + 1).clear();} else {sortedSet.remove(deliveryTag);}// TODO 5.1 根据业务逻辑处理消息重传}});Long start = System.currentTimeMillis();// TODO 6. 发送消息for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;Long ackSeq = channel.getNextPublishSeqNo();sortedSet.add(ackSeq);channel.basicPublish("", Constants.P_CONFIRMS_QUEUE3, null, msg.getBytes(StandardCharsets.UTF_8));}while (!sortedSet.isEmpty()) {
//                Thread.sleep(10);}Long end = System.currentTimeMillis();System.out.printf("异步确认策略,消息条数:%d;总耗时:%d ms\n", MAX_MESSAGE, end - start);}}private static void publishingMessagesInBatches() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 开启发布确认机制channel.confirmSelect();// TODO 4. 声明队列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE2, true, false, false, null);Long start = System.currentTimeMillis();// TODO 5. 发送消息int batchSize = 100, outstandingMessageCnt = 0;for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;channel.basicPublish("", Constants.P_CONFIRMS_QUEUE2, null, msg.getBytes(StandardCharsets.UTF_8));outstandingMessageCnt++;if (outstandingMessageCnt >= batchSize) {channel.waitForConfirms(5_000);outstandingMessageCnt = 0;}}if (outstandingMessageCnt > 0) {channel.waitForConfirms(5_000);}Long end = System.currentTimeMillis();System.out.printf("批量确认策略,消息条数:%d;总耗时:%d ms\n", MAX_MESSAGE, end - start);}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 开启发布确认机制channel.confirmSelect();// TODO 4. 声明队列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();// TODO 5. 发布消息for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;channel.basicPublish("", Constants.P_CONFIRMS_QUEUE1, null, msg.getBytes(StandardCharsets.UTF_8));// TODO 5.1 等待 5s 收到来自 broker 的确认消息channel.waitForConfirms(5_000);}Long end = System.currentTimeMillis();System.out.printf("单独确认策略,消息条数:%d;总耗时:%d ms\n", MAX_MESSAGE, end - start);}}
}
em.out.printf("批量确认策略,消息条数:%d;总耗时:%d ms\n", MAX_MESSAGE, end - start);}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 开启发布确认机制channel.confirmSelect();// TODO 4. 声明队列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();// TODO 5. 发布消息for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;channel.basicPublish("", Constants.P_CONFIRMS_QUEUE1, null, msg.getBytes(StandardCharsets.UTF_8));// TODO 5.1 等待 5s 收到来自 broker 的确认消息channel.waitForConfirms(5_000);}Long end = System.currentTimeMillis();System.out.printf("单独确认策略,消息条数:%d;总耗时:%d ms\n", MAX_MESSAGE, end - start);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/79744.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(2)python开发经验

文章目录 1 pyside6加载ui文件2 使用pyinstaller打包 更多精彩内容&#x1f449;内容导航 &#x1f448;&#x1f449;Qt开发 &#x1f448;&#x1f449;python开发 &#x1f448; 1 pyside6加载ui文件 方法1&#xff1a; 直接加载ui文件 from PySide6.QtWidgets import QAp…

【C++】互斥锁(Mutex)

在C中&#xff0c;互斥锁&#xff08;Mutex&#xff09;是用于线程同步的重要工具&#xff0c;用于保护共享资源&#xff0c;防止多线程同时访问导致的数据竞争&#xff08;Data Race&#xff09;问题。 以下是C中互斥锁的核心用法和示例&#xff1a; 一、基本互斥锁 std::mut…

Jsoup与HtmlUnit:两大Java爬虫工具对比解析

Jsoup&#xff1a;HTML解析利器 定位&#xff1a;专注HTML解析的轻量级库&#xff08;也就是快&#xff0c;但动态页面无法抓取&#xff09; 核心能力&#xff1a; DOM树解析与CSS选择器查询 HTML净化与格式化 支持元素遍历与属性提取 应用场景&#xff1a;静态页面数据抽…

小白成长之路-vim编辑

文章目录 Vim一、命令模式二、插入模式3.a:进入插入模式&#xff0c;在当前光标的后一个字符插入![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/fd293c3832ed49e2974abfbb63eeb5bb.png)4.o: 在当前光标的下一行插入5.i:在当前光标所在字符插入&#xff0c;返回命令模…

[redis进阶六]详解redis作为缓存分布式锁

目录 一 什么是缓存 缓存总结板书: 二 使⽤Redis作为缓存 三 缓存的更新策略 1) 定期⽣成 2) 实时⽣成 四 面试重点:缓存预热,缓存穿透,缓存雪崩 和缓存击穿 1)缓存预热 2)缓存穿透 3)缓存雪崩 4)缓存击穿 五 分布式锁 板书: 1)什么是分布式锁 2)分布式锁的基…

【MySQL】数据表插入数据

个人主页&#xff1a;Guiat 归属专栏&#xff1a;MySQL 文章目录 1. 插入数据概述1.1 插入数据的重要性1.2 插入数据的基本原则 2. 基本插入语句2.1 INSERT INTO语法2.2 插入多行数据2.3 不指定列名的插入2.4 插入NULL和默认值 3. 高级插入技术3.1 使用子查询插入数据3.2 IGNOR…

软考-软件设计师中级备考 14、刷题 算法

一、考点归纳 1&#xff09;排序 2、查找 3、复杂度 4、经典问题 0 - 1 背包动态规划0 - 1 背包问题具有最优子结构性质和重叠子问题性质。通过动态规划可以利用一个二维数组来记录子问题的解&#xff0c;避免重复计算&#xff0c;从而高效地求解出背包能装下的最大价值。分…

【阿里云】阿里云 Ubuntu 服务器无法更新 systemd(Operation not permitted)的解决方法

零、前言 目前正在使用的Ubuntu服务器中&#xff0c;仅阿里云&#xff08;不止一台&#xff09;出现了这个问题&#xff0c;因此我判定是阿里云服务器独有的问题。如果你的服务器提供商不是阿里云&#xff0c;那么这篇文章可能对你没有帮助。 如果已经因为升级错误导致依赖冲突…

css 点击后改变样式

背景&#xff1a; 期望实现效果&#xff1a;鼠标点击之后&#xff0c;保持选中样式。 实现思路&#xff1a;在css样式中&#xff0c;:active 是一种伪类&#xff0c;用于表示用户当前正在与被选定的元素进行交互。当用户点击或按住鼠标时&#xff0c;元素将被激活&#xff0c;此…

采用AI神经网络降噪算法的语言降噪消回音处理芯片NR2049-P

随着AI时代来临.通话设备的环境噪音抑制也进入AI降噪算法时代. AI神经网络降噪技术是一款革命性的语音处理技术&#xff0c;他突破了传统单麦克风和双麦克风降噪的局限性,利用采集的各种日常环境中的噪音样本进行训练学习.让降噪算法具有自适应噪声抑制功能&#xff0c;可以根…

不用联网不用编程,PLC通过智能网关快速实现HTTP协议JSON格式与MES等系统平台双向数据通讯

智能网关IGT-DSER集成了多种PLC的原厂协议&#xff0c;方便实现各种PLC、智能仪表通过HTTP协议与MES等各种系统平台通讯对接。PLC内不用编写程序&#xff0c;设备不用停机&#xff0c;通过网关的参数配置软件(下载地址)配置JSON文件的字段与PLC寄存器地址等参数即可。 …

如何将两台虚拟机进行搭桥

将两台虚拟机实现网络互通&#xff08;“搭桥”&#xff09;需配置虚拟网络&#xff0c;以下是基于 VMware Workstation 和 VirtualBox 的详细操作指南&#xff08;以 Windows 系统为例&#xff0c;Linux 原理类似&#xff09;&#xff1a; 一、VMware Workstation 配置&#x…

Xianyu AutoAgent,AI闲鱼客服机器人

Xianyu AutoAgent是一款专为闲鱼平台开发的智能客服机器人系统&#xff0c;旨在提供全天候的自动化服务。它具备多专家协同决策、智能议价和上下文感知对话等功能&#xff0c;能够管理轻量级的对话记忆&#xff0c;利用完整的对话历史为用户提供更自然的交流体验。 Xianyu Aut…

键盘输出希腊字符方法

在不同操作系统中&#xff0c;输出希腊字母的方法有所不同。以下是针对 Windows 和 macOS 系统的详细方法&#xff0c;以及一些通用技巧&#xff1a; 1.Windows 系统 1.1 使用字符映射表 字符映射表是一个内置工具&#xff0c;可以方便地找到并插入希腊字母。 • 步骤&#xf…

什么是SparkONYarn模式

1. 什么是 Spark on YARN&#xff1f; Spark on YARN 是 Apache Spark 的一种部署模式&#xff0c;允许 Spark 应用程序在 Hadoop YARN 集群上运行&#xff0c;充分利用 YARN 的资源管理和调度能力。这种模式将 Spark 与 Hadoop 生态深度集成&#xff0c;使企业能够在同一集群…

【git】clone项目后续,github clone的网络配置,大型项目git log 输出txt,切换commit学习,goland远程,自存档

git网络配置&#xff0c;解决git clone github速度奇慢 git config --global http.proxy http://127.0.0.1:7897 git config --global https.proxy http://127.0.0.1:7897git log输出到文件&#xff08;便于checkout&#xff09; 这里有些字符如表情会乱码&#xff0c;不知道…

Java游戏服务器开发流水账(3)游戏数据的缓存简介

简介 游戏服务器数据缓存是一种在游戏服务器运行过程中&#xff0c;用于临时存储经常访问的数据的技术手段&#xff0c;旨在提高游戏性能、降低数据库负载以及优化玩家体验。游戏开发中数据的缓存可以使用Java自身的内存也可以使用MemCache&#xff0c;Redis&#xff0c;注意M…

STL?vector!!!

一、前言 之前我们借助手撕string加深了类和对象相关知识&#xff0c;今天我们将一起手撕一个vector&#xff0c;继续深化类和对象、动态内存管理、模板的相关知识 二、vector相关的前置知识 1、什么是vector&#xff1f; vector是一个STL库中提供的类模板&#xff0c;它是存储…

C++学习之路,从0到精通的征途:继承

目录 一.继承的概念及定义 1.继承的概念 2.继承的定义 (1)继承的定义格式 (2)继承基类成员访问方式的变化 二.基类与派生类间的转换 1.派生类对象赋值给基类的引用/指针 2. 派生类对象直接赋值给基类对象 三.继承的作用域 四.派生类的默认成员函数 1.构造函数 2.拷…

用vue和go实现登录加密

前端使用CryptoJS默认加密方法&#xff1a; var pass CryptoJS.AES.encrypt(formData.password, key.value).toString()使用 CryptoJS.AES.encrypt() 时不指定加密模式和参数时&#xff0c;CryptoJS 默认会执行以下操作 var encrypted CryptoJS.AES.encrypt("明文&quo…