kafka生产者开发方式

【README】

本文记录了 kafka生产者开发方式;


【1】生产者概览

【1.1】kafka发送消息过程

【1.2】创建kafka生产者

1)创建kafka生产者, 有3个必选属性:

  1. bootstrap.servers: kakfa集群节点地址;
  2. key.serializer: 键序列化器;
  3. value.serializer:值序列化器;
/* 1.创建kafka生产者的配置信息 */
Properties props = new Properties();
/* 指定连接的kafka集群, broker-list */
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
/* key, value 的序列化类 */
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
/** 设置压缩算法 */
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");/* 2.创建生产者对象 */
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

【2】发送消息到kafka

1)发送消息有3种方式

  1. 发送并忘记:把消息发送给服务器,不管它是否到达;
  2. 同步发送:调用send()方法, 返回一个Future对象,调用其get() 方法进入阻塞,服务器响应时,阻塞线程被唤醒并获得消息写入的元数据;
  3. 异步发送:调用send() 方法,并指定一个回调函数,服务器在响应是调用该函数;

【2.1】同步发送

/*** @Description 同步发送生产者* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/
public class MyProducerSync {public static void main(String[] args) {// 1.创建kafka生产者的配置信息Properties props = new Properties();// 指定连接的kafka集群, broker-listprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");// key, value 的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2.创建生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3.发送数据Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10","k1", "v1"));try {// 当前线程阻塞,直到kafka响应返回写入消息的元数据RecordMetadata respMetadata = future.get();System.out.println("[生产者写入消息] 分区【" + respMetadata.partition() + "】-offset【" + respMetadata.offset() + "】");} catch (Exception e) {}// 关闭生产者producer.close();System.out.println("kafka生产者写入数据完成");}
}

kafka生产者一般发生两类错误:

  1. 可重试错误,如连接错误(通过再次建立连接来解决),无主错误(通过重新分区选举首领解决);
  2. 不可重试错误,如消息太大错误;

【2.2】异步发送消息 (带回调函数)

/*** @Description 【异步】发送生产者* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/
public class MyProducerAsync {public static void main(String[] args) {// 1.创建kafka生产者的配置信息Properties props = new Properties();// 指定连接的kafka集群, broker-listprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");// key, value 的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2.创建生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3.发送数据producer.send(new ProducerRecord<String, String>("hello10","k1", "v1"), 
new MyProducerCallback());// 关闭生产者producer.close();System.out.println("kafka生产者写入数据完成");}/*** @Description 生产者发送消息后回调类* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/private static class MyProducerCallback implements Callback {// kafka服务器响应时回调方法@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("[生产者写入消息成功] 分区【" + metadata.partition() + "】-offset【" + metadata.offset() + "】");} else {System.out.printf("写入kafka失败,异常信息【%s】", exception);}}}
}

【2.3】生产者配置

1)acks: 有3个可选项;表示生产者消息被认为写入成功时,需要写入的副本个数;

  • 0:无需判断,只要把消息发送出去,就认为成功;
  • 1:仅首领副本;
  • all:所有副本;

2)buffer.memory 设置生产者内存缓冲区大小,用于缓冲发送到服务器的消息;

  • 若缓冲区不足,send() 方法要么阻塞,要么抛出异常;取决于如何设置  max.block.ms 参数(抛出异常前可以阻塞一段时间);

3)compression.type: 压缩算法;

  1. 默认不压缩;可选压缩算法包括 snappy, gzip ,lz4
  2. 使用压缩可以降低网络传输开销和存储开销,这是 kafka发送消息的瓶颈所在; 

4)retries发送消息失败时,生产者可以重试的次数;

  • 如果达到这个次数,生产者会放弃重试并返回错误; 默认情况下,生产者会在每次重试之间等待 100ms,通过 retry.backoff.ms  参数来改变这个时间间隔;
  • 一般情况下,没必须处理可重试错误。但需要处理不可重试错误或重试次数超过上限的情况;

5)batch.size  生产者把多个消息放在同一个批次里;该参数指定了一个批次可以使用的内存大小,单位字节;不过生产者不一定等到批次被填满才发送(参考 linger.ms);

6)linger.ms指定生产者在发送批次前等待更多消息加入批次的时间;

  • 生产者会在批次填满或linger.ms 达到上限时把批次发送出去;
    • 建议把linger.ms 设置为大于0的数,虽然增加了延时但提高了吞吐量;

7)client.id 任意字符串,服务器用它识别消息来源,还可以用在 日志和配额指标里;

8)max.in.flight.requests.per.connection指定生产者在收到服务器响应前可以发送多少个消息;

  • 把它设置为1,可以保证消息是按照顺序写入服务器的,即使发生了重试;

9)timeout.ms  , request.timeout.ms 和 metadata.fetch.timeout.ms

  1. request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间;
  2. metadata.fetch.timeout.ms  指定了生产者在获取元数据时等待服务器返回响应时间;若等待超时,要么重试,要么抛出异常;
  3. timeout.ms:指定了broker等待同步副本返回消息确认的时间, 与 acks 相匹配;

10)max.block.ms send() 方法或使用 partitionFor() 获取元数据时生产者的阻塞时间;

  • 当生产者发送缓冲区已满,或没有可用的元数据,这些方法就会阻塞;在阻塞时间达到 该值时,生产者抛出超时异常;

11)max.request.size 指定生产者发送的请求大小;

  • 可以指单个消息的最大值,也可以指单个请求所有消息总大小(如一批多个消息但走了一个请求);
  • 注意: broker对可接受的消息最大值有自己的限制(通过 message.max.bytes) 指定; 

12)receive.buffer.bytes 和 send.buffer.bytes

  • 分别指定 TCP socket接收和发送数据包的缓冲区大小; 如果设置为-1,使用操作系统默认值;

【2.4】生产者常用配置代码示例

public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超时时间props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 缓冲区大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化类 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 设置压缩算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");System.out.println(props);/* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);/* 10.发送数据 */String now = DateUtils.getNowTimestamp();int order = 1;for (int i = 0; i < 50000; i++) {for (int j = 0; j < 3; j++) {Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10",j, "", String.format("[%s] ", order++) + now + " > " + DataFactory.INSTANCE.genChar(5)));try {System.out.println("[生产者] 分区【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");} catch (Exception e) {}}}/* 11.关闭资源 */producer.close();System.out.println("kafka生产者写入数据完成");}
}

【3】分区

1)使用消息的键来做hash,以hash值作为分区号;

