Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

应用场景

我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次。然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送。对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理。

为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用。RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍以下如何利用Spring Cloud Stream以及RabbitMQ轻松的处理上述问题。

动手试试

插件安装

关于RabbitMQ延迟消息的插件介绍可以查看官方网站:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

安装方式很简单,只需要在这个页面:http://www.rabbitmq.com/community-plugins.html 中找到rabbitmq_delayed_message_exchange插件,根据您使用的RabbitMQ版本选择对应的插件版本下载即可。

注意:只有RabbitMQ 3.6.x以上才支持

在下载好之后,解压得到.ez结尾的插件包,将其复制到RabbitMQ安装目录下的plugins文件夹。

然后通过命令行启用该插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

该插件在通过上述命令启用后就可以直接使用,不需要重启。

另外,如果您没有启用该插件,您可能为遇到类似这样的错误:

ERROR 156 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)

应用编码

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

@Slf4j
@RestController
static class TestController {

@Autowired
private TestTopic testTopic;

/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
log.info("Send: " + message);
testTopic.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 5000).build());
return "ok";
}

}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload);
}

}

interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

}

内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了一条消息,一条消息的头信息中包含了x-delay字段,该字段用来指定消息延迟的时间,单位为毫秒。所以上述代码发送的消息会在5秒之后被消费。在消息监听类TestListener中,对TestTopic.INPUT通道定义了@StreamListener,这里会对延迟消息做具体的逻辑。由于消息的消费是延迟的,从而变相实现了从消息发送那一刻起开始的定时任务。

在启动应用之前,还要需要做一些必要的配置,下面分消息生产端和消费端做说明:

消息生产端

spring.cloud.stream.bindings.example-topic-output.destination=delay-topic
spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true

注意这里的一个新参数spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange,用来开启延迟消息的功能,这样在创建exchange的时候,会将其设置为具有延迟特性的exchange,也就是用到上面我们安装的延迟消息插件的功能。

消息消费端

spring.cloud.stream.bindings.example-topic-input.destination=delay-topic
spring.cloud.stream.bindings.example-topic-input.group=test
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.delayed-exchange=true

在消费端也一样,需要设置spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true。如果该参数不设置,将会出现类似下面的错误:

ERROR 9340 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay-topic' in vhost '/': received 'topic' but current is ''x-delayed-message'', class-id=40, method-id=10)

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2019-01-02 23:28:45.318  INFO 96164 --- [ctor-http-nio-3] c.d.s.TestApplication$TestController     : Send: hello
2019-01-02 23:28:45.328 INFO 96164 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-01-02 23:28:45.333 INFO 96164 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#5c5f9a03:0/SimpleConnection@3278a728 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 53536]
2019-01-02 23:28:50.349 INFO 96164 --- [ay-topic.test-1] c.d.stream.TestApplication$TestListener : Received: hello

从日志中可以看到,Send: helloReceived: hello两条输出之间间隔了5秒,符合我们上面编码设置的延迟时间。

深入思考

在代码层面已经完成了定时任务,那么我们如何查看延迟的消息数等信息呢?

此时,我们可以打开RabbitMQ的Web控制台,首先可以进入Exchanges页面,看看这个特殊exchange,具体如下:

可以看到,这个exchange的Type类型是x-delayed-message。点击该exchange的名称,进入详细页面,就可以看到更多具体信息了:

代码示例

本文示例读者可以通过查看下面仓库的中的stream-delayed-message项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477050.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1380. 矩阵中的幸运数(set)

1. 题目 给你一个 m * n 的矩阵&#xff0c;矩阵中的数字 各不相同 。请你按 任意 顺序返回矩阵中的所有幸运数。 幸运数是指矩阵中满足同时下列两个条件的元素&#xff1a; 在同一行的所有元素中最小在同一列的所有元素中最大 示例 1&#xff1a; 输入&#xff1a;matrix …

GARFIELD@10-07-2004

tit for tat转载于:https://www.cnblogs.com/rexhost/archive/2004/10/07/49560.html

DeepMind 发了篇论文,把我看笑了

文 | severus近日&#xff0c;曾开发出举世瞩目的 AlphaGo 的 DeepMind&#xff0c;在 ArXiv 上发表了一篇文章&#xff0c;名为&#xff1a;Meaning without reference in large language models文中提到&#xff0c;大参数规模的语言模型是已经具备了部分类人智能的&#xff…

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略&#xff1a; 自动重试&#xff1a;对于一些因环境原因&#xff08;如&#xff1a;网络抖动等不稳定因素&#xff09;引发的问题可以起到比较好的作用&#xff0c;提高消息处理的成功率。自定义错误…

LeetCode 1382. 将二叉搜索树变平衡(中序遍历+二分递归)

1. 题目 给你一棵二叉搜索树&#xff0c;请你返回一棵 平衡后 的二叉搜索树&#xff0c;新生成的树应该与原来的树有着相同的节点值。 如果一棵二叉搜索树中&#xff0c;每个节点的两棵子树高度差不超过 1 &#xff0c;我们就称这棵二叉搜索树是 平衡的 。 如果有多种构造方…

电影:『新警察故事』

【电影名称】&#xff1a;『新警察故事』 【主 演】&#xff1a; 成龙 谢霆锋 杨采妮 蔡卓妍 吴彦祖  【导 演】&#xff1a; 陈木胜 【内容简介】&#xff1a;《新警察故事》是成龙英皇电影公司的处女作&#xff0c;投资超过1亿6000万港元&#xff0c;请来香港顶尖电影…

NLP顶级赛事LIC2022霸榜经验分享!

