TDMQ CKafka 版客户端实战指南系列之一:生产最佳实践

TDMQ CKafka 版客户端实战指南系列之一:生产最佳实践

导语

在当今数字化时代,数据的产生和流动呈爆发式增长,消息队列作为一种高效的数据传输和处理工具,在各种应用场景中发挥着关键作用。TDMQ CKafka 版作为一款分布式、高吞吐量、高可扩展性的消息系统,100% 兼容开源 Kafka API 2.4、2.8、3.2 版本 ,基于发布 / 订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。凭借高可用、数据压缩、同时支持离线和实时数据处理等优点,TDMQ CKafka 版广泛应用于日志压缩收集、监控数据聚合、流式数据集成等场景。

对于开发者而言,深入了解并熟练掌握 TDMQ CKafka 版的生产消费实践至关重要。它不仅能够帮助我们构建高效、稳定的数据传输和处理系统,还能在面对海量数据时,确保系统的性能和可靠性。本文将详细介绍 TDMQ CKafka 版的生产实践教程,包括生产消息的各个环节以及相关的参数配置和最佳实践,希望能为大家在实际项目中应用 TDMQ CKafka 版提供有益的参考和指导。

生产篇:步步为营,高效生产

Topic 使用与创建

配置要求:推荐节点的整倍数副本,减少数据倾斜问题,同步复制最小同步副本数为2,且同步副本数不能等于 Topic 副本数,否则宕机1个副本会导致无法生产消息。

创建方式:支持选择是否开启 CKafka 自动创建 Topic 的开关。选择开启后,表示生产或消费一个未创建的 Topic 时,会自动创建一个默认值包含3个分区和2个副本的 Topic,控制台支持修改默认值。

分区数估计

分区数的准确估算能实现数据的均衡分布。为了达到这个目的,分区数建议为节点数的整倍数。同时,还需结合预估流量来设置,按照 10MB/s 一个分区的标准来计算。例如,若一个 Topic 的预估吞吐为 100MB/s,那么建议设置分区数为 10。这样可以确保在高流量情况下,消息能均匀地分布在各个分区,避免某个分区负载过高。

失败重试

在分布式环境中,由于网络等原因,消息发送偶尔会失败,其原因可能是消息已经发送成功但是 ACK 机制失败或者是消息确实没有发送成功,这就需要设置合理的重试策略,您可以根据业务需求,设置以下重试参数:

  • Retries:用于设置重试次数,默认值为 3。重试不成功会触发报错,如果客户不接受消息丢失,建议改重试次数或者手动重试。

  • Retry.backoff.ms:设置重试间隔,建议设置为 1000。这个间隔时间可以让生产者在重试前等待一段时间,避免在短时间内频繁重试。

这样将能应对 Broker 的 Leader 分区出现无法立刻响应 Producer 请求的情况。

异步发送

消息发送接口通常是异步的,这意味着生产者在发送消息后不需要等待消息被完全处理就可以继续执行其他任务。如果想要接收发送的结果,可以使用 Send 方法中的 Callback 接口获取发送结果。

一个 Producer 对应一个应用

Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,建议一个应用对应一个 Producer。

Acks

Kafka 的 ACK 机制,指 Producer 的消息发送确认机制,Acks 参数决定了生产者在发送消息后等待服务端响应的方式,对 Kafka 集群的吞吐量和消息可靠性有直接影响。

Acks 的参数说明如下:

  • Acks=0 时,当生产者采用无确认机制时,消息发送后无需等待任何 Broker 节点的响应即可继续执行,这种模式可获得最高的吞吐性能,但因缺乏写入保障机制,存在较高的数据丢失风险;

  • Acks=1 时,采用主节点单确认机制时,生产者仅需等待 Leader 副本完成消息写入即会收到确认响应。该模式在性能与可靠性间取得平衡,但需注意:若 Leader 节点在同步完成前发生故障,已发送但未同步的消息存在部分丢失的可能性;

  • Acks=all 时,启用全副本确认机制时,生产者必须等待 Leader 副本及所有同步副本(ISR 集合)均完成消息持久化后才会收到确认。虽然该模式通过多重冗余保障实现了最高级别的数据安全性(仅当整个 ISR 集群同时失效时才会丢失数据),但跨节点同步带来的延迟使其吞吐性能相对较低。

