kafka入门(二)

Java客户端访问Kafka

引入maven依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka‐clients</artifactId>
<version>2.4.1</version>
</dependency>

消息发送端代码

package com.tuling.kafka.kafkaDemo;import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class MsgProducer {private final static String TOPIC_NAME = "my‐replicated‐topic";public static void main(String[] args) throws InterruptedException, ExecutionException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// props.put(ProducerConfig.ACKS_CONFIG, "1");// props.put(ProducerConfig.RETRIES_CONFIG, 3);// props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// props.put(ProducerConfig.LINGER_MS_CONFIG, 10);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(props);int msgNum = 5;final CountDownLatch countDownLatch = new CountDownLatch(msgNum);for (int i = 1; i <= msgNum; i++) {Order order = new Order(i, 100 + i, 1, 1000.00);// 指定发送分区// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0, order.getOrderId().toString(), JSON.toJSONString(order));// 未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNumProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));// 等待消息发送成功的同步阻塞方法// RecordMetadata metadata = producer.send(producerRecord).get();// System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"//         + metadata.partition() + "|offset-" + metadata.offset());// 异步回调方式发送消息producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("发送消息失败:" + exception.getStackTrace());}if (metadata != null) {System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}countDownLatch.countDown();}});// 送积分 TODO}countDownLatch.await(5, TimeUnit.SECONDS);producer.close();}
}

消息接收端代码

package com.tuling.kafka.kafkaDemo;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MsgConsumer {private final static String TOPIC_NAME = "my‐replicated‐topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC_NAME));// 消费指定分区// consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 消息回溯消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/// 指定offset消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/// 从指定时间点开始消费/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);// 从1小时前开始消费long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;Map<TopicPartition, Long> map = new HashMap<>();for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);}Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;Long offset = value.offset();System.out.println("partition-" + key.partition() + "|offset-" + offset);System.out.println();// 根据消费里的timestamp确定offsetif (value != null) {consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}}*/while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());}/*if (records.count() > 0) {// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了consumer.commitSync();// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.printl

Spring Boot整合Kafka

引入spring boot kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring‐kafka</artifactId></dependency>

application.yml配置如下:

server:port: 8080spring:kafka:bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch-size: 16384buffer-memory: 33554432acks: 1key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manual_immediate

发送者代码:

package com.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final static String TOPIC_NAME = "my‐replicated‐topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}
}

消费者代码:

package com.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class MyConsumer {/*** @KafkaListener(groupId = "testGroup", topicPartitions = {* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),* @TopicPartition(topic = "topic2", partitions = "0",* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))* }, concurrency = "6")* // concurrency 就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数* @param record*/@KafkaListener(topics = "my‐replicated‐topic", groupId = "zhugeGroup")public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);// 手动提交offsetack.acknowledge();}/* // 配置多个消费组@KafkaListener(topics = "my‐replicated‐topic", groupId = "tulingGroup")public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);ack.acknowledge();} */
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/81760.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python----目标检测(PASCAL VOC数据集)

一、PASCAL VOC数据集 PASCAL VOC&#xff08;Visual Object Classes&#xff09;数据集是计算机视觉领域中广泛使用的一个 标准数据集&#xff0c;用于目标检测、图像分割、图像分类、动作识别等任务。该数据集由 PASCAL&#xff08;Pattern Analysis, Statistical Modelling …

mariadb 升级 (通过yum)

* 注意下 服务名, 有的服务器上是mysql,有的叫mariadb,mysqld的 #停止 systemctl stop mysql #修改源 vi /etc/yum.repos.d/MariaDB.repo baseurl http://yum.mariadb.org/11.4/centos7-amd64 #卸载 yum remove mysql #安装 yum install MariaDB-server galera-4 MariaDB-…

vuejs处理后端返回数字类型精度丢失问题

标题问题描述 后端返回数据有5.00和3.30这种数据&#xff0c;但是前端展示的时候返回对应分别为5和3.0&#xff0c;小数点后0都丢失了。 接口返回数据展示network-Response&#xff1a; 接口返回数据展示network-Preview&#xff1a; 错误数据效果展示 发现问题 浏览器接口…

ubuntu kubeasz 部署高可用k8s 集群

ubuntu kubeasz 部署高可用k8s 集群 测试环境主机列表软件清单kubeasz 部署高可用 kubernetes配置源配置host文件安装 ansible 并进行 ssh 免密登录:下载 kubeasz 项⽬及组件部署集群部署各组件开始安装修改 config 配置文件增加 master 节点增加 kube_node 节点登录dashboard…

IDEA2025版本使用Big Data Tools连接Linux上Hadoop的HDFS

目录 Windows的准备 1. 将与Linux上版本相同的hadoop压缩包解压到本地 ​编辑2.设置$HADOOP HOME环境变量指向:E:\hadoop-3.3.4 3.下载hadoop.dll和winutils.exe文件 4.将hadoop.dll和winutils.exe放入$HADOOP HOME/bin中 IDEA中操作 1.下载Big Data Tools插件 2.添加并连…

Java转Go日记(三十九):Gorm查询

1.1.1. 查询 // 获取第一条记录&#xff0c;按主键排序db.First(&user)SELECT * FROM users ORDER BY id LIMIT 1;// 获取最后一条记录&#xff0c;按主键排序db.Last(&user)SELECT * FROM users ORDER BY id DESC LIMIT 1;// 获取所有记录db.Find(&users)SELECT *…

