使用Spring Integration重试RabbitMQ

我最近阅读了有关使用RabbitMQ重试的方法
在这里,并想尝试类似的方法
Spring Integration ,提供了一组很棒的集成抽象。

TL; DR解决的问题是重试一次消息(在处理失败的情况下),两次重试之间有较大的延迟(例如10分钟以上)。 该方法利用RabbitMQ支持
死信交换 ,看起来像这样

canvas-2

流程的要点是:

1.工作调度员创建“工作单元”,并通过交换机将其发送到RabbitMQ队列。

2.工作队列设置为
死信交换 。 如果消息处理由于任何原因失败,则“工作单元”将以“工作单元死信队列”结束。

3.依次将工作单位死信队列与工作单位交换设置为死信交换,以此方式创建一个循环。 此外,将死信队列中的消息过期设置为10分钟,这样,一旦消息过期,它将再次返回到工作单元队列中。

4.要打破周期,一旦超过某个计数阈值,处理代码就必须停止处理。

使用Spring Integration实现

我已经使用Spring Integration和RabbitMQ讲述了一条快乐的小路
在之前 ,这里我将主要基于此代码构建。

设置的一个很好的部分是适当的死信交换/队列的配置,当使用Spring的Java配置表示时,看起来像这样:

@Configuration
public class RabbitConfig {@Autowiredprivate ConnectionFactory rabbitConnectionFactory;@BeanExchange worksExchange() {return ExchangeBuilder.topicExchange("work.exchange").durable().build();}@Beanpublic Queue worksQueue() {return QueueBuilder.durable("work.queue").withArgument("x-dead-letter-exchange", worksDlExchange().getName()).build();}@BeanBinding worksBinding() {return BindingBuilder.bind(worksQueue()).to(worksExchange()).with("#").noargs();}// Dead letter exchange for holding rejected work units..@BeanExchange worksDlExchange() {return ExchangeBuilder.topicExchange("work.exchange.dl").durable().build();}//Queue to hold Deadletter messages from worksQueue@Beanpublic Queue worksDLQueue() {return QueueBuilder.durable("works.queue.dl").withArgument("x-message-ttl", 20000).withArgument("x-dead-letter-exchange", worksExchange().getName()).build();}@BeanBinding worksDlBinding() {return BindingBuilder.bind(worksDLQueue()).to(worksDlExchange()).with("#").noargs();}...
}

请注意,这里我将“死信”队列的TTL设置为20秒,这意味着20秒后,一条失败的消息将返回到处理队列中。 一旦完成此设置并在RabbitMQ中创建了适当的结构,代码的消耗部分将如下所示,使用
Spring Integration Java DSL :

@Configuration
public class WorkInbound {@Autowiredprivate RabbitConfig rabbitConfig;@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(rabbitConfig.workListenerContainer())).transform(Transformers.fromJson(WorkUnit.class)).log().filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel")).handle("workHandler", "process").get();}}

这里的大多数重试逻辑是由RabbitMQ基础结构处理的,这里唯一的变化是通过在特定的2次重试后显式丢弃消息来打破周期。 此中断表示为上面的过滤器,查看了RabbitMQ一旦发送到Dead Letter交换后将其添加到消息的称为“ x-death”的标头。 过滤器确实有些丑陋-可以用Java代码更好地表达它。

还有一点要注意的是,重试逻辑可以使用Spring Integration在过程中表示,但是我想研究一个重试时间可能很长(例如15到20分钟)的流程,该流程在过程中无法正常工作而且也不安全,因为我希望应用程序的任何实例都可以处理消息重试。

如果您想进一步探索,请尝试在
我的github仓库 – https://github.com/bijukunjummen/si-dsl-rabbit-sample

参考:

使用RabbitMQ重试:http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq

翻译自: https://www.javacodegeeks.com/2016/09/rabbitmq-retries-using-spring-integration.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/351113.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue_(Router路由)-vue-router路由的基本用法

