kafka消费者监听消费

1. pom

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. kafka 监听消费

消费成功调用 ack.acknowledge()方法确认。

import com.xxx.gsc.sci.order.entity.SciMbgPsdHistoryEntity;
import com.xxx.gsc.sci.order.mapper.SciMbgPsdHistoryMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;/*** 同步SCI系统mbg_psd_partial_line_history表。</p>* 上游增量推送,PK: VERSION_NUM,SO,SO_ITEM,PSD,PLANNED_QTY** @date 2022/08/05 18:06* @see org.springframework.kafka.listener.MessageListener*/
@Component
@Slf4j
public class SyncSciMbgPsdHistory {@Autowiredprivate SqlSessionTemplate sqlSessionTemplate;@KafkaListener(topics = "#{'${customer.kafka.topics}'.split(',')[1]}")public void sync(List<SciMbgPsdHistoryEntity> dataList, Acknowledgment ack) {SqlSession session = null;try {log.info("Starting to consume of PSD data ...");long startTime = System.currentTimeMillis();session = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH, false);SciMbgPsdHistoryMapper mapper = session.getMapper(SciMbgPsdHistoryMapper.class);dataList.forEach(v -> mapper.upsert(v));session.commit();ack.acknowledge();long duration = System.currentTimeMillis() - startTime;log.info("Finished to consume of PSD data! total count: {}条, total time: {} s", dataList.size(), duration / 1000.0);} catch (Throwable e) {e.printStackTrace();log.error(e.getMessage());} finally {if (null != session) {session.close();}}}
}

3. 配置

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.xxx.gsc.tech.framework.GscTechAutoConfiguration;
import com.xxx.gsc.tech.framework.jackson.deserializer.DateDeserializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.system.ApplicationHome;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;/*** Kafka自动载入类。<p/>* 实现初始化Kafka配置工厂** @author zhengwei16* @date 2022/8/12 15:29* @version 1.0*/
@SpringBootConfiguration
@AutoConfigureAfter(GscTechAutoConfiguration.class)
@ConditionalOnClass(KafkaTemplate.class)
@Slf4j
public class KafkaAutoConfiguration {private final Logger logger = LoggerFactory.getLogger(getClass());private KafkaProperties properties;public KafkaAutoConfiguration(KafkaProperties properties) throws IOException {Resource trustStoreLocation = properties.getSsl().getTrustStoreLocation();log.info("SSL file path:" + (Objects.isNull(trustStoreLocation) ? "" : trustStoreLocation.getURI().toString()));if (trustStoreLocation != null && !trustStoreLocation.isFile()) {ApplicationHome applicationHome = new ApplicationHome(getClass());log.info("Application Home:" + applicationHome.getDir().getPath());File sslFile = new File(applicationHome.getSource().getParentFile(), Objects.requireNonNull(trustStoreLocation.getFilename()));FileUtils.copyInputStreamToFile(Objects.requireNonNull(trustStoreLocation.getInputStream(), "SSL File Not Exist"), sslFile);properties.getSsl().setTrustStoreLocation(new FileSystemResource(sslFile));}this.properties = properties;}@Bean@Primarypublic KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,ProducerListener<Object, Object> kafkaProducerListener,RecordMessageConverter messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);if (messageConverter != null) {kafkaTemplate.setMessageConverter(messageConverter);}kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean@Primarypublic ConsumerFactory<?, ?> kafkaConsumerFactory() {Map<String, Object> configs = this.properties.buildConsumerProperties();configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));return new DefaultKafkaConsumerFactory<>(configs);}@Bean@Primarypublic ProducerFactory<?, ?> kafkaProducerFactory() {Serializer stringSerializer = new StringSerializer();Serializer jsonSerializer = new JsonSerializer(new ObjectMapper() 
{{setSerializationInclusion(JsonInclude.Include.NON_NULL);configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));}});DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties(), stringSerializer, jsonSerializer);String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix + "_" + UUID.randomUUID());}return factory;}@Bean@Primarypublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);return factory;}@Bean@Primarypublic RecordMessageConverter kafkaMessageConverter(ObjectMapper objectMapper) {ObjectMapper om = new ObjectMapper();return new StringJsonMessageConverter(om.setSerializationInclusion(JsonInclude.Include.NON_NULL).setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).registerModule(new SimpleModule().addDeserializer(Date.class, new DateDeserializer())).setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")));}@Bean@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {KafkaTransactionManager<?, ?> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);// kafkaTransactionManager.setTransactionSynchronization(KafkaTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);kafkaTransactionManager.setNestedTransactionAllowed(true);return 
kafkaTransactionManager;}@Bean@ConditionalOnBean(KafkaTransactionManager.class)public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(DataSourceTransactionManager dataSourceTransactionManager, KafkaTransactionManager<?, ?> kafkaTransactionManager) {return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, dataSourceTransactionManager);}}

