RabbitMQ中的Work Queues模式

在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种消息传递模式。其中,Work Queues(工作队列)模式是一种常见的模式,用于在多个消费者之间分配任务,从而实现负载均衡和提高系统的处理能力。下面将详细介绍 RabbitMQ 中的 Work Queues 模式。

1. 什么是 Work Queues 模式?

Work Queues 模式(也称为任务队列模式)是一种消息传递模式,用于在多个消费者之间分配任务。在这种模式下,生产者将任务(消息)发送到队列中,多个消费者从队列中获取任务并进行处理。每个任务只会被一个消费者处理,从而实现负载均衡。

在这里插入图片描述

Work Queues 模式的主要优点包括:

  1. 负载均衡:多个消费者可以并行处理任务,从而提高系统的处理能力。
  2. 解耦:生产者和消费者之间通过队列进行通信,彼此之间不需要直接交互。
  3. 异步处理:任务可以异步处理,生产者不需要等待任务完成即可继续执行其他操作。

2. Work Queues 模式的工作原理

2.1 生产者(Producer)

生产者负责将任务(消息)发送到队列中。生产者不需要知道有多少消费者会处理这些任务,只需要将任务发送到队列即可。

2.2 队列(Queue)

队列是消息的缓冲区,用于存储生产者发送的任务。队列可以有多个消费者,但每个任务只会被一个消费者处理。

2.3 消费者(Consumer)

消费者从队列中获取任务并进行处理。多个消费者可以并行处理任务,从而实现负载均衡。消费者可以是独立的进程、线程或服务。

2.4 消息确认(Message Acknowledgment)

为了确保任务能够可靠地处理,RabbitMQ 提供了消息确认机制。消费者在处理完任务后,需要向 RabbitMQ 发送确认消息,告知任务已经处理完成。如果消费者在处理任务时崩溃,RabbitMQ 会将未确认的任务重新分配给其他消费者。

2.5 公平分发(Fair Dispatch)

默认情况下,RabbitMQ 会按顺序将任务分发给消费者。然而,如果某些消费者处理任务的速度较慢,可能会导致任务堆积。为了避免这种情况,可以使用 basicQos 方法设置预取计数(prefetch count),限制每个消费者一次可以获取的任务数量,从而实现更公平的分发。

3. 环境准备

在开始之前,确保你已经安装了以下环境:

  • Java 开发环境(JDK 8 或更高版本)
  • RabbitMQ 服务器(已启动并运行)
  • Maven(用于管理依赖)

3.1 添加依赖

首先,在你的项目中添加 RabbitMQ 客户端库的依赖。如果你使用的是 Maven,可以在 pom.xml 中添加以下依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

4. 代码案例

4.1 生产者代码

生产者负责将任务发送到队列。以下是一个简单的生产者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.将消息发送到队列for (int i = 1; i <= 10; i++) {String message = "Task to be processed, " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}}
}

代码解释:

  • ConnectionFactory: 用于创建与 RabbitMQ 服务器的连接。
  • Connection: 表示与 RabbitMQ 服务器的物理连接。
  • Channel: 表示与 RabbitMQ 服务器的逻辑连接,用于发送和接收消息。
  • queueDeclare: 声明一个队列,true 表示队列是持久的。
  • basicPublish: 将消息发布到队列,使用默认交换机(空字符串)。

4.2 消费者代码

消费者负责从队列中获取任务并处理。以下是一个简单的消费者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1); // 每次只处理一个消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");//手动ack,因为不同的机器处理速度不一样,因此不同的机器会在不同时间应答,这样机器就可以根据实际能力处理了channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

代码解释:

  • basicQos(1): 设置每次只处理一个消息,确保任务的公平分配。
  • DeliverCallback: 定义消息处理逻辑,doWork 方法模拟任务处理过程。
  • basicAck: 确认消息处理完成,从队列中移除消息。

5. 运行示例

  1. 启动 RabbitMQ 服务器:确保 RabbitMQ 服务器已启动并运行。
  2. 运行多个消费者:启动多个消费者实例,确保它们连接到同一个队列。
  3. 运行生产者:启动生产者实例,发送任务到队列。