vue-router官网&#xff1a;传送门 vue-router起步&#xff1a;传送门 vue-router路由&#xff1a;Vue.js官网推出的路由管理器&#xff0c;方便的构建单页应用 单页应用&#xff1a;Single Page Application简称SPA&#xff0c;只有一个web页面的应用&#xff0c;用户与应用交…

利用boost做string到wstring转换,以及字符集转换

#include <boost/locale.hpp> int _tmain(int argc, _TCHAR* argv[]) {//std::locale::global(std::locale("utf-8"));std::locale::global(std::locale("")); // 设置全局的C运行库locale 可以针对cout fstream等单独设置 空表示默认使用当前系统…

P4198 楼房重建

[Luogu4198] 原题解 19.3.21 用线段树维护有关单调栈的问题 不要pushdown , 但是pushup的时候需要特别注意. 19.3.31 这里的\(pushup2\)其实就是几个特判 : 没有 , 直接返回当前区间答案 , 区间长度为\(1\) , 以及剩下两大类 , 这里有一个模板 : if(mx[ls]<tmp) return push…

Linux多线程实践(1) --线程理论

线程概念 在一个程序里的一个执行路线就叫做线程&#xff08;thread&#xff09;。更准确的定义是&#xff1a;线程是“一个进程内部的控制序列/指令序列”; 一切进程至少有一个执行线程; 进程 VS. 线程 1.进程是资源分配(进程需要参与资源的竞争)的基本单位,而线程是处理器调…

蓝桥杯 密文搜索(全排列)

题目描述福尔摩斯从X星收到一份资料&#xff0c;全部是小写字母组成。他的助手提供了另一份资料&#xff1a;许多长度为8的密码列表。福尔摩斯发现&#xff0c;这些密码是被打乱后隐藏在先前那份资料中的。请你编写一个程序&#xff0c;从第一份资料中搜索可能隐藏密码的位置。…

openshift_为Openshift + MongoDb应用程序编写验收测试

openshift验收测试用于确定是否满足规范要求。 它应在与生产环境尽可能相似的环境中运行。 因此&#xff0c;如果您的应用程序已部署到Openshift中&#xff0c;则您将需要一个与生产环境中使用的帐户平行的帐户&#xff0c;以运行测试。 在这篇文章中&#xff0c;我们将为部署到…

学 Win32 汇编[28] - 跳转指令: JMP、JECXZ、JA、JB、JG、JL、JE、JZ、JS、JC、JO、JP 等

跳转指令分三类:一、无条件跳转: JMP;二、根据 CX、ECX 寄存器的值跳转: JCXZ(CX 为 0 则跳转)、JECXZ(ECX 为 0 则跳转);三、根据 EFLAGS 寄存器的标志位跳转, 这个太多了.根据标志位跳转的指令: JE ;等于则跳转 JNE ;不等于则跳转JZ ;为 0 则跳转 JNZ ;不为 0 则跳转JS…

广告行业一些常用物料的尺寸

10-13 14:13 设计 /淘宝 海报 50cm 70cm &#xff08;宽 高&#xff09; 57cm 84cm &#xff08;宽 高&#xff09; 横幅 横幅尺寸高度默认为整10为单位&#xff0c;50、60、70、长度视环境而定&#xff0c;材料一般为牛津布&#xff0c;旗帜布&#xff0c;颜色有双色有彩…

Spring Security和自定义密码编码

在上一篇文章中&#xff0c;我们使用jdbc和md5密码编码将密码编码添加到了我们的spring安全配置中。 但是&#xff0c;在定制UserDetailsS​​ervices的情况下&#xff0c;我们需要对安全配置进行一些调整。 我们需要创建一个DaoAuthenticationProvider bean&#xff0c;并将…

智能变电站协议系列-2、SV/SMV协议示例(IEC61850)以及5G专网下的电力方案分析

文章目录 一、前言二、资料准备三、libiec61850的SV运行示例及抓包分析1、单独编译示例程序2、运行示例程序及5G专网场景下部署3、wireshark抓包分析 四、最后 一、前言 之前我们对IEC61850协议有了整体的了解&#xff0c;对一些概念有了一定的认识&#xff0c;并针对GOOSE协议…

