004 Kafka异常处理

6.异常处理

文章目录

  • 6.异常处理
      • 1.异常分类与处理原则
      • 2.生产者异常处理
        • 1. 同步发送捕获异常
        • 2. 异步发送回调处理
      • 3.消费者异常处理
        • 1.全局异常处理器
        • 2.方法级处理
        • 3.重试yml配置
      • 4.死信队列(DLQ)配置
        • 1. 启用死信队列
        • 2. 手动发送到DLQ
      • 5.事务场景异常处理
        • 1. 声明式事务
        • 2. 事务异常回滚
      • 6.监控与告警
        • 1. Actuator 健康检查
        • 2. Prometheus 指标
      • 7.完整异常处理流程
      • 8.最佳实践总结

来源参考的deepseek,如有侵权联系立删

1.异常分类与处理原则

异常类型典型场景处理建议
可恢复异常网络抖动、数据库锁冲突重试机制(有限次数 + 退避策略)
不可恢复异常消息格式错误、权限不足直接记录日志并进入死信队列
事务异常事务超时、生产者ID冲突终止事务并回滚操作

2.生产者异常处理

1. 同步发送捕获异常
public void sendSync(String topic, String message) {try {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.get(5, TimeUnit.SECONDS); // 阻塞等待结果} catch (InterruptedException | ExecutionException | TimeoutException e) {// 记录日志并触发补偿逻辑log.error("消息发送失败: {}", e.getMessage());throw new BusinessException("消息发送失败", e);}
}
2. 异步发送回调处理
public void sendAsync(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(result -> {// 发送成功处理log.info("消息发送成功: topic={}", result.getRecordMetadata().topic());},ex -> {// 发送失败处理log.error("消息发送失败", ex);if (ex instanceof RetriableException) {// 可重试异常(如网络问题)retrySend(topic, message);} else {// 不可重试异常(如消息过大)deadLetterService.saveToDlq(topic, message);}});
}

3.消费者异常处理

1.全局异常处理器
@Configuration
public class KafkaGlobalErrorConfig {// 定义全局错误处理器(支持批量/单消息模式)@Beanpublic CommonErrorHandler globalErrorHandler(KafkaTemplate<String, Object> template) {// 重试策略:3次重试,间隔5秒DefaultErrorHandler handler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), // 死信队列恢复器new FixedBackOff(5000L, 3));// 指定可重试异常类型handler.addRetryableExceptions(NetworkException.class);handler.addNotRetryableExceptions(SerializationException.class);// 偏移量提交策略handler.setCommitRecovered(true);return handler;}// 容器工厂绑定全局处理器@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory,CommonErrorHandler globalErrorHandler) {ConcurrentKafkaListenerContainerFactory<String, Object> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setCommonErrorHandler(globalErrorHandler);return factory;}
}
2.方法级处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;@Slf4j
@Configuration
public class KafkaExceptionConfig {/*** 自定义异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler orderErrorHandler() {return (message, exception, consumer) -> {// 业务相关错误处理(如库存不足)/*   if (exception instanceof InventoryException) {retryService.scheduleRetry(message.getPayload());}*/System.out.println("异常执行:"+exception);return null;};}/*** 注册全局异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler globalExceptionHandler() {return (message, exception, consumer) -> {log.error("捕获消费异常: topic={}, message={}",message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),message.getPayload(),exception);// 反序列化异常特殊处理if (exception.getCause() instanceof DeserializationException) {// 跳过错消息并提交偏移量return null;}throw exception; // 其他异常继续抛出};}}
    @KafkaListener(topics = "test", groupId = "spring-group",errorHandler = "globalExceptionHandler")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));//异常测试int i = 1/0;ack.acknowledge();}
3.重试yml配置
spring:kafka:listener:retry:max-attempts: 3               # 最大重试次数backoff:initial-interval: 1000     # 初始间隔(毫秒)multiplier: 2.0            # 间隔倍数exclude-exceptions:          # 不重试的异常- javax.validation.ValidationException

4.死信队列(DLQ)配置

1. 启用死信队列
spring:kafka:listener:dead-letter-publish:enable: true                  # 自动发布到死信队列dead-letter-topic: dlq-${topic} # 死信队列命名规则
2. 手动发送到DLQ
@KafkaListener(topics = "payments")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {try {paymentService.process(event);ack.acknowledge();} catch (InvalidPaymentException ex) {// 手动发送到DLQkafkaTemplate.send("dlq-payments", event);ack.acknowledge(); // 避免重复消费}
}

5.事务场景异常处理

1. 声明式事务
@Transactional
public void processWithTransaction(Order order) {// 数据库操作orderRepository.save(order);// Kafka事务消息kafkaTemplate.send("orders", order.toEvent());// 其他业务...
}
2. 事务异常回滚
@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> pf) {return new KafkaTransactionManager<>(pf);
}@Transactional(rollbackFor = {KafkaException.class, SQLException.class})
public void transactionalProcess() {// 数据库与Kafka操作
}

6.监控与告警

1. Actuator 健康检查
management:endpoints:web:exposure:include: health,kafkahealth:kafka:enabled: true
2. Prometheus 指标
@Bean
public MicrometerConsumerListener<K, V> consumerMetrics() {return new MicrometerConsumerListener<>("kafka.consumer");
}@Bean
public MicrometerProducerListener<K, V> producerMetrics() {return new MicrometerProducerListener<>("kafka.producer");
}

7.完整异常处理流程

  1. 捕获异常 → 2. 分类判断 → 3. 重试/记录/DLQ → 4. 提交Offset → 5. 监控告警

8.最佳实践总结

  • 分层处理:全局处理器兜底 + 方法级精细控制
  • 幂等消费:确保消息重复消费时的数据安全性
  • 监控覆盖:跟踪重试次数、DLQ堆积等关键指标
  • 事务隔离@Transactional + read_committed 保证数据一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/70803.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring MVC框架六:Ajax技术

精心整理了最新的面试资料&#xff0c;有需要的可以自行获取 点击前往百度网盘获取 点击前往夸克网盘获取 简介 jQuery.ajax Ajax原理 结语 创作不易&#xff0c;希望能对大家给予帮助 想要获取更多资源? 点击链接获取

数据结构与算法:二维前缀和、二维差分及离散化技巧

前言 有一维的前缀和以及差分当然有二维的~ 一、二维前缀和 1.内容 二维前缀和就是求二维数组上从&#xff08;0,0&#xff09;位置到&#xff08;i,j&#xff09;位置的累加和。 2.模板——二维区域和检索 - 矩阵不可变 class NumMatrix { public:vector<vector<i…

在 Vue 组件中,如何确认父组件在 add 模式下传入 value 的情况及其对子组件 getProducts() 方法的触发影响?

文章目录 父组件中 <ave-form> 的使用add 模式下触发逻辑value 的传入情况是否触发 getProducts()&#xff1f; 验证 add 模式下 getProducts() 是否触发结论&#xff1a; 检查父组件传入 value 的完整情况如何明确知道父组件传入的 value最终回答 父组件 index.vue子组件…

第十四届蓝桥杯Scratch11月stema选拔赛真题——小猫照镜子

编程实现&#xff1a; 小猫照镜子。(背景非源素材) 具体要求&#xff1a; 1). 运行程序&#xff0c;角色、背景如图所示&#xff1b; 完整题目可点击下方链接查看&#xff0c;支持在线编程~ 小猫照镜子_scratch_少儿编程题库学习中心-嗨信奥https://www.hixinao.com/tiku/s…

React + TypeScript 实现数据库逆向生成数据模型指南

React TypeScript 实现数据库逆向生成数据模型全栈指南 引言&#xff1a;逆向工程在现代开发中的价值 在微服务架构和快速迭代的背景下&#xff0c;数据库逆向生成数据模型已成为提升开发效率的核心技术。传统手动编写模型的方式存在模式同步延迟和类型安全缺失两大痛点。本文…

Android Audio实战——音频相关基础概念(附)

Android Audio 开发其实就是媒体源数字化的过程,通过将声波波形信号通过 ADC 转换成计算机支持的二进制的过程叫做音频采样 (Audio Sampling)。采样 (Sampling) 的核心是把连续的模拟信号转换成离散的数字信号。 一、声音的属性 1、响度 (Loudness) 响度是指人类可以感知到的…

小程序类目调整汇总公告

各位小程序开发者&#xff1a; 为进一步加强平台的规范管理&#xff0c;优化开发者类目选择体验&#xff0c;现对以下类目进行调整&#xff0c;请各位开发者知悉。 类目新增 非个人主体 #【交通服务-国际客运】 现资质要求 &#xff08;2选1&#xff09;&#xff1a; 1…

python的列表和元组别再傻傻分不清啦

目录 什么是下标&#xff1a; 正数索引&#xff1a;正数索引从左到右&#xff0c;从 0 开始。 负数索引&#xff1a;负数索引从右到左&#xff0c;从 -1 开始。 切片&#xff08;slice&#xff09;&#xff1a;除了单个元素&#xff0c;Python还支持通过切片访问序列的子集。…

dubbo转http方式调用

业务背景&#xff1a;在当前项目下&#xff0c;所有前端请求均通过外层网关转发到后端这边的dubbo服务&#xff0c;现计划去掉网关层&#xff0c;由前端直接http调用后端dubbo。 解决方案&#xff1a;在前端调用方式不变的前提下&#xff0c;后端服务新建controller层&#xf…

OpenHarmony构建系统实践-跨部件引用

上一篇通过gn构建系统利用部件构建了可执行程序、动态库和配置文件&#xff0c;以及部件内的引用&#xff0c;本篇通过实现跨部件的模块引用&#xff0c;通过实现部件间的使用方法&#xff0c;以此来达到复用三方部件和模块库的目的。 本节以实现两个自定义的部件为例&#xff…

在 compare-form.vue 中添加 compareDate 隐藏字段,并在提交时自动填入当前时间

在 compare-form.vue 中添加 compareDate 隐藏字段&#xff0c;并在提交时自动填入当前时间。 提交表单时存入的对象是FakeRegistration&#xff0c;这个对象里面有compareDate字段&#xff0c;刚好表格查询的对象也是FakeRegistration&#xff0c;所以表格展示的时间就是刚才…

Windows 11【1001问】如何安装Windows 11

紧接上篇内容&#xff0c;本文详细介绍了从准备工作到具体安装步骤的完整流程&#xff0c;帮助用户顺利完成Windows 11系统的安装。内容涵盖了ISO镜像文件的下载与校验、启动U盘的制作、硬件兼容性检查&#xff0c;以及BIOS/UEFI设置和系统安装过程中的关键步骤。通过逐步指导&…

Chromedriver与Chrome版本映射表

‌Chromedriver与Chrome版本映射表‌如下&#xff1a; ‌Chrome 71-73版本对应Chromedriver 2.46‌‌Chrome 70-72版本对应Chromedriver 2.45‌‌Chrome 69-71版本对应Chromedriver 2.44‌‌Chrome 68-70版本对应Chromedriver 2.43‌‌Chrome 67-69版本对应Chromedriver 2.42‌…

LSM-Tree (日志结构合并树)

LSM-Tree&#xff08;日志结构合并树&#xff09;是一种高效处理写操作的存储结构&#xff0c;广泛应用于NoSQL数据库如LevelDB和RocksDB。其核心思想是将随机写入转换为顺序写入&#xff0c;提升吞吐量。以下是其原理及Java实现示例&#xff1a; ### **LSM-Tree 原理** 1. **…

【玩转 Postman 接口测试与开发2_020】(完结篇)DIY 实战:随书示例 API 项目本地部署保姆级搭建教程(含完整调试过程)

《API Testing and Development with Postman》最新第二版封面 文章目录 最新版《Postman 接口测试与开发实战》示例 API 项目本地部署保姆级搭建教程1 前言2 准备工作3 具体部署3.1 将项目 Fork 到自己名下3.2 创建虚拟环境并安装依赖3.3 初始运行与项目调试 4 示例项目的用法…

3-提前结束训练

一、核心类 class EarlyStopping:# YOLOv5 simple early stopperdef __init__(self, patience30):self.best_fitness 0.0 # i.e. mAPself.best_epoch 0self.patience patience or float(inf) # epochs to wait after fitness stops improving to stopself.possible_stop …

若依 ruoyi-vue 根据角色切换路由菜单权限 SAAS

后端根据角色查询相应的菜单&#xff08;角色对应管理的系统&#xff09; /*** 获取路由信息根据角色&#xff08;系统类型&#xff09;** return 路由信息*/GetMapping("getRoutersBySystemType")public AjaxResult getRoutersBySystemType(String systemType) {Lon…

2024最新版鸿蒙纯血原生应用开发教程文档丨学习ArkTS语言-基本语法

ArkTS是HarmonyOS的主要应用开发语言&#xff0c;在TypeScript基础上进行了扩展&#xff0c;保留了其基本风格&#xff0c;并通过增强静态检查和分析来提高程序的稳定性和性能。本教程将帮助开发者掌握ArkTS的核心功能、语法及最佳实践&#xff0c;以便高效地构建高性能移动应用…

使用插件 `vue2-water-marker`添加全局水印

使用插件 vue2-water-marker添加全局水印 效果图 1、安装插件 npm install vue2-water-marker --save2、全局注册 // main.js import Vue from vue import Vue2WaterMarker from vue2-water-markerVue.use(Vue2WaterMarker)3、在组件中使用 <template><div id&q…

docker安装etcd:docker离线安装etcd、docker在线安装etcd、etcd镜像下载、etcd配置详解、etcd常用命令、安装常见问题总结

官方网站 官方网址&#xff1a;etcd 二进制包下载&#xff1a;Install | etcd GitHub社区项目&#xff1a;etcd-io GitHub GitHub社区项目版本历史&#xff1a;Releases etcd-io/etcd GitHub 一、镜像下载 1、在线下载 在一台能连外网的linux上执行docker镜像拉取命令…