【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

通过Spring websocket 用户校验和业务会话绑定我们学会了如何将业务会话绑定到spring websocket会话上。通过这一节,我们来分析一下会话和订阅的实现

用户会话的数据结构

SessionInfo 用户会话

用户会话定义如下:

private static final class SessionInfo {// subscriptionId -> Subscriptionprivate final Map<String, Subscription> subscriptionMap = new ConcurrentHashMap<>();public Collection<Subscription> getSubscriptions() {return this.subscriptionMap.values();}@Nullablepublic Subscription getSubscription(String subscriptionId) {return this.subscriptionMap.get(subscriptionId);}public void addSubscription(Subscription subscription) {this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);}@Nullablepublic Subscription removeSubscription(String subscriptionId) {return this.subscriptionMap.remove(subscriptionId);}}
  • 用户会话中有subscriptionMap。这个表示一个会话中,可以有多个订阅,可以根据subscriptionId找到订阅。

SessionRegistry 用户会话注册

private static final class SessionRegistry {private final ConcurrentMap<String, SessionInfo> sessions = new ConcurrentHashMap<>();@Nullablepublic SessionInfo getSession(String sessionId) {return this.sessions.get(sessionId);}public void forEachSubscription(BiConsumer<String, Subscription> consumer) {this.sessions.forEach((sessionId, info) ->info.getSubscriptions().forEach(subscription -> consumer.accept(sessionId, subscription)));}public void addSubscription(String sessionId, Subscription subscription) {SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());info.addSubscription(subscription);}@Nullablepublic SessionInfo removeSubscriptions(String sessionId) {return this.sessions.remove(sessionId);}}
  • SessionRegistry 中sessions 表示多个会话。根据sessionId可以找到唯一一个会话SessionInfo

Subscription 用户订阅

	private static final class Subscription {private final String id;private final String destination;private final boolean isPattern;@Nullableprivate final Expression selector;public Subscription(String id, String destination, boolean isPattern, @Nullable Expression selector) {Assert.notNull(id, "Subscription id must not be null");Assert.notNull(destination, "Subscription destination must not be null");this.id = id;this.selector = selector;this.destination = destination;this.isPattern = isPattern;}public String getId() {return this.id;}public String getDestination() {return this.destination;}public boolean isPattern() {return this.isPattern;}@Nullablepublic Expression getSelector() {return this.selector;}@Overridepublic boolean equals(@Nullable Object other) {return (this == other ||(other instanceof Subscription && this.id.equals(((Subscription) other).id)));}@Overridepublic int hashCode() {return this.id.hashCode();}@Overridepublic String toString() {return "subscription(id=" + this.id + ")";}}

SimpUserRegistry 用户注册接口

用户注册的接口如下:

public interface SimpUserRegistry {/**根据用户名,获取到用户信息* Get the user for the given name.* @param userName the name of the user to look up* @return the user, or {@code null} if not connected*/@NullableSimpUser getUser(String userName);/**获取现在所有的注册的用户* Return a snapshot of all connected users.* <p>The returned set is a copy and will not reflect further changes.* @return the connected users, or an empty set if none*/Set<SimpUser> getUsers();/**获取在线用户数量* Return the count of all connected users.* @return the number of connected users* @since 4.3.5*/int getUserCount();/*** Find subscriptions with the given matcher.* @param matcher the matcher to use* @return a set of matching subscriptions, or an empty set if none*/Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher);}

SimpUser实际上就是代表着一个用户,我们来看其实现:LocalSimpUser的定义

	private static class LocalSimpUser implements SimpUser {private final String name;private final Principal user;private final Map<String, SimpSession> userSessions = new ConcurrentHashMap<>(1);public LocalSimpUser(String userName, Principal user) {Assert.notNull(userName, "User name must not be null");this.name = userName;this.user = user;}}

