Acknowledgment.nack方法重试消费kafka消息异常

文章目录

  • 问题
    • 示例
    • 异常
  • 原因
    • nack方法
    • Acknowledgment接口
    • 实现类:ConsumerAcknowledgment
    • 实现类:ConsumerBatchAcknowledgment
  • 解决方案
    • 1 批量消费指定index
      • 示例
    • 2 单条消费
      • 示例

问题

使用BatchAcknowledgingMessageListener 批量消费Kafka消息,成功则手动提交offset,失败则重试。消费成功的情况下没有问题,但消费失败情况下,调用nack方法重试时则报异常。

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 处理多条消息acknowledgment.acknowledge(); // 成功处理后提交偏移量} catch (Exception e) {// 消息处理失败,30min后重试// nack作用:将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。acknowledgment.nack(30 * 60 * 1000); // 这里报了异常}}
}

上边的代码乍一看没啥问题,编译,启动也都没报错。但是在执行nack的时候进到了Acknowledgment接口默认nack(sleep) 方法里边,并抛出异常。

nack(sleep) is not supported by this Acknowledgment

异常

在这里插入图片描述
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/8bbf4f37af134374bf1f4e34c2687dbe.png

原因

Acknowledgment接口有两个nack方法:nack(long sleep)nack(int index, long sleep), 以及两个实现类ConsumerAcknowledgmentConsumerBatchAcknowledgment。ConsumerAcknowledgment仅实现了nack(long sleep),而ConsumerBatchAcknowledgment仅实现了nack(int index, long sleep)。

nack方法

注意:调用nack方法后,将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。

Acknowledgment接口

在这里插入图片描述

实现类:ConsumerAcknowledgment

在这里插入图片描述

实现类:ConsumerBatchAcknowledgment

在这里插入图片描述

这样的设计也很好理解。。
当BatchAcknowledgingMessageListener批量消费消息时, 使用的是ConsumerBatchAcknowledgment,重试时需要告诉ConsumerBatchAcknowledgment要从这批量消息中的哪条开始重试消费,即要指定index值。我的例子中调用的是nack(long sleep),没有指定index,所以进到了默认方法里,抛了异常。

而使用AcknowledgingMessageListener消费单条消息时,使用的是ConsumerAcknowledgment,重试时它知道重试当前的消息,因为就这一条,所以只需要指定重试时间就可以了。

也就是说批量消费时,重试要调用nack(int index, long sleep),单条消费时,重试要调用nack(long sleep),二者不搭配,就会抛不支持该方法的异常。

解决方案