一般建议选择 Acks=1,对于重要的服务可以设置 Acks=all 。

Batch

通常情况下,TDMQ CKafka 版的 Topic 会设置多个分区。Producer 客户端向服务端发送消息时,要先明确消息要发送到哪个 Topic 的哪个分区。当向同一分区发送多条消息时,Producer 客户端会把这些消息整合成一个 Batch,批量发送至服务端。不过,Producer 客户端在处理 Batch 时会产生额外开销。一般来说,小 Batch 会使 Producer 客户端产生大量请求,致使请求在客户端和服务端堆积排队,还会提高相关机器的 CPU 使用率,进而整体抬高消息发送和消费的延迟。而设置一个合适的 Batch 大小,能减少客户端向服务端发送消息时的请求次数,从整体上提升消息发送的吞吐量。

以下是 Batch 相关参数的说明:

  • Batch.size:这是发往每个分区的消息缓存量阈值,当缓存的消息量达到这个设定值时,就会触发一次网络请求,随后 Producer 客户端会把消息批量发送到服务器。

  • Linger.ms:它规定了每条消息在缓存中的最长停留时间,如果消息在缓存中的时间超过这个值,Producer 客户端就会不再遵循 Batch.size 的限制,直接把消息发送到服务器。

  • Buffer.memory:当所有缓存消息的总体大小超过这个数值时,就会触发消息发送到服务器的操作,此时会忽略 Batch.size 和 Linger.ms 的限制。Buffer.memory 的默认值是 32MB,对于单个 Producer 而言,这个数值足以保障其性能。

Key 和 Value

消息队列中的消息有 Key(消息标识)和 Value(消息内容)两个字段。为消息设置一个唯一的 Key 便于追踪消息,通过打印发送日志和消费日志,就能了解该消息的生产和消费情况。比如在电商订单系统中,将订单号作为 Key,就可以轻松追踪订单消息的流转过程。如果消息发送量较大,建议不要设置 Key,并使用黏性分区策略。

黏性分区

在消息队列 Kafka 中,只有被发送至同一分区的消息才会被归入同一个 Batch,所以 Kafka Producer 端配置的分区策略是影响 Batch 形成的关键因素之一。Kafka Producer 支持用户通过自定义 Partitioner 实现类,来契合业务需求选择合适的分区方式。

当消息指定了 Key 时,Kafka Producer 默认会先对消息的 Key 进行哈希计算,再依据哈希结果选定分区,以此确保相同 Key 的消息能够发送到同一分区。

当消息未指定 Key 时,在 Kafka 2.4 版本之前,其默认分区策略是按顺序循环遍历主题下的所有分区,以轮询形式把消息依次发送到各分区。不过,这种默认策略在 Batch 聚合方面表现不佳,实际应用中容易生成大量小 Batch,进而导致消息处理的实际延迟上升。为改善无 Key 消息分区效率低的问题,Kafka 在 2.4 版本推出了黏性分区策略(Sticky Partitioning Strategy)。

黏性分区策略重点针对无 Key 消息被分散到不同分区、进而产生众多小 Batch 的问题。它的核心机制是,当某个分区的 Batch 处理完毕,会随机挑选另一个分区,之后尽可能让后续消息都发送到这个新选定的分区。从短期视角看,消息会集中发送到同一分区;但从长期运行来看,消息仍能均匀分布到各个分区。如此一来,既避免了消息在分区上分布不均(分区倾斜),又能降低延迟,提升整个服务的性能。

要是你用的 Kafka Producer 客户端版本是 2.4 及以上,那默认就会采用黏性分区策略。要是客户端版本低于 2.4,你可以依据黏性分区策略的原理,自己动手实现分区策略,再通过参数 Partitioner.class 来指定所设置的分区策略。

关于黏性分区策略的实现,下面给出了 Java 版的代码示例,其核心逻辑是按照一定时间间隔来切换分区。

