(转) SpringBoot接入两套kafka集群

转自:

SpringBoot接入两套kafka集群 - 风小雅 - 博客园引入依赖 compile 'org.springframework.kafka:spring-kafka' 第一套kafka配置 package myapp.kafka; importhttps://www.cnblogs.com/ylty/p/13673357.html


引入依赖

  compile 'org.springframework.kafka:spring-kafka'

第一套kafka配置

package myapp.kafka;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;/*** 默认的kafka配置** @author zhengqian*/
@Slf4j
@Configuration
@Data
public class K1KafkaConfiguration {@Value("${app-name.kafka.k1.consumer.bootstrap-servers}")private String consumerBootstrapServers;@Value("${app-name.kafka.k1.consumer.group-id}")private String groupId;@Value("${app-name.kafka.k1.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${app-name.kafka.k1.consumer.enable-auto-commit}")private Boolean enableAutoCommit;@Value("${app-name.kafka.k2.producer.bootstrap-servers}")private String producerBootstrapServers;@Bean@PrimaryKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

第二套kafka配置

package myapp.kafka;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;/*** 默认的kafka配置** @author zhengqian*/
@Slf4j
@Configuration
@Data
public class K2KafkaConfiguration {@Value("${app-name.kafka.k2.consumer.bootstrap-servers}")private String consumerBootstrapServers;@Value("${app-name.kafka.k2.consumer.group-id}")private String groupId;@Value("${app-name.kafka.k2.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${app-name.kafka.k2.consumer.enable-auto-commit}")private Boolean enableAutoCommit;@Value("${app-name.kafka.k2.producer.bootstrap-servers}")private String producerBootstrapServers;@Bean@PrimaryKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryK2() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactoryK2());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactoryK2() {return new DefaultKafkaConsumerFactory<>(consumerConfigsK2());}@Beanpublic Map<String, Object> consumerConfigsK2() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic Map<String, Object> producerConfigsK2() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactoryK2() {return new DefaultKafkaProducerFactory<>(producerConfigsK2());}@Beanpublic KafkaTemplate<String, String> kafkaTemplateK2() {return new KafkaTemplate<>(producerFactoryK2());}
}

配置文件

app-name: kafka:k1:consumer:bootstrap-servers: host1:9092group-id: my-appauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: trueproducer:bootstrap-servers: host1:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerk2:consumer:bootstrap-servers: host2:9092group-id: my-appauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: trueproducer:bootstrap-servers: host2:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

