002 Java操作kafka客户端

Java操作kafka客户端

文章目录

  • Java操作kafka客户端
    • 3.Java操作kafka客户端
      • 1.引入依赖
      • 2. Kafka服务配置
      • 3、生产者(Producer)实现
        • 1. 基础配置与发送消息
        • 2. 关键配置说明
      • 4.消费者(Consumer)实现
        • 1. 基础配置与消费消息
        • 2. 关键配置说明
      • 3.auto.offset.reset参数可选值及行为
        • 1.代码示例与行为验证
          • 1. 配置为 `earliest`
          • 2. 配置为 `latest`
          • 3. 配置为 `none`
        • 2.关键注意事项
          • 1. Offset 提交机制的影响
          • 2. 消费者组隔离性
          • 3. 命令行验证 Offset
        • 3、生产环境最佳实践
        • 4、常见问题解答
          • Q:配置了 `latest`,为什么还能消费到旧消息?
          • Q:如何让消费者组永久保留 Offset?
      • 5.主题管理示例(AdminClient)
      • 6.最佳实践与注意事项
      • 7.关于flush和close方法的说明

来源参考的deepseek,如有侵权联系立删

3.Java操作kafka客户端

Java API提供以下核心接口:

  • Producer API:发送消息。
  • Consumer API:订阅消息。
  • Streams API:流式处理。
  • Admin API:管理Topic和集群。

1.引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version> 
</dependency>

2. Kafka服务配置

确保已启动Zookeeper和Kafka服务,默认端口分别为21819092

3、生产者(Producer)实现

1. 基础配置与发送消息

无需提前创建topic

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 1. 配置生产者参数Properties props = new Properties();// Broker地址props.put("bootstrap.servers", "127.0.0.1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 消息确认机制props.put("acks", "all");// 重试次数props.put("retries", 3);// 2. 创建生产者实例try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 3. 构造消息并发送for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", // 主题名称"key-" + i,   // 消息键"value-" + i  // 消息值);// 异步发送(可改用get()同步等待)producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.printf("消息发送成功:topic=%s, partition=%d, offset=%d%n",metadata.topic(), metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});}producer.flush(); // 确保所有消息发送完成}}
}

在这里插入图片描述

2. 关键配置说明
参数说明
bootstrap.serversBroker地址列表,多个用逗号分隔
key.serializer键的序列化类(如StringSerializer)
value.serializer值的序列化类
acks消息持久化确认机制(0/1/all
retries发送失败后的重试次数
batch.size批量发送的消息大小(字节)

4.消费者(Consumer)实现

1. 基础配置与消费消息
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 1. 配置消费者参数Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "test-group"); // 消费者组IDprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费props.put("enable.auto.commit", "false");   // 关闭自动提交偏移量// 2. 创建消费者实例try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题while (true) {// 3. 轮询消息(超时时间100ms)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}// 4. 手动提交偏移量(同步提交)consumer.commitSync();}}}
}

在这里插入图片描述
在这里插入图片描述

可收到实时消费的消息,但队列中消息并没有移除,

  • 消息保留规则由 Broker 配置控制,与消费者无关。
  • 消费者 Offset 仅标记消费进度,不会删除消息。
  • 通过 kafka-consumer-groups.sh 工具监控消费状态。
  • 生产环境中,合理设置 log.retention.hourslog.retention.bytes
