【stomp 实战】spring websocket用户消息发送源码分析

这一节,我们学习用户消息是如何发送的。

消息的分类

spring websocket将消息分为两种,一种是给指定的用户发送(用户消息),一种是广播消息,即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。

用户消息的前缀

  • 不配置的情况下,默认用户消息的前缀是/user
  • 也可以通过下面的方式来配置用户消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {/*** stompClient.subscribe("/user/topic/subNewMsg",...)* 这个时候,后端推送消息应该这么写* msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);* 即去掉了/user前缀*/registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);}
  • 默认情况下,/user是用户消息前缀,那么前端订阅的代码可以这么写
 //订阅用户消息topic1stompClient.subscribe("/user/topic/answer", function (response) {//do something});
  • 后端的发送消息的代码可以这么写,注意,在这里发送的时候,调用的convertAndSendToUser没有带/user前缀
    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

广播消息的前缀

  • 广播消息没有默认值,必须显示地指定
  • 配置广播消息的前缀是这么配置,通过/topic或者/queue前缀来订阅的,就是广播消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic", "/queue")//配置stomp协议里, server返回的心跳.setHeartbeatValue(new long[]{10000L, 10000L})//配置发送心跳的scheduler.setTaskScheduler(new DefaultManagedTaskScheduler());}
  • 前端代码可以这么写
//订阅广播消息topicstompClient.subscribe("/topic/boardCast/hello", function (response) {// do something});
  • 后端代码可以这么写
  private final SimpMessageSendingOperations msgOperations;public void echo2(Msg msg) {log.info("收到的消息为:{}", msg.getContent());msgOperations.convertAndSend("/topic/boardCast/hello", "hello boardCast Message");}

发送用户消息源码分析

用户订阅过程

发送消息,本质上就是从内存中找到注册的用户,通过用户名找到用户会话,在从用户会话中找到该用户的订阅,如果该用户有该订阅,那么就发送消息给前端。

总结一下用户和会话之间的关系,如下图
在这里插入图片描述
如果这块不太熟悉,建议回顾这篇文章,了解一下用户,用户会话,订阅之间的关系:【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

我们通过Debug来看一下,前端执行用户订阅,经历了哪些过程。
假设,当前登录用户是1001

  stompClient.subscribe("/user/topic/answer", function (response) {//do something});

该用户建立连接,并且绑定1001的用户会话后,执行后端的订阅注册
DefaultSimpUserRegistry响应订阅事件代码如下:
在这里插入图片描述
可以看到,当前的sessionId,destination

在这里插入图片描述
将订阅放到一个subscriptions的map里面。缓存在内存中。

用户消息的发送

后端代码是这么写的,我们来调试一下

    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

经过层层调用,发现调到了下面的方法
在这里插入图片描述
发现我们的发送目的地变成了这个:this.destinationPrefix + user + destination
通过调试时,发现值如上图所示。
也就是说,我们的发送目的,变成了/user+用户名+我们传的入参/topic/answer
然后再进入下面的代码

//AbstractMessageSendingTemplate@Overridepublic void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,@Nullable MessagePostProcessor postProcessor) throws MessagingException {//对消息进行转换,对象转字符串,或者字节数组之类的Message<?> message = doConvert(payload, headers, postProcessor);//调用Send发送send(destination, message);}

做了两个事:

  • 对消息进行转换,对象转字符串,或者字节数组之类的
  • 调用Send发送

再来看下send方法

	@Overridepublic void send(D destination, Message<?> message) {doSend(destination, message);}

再调用doSend,由子类SimpMessagingTemplate实现。

//SimpMessagingTemplate@Overrideprotected void doSend(String destination, Message<?> message) {Assert.notNull(destination, "Destination must not be null");SimpMessageHeaderAccessor simpAccessor =MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);if (simpAccessor != null) {if (simpAccessor.isMutable()) {simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);simpAccessor.setImmutable();sendInternal(message);return;}else {// Try and keep the original accessor typesimpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);initHeaders(simpAccessor);}}else {simpAccessor = SimpMessageHeaderAccessor.wrap(message);initHeaders(simpAccessor);}simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());sendInternal(message);}

其中最关键的是sendInternal

private void sendInternal(Message<?> message) {String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());Assert.notNull(destination, "Destination header required");long timeout = this.sendTimeout;boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));if (!sent) {throw new MessageDeliveryException(message,"Failed to send message to destination '" + destination + "' within timeout: " + timeout);}}

