【MQ03】发布订阅模式

发布订阅模式

上一回我们已经学习了最典型的消息队列的应用。接下来,我们就要学习到的是消息队列中的另一个非常常见的模式。这个模式其实也是一种设计模式,它叫做发布订阅模式。之前我们学习过的,一个叫生产者,一个叫消费者。而到了这边,我们将生产者改个名字叫做发布者,它们两者之间可以看成是完全一样的。而消费者则变成了订阅者,这个就有很大的不同了。

发布订阅

对于传统的模式来说,一个消费者消费一条消息,这条消息被消费之后就不会再次被其它的消费者消费。而在发布订阅模式中,一条消息是可以被多个消费者消费的,这些消费者其实相当于是订阅了这条队列的消息。当有新的消息出现在队列中,就会像广播一样让所有订阅者都获得这条消息。

这种功能的应用场景是?假设我们有一个电商系统。当客户下单之后,是不是要马上通知商家、并且客户自己也会收到相应的订单确认信息。或者我们可以这样理解,一个事件被触发后,需要激活多个其它的功能。

如果是传统的同步代码,我们需要这样写:

// 下订单
// 订单入库
// 商家发送消息
// 商家发送邮件
// 客户发送消息
// 客户发送邮件

而使用发布订阅模式的话,我们就可以拆分为两个部分。下订单流程在最后直接调用发布者负责发出订单号即可。

// 下订单
// 订单入库
// 发布者发布消息 publish(订单号)

发布者到这里就结束了。我们的主订单流程就可以返回成功的信息了。之后的操作,就通过异步,让相应的订阅者去实现吧。

// 订阅者一,获取订单号,发送消息// 订阅者二,获取订单号,发送邮件// 订阅者三,获取订单号,向客户发送消息// 订阅者四,获取订单号,向客户发送邮件

不管是性能还是业务逻辑,其实这样的处理都是更好的。为什么呢?业务解耦的核心就是核心业务代码和非核心业务代码的分离。比如说在下订单的流程中,订单流程是最核心的部分,我们要保证这个过程的顺畅和无误。而下单之后的消息通知,说实话,并不是最核心的流程。即使没有通知,大家也可以通过客户端或者系统进行自主的查询。

总之,我们要实现的就是,基于一个事件传递出来的消息,可以通过不同的客户端进行消费。这就是发布订阅模式。

RabbitMQ实现

RabbitMQ 中,有交换机这一概念。交换机中,又有几种交换模式,其中,fanout 扇出交换,就是一个典型的发布订阅模式。和之前的例子中不同的就是,我们会多出一个定义交换机的步骤。还是先来看一下信息的发送方,之前我们叫做生产者,现在叫做发布者的代码。

// 3.rq.p.php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;require_once "./vendor/autoload.php";$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();// 定义交换机
$channel->exchange_declare('orders', 'fanout', false, false, false);$data = '订单号:' . time();
$msg = new AMQPMessage($data);// 注意,这里是指定的交换机,第三个参数还是队列名,之前普通队列我们指定的是第三个参数
$channel->basic_publish($msg, 'orders');echo '[x] 发送消息 ', $data, '\n';$channel->close();
$connection->close();

不同的地方已经在注释中说明了。如果用 Laravel 或者 TP 框架来类比的话,交换机可以看作是一个路由,而队列就是我们的控制器。由交换机来决定我们的数据应该放到哪个队列或者去哪个队列去取。扇出的意思就是和该交换机相关的队列都会收到相同的一份消息数据。我们在上面的代码中,以及后面的订阅者中都不会指定具体队列名,这样的话,RabbitMQ 就会自动生成队列,不需要我们过多去关注具体是使用哪一个队列。

好了,消费者,现在我们叫做订阅者的代码也改动不大,但这回有两个订阅者,一个是发送站内应用消息,一个是发送短信。

