转自:
SpringBoot接入两套kafka集群 - 风小雅 - 博客园引入依赖 compile 'org.springframework.kafka:spring-kafka' 第一套kafka配置 package myapp.kafka; importhttps://www.cnblogs.com/ylty/p/13673357.html
引入依赖
compile 'org.springframework.kafka:spring-kafka'
第一套kafka配置
package myapp.kafka;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;/*** 默认的kafka配置** @author zhengqian*/
@Slf4j
@Configuration
@Data
public class K1KafkaConfiguration {@Value("${app-name.kafka.k1.consumer.bootstrap-servers}")private String consumerBootstrapServers;@Value("${app-name.kafka.k1.consumer.group-id}")private String groupId;@Value("${app-name.kafka.k1.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${app-name.kafka.k1.consumer.enable-auto-commit}")private Boolean enableAutoCommit;@Value("${app-name.kafka.k2.producer.bootstrap-servers}")private String producerBootstrapServers;@Bean@PrimaryKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
第二套kafka配置
package myapp.kafka;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;/*** 默认的kafka配置** @author zhengqian*/
@Slf4j
@Configuration
@Data
public class K2KafkaConfiguration {@Value("${app-name.kafka.k2.consumer.bootstrap-servers}")private String consumerBootstrapServers;@Value("${app-name.kafka.k2.consumer.group-id}")private String groupId;@Value("${app-name.kafka.k2.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${app-name.kafka.k2.consumer.enable-auto-commit}")private Boolean enableAutoCommit;@Value("${app-name.kafka.k2.producer.bootstrap-servers}")private String producerBootstrapServers;@Bean@PrimaryKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryK2() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactoryK2());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactoryK2() {return new DefaultKafkaConsumerFactory<>(consumerConfigsK2());}@Beanpublic Map<String, Object> consumerConfigsK2() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic Map<String, Object> producerConfigsK2() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactoryK2() {return new DefaultKafkaProducerFactory<>(producerConfigsK2());}@Beanpublic KafkaTemplate<String, String> kafkaTemplateK2() {return new KafkaTemplate<>(producerFactoryK2());}
}
配置文件
app-name: kafka:k1:consumer:bootstrap-servers: host1:9092group-id: my-appauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: trueproducer:bootstrap-servers: host1:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerk2:consumer:bootstrap-servers: host2:9092group-id: my-appauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: trueproducer:bootstrap-servers: host2:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
指定消费的kafka集群
@KafkaListener(topics = "topic-name", containerFactory = "kafkaListenerContainerFactoryK2")public void onEvent(ConsumerRecord<String, String> record) {// 省略}
指定生产者发生的kafka集群
public class KafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void test() {ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("topic", "data");try {SendResult<String, String> value = result.get(2, TimeUnit.SECONDS);System.out.println(value.getProducerRecord());System.out.println(value.getRecordMetadata());} catch (Exception e) {e.printStackTrace();}}
}