2. 关键配置说明
参数说明
group.id消费者组ID,相同组内共享分区
auto.offset.reset无偏移量时的策略(earliest/latest
enable.auto.commit是否自动提交偏移量(建议false手动控制)
max.poll.records单次poll最大消息数

3.auto.offset.reset参数可选值及行为

作用典型场景
earliest从分区的最早消息开始消费(从头消费)需要处理 Topic 中所有历史消息
latest从分区的最新消息开始消费(仅消费新消息)实时处理最新数据,忽略历史消息
none抛出异常(NoOffsetForPartitionException需要严格确保 Offset 有效性
1.代码示例与行为验证
1. 配置为 earliest
props.put("auto.offset.reset", "earliest");

参数生效的触发条件

场景auto.offset.reset 是否生效消费起始位置
消费者组首次启动(无 Offset)根据参数值(earliest/latest
Offset 已提交且有效(未过期)从已提交 Offset 继续消费
Offset 已过期(消息被删除)根据参数值重新定位

行为

  • 如果消费者组首次启动,会从 Topic 每个分区的第一条消息开始消费。
  • 如果 Offset 过期(例如消息被删除),会从现存的最早消息开始消费。

适用场景

  • 数据回放(重放全部历史数据)
  • 测试环境需要消费完整数据集
2. 配置为 latest
props.put("auto.offset.reset", "latest");

行为

  • 如果消费者组首次启动,只消费启动后新写入的消息。
  • 如果 Offset 过期,会从当前最新消息开始消费。

适用场景

  • 生产环境实时处理(避免处理历史积压数据)
  • 日志收集系统(只需最新日志)
3. 配置为 none
props.put("auto.offset.reset", "none");

行为

  • 如果 Offset 无效,直接抛出 NoOffsetForPartitionException
  • 需手动处理异常或确保 Offset 始终有效。

适用场景

  • 高可靠性系统(需严格监控 Offset 有效性)
2.关键注意事项
1. Offset 提交机制的影响
  • 如果启用了自动提交 (enable.auto.commit=true),消费者会定期提交 Offset。
    重复消费风险:若消息处理失败但 Offset 已提交,会导致消息丢失。
  • 推荐做法
  props.put("enable.auto.commit", "false"); // 关闭自动提交// 处理完消息后手动提交 Offsetconsumer.commitSync();
2. 消费者组隔离性
  • 不同group.id的 Offset 互相独立。例如:
    • 消费者组 A(group.id=group1)配置为 latest → 只消费新消息。
    • 消费者组 B(group.id=group2)配置为 earliest → 可以消费全部消息。
3. 命令行验证 Offset

通过 Kafka 工具查看消费者组的 Offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group your-group-id

输出示例:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test-topic      0          5000            10000           5000
  • LAG:未消费的消息数量。若 LAG 持续增长,说明消费速度跟不上生产速度。

3、生产环境最佳实践
  1. 明确业务需求
    • 需要重放数据 → earliest
    • 仅处理实时数据 → latest
  2. 监控 Offset 提交
    • 使用 kafka-consumer-groups.sh 定期检查 LAG。
    • 集成监控系统(如 Prometheus + Grafana)。
  3. 防御性代码
   try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 处理消息consumer.commitSync(); // 同步提交}} catch (NoOffsetForPartitionException e) {// 处理 Offset 无效的极端情况logger.error("Offset 无效,需人工介入!", e);}

4、常见问题解答
Q:配置了 latest,为什么还能消费到旧消息?
  • 可能原因
    消费者组之前已提交过 Offset,且当前 Offset 指向旧消息位置。
  • 解决
    重置消费者组 Offset:
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group your-group-id --reset-offsets --to-latest --execute --topic test-topic
Q:如何让消费者组永久保留 Offset?
  • Kafka 默认行为
    Offset 存储在内部 Topic __consumer_offsets 中,默认保留时间为 7 天。
  • 修改保留策略
  # 修改 Offset 保留时间(单位:毫秒)bin/kafka-configs.sh --bootstrap-server localhost:9092 \--entity-type topics --entity-name __consumer_offsets \--alter --add-config retention.ms=604800000

5.主题管理示例(AdminClient)

import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdminDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");try (AdminClient admin = AdminClient.create(props)) {// 创建主题(3分区,1副本)NewTopic newTopic = new NewTopic("test-topic2", 3, (short) 1);CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));result.all().get(); // 阻塞等待创建完成System.out.println("主题创建成功");} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

在这里插入图片描述

6.最佳实践与注意事项

  1. 生产者优化
    • 启用压缩(compression.type=snappy)减少网络开销。
    • 合理设置batch.sizelinger.ms提高吞吐量。
  2. 消费者可靠性
    • 使用手动提交偏移量,避免消息丢失或重复消费。
    • 处理CommitFailedException,防止因处理超时导致提交失败。
  3. 序列化选择
    • 默认支持String、ByteArray等序列化器。
    • 复杂对象推荐使用JSON(Jackson)或Avro。
  4. 消费者组管理
    • 通过kafka-consumer-groups.sh工具监控消费进度。
    • 避免频繁重平衡(Rebalance),调整session.timeout.ms参数。

7.关于flush和close方法的说明

  • flush():强制发送缓冲区中所有未发送的消息(同步等待发送完成)
  • close():释放生产者占用的所有资源(包括线程、网络连接、内存等)

若未调用close()可能导致:

  • 线程泄漏:生产者后台的Sender线程未终止
  • 连接泄漏:与Broker的TCP连接未关闭
  • 内存泄漏:未释放消息缓冲区内存

可通过jstackVisualVM工具检查线程状态验证。

关键区别说明

方法作用是否必须调用是否自动包含对方功能
flush()清空发送缓冲区,确保所有消息被发送可选(按需调用)❌ 不释放资源
close()关闭生产者并释放资源必须调用✅ 内部会自动调用flush()

正确写法(推荐):

try (Producer<String, String> producer = new KafkaProducer<>(props)) {producer.send(record);producer.flush(); // 显式清空缓冲区(可选)
} // 自动调用close(),包含flush()

错误写法(资源泄漏):

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(record);
producer.flush(); 
// 忘记调用close() → 线程/连接未释放!

最佳实践建议

1.优先使用try-with-resources(Java 7+特性):

   try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 发送消息...} // 自动调用close()

这是最安全的写法,无需手动调用flush()close()

2.需要立即发送时

   producer.send(record);producer.flush(); // 强制立即发送(如实时系统关键消息)// ...其他操作...producer.close(); // 仍需显式关闭

3.不要依赖finalize()
Kafka客户端的finalize()方法已废弃,不能保证资源释放。

4.KafkaProducer.close()源码:

public void close() {close(Duration.ofMillis(Long.MAX_VALUE)); // 默认无限等待
}public void close(Duration timeout) {// ...flush();    // 内部自动调用flush()client.close(); // 释放网络资源metrics.close(); // 关闭监控指标// ...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/71923.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SRC实战】信息泄露导致越权会员功能

01 — 漏洞证明 1、VIP功能 2、SVIP功能 3、点击任意用户发起私聊&#xff0c;发现userId纯数字可遍历 4、返回包泄露身高范围height&#xff0c;星座constellation&#xff0c;属相zodiac&#xff0c;恋爱目标purpose&#xff0c;教育程度degree&#xff0c;成功越权VIP功能 …

游戏引擎学习第125天

仓库:https://gitee.com/mrxiao_com/2d_game_3 回顾并为今天的内容做准备。 昨天&#xff0c;当我们离开时&#xff0c;工作队列已经完成了基本的功能。这个队列虽然简单&#xff0c;但它能够执行任务&#xff0c;并且我们已经为各种操作编写了测试。字符串也能够正常推送到队…

蓝桥杯 Java B 组之记忆化搜索(滑雪问题、斐波那契数列)

Day 5&#xff1a;记忆化搜索&#xff08;滑雪问题、斐波那契数列&#xff09; &#x1f4d6; 一、记忆化搜索简介 记忆化搜索&#xff08;Memoization&#xff09; 是一种优化递归的方法&#xff0c;它利用 哈希表&#xff08;HashMap&#xff09;或数组 存储已经计算过的结果…

反爬虫策略

反爬虫策略是网站用于防止自动化程序&#xff08;爬虫&#xff09;恶意抓取数据的核心手段&#xff0c;其设计需兼顾有效性、用户体验和合法性。 一、 基础检测与拦截 User-Agent检测&#xff1a;验证请求头中的User-Agent&#xff0c;拦截非常见或已知爬虫标识。IP频率限制&…

Java 实现快速排序算法:一条快速通道,分而治之

大家好&#xff0c;今天我们来聊聊快速排序&#xff08;QuickSort&#xff09;算法&#xff0c;这个经典的排序算法被广泛应用于各种需要高效排序的场景。作为一种分治法&#xff08;Divide and Conquer&#xff09;算法&#xff0c;快速排序的效率在平均情况下非常高&#xff…

深入解析 Spring 中的 BeanDefinition 和 BeanDefinitionRegistry

在 Spring 框架中&#xff0c;BeanDefinition 和 BeanDefinitionRegistry 是两个非常重要的概念&#xff0c;它们共同构成了 Spring IoC 容器的核心机制。本文将详细介绍这两个组件的作用、实现以及它们之间的关系。 一、BeanDefinition&#xff1a;Bean 的配置描述 1.1 什么…

《OpenCV》——光流估计

什么是光流估计&#xff1f; 光流估计的前提&#xff1f; 基本假设 亮度恒定假设&#xff1a;目标像素点的亮度在相邻帧之间保持不变。这是光流计算的基础假设&#xff0c;基于此可以建立数学方程来求解光流。时间连续或运动平滑假设&#xff1a;相邻帧之间的时间间隔足够小&a…

信息系统的安全防护

文章目录 引言**1. 物理安全****2. 网络安全****3. 数据安全****4. 身份认证与访问控制****5. 应用安全****6. 日志与监控****7. 人员与管理制度****8. 其他安全措施****9. 安全防护框架**引言 从技术、管理和人员三个方面综合考虑,构建多层次、多维度的安全防护体系。 信息…

如何进行OceanBase 运维工具的部署和表性能优化

本文来自OceanBase 用户的实践分享 随着OceanBase数据库应用的日益深入&#xff0c;数据量不断攀升&#xff0c;单个表中存储数百万乃至数千万条数据的情况变得愈发普遍。因此&#xff0c;部署专门的运维工具、实施针对性的表性能优化策略&#xff0c;以及加强指标监测工作&…

如何防止 Instagram 账号被盗用:安全设置与注意事项

如何防止 Instagram 账号被盗用&#xff1a;安全设置与注意事项 在这个数字化时代&#xff0c;社交媒体平台如 Instagram 已成为我们日常生活的一部分。然而&#xff0c;随着网络犯罪的增加&#xff0c;保护我们的在线账户安全变得尤为重要。以下是一些关键的安全设置和注意事…

Redis|复制 REPLICA

文章目录 是什么能干嘛怎么玩案例演示复制原理和工作流程复制的缺点 是什么 官网地址&#xff1a;https://redis.io/docs/management/replication/Redis 复制机制用于将数据从一个主节点&#xff08;Master&#xff09;复制到一个或多个从节点&#xff08;Slave&#xff09;&a…

对象存储之Ceph

Ceph 对象存储概述 Ceph 是一个开源分布式存储系统&#xff0c;旨在提供高度可扩展、高度可用、容错、性能优异的存储解决方案。它结合了块存储、文件系统存储和对象存储的功能&#xff0c;且在设计上具有极高的可扩展性和灵活性。 在 Ceph 中&#xff0c;对象存储&#xff0…

Document对象

DOM4j中&#xff0c;获得Document对象的方式有三种&#xff1a; 1.读取XML文件,获得document对象 SAXReader reader new SAXReader(); Document document reader.read(new File("input.xml")); 2.解析XML形式的文本,得到document对象…

树莓集团南京产业园再布局:深入剖析背后逻辑

在产业园区蓬勃发展的当下&#xff0c;树莓集团在南京的产业园再布局行动备受瞩目。这一举措并非偶然&#xff0c;其背后蕴含着深刻且多元的战略逻辑。 一、顺应区域产业发展趋势 南京作为长三角地区的重要城市&#xff0c;产业基础雄厚且多元。近年来&#xff0c;南京大力推动…

Pytorch实现之脑电波图像生成

简介 简介:采用双GAN模型架构来生成脑电波与目标图像。 论文题目:Image Generation from Brainwaves using Dual Generative Adversarial Training(使用双生成对抗训练的脑电波图像生成) 会议:IEEE Global Conference on Consumer Electronics (GCCE) 摘要:表示通过无…

HTML解析 → DOM树 CSS解析 → CSSOM → 合并 → 渲染树 → 布局 → 绘制 → 合成 → 屏幕显示

一、关键渲染流程 解析 HTML → 生成 DOM 树 浏览器逐行解析 HTML&#xff0c;构建**DOM&#xff08;文档对象模型&#xff09;**树状结构 遇到 <link> 或 <style> 标签时会暂停 HTML 解析&#xff0c;开始加载 CSS 解析 CSS → 生成 CSSOM 将 CSS 规则解析为**…

剑指offer - 面试题11 旋转数组的最小数字

题目链接&#xff1a;旋转数组的最小数字 第一种&#xff1a;正确写法&#xff08;num[m]和nums[r]比较&#xff09; class Solution { public:/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可** * param nums int整型v…

Spring源码分析の循环依赖

文章目录 前言一、循环依赖问题二、循环依赖的解决三、整体流程分析 前言 常见的可能存在循环依赖的情况如下&#xff1a; 两个bean中互相持有对方作为自己的属性。   类似于&#xff1a; 两个bean中互相持有对方作为自己的属性&#xff0c;且在构造时就需要传入&#xff1a…

Docker 部署 Jenkins持续集成(CI)工具

[TOC](Docker 部署 Jenkins持续集成(CI)工具) 前言 Jenkins 是一个流行的开源自动化工具&#xff0c;广泛应用于持续集成&#xff08;CI&#xff09;和持续交付&#xff08;CD&#xff09;的环境中。通过 Docker 部署 Jenkins&#xff0c;可以简化安装和配置过程&#xff0c;并…