SpringBoot + SSE 实时异步流式推送

前言

在当今数字化时代,实时数据处理对于企业的决策和运营至关重要。许多业务场景需要及时响应数据库中的数据变化,例如电商平台实时更新库存、金融系统实时监控交易数据等。

本文将详细介绍如何通过Debezium捕获数据库变更事件,并利用Server - Sent Events(SSE)将这些变更实时推送给前端应用。

技术背景

+----------------+          +----------------+          +----------------+  
|   MySQL 数据库  | 监听变更  |  SpringBoot 服务  |  推送变更  |    Web 前端     |  
|  (Binlog 模式)  | ------>  | (Debezium CDC) | ------>  | (EventSource)  |  
+----------------+          +----------------+          +----------------+  
  • Debezium 是一个开源的分布式平台,它能够监控数据库的变化,并将这些变化以事件流的形式发送出去。它支持多种数据库,如 MySQL、PostgreSQL 等,通过模拟数据库的复制协议来实现对数据库变更的实时捕获。

  • Server - Sent Events(SSE)是一种允许网页自动获取服务器推送更新的技术。它基于 HTTP 协议,通过一个单向的连接,服务器可以持续向客户端发送事件流数据,非常适合实时数据推送的场景。

环境准备

MySQL 配置
-- 启用 Binlog(ROW 模式)  
SET GLOBAL log_bin = ON;  
SET GLOBAL binlog_format = 'ROW';  -- 创建 CDC 用户(需 REPLICATION 权限)  
CREATE USER 'cdc_user' IDENTIFIED BY 'cdc_pass';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user';  

配置要点:确保 Binlog 记录行级变更‌

引入依赖
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.6.0.Final</version>
</dependency>

核心代码实现

