Spring Kafka生产者/消费者样本

我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象。

示例场景

示例场景是一个简单的场景,我有一个系统,该系统生成一条消息,另一个系统对其进行处理

kafkaflow

使用Raw Kafka Producer / Consumer API的实施

首先,我使用原始的Kafka Producer和Consumer API来实现此方案。 如果您想看一下代码,可以在我的github仓库中找到它 。

制片人

以下设置了一个KafkaProducer实例,该实例用于向Kafka主题发送消息:

KafkaProducer<String, WorkUnit> producer = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

我使用了KafkaProducer构造函数的一种变体,该构造函数采用一个自定义的Serializer将域对象转换为json表示形式。

一旦有KafkaProducer实例可用,就可以将其用于向Kafka集群发送消息,这里我使用了同步版本的发送器,它等待响应返回。

ProducerRecord<String, WorkUnit> record = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();

消费者

在消费者方面,我们创建了一个KafkaConsumer,其中包含构造函数的一种变体,其中包含一个反序列化器 ,该解串器知道如何读取json消息并将其转换为域实例:

KafkaConsumer<String, WorkUnit> consumer
= new KafkaConsumer<>(props, stringKeyDeserializer()
, workUnitJsonValueDeserializer());

一旦KafkaConsumer实例可用,就可以建立一个监听器循环,以读取一批记录,对其进行处理,并等待更多记录通过:

consumer.subscribe("workunits);try {while (true) {ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);for (ConsumerRecord<String, WorkUnit> record : records) {log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}
} finally {this.consumer.close();
}

使用Spring Kafka的实现

我在github repo中有使用Spring-kafka的实现。

制片人

Spring-Kafka提供了一个KafkaTemplate类,作为KafkaProducer上的包装器,用于将消息发送到Kafka主题:

@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());kafkaTemplate.setDefaultTopic("workunits");return kafkaTemplate;
}

需要注意的一件事是,尽管我之前实现了一个自定义的Serializer / Deserializer,以将域类型作为json发送,然后将其转换回去,但是Spring-Kafka开箱即用地为json提供了Seralizer / Deserializer。

并使用KafkaTemplate发送消息:

SendResult<String, WorkUnit> sendResult = workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();RecordMetadata recordMetadata = sendResult.getRecordMetadata();LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

消费者

使用者部分使用侦听器模式实现,对于已为RabbitMQ / ActiveMQ实现侦听器的任何人,应该熟悉该模式。 首先是设置侦听器容器的配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConcurrency(1);factory.setConsumerFactory(consumerFactory());return factory;
}@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}

以及响应容器读取的消息的服务:

@Service
public class WorkUnitsConsumer {private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);@KafkaListener(topics = "workunits")public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",topic, partition, offset, workUnit);}
}

这样就避免了像设置原始使用者一样设置侦听器循环的所有复杂性,并且很好地被侦听器容器隐藏了。

结论

我已经遍历了设置批处理大小,确认的变化以及不同的API签名的许多内部信息。 我的目的只是演示使用原始Kafka API的常见用例,并展示Spring-Kafka包装器如何简化它。

如果您有兴趣进一步探索, 可以在这里找到原始生产者消费者样本,在这里可以找到 Spring Kafka 。

翻译自: https://www.javacodegeeks.com/2016/11/spring-kafka-producerconsumer-sample.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352495.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

homelede软路由设置方法_斐讯无线路由器怎么设置 斐讯无线路由器设置方法【详解】...

无线路由器的品类在不断增加&#xff0c;人们的生活也逐渐无法离开WIFI&#xff0c;各类无线路由器的设置方法大同小异&#xff0c;为了方便不为人知的小白更方便的设置无线路由器&#xff0c;这里详细介绍一下斐讯(phicomm)无线路由器怎么设置。配置前准备&#xff1a;A、请保…

Angular 个人深究(四)【生命周期钩子】

Angular 个人深究&#xff08;四&#xff09;【生命周期钩子】 定义&#xff1a; 每个组件都有一个被 Angular 管理的生命周期。 Angular 创建它&#xff0c;渲染它&#xff0c;创建并渲染它的子组件&#xff0c;在它被绑定的属性发生变化时检查它&#xff0c;并在它从 DOM 中被…

BITMAPINFO结构

BITMAPINFO结构定义了Windows设备无关位图&#xff08;DIB&#xff09;的度量和颜色信息。 一、BITMAPINFO结构形式&#xff1a; typedef struct tagBITMAPINFO { BITMAPINFOHEADER bmiHeader; RGBQUAD bmiColors[1]; } BITMAPINFO; 二、BITMAPINFO成员 bmi…

多媒体技术基础第四版林福宗pdf_意大利留学|没有绘画基础,还能考研艺术类专业吗?...

想学艺术专业&#xff0c;却没有绘画基础还能学艺术类专业么&#xff1f;意大利研究生的入学会简单一点 是典型的“宽进严出”想学艺术相关的专业&#xff0c;有2种看你是偏理论&#xff0c;还是偏实践。一起看一下&#xff01;偏实践方面纯艺类&#xff1a;油画、版画、雕塑、…

Spring Boot 1:Introduction

Spring Spring 在不断发展的过程中&#xff0c;边界不断扩张&#xff0c;需要的配置文件也越来越多&#xff0c;使用起来也越复杂&#xff0c;项目中也经常因为配置文件配置错误产生很多问题。即&#xff1a;Spring 逐渐变成了一个大而全的框架&#xff0c;背离它简洁开发的理念…

Pinely Round 3 (Div. 1 + Div. 2)

Pinely Round 3 (Div. 1 Div. 2) Pinely Round 3 (Div. 1 Div. 2) A. Distinct Buttons 题意&#xff1a;当前处于(0, 0)原点&#xff0c;给出若干个平面坐标轴上的点&#xff0c;是否可以仅选择三个方向便可以到达所有给出的点。 思路&#xff1a;到达单一坐标点最多需要…