public class MyStickyPartitioner implements Partitioner {// 记录上一次切换分区时间。private long lastPartitionChangeTimeMillis = 0L;// 记录当前分区。private int currentPartition = -1;// 分区切换时间间隔,可以根据实际业务选择切换分区的时间间隔。private long partitionChangeTimeGap = 100L;public void configure(Map<String, ?> configs) {}/**      * Compute the partition for the given record.      ** @param topic The topic name      * @param key The key to partition on (or null if no key)      * @param keyBytes serialized key to partition on (or null if no key)      * @param value The value to partition on or null      * @param valueBytes serialized value to partition on or null      * @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取所有分区信息。List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);int availablePartitionSize = availablePartitions.size();// 判断当前可用分区。if (availablePartitionSize > 0) {handlePartitionChange(availablePartitionSize);return availablePartitions.get(currentPartition).partition();} else {handlePartitionChange(numPartitions);return currentPartition;}} else {// 对于有key的消息,根据key的哈希值选择分区。return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private void handlePartitionChange(int partitionNum) {long currentTimeMillis = System.currentTimeMillis();// 如果超过分区切换时间间隔,则切换下一个分区,否则还是选择之前的分区。if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap|| currentPartition < 0 || currentPartition >= partitionNum) {lastPartitionChangeTimeMillis = currentTimeMillis;currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;}}public void close() {}
}

分区顺序

单个分区(Partition)内,消息是按照发送顺序储存的,是基本有序的。每个主题下面都有若干分区,如果消息被分配到不同的分区中,不同 Partition 之间不能保证顺序。

如果需要消息具有消费顺序性,可以在生产端指定这一类消息的 Key,这类消息都用相同的 Key 进行消息发送,CKafka 就会根据 Key 哈希取模选取其中一个分区进行存储,由于一个分区只能由一个消费者进行监听消费,此时消息就具有消息消费的顺序性了。

TDMQ CKafka 版顺序消息场景实践教程

顺序消息场景

在 TDMQ CKafka 版中,确保消息顺序性的主要手段依赖于其分区(Partition)设计以及消息 Key 的使用。客户端所涉及的顺序消息使用场景可分为两类:一是全局顺序场景,二是分区顺序场景。针对这两种场景,CKafka 的实践教程如下:

  1. 全局顺序:为保证全局顺序,在 CKafka 控制台,您需设置 Topic 分区为1,副本数可以根据具体使用场景和可用性要求以及平衡成本指定,建议设置为2。

    全局顺序由于单分区存在吞吐上限,因此整体吞吐不会太高,单分区吞吐指标请参见:https://cloud.tencent.com/document/product/597/52489。

  2. 分区顺序:为保证分区顺序,您可以根据预估 Topic 的业务流量,除以单分区流量,取整后获得分区数,同时为避免数据倾斜,分区数尽量向节点整倍数取整,从而确定最终合理的分区数。单分区的吞吐量可参见:https://cloud.tencent.com/document/product/597/52489。

    在发送 CKafka 消息时候,需要指定 Key,CKafka 会根据 Key 计算出一个哈希值,确保具有相同 Key 的消息会被发送到同一个分区,从而确保这些消息在分区内部是有序的。同时建议尽可能让业务 Key 分散,如果生产消息都指定同一个 Key,那么分区顺序会退化为全局顺序,从而降低整体的写入吞吐。

参数实践教程

由于顺序消息,要求消息有序、不重复,默认的 Kafka 生产者发送参数当遇到网络抖动、Kafka Broker 节点变化、分区 Leader 选举等场景,容易出现消息重复、乱序问题,因此顺序场景,必须对 Kafka 生产者参数进行特别设置,关键参数设置如下:

  • Enable.idempotence

Enable.idempotence 表示是否开启幂等功能。顺序场景建议开启幂等功能,应对上述场景出现的分区消息乱序、消息重复等问题。建议 Kafka 的 Producer 设置:Enable.idempotence 为 True。需要注意,该功能要确保 Kafka 的 Broker 版本大于等于0.11,即 Kafka versions >= 0.11,同时:从 Kafka 3.0开始包括3.0,Kafka 的 Producer 默认 Enable.idempotence=True 和 Acks=All ,而对于 Kafka 版本>=0.11且 Kafka<3.0 的版本,默认是关闭幂等的,因此建议顺序场景显式指定该参数值确保开启幂等。

  • Acks