Debezium 监听服务
@Slf4j
@Component
public class BinlogListener {@Autowiredprivate SseService sseService;@PostConstructpublic void start() {Configuration config = Configuration.create().with("name", "mysql-connector-1").with("connector.class", "io.debezium.connector.mysql.MySqlConnector").with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore").with("offset.storage.file.filename", "D:\\usr\\debezium\\mysql-offsets.dat").with("offset.flush.interval.ms", "10000").with("database.server.name", "mysql-connector-1").with("database.hostname", "localhost").with("database.port", "3306").with("database.user", "root").with("database.password", "root").with("database.server.id", "1").with("database.include.list", "scf").with("table.include.list", "scf.user").with("include.schema.changes", "false").with("snapshot.mode", "initial").with("database.history.skip.unparseable.ddl", "true") // 忽略解析错误.with("database.connection.attempts", "5") // 最大重试次数.with("database.connection.backoff.ms", "10000") // 重试间隔 10s.with("database.history", "io.debezium.relational.history.FileDatabaseHistory").with("database.history.file.filename", "D:\\usr\\debezium\\mysql-history.dat").build();EmbeddedEngine engine = EmbeddedEngine.create().using(config).notifying(this::handleEvent).build();Executors.newSingleThreadExecutor().execute(engine::run);}private void handleEvent(SourceRecord record) {Struct value = (Struct) record.value();Struct after = value.getStruct("after");// 转换为 Map 并序列化Map<String, Object> dataMap = new HashMap<>();dataMap.put("id", after.getString("id"));dataMap.put("name", after.getString("name"));dataMap.put("age", after.getInt32("age"));sseService.broadcast(JSON.toJSONString(dataMap));}
}
SSE 推送服务
@Service  
public class SseService {  private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet();  public SseEmitter subscribe() {  SseEmitter emitter = new SseEmitter(60_000L);  emitter.onCompletion(() -> emitters.remove(emitter));  emitters.add(emitter);  return emitter;  }  public void broadcast(String data) {  emitters.forEach(emitter -> {  try {  emitter.send(SseEmitter.event()  .data(data)  .id(UUID.randomUUID().toString()));  } catch (IOException e) {  emitter.completeWithError(e);  }  });  }  
}  
控制器层
@RestController  
@RequestMapping("/sse")  
public class SseController {  @Autowired  private SseService sseService;  @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  public SseEmitter stream() {  return sseService.subscribe();  }  
}  

前端实现

<html lang="en">
<head><meta charset="UTF-8"><title>实时数据推送测试</title>
</head>
<body>
<div id="updates"></div>
<script>const eventSource = new EventSource('/sse/stream');eventSource.onmessage = e => {const data = JSON.parse(e.data);document.getElementById('updates').innerHTML +=`<p>用户变更: ID=${data.id}, 姓名=${data.name}</p>`;};eventSource.onerror = e => console.error("SSE 错误:", e);
</script>
</body>
</html> 

测试数据变更

图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/81042.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ADS1299模拟前端(AFE)代替芯片——LHE7909

在现代医疗科技的飞速发展中&#xff0c;精确的生物电势测量设备变得越来越重要。领慧立芯推出的LHE7909&#xff0c;是一款专为心电图&#xff08;ECG&#xff09;和其他生物电势测量设计的低噪声24位模数转换器&#xff08;ADC&#xff09;&#xff0c;为医疗设备制造商提供了…

如何实现Redis和Mysql中数据双写一致性

一、引言 今天我们来聊聊一个在分布式系统中非常常见但又十分棘手的问题——Redis与MySQL之间的双写一致性。我们在项目中多多少少都遇到过类似的困扰&#xff0c;缓存是用Redis&#xff0c;数据库是用MySQL&#xff0c;但如何确保两者之间的数据一致性呢&#xff1f;接下来我…

面试篇 - Transformer前馈神经网络(FFN)使用什么激活函数?

1. FFN结构分解 原始Transformer的FFN层 FFN(x) max(0, xW₁ b₁)W₂ b₂ # 原始论文公式 输入&#xff1a;自注意力层的输出 x&#xff08;维度 d_model512&#xff09; 扩展层&#xff1a;xW₁ b₁&#xff08;扩展为 d_ff2048&#xff09; 激活函数&#xff1a;Re…

基于Python Flask的深度学习电影评论情感分析可视化系统(2.0升级版,附源码)

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

前端vue2修改echarts字体为思源黑体-避免侵权-可以更换为任意字体统一管理

1.下载字体 npm install fontsource/noto-sans-sc 不知道为什么我从github上面下载的不好使&#xff0c;所以就用了npm的 2.引用字体 import fontsource/noto-sans-sc; 在入口文件-main.js中引用 3.设置echats模板样式 import * as echarts from echarts; // 在import的后…

51c自动驾驶~合集37

我自己的原文哦~ https://blog.51cto.com/whaosoft/13878933 #DETR->DETR3D->Sparse4D 走向长时序稀疏3D目标检测 一、DETR 图1 DETR架构 DETR是第一篇将Transformer应用到目标检测方向的算法。DETR是一个经典的Encoder-Decoder结构的算法&#xff0c;它的骨干网…

【MongoDB篇】MongoDB的集合操作!

目录 引言第一节&#xff1a;集合的“诞生”——自动出现还是手动打造&#xff1f;&#x1f914;第二节&#xff1a;集合的“查阅”——看看这个数据库里有哪些柜子&#xff1f;&#x1f4c2;&#x1f440;第三节&#xff1a;集合的“重命名”——给文件柜换个名字&#xff01;…

Goland终端PowerShell命令失效

Goland终端Terminal的PowerShell不能使用&#xff0c;明明windows上升级了PowerShell 7设置了配置文件&#xff0c;但是只能在windows终端下使用&#xff0c;goland终端下直接失效报错&#xff0c;安装升级PowerShell请看Windows11终端升级PowerShell7 - HashFlag - 博客园 问…

简单分析自动驾驶发展现状与挑战

一、技术进展与市场渗透 技术分级与渗透率 当前量产乘用车的自动驾驶等级以L2为主&#xff08;渗透率约51%&#xff09;&#xff0c;L3级处于初步落地阶段&#xff08;渗透率约20%&#xff09;&#xff0c;而L4级仍处于测试和示范运营阶段&#xff08;渗透率约11%&#xff09;2…

【C++类和数据抽象】消息处理示例(1):从设计模式到实战应用

目录 一、数据抽象概述 二、消息处理的核心概念 2.1 什么是消息处理&#xff1f; 2.2 消息处理的核心目标 三、基于设计模式的消息处理实现 3.1 观察者模式&#xff08;Observer Pattern&#xff09; 3.2 命令模式&#xff08;Command Pattern&#xff09; 四、实战场景…

【统计方法】交叉验证:Resampling, nested 交叉验证等策略 【含R语言】

Resampling (重采样方法) 重采样方法是从训练数据中反复抽取样本&#xff0c;并在每个&#xff08;重新&#xff09;样本上重新调整模型&#xff0c;以获得关于拟合模型的附加信息的技术。 两种主要的重采样方法 Cross-Validation (CV) 交叉验证 &#xff1a; 用于估计测试误…

常见的 CSS 知识点整理

1. 盒模型&#xff08;Box Model&#xff09;是什么&#xff1f;标准盒模型和 IE 盒模型的区别&#xff1f; 答案&#xff1a; CSS 盒模型将元素视为一个盒子&#xff0c;由内容&#xff08;content&#xff09;、内边距&#xff08;padding&#xff09;、边框&#xff08;bor…

Educational Codeforces Round 178 div2(题解ABCDE)

A. Three Decks #1.由于最后三个数会相等&#xff0c;提前算出来和&#xff0c;%3判断&#xff0c;再判前两个数是否大于 #include<iostream> #include<vector> #include<stdio.h> #include<map> #include<string> #include<algorithm> #…

如何创建一个导入模板?全流程图文解析

先去找到系统内可以上传东西的按钮 把你的模板上传上去,找到对应的fileName 图里的文字写错了,是复制粘贴"filePath"到URL才能下载

通信原理第七版与第六版区别附pdf

介绍 我用夸克网盘分享了「通信原理 第7版》樊昌信」&#xff0c;链接&#xff1a;https://pan.quark.cn/s/be7c5af4cdce 《通信原理&#xff08;第7版&#xff09;》是在第6版的基础上&#xff0c;为了适应当前通信技术发展和教学需求&#xff0c;并吸取了数十所院校教师的反…

Mysql唯一性约束

唯一性约束&#xff08;Unique Constraint&#xff09;是数据库设计中用于保证表中某一列或多列组合的值具有唯一性的一种规则。它可以防止在指定列中插入重复的数据&#xff0c;有助于维护数据的完整性和准确性。下面从几个方面为你详细解释 作用 确保数据准确性&#xff1a…

测试基础笔记第十六天

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、UI自动化介绍1.认识UI自动化测试2.实施UI自动化测试前置条件3.UI自动化测试执行时机4.UI自动化测试核心作用和劣势 二、认识Web自动化测试工具-Selenium021.Sel…

PaddleX的安装

参考&#xff1a;安装PaddlePaddle - PaddleX 文档 1、安装PaddlePaddle 查看 docker 版本 docker --version 若您通过 Docker 安装&#xff0c;请参考下述命令&#xff0c;使用飞桨框架官方 Docker 镜像&#xff0c;创建一个名为 paddlex 的容器&#xff0c;并将当前工作目…

长效住宅IP是什么?如何获取长效住宅IP?

在当今的互联网世界里&#xff0c;IP地址作为连接用户与网站之间的桥梁&#xff0c;其重要性不言而喻。对于跨境电商、社交媒体运营以及数据采集等领域的专业人士而言&#xff0c;普通的IP地址已无法满足日益复杂的需求。他们更需要一种稳定、安全且持久的长效住宅IP来完成各类…

02 业务流程架构

业务流程架构提供了自上而下的组织鸟瞰图&#xff0c;是业务流程的全景图。根据所采用的方法不同&#xff0c;有时被称为流程全景图或高层级流程图&#xff0c;提供了业务运营中所有业务流程的整体视图。 这样有助于理解企业内部各个业务流程之间的相互关系以及它们如何共同工…