bisheng系列(二)- 本地部署(前后端)

一、导读 环境&#xff1a;Ubuntu 24.04、open Euler 23.03、Windows 11、WSL 2、Python 3.10 、bisheng 1.1.1 背景&#xff1a;需要bisheng二开商用&#xff0c;故而此处进行本地部署&#xff0c;便于后期调试开发 时间&#xff1a;20250519 说明&#xff1a;bisheng前后…

5G金融互联:迈向未来金融服务的极速与智能新时代

5G金融互联:迈向未来金融服务的极速与智能新时代 大家好,我是Echo_Wish,今天咱们聊聊一个大家都十分关心的话题:5G网络在金融服务中的应用。咱们平时可能觉得5G只是打个电话、刷个视频更流畅了,但在金融服务领域,5G的低延时、大带宽和高可靠性正在悄然改变整个游戏规则。…

UE5 GAS框架解析内部数据处理机制——服务器与客户端

当&#xff0c; gas通过点击鼠标光标触发事件时&#xff0c;内部的处理机制。 当通过点击事件&#xff0c;命中中目标时&#xff0c; 可获取到对应的TargetData 目标数据。处理相应的操作。 仅有本地的客户端的情况下。命中并不会有什么异常。 当存在服务器时&#xff0c; 服…

Golang的Web应用架构设计

# Golang的Web应用架构设计 介绍 是一种快速、高效、可靠的编程语言&#xff0c;它在Web应用开发中越来越受欢迎。Golang的Web应用架构设计通常包括前端、后端和数据库三个部分。在本篇文章中&#xff0c;我们将详细介绍Golang的Web应用架构设计及其组成部分。 前端 在Golang的…

对比 HTTP-REST 与 gRPC:各自的优缺点以及适用的场景

文章目录 对比 HTTP-REST 与 gRPC&#xff1a;各自的优缺点以及适用的场景HTTP-REST 与 gRPC 的核心区别gRPC 的优缺点HTTP-REST 的优缺点适用场景 模糊点什么是 Protobuf&#xff1f;HTTP/2 会将 HTTP 消息拆分并封装为二进制帧&#xff0c;那还能过使用 HTTP/2 构建 RESTful …

现代健康生活养生指南

现代社会中&#xff0c;熬夜加班、久坐不动、饮食不规律成为许多人的生活常态&#xff0c;由此引发的健康问题也日益增多。想要摆脱亚健康&#xff0c;不必依赖中医理念&#xff0c;从以下这些现代科学养生方法入手&#xff0c;就能逐步改善身体状况。​ 饮食上&#xff0c;注…

Go语言数组的定义与操作 - 《Go语言实战指南》

在 Go 语言中&#xff0c;数组&#xff08;Array&#xff09; 是一种定长、同类型的集合。它在内存中是连续分布的&#xff0c;适合用于性能敏感的场景。 一、数组的定义 数组的基本语法如下&#xff1a; var 数组名 [长度]元素类型 示例&#xff1a; var nums [5]int …

Helm Chart 中配置多个 Docker Registry 地址以实现备用访问

在 Helm Chart 中配置多个 Docker Registry 地址以实现备用访问&#xff0c;可以通过以下几种方式实现&#xff1a; 1. 在 values.yaml 中定义多个 Registry 在 values.yaml 中定义主 Registry 和备用 Registry&#xff0c;以便在部署时灵活切换&#xff1a; # values.yaml …

云原生安全:错误策略S3存储桶ACL设置为Everyone:FullControl

🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 ——从基础到实践的深度解析 1. 基础概念 S3存储桶与ACL Amazon S3(Simple Storage Service)是AWS提供的对象存储服务,支持存储和检索任意规模的数据。ACL(访问控制列表…

.NET 8 kestrel 配置PEM,实现内网https

一、生成证书 mkcert 是一个简单的工具&#xff0c;用于制作本地信任的开发证书。它不需要配置。 mkcert官方仓库地址&#xff1a;GitHub - FiloSottile/mkcert: A simple zero-config tool to make locally trusted development certificates with any names youd like. 简…

nodejs快速入门到精通1

参考 nodejs快速入门到精通 菜鸟教程-nodejs nodejs官方文档 原因 视频免费 资料收费 笔记还是自己写吧 安装 nodejs官网 windows下&#xff1a; #查看nodejs版本 node -v #查看npm版本 npm -v #设置npm为淘宝镜像源 npm config set registry https://registry.npmmirror.…

nginx负载均衡及keepalive高可用

实验前期准备&#xff1a; 5台虚拟机&#xff1a;4台当做服务器&#xff0c;1台当做客户机&#xff08;当然&#xff0c;也可以使用主机的浏览器&#xff09;&#xff0c;4台服务器中&#xff0c;2台服务器当做后端真实访问服务器&#xff1b;另外2台服务器当做负载均衡服务器…

go语法大赏

前些日子单机房稳定性下降&#xff0c;找了好一会才找到真正的原因。这里面涉及到不少go语法细节&#xff0c;正好大家一起看一下。 一、仿真代码 这是仿真之后的代码 package mainimport ("fmt""go.uber.org/atomic""time" )type StopSignal…

Android 14 解决打开app出现不兼容弹窗的问题

应用安装到 Android 14 上&#xff0c;出现如下提示 This app isn’t compatible with the latest version of Android. Check for an update or contact the app’s developer. 通过源码找原因。 提示的字符 根据字符找到 ./frameworks/base/core/res/res/values/strings.xm…