文章目录  1、生产者拦截器 1.1、创建生产者拦截器 1.2、KafkaTemplate配置生产者拦截器 1.3、使用Java代码创建主题分区副本 1.4、application.yml配置----v1版 1.5、屏蔽 kafka debug 日志 logback.xml 1.6、引入spring-kafka依赖 1.7、控制台日志   
 
package  com. atguigu. kafka. interceptor ; 
import  org. apache. kafka. clients. producer.  ProducerInterceptor ; 
import  org. apache. kafka. clients. producer.  ProducerRecord ; 
import  org. apache. kafka. clients. producer.  RecordMetadata ; 
import  org. springframework. stereotype.  Component ; 
import  java. util.  Map ; 
@Component 
public  class  MyKafkaInterceptor  implements  ProducerInterceptor < String , String > { @Override public  ProducerRecord < String ,  String > onSend ( ProducerRecord < String ,  String > )  { System . out. println ( "生产者即将发送消息:topic = " +  producerRecord. topic ( ) + ",partition:" + producerRecord. partition ( ) + ",key = " + producerRecord. key ( ) + ",value = " + producerRecord. value ( ) ) ; return  null ; } @Override public  void  onAcknowledgement ( RecordMetadata  recordMetadata,  Exception  e)  { if ( e ==  null ) { System . out. println ( "消息发送成功:topic = " +  recordMetadata. topic ( ) + ",partition:" + recordMetadata. partition ( ) + ",offset=" + recordMetadata. offset ( ) + ",timestamp=" + recordMetadata. timestamp ( ) ) ; } } @Override public  void  close ( )  { } @Override public  void  configure ( Map < String ,  ? > )  { } 
} package  com. atguigu. kafka. producer ; import  com. atguigu. kafka. interceptor.  MyKafkaInterceptor ; 
import  jakarta. annotation.  PostConstruct ; 
import  jakarta. annotation.  Resource ; 
import  org. junit. jupiter. api.  Test ; 
import  org. springframework. boot. test. context.  SpringBootTest ; 
import  org. springframework. kafka. core.  KafkaTemplate ; 
import  java. io.  IOException ; @SpringBootTest 
class  KafkaProducerApplicationTests  { @Resource KafkaTemplate  kafkaTemplate; @Resource MyKafkaInterceptor  myKafkaInterceptor; @PostConstruct public  void  init ( )  { kafkaTemplate. setProducerInterceptor ( myKafkaInterceptor) ; } @Test void  contextLoads ( )  throws  IOException  { kafkaTemplate. send ( "my_topic1" ,  "spring-kafka-生产者拦截器" ) ; System . in. read ( ) ; } 
} package  com. atguigu. kafka. config ; 
import  org. apache. kafka. clients. admin.  NewTopic ; 
import  org. springframework. context. annotation.  Bean ; 
import  org. springframework. kafka. config.  TopicBuilder ; 
import  org. springframework. stereotype.  Component ; 
@Component 
public  class  KafkaTopicConfig  { @Bean public  NewTopic  myTopic1 ( )  { return  TopicBuilder . name ( "my_topic1" ) . partitions ( 3 ) . replicas ( 3 ) . build ( ) ; } 
} server: port:  8110 # v1
spring: kafka: bootstrap- servers:  192.168 .74 .148 : 9095 , 192.168 .74 .148 : 9096 , 192.168 .74 .148 : 9097 producer:  # producer 生产者retries:  0  # 重试次数 0 表示不重试acks:  - 1  # 应答级别: 多少个分区副本备份完成时向生产者发送ack确认( 可选0 、1 、- 1 / all) batch- size:  16384  # 批次大小 单位bytebuffer- memory:  33554432  # 生产者缓冲区大小 单位bytekey- serializer:  org. apache. kafka. common. serialization. StringSerializer # key的序列化器value- serializer:  org. apache. kafka. common. serialization. StringSerializer # value的序列化器< configuration> < loggername = " org.apache.kafka.clients" level = " debug" /> </ configuration> <?xml version="1.0" encoding="UTF-8"?> 
< projectxmlns = " http://maven.apache.org/POM/4.0.0" xmlns: xsi= " http://www.w3.org/2001/XMLSchema-instance" xsi: schemaLocation= " http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > < modelVersion> </ modelVersion> < parent> < groupId> </ groupId> < artifactId> </ artifactId> < version> </ version> < relativePath/> </ parent> < groupId> </ groupId> < artifactId> </ artifactId> < version> </ version> < name> </ name> < description> </ description> < properties> < java.version> </ java.version> </ properties> < dependencies> < dependency> < groupId> </ groupId> < artifactId> </ artifactId> </ dependency> < dependency> < groupId> </ groupId> < artifactId> </ artifactId> < scope> </ scope> </ dependency> < dependency> < groupId> </ groupId> < artifactId> </ artifactId> </ dependency> </ dependencies> < build> < plugins> < plugin> < groupId> </ groupId> < artifactId> </ artifactId> </ plugin> </ plugins> </ build> </ project> 生产者即将发送消息:topic =  my_topic1,partition: null ,key =  null ,value =  spring- kafka- 生产者拦截器
消息发送成功:topic =  my_topic1,partition: 0 ,offset= 0 ,timestamp= 1717490776329 
[ [ { "partition" :  0 , "offset" :  0 , "msg" :  "spring-kafka-生产者拦截器" , "timespan" :  1717490776329 , "date" :  "2024-06-04 08:46:16" } ] 
]