springboot整合kafka_springboot整合kafka实现消息的发送消费

如下是springboot整合kafka的一个案例,方便需要的小伙伴。

启动kafka Server

cd 到kafka的bin目录下:前提是启动zk./kafka-server-start.sh /Users/hz/programs/kafka_2.12-2.2.1/config/server.properties &

kafka创建topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-kafka

kafka 发送信息命令:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test-kafka

kafka 接收信息命令:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-kafka --from-beginning

整合springboot+kafka

KafkaProducerConfig.java

 @Configuration public class KafkaProducerConfig {    @Bean public ProducerFactory producerFactory() {      Map configProps = new HashMap<>();      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);      return new DefaultKafkaProducerFactory<>(configProps);   } @Bean public KafkaTemplate kafkaTemplate() {   return new KafkaTemplate<>(producerFactory());   } }

KafkaConsumerConfig.java

@EnableKafka @Configuration public class KafkaConsumerConfig {  @Bean public ConsumerFactory consumerFactory() {     Map props = new HashMap<>();     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test-kafka");    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);    return new DefaultKafkaConsumerFactory<>(props);   } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {   ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();  factory.setConsumerFactory(consumerFactory());   return factory;   } }

KafkaProducer.java

@Component @AllArgsConstructor @Slf4j public class KafkaProducer {   private final KafkaTemplate kafkaTemplate;   public void data(String data){   try {     kafkaTemplate.send("test-kafka", "这是测试的数据==>"+data );   }catch (Exception e){     e.printStackTrace();     log.error("出错!!!!!!!!!!!"); }  } }

KafkaConsumer.java

@Component @AllArgsConstructor @Slf4j public class KafkaConsumer { @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group}",errorHandler = "consumerAwareErrorHandler")                             public void data(ConsumerRecord consumerRecord) throws Exception {                               Object value = consumerRecord.value();                               if (log.isInfoEnabled()) {                                log.info("offset {}, value {}", consumerRecord.offset(), consumerRecord.value());                               } if (null == value) {                                 log.error("kafka消费数据为空");                               } log.info((String) value);                               //模拟异常情况处理消息 throw new Exception("模拟消费异常,执行errorhandle方法"); }                               @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {                                return (message, exception, consumer) -> {                                   System.out.println("消费异常:"+message.getPayload()); return null;                               };                             } }

TestKafkaController.java

@RestController @AllArgsConstructorpublic class TestController {  private final KafkaProducer kafkaProducer;   @GetMapping("/testKafka") public String aVoid(String str){     try { kafkaProducer.data(str);         }catch (Exception e){           e.printStackTrace();         }     return "成功==============================";   } }

application.properties

spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.producer.batch-size=16 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.retries=0 spring.kafka.producer.enable-auto-commit=false #设置大于0的值,则客户端会将发送失败的记录重新发送 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer kafka.topic: test-kafka #主题消费分组 kafka.group: group-test-kafka

启动项目测试一下:http://127.0.0.1:8686/testKafka

7d0da58d172d72000cf099f16e33286a.png
eaa506354fe234fec5104dfe21576378.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/476685.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于如何在BCB中使用CodeGuard

关于如何在BCB中使用CodeGuard点击数&#xff1a;231 录入时间&#xff1a;2007-6-7 一、 为什么写这篇东西 自己在使用 BCB5 写一些程序时需要检查很多东西&#xff0c;例如内存泄漏、资源是否有释放等等&#xff0c;在使用了很多工具后&#xff0c;发觉 BCB5 本…

港大腾讯提出DiffusionDet:第一个用于目标检测的扩散模型

编&#xff5c;杜伟、陈萍源&#xff5c;机器之心扩散模型不但在生成任务上非常成功&#xff0c;这次在目标检测任务上&#xff0c;更是超越了成熟的目标检测器。扩散模型&#xff08; Diffusion Model &#xff09;作为深度生成模型中的新 SOTA&#xff0c;已然在图像生成任务…

无监督学习概论

文章目录1. 无监督学习基本原理2. 基本问题2.1 聚类 Clustering2.2 降维 Dimensionality Reduction2.3 概率模型估计3. 机器学习三要素4. 无监督学习方法4.1 聚类4.2 降维4.3 话题分析4.4 图分析1. 无监督学习基本原理 机器学习或统计学习一般包括监督学习、无监督学习、强化学…

python xlrd使用_python处理Excel xlrd的简单使用

xlrd主要用于读取Excel文件&#xff0c;本文为大家分享了python处理Excel的具体代码&#xff0c;供大家参考&#xff0c;具体内容如下 安装 pip install xlrd api使用 import xlrd # 打开Excel文件读取数据 workbook xlrd.open_workbook(a.xlsx); # 打印所有的sheet列出所有的…

学习Duwamish7的MSDN说明及相关技术策略

&#xff08;一&#xff09;.MSDN对Duwamish7的说明   Duwamish 7.0 是由 MSDN 开发的通用 Duwamish 系列应用程序的功能性端口&#xff08;完全使用 .NET 技术&#xff09;。尽管示例本身是围绕虚拟网上书店建立的&#xff0c;但本示例主要关注的方面却是性能&#xff0c;与…

程序员面试金典 - 面试题 17.08. 马戏团人塔(最长上升子序 DP/二分查找)

文章目录1. 题目2. 解题2.1 超时解2.2 二分查找1. 题目 有个马戏团正在设计叠罗汉的表演节目&#xff0c;一个人要站在另一人的肩膀上。出于实际和美观的考虑&#xff0c;在上面的人要比下面的人矮一点且轻一点。 已知马戏团每个人的身高和体重&#xff0c;请编写代码计算叠罗…

海洋分享lol皮肤插件_LOL战斗之夜客户端BUG频出服务器爆满!如何提前领取皮肤?...

Hello大家好&#xff0c;我是Taker&#xff01;昨天的战斗之夜&#xff0c;小编可是单排奋战了一整晚~排位段位和箱子积分都定格在了钻石&#xff0c;不知道各位如何呢&#xff1f;(^o^)/~今天终于到了收获的日子了&#xff01;之前我们打的积分所获得的皮肤箱子今天就可以打开…

socket异步处理问题

由于一个项目要和第三方软件做接口&#xff0c;第三方软件是Unix的操作系统&#xff0c;所以用了Socket来传输数据。具体结构是这样的&#xff1a;本项目作为服务器端&#xff0c;第三方软件是客户端&#xff0c;并且有多个客户端。通常情况下&#xff0c;要开多个线程来处理多…

2022爆火的AIGC,能给AI续命吗

文 | 陈彩娴源 | AI科技评论生产力如已成熟&#xff0c;想象力还会远吗&#xff1f;“你们是从什么时候开始注意到人类的&#xff1f;”“当第一个原始人开始仰望星空的时候。”AI 的类人猿&#xff0c;早已开始仰望人类。来自机器的审视在过去短短的两年间&#xff0c;算法从业…

LeetCode 354. 俄罗斯套娃信封问题(最长上升子序 DP/二分查找)

1. 题目 给定一些标记了宽度和高度的信封&#xff0c;宽度和高度以整数对形式 (w, h) 出现。 当另一个信封的宽度和高度都比这个信封大的时候&#xff0c;这个信封就可以放进另一个信封里&#xff0c;如同俄罗斯套娃一样。 请计算最多能有多少个信封能组成一组“俄罗斯套娃”…

python语言语句块标记是_Python的基本语法——语句块

1.语句块是在条件为真&#xff08;条件语句&#xff09;时执行或者执行多次&#xff08;循环语句&#xff09;的一组语句&#xff1b; 2在代码前放置空格来缩进语句即可创建语句块&#xff0c;语句块中的每行必须是同样的缩进量&#xff1b; 3.缩进&#xff1a;Python开发者有意…

[导入]设计模式初学者系列-工厂方法

摘要: 闲谈工厂方法 设计模式系列到了第四篇了&#xff0c;如果还不谈谈工厂方法设计模式就太对不起GoF了&#xff0c;为什么有如此一说&#xff1f;实际上工厂方法模式是好些模式的基石&#xff0c;她们或多或少的使用了工厂方法模式或以工厂方法为模型。 工厂方法模式是一种类…

推荐一个好发论文的研究方向

今天给大家推荐一个研究的好方向—— 图神经网络。它是近些年学术界和工业界最新的研究热点&#xff01;在社交网络、知识图谱、推荐系统等工业界有广阔的应用前景。最重要的是&#xff0c;图神经网络与CV和NLP交叉&#xff0c;容易有创新点&#xff0c;是出论文的好方向。对于…

LeetCode 31. 下一个排列(线性扫描)

1. 题目 实现获取下一个排列的函数&#xff0c;算法需要将给定数字序列重新排列成字典序中下一个更大的排列。 如果不存在下一个更大的排列&#xff0c;则将数字重新排列成最小的排列&#xff08;即升序排列&#xff09;。 必须原地修改&#xff0c;只允许使用额外常数空间。…

Google工作10年的职场感悟

源&#xff5c;电子发烧友网、程序厨哈喽大家好&#xff0c;今天坐地铁读到了一位在 Google 工作10年的“老”工程师关于技术、管理和职场生涯的感悟。我看完后觉得很有收获&#xff0c;因此在这里也分享给大家。以下是全文&#xff0c;后文中的「我」均指「原作者」。我在 Goo…

博客堂joycode被黑掉了

博客堂现在用ie7已经打不开了&#xff0c;用telnet www.joycode.com 80 链接&#xff0c;链接成功之后输入大写的命令 GET / 然后连续两次回车&#xff0c;就可以看到第一行的木马代码<iframe src?????? width20 height0 frameborder0></iframe>。访问其中任…

python的注释符_Python3 注释和运算符

Python3 注释 确保对模块, 函数, 方法和行内注释使用正确的风格 Python中的注释有单行注释和多行注释&#xff1a; Python中单行注释以 # 开头&#xff0c;例如&#xff1a;&#xff1a; #这是一个注释 print("Hello, World!") 多行注释用三个单引号 或者三个双引号…

程序员面试金典 - 面试题 05.04. 下一个数(线性扫描)

文章目录1. 题目2. 解题2.1 STL2.2 线性扫描2.3 位运算1. 题目 下一个数。给定一个正整数&#xff0c;找出与其二进制表达式中1的个数相同且大小最接近的那两个数&#xff08;一个略大&#xff0c;一个略小&#xff09;。 例1:输入&#xff1a;num 2&#xff08;或者0b10&am…

同花顺如何切换k线_K线之形态学:浅谈纸白银产品该如何去做好交易?

K线之形态学&#xff1a;浅谈纸白银产品该如何去做好交易&#xff1f;由于疫情期间&#xff0c;明显感觉到今年做投资理财的朋友多了&#xff0c;特别是银行的纸产品&#xff0c;我是分析外盘伦敦银伦敦金为主&#xff0c;自己也是只操作外盘产品。国内的不管是纸白银或者TD白银…

从NeurIPS论文来看,中美学者很少互相引用

源&#xff5c;机器之心想要改变这种情况还很难。不知从何时起&#xff0c;我们习惯了人工智能的学术顶会上中美研究数量排名前两位的形势。不论在工业还是学术上&#xff0c;两者很大程度上引领了技术的发展&#xff0c;中美的交流也非常密切&#xff1a;不少大厂都会在对面设…