详细介绍:云原生时代 Kafka 深度实践:05性能调优与场景实战

news/2025/10/6 12:51:05/文章来源:https://www.cnblogs.com/wzzkaifa/p/19127504

5.1 性能调优全攻略

Producer调优

批量发送与延迟发送

通过调整batch.sizelinger.ms参数提升吞吐量:

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // 默认16KBprops.put(ProducerConfig.LINGER_MS_CONFIG, 10);      // 等待10ms以积累更多消息
  • batch.size:批量发送的字节数,达到该大小或linger.ms超时即发送。
  • linger.ms:消息在缓冲区的最大停留时间,即使未达到batch.size也会发送。
压缩算法选择

启用压缩可显著减少网络传输和磁盘存储开销:

props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // 可选:gzip、snappy、lz4、zstd
  • Snappy:压缩速度快,压缩比适中。
  • LZ4:压缩比和速度平衡,推荐大多数场景。
  • ZSTD:压缩比最高,但CPU开销较大。

Broker调优

内存与线程配置

调整Broker的网络和IO线程池大小:

# server.propertiesnum.network.threads=8    # 网络处理线程数,默认3num.io.threads=16        # IO处理线程数,默认8socket.send.buffer.bytes=102400  # 发送缓冲区大小,默认100KBsocket.receive.buffer.bytes=102400  # 接收缓冲区大小,默认100KB
磁盘与日志管理

优化日志存储和清理策略:

# 日志段滚动大小,默认1GBlog.segment.bytes=536870912   # 日志保留时间,默认7天log.retention.hours=168   # 日志清理策略:delete(按时间删除)或compact(按key压缩)log.cleanup.policy=delete   # 后台日志清理线程数log.cleaner.threads=2

Consumer调优

并行消费与反序列化优化

增加Consumer实例数或使用多线程消费:

// 增加Consumer Group中的Consumer数量,实现分区级并行KafkaConsumer consumer1 = new KafkaConsumer<>(props);KafkaConsumer consumer2 = new KafkaConsumer<>(props);consumer1.subscribe(Collections.singletonList("topic"));consumer2.subscribe(Collections.singletonList("topic")); // 或在单个Consumer中使用多线程处理消息ExecutorService executor = Executors.newFixedThreadPool(10);while (true) {    ConsumerRecords records = consumer.poll(Duration.ofMillis(100));    for (ConsumerRecord record : records) {        executor.submit(() -> process(record));    }}

使用高效的序列化格式(如Protobuf替代JSON):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class.getName());

5.2 实战场景模拟

场景一:高并发日志采集(每秒10W+消息写入)

架构设计
性能测试

使用kafka-producer-perf-test.sh工具测试写入性能:

bin/kafka-producer-perf-test.sh --topic log-topic --num-records 10000000 \  --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092

场景二:实时数据分析(电商实时大屏)

数据流设计
  1. 数据源:用户浏览、下单、支付等行为数据实时写入Kafka。
  2. 流处理:Kafka Streams计算实时指标(如UV、GMV、转化率):
KStream userEvents = builder.stream("user-events-topic");KTable, Long> hourlyUV = userEvents    .selectKey((key, value) -> value.getUserId())    .groupByKey()    .windowedBy(TimeWindows.of(Duration.ofHours(1)))    .count(Materialized.as("hourly-uv-store")); hourlyUV.toStream()    .map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count))    .to("hourly-uv-topic", Produced.with(Serdes.String(), Serdes.Long()));
  1. 结果存储:计算结果写入Redis,供前端大屏实时查询。
性能优化
  • Kafka配置
    # 减少消息延迟queued.max.requests=1000replica.lag.time.max.ms=30000
  • Kafka Streams配置
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);  // 10MB缓存config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);  // 1秒提交一次

场景三:金融级数据一致性(事务消息实现分布式事务)

架构设计
  1. 订单服务:接收用户订单请求,发送订单创建消息到Kafka。
  2. 库存服务:消费订单消息,扣减库存,发送库存扣减结果。
  3. 支付服务:消费库存扣减结果,处理支付,发送支付结果。
事务消息实现
// 初始化事务producer.initTransactions(); try {    producer.beginTransaction();        // 发送订单创建消息    producer.send(new ProducerRecord<>("order-topic", orderId, order));        // 执行本地事务(如更新订单状态)    orderService.updateOrderStatus(orderId, "PROCESSING");        // 提交事务    producer.commitTransaction();} catch (Exception e) {    // 回滚事务    producer.abortTransaction();}
幂等性保障

消费端通过唯一ID去重,确保同一消息只处理一次:

