企业级管理系统的站内信怎么轻量级优雅实现

news/2025/11/14 17:57:26/文章来源:https://www.cnblogs.com/gtnotgod/p/19222969

企业级管理系统的站内信怎么轻量级优雅实现

一、什么是站内信?

站内信(In-App Messaging 或 Internal Messaging)是指在一个软件系统或平台内部,用户之间或系统与用户之间进行非实时或准实时文字通信的功能模块。它不依赖外部通信渠道(如短信、邮件),而是完全在应用内部完成消息的发送、接收与管理。

在企业级管理系统(如 OA、ERP、CRM、HRM、项目管理平台等)中,站内信是信息触达和协同办公的重要基础设施

二、站内信的核心功能

功能类别

  1. 消息收发 支持系统自动发送通知,或用户之间发送私信。
  2. 消息分类 如:系统通知、审批提醒、任务指派、公告、私聊等。
  3. 已读/未读状态 用户可标记消息为已读,系统可统计未读数量(常用于红点提示)。
  4. 消息列表与详情 提供消息中心页面,支持分页、筛选、搜索、按时间排序。
  5. 批量操作 如“全部标为已读”、“批量删除”等,提升操作效率。
  6. 实时提醒(可选) 通过 WebSocket 或轮询,在新消息到达时即时通知用户。
  7. 多端同步 Web、移动端等不同终端的消息状态保持一致。
  8. 权限与安全 用户只能查看自己的消息,敏感内容需防泄露、防篡改。

三、对比多种实现方式

(一)、按消息投递模型分类

在企业级管理系统中,实现站内信(In-App Messaging)有多种技术路径和架构方案。不同的实现方式适用于不同规模、性能要求、实时性需求和系统复杂度。下面从核心维度出发,系统性地介绍实现站内信的多种方式,并对比其优缺点与适用场景。
一、按消息投递模型分类

  1. 写扩散(Push 模型 / Fan-out on Write)

原理:发送消息时,为每个接收者单独写入一条记录到其“收件箱”。
优点
查询快:用户读消息只需查自己的 inbox 表,无需 join。
支持个性化:可标记不同用户是否已读、是否删除。
缺点
写压力大:群发 1000 人 = 写 1000 条记录。
存储冗余:相同内容重复存储。
适用场景
用户量中等(<10万)、消息接收者较少(如审批通知、私信)。
对读性能要求高。

  1. 读扩散(Pull 模型 / Fan-out on Read)

原理:只存一份原始消息,用户读取时动态关联其“订阅关系”或“权限”来拉取。
查询时:根据用户角色/部门,动态筛选可看的消息。
优点
写入高效:广播一条消息只需插入1条记录。
存储节省。
缺点
读性能差:每次都要 join 权限表,难以分页。
无法记录“谁已读”——除非额外维护状态表。
适用场景
纯公告类消息(如全员通知),且不要求“已读回执”。
用户量极大但消息量少。

  1. 混合模型(Hybrid)

思路:
广播类消息用 读扩散(节省存储);
私信/关键通知用 写扩散(保证状态追踪)。
实现:
消息表增加 delivery_mode 字段(push/pull)。
查询时 union 两种来源。
优点:兼顾性能与功能。
缺点:逻辑复杂,需统一 API 层抽象。
适用场景:大型企业系统(如钉钉、飞书内部消息中心)。

(二)、按实时性实现方式分类

  1. 轮询(Polling)
    前端定时(如每30秒)请求 /api/unread-count。
    优点:实现简单,兼容性好。
    缺点:延迟高、浪费带宽、服务器压力随用户增长线性上升。
    适用:小型系统或对实时性要求不高的后台管理端。
  2. 长轮询(Long Polling)
    客户端发起请求,服务端 hold 住连接,直到有新消息或超时。
    优点:比普通轮询更实时、减少无效请求。
    缺点:占用服务器连接资源,扩展性有限。
    适用:中等规模系统,无 WebSocket 支持环境。
  3. WebSocket / Socket.IO
    建立持久双向通道,服务端可主动推送新消息。
    优点:
    实时性强(毫秒级);
    节省带宽;
    支持在线状态、已读回执等高级功能。
    缺点:
    需要维护连接状态;
    集群部署需考虑连接共享(如通过 Redis Pub/Sub 广播)。
    适用:对体验要求高的现代企业应用(如协同办公平台)。
  4. Server-Sent Events (SSE)
    服务端单向推送文本流(基于 HTTP)。
    优点:比 WebSocket 轻量,天然支持自动重连。
    缺点:仅支持服务端 → 客户端,无法用于聊天交互。
    适用:纯通知类场景(如“您有3条新消息”)。

这里使用SSE实现站内信

效果如下

在这里插入图片描述

前端代码如下

