Kafka2.0生产者客户端使用

1 初始化配置

  Kafka 通过 KafkaProducer 构造器初始化生产者客户端的配置。
  常用的重要配置,详见官网。

  • bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客户端初始化时会自动发现地址,所以可以不填写所有地址。
  • key.serializer:实现了 Kafka 序列化接口的类,用来序列化 key。
  • value.serializer:实现了 Kafka 序列化接口的类,用来序列化 value。
  • acks:leader 接收到的 follower 确认的数量需要满足 acks 的配置。
     0:生产者把消息发送出去就认为发送完成了。
     1:leader 接收到消息后,不用等 follower 的确认,就表示发送完成了。
     all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 确认后,才表示完成了。
  • retries:消息发送失败后的重试次数。如果允许重试,而 max.in.flight.requests.per.connection>1,则可能导致消息乱序,因为如果把两批消息发送到同一个分区,第一批失败并重试,而第二批成功了,则第二批消息可能先生成了。
  • retry.backoff.ms:消息重试发送的间隔。
  • client.id:标识客户端的 id。
  • compression.type:压缩类型。可选:none、gzip、snappy、lz4。
  • buffer.memory:记录累加器可以使用的最大内存缓冲池大小。
  • batch.size:内存缓冲池的缓冲列表大小。当 batch 的大小超过 batch.size 或者时间达到 linger.ms 就会发送 batch。
  • transactional.id:事务 ID。
// 基础配置
Map<String, Object> configs = new HashMap<>();
// Kafka broker 集群
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// key 序列化
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value 序列化
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

2 构造消息

  Kafka 提供了6种构造器来构造消息。

  • topic:消息主题,必填;
  • partition:分区号,非必填。如果为空,会计算 key 的 hash 值,再和该主题的分区总数取余得到分区号;如果 key 也为空,客户端会生成递增的随机整数,再和该主题的分区总数区域得到分区号。
  • timestamp:时间戳,非必填。如果为空,默认为 KafkaProducer 构造器初始化的时间。
  • key:消息 key,非必填。关系到分区分配,broker 会对带 key 的消息进行日志压缩。
  • value:消息内容,必填。
  • headers:消息头,非必填。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, V value);

3 发送消息

  支持同步发送和异步发送消息。

  同步发送

producer.send(record).get();

  异步发送

producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 回调处理流程}
});

转载于:https://www.cnblogs.com/bigshark/p/11182403.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/247659.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vuex小例

少废话&#xff0c;先出东西 vuex main.js import Vue from vue import App from ./App import router from ./router import store from ./store Vue.config.productionTip falsenew Vue({el: #app,router,store,render: xx>xx(App) })store.js 平级目录未建文件夹import…

[论文笔记]CVPR2017_Joint Detection and Identification Feature Learning for Person Search

Title: Joint Detection and Identification Feature Learning for Person Search; aXiv上该论文的第一个版本题目是 End-to-End Deep Learning for Person SearchAuthors: Tong Xiao1* ; Shuang Li1* ; Bochao Wang2 ; Liang Lin2; Xiaogang Wang1 Affilations: 1.The Chines…

php下的原生ajax请求

浏览器中为我们提供了一个JS对象XMLHttpRequet&#xff0c;它可以帮助我们发送HTTP请求&#xff0c;并接受服务端的响应。 意味着我们的浏览器不提交&#xff0c;通过JS就可以请求服务器。ajax(Asynchronous Javascript And XML)其实就是通过XHR对象&#xff0c;执行HTTP请求。…

HBase性能优化总结

HBase性能优化方法总结&#xff08;一&#xff09;&#xff1a;表的设计 1. 表的设计 1.1 Pre-Creating Regions 默认情况下&#xff0c;在创建HBase表的时候会自动创建一个region分区&#xff0c;当导入数据的时候&#xff0c;所有的HBase客户端都向这一个region写数据&#x…

.NetCore如何使用ImageSharp进行图片的生成

ImageSharp是对NetCore平台扩展的一个图像处理方案&#xff0c;以往网上的案例多以生成文字及画出简单图形、验证码等方式进行探讨和实践。 今天我分享一下所在公司项目的实际应用案例&#xff0c;导出微信二维码图片&#xff0c;圆形头像等等。 一、源码获取 Git项目地址&…

vue2工程

vue当然可以使用script标签引入&#xff0c;不需任何依赖即可按照vue的语法进行使用。但中大型商用项目中&#xff0c;还是建议使用工程化方式使用vue&#xff0c;vue提供了官方脚手架vue-cli&#xff0c;可以快速构建vue项目&#xff0c;脚手架会帮助开发者创建好建议的工程目…

flutte的第一个hello world程序

用命令行创建项目&#xff1a; flutter create flutterdemo VSCode或者AS连接手机后 输入 flutter run 编译后就可以将默认的代码显示在手机上了 开始写hello world 代码&#xff0c;这段代码写在根目录\lib\main.dart文件中&#xff0c;也是Flutter主文件。 整个代码如下 impo…