4. application配置

spring:
  kafka:
    # Tst
    bootstrap-servers: n1-mkt-sy.xxx.com:9092,n2-mkt-sy.xxx.com:9092,n3-mkt-sy.xxx.com:9092
    consumer:
      group-id: mbgcpf
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 1000
      max-poll-records: 5000
      security:
        protocol: SASL_SSL
      properties:
        max.partition.fetch.bytes: 104857600
        fetch.min.bytes: 2108576
        fetch.max.wait.ms: 10000
        session.timeout.ms: 300000  # default 10000
        request.timeout.ms: 600000 # default 30000
        max.poll.interval.ms: 600000 # default 300000
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='passwd';
    ssl:
      trust-store-location: classpath:client_truststore.jks
      trust-store-password: PASSWD
    listener:
      #concurrency: 2 #容器中的线程数,用于提高并发量
      #ack-count: # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
      ack-mode: manual_immediate # Listener AckMode. See the spring-kafka documentation.
      #ack-time: # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
      #poll-timeout: # Timeout to use when polling the consumer.
      type: batch # Listener type.
      missing-topics-fatal: false

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/865088.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker linux的底层逻辑是什么,docker 的原理是什么?怎么部署及应用,Docker的来龙去脉

Docker 是一种用于开发、交付和运行应用程序的开源平台。它使应用程序能够在容器中运行，提供了轻量级的虚拟化环境。以下是 Docker 的底层逻辑、原理以及部署和应用的方法。 Docker 的底层逻辑 Docker 的核心是利用 Linux 内核的几个特性来实现轻量级的虚拟化…

使用dot来画流程图

Dot是一种图形描述语言，属于Graphviz软件的一部分。Graphviz是一个用于可视化图形（图表、网络图等）的开源工具集。使用Dot语言，你可以创建并描述节点和边，从而生成图形。以下是如何使用Dot语言画图的基本步骤…

【CSAPP】-attacklab实验

目录 实验目的与要求 实验原理与内容 实验设备与软件环境 实验过程与结果&#xff08;可贴图&#xff09; 实验总结 实验目的与要求 1. 强化机器级表示、汇编语言、调试器和逆向工程等方面基础知识&#xff0c;并结合栈帧工作原理实现简单的栈溢出攻击&#xff0c;掌握其基…

C++线程安全是如何保证的?线程不安全是如何出现的?有什么处理方案呢

在C中&#xff0c;保证线程安全有如下几种机制&#xff1a; 1. 互斥锁&#xff08;Mutex&#xff09; 互斥锁用于保护共享资源&#xff0c;确保同一时间只有一个线程可以访问&#xff1a; #include <mutex> #include <thread>std::mutex mtx; // 全局互斥锁void…

游游的水果大礼包(枚举)

题目链接&#xff1a;https://ac.nowcoder.com/acm/problem/255193 题解 题目解析 就拿第一个例子来看&#xff0c;当选择组成1个一号礼包和1个二号礼包时最大的价值是3元&#xff0c;而选择2个二号礼包时&#xff0c;最大的价值是4元&#xff0c;因此选择2个二号礼包。 算法…

2-23 基于matlab的小波变换碰磨故障信号的特征提取

基于matlab的小波变换碰磨故障信号的特征提取&#xff0c;可以画出信号原图&#xff0c;轴心轨迹&#xff0c;频谱图以及多层小波变换的重构信号。程序已调通&#xff0c;可直接运行。 2-23 小波变换 碰磨故障信号 轴心轨迹 - 小红书 (xiaohongshu.com)

工厂设计模式的实现与应用场景分析

工厂设计模式的实现与应用场景分析 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 工厂设计模式&#xff08;Factory Pattern&#xff09;是一种创建型设计模…

Spring Boot 中使用 Spring Security 实现安全访问权限管理:详尽指南

引言&#xff1a; 在现代Web应用开发中&#xff0c;安全是一个至关重要的环节。Spring Security 是一个功能强大且高度可定制的安全框架&#xff0c;能够为Spring Boot应用提供全面的安全解决方案&#xff0c;包括认证&#xff08;Authentication&#xff09;和授权&#xff0…