<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>SSE站内信系统</title><style>body {font-family: Arial, sans-serif;max-width: 1200px;margin: 0 auto;padding: 20px;}.message-container {border: 1px solid #ddd;border-radius: 5px;padding: 15px;margin-bottom: 20px;background-color: #f9f9f9;}.message-header {display: flex;justify-content: space-between;align-items: center;margin-bottom: 10px;}.message-title {font-weight: bold;color: #333;}.message-time {font-size: 0.8em;color: #666;}.message-content {color: #444;line-height: 1.5;}.unread {background-color: #e8f4fd;border-left: 4px solid #2196F3;}.controls {margin-bottom: 20px;padding: 15px;background-color: #f5f5f5;border-radius: 5px;}button {padding: 8px 16px;margin-right: 10px;background-color: #4CAF50;color: white;border: none;border-radius: 4px;cursor: pointer;}button:hover {background-color: #45a049;}#disconnectBtn {background-color: #f44336;}#disconnectBtn:hover {background-color: #da190b;}.status {padding: 10px;margin-bottom: 15px;border-radius: 4px;text-align: center;}.connected {background-color: #dff0d8;color: #3c763d;}.disconnected {background-color: #f2dede;color: #a94442;}#messagesList {height: 500px;overflow-y: auto;border: 1px solid #eee;padding: 10px;border-radius: 5px;}</style>
</head>
<body><h1>SSE站内信系统</h1><div id="status" class="status disconnected">未连接</div><div class="controls"><button id="connectBtn">连接SSE</button><button id="disconnectBtn">断开连接</button><button id="refreshBtn">刷新历史消息</button><button id="clearBtn">清空消息</button></div><div><h2>消息列表</h2><div id="messagesList"></div></div><script>let eventSource = null;let messages = [];// DOM元素引用const statusEl = document.getElementById('status');const messagesListEl = document.getElementById('messagesList');const connectBtn = document.getElementById('connectBtn');const disconnectBtn = document.getElementById('disconnectBtn');const refreshBtn = document.getElementById('refreshBtn');const clearBtn = document.getElementById('clearBtn');// 更新连接状态显示function updateStatus(connected) {if (connected) {statusEl.textContent = '已连接';statusEl.className = 'status connected';} else {statusEl.textContent = '未连接';statusEl.className = 'status disconnected';}}// 连接SSEfunction connectSSE() {if (eventSource) {eventSource.close();}// 注意:这里需要根据实际部署路径调整URLconst ssePath = 'http://localhost:8080/irs/sse/msg?token=Bearer eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl9jbGllbnRfa2V5IjoiNTkzNjM4MjUwNDA5MTQwOTU5IiwibG9naW5fdXNlcl9uYW1lX2tleSI6ImFkbWluIiwibG9naW5fdXNlcl9pZF9rZXkiOiIxIiwibG9naW5fcGhvbmVfa2V5IjoiMTU4ODg4ODg4ODgiLCJsb2dpbl91c2VyX2tleSI6IjZhYzkxOTZkOGZmYTQwMzY5MjQ2MjI0OTVkYjMyZDg5In0.36nnmmeN8LFDJpq9O1eiM3hqg0A7-GAPPU50fAt7bj2aL94EVr1xUJvhMpNGHsYIsFE3jLX9SdOWvmT7pHqRkA'; // 对应配置中的 ${sse.path}eventSource = new EventSource(ssePath);eventSource.onopen = function(event) {console.log('SSE连接已建立',event);updateStatus(true);loadHistoryMessages();};eventSource.addEventListener('unreadCount', function(event) {const unreadCount = event.data;console.log('未读消息数量:', unreadCount);// 在这里处理未读消息数量,比如更新页面上的徽章});eventSource.onerror = function(event) {console.error('SSE连接出错:', event);updateStatus(false);};// 监听消息事件eventSource.addEventListener('message', function(event) {const messageData = event.data;console.log("dddd",event);displayMessage({id: Date.now(),title: '新消息',content: messageData,time: new Date().toLocaleString(),unread: true});});// 监听注释事件(连接确认)eventSource.addEventListener('comment', function(event) {if (event.data === 'connected') {console.log('SSE连接成功确认');} else if (event.data === 'disconnected') {console.log('SSE连接已断开');updateStatus(false);}});}// 断开SSE连接function disconnectSSE() {if (eventSource) {eventSource.close();eventSource = null;updateStatus(false);// 调用后端断开连接接口fetch('http://localhost:8080/irs/sse/close', {method: 'GET'}).then(response => {if (response.ok) {console.log('服务端连接已关闭');}}).catch(error => {console.error('关闭连接时出错:', error);});}}// 加载历史消息(模拟实现,实际应调用后端API获取历史数据)function loadHistoryMessages() {// 在实际应用中,这里应该调用后端API获取历史消息// 示例数据const historyMessages = [{id: 1,title: '系统通知',content: '欢迎使用站内信系统!',time: '2023-10-01 09:00:00',unread: false},{id: 2,title: '重要提醒',content: '请及时查看最新公告',time: '2023-10-01 10:30:00',unread: false}];historyMessages.forEach(msg => {displayMessage(msg);});}// 显示消息function displayMessage(message) {messages.push(message);const messageEl = document.createElement('div');messageEl.className = `message-container ${message.unread ? 'unread' : ''}`;messageEl.innerHTML = `<div class="message-header"><span class="message-title">${message.title}</span><span class="message-time">${message.time}</span></div><div class="message-content">${message.content}</div>`;messagesListEl.insertBefore(messageEl, messagesListEl.firstChild);}// 清空消息列表function clearMessages() {messagesListEl.innerHTML = '';messages = [];}// 事件绑定connectBtn.addEventListener('click', connectSSE);disconnectBtn.addEventListener('click', disconnectSSE);refreshBtn.addEventListener('click', loadHistoryMessages);clearBtn.addEventListener('click', clearMessages);// 页面加载完成后自动连接window.addEventListener('load', function() {connectSSE();});// 页面关闭前断开连接window.addEventListener('beforeunload', function() {disconnectSSE();});</script>
</body>
</html>

后端实现

建立站内信记录表

CREATE TABLE "public"."sys_message" ("message_id" int8 NOT NULL,"title" varchar(255) COLLATE "pg_catalog"."default","business_type" varchar(255) COLLATE "pg_catalog"."default","content" text COLLATE "pg_catalog"."default","sender_id" int8,"sender_name" varchar(255) COLLATE "pg_catalog"."default","receiver_id" int8,"receiver_name" varchar(255) COLLATE "pg_catalog"."default","is_read" bool,"read_time" timestamp(6),"create_time" timestamp(6),"update_time" timestamp(6),"tenant_id" varchar(64) COLLATE "pg_catalog"."default" DEFAULT '000000'::character varying,CONSTRAINT "sys_message_pkey" PRIMARY KEY ("message_id")
)
;ALTER TABLE "public"."sys_message" OWNER TO "postgres";COMMENT ON COLUMN "public"."sys_message"."message_id" IS '主键';COMMENT ON COLUMN "public"."sys_message"."title" IS '标题';COMMENT ON COLUMN "public"."sys_message"."business_type" IS '业务分类【服务消息|系统消息|预警消息】';COMMENT ON COLUMN "public"."sys_message"."content" IS '站内信内容';COMMENT ON COLUMN "public"."sys_message"."sender_id" IS '站内信发送者Id';COMMENT ON COLUMN "public"."sys_message"."sender_name" IS '发送者名称';COMMENT ON COLUMN "public"."sys_message"."receiver_id" IS '站内信接收者Id';COMMENT ON COLUMN "public"."sys_message"."receiver_name" IS '接受者名称';COMMENT ON COLUMN "public"."sys_message"."is_read" IS 'true=已读';COMMENT ON COLUMN "public"."sys_message"."read_time" IS '站内信阅读时间';COMMENT ON COLUMN "public"."sys_message"."create_time" IS '站内信生产时间';COMMENT ON COLUMN "public"."sys_message"."update_time" IS '排序时间,默认就是生产时间';COMMENT ON COLUMN "public"."sys_message"."tenant_id" IS '租户Id';

实体类

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;import java.time.LocalDateTime;/** 站内信息* 万里悲秋常作客,百年多病独登台* @author : makeJava*/
@Data
@TableName("sys_message")
public class SysMessage {// 站内信消息Id@TableId(value = "message_id", type = IdType.ASSIGN_ID)@JsonFormat(shape = JsonFormat.Shape.STRING)private Long messageId;// 站内信标题private String title;// 业务分类【服务消息|系统消息|预警消息】private String businessType;// 站内信内容private String content;// 站内信发送者Id {如果发送者Id为-1就是所有人都能搜到}private Long senderId;// 站内信发送者名称private String senderName;// 站内信接收者Idprivate Long receiverId;// 站内信接收者名称private String receiverName;// true=已读private Boolean isRead;// 站内信阅读时间@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")private LocalDateTime readTime;// 站内信创建时间@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")private LocalDateTime createTime;// 排序使用时间@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")private LocalDateTime updateTime;// 租户隔离private String tenantId;
}

Mapper层

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.dcqc.irs.system.domain.SysMessage;
import org.apache.ibatis.annotations.Mapper;/*** 万里悲秋常作客,百年多病独登台* @author : makeJava*/
@Mapper
public interface SysMessageMapper extends BaseMapper<SysMessage> {
}

Service层

import com.baomidou.mybatisplus.extension.service.IService;
import com.dcqc.irs.system.domain.SysMessage;/*** 万里悲秋常作客,百年多病独登台* @author : makeJava*/
public interface SysMessageService extends IService<SysMessage> {long countReadMessage(Long userId,String tenantId);
}/*** 万里悲秋常作客,百年多病独登台* @author : makeJava*/
@Service
@Slf4j
public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMessage> implements SysMessageService {@Resourceprivate SysMessageMapper sysMessageMapper;/*** 接收系统---站内信消息* @param event event*/@EventListenerpublic void onSysMessageEvent(SysMessageEvent event) {log.info("接收到系统消息: {}", event.getTitle());// 创建消息SysMessage sysMessage = new SysMessage();sysMessage.setMessageId(event.getMessageId());sysMessage.setTitle(event.getTitle());sysMessage.setContent(event.getContent());sysMessage.setSenderId(event.getSenderId());sysMessage.setSenderName(event.getSenderName());sysMessage.setReceiverId(event.getReceiverId());sysMessage.setReceiverName(event.getReceiverName());sysMessage.setTenantId(event.getTenantId());sysMessage.setIsRead(false);sysMessage.setBusinessType(event.getBusinessType());sysMessage.setCreateTime(LocalDateTime.now());sysMessage.setUpdateTime(LocalDateTime.now());if (sysMessage.getMessageId() == null) {sysMessage.setMessageId(IdWorker.getId());}save(sysMessage);}/*** 统计未读消息* @param userId userId* @param tenantId tenantId* @return long*/@Overridepublic long countReadMessage(Long userId, String tenantId) {return sysMessageMapper.selectCount(Wrappers.lambdaQuery(SysMessage.class).eq(SysMessage::getReceiverId, userId).eq(SysMessage::getTenantId, tenantId).eq(SysMessage::getIsRead, false));}
}

Controller层

/*** 站内信* 万里悲秋常作客,百年多病独登台* @author : makeJava*/
@RestController("/sysMessage")
@RequestMapping("/common")
public class SysMessageController {@Resourceprivate SysMessageService sysMessageService;@Resourceprivate SseEmitterManager sseEmitterManager;/*** 历史消息*/@RequestMapping(value = "/history", method = RequestMethod.GET)public TableDataInfo historyMessage(@RequestParam(value = "pageNum", required = false, defaultValue = "1") Integer pageNum,@RequestParam(value = "keywords", required = false) String keywords,@RequestParam(value = "businessType", required = false) String businessType,@RequestParam(value = "isRead", required = false) Boolean isRead,@RequestParam(value = "pageSize", required = false, defaultValue = "50") Integer pageSize) {Long userId = SecurityUtils.getUserId();if (userId == null) {TableDataInfo rspData = new TableDataInfo();rspData.setCode(200);rspData.setMsg("查询成功");rspData.setSuccess(true);return rspData;}String tenantId = SecurityUtils.getTenantId();Page<SysMessage> sysMessagePage = sysMessageService.page(new Page<>(pageNum, pageSize), Wrappers.lambdaQuery(SysMessage.class)// 非超管看自己的站内信{超管可以看所有的站内信}.eq(!SecurityUtils.isAdmin(userId), SysMessage::getReceiverId, userId).eq(SysMessage::getTenantId, tenantId)// 关键字搜索.like(StrUtil.isNotBlank(keywords), SysMessage::getTitle, keywords)// 已读未读搜索.eq(isRead != null, SysMessage::getIsRead, isRead)// 业务分类搜索.eq(StrUtil.isNotBlank(businessType), SysMessage::getBusinessType, businessType)// 时间倒序.orderByDesc(SysMessage::getUpdateTime));long total = sysMessagePage.getTotal();TableDataInfo rspData = new TableDataInfo(sysMessagePage.getRecords(), (int) total);rspData.setCode(200);rspData.setMsg("查询成功");rspData.setSuccess(true);return rspData;}/*** 阅读*/@RequestMapping(value = "/readMessage", method = RequestMethod.GET)public R<List<SysMessage>> readMessage(@RequestParam(value = "messageId") String messageId) {Long userId = SecurityUtils.getUserId();String tenantId = SecurityUtils.getTenantId();if (userId == null) {return R.fail(null, "用户不存在");}sysMessageService.update(Wrappers.lambdaUpdate(SysMessage.class).set(SysMessage::getIsRead, true).set(SysMessage::getReadTime, LocalDateTime.now()).eq(SysMessage::getMessageId, Long.valueOf(messageId)));// 更新设置推送long unreadCount = sysMessageService.countReadMessage(userId, tenantId);// 重新推送未读数量sseEmitterManager.sendMessage(userId + ":" + tenantId, unreadCount);return R.ok();}/*** 查询*/@RequestMapping(value = "/queryById", method = RequestMethod.GET)public R<SysMessage> queryById(@RequestParam(value = "messageId") String messageId) {Long userId = SecurityUtils.getUserId();if (userId == null) {return R.fail(null, "用户不存在");}// Pg库必须要类型一致SysMessage sysMessage = sysMessageService.getById(Long.valueOf(messageId));return R.ok(sysMessage);}/*** 删除*/@RequestMapping(value = "/removeMessage", method = RequestMethod.GET)public R<List<SysMessage>> removeMessage(@RequestParam(value = "messageId") String messageId) {Long userId = SecurityUtils.getUserId();if (userId == null) {return R.fail(null, "用户不存在");}String tenantId = SecurityUtils.getTenantId();sysMessageService.remove(Wrappers.lambdaUpdate(SysMessage.class).eq(SysMessage::getMessageId, Long.valueOf(messageId)));long unreadCount = sysMessageService.countReadMessage(userId, tenantId);// 重新推送未读数量sseEmitterManager.sendMessage(userId + ":" + tenantId, unreadCount);return R.ok();}/*** 置顶*/@RequestMapping(value = "/topToOne", method = RequestMethod.GET)public R<List<SysMessage>> topToOne(@RequestParam(value = "messageId") String messageId) {Long userId = SecurityUtils.getUserId();if (userId == null) {return R.fail(null, "用户不存在");}sysMessageService.update(Wrappers.lambdaUpdate(SysMessage.class).set(SysMessage::getUpdateTime, LocalDateTime.now()).eq(SysMessage::getMessageId, Long.valueOf(messageId)));return R.ok();}
}

接下来就是SSE的相关内容了

在这里插入图片描述

SSE开关和路径配置

/*** SSE 配置项** @author Lion Li*/
@Data
@ConfigurationProperties("sse.sys-message")
public class SseProperties {private Boolean enabled;/*** 路径*/private String path;
}

SSE连接管理器

/*** 管理 Server-Sent Events (SSE) 连接** @author Lion Li*/
@Slf4j
public class SseEmitterManager {@ResourceRedisTemplate<String, Object> redisTemplate;/*** 订阅的频道*/private final static String SSE_TOPIC = "global:message:sse";private final static Map<String, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();/*** 建立与指定用户的 SSE 连接** @param userIdAndTenantId 用户的唯一标识符,用于区分不同用户的连接* @param token  用户的唯一令牌,用于识别具体的连接* @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件*/public SseEmitter connect(String userIdAndTenantId, String token) {// 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)// 每个用户可以有多个 SSE 连接,通过 token 进行区分Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userIdAndTenantId, k -> new ConcurrentHashMap<>());// 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞SseEmitter emitter = new SseEmitter(86400000L);emitters.put(token, emitter);// 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 tokenemitter.onCompletion(() -> {SseEmitter remove = emitters.remove(token);if (remove != null) {remove.complete();}});emitter.onTimeout(() -> {SseEmitter remove = emitters.remove(token);if (remove != null) {remove.complete();}});emitter.onError((e) -> {SseEmitter remove = emitters.remove(token);if (remove != null) {remove.complete();}});try {// 向客户端发送一条连接成功的事件emitter.send(SseEmitter.event().comment("connected"));} catch (IOException e) {// 如果发送消息失败,则从映射表中移除 emitteremitters.remove(token);}return emitter;}/*** 断开指定用户的 SSE 连接** @param userIdAndTenantId 用户的唯一标识符,用于区分不同用户的连接* @param token  用户的唯一令牌,用于识别具体的连接*/public void disconnect(String userIdAndTenantId, String token) {if (userIdAndTenantId == null || token == null) {return;}Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userIdAndTenantId);if (MapUtil.isNotEmpty(emitters)) {try {SseEmitter sseEmitter = emitters.get(token);sseEmitter.send(SseEmitter.event().comment("disconnected"));sseEmitter.complete();} catch (Exception ignore) {}emitters.remove(token);} else {USER_TOKEN_EMITTERS.remove(userIdAndTenantId);}}/*** 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息** @param consumer 处理SSE消息的消费者函数*/public void subscribeMessage(Consumer<SseMessageDto> consumer) {// 使用RedisTemplate实现订阅逻辑redisTemplate.execute(connection -> {connection.subscribe((message, pattern) -> {try {// 反序列化消息String body = new String(message.getBody());// 添加数据格式检查if (body.startsWith("[")) {log.warn("接收到意外的数组格式数据: {}", body);// 如果确实是数组格式,可以选择处理第一个元素或跳过JSONArray array = JSONUtil.parseArray(body);if (!array.isEmpty()) {SseMessageDto sseMessage = JSONUtil.toBean(array.getJSONObject(0), SseMessageDto.class);consumer.accept(sseMessage);}return;}SseMessageDto sseMessage = JSONUtil.toBean(body, SseMessageDto.class);// 执行消费逻辑consumer.accept(sseMessage);} catch (Exception e) {log.error("处理SSE订阅消息异常", e);}}, SSE_TOPIC.getBytes());return null;}, true);}/*** 向指定的用户会话发送消息** @param userIdAndTenantId  要发送消息的用户id* @param message 要发送的消息内容*/public void sendMessage(String userIdAndTenantId, String message) {Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userIdAndTenantId);if (MapUtil.isNotEmpty(emitters)) {for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {try {SseEmitter sseEmitter = entry.getValue();sseEmitter.send(SseEmitter.event().name("message").data(message));} catch (Exception e) {SseEmitter remove = emitters.remove(entry.getKey());if (remove != null) {remove.complete();}}}} else {USER_TOKEN_EMITTERS.remove(userIdAndTenantId);}}/*** 推送未读数量* @param userIdAndTenantId userId+租户Id* @param unreadCount  未读 数量*/public void sendMessage(String userIdAndTenantId, Long unreadCount) {Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userIdAndTenantId);if (MapUtil.isNotEmpty(emitters)) {for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {try {SseEmitter sseEmitter = entry.getValue();sseEmitter.send(SseEmitter.event().name("unreadCount").data(unreadCount < 0 ? 0 : unreadCount));} catch (Exception e) {SseEmitter remove = emitters.remove(entry.getKey());if (remove != null) {remove.complete();}}}} else {USER_TOKEN_EMITTERS.remove(userIdAndTenantId);}}/*** 本机全用户会话发送消息** @param message 要发送的消息内容*/public void sendMessage(String message) {for (String userIdAndTenantId : USER_TOKEN_EMITTERS.keySet()) {sendMessage(userIdAndTenantId, message);}}/*** 发布SSE订阅消息** @param sseMessageDto 要发布的SSE消息对象*/public void publishMessage(SseMessageDto sseMessageDto) {SseMessageDto broadcastMessage = new SseMessageDto();broadcastMessage.setMessage(sseMessageDto.getMessage());broadcastMessage.setUserIds(sseMessageDto.getUserIds());// 使用RedisTemplate发布消息redisTemplate.convertAndSend(SSE_TOPIC, broadcastMessage);log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());}/*** 向所有的用户发布订阅的消息(群发)** @param message 要发布的消息内容*/public void publishAll(String message) {SseMessageDto broadcastMessage = new SseMessageDto();broadcastMessage.setMessage(message);// 使用RedisTemplate发布消息redisTemplate.convertAndSend(SSE_TOPIC, broadcastMessage);log.info("向所有的用户发布订阅的消息:{} session keys:{} message:{}",SSE_TOPIC, "all", message);}
}

SSE的监听器

/*** SSE 主题订阅监听器** @author Lion Li*/
@Slf4j
public class SseTopicListener implements ApplicationRunner, Ordered {@Autowiredprivate SseEmitterManager sseEmitterManager;/*** 在Spring Boot应用程序启动时初始化SSE主题订阅监听器** @param args 应用程序参数* @throws Exception 初始化过程中可能抛出的异常*/@Overridepublic void run(ApplicationArguments args) throws Exception {sseEmitterManager.subscribeMessage((message) -> {log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());// 如果key不为空就按照key发消息 如果为空就群发if (CollUtil.isNotEmpty(message.getUserIds())) {message.getUserIds().forEach(key -> {sseEmitterManager.sendMessage(key.getUserId() + ":" + key.getTenantId(), message.getMessage());});} else {sseEmitterManager.sendMessage(message.getMessage());}});log.info("初始化SSE主题订阅监听器成功");}@Overridepublic int getOrder() {return -1;}
}

SSE收发消息的工具类

/*** SSE工具类** @author Lion Li*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SseMessageUtils {private final static Boolean SSE_ENABLE = SpringUtil.getApplicationContext().getEnvironment().getProperty("sse.sys-message.enabled", Boolean.class, true);/*** SSE管理器*/private static SseEmitterManager MANAGER;static {if (isEnable() && MANAGER == null) {MANAGER = SpringUtil.getBean(SseEmitterManager.class);}}/*** 向指定的SSE会话发送消息** @param userId  要发送消息的用户id* @param message 要发送的消息内容*/public static void sendMessage(Long userId, String tenantId, String message) {if (!isEnable()) {return;}MANAGER.sendMessage(userId + ":" + tenantId, message);}/*** 本机全用户会话发送消息** @param message 要发送的消息内容*/public static void sendMessage(String message) {if (!isEnable()) {return;}MANAGER.sendMessage(message);}/*** 发布SSE订阅消息** @param sseMessageDto 要发布的SSE消息对象*/public static void publishMessage(SseMessageDto sseMessageDto) {if (!isEnable()) {return;}MANAGER.publishMessage(sseMessageDto);}/*** 向所有的用户发布订阅的消息(群发)** @param message 要发布的消息内容*/public static void publishAll(String message) {if (!isEnable()) {return;}MANAGER.publishAll(message);}/*** 是否开启*/public static Boolean isEnable() {return SSE_ENABLE;}}

SSE手动装配

/*** SSE 自动装配** @author Lion Li*/
@Configuration
@ConditionalOnProperty(value = "sse.sys-message.enabled", havingValue = "true")
@EnableConfigurationProperties(SseProperties.class)
public class SseAutoConfiguration {@Beanpublic SseEmitterManager sseEmitterManager() {return new SseEmitterManager();}@Beanpublic SseTopicListener sseTopicListener() {return new SseTopicListener();}}

SSE接口层

/*** SSE 控制器** @author Lion Li*/
@RestController
@ConditionalOnProperty(value = "sse.sys-message.enabled", havingValue = "true")
@RequiredArgsConstructor
@Slf4j
public class SseController implements DisposableBean {private final SseEmitterManager sseEmitterManager;private final ApplicationEventPublisher applicationEventPublisher;private final ISysUserService iSysUserService;private final TokenService tokenService;private final RedisCache redisCache;private final SysMessageService sysMessageService;/*** 建立 SSE 连接*/@GetMapping(value = "${sse.sys-message.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)@Anonymouspublic SseEmitter connect(@RequestParam("token") String token) {LoginUser loginUser = getLoginUser(token);if (loginUser == null) {return null;}// 获取认证信息Token值{基于SpringSecurity}的SecurityContextHolderLong userId = loginUser.getUserId();String tenantId = loginUser.getTenantId();// 租户隔离SseEmitter emitter = sseEmitterManager.connect(userId + ":" + tenantId, token);// 连接建立后立即发送未读消息数量if (emitter != null) {sendUnReadCount(userId, tenantId, emitter);}return emitter;}/*** 发送未读消息数量*/public void sendUnReadCount(Long userId, String tenantId, SseEmitter emitter) {try {// TO-DO: 需要获取用户未读消息数量的逻辑long unreadCount = sysMessageService.countReadMessage(userId, tenantId);emitter.send(SseEmitter.event().name("unreadCount").data(unreadCount));} catch (Exception e) {// 处理发送失败的情况log.error("发送失败: {}", e.getMessage(), e);}}public LoginUser getLoginUser(String tokenGet) {String token = tokenGet;if (StringUtils.isNotEmpty(tokenGet) && tokenGet.startsWith("Bearer ")) {token = tokenGet.replace("Bearer ", "");}Object obj = this.redisCache.getCacheObject(token);if (Objects.nonNull(obj)) {return (LoginUser) obj;} else {LoginUser user = tokenService.getLoginUserByToken(token);this.redisCache.setCacheObject(token, user, 1, TimeUnit.HOURS);return user;}}/*** 关闭 SSE 连接*/@Anonymous@GetMapping(value = "${sse.sys-message.path}/close")public R<Void> close(@RequestParam("token") String token) {LoginUser loginUser = getLoginUser(token);if (loginUser == null) {return null;}Long userId = loginUser.getUserId();String tenantId = loginUser.getTenantId();sseEmitterManager.disconnect(userId + ":" + tenantId, token);return R.ok();}// 以下为demo仅供参考 禁止使用 请在业务逻辑中使用工具发送而不是用接口发送/*** 向特定用户发送消息** @param userId 目标用户的 ID* @param msg    要发送的消息内容*/@GetMapping(value = "${sse.sys-message.path}/send")public R<Void> send(Long userId, String tenantId, String msg, String title) {// 发送给指定用户long unreadCount = sysMessageService.countReadMessage(userId, tenantId);SysUser sysUser = iSysUserService.selectUserById(userId);SysMessageEvent event = new SysMessageEvent(IdWorker.getId(),title, msg, SecurityUtils.getUserId(),SecurityUtils.getUsername(),userId, sysUser.getUserName(), "系统消息", false, tenantId, unreadCount + 1);// 消息体SseMessageDto dto = new SseMessageDto();dto.setUserIds(ListUtil.of(new SseMessageDto.UserAndTenant(userId, tenantId)));dto.setMessage(JSONUtil.toJsonStr(event));sseEmitterManager.publishMessage(dto);sseEmitterManager.sendMessage(userId + ":" + tenantId, unreadCount + 1);// 发送系统消息--记录historyapplicationEventPublisher.publishEvent(event);return R.ok();}/*** 向部分所有用户发送消息** @param sendMessageRequest 指定的某部分要发送的消息内容*/@PostMapping(value = "${sse.sys-message.path}/sendSysMessage")public R<Void> sendSysMessage(@RequestBody SendMessageRequest sendMessageRequest) {List<SendMessageRequest.UserAndTenant> userIds = sendMessageRequest.getUserId();for (SendMessageRequest.UserAndTenant item : userIds) {// 发送给指定用户long unreadCount = sysMessageService.countReadMessage(item.getUserId(), item.getTenantId());SysUser sysUser = iSysUserService.selectUserById(item.getUserId());SysMessageEvent event = new SysMessageEvent(IdWorker.getId(), sendMessageRequest.getTitle(),sendMessageRequest.getMsg(), SecurityUtils.getUserId(),SecurityUtils.getUsername(), item.getUserId(), sysUser.getUserName(),sendMessageRequest.getBusinessType(), false, item.getTenantId(), unreadCount + 1);// 消息体SseMessageDto dto = new SseMessageDto();dto.setUserIds(ListUtil.of(new SseMessageDto.UserAndTenant(item.getUserId(), item.getTenantId())));dto.setMessage(JSONUtil.toJsonStr(event));sseEmitterManager.publishMessage(dto);sseEmitterManager.sendMessage(item.getUserId() + ":" + item.getTenantId(), unreadCount + 1);// 发送系统消息--记录historyapplicationEventPublisher.publishEvent(event);}return R.ok();}/*** 向所有用户发送消息** @param msg 要发送的消息内容*/@GetMapping(value = "${sse.sys-message.path}/sendAll")public R<Integer> send(String msg, String title, String businessType) {SysMessageEvent event = new SysMessageEvent(IdWorker.getId(), title, msg, SecurityUtils.getUserId(), SecurityUtils.getUsername(),null, null, "系统消息", false, null, 1);sseEmitterManager.publishAll(JSONUtil.toJsonStr(event));Long userId = SecurityUtils.getUserId();String username = SecurityUtils.getUsername();// 全体用户使用批处理List<SysUser> sysUsers = iSysUserService.selectUserList(new SysUser());int index = 0;List<SysMessage> cacheListBatch = new ArrayList<>();for (SysUser sysUser : sysUsers) {index++;if (index / 100 == 0) {sysMessageService.saveBatch(cacheListBatch);cacheListBatch.clear();}SysMessage sysMessage = new SysMessage();sysMessage.setTitle(title);sysMessage.setContent(msg);sysMessage.setSenderId(userId);sysMessage.setSenderName(username);sysMessage.setReceiverId(sysUser.getUserId());sysMessage.setTenantId(sysUser.getTenantId());sysMessage.setReceiverName(sysUser.getUserName());sysMessage.setIsRead(false);sysMessage.setBusinessType(StringUtils.isEmpty(businessType) ? "系统消息" : businessType);sysMessage.setCreateTime(LocalDateTime.now());sysMessage.setUpdateTime(LocalDateTime.now());sysMessage.setMessageId(IdWorker.getId());cacheListBatch.add(sysMessage);}if (!cacheListBatch.isEmpty()) {sysMessageService.saveBatch(cacheListBatch);cacheListBatch.clear();}return R.ok(sysUsers.size());}/*** 清理资源。此方法目前不执行任何操作,但避免因未实现而导致错误*/@Overridepublic void destroy() throws Exception {// 销毁时不需要做什么 此方法避免无用操作报错}}

实体对象

/*** 万里悲秋常作客,百年多病独登台* @author : makeJava*/
@Data
public class SendMessageRequest {// 用户Idprivate List<UserAndTenant> userId;// 用户消息private String msg;// 站内信标题private String title;// 业务分类【服务消息|系统消息|预警消息】private String businessType;@Datastaticpublic class UserAndTenant {private Long userId;private String tenantId;}
}@Data
public class SseMessageDto implements Serializable {private static final long serialVersionUID = 1L;/*** 需要推送到的session key 列表*/private List<UserAndTenant> userIds;/*** 需要发送的消息*/private String message;/*** 未读数量*/private long unreadCount;@Datastatic@AllArgsConstructorpublic class UserAndTenant {private Long userId;private String tenantId;}
}

效果

在这里插入图片描述

模拟给他发一条消息

在这里插入图片描述

检查SSE的连接客户端收到

在这里插入图片描述

断开连接

在这里插入图片描述

F12查看SSE长连接

在这里插入图片描述

SSE搭建的站内信结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/965592.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入解析:【具身智能】具身机器人VLA算法入门及实战(三):VLA经典模型架构

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

长连接和短连接

目录长连接和短连接概念总结与类比TCP和UDPHTTP的长短连接HTTP 连接行为的演进1️⃣ 阶段一:HTTP/1.0 —— “用完即断”(Designed to Disconnect)2️⃣ 阶段二:HTTP/1.1 —— 引入 Keep-Alive(延迟断开)长连接/…

助力V2G,米尔SECC GreenPHY实战开发

随着电动汽车与电网双向交互(V2G)技术的快速发展,充电桩与车辆间的高效通信成为实现智能能源管理的关键。SECC作为充电桩的通信控制核心,其与电力线载波通信芯片的适配尤为重要。本文将分享基于米尔核心板,调试联…

洛谷题单指南-组合数学与计数-P1287 盒子与球

原题链接:https://www.luogu.com.cn/problem/P1287 题意解读:n个不同的球放入r个不同盒子的方案数 解题思路: 1、第二类斯特林数 用于解决n个不同的球放入r个相同盒子的方案数,不同盒子乘上r!即可。 可以用递推实现…

2025 年最新推荐铝板厂家排行榜,涵盖 5052/6061/7075 铝板及纯铝板/高纯铝板优质供应商精选

引言 铝板作为工业制造、建筑装饰、交通运输等领域的核心材料,其轻量化、耐腐蚀、易加工的特性备受青睐,但当前市场厂家资质参差不齐,采购痛点日益凸显。部分小型厂家采用劣质铝锭生产,导致产品纯度不足、强度不达…

Netty和Tomcat

Netty和Tomcat特性 Netty Tomcat定位 网络编程框架 Servlet容器 / Web服务器核心模型 异步事件驱动 基于Servlet规范的请求-响应协议支持 灵活。可构建HTTP、WebSocket及任何自定义协议。 固定。主要面向HTTP/1.x, HTT…

2025 年最新推荐铝管厂家权威排行榜:无缝铝管/合金铝管/6061/2A12 铝管优质企业综合测评推荐

在工业制造、建筑装饰、交通运输等核心领域,铝管凭借轻质、耐腐蚀、导热性佳的核心优势,成为全球制造业不可或缺的基础材料。然而当前市场品牌繁杂,产品质量参差不齐,采购者常面临选型难、品控无保障、交付周期长等…

【计算机、信息技术、电子、人工智能等均可投】第二届图像、信号处理与通信技术国际学术会议(ISPCT 2025)

征稿范围广→计算机、信息技术、电子、人工智能、光学等均可投 第二届图像、信号处理与通信技术国际学术会议(ISPCT 2025) 2025 2nd International Conference on Image, Signal Processing and Communication Techn…

2025 年微矩形 /圆形/矩形电连接器厂家最新推荐排行榜,涵盖 MDC/ZMDM/Y50X 等系列优质品牌精选

引言 在工业制造、通信技术、军工信息化等领域高速发展的当下,电连接器作为信号传输的核心部件,其品质稳定性、技术适配性直接影响终端设备的运行效率与安全性能。当前连接器市场品牌繁杂,部分厂商存在原材料劣质、…

2025 年 11 月铝合金门窗厂家推荐排行榜,断桥门窗,系统门窗,金属门窗,阳台门窗,平开推拉折叠门窗公司精选

2025 年 11 月铝合金门窗厂家推荐排行榜:断桥门窗、系统门窗、金属门窗、阳台门窗、平开推拉折叠门窗公司精选 行业背景与发展趋势 随着建筑节能标准的不断提升和消费者对居住环境品质要求的日益增长,铝合金门窗行业…

2025 年 11 月电动调节阀厂家推荐排行榜,西门子/霍尼韦尔/鲁泽节能,比例阀/蒸汽温控阀/二通阀/阀执行器公司精选

2025年11月电动调节阀厂家推荐排行榜,西门子/霍尼韦尔/鲁泽节能,比例阀/蒸汽温控阀/二通阀/阀执行器公司精选 行业背景与发展趋势 随着工业自动化和智能楼宇建设的快速发展,电动调节阀作为过程控制系统的关键执行元…

P9902 『PG2』模拟最大流 题解

Sol 模拟最大流的一般套路就是求最小割。 题目保证了 \(u<v\),所以我们可以得到如下暴力: 设 \(f_{i,S}\) 表示前 \(i\) 个点,从 \(1\) 能到集合 \(S\) 中的点,割掉的最小边权。那么转移有: \[f_{i,S} \to f_{…

2025 年 11 月蒸汽调节阀厂家推荐排行榜,上海鲁泽/西门子/霍尼韦尔蒸汽调节阀,西门子蒸汽比例调节阀,蒸汽温控阀公司推荐

2025 年蒸汽调节阀行业深度分析与厂家推荐指南 随着工业自动化水平的不断提升,蒸汽调节阀作为流程工业与暖通系统中的关键控制设备,其技术性能与可靠性直接影响着整个系统的运行效率与能耗水平。蒸汽调节阀、蒸汽比例…

2025年自动钢筋弯曲生产厂家权威推荐榜单:钢筋自动弯曲/数控式钢筋弯曲中心/钢筋自动弯曲中心源头厂家精选

在建筑业工业化与智能化浪潮的推动下,自动钢筋弯曲设备正以其高效率、高精度和低人工成本的优势,逐步替代传统加工作业模式。 据QYResearch调研统计,2031年全球钢筋弯曲机市场销售额预计将达到38.5亿元,2025年至20…

C++ 进阶知识点详细教程 - 第3部分

C++ 进阶知识点详细教程 - 第3部分 11. 搜索算法 11.1 深度优先搜索(DFS) 11.1.1 基本概念 DFS是一种递归的搜索算法,沿着一条路径深入到底,然后回溯。 基本模板 void dfs(int state) {// 1. 终止条件if (满足条件…

SQL 中 SELECT 查询语句知识点

SQL 中 SELECT 查询语句知识点介绍SELECT的基础使用SELECT 的完整语法 SELECT 列 FROM 表 WHERE 过滤条件 GROUP BY 分组列 聚合函数 HAVING 分组过滤条件 ORDER BY ASC|DESC SELECT 的执行顺序先执行 FROM 确定…

2025 年 11 月毛刷辊厂家推荐排行榜,工业毛刷辊,定做毛刷辊,清洁毛刷辊,纺织毛刷辊,钢制毛刷辊公司精选

2025年11月毛刷辊厂家推荐排行榜:工业毛刷辊、定做毛刷辊、清洁毛刷辊、纺织毛刷辊、钢制毛刷辊公司精选 行业背景与发展现状 毛刷辊作为工业生产中不可或缺的关键部件,在表面处理、清洁除尘、纺织整理、材料输送等领…

弧焊工业机械手混合气体实用方法

这场变革的核心在于提升生产效率的同时,如何更好地减少资源浪费并提高焊接品质。在这一背景下,WGFACS弧焊气体节约系统的引入,无疑为行业带来了一项革命性的突破。该系统通过智能化的实时监控和动态控制,确保了焊接…

2025 年 11 月合肥搬家公司推荐排行榜,合肥正规搬家公司,合肥市搬家公司,包河区搬家公司,蜀山区搬家公司,专业高效与贴心服务口碑之选

2025 年 11 月合肥搬家公司推荐排行榜,合肥正规搬家公司,合肥市搬家公司,包河区搬家公司,蜀山区搬家公司,专业高效与贴心服务口碑之选 随着合肥城市化进程的加速推进,人口流动性和企事业单位搬迁需求持续增长,搬…

消息队列原理和对比

主流消息队列对比 消息队列是一种重要的分布式系统组件,可用于异步通信、削峰填谷、解耦系统、数据缓存等多个方面。在选择消息队列时,需要考虑诸多因素,包括性能、可靠性、可用性、扩展性、可维护性、社区支持等等…