// 3.rq.c.sms.php
use PhpAmqpLib\Connection\AMQPStreamConnection;require_once "./vendor/autoload.php";$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();$channel->exchange_declare('orders', 'fanout', false,false,false);
// 使用空队列名,由 RabbitMQ 生成随机队列名
[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
// 队列绑定到 orders 交换机
$channel->queue_bind($queue_name, 'orders');echo "[x] 等待数据,退出请按 CTRL+C\n";$callback = function($msg) {echo '[x] 接收到 ', $msg->body, ",开始向相关方发送短信....\n";
};$channel->basic_consume($queue_name, '', false, true, false, false, $callback);while($channel->is_open()){$channel->wait();
}$channel->close();
$connection->close();// 3.rq.c.msg.php
use PhpAmqpLib\Connection\AMQPStreamConnection;require_once "./vendor/autoload.php";$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();$channel->exchange_declare('orders', 'fanout', false,false,false);
// 使用空队列名,由 RabbitMQ 生成随机队列名
[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
// 队列绑定到 orders 交换机
$channel->queue_bind($queue_name, 'orders');echo "[x] 等待数据,退出请按 CTRL+C\n";$callback = function($msg) {echo '[x] 接收到 ', $msg->body, ",开始向相关方发送站内消息....\n";
};$channel->basic_consume($queue_name, '', false, true, false, false, $callback);while($channel->is_open()){$channel->wait();
}$channel->close();
$connection->close();

当然,我们并没有真实的发送信息,这里只是通过伪代码的方式直接输出了一下。这两段代码唯一的差别其实就是在回调函数中的 echo 内容不同。

好了,现在我们有了一个发布者和两个订阅者。接下来就可以开始测试了。

# 命令行1
> php 3.rq.c.msg.php  
[x] 等待数据,退出请按 CTRL+C# 命令行2
> php 3.rq.c.sms.php 
[x] 等待数据,退出请按 CTRL+C

分别运行起两个订阅者之后,它们就进入了监听模式,等待消息队列中的数据。那么我们就来调用发布者进行发布吧。

> php 3.rq.p.php 
[x] 发送消息 订单号:1672212730\n%

赶紧看看订阅者那边吧。

# 命令行1
> php 3.rq.c.msg.php  
[x] 等待数据,退出请按 CTRL+C
[x] 接收到 订单号:1672212730,开始向相关方发送站内消息....# 命令行2
> php 3.rq.c.sms.php 
[x] 等待数据,退出请按 CTRL+C
[x] 接收到 订单号:1672212730,开始向相关方发送短信....

两个订阅者同时都接收到数据并且开始处理了。大家可以继续测试调用发布者进行消息发送,每次两个订阅者都会马上收到消息并进行处理。同样的,也可以再添加更多的订阅者来处理更多的业务场景。

Redis 实现

使用 RabbitMQ 实现发布订阅模式很简单吧,但使用 Redis 更简单,总共只需要两个方法,几行代码就可以实现。

// 3.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);$data = '订单号:' . time();$redis->publish('orders', $data);echo '[x] 发送消息 ', $data, '\n';

发布者就是调用一个 publish() 方法就可以了,这个在 redis-cli 中也是有相应的命令行的,之前我们在 Redis 系统中都学过。第一个参数是发布的频道名称,第二个是具体的数据内容。

// 3.rs.c.sms.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // 要设置连接超时时间,要不一会就断了echo "[x] 等待数据,退出请按 CTRL+C\n";$redis->subscribe(['orders'], function($r,$c,$msg){echo '[x] 接收到 ', $msg, ",开始向相关方发送短信....\n";
});// 3.rs.c.msg.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // 要设置连接超时时间,要不一会就断了echo "[x] 等待数据,退出请按 CTRL+C\n";$redis->subscribe(['orders'], function($r,$c,$msg){echo '[x] 接收到 ', $msg, ",开始向相关方发送站内消息....\n";
});

订阅者只需要实现 subscribe() 方法,而且这个方法是直接就会挂起当前应用程序的,不需要我们再使用 while 来做死循环挂起。一个 subscribe() 方法可以监听多个发布频道,所以它的第一个参数是数组。第二个参数就是一个回调函数,这个函数有三个参数,分别是 redis实例、频道名称、消息内容 。

