java作为kafka生产者实验及Expiring超时问题解决

【README】 java作为生产者,centos 作为消费者;

【1】生产者代码 

-- pom.xml 
<!-- 依赖 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency></dependencies>

生产者

-- 生产者 
public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");  /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);  /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);  /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);  /* 10.发送数据 */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first01", "first01-20201229--A" + i));System.out.printf("写入数据%s \n", "first01-20201229--A" + i);try {System.out.println(future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */  producer.close();System.out.println("kafka生产者写入数据完成"); } 
}

【2】centos 消费者 

[root@centos202 kafka-0.11]# kafka-console-consumer.sh --topic first01 --bootstrap-server centos201:9092 --from-beginning
first01-20201229--2
first01-20201229--6
first01-20201229--A0
first01-20201229--A1
first01-20201229--A2
first01-20201229--A3
first01-20201229--A4
first01-20201229--A5
first01-20201229--A6
first01-20201229--A7
first01-20201229--A8
first01-20201229--A9

【3】生产者发送消息超时问题

3.1、问题现场

kafka Expiring 1 record(s) for first01-3: 31539 ms has passed since batch creation plus linger time

3.2、解决方法

修改本地机器的hosts, 如下:

192.168.163.201 centos201 
192.168.163.202 centos202 
192.168.163.203 centos203 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330216.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WEB攻击手段及防御-扩展篇

转载自 WEB攻击手段及防御&#xff0d;扩展篇 之前的文章介绍了常见的XSS攻击、SQL注入、CSRF攻击等攻击方式和防御手段&#xff0c;没有看的去翻看之前的文章&#xff0c;这些都是针对代码或系统本身发生的攻击&#xff0c;另外还有一些攻击方式发生在网络层或者潜在的攻击漏洞…

java客户端作为kafka生产者测试

【README】 1、本文主要对 java客户端作为kafka 生产者进行测试&#xff0c; 消费者由 centos的kafka命令行线程扮演&#xff1b; 2、消息发送&#xff1a; kafka的生产者采用异步发送消息的方式&#xff0c;在消息发送过程中&#xff0c;涉及到2个线程——main线程和sender…

Redis学习之缓存穿透、缓存击穿和缓存雪崩详解

目录缓存穿透解决方案缓存空对象布隆过滤器缓存击穿解决方案对访问数据库的操作加锁提前缓存热点数据&#xff0c;设置热点数据永不过期缓存雪崩解决方案Redis高可用限流降级数据预热设置合理的过期时间参考缓存穿透 指的是对某个一定不存在的数据进行请求&#xff0c;该请求将…

Redis高可用:主从复制及哨兵模式

目录主从复制作用复制原理使用的方式哨兵模式主从切换过程Redis Sentinel的配置文件参考主从复制 主从复制&#xff0c;是指将一台Redis服务器的数据&#xff0c;复制到其他的Redis服务器。前者称为主节点(master)&#xff0c;后者称为从节点(slave)&#xff1b;数据的复制是单…

java客户端作为kafka消费者测试

【README】 本文主要对 java客户端作为kafka 消费者进行测试&#xff0c; 生产者由 kafka客户端扮演&#xff1b; 【1】普通消费者 设置消费者组&#xff1b; 重置消费者的offset&#xff0c; 即每次都从最头开始消费&#xff08;默认仅保持7天内数据&#xff09; &#xf…

spring bean初始化及销毁你必须要掌握的回调方法。

转载自 spring bean初始化及销毁你必须要掌握的回调方法。 spring bean在初始化和销毁的时候我们可以触发一些自定义的回调操作。 初始化的时候实现的方法 1、通过java提供的PostConstruct注解&#xff1b; 2、通过实现spring提供的InitializingBean接口&#xff0c;并重写其a…

java生产者实现kafka拦截器

【RAEDME】 本文中&#xff0c; java客户端作为生产者&#xff0c; centos中consumer线程作为消费者&#xff1b; 【1】拦截器简述 1&#xff09;拦截器是什么&#xff1f; 很明显&#xff0c;为了实现面向切面编码&#xff0c;即在 具体逻辑的上下文 添加一些逻辑&#xff1…

Redis两种客户端:lettuce和Jedis的区别

spring boot 2的spring-boot-starter-data-redis中&#xff0c;默认使用的是lettuce作为redis客户端&#xff0c;它与jedis的主要区别如下&#xff1a; Jedis是同步的&#xff0c;不支持异步&#xff0c;Jedis客户端实例不是线程安全的&#xff0c;需要每个线程一个Jedis实例&…

mysql语句性能开销检测profiling详解