Ajax 设置Access-Control-Allow-Origin实现跨域访问

之前遇到的问题整理 ajax跨域访问是一个老问题了&#xff0c;解决方法很多&#xff0c;比较常用的是JSONP方法&#xff0c;JSONP方法是一种非官方方法&#xff0c;而且这种方法只支持GET方式&#xff0c;不如POST方式安全。 即使使用jquery的jsonp方法&#xff0c;type设为POST…

vue工程webpack模板配置说明

vue工程webpack模板下的配置文件非常多&#xff0c;只能在实际开发过程中反复熟悉&#xff0c;才能渐渐体会官方将配置文件拆分细化的合理性。 主要配置文件中代码的作用从网上摘录了比较全的一份注释&#xff0c;做下记录。 dev-server.js 开发服务端配置 require(./check-v…

目录的拼接

找到被拼接文件所在的目录&#xff0c;然后进行拼接 import os 获取当前目录&#xff1a; os.path.dirname(__file__) 如下&#xff0c;被拼接文件所在目录与当前目录的上级目录在同一文件夹下&#xff1a; os.path.join(os.path.dirname(os.path.dirname(__file__)),‘文件夹路…

vue-resource 拦截器(interceptor)的使用

拦截器-interceptor 在现代的一些前端框架上&#xff0c;拦截器基本上是很基础但很重要的一环&#xff0c;比如Angular原生就支持拦截器配置&#xff0c;VUE的Axios模块也给我们提供了拦截器配置&#xff0c;那么拦截器到底是什么&#xff0c;它有什么用&#xff1f;拦截器能帮…

【GamePlay】入门篇

【GamePlay】入门篇 游戏性编程是指通过一系列游戏系统将游戏想法变成现实的过程。 本次的简例以NPC设计为主。 通常在进行脚本设计前&#xff0c;对NPC的属性进行基本的添加和设定&#xff0c;诸如动画系统、物理系统等等。 1.动画系统 添加Animator组件&#xff0c;绑定骨骼。…

vue axios POST请求中参数以form data和request payload形式的原因

HTTP请求中&#xff0c;如果是get请求&#xff0c;那么表单参数以namevalue&name1value1的形式附到url的后面&#xff0c;如果是post请求&#xff0c;那么表单参数是在请求体中&#xff0c;也是以namevalue&name1value1的形式在请求体中。通过chrome的开发者工具可以看…

vue-resource使用

vue-resource是一个http请求插件&#xff0c;遵循promise&#xff0c;类似jquery中ajax操作。 vue-resource已不被官方推荐&#xff0c;官方推荐axios插件来操作http协议。 vue-resource中提供的方法 get(url, [options]) head(url, [options]) delete(url, [options]) jso…

HttpHttps

http协议与https Http 客户端发送一个HTTP请求到服务器的请求消息包括以下格式&#xff1a; **请求行&#xff08;request line&#xff09;、请求头部&#xff08;header&#xff09;、空行 和请求数据四个部分组成。** Get请求例子&#xff0c;使用Charles抓取的request&…

vue2使用axios post跳坑,封装成模块

终于将vue-resource替换成axios了&#xff0c;其中像application/x-www-form-urlencoded发送的头信息以及返回的response结果这两点都需要注意一下。 其实https://github.com/mzabriskie/axios也有说明的。因为我在vue-resource中使用了Vue.http.options.emulateJSON true;&am…

axios使用

axios和vue-resource一样&#xff0c;是一个vue中操作http的插件&#xff0c;遵循promise&#xff0c;vue官方也推荐使用axios。 安装axios npm i axios -S axios也是在运行时需要的&#xff0c;所以要保存在dependencies中。 引入axios import axios from axios Vue.proto…

jQuery length 和 size()区别

jQuery length和size()区别总结如下&#xff1a; 1.length是属性&#xff0c;size()是方法。 2.如果你只是想获取元素的个数&#xff0c;两者效果一样既 $("img").length 和 $("img").size() 获取的值是一样的&#xff1b;但是如果是获取字符串的长…

一些关于自己的未来的东西

2019.7.4 自己大一建立对编程的基础认识&#xff0c;确实培养了一些兴趣&#xff0c;入了个门&#xff0c;不过没有接触到本质。大二加入到了学校的网站开发团队&#xff0c;对网站开发后端进行了学习&#xff0c;对后台开发也有了基础的学习吧&#xff0c;哈哈可能以后就是要走…

Javascript面向对象编程:构造函数的继承

今天要介绍的是&#xff0c;对象之间的"继承"的五种方法。 比如&#xff0c;现在有一个"动物"对象的构造函数。 function Animal(){ this.species "动物"; } 还有一个"猫"对象的构造函数。 function Cat(name,color){ this.name nam…