2)如果键为null,则使用默认分区器;默认使用 轮询(Round Robin)算法把消息均衡分布到各个分区上; 

【3.1】实现自定义分区策略

/*** @Description 自定义分区策略的生产者* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/
public class MyProducerWithPartition {public static void main(String[] args) {// 1.创建kafka生产者的配置信息Properties props = new Properties();// 指定连接的kafka集群, broker-listprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");// key, value 的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());// 2.创建生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3.发送数据Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10","31", "v1"));try {// 当前线程阻塞,直到kafka响应返回写入消息的元数据RecordMetadata respMetadata = future.get();System.out.println("[生产者写入消息] 分区【" + respMetadata.partition() + "】-offset【" + respMetadata.offset() + "】");} catch (Exception e) {}// 关闭生产者producer.close();System.out.println("kafka生产者写入数据完成");}
}

分区器 

/*** @Description 分区器* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/
public class MyPartitioner implements Partitioner {// 对键首位字符ascii取分区数的模获得分区号@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int partitionSize = cluster.partitionCountForTopic(topic);int operand = 0;if (key != null && String.valueOf(key).length() > 0) {operand = String.valueOf(key).codePointAt(0);}return operand % partitionSize;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}


【4】拦截器

定义拦截器,设置拦截器属性(可配置多个拦截器);

/** 设置拦截器 */
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
Arrays.asList(TimeInterceptor.class.getName()));
/*** @Description 时间拦截器* @author xiao tang* @version 1.0.0* @createTime 2021年12月10日*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在消息被序列化以及计算分区前调用, 追加时间戳(偷梁换柱)return new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value() + "[TimeInterceptor]" + DateUtils.getNowTimestamp());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 在消息从 RecordAccumulator 成功发送到Kafka Broker之后,或者在发送过程中失败时调用// 写入数据库}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

消费消息日志:

消费者-分区【0】offset【7774】 -> 2021-12-10 21:05:32--[1]  > ABCDE[TimeInterceptor]2021-12-10 21:05:30
消费者-分区【1】offset【7644】 -> 2021-12-10 21:05:32--[2]  > ABCDE[TimeInterceptor]2021-12-10 21:05:32
消费者-分区【2】offset【7626】 -> 2021-12-10 21:05:32--[3]  > ABCDE[TimeInterceptor]2021-12-10 21:05:32

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329752.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA面试常考系列三

转载自 JAVA面试常考系列三 题目一 什么是迭代器(Iterator)&#xff1f; 迭代器&#xff08;iterator&#xff09;是一种对象&#xff0c;它能够用来遍历标准模板库容器中的部分或全部元素&#xff0c;每个迭代器对象代表容器中确定的地址。迭代器提供了一种方法&#xff0c;可…

linux wait函数头文件_手把手教Linux驱动9-等待队列waitq

在上一篇《手把手教Linux驱动8-Linux IO模型》我们已经了解了阻塞、非阻塞、同步和异步等相关概念&#xff0c;本文主要讲解如何通过等待队列实现对进程的阻塞。应用场景&#xff1a;当进程要获取某些资源(例如从网卡读取数据)的时候&#xff0c;但资源并没有准备好(例如网卡还…

HoloLens开发手记-配置开发环境 Install the tools

随着Build 2016开发者大会的结束&#xff0c;HoloLens开发包也正式开放下载。Hololens没有独立的SDK&#xff0c;开发特性被集成到最新的Visual Studio Update 2中。如果你没有HoloLens真机&#xff0c;那么可以安装HoloLens模拟器。 安装清单 注意: 这里为了方便大家顺利下载安…

kafka再均衡监听器测试

【README】 本文使用的kafka是最新的 kafka3.0.0&#xff1b;本文kafka集群有3个节点分别是 centos201, centos202, centos203 &#xff1b; brokerid 分别为 1,2&#xff0c;3&#xff1b;本文主要用于测试 再均衡监听器&#xff1b;当有新消费者加入时&#xff0c;会发生分区…

java面试常考系列四

转载自 java面试常考系列四 题目一 大O符号(big-O notation)的作用是什么&#xff1f;有哪些使用方法&#xff1f; 大O符号描述了当数据结构里面的元素增加的时候&#xff0c;算法的规模或者是性能在最坏的场景下有多么好。大O符号也可用来描述其他的行为&#xff0c;比如&…

用python进行自然语言处理_Python自然语言处理示例:SVM和贝叶斯分类

❝关于自然语言处理(NLP)方面的文章、书籍非常之多&#xff0c;对于自然语言处理的上手&#xff0c;很多人是不知所措的。通过对本文的浏览&#xff0c;您应该能够对自然语言处理会有一个能够完整的有趣的基于Python的对自然语言处理的了解。❞什么是文本分类文本分类是将文本按…

Build 2016,你可能忽视的几个细节

微软公司主办的Build 2016大会尚在进程中&#xff0c;但是两场重量级的主题演讲已经结束。下面列举了我个人非常关注的几个细节&#xff0c;介绍一些背景知识以饲读者。 Bash on Windows背后的历史和未来 微软和IBM二十多年前联合开发NT内核的时候就已经为接驳多种操作系统留下…

JAVA面试常考系列五

转载自 JAVA面试常考系列五 题目一 串行(serial)收集器和吞吐量(throughput)收集器的区别是什么&#xff1f;吞吐量收集器使用并行版本的新生代垃圾收集器&#xff0c;它用于中等规模和大规模数据的应用程序。串行收集器对大多数的小应用(在现代处理器上需要大概100M左右的内存…

kafka消费者开发方式小结

【README】 1&#xff0c; 本文总结了 kafka消费者开发方式&#xff1b;2&#xff0c; 本文使用的是最新的kafka版本 3.0.0&#xff1b;【1】 kafka消费则 【1.1】消费者与消费者组 1&#xff09;消费者&#xff1a; 应用程序需要创建消费者对象&#xff0c;订阅主题并开始接…

微软发布Azure Functions、Service Fabric和IoT Starter Kits新服务

微软此次 Build 2016 大会的重点主题一直都围绕开发和 Microsoft Azure 云服务&#xff0c;今天更是对外发布了 Azure Functions、Service Fabric 和 IoT Starter Kit 等一系列新服务。就目前与其它友商的竞争而言&#xff0c;微软近期不断的修炼内功&#xff0c;使 Microsoft …

python发送邮件 退回_python 发送邮件(收到的邮件要有发送方才能回复)

Python使用SMTP(简单邮件传输协议)发送邮件普通文本邮件普通文本邮件发送的实现&#xff0c;关键是要将MIMEText中_subtype设置为plain## -*- coding: UTF-8 -*-import smtplibfrom email.mime.text import MIMEText#导入MIMEText类from email import encodersfrom email.heade…

JAVA面试常考系列六

转载自 JAVA面试常考系列六 题目一一个Applet有哪些生命周期&#xff1f; 一个Applet的生命周期分为以下四个阶段&#xff1a; Init 每次加载时都会初始化一个小程序。此方法通知Applet&#xff0c;方法已经被装入系统&#xff0c;在第一次调用start方法之前总是先调用它。Init…

.NET的未来包含一个开源的Mono

在微软Build 2016大会的第二天&#xff0c;微软项目经理Scott Hunter和Scott Hanselman就.NET平台的现状和未来计划做了一场演讲。演讲的题目是“.NET概述”&#xff0c;他们的精彩演讲耗时一个小时&#xff0c;描绘了公司对于.NET的目标以及开发人员可以期待什么。就像开幕式主…

kafka消费者接收分区测试

【README】 本文演示了当有新消费者加入组后&#xff0c;其他消费者接收分区情况&#xff1b;本文还模拟了 broker 宕机的情况&#xff1b;本文使用的是最新的 kafka3.0.0 &#xff1b;本文测试案例&#xff0c;来源于 消费者接收分区的5种模型&#xff0c;建议先看模型&#…

python数据分析架构_Python数据分析

引言&#xff1a;本文重点是用十分钟的时间帮读者建立Python数据分析的逻辑框架。其次&#xff0c;讲解“如何通过Python 函数或代码和统计学知识来实现数据分析”。本次介绍的建模框架图分为六大版块&#xff0c;依次为导入数据&#xff0c;数据探索&#xff0c;数据处理&…

JAVA面试常考系列七

转载自 JAVA面试常考系列七 题目一 Swing的方法中&#xff0c;有哪些是线程安全的&#xff1f; Swing的规则是&#xff1a;当Swing组件被具现化时&#xff0c;所有可能影响或依赖于组件状态的代码都应该在事件派发线程中执行。 因此有3个线程安全的方法&#xff1a; repaint()…

图片中的Build 2016

微软主办的Build 2016大会刚刚落幕&#xff0c;让我们通过下面的图片集锦来回顾大会的一些容易被人忽视的细节。 Xamarin加入微软大家庭 微软公司于二月底花大价钱买下了Xamarin这家移动开发平台提供商&#xff0c;终于补全了它Mobile First Cloud First战略的短板。 图片一&am…

diy实现spring依赖注入

【README】 本文diy代码实现了 spring 依赖注入&#xff0c;一定程度上揭示了依赖注入原理&#xff1b; 【1】控制反转-Inversion of Control 是一种编码思想&#xff0c;简而言之就是 应用程序A可以使用组件B&#xff0c;但A无法控制B的生命周期&#xff08;如创建&#xff…

html 中一个格子拆分成两个_一个效果惊人的数字游戏

安爸曾多次讲过数学推理能力对孩子成长的重要性&#xff0c;听到有位家长说自己用扔骰子的方法教孩子数学等式。步骤大致是扔骰子时&#xff0c;如果骰子是3&#xff0c;就在棋盘上从0出发走3步&#xff0c;并且写出033的加法等式。扔到负数就后退&#xff0c;写出减法等式。科…

JAVA面试常考系列八

转载自 JAVA面试常考系列八 题目一 JDBC是什么&#xff1f; JDBC&#xff08;Java DataBase Connectivity,java数据库连接&#xff09;是一种用于执行SQL语句的Java API&#xff0c;可以为多种关系数据库提供统一访问&#xff0c;由一组用Java语言编写的类和接口组成。JDBC提供…