Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

应用场景

前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略:

  • 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率。
  • 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复。

那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错误消息保存下来,然后事后分析、修复Bug再重新处理。但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。所以,本文将介绍利用中间件特性来便捷地处理该问题的方案:使用RabbitMQ的DLQ队列。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程。也可以新建一个,然后创建如下代码的逻辑:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

@RestController
static class TestController {

@Autowired
private TestTopic testTopic;

/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}

}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}

}

interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

这里加入了一个重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true,用来开启DLQ(死信队列)。完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,消息消费失败,记录了日志。此时,可以查看RabbitMQ的控制台如下:

其中,test-topic.stream-exception-handler.dlq队列就是test-topic.stream-exception-handler的dlq(死信)队列,当test-topic.stream-exception-handler队列中的消息消费时候之后,就会将这条消息原封不动的转存到dlq队列中。这样这些没有得到妥善处理的消息就通过简单的配置实现了存储,之后,我们还可以通过简单的操作对这些消息进行重新消费。我们只需要在控制台中点击test-topic.stream-exception-handler.dlq队列的名字进入到详情页面之后,使用Move messages功能,直接将这些消息移动回test-topic.stream-exception-handler队列,这样这些消息就能重新被消费一次。

如果Move messages功能中是如下内容:

To move messages, the shovel plugin must be enabled, try:

$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

那是由于没有安装对应的插件,只需要根据提示的命令安装就能使用该命令了。

深入思考

先来总结一下在引入了RabbitMQ的DLQ之后,对于消息异常处理更为完整一些的基本思路:

  1. 瞬时的环境抖动引起的异常,利用重试功能提高处理成功率
  2. 如果重试依然失败的,日志报错,并进入DLQ队列
  3. 日志告警通知相关开发人员,分析问题原因
  4. 解决问题(修复程序Bug、扩容等措施)之后,DLQ队列中的消息移回重新处理

在这样的整体思路中,可能还涉及一些微调,这里举几个常见例子,帮助读者进一步了解一些特殊的场景和配置使用!

场景一:有些消息在业务上存在时效性,进入死信队列之后,过一段时间再处理已经没有意义,这个时候如何过滤这些消息呢?

只需要配置一个参数即可:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000

该参数可以控制DLQ队列中消息的存活时间,当超过配置时间之后,该消息会自动的从DLQ队列中移除。

场景二:可能进入DLQ队列的消息存在各种不同的原因(不同异常造成的),此时如果在做补救措施的时候,还希望根据这些异常做不同的处理时候,我们如何区分这些消息进入DLQ的原因呢?

再来看看这个参数:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true

该参数默认是false,如果设置了死信队列的时候,会将消息原封不动的发送到死信队列(也就是上面例子中的实现),此时大家可以在RabbitMQ控制台中通过Get message(s)功能来看看队列中的消息,应该如下图所示:

这是一条原始消息。

如果我们该配置设置为true的时候,那么该消息在进入到死信队列的时候,会在headers中加入错误信息,如下图所示:

这样,不论我们是通过移回原通道处理还是新增订阅处理这些消息的时候就可以以此作为依据进行分类型处理了。

关于RabbitMQ的binder中还有很多关于DLQ的配置,这里不一一介绍了,上面几个是目前笔者使用过的几个,其他一些暂时认为采用默认配置已经够用,除非还有其他定制要求,或者是存量内容,需要去适配才会去配置。读者可以查看官方文档了解更多详情!

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-3项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477046.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1382. 将二叉搜索树变平衡(中序遍历+二分递归)

1. 题目 给你一棵二叉搜索树,请你返回一棵 平衡后 的二叉搜索树,新生成的树应该与原来的树有着相同的节点值。 如果一棵二叉搜索树中,每个节点的两棵子树高度差不超过 1 ,我们就称这棵二叉搜索树是 平衡的 。 如果有多种构造方…

电影:『新警察故事』

【电影名称】:『新警察故事』 【主 演】: 成龙 谢霆锋 杨采妮 蔡卓妍 吴彦祖  【导 演】: 陈木胜 【内容简介】:《新警察故事》是成龙英皇电影公司的处女作,投资超过1亿6000万港元,请来香港顶尖电影…

NLP顶级赛事LIC2022霸榜经验分享!

语言是人类传递信息最重要的媒介,让机器理解语言并进行交互是人工智能的重要挑战。为推动语言与智能领域的技术发展和应用,中国中文信息学会、中国计算机学会和百度公司连续五年联合举办“语言与智能技术竞赛”,为中文NLP研究者和开发者提供同…

Spring Cloud Zuul中使用Swagger汇总API接口文档

有很多读者问过这样的一个问题:虽然使用Swagger可以为Spring MVC编写的接口生成了API文档,但是在微服务化之后,这些API文档都离散在各个微服务中,是否有办法将这些接口都整合到一个文档中?之前给大家的回复都只是简单的…

LeetCode 1381. 设计一个支持增量操作的栈(deque/数组)

1. 题目 请你设计一个支持下述操作的栈。 实现自定义栈类 CustomStack : CustomStack(int maxSize):用 maxSize 初始化对象,maxSize 是栈中最多能容纳的元素数量,栈在增长到 maxSize 之后则不支持 push 操作。void push(int x)…

GARFIELD@10-31-2004

apprentice转载于:https://www.cnblogs.com/rexhost/archive/2004/10/31/59013.html

Spring Cloud构建微服务架构:分布式服务跟踪(入门)【Dalston版】