指定消费的kafka集群

    @KafkaListener(topics = "topic-name", containerFactory = "kafkaListenerContainerFactoryK2")public void onEvent(ConsumerRecord<String, String> record) {// 省略}

指定生产者发生的kafka集群

public class KafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void test() {ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("topic", "data");try {SendResult<String, String> value = result.get(2, TimeUnit.SECONDS);System.out.println(value.getProducerRecord());System.out.println(value.getRecordMetadata());} catch (Exception e) {e.printStackTrace();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329825.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

idea tomcat部署web项目_项目开发之部署帆软到Tomcat服务一

书接上回上一篇文章介绍了两种图表取数的方式&#xff0c;新增数据库查询和通过存储过程取数&#xff0c;其他的内置数据集&#xff0c;文件数据集和关联数据集等方式暂时还没有用到&#xff0c;先暂时不介绍了&#xff0c;等之后用到了或者等小编有时间试过之后再来做个简单的…

C#工业物联网和集成系统解决方案的技术路线

前言 2000年以后&#xff0c;互联网在中国的大地上如火如荼的发展&#xff0c;在这个行业竞争中比的是加速度。我清晰的记得《世界是平的》中有这样一段话&#xff1a;在非洲&#xff0c;羚羊每天早上醒来时&#xff0c;它知道自己必须跑得比最快的狮子还快&#xff0c;否则就会…

转:Kafka事务使用和编程示例/实例

Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述​ Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中&#xff0c;或者说是一个原子操作&#xff0c;生产消息和提交偏移量同时成功或者失败。注意&#xff1a;kafk…

[初级]Java中的switch对整型、字符型、字符串的具体实现细节

转载自 [初级]Java中的switch对整型、字符型、字符串的具体实现细节Java 7中&#xff0c;switch的参数可以是String类型了&#xff0c;这对我们来说是一个很方便的改进。到目前为止switch支持这样几种数据类型&#xff1a;byteshort int char String 。但是&#xff0c;作为一个…

SpringBoot-Cache整合redis

前言 SpringBoot的众多Starter有两个很重要的缓存Starter&#xff0c;其中一个是我们经常用到的Redis&#xff08;spring-boot-starter-data-redis&#xff09;还有一个是 spring-boot-starter-cache。 今天主要是简单介绍一个如何整合这两个组件&#xff0c;达到相互合作的关系…

C#跨平台物联网通讯框架ServerSuperIO(SSIO)

一.SSIO的特点 轻型高性能通信框架&#xff0c;适用于多种应用场&#xff0c;轮询模式、自控模式、并发模式和单例模式。设备驱动、IO通道、控制模式场景协调统一。设备驱动内轩命令驱动器、命令缓存器、自定义参数和实时数据元素。框架平台支持按设备命令优先级别进行调度&…

spring boot 单元测试_spring-boot-plus1.2.0-RELEASE发布-快速打包-极速部署-在线演示

spring-boot-plusspring-boot-plus集成spring boot常用开发组件的后台快速开发脚手架Purpose每个人都可以独立、快速、高效地开发项目&#xff01;Everyone can develop projects independently, quickly and efficiently&#xff01;官网地址&#xff1a;springboot.plusGITHU…

在Java中如何高效的判断数组中是否包含某个元素

转载自 在Java中如何高效的判断数组中是否包含某个元素如何检查一个数组(无序)是否包含一个特定的值&#xff1f;这是一个在Java中经常用到的并且非常有用的操作。同时&#xff0c;这个问题在Stack Overflow中也是一个非常热门的问题。在投票比较高的几个答案中给出了几种不同的…

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

【README】 0&#xff0c;为啥要看 DefaultKafkaProducerFactory&#xff1f; 最近在基于 springboot 开发kafka模块&#xff0c;发现 kafakTemplate构造器传入了 DefaultKafkaProducerFactory实例&#xff0c; kafkaTemplate内部使用了 很多 DefaultKafkaProducerFactory的方…

【SpringSecurity】【JJWT】JJWT踩坑LocalDateTime

前言 最近自己又在开始闲搞&#xff0c;主要原因还是下山无望&#xff08;买显卡&#xff09;。只能晚上下班找点事情做啦~~ 环境 版本请根据实际情况参考JJWT官网选择使用&#xff0c;这里只说明一下问题大概思路&#xff01; <!-- 增加token生成依赖 --> <depen…

针对Linux ASP.NET MVC网站中 httpHandlers配置无效的解决方案

近期有Linux ASP.NET用户反映&#xff0c;在MVC网站的Web.config中添加 httpHandlers 配置用于处理自定义类型&#xff0c;但是在运行中并没有产生预期的效果&#xff0c;服务器返回了404&#xff08;找不到网页&#xff09;错误。经我亲自测试&#xff0c;在WebForm网站中&…

简单介绍Java中Comparable和Comparator

转载自 简单介绍Java中Comparable和ComparatorComparable 和 Comparator是Java核心API提供的两个接口&#xff0c;从它们的名字中&#xff0c;我们大致可以猜到它们用来做对象之间的比较的。但它们到底怎么用&#xff0c;它们之间有又哪些差别呢&#xff1f;下面有两个例子可以…

spring-kafka整合:KafkaTemplate-kafka模板类介绍

【README】 1&#xff0c;本文主要关注 KafkaTemplate的重点方法&#xff0c;并非全部方法&#xff1b; 2&#xff0c;KafkaTemplate 底层依赖于 DefaultKafkaProducerFactory &#xff0c; 关于 DefaultKafkaProducerFactory 的介绍&#xff0c;refer2 spring-kafka整合:…

安卓 on a null object reference_详解Object.prototype.__proto__

Object.prototype 的 __proto__ 属性是一个访问器属性(一个getter函数和一个setter函数), 暴露了通过它访问的对象的内部[[Prototype]] (一个对象或 null)。使用__proto__是有争议的&#xff0c;也不鼓励使用它。因为它从来没有被包括在EcmaScript语言规范中&#xff0c;但是现…

【SpringBoot】服务器JVM远程调试

目的 当系统部署到测试环境服务器时&#xff0c;难免会遇到bug。这个时候如果能远程调试&#xff0c;那么能够大大提高我们的生产效率&#xff0c;快速完成服务调试&#xff0c;最快发布生产环境。&#xff08;领导好评不就到手了&#xff09; 准备 Idea&#xff08;Java最好…

图说:为什么Java中的字符串被定义为不可变的

转载自 图说&#xff1a;为什么Java中的字符串被定义为不可变的字符串&#xff0c;想必大家最熟悉不过了&#xff0c;通常我们在代码中有几种方式可以创建字符串&#xff0c;比如&#xff1a;String s "Hollis";这时&#xff0c;其实会在堆内存中创建一个字符串对象…

值得推荐的微软技术公众号推荐

为开阔技术人眼界&#xff0c;促进技术人职业成长。小二在此诚意推荐最值得关注的微软技术公众号。平时关注推送的文章或多或少要么学到知识技能&#xff0c;要么收到一些启发&#xff0c;有利于个人成长。这些公众号为笔者个人积累&#xff0c;不一定都合大家口味&#xff0c;…

springboot:BeanPostProcessor示例及分析

【README】 1&#xff0c;本文主要分析 BeanPostProcessor 的作用&#xff0c; 开发方式&#xff1b; 2&#xff0c;BeanPostProcessor 是bean后置处理器&#xff0c; 简而言之就是bean被创建好了&#xff0c;之后如果需要对其属性进行修改&#xff0c;则 需要使用 BeanPost…

实现了某一个接口的匿名类的例子_java中的内部类内部接口详解,一文搞定

简介一般来说&#xff0c;我们创建类和接口的时候都是一个类一个文件&#xff0c;一个接口一个文件&#xff0c;但有时候为了方便或者某些特殊的原因&#xff0c;java并不介意在一个文件中写多个类和多个接口&#xff0c;这就有了我们今天要讲的内部类和内部接口。内部类先讲内…

Java 8 日期和时间解读

转载自 Java 8 日期和时间解读现在&#xff0c;一些应用程序仍然在使用java.util.Date和java.util.Calendar API和它们的类库&#xff0c;来使我们在生活中更加轻松的处理日期和时间&#xff0c;比如&#xff1a;JodaTime。然而&#xff0c;Java 8 引进的新的类库来处理日期和时…