然后再通过messageChannel来发送此条消息。

//AbstractMessageChannel@Overridepublic final boolean send(Message<?> message, long timeout) {Assert.notNull(message, "Message must not be null");Message<?> messageToUse = message;ChannelInterceptorChain chain = new ChannelInterceptorChain();boolean sent = false;try {messageToUse = chain.applyPreSend(messageToUse, this);if (messageToUse == null) {return false;}sent = sendInternal(messageToUse, timeout);chain.applyPostSend(messageToUse, this, sent);chain.triggerAfterSendCompletion(messageToUse, this, sent, null);return sent;}catch (Exception ex) {chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);}catch (Throwable err) {MessageDeliveryException ex2 =new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);throw ex2;}}
  • 构造了一个拦截链,在发送前,可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器,在发送消息前后进行拦截处理。这里spring给我们的扩展点。
  • 通过sendInternal将消息发送出去

再来看下sendInternal方法,进入子类ExecutorSubscribableChannel

//ExecutorSubscribableChannel@Overridepublic boolean sendInternal(Message<?> message, long timeout) {for (MessageHandler handler : getSubscribers()) {SendTask sendTask = new SendTask(message, handler);if (this.executor == null) {sendTask.run();}else {this.executor.execute(sendTask);}}return true;}

可以看到,通过这个Channel,找到messageHandler,这个messageHandler有多个,依次将消息进行处理。
在这里插入图片描述
这里取到的有两个messageHandler

  • SimpleBrokerMessageHandler
  • UserDestinationMessageHandler

进入SendTask,看一下run方法

//
public void run() {Message<?> message = this.inputMessage;try {message = applyBeforeHandle(message);if (message == null) {return;}this.messageHandler.handleMessage(message);triggerAfterMessageHandled(message, null);}catch (Exception ex) {triggerAfterMessageHandled(message, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;throw new MessageDeliveryException(message, description, ex);}catch (Throwable err) {String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);triggerAfterMessageHandled(message, ex2);throw ex2;}
}

这里的关键点是:this.messageHandler.handleMessage(message);
首先会进入SimpleBrokerMessageHandler的handleMessage
在这里插入图片描述
可以看到,这里直接跳出去了。
SimpleBrokerMessageHandler的作用就是,看是不是我们配置的广播消息的前缀,要满足这个条件,才能发送消息。我们配置的前缀是/topic,/queue,这里destination前缀是/user,所以提前返回,不处理。
然后,我们还有一个UserDestinationMessageHandler会继续处理。

在这里插入图片描述
这里对destination进行了处理,发现生成了一个result对象,这里解析出一个targetDestinations,可以看到我们的destination变成了下面的样子
/topic/answer-usero2zuy4zg

  • 这个的构成实际上就是把/user前缀去掉
  • 然后加上-user,后面加上sessionId,就是当前会话的id
  • 最后再以这个新生成的targetDestination,将消息发送出去!
    在这里插入图片描述

这里的messagingTemplate,就是SimpMessagingTemplate。又会回到上面分析的代码。

  • SimpMessagingTemplate调用messageChannel来发送消息
  • messageChannel中会取得两个messageHandler来处理。
    像不像递归调用?
    不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候,在进入SimpleBrokerMessageHandler时,情况就不一样了

在这里插入图片描述
由于destination变成了/topic开头的,此时我们不会跳出去,会找到用户(-user后面跟了SessionId)订阅,将消息发送出去

可以看到,我们找到了一个用户订阅。在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其实是每个用户订阅时,会将/user前缀去掉,将用户的destination改写成了如下形式,
/user/topic/hello->/topic/hello-user{sessionId}
所以,经过UserDestinationMessageHandler处理,改写后的destination可以通过destination找到用户会话,将此消息发送出去。
到此,我们的用户消息的发送就分析完了

总结