php 常用的知识点归集(下)

24、静态属性与静态方法在类中的使用 需求&#xff1a;在玩CS的时候不停有伙伴加入&#xff0c;那么现在想知道共有多少人在玩&#xff0c;这个时候就可能用静态变量的方法来处理 利用原有的全局变量的方法来解决以上的问题 <?php header(content-type:text/html;charsetut…

GDB下查看内存命令(x命令)

可以使用examine命令(简写是x)来查看内存地址中的值。x命令的语法如下所示&#xff1a; x/<n/f/u> <addr> n、f、u是可选的参数。 n是一个正整数&#xff0c;表示需要显示的内存单元的个数&#xff0c;也就是说从当前地址向后显示几个内存单元的内容&#xff0c;一…

[19/03/21-星期四] 异常(Exception) (一)

一、引言 在实际工作中&#xff0c;我们遇到的情况不可能是非常完美的。比如&#xff1a;你写的某个模块&#xff0c;用户输入不一定符合你的要求;你的程序要打开某个文件&#xff0c; 这个文件可能不存在或者文件格式不对 &#xff0c;你要读取数据库的数据&#xff0c;数据可…

jax-rs jax-ws_JAX-RS Bean验证错误消息国际化

jax-rs jax-wsBean验证简介 JavaBeans验证&#xff08;Bean验证&#xff09;是Java EE 6平台的一部分提供的新验证模型。 约束通过以JavaBeans组件&#xff08;例如托管Bean&#xff09;的字段&#xff0c;方法或类上的注释形式的约束来支持Bean验证模型。 javax.validation.c…

补码和原码的转化过程

在计算机系统中&#xff0c;数值一律用补码来表示&#xff08;存储&#xff09;。 主要原因&#xff1a; 使用补码&#xff0c;可以将符号位和其它位统一处理&#xff1b;同时&#xff0c;减法也可按加法来处理。另外&#xff0c;两个用补 码表示的数相加时&#xff0c;如果最高…

Flask实现群聊

后端 from geventwebsocket.handler import WebSocketHandler from gevent.pywsgi import WSGIServer from geventwebsocket.websocket import WebSocket from flask import Flask,request,render_template user_socket_list [] app Flask(__name__)app.route("/conn_ws…

第一单元总结:基于基础语言、继承和接口的简单OOP

前情提要 到目前为止&#xff0c;OO课程已经完成了前三次的作业&#xff0c;分别为&#xff1a; 第一次作业&#xff1a;简单多项式的构造和求导。【正则表达式】【数据结构】【排序】第二次作业&#xff1a;含三角函数因子的复杂多项式的构造、求导和化简。【递归下降】【DFS】…

JUnit与TestNG:您应该选择哪种测试框架?

JUnit与TestNG&#xff1a;测试框架对决 在平衡良好的开发人员团队中&#xff0c;测试是软件发布周期中不可分割的一部分。 并非总是那样。 单元测试&#xff0c;集成测试&#xff0c;系统测试等并不总是存在的。 如今&#xff0c;我们很幸运能及时到达一个测试很重要且其价值…

小尾数,大尾数

大尾小尾 是数据在存储器中的存储格式&#xff0c;INtel采用的是小尾表示&#xff0c;即数据的高位存储在存储器的高地址&#xff0c;低位存储在存储器的低地址&#xff0c;例如一个十六进制数据0x1234存储在内存中&#xff0c;那么该数据在内存中的存储格式为&#xff1a; 34 …

s11.1 lsof:查看进程打开的文件

功能说明 lsof 全名为list open files&#xff0c;也就是列举系统中已经被打开的文件&#xff0c;通过lsof命令&#xff0c;就可以根据文件找到对应的进程信息&#xff0c;也可以根据进程信息找到进程打开的文件。【语法格式】 lsof [option]lsof [选项]参数选项 …