语言是人类传递信息最重要的媒介&#xff0c;让机器理解语言并进行交互是人工智能的重要挑战。为推动语言与智能领域的技术发展和应用&#xff0c;中国中文信息学会、中国计算机学会和百度公司连续五年联合举办“语言与智能技术竞赛”&#xff0c;为中文NLP研究者和开发者提供同…

Spring Cloud Zuul中使用Swagger汇总API接口文档

有很多读者问过这样的一个问题&#xff1a;虽然使用Swagger可以为Spring MVC编写的接口生成了API文档&#xff0c;但是在微服务化之后&#xff0c;这些API文档都离散在各个微服务中&#xff0c;是否有办法将这些接口都整合到一个文档中&#xff1f;之前给大家的回复都只是简单的…

LeetCode 1381. 设计一个支持增量操作的栈(deque/数组)

1. 题目 请你设计一个支持下述操作的栈。 实现自定义栈类 CustomStack &#xff1a; CustomStack(int maxSize)&#xff1a;用 maxSize 初始化对象&#xff0c;maxSize 是栈中最多能容纳的元素数量&#xff0c;栈在增长到 maxSize 之后则不支持 push 操作。void push(int x)…

GARFIELD@10-31-2004

apprentice转载于:https://www.cnblogs.com/rexhost/archive/2004/10/31/59013.html

Spring Cloud构建微服务架构:分布式服务跟踪(入门)【Dalston版】

通过之前的N篇博文介绍&#xff0c;实际上我们已经能够通过使用它们搭建起一个基础的微服务架构系统来实现我们的业务需求了。但是&#xff0c;随着业务的发展&#xff0c;我们的系统规模也会变得越来越大&#xff0c;各微服务间的调用关系也变得越来越错综复杂。通常一个由客户…

有哪些值得计算机专业学生加入的国企?

文 |重庆搬砖喵知乎知乎上最近有个问题很火&#xff1a;有哪些值得计算机专业学生加入的国企&#xff1f;这个问题确实很应今年秋招的景&#xff0c;于是转载了知乎答主重庆搬砖喵 的高赞回答分享给大家。原回答链接&#xff1a;https://www.zhihu.com/question/285730093/answ…

上海著名综合性商厦一览 (1)

上海著名综合性商厦一览 jxjb 2004-10-29 11:53:21 发表于搜狐焦点上海房地产网-谈房论市-东方康洛论坛 主要包括&#xff1a;第一百货东楼 华联商厦 置地广场 友谊欧洲商城 东方商厦 港汇广场 汇金百货 梅龙镇广场 中环广场二百永新 正大广场 上海第一八佰伴友谊南方商城…

LeetCode 1383. 最大的团队表现值(贪心,优先队列,难)

1. 题目 公司有编号为 1 到 n 的 n 个工程师&#xff0c;给你两个数组 speed 和 efficiency &#xff0c;其中 speed[i] 和 efficiency[i] 分别代表第 i 位工程师的速度和效率。 请你返回由最多 k 个工程师组成的 ​​​​​​最大团队表现值 &#xff0c;由于答案可能很大&am…

程序员坐牢了,会被安排去写代码吗?

文 | 无念源 | 知乎今天给大家分享一篇有意思的爽文&#xff0c;但也是根据多年之前一个真实报道改编而来的。本文字数较多&#xff0c;建议先收藏&#xff0c;上下班路上、带薪上厕所、浑水摸鱼时再慢慢看~本故事纯属虚构请大家不要随意模仿&#xff0c;后果自负&#xff01;因…

Dubbo将积极适配Spring Cloud生态,Spring Cloud体系或将成为微服务的不二选择!

2016年&#xff0c;我在博客中发表过一篇《微服务架构的基础框架选择&#xff1a;Spring Cloud还是Dubbo&#xff1f;》获得了很大的阅读量和转载量。在这篇文章中&#xff0c;我主要对比了Spring Cloud与Dubbo所具备的能力&#xff0c;并阐述了个人推崇Spring Cloud的原因。但…

Java sdk及tomcat安装设置

在安装好Java SDK后&#xff0c;还需要设置一些系统变量&#xff0c;系统变量的设置可以在系统属性-高级-环境变量中进行 JAVA_HOMEJAVA安装目录 CLASSPATH… 库路径&#xff0c;可以是目录或jar文件&#xff0c;如C:\j2sdk1.4.0_01\lib\dt.jar;d:\java&#xf…

剑指Offer - 面试题51. 数组中的逆序对(归并排序,求逆序对)

1. 题目 在数组中的两个数字&#xff0c;如果前面一个数字大于后面的数字&#xff0c;则这两个数字组成一个逆序对。输入一个数组&#xff0c;求出这个数组中的逆序对的总数。 示例 1: 输入: [7,5,6,4] 输出: 5限制&#xff1a; 0 < 数组长度 < 50000来源&#xff1a;力…

【小马哥】Spring Cloud系列讲座

这里推荐一个不错的Spring Cloud系列讲座&#xff0c;讲师简介如下&#xff1a; 小马哥&#xff0c;阿里巴巴技术专家&#xff0c;从事十余年Java EE 开发&#xff0c;国内微服务技术讲师。目前主要负责微服务技术推广、架构设计、基础设施、迁移等。重点关注云计算、微服务以及…

名校女教授,强迫其男博士“发生性关系多年”!索赔750万

转自 | 论文项目硕博招聘、磐创AI来源 | FOREIGNDAILY 、双一流高校等早稻田大学是日本的知名大学&#xff0c;但最近这所知名大学却引来了不小的争议。早稻田大学一名25岁的学生&#xff0c;将他的女导师和学校一起告上了法庭&#xff0c;要求他们赔偿自己750万日元&#xff…