Spring Boot整合Kafka+SSE实现实时数据展示

2024年3月10日

知识积累

为什么使用Kafka?

不使用Rabbitmq或者Rocketmq是因为Kafka是Hadoop集群下的组成部分,对于大数据的相关开发适应性好,且当前业务场景下不需要使用死信队列,不过要注意Kafka对于更新时间慢的数据拉取也较慢,因此对与实时性要求高可以选择其他MQ。
使用消息队列是因为该中间件具有实时性,且可以作为广播进行消息分发。

为什么使用SSE?

使用Websocket传输信息的时候,会转成二进制数据,产生一定的时间损耗,SSE直接传输文本,不存在这个问题
由于Websocket是双向的,读取日志的时候,如果有人连接ws日志,会发送大量异常信息,会给使用段和日志段造成问题;SSE是单向的,不需要考虑这个问题,提高了安全性
另外就是SSE支持断线重连;Websocket协议本身并没有提供心跳机制,所以长时间没有数据发送时,会将这个连接断掉,因此需要手写心跳机制进行实现。
此外,由于是长连接的一个实现方式,所以SSE也可以替代Websocket实现扫码登陆(比如通过SSE的超时组件在实现二维码的超时功能,具体实现我可以整理一下)
另外,如果是普通项目,不需要过高的实时性,则不需要使用Websocket,使用SSE即可

代码实现

Java代码

pom.xml引入SSE和Kafka

<!-- SSE,一般springboot开发web应用的都有 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
<!-- kafka,最主要的是第一个,剩下两个是测试用的 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency<groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>

application.properties增加Kafka配置信息

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

配置Kafka信息