转载自 mysql语句性能开销检测profiling详解 之前我介绍过msyql查询优化explain检查命令的使用&#xff0c;explain主要是检查sql语句的基本性能&#xff0c;sql是否优秀&#xff0c;但不能查看具体的涉及硬件资源的开销&#xff0c;今天要介绍的这个profiling工具可以更细节的…

数据库主从和主备部署介绍

转自&#xff1a; https://www.cnblogs.com/fengzheng/p/13401783.html 数据库层的几种模式 在系统架构中&#xff0c;数据库层主要由如下几种模式&#xff0c;分别是单点模式、主备模式、主从模式。 单点模式 单点模式是最简单的模式&#xff0c;只有一台数据库服务器&…

lv官网编码查询_图文讲解,如何免费自主查询商标近似情况

每一个商业主体&#xff0c;都会给自己的企业、产品、服务注册一个好听的商标&#xff0c;但一个商标名并不是我们喜欢就可以&#xff0c;还要能够得到核准注册。为此&#xff0c;如何能判断出一个商标名是否能够被注册呢&#xff1f;这便需要利用商标的近似判断了(查询该商标名…

hashCode到底有什么用?

转载自 hashCode到底有什么用&#xff1f; hashCode概念 hashCode是jdk根据对象的地址算出来的一个int数字&#xff0c;即对象的哈希码值&#xff0c;代表了该对象在内存中的存储位置。 我们都知道hashCode()方法是顶级类Object类的提供的一个方法&#xff0c;所有的类都可以进…

MySQL、MongoDB、列数据库的区别及应用场景

目录什么是行存储和列存储&#xff1f;什么是MongoDB&#xff08;NoSQL&#xff09;?OLTP和OLAP什么是CAP定理&#xff1f;使用场景行存储的适用场景&#xff1a;列存储的适用场景&#xff1a;MongoDB相对于MySQL的优点更适用MySQL的场景更适用MongoDB的场景个人理解扩展参考什…

转-Redis AOF 持久化详解

转自&#xff1a; https://juejin.cn/post/6844903902991630349 Redis AOF 持久化详解 Redis 是一种内存数据库&#xff0c;将数据保存在内存中&#xff0c;读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出&#xff0c;Redis 的数据就会丢失。 为了…

设计模式之动态代理模式实战

转载自 设计模式之动态代理模式实战 昨天分享了静态代理的概念及存在的缺点&#xff0c;所以今天讲的动态代理模式十分重要。动态代理在我们工作当中应用相当广泛&#xff0c;如Srping AOP就是动态代理的在开源框架的比较出名的应用。 动态代理有两种试&#xff0c;一是通过JD…

南理工计算机博士 年薪_计算机专业的女博士毕业后,进入211大学当讲师,年薪曝光...

现如今&#xff0c;对于很多年轻人来说&#xff0c;高学历就意味着高收入&#xff0c;大家在进入职场以后&#xff0c;通过高学历找到高薪的工作&#xff0c;能够在职场中少走很多弯路&#xff0c;他们在大公司能够跟很多的同事在一起勤快的合作&#xff0c;能够决定他们在以后…

MySQL优化(一):表结构优化

目录表优化数据类型的选择避免列的值为NULLVARCHAR和CHAR日期和时间类型选择标识符&#xff08;主键&#xff09;的类型错误的表结构一张表中有太多列太多的关联适当建立冗余数据混用范式和反范式建立缓存表和汇总表参考表优化 此文章用于记录《高性能MySQL》一书的知识点。 …

centos上安装redis

【README】 本文旨在阐述在linux上安装 redis 的步骤&#xff1b; 【1】具体步骤 step1&#xff09; 把 redis 压缩包 上传到 /opt/software, 并解压 step2&#xff09;打开 redis目录&#xff0c;执行 make&#xff0c; 若没有gcc编译器&#xff0c; 执行命令安装gcc&#…

设计模式之静态代理模式实战

转载自 设计模式之静态代理模式实战静态代理模式很简单&#xff0c;代理类和实现类都实现相同的接口&#xff0c;然后通过代理类来调用实现类的方法。如我们想保存用户信息之前打印用户信息&#xff0c;或者保存用户信息之后把这些信息缓存下来&#xff0c;即在运行方法前后插入…

pandas追加写入excel_[Excel]如果你爱Excel,请学好pandas

现在坐办公室的各种大小团体里&#xff0c;都会有一个“懂Excel的人”&#xff0c;可能那个人是团体里的一员&#xff0c;也可能是和这个团队关系比较好的一个热心人&#xff0c;但总之&#xff0c;你的身边会有这么一个人。如果你环顾四周也没发现&#xff0c;那可能你就是那个…