userSessions 表示当前一个用户可以对应多个会话。
这个Principal 是啥,还记得我们上一节通过Spring websocket 用户校验和业务会话绑定中,我们是怎么注册用户的吗

    private void connect(Message<?> message, StompHeaderAccessor accessor) {//1通过请求头获取到tokenString token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);//2如果token为空或者用户id没有解析出来,抛出异常,spring会将此websocket连接关闭if (StringUtils.isEmpty(token)) {throw new MessageDeliveryException("token missing!");}String userId = TokenUtil.parseToken(token);if (StringUtils.isEmpty(userId)) {throw new MessageDeliveryException("userId missing!");}//这个是每个会话都会有的一个sessionIdString simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);//3创建自己的业务会话session对象UserSession userSession = new UserSession();userSession.setSimpleSessionId(simpleSessionId);userSession.setUserId(userId);userSession.setCreateTime(LocalDateTime.now());//4关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息accessor.setUser(new UserSessionPrincipal(userSession));}

从token中解析出用户的userId,并通过下面的代码,把当前用户和会话绑定起来。一个用户实际上是可以绑定多个会话的。

 accessor.setUser(new UserSessionPrincipal(userSession));

总结一下用户和会话之间的关系,如下图
在这里插入图片描述

订阅过程的源码分析

前端订阅的代码如下

  stompClient.subscribe("/user/topic/answer", function (response) {createElement("answer", response.body);});

当后端收到订阅消息后,会由SimpleBrokerMessageHandler来处理

	@Overrideprotected void handleMessageInternal(Message<?> message) {MessageHeaders headers = message.getHeaders();String destination = SimpMessageHeaderAccessor.getDestination(headers);String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);updateSessionReadTime(sessionId);if (!checkDestinationPrefix(destination)) {return;}SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);if (SimpMessageType.MESSAGE.equals(messageType)) {logMessage(message);sendMessageToSubscribers(destination, message);}else if (SimpMessageType.CONNECT.equals(messageType)) {logMessage(message);if (sessionId != null) {if (this.sessions.get(sessionId) != null) {if (logger.isWarnEnabled()) {logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");}return;}long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);long[] heartbeatOut = getHeartbeatValue();Principal user = SimpMessageHeaderAccessor.getUser(headers);MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);initHeaders(connectAck);connectAck.setSessionId(sessionId);if (user != null) {connectAck.setUser(user);}connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());getClientOutboundChannel().send(messageOut);}}else if (SimpMessageType.DISCONNECT.equals(messageType)) {logMessage(message);if (sessionId != null) {Principal user = SimpMessageHeaderAccessor.getUser(headers);handleDisconnect(sessionId, user, message);}}else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {logMessage(message);this.subscriptionRegistry.registerSubscription(message);}else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {logMessage(message);this.subscriptionRegistry.unregisterSubscription(message);}}

当消息类型为SUBSCRIBE时,会调用subscriptionRegistry.registerSubscription(message)
接着来看下subscriptionRegistry.registerSubscription(message)

//AbstractSubscriptionRegistry@Overridepublic final void registerSubscription(Message<?> message) {MessageHeaders headers = message.getHeaders();SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);}String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);if (sessionId == null) {if (logger.isErrorEnabled()) {logger.error("No sessionId in  " + message);}return;}String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);if (subscriptionId == null) {if (logger.isErrorEnabled()) {logger.error("No subscriptionId in " + message);}return;}String destination = SimpMessageHeaderAccessor.getDestination(headers);if (destination == null) {if (logger.isErrorEnabled()) {logger.error("No destination in " + message);}return;}addSubscriptionInternal(sessionId, subscriptionId, destination, message);}

这个代码很简单,就是从消息中取出三个东西,sessionId, subscriptionId, destination,进行注册。

//DefaultSubscriptionRegistry@Overrideprotected void addSubscriptionInternal(String sessionId, String subscriptionId, String destination, Message<?> message) {boolean isPattern = this.pathMatcher.isPattern(destination);Expression expression = getSelectorExpression(message.getHeaders());Subscription subscription = new Subscription(subscriptionId, destination, isPattern, expression);this.sessionRegistry.addSubscription(sessionId, subscription);this.destinationCache.updateAfterNewSubscription(sessionId, subscription);}//其实就是添加到sessions map中。会话里把订阅添加到订阅map中public void addSubscription(String sessionId, Subscription subscription) {SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());info.addSubscription(subscription);}

其实就是添加到sessions map中。会话里把订阅添加到订阅map中

那用户和会话是如何关联起来的?
在这里插入图片描述

  • 当订阅事件发生时,取出当前的Principal( accessor.setUser(xxx)设置的),然后生成LocalSimpleUser,即用户
  • 把当前会话,添加到当前用户会话中。这样就给用户绑定好了会话了。

用户会话事件

通过Spring事件机制,管理注册用户信息和会话,包括订阅、取消订阅,会话断连。代码如下

//DefaultSimpUserRegistry@Overridepublic void onApplicationEvent(ApplicationEvent event) {AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;Message<?> message = subProtocolEvent.getMessage();MessageHeaders headers = message.getHeaders();String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);Assert.state(sessionId != null, "No session id");if (event instanceof SessionSubscribeEvent) {LocalSimpSession session = this.sessions.get(sessionId);if (session != null) {String id = SimpMessageHeaderAccessor.getSubscriptionId(headers);String destination = SimpMessageHeaderAccessor.getDestination(headers);if (id != null && destination != null) {session.addSubscription(id, destination);}}}else if (event instanceof SessionConnectedEvent) {Principal user = subProtocolEvent.getUser();if (user == null) {return;}String name = user.getName();if (user instanceof DestinationUserNameProvider) {name = ((DestinationUserNameProvider) user).getDestinationUserName();}synchronized (this.sessionLock) {LocalSimpUser simpUser = this.users.get(name);if (simpUser == null) {simpUser = new LocalSimpUser(name, user);this.users.put(name, simpUser);}LocalSimpSession session = new LocalSimpSession(sessionId, simpUser);simpUser.addSession(session);this.sessions.put(sessionId, session);}}else if (event instanceof SessionDisconnectEvent) {synchronized (this.sessionLock) {LocalSimpSession session = this.sessions.remove(sessionId);if (session != null) {LocalSimpUser user = session.getUser();user.removeSession(sessionId);if (!user.hasSessions()) {this.users.remove(user.getName());}}}}else if (event instanceof SessionUnsubscribeEvent) {LocalSimpSession session = this.sessions.get(sessionId);if (session != null) {String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);if (subscriptionId != null) {session.removeSubscription(subscriptionId);}}}}

优雅停机

当服务器停机时,最好给客户端发送断连消息,而不是让客户端过了一段时间发现连接断开。
Spring websocket是如何来实现优雅停机的?

public class SubProtocolWebSocketHandlerimplements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {@Overridepublic final void stop() {synchronized (this.lifecycleMonitor) {this.running = false;this.clientOutboundChannel.unsubscribe(this);}// Proactively notify all active WebSocket sessionsfor (WebSocketSessionHolder holder : this.sessions.values()) {try {holder.getSession().close(CloseStatus.GOING_AWAY);}catch (Throwable ex) {if (logger.isWarnEnabled()) {logger.warn("Failed to close '" + holder.getSession() + "': " + ex);}}}}@Overridepublic final void stop(Runnable callback) {synchronized (this.lifecycleMonitor) {stop();callback.run();}}
}

其奥秘就是其实现了SmartLifecycle。这个是Spring的生命周期接口。我们可以通过实现此接口,在相应的生命周期阶段注册回调事件!
上面的代码,通过调用stop接口,给客户端发送了一个断连的消息。即实现了关机时的主动通知断连。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/831666.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怎么让电脑耳机和音响都有声音

电脑耳机音响不能同时用没声音怎么办 一般来说&#xff0c;重新开机后问题能够得到解决。右击“我的电脑”---“属性”---“硬件”---“设备管理器”&#xff0c;打开“声音、视频和游戏控制器”有无问题&#xff0c;即看前面有没有出现黄色的“”。 如果您的 电脑 耳机能正常…

VMware虚拟机中ubuntu使用记录(4)—— 如何在VMware虚拟机中调用本机电脑的摄像头

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、虚拟机调用本机摄像头(1) 启动VMware USB 服务(2) 连接本机摄像头(3) 测试摄像头的连接 前言 通过配置虚拟机调用本机摄像头&#xff0c;用户可以在虚拟机…

Redis---------实现商品秒杀业务,包括唯一ID,超卖问题,分布式锁

订单ID必须是唯一 唯一ID构成&#xff1a; 代码生成唯一ID&#xff1a; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.tim…

【论文阅读】Learning Texture Transformer Network for Image Super-Resolution

Learning Texture Transformer Network for Image Super-Resolution 论文地址Abstract1. 简介2.相关工作2.1单图像超分辨率2.2 Reference-based Image Super-Resolution 3. 方法3.1. Texture TransformerLearnable Texture Extractor 可学习的纹理提取器。Relevance Embedding.…

Qt QImageWriter类介绍

1.简介 QImageWriter 用于写入图像文件的类。它提供了将 QImage 对象保存到不同图像格式文件的功能&#xff0c;包括但不限于 PNG、JPEG、BMP 等。QImageWriter 可以将图像写入文件&#xff0c;也可以写入任何 QIODevice&#xff0c;如 QByteArray&#xff0c;这使得它非常灵活…

python中type,object,class 三者关系

type,object,class 三者关系 在python中&#xff0c;所有类的创建关系遵循&#xff1a; type -> int -> 1 type -> class -> obj例如&#xff1a; a 1 b "abc" print(type(1)) # <class int> 返回对象的类型 print(type(int)) …

基于OpenCv的图像金字塔

⚠申明&#xff1a; 未经许可&#xff0c;禁止以任何形式转载&#xff0c;若要引用&#xff0c;请标注链接地址。 全文共计3077字&#xff0c;阅读大概需要3分钟 &#x1f308;更多学习内容&#xff0c; 欢迎&#x1f44f;关注&#x1f440;【文末】我的个人微信公众号&#xf…

【讲解如何OpenCV入门】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

需求规格说明书编制书(word原件)

1 范围 1.1 系统概述 1.2 文档概述 1.3 术语及缩略语 2 引用文档 3 需求 3.1 要求的状态和方式 3.2 系统能力需求 3.3 系统外部接口需求 3.3.1 管理接口 3.3.2 业务接口 3.4 系统内部接口需求 3.5 系统内部数据需求 3.6 适应性需求 3.7 安全性需求 3.8 保密性需…

GiantPandaCV | FasterTransformer Decoding 源码分析(二)-Decoder框架介绍

本文来源公众号“GiantPandaCV”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;FasterTransformer Decoding 源码分析(二)-Decoder框架介绍 作者丨进击的Killua 来源丨https://zhuanlan.zhihu.com/p/669303360 编辑丨GiantPand…

【Python编程实践1/3】模块

目录 目标 模块 import ​编辑 代码小结 题目 from...import 随机模块 代码小结 randint函数 骰子大战 choice函数 总结 目标 拧一颗螺丝&#xff0c;只会用到螺丝刀&#xff1b;但是修一台汽车&#xff0c;需要一整套汽修的工具。函数就像螺丝刀&#xff0c;可以帮…

python项目==一个web项目,配置模板指定文件清洗规则,调用模板规则清洗文件

代码地址 一个小工具。 一个web项目&#xff0c;配置模板指定文件清洗规则&#xff0c;调用模板规则清洗文件 https://github.com/hebian1994/csv-transfer-all 技术栈&#xff1a; SQLite python flask vue3 elementplus 功能介绍&#xff1a; A WEB tool for cleaning…

JavaScript:Web APIs(三)

本篇文章的内容包括&#xff1a; 一&#xff0c;事件流 二&#xff0c;移除事件监听 三&#xff0c;其他事件 四&#xff0c;元素尺寸与位置 一&#xff0c;事件流 事件流是什么呢&#xff1f; 事件流是指事件执行过程中的流动路径。 我们发现&#xff0c;一个完整的事件执行…

Delta lake with Java--利用spark sql操作数据1

今天要解决的问题是如何使用spark sql 建表&#xff0c;插入数据以及查询数据 1、建立一个类叫 DeltaLakeWithSparkSql1&#xff0c;具体代码如下&#xff0c;例子参考Delta Lake Up & Running第3章内容 import org.apache.spark.sql.SaveMode; import org.apache.spark.…

区域文本提示的实时文本到图像生成;通过一致性自注意力机制的视频生成工具保持视频的一致性;专门为雪佛兰汽车设计的客服聊天机器人

✨ 1: StreamMultiDiffusion StreamMultiDiffusion是首个基于区域文本提示的实时文本到图像生成框架&#xff0c;实现了高速且互动的图像生成。 StreamMultiDiffusion 旨在结合加速推理技术和基于区域的文本提示控制&#xff0c;以克服之前解决方案中存在的速度慢和用户交互性…

约瑟夫问题新解法

前言 又碰到了约瑟夫问题&#xff0c;这样的题目本来用环形链表模拟的话就能做出来。然而&#xff0c;最近新学习了一种做法&#xff0c;实在是有点震惊到我了。无论是思路上&#xff0c;还是代码量上&#xff0c;都是那么的精彩。就想也震惊一下其他人。谁能想到原来模拟出来四…

C/C++程序设计实验报告综合作业 | 小小计算器

本文整理自博主本科大一《C/C程序设计》专业课的课内实验报告&#xff0c;适合C语言初学者们学习、练习。 编译器&#xff1a;gcc 10.3.0 ---- 注&#xff1a; 1.虽然课程名为C程序设计&#xff0c;但实际上当时校内该课的内容大部分其实都是C语言&#xff0c;C的元素最多可能只…

深度解析 Spring 源码:探寻Bean的生命周期

文章目录 一、 Bean生命周期概述二、Bean生命周期流程图三、Bean生命周期验证3.1 代码案例3.2 执行结果 四、Bean生命周期源码4.1 setBeanName()4.2 setBeanFactory()4.3 setApplicationContext()4.4 postProcessBeforeInitialization()4.5 afterPropertiesSet()4.6 postProces…

力扣刷题第1天:消失的数字

大家好啊&#xff0c;从今天开始将会和大家一起刷题&#xff0c;从今天开始小生也会开辟新的专栏。&#x1f61c;&#x1f61c;&#x1f61c; 目录 第一部分&#xff1a;题目描述 第二部分&#xff1a;题目分析 第三部分&#xff1a;解决方法 3.1 思路一&#xff1a;先排序…

十、多模态大语言模型(MLLM)

1 多模态大语言模型&#xff08;Multimodal Large Language Models&#xff09; 模态的定义 模态&#xff08;modal&#xff09;是事情经历和发生的方式&#xff0c;我们生活在一个由多种模态(Multimodal)信息构成的世界&#xff0c;包括视觉信息、听觉信息、文本信息、嗅觉信…