java每日精进 5.11【WebSocket】

1.纯Websocket实现消息发送

1.1一对一发送

前端

  1. 用户在输入框输入消息内容(sendText)

  2. 选择特定接收用户(sendUserId)

  3. 点击发送按钮触发handlerSend方法

  4. 构造消息内容JSON:

    {text: "Hello", // 消息内容toUserId: 123   // 目标用户ID
    }
  5. 包装为WebSocket标准格式:

    {type: "demo-message-send", // 消息类型content: '{"text":"Hello","toUserId":123}' // 字符串化的内容
    }
  6. 通过send()方法发送

  • 前端在setup函数中,使用useWebSocket方法,根据server变量(WebSocket 服务地址)建立连接。server地址由VITE_BASE_URL(环境变量)、/infra/ws路径和token(通过getRefreshToken获取)组成。
  • 设置autoReconnecttrue,表示自动重连;heartbeattrue,表示开启心跳机制。
  • 当用户在前端输入消息并点击发送按钮时,handlerSend函数被调用。
  • 首先将发送内容sendText和接收用户sendUserId进行 JSON 化处理,构建消息内容messageContent
  • 然后将消息类型typedemo-message-send)和消息内容messageContent再次 JSON 化,形成最终的消息jsonMessage
  • 最后使用send函数将jsonMessage发送到后端。
const server = ref((import.meta.env.VITE_BASE_URL + '/infra/ws').replace('http', 'ws') +'?token=' +getRefreshToken() // 使用 getRefreshToken() 方法,而不使用 getAccessToken() 方法的原因:WebSocket 无法方便的刷新访问令牌
) // WebSocket 服务地址
const getIsOpen = computed(() => status.value === 'OPEN') // WebSocket 连接是否打开
const getTagColor = computed(() => (getIsOpen.value ? 'success' : 'red')) // WebSocket 连接的展示颜色/** 发起 WebSocket 连接 */
const { status, data, send, close, open } = useWebSocket(server.value, {autoReconnect: true,heartbeat: true
})
/** 发送消息 */
const sendText = ref('') // 发送内容
const sendUserId = ref('') // 发送人
const handlerSend = () => {// 1.1 先 JSON 化 message 消息内容const messageContent = JSON.stringify({text: sendText.value,toUserId: sendUserId.value})// 1.2 再 JSON 化整个消息const jsonMessage = JSON.stringify({type: 'demo-message-send',content: messageContent})// 2. 最后发送消息send(jsonMessage)sendText.value = ''
}

后端

  • 注册监听器DemoWebSocketMessageListener 类通过实现 WebSocketMessageListener<DemoSendMessage> 接口,并使用 @Component 注解将自己注册为 Spring Bean。框架启动时会扫描所有实现了该接口的 Bean,并将它们注册到消息处理器中。