@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
配置controller,通过web方式开启效果
@RestController
@RequestMapping(path = "sse")
public class KafkaSSEController {private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** @param message* @apiNote 发送信息到Kafka主题中*/@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}/*** 监听Kafka数据** @param message*/@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}/*** 连接sse服务** @param id* @return* @throws IOException*/@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})public SseEmitter push(@RequestParam("id") String id) throws IOException {// 超时时间设置为5分钟,用于演示客户端自动重连SseEmitter sseEmitter = new SseEmitter(5_60_000L);// 设置前端的重试时间为1s// send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));sseCache.put(id, sseEmitter);System.out.println("add " + id);sseEmitter.send("你好", MediaType.APPLICATION_JSON);SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈");sseEmitter.send(data);// onTimeout(): 超时回调触发sseEmitter.onTimeout(() -> {System.out.println(id + "超时");sseCache.remove(id);});// onCompletion(): 结束之后的回调触发sseEmitter.onCompletion(() -> System.out.println("完成!!!"));return sseEmitter;}/*** http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa* @param id* @param content* @return* @throws IOException*/@ResponseBody@GetMapping(path = "push")public String push(String id, String content) throws IOException {SseEmitter sseEmitter = sseCache.get(id);if (sseEmitter != null) {sseEmitter.send(content);}return "over";}@ResponseBody@GetMapping(path = "over")public String over(String id) {SseEmitter sseEmitter = sseCache.get(id);if (sseEmitter != null) {// complete(): 表示执行完毕,会断开连接sseEmitter.complete();sseCache.remove(id);}return "over";}}

前端方式

<html><head><script>console.log('start')const clientId = "your_client_id_x"; // 设置客户端IDconst eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 订阅服务器端的SSEeventSource.onmessage = event => {console.log(event.data)const message = JSON.parse(event.data);console.log(`Received message from server: ${message}`);};// 发送消息给服务器端 可通过 postman 调用,所以下面 sendMessage() 调用被注释掉了function sendMessage() {const message = "hello sse";fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {method: "POST",headers: { "Content-Type": "application/json" },body: JSON.stringify(message)});console.log('dddd'+JSON.stringify(message))}// sendMessage()</script></head>
</html>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/797254.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

73.矩阵置零

题目描述 解题思路 ——————初步思路&#xff0c;暴力求解—————— 1.遍历原矩阵&#xff0c;记录0位置的行列值&#xff0c;存入数组flags中&#xff0c;作为标志 2.读取flags中的元素&#xff0c;获取要置为0的行列值&#xff0c;然后两个循环将matrix中的对应元…

【代码】二分法求最小值

仅适用于以下情况&#xff1a;区间内单调或者最多一个极小值 代码 以[0,pi]内的三角函数为例 clc clear close allx0:pi/1000:pi; ytest(x); figure() plot(x,y,.)cutnum100;x1x(1); x2x(end); error_max10^-1000;%能接受的误差上限 for i1:cutnum%这里cutnum是取值上限num(…

电池二次利用走向可持续大循环周期的潜力和挑战(第一篇)

一、背景 当前&#xff0c;气候变化是全球可持续发展面临的重大挑战。缓解气候变化最具挑战性的目标是在本世纪中期实现碳中和&#xff08;排放量低到足以被自然系统安全吸收&#xff09;&#xff0c;其中电动汽车&#xff08;EV&#xff09;的引入是一项关键举措。电动汽车在…

SSL根证书是什么

根证书是什么&#xff1f; 根证书是CA认证中心给自己颁发的证书,是信任链的起始点。安装根证书意味着对这个CA认证中心的信任。 从技术上讲&#xff0c;证书其实包含三部分&#xff0c;用户的信息&#xff0c;用户的公钥&#xff0c;还有CA中心对该证书里面的信息的签名&#…

对代理模式的理解

目录 一、前言二、案例1 代码2 自定义代理类【静态代理】2.1 一个接口多个实现&#xff0c;到底注入哪个依赖呢&#xff1f;2.1.1 Primary注解2.1.2 Resource注解&#xff08;指定name属性&#xff09;2.1.3 Qualifier注解 2.2 面向接口编程2.3 如果没接口咋办呢&#xff1f;2.…

替代安全指标(Surrogate Safety Measures (SSM) )

替代安全措施&#xff08;Surrogate Safety Measures (SSM) &#xff09;用于从数据中寻找接近碰撞&#xff0c;或可能发生&#xff08;但实际没有发生&#xff09;的碰撞事件。 SSM的两个合格标准&#xff1a; &#xff08;1&#xff09;它应该来自与碰撞直接相关的交通冲突&…

阿里巴巴中国站获得1688商品详情 API:如何通过API接口批量获取价格、标题、图片、库存等数据

在数字化时代&#xff0c;数据的重要性不言而喻。对于电商从业者来说&#xff0c;获取商品详情数据是提升业务效率和用户体验的关键。阿里巴巴中国站作为电商行业的巨头&#xff0c;提供了丰富的API接口&#xff0c;方便开发者们批量获取商品信息。本文将详细叙述如何通过阿里巴…

C语言——详解字符函数和字符串函数(二)

Hi,铁子们好呀&#xff01;之前博主给大家简单地介绍了部分字符和字符串函数&#xff0c;那么这次&#xff0c;博主将会把这些字符串函数给大家依次讲完&#xff01; 今天讲的具体内容如下: 文章目录 6.strcmp函数的使用及模拟实现6.1 strcmp函数介绍和基本使用6.1.1 strcmp函…

总结:微信小程序中跨组件的通信、状态管理的方案

在微信小程序中实现跨组件通信和状态管理,有以下几种主要方案: 事件机制 通过事件机制可以实现父子组件、兄弟组件的通信。 示例: 父组件向子组件传递数据: 父组件: <child binddata"handleChildData" /> 子组件: Component({..., methods: { handleChildData(…

Linux网卡与IP地址:通往网络世界的通行证

在探索Linux网卡和IP地址的关系之前&#xff0c;我们得先理解Linux网卡是怎么工作的。想象一下&#xff0c;每台计算机都是一个世界&#x1f30e;&#xff0c;而网卡就是连接这些世界的门户&#x1f6aa;。网卡的工作就是接收和发送数据包&#xff0c;就像邮差&#x1f4ec;递送…

RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表

RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表 文章目录 RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表1. MQTT概览2. MQTT 5.0 特性1. 特性概要2. Docker中安装RabbitMQ及启用MQTT5.0协议 3. MQTT 5.0 功能列表1. 消息过期1. 描述2. 举例3. 实现 2. 订阅标识…

洛谷 1126.机器人搬重物

思路&#xff1a;BFS 这道BFS可谓是细节爆炸&#xff0c;对于编程能力和判断条件的能力的考察非常之大。 对于这道题&#xff0c;我们还需要额外考虑一些因素&#xff0c;那就是对于障碍物的考虑和机器人方位的考虑。 首先我们看第一个问题&#xff0c;就是对于障碍物的考虑…

linux命令学习(一)grep命令

–colorauto 或者 –color&#xff1a;表示对匹配到的文本着色显示 -i&#xff1a;在搜索的时候忽略大小写 -n&#xff1a;显示结果所在行号 -c&#xff1a;统计匹配到的行数&#xff0c;注意&#xff0c;是匹配到的总行数&#xff0c;不是匹配到的次数 -o&#xff1a;只显…

淘宝店商家电话提取软件操作经验

淘宝爬虫工具是一种用于自动化获取淘宝网站数据的程序。以下是一个简单的淘宝爬虫工具的代码示例&#xff1a; import requests from bs4 import BeautifulSoupdef get_taobao_data(keyword):url fhttps://s.taobao.com/search?q{keyword}headers {User-Agent: Mozilla/5.0…

【洛谷】P9236 [蓝桥杯 2023 省 A] 异或和之和

题目链接 P9236 [蓝桥杯 2023 省 A] 异或和之和 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路 1. 暴力求解 直接枚举出所有子数组&#xff0c;求每个子数组的异或和&#xff0c;再对所有的异或和求和 枚举所有子数组的时间复杂度为O&#xff08;N^2&#xff09;&…

AI技术创业机会之零售与电子商务

人工智能&#xff08;AI&#xff09;技术的创新与发展为零售与电子商务领域带来了深刻的变革&#xff0c;为创业者创造了丰富的创业机会。以下详述了零售与电子商务背景下AI技术的创业机会及其具体细节与内容&#xff0c;以5000字篇幅深入剖析各细分领域&#xff0c;为有志于投…

C++基础语法、算法、数据结构的简化总结

这里写目录标题 C基础语法变量与运算条件判断循环结构 数组和字符串一维数组二维数组字符串处理 函数与算法函数定义常用算法 数据结构链表与顺序表栈和队列树 计算机基础硬件组成数据单位进制转换 C知识点总结&#xff0c;适合六年级刚学C的学生&#xff1a; C基础语法 变量与…

Qt+OpenGL-part3

1-4EBO画矩形_哔哩哔哩_bilibili 可以绘制两个三角形来组成一个矩形&#xff08;OpenGL主要处理三角形&#xff09; 直接画两个三角形&#xff1a; #include "openglwidget.h" #include <QDebug>unsigned int VBO,VAO; unsigned int shaderProgram;//顶点着…

升级到springdoc的Swagger3

jdk 17、springboot3 依赖配置&#xff1a;就此一个依赖即可 <dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>2.5.0</version></dependency>Swagger2-…

Leetcode 215. 数组中的第K个最大元素

心路历程&#xff1a; 这道题本质上是排序不完全的过程&#xff0c;而且这道题有bug&#xff0c;直接用python的排序算法其实就能AC。 可以按照快排排到找到k-1个large元素的思维去做&#xff0c;不过这道题需要考虑空间复杂度&#xff0c;所以需要用指针快排。 其实也可以考虑…