在MFC,Win32程序中向控制台(Console)窗口输出调试信息

在MFC程序中输出调试信息的方法有两种&#xff0c;一种是使用TRACE宏&#xff0c;可以向Output窗口输出调试信息&#xff1b;另一种是用MessageBox&#xff0c;弹出消息框来输出调试信息&#xff0c;但会影响程序的运行。其实有一种方法可以更为方便的输出调试信息&#xff0c;…

C++程序内存泄漏都与哪些方面有关,该如何处理和避免

动态内存分配有几种: 一个是用了malloc/free, new/delete 第二个使用了第三方的库&#xff0c;库里面的API使用了第一种方法&#xff0c;他们需要你来释放空间&#xff0c;这个会在库的说明文档里有说明。 第三是程序递归&#xff0c;大量的程序递归和互相调用而不推出导致栈空…

unity vr是加一个摄像机就行吗_梦工厂和皮克斯员工创办的Baobab谈互动叙事:传统动画与VR动画的探索...

ACGN洞察访问三文娱网站3wyu.com查看产业必读文章Baobab工程师谈VR动画&#xff0c;以获得艾美奖和安妮奖的作品为例&#xff0c;介绍在跨媒体平台提升动画的质量、交互性和故事性。上个周末&#xff0c;Unite Shanghai 2019 在上海国际会议中心举行&#xff0c;来自世界各地的…

hbase regions_使用Regions ADF 11g进行Master Detail CRUD操作

hbase regions此示例演示了如何使用Regions在表之间创建Master Detail关系。 区域的主要目的是可重用性的概念。 使用区域和有限的任务流&#xff0c;我们可以将页面重用到许多其他页面中&#xff0c;以保持相同的功能并采用更简洁的方法 下载示例应用程序。 对于此示例&#…

(兔子繁殖问题)有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到三个月后每个月又生一对兔子,假如兔子都不死,问32个月过后的兔子总数为多少?...

网上关于这个问题有很多人说这个符合斐波那契数列&#xff0c;但是我个人在推算的时候发现并不是这样的&#xff0c;所以想自己解决这个问题 建立一个兔子类 属性有ID&#xff0c;age public class rabbit{ public  $id; public age; } 第1个月&#xff1a;(id1,age1)-------…

IF-ERRORLEVEL使用方法

我们都知道if是命令行下的一个条件判断语句&#xff0c;ERRORLEVEL是它的一个参数&#xff0c;翻译过来就是“错误返回码”的意思&#xff0c;它的作用是判断前一条命令的错误返回值&#xff0c;然后和定义的字符值进行比较&#xff0c;再决定进行什么动作今天远程一个客户&…

Eclipse IDE中的Java 9 module-info文件

请注意&#xff0c;本文并非旨在更新状态&#xff1b; 这只是基于我对Beta代码所做的一些实验而得出的快速更新。 已经有一段时间了&#xff0c;但是我要回到Eclipse IDE中尝试Java 9支持。 为了进行测试&#xff0c;我从Eclipse Project下载了最新的Oxygen &#xff08;4.7&…

局域网mysql数据库访问

1、调出mysql 命令界面 输入命令&#xff1a; mysql> use mysql Database changed mysql> select host,user,password from user; ------------------------------------------------------------ | host | user | password | --…

java.util中,util是什么意思

Util是utiliy的缩写&#xff0c;是一个多功能、基于工具的包。 java.util是包含集合框架、遗留的 collection 类、事件模型、日期和时间设施、国际化和各种实用工具类&#xff08;字符串标记生成器、随机数生成器和位数组、日期Date类、堆栈Stack类、向量Vector类等&#xff09…

Mathematics 9.0 绘制不等式确定的区域

在最新的mathematics软件中使用如下代码&#xff0c;无法产生程序&#xff1a; 在搜索了Mathematics 9.0 -> 帮助 -> 参考资料中心的内容&#xff0c;查到一个更简单好用的函数&#xff0c;正确结果如下&#xff1a;

无法打开包括文件:“mysql..h”: No such file or directory

出现这个问题后&#xff0c;根据网上提供的方法做了很多&#xff0c;但都没有用。后来在盘里面找了一下确实没有mysql.h这个文件&#xff0c;原来mysql必须选择complete安装方式才会有这个头文件。首先重新安装了一次mysql&#xff0c;然后选择自己的工程&#xff0c;进行环境配…

C# WPF 显示图片和视频显示 EmuguCv、AForge.Net测试

C# WPF 显示图片和视频显示 EmuguCv、AForge.Net测试 原文:C# WPF 显示图片和视频显示 EmuguCv、AForge.Net测试WPF 没有用到 PictureBox, 而是用Image代替. 下面我试着加载显示一个图片 。 XAML <Image x:Name"srcImg"Width"400"Height"300"…

一般一个前端项目完成需要多久_一种按周迭代的敏捷式项目管理方法

项目管理有很多理论&#xff0c;并且相关内容非常丰富&#xff0c;例如经典的项目管理的教材《项目管理&#xff1a;计划、进度和控制的系统方法》&#xff0c;字数达到了100万字。但是从源头来说&#xff0c;经典项目管理理论都是源自于对生产项目的过程中需要的管理的总结。对…

excel连接mysql 数据库

最近做个项目需要对收集到的数据进行实时刷新&#xff0c;原先考虑让获得的数据之间输出到txt文本&#xff0c;然后又文本导入到数据库&#xff0c;但是txt在修改查找的时候要把数据都读入到内存在进行相关改动&#xff0c;这样就很耗内存&#xff0c;而且文件占用率比较高&…