/*** WebSocket 示例:单发消息*/
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {@Resourceprivate WebSocketMessageSender webSocketMessageSender;@Overridepublic void onMessage(WebSocketSession session, DemoSendMessage message) {Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);// 情况一:单发if (message.getToUserId() != null) {DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户"demo-message-receive", toMessage);return;}// 情况二:群发DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户"demo-message-receive", toMessage);}@Overridepublic String getType() {return "demo-message-send";}}
  • 消息类型绑定getType() 方法返回 "demo-message-send",这表明该监听器专门处理类型为 "demo-message-send" 的消息。当后端接收到消息时,会根据消息类型路由到对应的监听器。

当 WebSocket 服务器接收到消息后:

  1. 消息解析:框架首先解析消息的 JSON 格式,提取 type 字段(如 "demo-message-send")。
  2. 类型匹配:后端框架会自动将 type 为 "demo-message-send" 的消息路由到 DemoWebSocketMessageListener 的 onMessage 方法。
  3. 调用回调:将消息反序列化为 DemoSendMessage 对象,并调用监听器的 onMessage 方法。
/*** JSON 格式 {@link WebSocketHandler} 实现类* 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {/*** type 与 WebSocketMessageListener 的映射* 用于存储不同消息类型对应的监听器,键为消息类型,值为对应的监听器实例*/private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();@SuppressWarnings({"rawtypes", "unchecked"})public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {// 遍历传入的监听器列表listenersList.forEach((Consumer<WebSocketMessageListener>)listener -> {// 将监听器的类型(通过 getType() 方法获取)作为键,监听器实例作为值,存入 listeners 映射中listeners.put(listener.getType(), listener);});}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 1.1 空消息,跳过// 如果消息的负载长度为 0,说明是一个空消息,直接返回,不进行后续处理if (message.getPayloadLength() == 0) {return;}// 1.2 ping 心跳消息,直接返回 pong 消息。// 如果消息的负载长度为 4 且负载内容为 "ping",则向客户端发送 "pong" 消息,表示响应心跳if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {session.sendMessage(new TextMessage("pong"));return;}// 2.1 解析消息try {// 将文本消息的负载解析为 JsonWebSocketMessage 对象JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);// 如果解析后的消息为空,记录错误日志并返回,不进行后续处理if (jsonMessage == null) {log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());return;}// 如果解析后的消息类型为空,记录错误日志并返回,不进行后续处理if (StrUtil.isEmpty(jsonMessage.getType())) {log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());return;}// 2.2 获得对应的 WebSocketMessageListener// 根据消息类型从 listeners 映射中获取对应的监听器WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());// 如果没有找到对应的监听器,记录错误日志并返回,不进行后续处理if (messageListener == null) {log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());return;}// 2.3 处理消息// 获取监听器泛型参数类型Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);// 将消息内容解析为对应类型的对象Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);// 获取当前会话的租户 IDLong tenantId = WebSocketFrameworkUtils.getTenantId(session);// 执行租户相关的操作,调用监听器的 onMessage 方法处理消息TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));} catch (Throwable ex) {// 如果在处理消息过程中发生异常,记录错误日志log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());}}}

WebSocketMessageListener 之所以能监听消息,是因为:

  1. 接口契约:实现 WebSocketMessageListener 接口并指定消息类型(getType())。
  2. 框架支持:Spring 框架自动扫描并注册监听器,实现消息的解析和分发。
  3. 类型匹配:前端发送的消息 type 与后端监听器的 getType() 一致,触发回调。

这个过程类似于 HTTP 请求的路由机制,只不过 WebSocket 是长连接,需要持续监听消息。

通常,WebSocket 框架(如 Spring WebSocket)会提供以下核心组件:

  • 消息解码器:将二进制数据转换为 Java 对象(如 DemoSendMessage)。
  • 消息路由器:根据消息类型将消息路由到对应的监听器。
  • 会话管理器:维护所有 WebSocket 会话(WebSocketSession),并提供获取用户信息的工具(如 WebSocketFrameworkUtils.getLoginUserId)。
  • 后端的DemoWebSocketMessageListener类实现了WebSocketMessageListener接口的onMessage方法。
  • 当有消息到达时,onMessage方法被调用,从WebSocketSession中获取登录用户 ID(fromUserId)。
  • 根据消息中的toUserId判断是单发还是群发:
    • 如果toUserId不为空,则创建DemoReceiveMessage对象,设置fromUserIdtextsingletrue,通过webSocketMessageSendersendObject方法将消息发送给指定用户。
    • 如果toUserId为空,则创建DemoReceiveMessage对象,设置fromUserIdtextsinglefalse,通过webSocketMessageSendersendObject方法将消息发送给所有用户。
  1. JsonWebSocketMessageHandler接收并解析消息

  2. 根据type="demo-message-send"找到DemoWebSocketMessageListener

  3. 调用onMessage方法:

    • 从Session中获取发送者ID(fromUserId)

    • 检查message.getToUserId()不为null,进入单发逻辑

  4. 构造响应消息:

    new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true)
  5. 通过webSocketMessageSender发送给指定用户:

    webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用户类型message.getToUserId(),         // 目标用户ID"demo-message-receive",       // 消息类型toMessage                    // 消息内容
    )