在开启幂等后,Acks 需要显式指定为 All,如果不指定为 All 的话,则会无法通过参数校验从而报错。

  • Max.in.flight.requests.per.connection

默认情况下,Kafka 生产者会尝试尽快发送记录,Max.in.flight.requests.per.connection 表示一个 Connection 同时发送的最大请求数,默认值是5。Kafka 在 0.11 版本之后包括 0.11,小于 1.1 的版本,即(Kafka >= 0.11 & < 1.1),Kafka Broker 没有针对该方面优化,需要设置

Max.in.flight.requests.per.connection 为1,在 Kafka>=1.1 后,针对幂等场景的吞吐进行优化,在 Broker 端会维持一个队列对5个并发批次的消息的顺序进行顺序校验,允许 Max.in.flight.requests.per.connection 设置5,但不能大于5。

因此建议:

  • Kafka >= 0.11 & < 1.1:显式设置 Max.in.flight.requests.per.connection 为1。

  • Kafka>=1.1:显式设置 Max.in.flight.requests.per.connection 可以为 1<=Max.in.flight.requests.per.connection<=5;建议设置为5。

  • Retries

在顺序场景下,建议指定重试参数,Retries 在不同版本,有不同的默认行为,在 Kafka <= 2.0,默认为0;Kafka >= 2.1,默认为 Integer.MAX_VALUE,即2147483647;建议顺序场景,显式设置为 Integer.MAX_VALUE。

总结

在顺序场景中,需要开启的生产者参数示例如下:

Kafka >= 0.11 & < 1.1:

// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "1");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));

Kafka>=1.1:

// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "5");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));

数据倾斜

Kafka Broker 数据倾斜问题通常是由于分区分布不均匀或者生产者发送数据的 Key 分布不均匀导致的,会引发几类问题:

  1. 整体流量没有限流,但是节点局部限流;

  2. 某些节点负载过大,导致整体 Kafka 使用率不高,影响整体吞吐。

针对该类问题可以通过以下方式进行优化:

  1. 使用合理分区数,分区数保障为节点数的整倍数。

  2. 合理的分区策略,例如:RoundRobin(轮询)、Range(范围)和 Sticky(粘性)或者自定义的分区策略,均衡发送消息。

  3. 查是否使用 Key 进行发送,如果使用了 Key 进行发送,尽量设计策略让 Key 更加分区均衡。

总结

在消息队列的消息生产环节中,“高效” 不仅是吞吐的追求,更是稳定性与可靠性的平衡。无论是 Topic 的副本与分区设计、重试策略的精细调优,还是顺序消息的场景化实现,每一个配置细节都可能影响集群的整体性能。本文围绕 TDMQ CKafka 版的生产实践,详解如何通过合理的参数设置与策略选择,构建高可靠、低延迟的消息生产链路,避免数据倾斜、消息乱序等典型问题,为业务流量的平稳流转奠定基础。下一篇,我们将会为大家详细介绍 TDMQ CKafka 版的消费实践,敬请期待!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/907849.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ACM出版】2025年人工智能、虚拟现实与交互设计国际学术会议(AIVRID 2025)

2025年人工智能、虚拟现实与交互设计国际学术会议(AIVRID 2025)将于2025年10月17-19日在中国广东省东莞市召开。【ACM出版社出版-高录用,快检索-最快见刊后1个月EI & Scopus检索】 【海内外高校、IEEE Fellow等…

《sklearn机器学习——特征提取》 - 指南

《sklearn机器学习——特征提取》 - 指南pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "M…

Windows 10 11 Boot Fix

修復啟動程式碼Bootrec.exe 例如主開機記錄、開機設定資料儲存 (BCD) 和開機磁區。 更新主開機記錄和開機磁區程式碼來修復問題。修復引導程式碼 步驟1.按「Windows + S」開啟搜尋框。然後,在其中鍵入cmd 或命令提示符…

ubuntu 安装 milvus

docker 是 引擎,是底层的基础工具。它用于管理单个容器(Container)的生命周期(构建、运行、停止、删除)。 docker-compose 是 编排工具,是上层的操作界面。它用于定义和运行由多个容器组成的、相互关联的整套应用…

完整教程:MySQL并发问题解析

完整教程:MySQL并发问题解析2025-09-19 15:02 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important…