1 批量消费指定index

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {int index = 0;try {for (; index < data.size(); index++) {messageHandler.handle(data.get(index)); // 处理单条消息}// 成功处理后提交偏移量acknowledgment.acknowledge();} catch (Exception e) {// 消息处理失败,30min后重试index及index之后的消息acknowledgment.nack(index, 30 * 60 * 1000);}}}

2 单条消费

改成单条消费消息,调用nack(long sleep)

示例

public class SingleCustomMessageListener implements AcknowledgingMessageListener {private MessageHandler messageHandler;public SingleCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 处理单条消息acknowledgment.acknowledge();} catch (Exception e) {acknowledgment.nack(30 * 60 * 1000);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/72083.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 反序列化 - commons collection 之困(一)

#01多余的碎碎念 说到 java 反序列化&#xff0c;去搜索的话能看到网上有很多分析关于 commons collection 利用链的文章&#xff0c;emm 我一开始看不懂&#xff0c;看到很多代码的图头晕。 这篇文章的话其实是我跟着 p 神的文章一路走下来的&#xff0c;所以整个逻辑会按照…

python LLM工具包

阿里云镜像pypi http://mirrors.aliyun.com/pypi/simple/ modelscope魔塔 pip install modelscope https://modelscope.cn/docs/models/download Sentence-transformers pip install -U sentence-transformers pip3 install torch -i https://pypi.tuna.tsinghua.edu.cn/sim…

Linux账号和权限管理

用户账户管理 理论 /etc/passwd 该目录用于保存用户名&#xff0c;宿主目录&#xff0c;登录shel等基本信息 /etc/shadow 该目录用于保存 用户密码&#xff0c;账户有效期等信息 图上每一行中都有用“&#xff1a;”隔断的字段 字段含义&#xff1a; 第1字段:用户账号的名…

晋升系列4:学习方法

每一个成功的人&#xff0c;都是从底层开始打怪&#xff0c;不断的总结经验&#xff0c;一步一步打上来的。在这个过程中需要坚持、总结方法论。 对一件事情长久坚持的人其实比较少&#xff0c;在坚持的人中&#xff0c;不断的总结优化的更少&#xff0c;所以最终达到高级别的…

win32汇编环境,对话框中使用树形视图示例四

;运行效果,当点击张辽时,展示张辽的图像 ;当点击曹仁时,展示曹仁的图像 ;win32汇编环境,对话框中使用树形视图示例四 ;当点击树形视图treeview控件中的某项时,展示某些功能。这里展示的是当点到某个将领时,显示某个将领的图像 ;直接抄进RadAsm可编译运行。重要部分加备注。…

智慧停车小程序:实时车位查询、导航与费用结算一体化

智慧停车小程序:实时车位查询、导航与费用结算一体化 一、城市停车困境的数字化突围 中国机动车保有量突破4.3亿辆,但车位供给缺口达8000万。传统停车管理模式存在三大致命伤: 盲盒式寻位:62%的车主遭遇"地图显示有位,到场已满员"的窘境迷宫式导航:商场停车场…

Windows server网络安全

摘要 安全策略 IP安全策略&#xff0c;简单的来说就是可以通过做相应的策略来达到放行、阻止相关的端口&#xff1b;放行、阻止相关的IP&#xff0c;如何做安全策略&#xff0c;小编为大家详细的写了相关的步骤&#xff1a; 解说步骤&#xff1a; 阻止所有&#xff1a; 打…

充电桩快速搭建springcloud(微服务)+前后端分离(vue),客户端实现微信小程序+ios+app使用uniapp(一处编写,处处编译)

充电桩管理系统是专为中小型充电桩运营商、企业和个人开发者设计的一套高效、灵活的管理平台。系统基于Spring Cloud微服务架构开发&#xff0c;采用模块化设计&#xff0c;支持单机部署与集群部署&#xff0c;能够根据业务需求动态扩展。系统前端使用uniapp框架&#xff0c;可…

小肥柴慢慢手写数据结构(C篇)(4-3 关于栈和队列的讨论)

小肥柴慢慢学习数据结构笔记&#xff08;C篇&#xff09;&#xff08;4-3 关于栈和队列的讨论&#xff09; 目录1 双端栈/队列2 栈与队列的相互转化2-1 栈转化成队列2-2 队列转化成栈 3 经典工程案例3-1 生产者和消费者模型&#xff08;再次重温环形缓冲区&#xff09;3-2 MapR…

labview实现大小端交换移位

在解码时遇到了大小端交换的问题&#xff0c;需要把高低字节的16进制值进行互换&#xff0c;这里一时间不知道怎么操作&#xff0c;本来打算先把16进制转字节数组&#xff0c;算出字节数组的大小&#xff0c;然后通过模2得到0&#xff0c;1&#xff0c;来判断是否为奇数位和偶数…

在Windows系统上安装和配置Redis服务

&#x1f31f; 在Windows系统上安装和配置Redis服务 Redis是一个高性能的键值存储数据库&#xff0c;广泛用于缓存、消息队列和实时分析等场景。虽然Redis最初是为Linux设计的&#xff0c;但也有Windows版本可供使用。今天&#xff0c;我将详细介绍如何在Windows系统上安装Red…

Ateme在云端构建可扩展视频流播平台

Akamai Connected Cloud帮助Ateme客户向全球观众分发最高质量视频内容。 “付费电视运营商和内容提供商现在可以在Akamai Connected Cloud上通过高质量视频吸引观众&#xff0c;并轻松扩展。”── Ateme首席战略官Rmi Beaudouin ​ Ateme是全球领先的视频压缩和传输解决方案提…

DeepSeek进阶应用(一):结合Mermaid绘图(流程图、时序图、类图、状态图、甘特图、饼图)

&#x1f31f;前言: 在软件开发、项目管理和系统设计等领域&#xff0c;图表是表达复杂信息的有效工具。随着AI助手如DeepSeek的普及&#xff0c;我们现在可以更轻松地创建各种专业图表。 名人说&#xff1a;博观而约取&#xff0c;厚积而薄发。——苏轼《稼说送张琥》 创作者&…

deepseek R1提供的3d迷宫设计方案

一、技术选型方案 核心渲染技术 &#x1f3a8; 采用Raycasting算法模拟3D透视效果使用Canvas 2D上下文进行逐像素绘制材质贴图系统实现墙面差异化表现 迷宫数据结构 &#x1f5fa;️ 二维数组存储迷宫布局&#xff08;0:通路&#xff0c;1:墙体&#xff09;递归回溯算法生成随…

时序数据库TimescaleDB基本操作示例

好的&#xff01;以下是使用 TimescaleDB 的 Java 示例&#xff08;基于 JDBC&#xff0c;因为 TimescaleDB 是 PostgreSQL 的扩展&#xff0c;官方未提供独立的 Java SDK&#xff09;&#xff1a; 1. 添加依赖&#xff08;Maven&#xff09; <dependency><groupId&g…

linux下的网络抓包(tcpdump)介绍

linux下的网络抓包[tcpdump]介绍 前言tcpdump1. 安装 tcpdump2. 基本抓包命令3. 过滤器使用4. 保存捕获的数据包 异常指标1. 连接建立与断开相关指标异常 SYN 包异常 FIN 或 RST 包 2. 流量相关指标异常流量峰值异常源或目的 IP 流量 3. 端口相关指标异常端口使用端口扫描 4. 数…

C/C++中使用CopyFile、CopyFileEx原理、用法、区别及分别在哪些场景使用

文章目录 1. CopyFile原理函数原型返回值用法示例适用场景 2. CopyFileEx原理函数原型返回值用法示例适用场景 3. 核心区别4. 选择建议5. 常见问题6.区别 在Windows系统编程中&#xff0c;CopyFile和CopyFileEx是用于文件复制的两个API函数。它们的核心区别在于功能扩展性和控制…

Bash和Zsh在处理大文件时差异

在处理大文件时&#xff0c;Bash 和 Zsh 的差异主要体现在几个方面&#xff1a; 1. 脚本执行速度 Bash: 性能: Bash在执行脚本时通常表现良好&#xff0c;尤其是在处理大量数据或大文件时。Bash的脚本执行速度相对较快&#xff0c;适合大多数日常使用场景。优化: Bash在处理大…

不同AI生成的PHP版雪花算法

OpenAI <?php /*** Snowflake 雪花算法生成器* 生成的 64 位 ID 结构&#xff1a;* 1 位 保留位&#xff08;始终为0&#xff0c;防止负数&#xff09;* 41 位 时间戳&#xff08;毫秒级&#xff0c;当前时间减去自定义纪元&#xff09;* 5 位 数据中心ID* 5 …

Android Telephony 四大服务和数据网络控制面数据面介绍

在移动通信和Android系统中,涉及的关键概念和服务以及场景案例说明如下: 一、概念 (一)Android Telephony 的四大服务 介绍Telephony Data 与 Android Data 的四大服务在Android系统中,与电话(Telephony)和移动数据(Data)相关的核心服务主要包括以下四类: 1. Tele…