示例输出:

  • 生产者输出

     [x] Sent 'Task to be processed, 1'[x] Sent 'Task to be processed, 2'[x] Sent 'Task to be processed, 3'[x] Sent 'Task to be processed, 4'[x] Sent 'Task to be processed, 5'[x] Sent 'Task to be processed, 6'[x] Sent 'Task to be processed, 7'[x] Sent 'Task to be processed, 8'[x] Sent 'Task to be processed, 9'[x] Sent 'Task to be processed, 10'
    
  • 消费者1输出

     [*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 1'[x] Done[x] Received 'Task to be processed, 4'[x] Done[x] Received 'Task to be processed, 6'[x] Done[x] Received 'Task to be processed, 8'[x] Done[x] Received 'Task to be processed, 10'[x] Done
    
  • 消费者2输出

     [*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 2'[x] Done[x] Received 'Task to be processed, 3'[x] Done[x] Received 'Task to be processed, 5'[x] Done[x] Received 'Task to be processed, 7'[x] Done[x] Received 'Task to be processed, 9'[x] Done
    

在这里插入图片描述

6. 总结

本文详细介绍了如何在 RabbitMQ 中实现 Work Queues 模式,包括生产者、默认交换机、队列和多个消费者的设计与实现。通过使用 RabbitMQ 的 Java 客户端库,我们可以轻松地实现任务的分配和处理。Work Queues 模式非常适合需要将任务分配给多个消费者处理的场景,如任务调度、日志处理等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/64403.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode hot100-81

https://leetcode.cn/problems/climbing-stairs/description/?envTypestudy-plan-v2&envIdtop-100-liked 70. 爬楼梯 已解答 简单 相关标签 相关企业 提示 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&…

【Python爬虫系列】_032.Scrapy_全站爬取

课 程 推 荐我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈虚 拟 环 境 搭 建 :👉👉 Python项目虚拟环境(超详细讲解) 👈👈PyQt5 系 列 教 程:👉👉 Python GUI(PyQt5)教程合集 👈👈

音频声音太小怎么调大?调大音频声音的几种方法

音频声音太小怎么调大&#xff1f;音频声音过小可能由多种原因引起。从设备本身的硬件设置&#xff0c;到应用程序或播放软件的音量控制&#xff0c;再到文件本身的音频质量&#xff0c;都可能是导致声音过小的因素。尤其是在观看视频或听音乐时&#xff0c;若音量过低&#xf…

条件随机场(CRF)详解:原理、算法与实现(深入浅出)

目录 1. 引言2. 什么是条件随机场&#xff1f;2.1 直观理解2.2 形式化定义 3. CRF的核心要素3.1 特征函数3.2 参数学习 4. 实战案例&#xff1a;命名实体识别5. CRF vs HMM6. CRF的优化与改进6.1 特征选择6.2 正则化 7. 总结与展望参考资料 1. 引言 条件随机场(Conditional Ra…

基于Clinical BERT的医疗知识图谱自动化构建方法,双层对比框架

基于Clinical BERT的医疗知识图谱自动化构建方法&#xff0c;双层对比框架 论文大纲理解1. 确认目标2. 目标-手段分析3. 实现步骤4. 金手指分析 全流程核心模式核心模式提取压缩后的系统描述核心创新点 数据分析第一步&#xff1a;数据收集第二步&#xff1a;规律挖掘第三步&am…

什么是MyBatis?

MyBatis 是一个优秀的持久层框架&#xff0c;它消除了几乎所有的 JDBC 代码和手动设置参数以及获取结果集的工作。MyBatis 使用简单的 XML 或注解用于配置和原始映射&#xff0c;将接口和 Java 的 POJOs&#xff08;Plain Old Java Objects&#xff0c;普通的 Java对象&#xf…

MySQL事务与锁机制详细讲解

事务与锁机制是数据库系统中非常重要的概念&#xff0c;尤其在 MySQL 这样的关系型数据库中&#xff0c;它们决定了数据的 一致性、完整性 和 并发控制。下面我将详细讲解事务和锁机制&#xff0c;分步骤深入分析。 一、事务&#xff08;Transaction&#xff09; 1. 什么是事务…

LWIP协议:三次握手和四次挥手、TCP/IP模型

一、三次握手&#xff1a;是客户端与服务器建立连接的方式&#xff1b; 1、客户端发送建立TCP连接的请求。seq序列号是由发送端随机生成的&#xff0c;SYN字段置为1表示需要建立TCP连接。&#xff08;SYN1&#xff0c;seqx&#xff0c;x为随机生成数值&#xff09;&#xff1b;…

使用winscp从windows访问Ubuntu进行文件传输

Ubuntu 系统上的准备工作 • 安装 SSH 服务器&#xff1a; 确保 Ubuntu 系统上已经安装了 SSH 服务器。如果没有安装&#xff0c;可以使用以下命令安装&#xff1a; sudo apt update sudo apt install openssh-server • 启动 SSH 服务&#xff1a; 确保 SSH 服务正在运行&a…

Springboot中使用Retrofit

Retrofit官网 https://square.github.io/retrofit/ 配置gradle implementation("com.squareup.okhttp3:okhttp:4.12.0")implementation ("com.squareup.retrofit2:retrofit:2.11.0")implementation ("com.squareup.retrofit2:converter-gson:2.11.0…

[机器学习]AdaBoost(数学原理 + 例子解释 + 代码实战)

AdaBoost AdaBoost&#xff08;Adaptive Boosting&#xff09;是一种Boosting算法&#xff0c;它通过迭代地训练弱分类器并将它们组合成一个强分类器来提高分类性能。 AdaBoost算法的特点是它能够自适应地调整样本的权重&#xff0c;使那些被错误分类的样本在后续的训练中得到…

PHP代码审计学习(一)--命令注入

1、漏洞原理 参数用户可控&#xff0c;程序将用户可控的恶意参数通过php可执行命令的函数中运行导致。 2、示例代码 <?php echorec-test; $command ping -c 1 .$_GET[ip]; system($command); //system函数特性 执行结果会自动打印 ?> 通过示例代码可知通过system函…

【并发容器】源码级ConcurrentHashMap详解(java78)

1. ConcurrentHashMap 为什么要使用ConcurrentHashmap 在多线程的情况下&#xff0c;使用HashMap是线程不安全的。另外可以使用Hashtable&#xff0c;其是线程安全的&#xff0c;但是Hashtable的运行效率很低&#xff0c;之所以效率低下主要是因为其实现使用了synchronized关…

Redis的基本使用命令(GET,SET,KEYS,EXISTS,DEL,EXPIRE,TTL,TYPE)

目录 SET GET KEYS EXISTS DEL EXPIRE TTL redis中的过期策略是怎么实现的&#xff08;面试&#xff09; 上文介绍reids的安装以及基本概念&#xff0c;本章节主要介绍 Redis的基本使用命令的使用 Redis 是一个基于键值对&#xff08;KEY - VALUE&#xff09;存储的…

基于SpringBoot的乡村信息服务平台的设计与实现

摘 要 乡村信息服务平台的研究背景源于当前乡村振兴战略的实施和信息化技术的快速发展。随着城乡经济差距的逐渐凸显&#xff0c;乡村信息服务平台成为一种新型的信息化手段。本系统采用Java语言&#xff0c;MySQL数据库&#xff0c;采用MVC框架, JS技术开发。乡村信息服务平…

大数据技术与应用——数据可视化(山东省大数据职称考试)

大数据分析应用-初级 第一部分 基础知识 一、大数据法律法规、政策文件、相关标准 二、计算机基础知识 三、信息化基础知识 四、密码学 五、大数据安全 六、数据库系统 七、数据仓库. 第二部分 专业知识 一、大数据技术与应用 二、大数据分析模型 三、数据科学 数据可视化 大…

sql中case when若条件重复 执行的顺序

sql case when若条件重复 执行的顺序 在 SQL 中&#xff0c;如果你在 CASE 表达式中定义了多个 WHEN 子句&#xff0c;并且这些条件有重叠&#xff0c;那么 CASE 表达式的执行顺序遵循以下规则&#xff1a; &#xff08;1&#xff09;从上到下&#xff1a;SQL 引擎会按照 CASE …

【并发容器】ConcurrentLinkedQueue:优雅地实现非阻塞式线程安全队列

实现一个线程安全的队列有两 种方式:一种是使用阻塞算法&#xff0c;另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方 式则可以使用循环CAS的方式来实现。 1. 简介 ConcurrentLi…

Flink CDC 读取oracle库数据性能优化

通过综合考虑Oracle数据库配置、Flink作业配置以及其他优化措施&#xff0c;可以显著提升Flink CDC读取Oracle库数据的性能和效率。可以从以下几个方面进行&#xff1a; 一、Oracle数据库配置优化 ‌开启归档日志‌&#xff1a; 通过执行sqlplus /assysdba或sqlplus/nolog命令…

SpringAop-拦截参数带注解的方法

拦截方法中参数类型为String 且带有Crypto注解的方法&#xff1a;execution(* *(..,Crypto (String),..)) 拦截方法中参数上带有Crypto注解的方法&#xff1a;execution(* *(..,Crypto (*),..)) ..&#xff1a;零个或者多个 *&#xff1a;通配符 样例 /*** 针对带有Crypto…