发送用户消息的整个过程如下:

  • SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息,这里不传/user前缀,注意一下
  • 接着SimpMessagingTemplate进行消息的发送
  • SimpMessagingTemplate会交由MessageChannel
  • MessageChannel将会调用MessageHandler来处理消息,有以下两个MessageHandler
    • SimpleBrokerMessageHandler
    • UserDestinationMessageHandler
  • 经过MessageHandler的处理,destination由/user/topic/answer,变成了/topic/answer-usero2zuy4zg。
  • 改写后的destination可以找到用户会话,将此消息发送出去

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/832241.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

我们说的数据分析,到底要分析些什么?

作者 Gam 本文为CDA志愿者投稿作品 “我们说数据分析&#xff0c;到底要分析些什么&#xff1f;” 数据分析这个话题自从进入人们的视线以来&#xff0c;这个话题就成为人们茶余饭后的谈资&#xff0c;但是一千个人眼中就有一千个哈姆雷特&#xff0c;就意味着每个人对数据分…

使用Photoshop压缩图片大小的4种方法

使用Photoshop压缩图片大小&#xff0c;一般可采用下面4种方法&#xff1a; 1.调整图片分辨率&#xff1a; 打开需要压缩的图片文件。 依次点击菜单栏中的“图像”>“图像大小”。 在弹出的对话框中&#xff0c;通过调整分辨率参数来减小文件大小。 2.降低图片品质&#…

什么是水经微图注册码?

水经微图&#xff08;以下简称“微图”&#xff09;注册码&#xff0c;是微图的一种授权方式。 什么是微图注册码&#xff1f; 注册码仅可授权一台电脑&#xff0c;绑定CPU和网卡&#xff0c;激活后不可更换电脑使用。 如果CPU或网卡被更换&#xff0c;以及电脑损坏无法开机…

数据库中索引的底层原理和SQL优化

文章目录 关于索引B 树的特点MySQL 为什么使用 B 树&#xff1f; 索引分类聚簇索引 和 非聚簇索引覆盖索引索引的最左匹配原则索引与NULL索引的代价大表结构修改 SQL优化EXPLAIN命令选择索引列其它细节 关于索引 索引是一种用来加快查找效率的数据结构&#xff0c;可以简单粗暴…

卸载、安装、配置快捷mysql

卸载mysql 1、筛选过滤出mysql相关组件 rpm -qa | grep mysql2、关闭MySQL服务 systemctl stop mysql.service 3、卸载对应组件命令如下&#xff1a; rpm -ev --nodeps [显示的组件名称] 4、查找MySQL对应的所有文件夹 find / -name mysql rm -rf [显示的文件夹路径] 检查…

基于若依框架搭建网站的开发日志(一):若依框架搭建、启动、部署

RuoYi&#xff08;基于SpringBoot开发的轻量级Java快速开发框架&#xff09; 链接&#xff1a;开源地址 若依是一款开源的基于VueSpringCloud的微服务后台管理系统&#xff08;也有SpringBoot版本&#xff09;&#xff0c;集成了用户管理、权限管理、定时任务、前端表单生成等…

linux的基础入门(2)

环境变量 在Shell中&#xff0c;正确的赋值语法是没有空格的&#xff0c;即变量名数值。所以&#xff0c;正确的方式是&#xff1a; tmpshy 这样就将变量tmp赋值为"shy"了。 注意&#xff1a;并不是任何形式的变量名都是可用的&#xff0c;变量名只能是英文字母、…

【neteq】tgcall的调用、neteq的创建及接收侧ReceiveStatisticsImpl统计

G:\CDN\P2P-DEV\Libraries\tg_owt\src\call\call.cc基本是按照原生webrtc的来的:G:\CDN\P2P-DEV\tdesktop-offical\Telegram\ThirdParty\tgcalls\tgcalls\group\GroupInstanceCustomImpl.cpptg对neteq的使用 worker 线程创建call Call的config需要neteqfactory Call::CreateAu…

Java中使用RediSearch进行高效数据检索

RediSearch是一款构建在Redis上的搜索引擎&#xff0c;它为Redis数据库提供了全文搜索、排序、过滤和聚合等高级查询功能。通过RediSearch&#xff0c;开发者能够在Redis中实现复杂的数据搜索需求&#xff0c;而无需依赖外部搜索引擎。本文将介绍如何在Java应用中集成并使用Red…

300V直流充电桩测试有哪些实验项目