html+css+js写的多人在线积分系统

可以添加成员&#xff0c;成员名称自定义 可以对各个成员加分减分➕➖ 可以删除成员 源码在图片下面&#xff0c;记得点赞加关注❤️❤️❤️ 界面 源代码 <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8">…

2.2.5 C#中显示控件BDPictureBox 的实现----ROI交互续2

2.2.5 C#中显示控件BDPictureBox 的实现----ROI交互续2 1 ROI数组作用说明 变量&#xff1a;m_ROIs[5] ROI 使用效果图 ROI数组说明 2 ROI显示逻辑图 ROI 交互主要是在设定状态下&#xff0c; runmode下只要普通显示即可 3 主要ROI显示函数函数 判断当前鼠标是否获取…

怎么快速给他人分享图片?扫描二维码看图的简单做法

现在通过二维码来查看图片是一种很常见的方法&#xff0c;通过二维码来查看图片不仅能够减少对手机存储空间的占用&#xff0c;而且获取图片变得更加方便快捷&#xff0c;只需要扫码就能够查看图片&#xff0c;有利于图片的展现。很多的场景中都有图片二维码的应用&#xff0c;…

个人微信二次开发

​ 由于自身在机器人方面滚爬多年&#xff0c;现在收藏几个宝藏机器人 推荐一下自己常用的机器人&#xff1a; 适合有技术开发的公司&#xff0c;可以自主开发所需要的功能&#xff01;十分齐全 测试问文档&#xff1a;https://www.wkteam.cn/ 有需要的兄弟可以看一下&#…

题库-编程题

1&#xff0e;用循环的嵌套&#xff0c;输出输出如下图形 * * * * * * * * * * * * * * * * * * * * * * * * * package Exercises.One_Hundred; ​ public class Demo01 {public static void main(String[] args) {for(int i1;i<5;i){for(int j1;j<2*i-1;j){Sys…

音视频同步的关键:深入解析PTS和DTS

&#x1f60e; 作者介绍&#xff1a;我是程序员行者孙&#xff0c;一个热爱分享技术的制能工人。计算机本硕&#xff0c;人工制能研究生。公众号&#xff1a;AI Sun&#xff0c;视频号&#xff1a;AI-行者Sun &#x1f388; 本文专栏&#xff1a;本文收录于《音视频》系列专栏&…

App备案过程中遇到的一些注意事项

上一篇从零开始完成App的ICP备案我们大致的把App的备份过了一遍&#xff0c;这次主要说下中途可能遇到的坑。 App备案需要通过接入服务商备案&#xff0c;同时需要域名、备案码&#xff08;1个app一个&#xff0c;android和ios版本共享&#xff09; 1. 备案名称 备案的名称就…

Vue.js 中 ref 和 reactive 的区别及用法

Vue.js 中 ref 和 reactive 的区别及用法 ref 目的&#xff1a;创建一个对值的响应式引用。 用法&#xff1a;通过 .value 属性来访问和修改值。 示例&#xff1a; import { ref } from vue;const count ref(0);count.value; // 增加值 console.log(count.value); // 访…

STM32 ADC精度提升方法

STM32 ADC精度提升方法 Fang XS.1452512966qq.com如果有错误&#xff0c;希望被指出&#xff0c;学习技术的路难免会磕磕绊绊量的积累引起质的变化 硬件方法 优化布局布线&#xff0c;尽量减小其他干扰增加电源、Vref去耦电容使用低通滤波器&#xff0c;或加磁珠使用DCDC时尽…

RS232隔离器的使用

RS232隔离器在通信系统中扮演着至关重要的角色&#xff0c;其主要作用可以归纳如下&#xff1a; 一、保护通信设备 电气隔离&#xff1a;RS232隔离器通过光电隔离技术&#xff0c;将RS-232接口两端的设备电气完全隔离&#xff0c;从而避免了地线回路电压、浪涌、感应雷击、静电…

el-upload组件封装方案

basic-upload.vue——基本上传组件 <template><div class"basic-upload-wrap"><el-uploadref"uploadRef":file-list"fileList":accept"accept"update:file-list"(data) > emits(update:file-list, data)"…

vue+js实现鼠标右键页面时在鼠标位置出现弹窗

首先是弹窗元素 <div class"tanchuang move-win1"id"tanchuang1"><el-button>111</el-button></div>然后在需要弹窗的地方监听点击事件&#xff0c;可以将这个方法写在页面载入事件中 // 获取弹窗元素 var tanchuang document.…