实际示例:

  • 用户A(ID:100)发送"下午开会"给用户B(ID:101)

  • 前端发送:

    {"type":"demo-message-send","content":"{\"text\":\"下午开会\",\"toUserId\":101}"}
  • 后端处理后发送给用户B:

    {"type":"demo-message-receive","content":"{\"fromUserId\":100,\"text\":\"下午开会\",\"single\":true}"}

1.2一对多发送

前端

  1. 用户在输入框输入消息内容(sendText)

  2. 不选择特定用户(或选择"所有人")

  3. 点击发送按钮触发handlerSend方法

  4. 构造消息内容JSON:

    {text: "系统维护通知", // 消息内容toUserId: ""      // 空表示群发
    }
  5. 包装为WebSocket标准格式并发送

后端

  1. 同上接收解析流程

  2. onMessage方法中检查message.getToUserId()为null,进入群发逻辑

  3. 构造响应消息:

    new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false)
  4. 通过webSocketMessageSender发送给所有用户:

    webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用户类型"demo-message-receive",       // 消息类型toMessage                    // 消息内容
    )

实际示例:

  • 管理员发送"系统即将升级"给所有用户

  • 前端发送:

    {"type":"demo-message-send","content":"{\"text\":\"系统即将升级\",\"toUserId\":\"\"}"}

2.总结及类补充