@KafkaListener(topics = "inventory-topic")public void processInventory(InventoryMessage message) {    // 检查是否已处理过    if (inventoryService.isProcessed(message.getId())) {        return;    }        // 处理库存扣减    inventoryService.decreaseStock(message.getProductId(), message.getQuantity());        // 标记为已处理    inventoryService.markAsProcessed(message.getId());}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/929325.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入解析:AI破局:饿了么如何搅动即时零售江湖

深入解析:AI破局:饿了么如何搅动即时零售江湖pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", &…

从零开始学Flink:数据输出的终极指南

本文详细介绍了Flink数据输出(Sink)的核心概念、各种连接器的使用方法、配置选项及可靠性保证机制。基于Flink 1.20.1的DataStream API,通过丰富的代码示例展示了如何将处理后的数据输出到Kafka、Elasticsearch、文件…

asp 企业网站dw制作一个手机网站模板下载地址

初识RPC RPC VS REST HTTP Dubbo Dubbo 特性&#xff1a; 基于接口动态代理的远程方法调用 Dubbo对开发者屏蔽了底层的调用细节&#xff0c;在实际代码中调用远程服务就像调用一个本地接口类一样方便。这个功能和Fegin很类似&#xff0c;但是Dubbo用起来比Fegin还要简单很多&a…

六盘水网站建设求职简历杭州公司网站建设套餐

FPGA-结合协议时序实现UART收发器&#xff08;四&#xff09;&#xff1a;串口驱动模块uart_drive、例化uart_rx、uart_tx 串口驱动模块uart_drive、例化uart_rx、uart_tx&#xff0c;功能实现 文章目录 FPGA-结合协议时序实现UART收发器&#xff08;四&#xff09;&#xff1…

数据编织平台实现AI代理自助数据访问

数据管理初创公司发布即时数据编织平台重大更新,支持AI代理自助数据访问。平台采用自然语言处理与自动化数据准备技术,无需ETL流程即可跨数百个数据源提供统一访问,并配备上下文引擎和对话式AI助手。数据编织初创公…

高水平的锦州网站建设厦门网站设计大概多少钱

目录 一、简介 二、BeanFactory 三、FactoryBean 四、区别 五、使用场景 总结 一、简介 在Spring框架中&#xff0c;IOC&#xff08;Inversion of Control&#xff09;容器是一个核心组件&#xff0c;它负责管理和配置Java对象及其依赖关系&#xff0c;实现了控制反转&a…

广州设计公司网站磁县邯郸网站建设

是将若干个学习器(分类器&回归器)组合之后产生一个新学习器。弱分类器(weak learner)指那些分类准确率只稍微好于随机猜测的分类器(errorrate <。 集成算法的成功在于保证弱分类器的多样性(Diversity)。而且集成不稳定的算法也能够得到一个比较明显的性能提升。 常见的…

[题解]P12008 【MX-X10-T4】[LSOT-4] Fragment of Memories

P12008 【MX-X10-T4】[LSOT-4] Fragment of Memories 可以想到很 naive 的思路,对于每个 \(x\) 值二分答案 \(m\)。check 函数可以 \(O(n)\) 完成。总时间是 \(O(n^2\log n)\) 的。我们发现 check 函数明显还能凹,考…

自然语言处理(NLP)的系统学习路径规划 - 实践

自然语言处理(NLP)的系统学习路径规划 - 实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", &…

2.Android Compose 基础系列:在 Kotlin 中创建和使用变量

2.Android Compose 基础系列:在 Kotlin 中创建和使用变量在应用开发中,有些内容是固定的,例如“设置”中的菜单项;而有些内容会不断变化,比如新闻应用中的标题、来源或发布时间。为了让这些变化自动更新,而不是每…

wordpress开启多站点好处西安网站建设winkuo

shutil.copyfileobj() 是 Python 的 shutil 模块中用于复制文件对象内容的一个函数。它可以将一个文件对象的内容复制到另一个文件对象中。 shutil.copyfileobj(fsrc, fdst, length16*1024) fsrc: 源文件对象&#xff0c;即要从中复制内容的文件对象。fdst: 目标文件对象&…

线性表的顺序存储和链式存储

目录核心概念一、顺序存储(顺序表)1. 核心特点2. 实现方式3. 基本操作分析4. 优缺点总结二、链式存储(链表)1. 核心特点2. 实现方式(以单链表为例)3. 基本操作分析4. 优缺点总结三、对比总结表四、如何选择?核心…

AWS WebRTC:获取ICE服务地址(part 3):STUN服务和TURN服务的作用 - 实践

AWS WebRTC:获取ICE服务地址(part 3):STUN服务和TURN服务的作用 - 实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-famil…

完整教程:进阶配置与优化:配置 HTTPS 以确保数据安全传输

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

贵州省城乡和住房建设厅官方网站网络方案设计与实现

二、用go语言&#xff0c;给定一个n结点的二叉树&#xff0c;写出一个 O(n)时间的递归过程&#xff0c;将该树每个结点的关键字输出。 文心一言&#xff1a; 在Go语言中&#xff0c;你可以使用递归函数来遍历二叉树的所有节点&#xff0c;并输出每个节点的关键字。以下是一个…

Python中的对象池与驻留机制:小整数、字符串与大整数

Python中的对象池与驻留机制:小整数、字符串与大整数 Python为优化内存使用和执行效率,设计了多种对象复用机制,其中最典型的是小整数对象池、字符串驻留(intern)机制和大整数对象池。以下是整理后的详细说明,包…

基于ADMM无穷范数检测算法的MIMO通信系统信号检测MATLAB仿真,对比ML,MMSE,ZF以及LAMA

1.算法运行效果图预览 (完整程序运行后无水印)2.算法运行软件版本 matlab2024b3.部分核心程序 (完整版代码包含详细中文注释和操作步骤视频)global MR%表示接收天线的数量 global MT%表示发射天线的数量 global mods…

厦门做返利网站的公司室内设计培训网课

1、# 创建数据库语句create database mydb default character set utf8;# 运用数据库语句use mydb;# 创建表格&#xff0c;这里只简单的创建一张表格# 设置InnoDB主要是为了事务操作的需要create table mytable(id int primary key auto_increment,name varchar(20),count int …

python网站开发 django沈阳网站哪家公司做的好

1. escape 和 unescapeescape()不能直接用于URL编码&#xff0c;它的真正作用是返回一个字符的Unicode编码值。采用unicode字符集对指定的字符串除0-255以外进行编码。所有的空格符、标点符号、特殊字符以及更多有联系非ASCII字符都将被转化成%xx格式的字符编码(xx等于该字符在…

微服务各个部分的作用 - 详解

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …