Kafka客户端基础使用

依赖

引入以下依赖

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.1.0</version></dependency>

生产者

  • 文档: https://kafka.apache.org/documentation/#producerapi
  1. 设置服务配置,例如序列化,IP端口等
        final Properties props = new Properties() {{// 服务IP端口配置put(BOOTSTRAP_SERVERS_CONFIG, "study.fedora01.com:9092,study.fedora02.com:9092,study.fedora03.com:9092");// 序列化put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());}};
  1. 发送消息
    2.1 单向发送,不关心是否发送成功
        try (final Producer<String, String> producer = new KafkaProducer<>(props)) {final Random rnd = new Random();final int numMessages = 10;for (int i = 0; i < numMessages; i++) {String user = users[rnd.nextInt(users.length)];String item = items[rnd.nextInt(items.length)];// 单向发送, 不关心是否发送成功producer.send(new ProducerRecord<>(topic, user, item));System.out.printf("%s events were produced to topic %s%n", numMessages, topic);}

2.2 同步发送, 发送完成后获取服务端发送情况,如果一直未发送成功那么就会进行阻塞

     try (final Producer<String, String> producer = new KafkaProducer<>(props)) {final Random rnd = new Random();final int numMessages = 10;for (int i = 0; i < numMessages; i++) {String user = users[rnd.nextInt(users.length)];String item = items[rnd.nextInt(items.length)];// 同步发送, 发送完成后获取服务端发送情况,如果一直未发送成功那么就会进行阻塞RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, user, item),(event, ex) -> {if (ex != null)ex.printStackTrace();elseSystem.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);}).get();}

2.3 异步发送, 回调函数处理异步响应信息

      try (final Producer<String, String> producer = new KafkaProducer<>(props)) {final Random rnd = new Random();final int numMessages = 10;for (int i = 0; i < numMessages; i++) {String user = users[rnd.nextInt(users.length)];String item = items[rnd.nextInt(items.length)];// 异步发送, 回调函数处理异步响应信息producer.send(new ProducerRecord<>(topic, user, item),(event, ex) -> {if (ex != null)ex.printStackTrace();elseSystem.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);});}System.out.printf("%s events were produced to topic %s%n", numMessages, topic);}

消费者

  • 文档: https://kafka.apache.org/documentation/#consumerapi
  1. 设置配置信息
        String topic = "producer-topic";Properties props = new Properties();// 设置服务地址props.put("bootstrap.servers", "study.fedora01.com:9092,study.fedora02.com:9092,study.fedora03.com:9092");// 消费者端需要设置一个group id来消费信息props.put("group.id", "test-group-2"); // 序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 监听topic
        consumer.subscribe(Collections.singletonList(topic));System.out.println("开始消费topic: " + topic);
  1. 处理消息,每隔1000毫秒拉取一次
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));if (records.isEmpty()) {System.out.println("还没有读取到消息...");}for (ConsumerRecord<String, String> record : records) {System.out.printf("Key=%s, Value=%s, Partition=%d, Offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/963364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java EE初阶--多线程 - 教程

Java EE初阶--多线程 - 教程2025-11-12 13:21 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important;…

【ArcMap】查看、反转线的方向

1、查看 双击左侧图层中的线形,选择箭头即可 2、反转:首先开始编辑,选中需要反转的线,点击编辑折点(Editor Verticea)按钮 然后在选中的线上右键(或者左键双击选中,再右键),选则flip反转 反转成功

systemd-timedated.service Dbus参考

https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.timedate1.htmlName org.freedesktop.timedate1 — The D-Bus interface of systemd-timedatedIntroduction systemd-timedated.service(8)…

2025年比较好的防火岩棉板厂家实力及用户口碑排行榜

2025年比较好的防火岩棉板厂家实力及用户口碑排行榜行业背景与市场趋势随着我国建筑节能标准的不断提高和消防安全意识的增强,防火岩棉板作为A级不燃材料在建筑外墙保温领域的应用日益广泛。据中国绝热节能材料协会统…

2025年口碑好的链条输送机实力厂家TOP推荐榜

2025年口碑好的链条输送机实力厂家TOP推荐榜行业背景与市场趋势随着全球制造业智能化升级步伐加快,链条输送机作为工业自动化生产线的核心设备,市场需求持续增长。据《2024-2029年中国输送机械行业市场调研与投资前景…

2025年比较好的超强承重天地铰链厂家实力及用户口碑排行榜

2025年超强承重天地铰链厂家实力及用户口碑排行榜行业背景与市场趋势随着现代家居设计向极简风格发展,天地铰链作为高端柜门系统的核心部件,市场需求持续增长。据中国五金制品协会2024年数据显示,国内高端五金配件市…

传统企业能源管理痛点破解:MyEMS 如何解决 “数据散、分析难、优化慢” 三大核心问题?

在 “双碳” 目标与制造业转型升级的双重驱动下,能源管理已成为传统企业降本增效、实现可持续发展的核心抓手。然而,多数传统企业(如化工、机械、冶金等)在能源管理中仍深陷 “数据散、分析难、优化慢” 的困境 —…

Magisk体系:Android Root权限的工程化部署方案

Magisk的技术定位与核心价值 Magisk作为Android平台的开源Root框架,以APK形态实现无系统分区侵入(Systemless)的权限接管机制。其技术突破在于规避对/system分区的直接修改,通过动态挂载技术完成提权,同时构建了可…

2025年评价高的绿篱修剪机最新TOP厂家排名

2025年评价高的绿篱修剪机最新TOP厂家排名行业背景与市场趋势随着城市化进程加快和绿化需求提升,全球园林机械市场持续增长。据《2024-2029年全球园林机械行业市场调研与前景预测报告》显示,2023年全球园林机械市场规…

MX Round 23 解题报告

T1 破环为链,枚举区间。 接下来考虑本质不同的顺序只有:\(ABC\) 和 \(CBA\),第二种可以通过序列逆序后重复操作得到。 接下来我们在枚举区间时,统计每一个元素在最后占区间中的每个字母出现次数。 我们发现交换有两…

2025年质量好的载带成型机用户口碑最好的厂家榜

2025年质量好的载带成型机用户口碑最好的厂家榜行业背景与市场趋势载带成型机作为电子元器件包装领域的关键设备,近年来随着半导体、电子元件行业的快速发展而迎来新的增长机遇。据《2024-2029年中国载带行业市场调研…

【转】Milo库OPCUA协议java实现

原文地址:Milo库OPCUA协议java实现 - 知乎 Milo库 今天跟大家来介绍一下一个OPC UA协议的开源库,我们使用的现场设备为西门子的S7-1500 CPU,西门子的S7-1500在V2.1版本后就直接可以作为OPC UA的服务器来供其他客户端…

2025年热门的超低压反渗透膜最新TOP品牌厂家排行

2025年热门的超低压反渗透膜最新TOP品牌厂家排行行业背景与市场趋势随着全球水资源短缺问题日益严峻,水处理技术尤其是反渗透膜技术正迎来快速发展期。根据Global Water Intelligence最新报告显示,2024年全球反渗透膜…

2025年口碑好的门式起重机最新TOP厂家排名

2025年口碑好的门式起重机最新TOP厂家排名行业背景与市场趋势门式起重机作为现代工业生产和物流运输中不可或缺的重型设备,近年来随着制造业转型升级和基础设施建设的持续推进,市场需求呈现稳定增长态势。根据中国重…

2025年热门的立式明装风机盘管TOP品牌厂家排行榜

2025年热门的立式明装风机盘管TOP品牌厂家排行榜行业背景与市场趋势随着建筑节能要求的不断提高和中央空调系统的广泛应用,立式明装风机盘管作为中央空调末端设备的重要组成部分,近年来市场需求持续增长。据《2024-2…

2025年耐用的微型磁力齿轮泵厂家推荐及选购指南

2025年耐用的微型磁力齿轮泵厂家推荐及选购指南行业背景与市场趋势微型磁力齿轮泵作为工业流体输送领域的关键设备,近年来随着新能源、半导体、化工等行业的快速发展,市场需求持续增长。根据《2024-2029年中国磁力泵…

2025年11月酶制剂品牌对比榜:五家代表企业深度解析

站在生产线末端,看着淀粉糖化液缓缓流入储罐,李工最担心的是酶活波动导致收率下滑;烘焙研发部的小赵则纠结于改良剂里的蛋白酶是否会让面包侧壁塌陷;饲料集团采购总监老周每年四季度都要重新评估供应商,因为原料价…

2025年靠谱的轻型卡车天窗用户好评厂家排行

2025年靠谱的轻型卡车天窗用户好评厂家排行行业背景与市场趋势随着中国物流运输业的持续发展,轻型卡车作为城市配送和短途运输的主力车型,其市场需求稳步增长。据中国汽车工业协会统计,2024年我国轻型卡车销量达到1…

2025年11月酶制剂品牌评价榜:五强性能与口碑综合排行

临近年底,食品、饲料、医药三大行业进入配方升级窗口期,酶制剂作为“生物芯片”被频繁写进工艺单。采购经理们面对的问题是:同一酶种,不同厂家给出的酶活单位、添加量、吨成本差异可达三成;换供应商又要重新做稳定…

2025年11月白酒曲厂家推荐榜:机械化制曲排行评测

立冬一过,新粮入仓,各大酒厂陆续启动秋酿。对酿酒师而言,曲是酒的骨,曲药选得准,出酒率、香气、口感才能稳得住。可现实里,不少酒厂仍被“曲味杂、升酸快、香气薄”反复折磨:小曲糖化慢、大曲成本高、机械化车间…