后端代码

  1. 配置类:

    • YudaoWebSocketAutoConfiguration: 配置 WebSocket 端点、拦截器、会话管理和消息发送器,支持多种发送类型(local, redis, rocketmq, rabbitmq, kafka)。
    • 条件注解 @ConditionalOnProperty 允许通过配置启用/禁用 WebSocket 或切换发送类型。
    • 注册 WebSocketConfigurer、HandshakeInterceptor、WebSocketHandler 和 WebSocketSessionManager。
    • /*** WebSocket 自动配置类* 负责 WebSocket 服务的初始化和相关组件的注册*/
      @AutoConfiguration(before = YudaoRedisMQConsumerAutoConfiguration.class) 
      // 在 YudaoRedisMQConsumerAutoConfiguration 之前加载,确保 RedisWebSocketMessageConsumer 先创建
      @EnableWebSocket // 启用 Spring WebSocket 支持
      @ConditionalOnProperty(prefix = "moyun.websocket", value = "enable", matchIfMissing = true) 
      // 通过配置项 moyun.websocket.enable 控制是否启用 WebSocket,默认启用
      @EnableConfigurationProperties(WebSocketProperties.class) // 启用 WebSocket 配置属性类
      public class YudaoWebSocketAutoConfiguration {/*** 配置 WebSocket 处理器和握手拦截器*/@Beanpublic WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors,WebSocketHandler webSocketHandler,WebSocketProperties webSocketProperties) {return registry -> registry// 注册 WebSocket 处理器并指定连接路径.addHandler(webSocketHandler, webSocketProperties.getPath())// 添加握手拦截器,用于验证和预处理.addInterceptors(handshakeInterceptors)// 允许所有域名跨域访问,否则前端连接会被阻止.setAllowedOriginPatterns("*");}/*** 创建握手拦截器,用于在 WebSocket 握手阶段进行用户认证和权限检查*/@Beanpublic HandshakeInterceptor handshakeInterceptor() {return new LoginUserHandshakeInterceptor();}/*** 创建 WebSocket 消息处理器* 包装 JsonWebSocketMessageHandler 并添加会话管理功能*/@Beanpublic WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager,List<? extends WebSocketMessageListener<?>> messageListeners) {// 1. 创建消息处理器,负责消息类型路由和分发JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners);// 2. 包装消息处理器,添加会话管理功能(如连接建立和关闭时的回调)return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager);}/*** 创建 WebSocket 会话管理器,用于管理所有活动的 WebSocket 会话*/@Beanpublic WebSocketSessionManager webSocketSessionManager() {return new WebSocketSessionManagerImpl();}/*** 创建 WebSocket 请求授权自定义器,用于配置安全规则*/@Beanpublic WebSocketAuthorizeRequestsCustomizer webSocketAuthorizeRequestsCustomizer(WebSocketProperties webSocketProperties) {return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties);}// ==================== 消息发送器配置 ====================/*** 本地模式消息发送器配置(单节点部署)*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "local")public class LocalWebSocketMessageSenderConfiguration {@Beanpublic LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) {return new LocalWebSocketMessageSender(sessionManager);}}/*** Redis 模式消息发送器配置(分布式部署)*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "redis")public class RedisWebSocketMessageSenderConfiguration {@Beanpublic RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,RedisMQTemplate redisMQTemplate) {return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);}@Beanpublic RedisWebSocketMessageConsumer redisWebSocketMessageConsumer(RedisWebSocketMessageSender redisWebSocketMessageSender) {return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender);}}/*** RocketMQ 模式消息发送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "rocketmq")public class RocketMQWebSocketMessageSenderConfiguration {@Beanpublic RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate,@Value("${moyun.websocket.sender-rocketmq.topic}") String topic) {return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic);}@Beanpublic RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer(RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) {return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender);}}/*** RabbitMQ 模式消息发送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "rabbitmq")public class RabbitMQWebSocketMessageSenderConfiguration {@Beanpublic RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate,TopicExchange websocketTopicExchange) {return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange);}@Beanpublic RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer(RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) {return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender);}/*** 创建 RabbitMQ 主题交换机,用于消息广播*/@Beanpublic TopicExchange websocketTopicExchange(@Value("${moyun.websocket.sender-rabbitmq.exchange}") String exchange) {return new TopicExchange(exchange,true,  // durable: 持久化交换机,重启后不丢失false); // exclusive: 非排他性,允许多个连接使用}}/*** Kafka 模式消息发送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "kafka")public class KafkaWebSocketMessageSenderConfiguration {@Beanpublic KafkaWebSocketMessageSender kafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, KafkaTemplate<Object, Object> kafkaTemplate,@Value("${moyun.websocket.sender-kafka.topic}") String topic) {return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic);}@Beanpublic KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer(KafkaWebSocketMessageSender kafkaWebSocketMessageSender) {return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender);}}}

  2. 认证与拦截:

    • TokenAuthenticationFilter: 解析 WebSocket URL 中的 token 参数,验证用户身份,构建 LoginUser 并存储到 Spring Security 上下文中。
    • /*** Token 过滤器,验证 token 的有效性* 验证通过后,获得 {@link LoginUser} 信息,并加入到 Spring Security 上下文*/
      @RequiredArgsConstructor
      public class TokenAuthenticationFilter extends OncePerRequestFilter {private final SecurityProperties securityProperties;private final GlobalExceptionHandler globalExceptionHandler;private final OAuth2TokenApi oauth2TokenApi;@Override@SuppressWarnings("NullableProblems")protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)throws ServletException, IOException {String token = SecurityFrameworkUtils.obtainAuthorization(request,securityProperties.getTokenHeader(), securityProperties.getTokenParameter());if (StrUtil.isNotEmpty(token)) {Integer userType = WebFrameworkUtils.getLoginUserType(request);try {// 1.1 基于 token 构建登录用户LoginUser loginUser = buildLoginUserByToken(token, userType);// 1.2 模拟 Login 功能,方便日常开发调试if (loginUser == null) {loginUser = mockLoginUser(request, token, userType);}// 2. 设置当前用户if (loginUser != null) {SecurityFrameworkUtils.setLoginUser(loginUser, request);}} catch (Throwable ex) {CommonResult<?> result = globalExceptionHandler.allExceptionHandler(request, ex);ServletUtils.writeJSON(response, result);return;}}// 继续过滤链chain.doFilter(request, response);}private LoginUser buildLoginUserByToken(String token, Integer userType) {try {OAuth2AccessTokenCheckRespDTO accessToken = oauth2TokenApi.checkAccessToken(token);if (accessToken == null) {return null;}// 用户类型不匹配,无权限// 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型// 类似 WebSocket 的 /ws/* 连接地址,是不需要比对用户类型的if (userType != null&& ObjectUtil.notEqual(accessToken.getUserType(), userType)) {throw new AccessDeniedException("错误的用户类型");}// 构建登录用户return new LoginUser().setId(accessToken.getUserId()).setUserType(accessToken.getUserType()).setInfo(accessToken.getUserInfo()) // 额外的用户信息.setTenantId(accessToken.getTenantId()).setScopes(accessToken.getScopes()).setExpiresTime(accessToken.getExpiresTime());} catch (ServiceException serviceException) {// 校验 Token 不通过时,考虑到一些接口是无需登录的,所以直接返回 null 即可return null;}}/*** 模拟登录用户,方便日常开发调试** 注意,在线上环境下,一定要关闭该功能!!!** @param request 请求* @param token 模拟的 token,格式为 {@link SecurityProperties#getMockSecret()} + 用户编号* @param userType 用户类型* @return 模拟的 LoginUser*/private LoginUser mockLoginUser(HttpServletRequest request, String token, Integer userType) {if (!securityProperties.getMockEnable()) {return null;}// 必须以 mockSecret 开头if (!token.startsWith(securityProperties.getMockSecret())) {return null;}// 构建模拟用户Long userId = Long.valueOf(token.substring(securityProperties.getMockSecret().length()));return new LoginUser().setId(userId).setUserType(userType).setTenantId(WebFrameworkUtils.getTenantId(request));}}
    • LoginUserHandshakeInterceptor: 在 WebSocket 握手阶段将 LoginUser 存入 WebSocketSession 的 attributes。
    • /*** 登录用户的 {@link HandshakeInterceptor} 实现类** 流程如下:* 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过* 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中*/
      public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) {LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();if (loginUser != null) {WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);}return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception) {// do nothing}}
  3. 会话管理:

    • WebSocketSessionHandlerDecorator: 装饰 WebSocketHandler,在连接建立/关闭时管理 WebSocketSession。
    • /*** {@link WebSocketHandler} 的装饰类,实现了以下功能:** 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理* 2. 封装 {@link WebSocketSession} 支持并发操作*/
      public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {/*** 发送时间的限制,单位:毫秒* 这里定义了发送消息的时间限制为 5 秒(1000 毫秒 * 5),用于控制消息发送的时间范围,* 可能是为了防止长时间的消息发送操作,避免资源占用或其他潜在问题。*/private static final Integer SEND_TIME_LIMIT = 1000 * 5;/*** 发送消息缓冲上限,单位:bytes* 定义了发送消息的缓冲大小上限为 1024 * 100 字节,用于限制消息缓冲的大小,* 防止缓冲过大导致内存占用过多等问题。*/private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;// WebSocket 会话管理器,用于管理 WebSocket 会话private final WebSocketSessionManager sessionManager;/*** 构造函数,接收被装饰的 WebSocketHandler 和 WebSocketSessionManager** @param delegate      被装饰的 WebSocketHandler* @param sessionManager WebSocket 会话管理器*/public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,WebSocketSessionManager sessionManager) {// 调用父类构造函数,传入被装饰的 WebSocketHandlersuper(delegate);// 初始化会话管理器this.sessionManager = sessionManager;}/*** 当 WebSocket 连接建立时调用此方法** @param session WebSocket 会话*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149// 使用定义的时间限制和缓冲大小限制,创建一个支持并发的 WebSocketSession 装饰类实例session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);// 将新的会话添加到 WebSocketSessionManager 中进行管理sessionManager.addSession(session);}/*** 当 WebSocket 连接关闭时调用此方法** @param session    WebSocket 会话* @param closeStatus 关闭状态*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {// 从 WebSocketSessionManager 中移除对应的会话,完成连接关闭时的会话管理操作sessionManager.removeSession(session);}
      }
    • WebSocketSessionManager 和 WebSocketSessionManagerImpl:管理会话,支持按用户类型和 ID 查询。
    • /*** {@link WebSocketSession} 管理器的接口*/
      public interface WebSocketSessionManager {/*** 添加 Session** @param session Session*/void addSession(WebSocketSession session);/*** 移除 Session** @param session Session*/void removeSession(WebSocketSession session);/*** 获得指定编号的 Session** @param id Session 编号* @return Session*/WebSocketSession getSession(String id);/*** 获得指定用户类型的 Session 列表** @param userType 用户类型* @return Session 列表*/Collection<WebSocketSession> getSessionList(Integer userType);/*** 获得指定用户编号的 Session 列表** @param userType 用户类型* @param userId 用户编号* @return Session 列表*/Collection<WebSocketSession> getSessionList(Integer userType, Long userId);}
      /*** 默认的 {@link WebSocketSessionManager} 实现类*/
      public class WebSocketSessionManagerImpl implements WebSocketSessionManager {/*** id 与 WebSocketSession 映射** key:Session 编号*/private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();/*** user 与 WebSocketSession 映射** key1:用户类型* key2:用户编号*/private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions= new ConcurrentHashMap<>();@Overridepublic void addSession(WebSocketSession session) {// 添加到 idSessions 中idSessions.put(session.getId(), session);// 添加到 userSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {userSessionsMap = new ConcurrentHashMap<>();if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {userSessionsMap = userSessions.get(user.getUserType());}}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());if (sessions == null) {sessions = new CopyOnWriteArrayList<>();if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {sessions = userSessionsMap.get(user.getId());}}sessions.add(session);}@Overridepublic void removeSession(WebSocketSession session) {// 移除从 idSessions 中idSessions.remove(session.getId());// 移除从 idSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {return;}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());sessions.removeIf(session0 -> session0.getId().equals(session.getId()));if (CollUtil.isEmpty(sessions)) {userSessionsMap.remove(user.getId(), sessions);}}@Overridepublic WebSocketSession getSession(String id) {return idSessions.get(id);}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容Long contextTenantId = TenantContextHolder.getTenantId();for (List<WebSocketSession> sessions : userSessionsMap.values()) {if (CollUtil.isEmpty(sessions)) {continue;}// 特殊:如果租户不匹配,则直接排除if (contextTenantId != null) {Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));if (!contextTenantId.equals(userTenantId)) {continue;}}result.addAll(sessions);}return result;}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();}}
  4. 消息处理:

    • JsonWebSocketMessageHandler:解析 JsonWebSocketMessage,根据 type 分发给 WebSocketMessageListener。
    • DemoWebSocketMessageListener: 处理 demo-message-send 类型的消息,支持单发和群发。
    • JsonWebSocketMessage:包含 type 和 content 字段。
  5. 消息发送:

    • WebSocketMessageSender: 定义消息发送接口。
    • AbstractWebSocketMessageSender: 实现消息发送逻辑,查询会话并发送 JsonWebSocketMessage。
    • LocalWebSocketMessageSender: 本地发送实现,适合单机场景。
  6. 配置websocket的Ss权限
/*** WebSocket 的权限自定义* 负责为 WebSocket 端点的 HTTP 握手请求配置 Spring Security 权限,通过 permitAll() 确保握手请求不被阻止* 同时保留自定义的 token 认证逻辑(由 TokenAuthenticationFilter 和 LoginUserHandshakeInterceptor 处理)* 它解决了 Spring Security 对 WebSocket 握手请求的限制问题,是集成 Spring Security 的 WebSocket 功能的关键组件*/
@RequiredArgsConstructor
public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer {private final WebSocketProperties webSocketProperties;@Overridepublic void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {registry.requestMatchers(webSocketProperties.getPath()).permitAll();}}

前端代码

  • InfraWebSocket.vue: 使用 @vueuse/core 的 useWebSocket 建立连接,发送/接收 JsonWebSocketMessage,支持单发和群发消息。
  • 依赖用户列表 API(UserApi.getSimpleUserList)和 token 获取逻辑(getRefreshToken)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/82887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【NextPilot日志移植】params.c解析

params.c 参数说明 params.c 文件的主要作用是定义与 SD卡日志记录 相关的参数。这些参数用于配置日志记录的行为&#xff0c;包括日志记录的时间、内容、存储管理以及加密设置等。 1. UTC 偏移量 (SDLOG_UTC_OFFSET) PARAM_DEFINE_INT32(SDLOG_UTC_OFFSET, 0);用途&#xf…

jFinal 使用 SolonMCP 开发 MCP(拥抱新潮流)

MCP 官方的 java-sdk 目前只支持 java17。直接基于 mcp-java-sdk 也比较复杂。使用 SolonMCP&#xff0c;可以基于 java8 开发&#xff08;像 MVC 的开发风格&#xff09;&#xff0c;且比较简单。 1、SolonMCP 简介 SolonMCP&#xff08;全称&#xff1a;solon-ai-mcp&#…

“端 - 边 - 云”三级智能协同平台的理论建构与技术实现

摘要 随着低空经济与智能制造的深度融合&#xff0c;传统集中式云计算架构在实时性、隐私保护和资源效率上的瓶颈日益凸显。本文提出“端 - 边 - 云”三级智能协同平台架构&#xff0c;以“时空 - 资源 - 服务”三维协同理论为核心&#xff0c;构建覆盖终端感知、边缘计算、云端…

【如何搭建开发环境】

了解java程序 JAVA体系结构 跨平台原理与编译和反编译 如何学习java语言&#xff0c;如何搭建环境 设置JAVA_HOME&#xff0c;指向jdk的安装目录这一级即可。比如我的JDK安装在C:\java\jdk1.8.0_25&#xff0c;那JAVA_HOME的值就是C:\java\jdk1.8.0_25设置Path变量 在Path值后…

LegoGPT,卡内基梅隆大学推出的乐高积木设计模型

LegoGPT 是由卡内基梅隆大学开发的一款创新性乐高积木设计模型&#xff0c;能够根据用户的文本提示生成结构稳固、可组装的乐高模型。该模型基于自回归语言模型和大规模乐高设计数据集进行训练&#xff0c;用户只需输入简单的文字描述&#xff0c;LegoGPT 就能逐步构建出物理稳…

深入理解 NumPy:Python 科学计算的基石

在数据科学、人工智能和科学计算的世界里&#xff0c;NumPy 是一块绕不过去的基石。它是 Python 语言中用于高性能科学计算的基础包&#xff0c;几乎所有的数据分析与机器学习框架&#xff08;如 Pandas、TensorFlow、Scikit-learn&#xff09;都离不开它的支持。 一、什么是 …

Java基础(IO)

所有操作都在内存&#xff0c;不能长时间保存&#xff0c;IO主要在硬盘&#xff0c;可以长时间保存。 一、File类 File类被定义为文件和目录路径名的抽象表示形式&#xff0c;这是因为 File 类既可以表示文件也可以表示目录&#xff0c;他们都通过对应的路径来描述。 提供构…

仿正点原子驱动BMP280气压传感器实例

文章目录 前言 一、寄存器头文件定义 二、设备树文件中添加节点 三、驱动文件编写 四、编写驱动测试文件并编译测试 总结 前言 本文驱动开发仿照正点原子的iic驱动实现&#xff0c;同时附上bmp280的数据手册&#xff0c;可访问下面的链接&#xff1a; BMP280_Bosch(博世…

论坛系统(中-1)

软件开发 编写公共代码 定义状态码 对执⾏业务处理逻辑过程中可能出现的成功与失败状态做针对性描述(根据需求分析阶段可以遇见的问题提前做出定义)&#xff0c;⽤枚举定义状态码&#xff0c;先定义⼀部分&#xff0c;业务中遇到新的问题再添加 定义状态码如下 状态码类型描…

E+H流量计通过Profibus DP主站转Modbus TCP网关与上位机轻松通讯

EH流量计通过Profibus DP主站转Modbus TCP网关与上位机轻松通讯 在现代工业自动化的广阔舞台上&#xff0c;Profibus DP与Modbus TCP这两种通信协议各领风骚&#xff0c;它们在不同的应用场景中发挥着举足轻重的作用。但工业生产的复杂性往往要求不同设备、系统之间能够顺畅沟…

服务器中存储空间不足该怎么办?

服务器作为存储数据信息的重要网络设备&#xff0c;随着企业业务的不断拓展&#xff0c;所需要存储的数据信息也在不断增加&#xff0c;最终会导致服务器中存储空间不足&#xff0c;这不仅会影响到服务器系统性能&#xff0c;还会造成业务无法正常执行&#xff0c;那么&#xf…

C++23 views::chunk_by (P2443R1) 详解

文章目录 引言C23 范围库概述范围视图&#xff08;Range Views&#xff09;范围算法&#xff08;Range Algorithms&#xff09;范围适配器&#xff08;Range Adapters&#xff09; std::views::chunk_by 介绍基本概念特性使用场景 示例代码简单示例自定义谓词示例 总结 引言 在…

零碳园区能源系统-多能互补体系

构建以可再生能源为核心的零碳园区能源系统&#xff0c;需整合光储直柔、光伏发电、微电网、氢能与储能技术&#xff0c;通过多能协同与智能调控实现能源生产、存储、消费全链条优化。以下是系统性实施方案&#xff1a; 一、系统架构设计 1. 多能互补体系 &#xff08;图示&a…

elastic search学习

首先在自己电脑上安装elastic search。安装成功后&#xff0c;查看ES是否启动成功。 安装过程参考&#xff1a;ElasticSearch入门1: mac 安装 - 霜井 - 博客园 安装完成后&#xff0c;直接执行bin目录中的elastic search命令后&#xff0c;就可以启动成功&#xff01; 在网页…

mysql8常用sql语句

查询结果带行号 -- 表名为 mi_user&#xff0c; 假设包含列 id &#xff0c;address SELECT ROW_NUMBER() OVER (ORDER BY id) AS row_num, t.id, t.address FROM mi_user t ; SELECT ROW_NUMBER() OVER ( ) AS row_num, t.id, t.address FROM mi_user t ; 更新某列数…

Memcached 服务搭建和集成使用的详细步骤示例

以下是 Memcached 服务搭建和集成使用的详细步骤示例&#xff1a; 一、搭建 Memcached 服务 安装 Memcached Linux 系统 yum 安装&#xff1a;执行命令 yum install -y memcached memcached-devel。源码安装 下载源码&#xff1a;wget http://www.memcached.org/files/memcach…

2. 盒模型/布局模块 - 响应式产品展示页_案例:电商产品网格布局

2. 盒模型/布局模块 - 响应式产品展示页 案例&#xff1a;电商产品网格布局 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><style type"text/css">:root {--primary-color…

Go基于plugin的热更新初体验

背景 对于一个部署在生产环境的项目来说&#xff0c;我们希望当代码出现bug的时候&#xff0c;可以不用重启进程而达到动态修改代码的目的—— 这就是代码热部署&#xff01; 使用java做游戏服务器&#xff0c;最大的好处是&#xff0c;当代码出现bug&#xff0c;可以直接热…

【RabbitMQ】工作队列和发布/订阅模式的具体实现

文章目录 建立连接工作队列模式实现创建队列和交换机生产者代码消费者代码运行程序启动消费者启动生产者 发布/订阅模式实现创建队列和交换机生产者代码创建交换机声明两个队列绑定队列和交换机发送消息完整代码 消费者代码完整代码 运行程序启动生产者启动消费者 建立连接 我…

Codeforces Round 998 (Div. 3)

A. Fibonacciness 题目大意 给你四个数字abde&#xff0c;让你找到一个中间值c&#xff0c;问 a b c a b c abc &#xff0c; b c d b c d bcd &#xff0c; c d e c d e cde 最多能有几个式子成立 解题思路 显然最多就六种情况&#xff0c;暴力枚举即可 代…