在这里还需要注意的是,Redis 在使用 subscribe() 挂起程序的时候,要设置一下连接超时时间,要不过一会超过默认的连接超时时间后就会断开连接了。设置成 -1 就是一直保持连接,就像长连接一样。如果是在生产环境,也可以在外面再套一层循环,然后 try..catch 一下 subscribe() ,这样当连接中断之后,可以通过死循环挂起再次调用 connect() 连接服务器。这个大家有兴趣或者确实需要用到的话,可以自己再找一下相关资料哦。

总结

使用发布订阅模式时需要注意的一点是,如果我们的订阅者是在消息发布之后才开始订阅的,那么之前发布的消息是没有办法进行消费的。也就是说,一条消息数据,只对当时已经订阅的客户端会发送数据,就像广播一样,如果你现在打开了收音机,正在听某个频道,那么你就能听到这个频道里面正在播出的内容。而如果你根本就没有打开收音机,或者根本没有调到指定的频道,自然也就听不到当前正在播放的内容啦。

不过也有例外,之前我们学习过,Redis 中的 Stream 也是一种发布订阅模式的实现,而且它的消费数据是不会删除的,新的订阅者可以选择性地消费之前的内容。RabbitMQ 中没有这样的功能。

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/3.rq.c.msg.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/3.rq.c.sms.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/3.rq.p.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/3.rs.c.msg.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/3.rs.c.sms.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/3.rs.p.php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/661142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频融合平台EasyCVR推流成功但平台显示不在线是什么原因?

TSINGSEE青犀视频监控汇聚平台EasyCVR可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力&…

环形链表找入环点----链表OJ---三指针

https://leetcode.cn/problems/linked-list-cycle-ii/description/?envTypestudy-plan-v2&envIdtop-100-liked 首先,需要判断是否有环,而这里我们不单纯判断是否有环,还要为下一步做准备,需要让slow指针和fast都从头结点开始…

使用pygame建立一个简单的使用键盘方向键移动的方块小游戏

import pygame import sys# 初始化pygame pygame.init()# 设置窗口大小 screen_size (640, 480) # 创建窗口 screen pygame.display.set_mode(screen_size) # 设置窗口标题 pygame.display.set_caption("使用键盘方向键移动的方块的简单小游戏")# 设置颜色 bg_colo…

RabbitMQ实战指南(三)—— 高级特性

RabbitMQ实战指南(三)—— 高级特性 RabbitMQ是一个功能强大的消息队列系统,提供了许多高级特性来满足各种消息传递的需求。下面是一些常用的高级特性的详细描述和代码示例: 详细描述 1.TTL(Time-To-Live)…

MQ回顾之rabbitmq速通

rabbitMQ相对来说功能比较完善,吞吐量会低一点。 持续更新…… 安装 docker 测试选择docker安装 官方安装操作 1、docker pull rabbitmq:latest 2、docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq 3、docker…

springboot mapstruct

官网 官网 官方例子 1.POM <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.ap…

transformer_正余弦位置编码代码笔记

transformer_正余弦位置编码代码笔记 transformer输入的序列中&#xff0c;不同位置的相同词汇可能会表达不同的含义&#xff0c;通过考虑位置信息的不同来区分序列中不同位置的相同词汇。 位置编码有多种方式&#xff0c;此处仅记录正余弦位置编码 正余弦位置编码公式如下&…

idea+javafx的真正打包方式

概述&#xff1a;看到网上很多乱说javafx如何打包的&#xff0c;这里写一篇真正可以打包javafx的。 注&#xff1a;使用java17即使里面没有javafx&#xff0c;也是可以运行javafx项目的&#xff0c;只要你们按照下面一步一步来即可。 第一步&#xff1a;编写一个类去调用主类…

ArXiv| Graph-Toolformer: 基于ChatGPT增强提示以赋予大语言模型图数据推理能力

ArXiv| Graph-Toolformer: 基于ChatGPT增强提示以赋予大语言模型图数据推理能力. 来自加利福利亚大学戴维斯分校计算机科学系的IFM实验室发表在arXiv上的文章:“Graph-ToolFormer: To Empower LLMs with Graph Reasoning Ability via Prompt Augmented by ChatGPT”。 文章的…

2.1总结