300V直流充电桩测试的实验项目主要包括以下几个方面&#xff1a; 1. 电气性能测试&#xff1a; - 输入电压范围测试&#xff1a;检查充电桩在不同输入电压下的正常工作情况。 - 输出电压范围测试&#xff1a;检查充电桩在不同输出电压下的正常工作情况。 - 输出电流范围测试…

MySQL#MySql数据库的操作

目录 一、创建数据库 二、字符集和校验规则 1.查看系统默认字符集以及校验规则 2.查看数据库支持的字符集 3.查看数据库支持的字符集校验规则 4.校验规则对数据库的影响 1.以UTF-8格式创建数据库 2.不区分大小写 3.区分大小写 4 大小写对数据库的影响 三、操纵数据…

流程:采集1688店铺内有成交的商品列表||1688商品订单列表+订单详情API接口

此API目前支持以下基本接口&#xff1a; item_get 获得1688商品详情item_search 按关键字搜索商品item_search_img 按图搜索1688商品&#xff08;拍立淘&#xff09;item_search_suggest 获得搜索词推荐item_fee 获得商品快递费用seller_info 获得店铺详情item_search_shop 获得…

沃伦·巴菲特将AI比做原子弹:“瓶中精灵”使诈骗成为最快增长产业|TodayAI

在伯克希尔哈撒韦公司的年度股东大会上&#xff0c;投资大师沃伦巴菲特对人工智能的未来提出了严重警告。巴菲特对这项可以模拟现实并产生误导性内容的技术表示担忧&#xff0c;他认为这将成为史上增长最快的行业之一。 巴菲特在会上说&#xff1a;“当你思考诈骗人们的潜力时…

NCBI GEO王炸:GEO2R直接分析RNA-seq数据,几家欢喜几家愁?

GEO2R是NCBI GEO团队针对上传到GEO的芯片数据开发的一款在线差异分析、可视化作图工具&#xff0c;是广大数据分析人员的福音。然而&#xff0c;一直以来GEO2R仅针对芯片数据&#xff0c;对于越来越多的测序数据&#xff0c;只能下载所上传的matrix矩阵&#xff0c;进行分析&am…

ENZO:Insulin(胰岛素)ELISA kit

胰岛素是一种由胰岛β细胞合成分泌的肽类激素。它促进葡萄糖从血液中吸收到组织中&#xff0c;并以糖原和脂肪的形式储存起来&#xff1b;此外胰岛素还抑制肝脏产生葡萄糖。通过控制葡萄糖水平&#xff0c;胰岛素成为脂肪和碳水化合物代谢的调节器。当由于缺乏胰岛素分泌而失去…

成都最大视听产业园更新数字影像创作空间,影像技术打造沉浸式体验

国际数字影像产业园&#xff0c;位于成都金牛区的心脏地带&#xff0c;借助数字影视制作技术的力量&#xff0c;努力“破墙而出”&#xff0c;耕耘数字影像技术研发和创新创作的热土。 随着科技的飞速发展&#xff0c;数字影像已经成为成都文创产业的重要组成部分。为了满足国际…

航天系统工程介绍

01 什么是系统工程 系统工程&#xff08;SE&#xff09;是一个视角、一个流程、一门专业&#xff0c;正如以下三种代表性定义所阐明&#xff1a; 系统工程是一种使系统能成功实现的跨学科的方法和手段。系统工程专注于&#xff1a;在开发周期的早期阶段定义客观需要与所要求的…

【考研数学】只做《1800》和真题,能不能考90分?

网上的经验贴推荐的习题册&#xff0c;说自己刷完以后最后考了130 你以为自己也能和大佬一样&#xff0c;刷完数学也能考那么高 但实际上很多人是1800、660、880刷题刷下来最后考了四五十的 他们也不会在网上说 刷题多少&#xff0c;或者是刷哪一本习题册并不能衡量你最后能…

实力登榜!蓝卓入选2024中国未来独角兽TOP100企业

日前&#xff0c;由民建中央、中国科协指导&#xff0c;民建浙江省委会、中国投资发展促进会联合主办的第八届万物生长大会中国未来独角兽大会在杭州举办。 会上&#xff0c;《2024中国未来独角兽TOP100榜单》隆重揭晓。其中&#xff0c;蓝卓在全国参评企业中脱颖而出&#xff…