通过之前的N篇博文介绍,实际上我们已经能够通过使用它们搭建起一个基础的微服务架构系统来实现我们的业务需求了。但是,随着业务的发展,我们的系统规模也会变得越来越大,各微服务间的调用关系也变得越来越错综复杂。通常一个由客户…

有哪些值得计算机专业学生加入的国企?

文 |重庆搬砖喵知乎知乎上最近有个问题很火:有哪些值得计算机专业学生加入的国企?这个问题确实很应今年秋招的景,于是转载了知乎答主重庆搬砖喵 的高赞回答分享给大家。原回答链接:https://www.zhihu.com/question/285730093/answ…

上海著名综合性商厦一览 (1)

上海著名综合性商厦一览 jxjb 2004-10-29 11:53:21 发表于搜狐焦点上海房地产网-谈房论市-东方康洛论坛 主要包括:第一百货东楼 华联商厦 置地广场 友谊欧洲商城 东方商厦 港汇广场 汇金百货 梅龙镇广场 中环广场二百永新 正大广场 上海第一八佰伴友谊南方商城…

LeetCode 1383. 最大的团队表现值(贪心,优先队列,难)

1. 题目 公司有编号为 1 到 n 的 n 个工程师,给你两个数组 speed 和 efficiency ,其中 speed[i] 和 efficiency[i] 分别代表第 i 位工程师的速度和效率。 请你返回由最多 k 个工程师组成的 ​​​​​​最大团队表现值 ,由于答案可能很大&am…

程序员坐牢了,会被安排去写代码吗?

文 | 无念源 | 知乎今天给大家分享一篇有意思的爽文,但也是根据多年之前一个真实报道改编而来的。本文字数较多,建议先收藏,上下班路上、带薪上厕所、浑水摸鱼时再慢慢看~本故事纯属虚构请大家不要随意模仿,后果自负!因…

Dubbo将积极适配Spring Cloud生态,Spring Cloud体系或将成为微服务的不二选择!

2016年,我在博客中发表过一篇《微服务架构的基础框架选择:Spring Cloud还是Dubbo?》获得了很大的阅读量和转载量。在这篇文章中,我主要对比了Spring Cloud与Dubbo所具备的能力,并阐述了个人推崇Spring Cloud的原因。但…

Java sdk及tomcat安装设置

在安装好Java SDK后,还需要设置一些系统变量,系统变量的设置可以在系统属性-高级-环境变量中进行 JAVA_HOMEJAVA安装目录 CLASSPATH… 库路径,可以是目录或jar文件,如C:\j2sdk1.4.0_01\lib\dt.jar;d:\java&#xf…

剑指Offer - 面试题51. 数组中的逆序对(归并排序,求逆序对)

1. 题目 在数组中的两个数字&#xff0c;如果前面一个数字大于后面的数字&#xff0c;则这两个数字组成一个逆序对。输入一个数组&#xff0c;求出这个数组中的逆序对的总数。 示例 1: 输入: [7,5,6,4] 输出: 5限制&#xff1a; 0 < 数组长度 < 50000来源&#xff1a;力…

【小马哥】Spring Cloud系列讲座

这里推荐一个不错的Spring Cloud系列讲座&#xff0c;讲师简介如下&#xff1a; 小马哥&#xff0c;阿里巴巴技术专家&#xff0c;从事十余年Java EE 开发&#xff0c;国内微服务技术讲师。目前主要负责微服务技术推广、架构设计、基础设施、迁移等。重点关注云计算、微服务以及…

名校女教授,强迫其男博士“发生性关系多年”!索赔750万

转自 | 论文项目硕博招聘、磐创AI来源 | FOREIGNDAILY 、双一流高校等早稻田大学是日本的知名大学&#xff0c;但最近这所知名大学却引来了不小的争议。早稻田大学一名25岁的学生&#xff0c;将他的女导师和学校一起告上了法庭&#xff0c;要求他们赔偿自己750万日元&#xff…

剑指Offer - 面试题43. 1~n整数中1出现的次数(找规律+公式)

1. 题目 输入一个整数 n &#xff0c;求1&#xff5e;n这n个整数的十进制表示中1出现的次数。 例如&#xff0c;输入12&#xff0c;1&#xff5e;12这些整数中包含1 的数字有1、10、11和12&#xff0c;1一共出现了5次。 示例 1&#xff1a; 输入&#xff1a;n 12 输出&…

从网上看到的很搞笑的东西

[转]天下无贼经典台词IT版 戴尔&#xff1a;“人心散了&#xff0c;队伍不好带了!“ IBM&#xff1a;“我最烦你们些胡乱降价的了&#xff0c;一点技术含量都没有!“ 惠普&#xff1a;“实话告诉你们&#xff0c;惠普很生气&#xff0c;后果很严重!“ Intel&#xff1a;“你过得…

Spring Cloud实战小贴士:turbine如何聚合设置了context-path的hystrix数据

之前在spring for all社区看到这样一个问题&#xff1a;当actuator端点设置了context-path之后&#xff0c;turbine如何聚合数据&#xff1f;首先&#xff0c;我们要知道actuator端点设置了context-path是什么意思&#xff1f;也就是说&#xff0c;此时spring boot actuator的端…

prompt范式有了商业模式,卖提示词可以赚钱??

文 | 衡宇 发自 凹非寺源 | 量子位DALLE 2、GPT-3提示词在线交易平台&#xff0c;了解一下&#xff1f;最近&#xff0c;一家初创公司PromptBase&#xff0c;允许用户在该平台买卖提示词&#xff08;Prompt&#xff09;&#xff0c;提示词售价为1.99美元。PromptBase将向卖家抽…