面向多模态检索的向量数据库对比分析和技术选型:Elasticsearch、Milvus、Pinecone、FAISS、Chroma、PGVector、Weaviate、Qdrant

目录1.向量数据库1.1 Elasticsearch 1.2 Milvus 1.3 Pinecone 1.4 FAISS 1.5 Chroma 1.6 PGVector 1.7 Weaviate 1.8 Qdrant2.向量数据库对比分析 3.多模态大规模图文检索选型3.1需求分析 3.2推荐方案 3.3实施建议 1.…

SI/PI学习笔记1 -20250911

SI/PI学习笔记1 -20250911作为硬件测试工程师,随着信号传输速度的迭代升级,分析信号和电源完整性将成为一个必然的趋势。SI全称为Signal Integrity 信号完整性,PI全称为Power Integrity 电源完整性。 从测试方面来…

Web开发工具一套式部署Maven/Nvm/Mysql/Redis - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

单词翻转

点击查看代码 #include<iostream> #include<cstring> using namespace std; int main() {char s[105];gets_s(s);int len = strlen(s);s[len] = \0;char d[105];d[len] = \0;for (int i = 0; i < len; …

详细介绍:kafka如何保证消息不被重复消费

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

终结AI幻觉:Amazon Bedrock如何用形式化方法重塑可信AI

AI幻觉(Hallucination)是高价值企业级AI应用落地的核心壁垒。本文从技术视角深入剖析了RAG架构中幻觉的多重根源,指出传统概率性缓解方案的局限性。进而,重点解析了**亚马逊云科技(Amazon Web Services)** 在其全…

技术解读 | OceanBase 数据库诊断与调优的关键技术与方法

技术解读 | OceanBase 数据库诊断与调优的关键技术与方法最高明的诊断调优,是让风险消弭于无形。本文聚焦于体系化的诊断调优方法论,旨在通过结构化流程与关键技术,帮助开发者建立"数据驱动、工具赋能"的…

我代表编程导航,向大家道歉!

大家最近访问我们网站可能会遇到很多莫名其妙的 Bug。这是因为最近我们网站前端正在进行技术升级,为了保险起见,我们选择 灰度发布,结果翻车了。对不起,我代表编程导航,向大家道歉! 大家最近访问网站可能会遇到…

cf div2 1051 E(视角转换,构造+思维)

E 一道简约清新的构造题,感觉这种构造题真的很难得。 回顾题意:给定一个括号串,每次可以翻转两个相邻的相同括号,任意次,问能否将原序列变成一个 \(RBS\),并给出构造方案。 直接按原操作的角度来想是很困难的。这…

从“被动监控”到“主动优化”:MyEMS 重构能源管理价值的路径

长期以来,能源管理对于许多企业和园区而言,更像是一个“事后诸葛亮”式的成本中心。其工作重心往往停留在安装电表、记录数据、生成月度报表的初级阶段,这是一种典型的被动监控模式。它虽然能告诉我们“用了多少能”…

openHarmony之开源三方库zlib适配讲解 - 实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

phoenix 导出sql执行结果到文件中

怎么使用phoenix服务安装包自带的sqlline.py文件,执行sql语句,并将结果导出到本地文件。连接phoenix ./sqlline.py zkmaster:2181执行导出逻辑 # 可以省略或设置为默认的table格式 !outputformat table # 输出的结果…

LK32V12A 过压/过流保护开关芯片 OVP过压45V 过流2.2A电流 SOT-23L

LK32V12A 过压/过流保护开关芯片 OVP过压45V 过流2.2A电流 SOT-23L概述 LK32V12A是一款过压/过流保护开关芯片, 该芯片内置高耐压功率MOSFET,芯片对输入 电压和输出电流以持续监测,当芯片监测到 输入电压或输出电流…

为什么企业需要高防IP - 详解

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

论文解读-《Graph Retrieval-Augmented Generation A Survey》 - zhang

1. 论文介绍 论文题目:Graph Retrieval-Augmented Generation: A Survey 论文领域:RAG 论文发布:2024.08 发布于Arxiv 论文代码:https://github.com/pengboci/GraphRAG-Survey 论文背景:2. 论文摘要 近年来,检索…