还是一样水更一天&#xff0c;就随便做了几个题&#xff0c;有一个周期有点长&#xff0c;后面更一篇长的 随手刷的一道水题&#xff0c;就不往今天的行程单添了 问题&#xff1a;最大公约数 题解&#xff1a;题目太水了&#xff0c;就是求三个数&#xff0c;其中两组的最大公…

PyTorch识别验证码

## 一、生成测试集数据pip install captcha common.py import random import time captcha_array list("0123456789abcdefghijklmnopqrstuvwxyz") captcha_size 4from captcha.image import ImageCaptchaif __name__ __main__:for i in range(10):image ImageC…

2024美赛数学建模C题思路源码

比赛当天第一时间更新&#xff01; 赛题目的 赛题目的&#xff1a; 问题描述&#xff1a; 解题的关键&#xff1a; 问题一. 问题分析 问题解答 问题二. 问题分析 问题解答 问题三. 问题分析 问题解答 问题四. 问题分析 问题解答 问题五. 问题分析 问题解答

this.$store undefined

报错&#xff1a;vuex报错 this.$store显示undefined&#xff0c;可能存在的问题&#xff0c;从以下几个方向排查 1、查看store文件中的vuex实例对象是否暴漏 2、main.js中是否注入store 3、如果上边均没问题&#xff0c;打开package.json&#xff0c;查看vue与vuex的版本&am…

RT-Thread: STM32F103的DAC 操作、DAC驱动

说明&#xff1a;文档记录基于RT-Thread的STM32F103外设DAC操作。 1.DAC的GPIO配置 函数位于 drivers\stm32f1xx_hal_msp.c //DAC底层驱动&#xff0c;时钟配置&#xff0c;引脚 配置 //此函数会被HAL_DAC_Init()调用 //hdac:DAC句柄 void HAL_DAC_MspInit(DAC_HandleTypeDe…

el-upload子组件上传多张图片(上传为files或base64url)

场景&#xff1a; 在表单页&#xff0c;有图片需要上传&#xff0c;表单的操作行按钮中有上传按钮&#xff0c;点击上传按钮。 弹出el-dialog进行图片的上传&#xff0c;可以上传多张图片。 由于多个表单页都有上传多张图片的操作&#xff0c;因此将上传多图的el-upload定义…

Web中的转发与重定向

转发与重定向 一、转发和重定向的概念1.转发2.重定向 二、JavaWeb 中的转发和重定向三、SpringMVC 中的转发和重定向1.转发(1) 默认的方式(2) 完整的方式 2.重定向 四、总结 一、转发和重定向的概念 在 Web 应用中&#xff0c;转发和重定向都是用于将请求从一个页面传递到另一…

09-信息收集-APP及其他资产等

信息收集-APP及其他资产等 信息收集-APP及其他资产等一、APP提取季抓包及后续配合1、某APK一键提取反编译2、利用bp抓取更多URL 二、某IP无web框架下的第三方测试1、各种端口一顿乱扫 —— 思路2、各种接口一顿乱扫 —— 思路3、接口部分一顿测试 —— 思路 三、**案例演示**1、…

【Redis笔记】缓存——缓存分类、更新策略、缓存穿透、缓存雪崩、缓存击穿

缓存 缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于高速存储媒介上。 缓存的本质就是用空间换时间&#xff0c;牺牲数据的实时性&#xff0c;以服务器内存中的数据暂时代替从数据库读取最新的数据&#xff0c;减少数据库IO&#…

民事二审案件庭审应如何准备?

一、你要明确审理范围&#xff0c;固定上诉请求 首先&#xff0c;第二审人民法院围绕当事人的上诉请求进行审理。 其次&#xff0c;在第二审程序中&#xff0c;原审原告增加独立的诉讼请求或者原审被告提出反诉的&#xff0c;第二审人民法院可以根据当事人自愿的原则就新增加的…

技术革新与市场需求:探索亚信安慧AntDB的发展之路

在这个信息爆炸的时代&#xff0c;企业对数据处理的需求日益增长&#xff0c;而传统的数据库系统往往难以应对海量数据的存储和处理。亚信安慧AntDB的出现&#xff0c;为解决这一难题提供了有力的工具。它不仅具备高吞吐、高并发、高性能的特点&#xff0c;还拥有极佳的扩展性和…