Spring WebFlux 中 WebSocket 使用 DataBuffer 的注意事项

以下是修改后的完整文档,包含在多个多线程环境中使用 retain()release() 方法的示例,且确保在 finally 块中调用 release()


在 Spring WebFlux 中,WebSocketMessage 主要用于表示 WebSocket 的消息载体,其中 getPayload() 方法返回 DataBuffer,用于处理二进制数据流。在使用 DataBuffer 时,需要注意其一次性读取特性,以及潜在的内存管理问题。本文将介绍如何正确使用 DataBuffer,避免重复读取和内存泄漏。

1. 避免重复读取 DataBuffer

DataBuffer 设计为一次性读取流数据,因此,一旦被消费,后续读取将无法获取数据。例如:

String firstRead = webSocketMessage.getPayload().toString(StandardCharsets.UTF_8);
String secondRead = webSocketMessage.getPayload().toString(StandardCharsets.UTF_8); // 此处读取会失败

解决方案

如果需要多次使用 DataBuffer 的数据,可以在第一次读取时缓存:

DataBuffer dataBuffer = webSocketMessage.getPayload();
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
String payload = new String(bytes, StandardCharsets.UTF_8);

这样,后续可以安全地使用 payload 变量,而不会影响 DataBuffer


2. 避免阻塞操作

Spring WebFlux 是基于响应式编程的,WebSocket 处理也应保持非阻塞。如果在 DataBuffer 处理中引入了阻塞操作(如同步 I/O 或 Thread.sleep()),可能会导致 Reactor 线程阻塞,影响整体吞吐量。

解决方案

使用 Flux/Mono 进行异步处理,例如:

session.receive().map(WebSocketMessage::getPayloadAsText)  // 避免直接操作 DataBuffer.flatMap(payload -> processMessage(payload)).subscribe();

3. 处理 DataBuffer 可能带来的内存泄漏

Spring WebFlux 采用 Netty 作为默认底层引擎,而 Netty 的 ByteBuf 需要手动释放,否则可能导致内存泄漏。Spring 提供了 DataBufferUtils.release() 方法来避免 DataBuffer 占用资源不被回收。

正确的释放方式

session.receive().doOnNext(message -> {try {String data = message.getPayloadAsText();System.out.println("Received: " + data);} finally {DataBufferUtils.release(message.getPayload());}}).subscribe();

DataBufferUtils.release() 仅在手动管理 DataBuffer 生命周期时才需要,如果直接通过 WebSocketMessage.getPayloadAsText() 处理字符串,不必显式释放。


4. 在 Flux/Mono 组合操作时避免数据丢失

如果 DataBuffermap() 操作多次消费,可能导致数据丢失或 DataBuffer 为空。例如:

session.receive().map(message -> {DataBuffer payload = message.getPayload();DataBufferUtils.release(payload); // 这里释放后,后续的 map() 操作会读取不到数据return payload;}).map(buffer -> buffer.toString(StandardCharsets.UTF_8)) // 这里可能会失败.subscribe();

正确的方式

  • 确保 DataBuffer 只在最终消费时释放。
  • 处理 DataBuffer 时,转换为 byte[] 以避免流式数据的重复读取。
session.receive().map(WebSocketMessage::getPayload).map(dataBuffer -> {byte[] bytes = new byte[dataBuffer.readableByteCount()];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);  // 读取完毕后释放return new String(bytes, StandardCharsets.UTF_8);}).subscribe(System.out::println);

5. retain()release() 方法的补充

Spring WebFlux 中,WebSocketMessage 还提供了 retain()release() 方法,用于管理 DataBuffer 的引用计数和释放资源。下面介绍如何在多线程环境中正确使用这些方法。

retain() 方法

retain() 方法确保 DataBuffer 的引用计数增加,以便在需要时能够安全使用:

public WebSocketMessage retain() {if (reactorNetty2Present) {return ReactorNetty2Helper.retain(this);}DataBufferUtils.retain(this.payload);return this;
}

retain() 方法会增加 DataBuffer 的引用计数,防止在处理过程中被提前释放。这对于需要多个组件共享同一 DataBuffer 实例的情况非常重要。

release() 方法

release() 方法用于释放 DataBuffer,减少引用计数,释放底层资源,防止内存泄漏:

public void release() {DataBufferUtils.release(this.payload);
}

release() 方法通常在处理完成后调用,确保底层的 DataBuffer 被正确释放。

使用示例:在多线程环境中使用 retain() 和 release()

在 WebSocket 消息处理时,确保在多线程环境中正确管理 DataBuffer 的生命周期。示例如下,使用 retain() 保证资源被正确引用,并在 finally 块中调用 release() 确保即使出现异常时也会释放资源:

session.receive().doOnNext(message -> {// 在多线程环境中保留引用message.retain();try {String data = message.getPayloadAsText();System.out.println("Received: " + data);// 模拟处理过程,可能会涉及多线程操作// 例如:通过某个线程池处理消息processMessageAsync(data);} finally {// 确保释放资源message.release();  // 释放资源}}).subscribe();

在上面的示例中,retain() 确保了 DataBuffer 在多个线程中可以安全访问,直到最终的 release() 被调用来释放资源。无论操作成功与否,finally 块中的 release() 都会被执行,确保不会发生内存泄漏。


6. 总结

在 Spring WebFlux 中使用 WebSocketMessageDataBuffer 需要注意以下几点:

  1. 避免重复读取 DataBuffer,建议在读取后缓存数据。
  2. 避免阻塞操作,尽量使用 Flux/Mono 进行异步处理。
  3. 防止内存泄漏,在手动管理 DataBuffer 生命周期时使用 DataBufferUtils.release() 释放资源。
  4. 确保 DataBuffer 只在最终消费时释放,避免 Flux 流程中数据丢失。
  5. 使用 retain()release() 方法 来管理 DataBuffer 的引用计数,确保资源的正确释放,特别是在多线程环境中,确保在 finally 中释放资源。

通过遵循这些实践,可以有效地管理 WebSocket 消息的内存使用,并提高应用的性能和可靠性。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/897239.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【CSS】Tailwind CSS 与传统 CSS:设计理念与使用场景对比

1. 开发方式 1.1 传统 CSS 手写 CSS:你需要手动编写 CSS 规则,定义类名、ID 或元素选择器,并为每个元素编写样式。 分离式开发:HTML 和 CSS 通常是分离的,HTML 中通过类名或 ID 引用 CSS 文件中的样式。 示例&#…

2025华为OD机试真题E卷 - 螺旋数字矩阵【Java】

题目描述 疫情期间,小明隔离在家,百无聊赖,在纸上写数字玩。他发明了一种写法:给出数字个数 n (0 < n ≤ 999)和行数 m(0 < m ≤ 999),从左上角的 1 开始,按照顺时针螺旋向内写方式,依次写出2,3,…,n,最终形成一个 m 行矩阵。小明对这个矩阵有些要求: 1、…

地下井室可燃气体监测装置:守护地下安全,防患于未“燃”!

在城市的地下&#xff0c;隐藏着无数的燃气管道和井室&#xff0c;它们是城市基础设施建设的重要部分&#xff0c;燃气的使用&#xff0c;给大家的生活提供了极大的便利。在便利生活的背后&#xff0c;也存在潜在的城市安全隐患。 近年来&#xff0c;地下井室可燃气体泄漏事故…

【使用hexo模板创建个人博客网站】

使用hexo模板创建个人博客网站 环境准备node安装hexo安装ssh配置 使用hexo命令搭建个人博客网站hexo命令 部署到github创建仓库修改_config.yml文件 编写博客主题扩展 环境准备 node安装 进入node官网安装node.js 使用node -v检查是否安装成功 安装成功后应该出现如上界面 …

C# OPC DA获取DCS数据(提前配置DCOM)

OPC DA配置操作手册 配置完成后&#xff0c;访问远程ip&#xff0c;就能获取到服务 C#使用Interop.OPCAutomation采集OPC DA数据&#xff0c;支持订阅&#xff08;数据变化&#xff09;、单个读取、单个写入、断线重连

发行思考:全球热销榜的频繁变动

几点杂感&#xff1a; 1、单机游戏销量与在线人数的衰退是剧烈的&#xff0c;有明显的周期性&#xff0c;而在线游戏则稳定很多。 如去年的某明星游戏&#xff0c;最高200多万在线&#xff0c;如今在线人数是48名&#xff0c;3万多。 而近期热门的是MH&#xff0c;在线人数8…

Unity自定义区域UI滑动事件

自定义区域UI滑动事件 介绍制作1.创建一个Image2.创建脚本 总结 介绍 一提到滑动事件联想到有太多的插件了比如EastTouchBundle&#xff0c;今天想单纯通过UI去做一个滑动事件而不是基于Box2d或者Box去做滑动事件。 制作 1.创建一个Image 2.创建脚本 using UnityEngine; us…

taosd 写入与查询场景下压缩解压及加密解密的 CPU 占用分析

在当今大数据时代&#xff0c;时序数据库的应用越来越广泛&#xff0c;尤其是在物联网、工业监控、金融分析等领域。TDengine 作为一款高性能的时序数据库&#xff0c;凭借独特的存储架构和高效的压缩算法&#xff0c;在存储和查询效率上表现出色。然而&#xff0c;随着数据规模…

《UE5_C++多人TPS完整教程》学习笔记34 ——《P35 网络角色(Network Role)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P35 网络角色&#xff08;Network Role&#xff09;》 的学习笔记&#xff0c;该系列教学视频为计算机工程师、程序员、游戏开发者、作家&#xff08;Engineer, Programmer, Game Developer, Author&#xff09; Stephe…

K8s 1.27.1 实战系列(七)Deployment

一、Deployment介绍 Deployment负责创建和更新应用程序的实例,使Pod拥有多副本,自愈,扩缩容等能力。创建Deployment后,Kubernetes Master 将应用程序实例调度到集群中的各个节点上。如果托管实例的节点关闭或被删除,Deployment控制器会将该实例替换为群集中另一个节点上的…

Linux(Centos 7.6)命令详解:vim

1.命令作用 vi/vim 是Linux 系统内置不可或缺的文本编辑命令&#xff0c;vim 是vi 的加强版本&#xff0c;兼容vi 的所有指令&#xff0c;不仅能编辑文本&#xff0c;而且还具有shell 程序编辑的功能&#xff0c;可以不同颜色的字体来辨别语法的正确性。 2.命令语法 usage: …

微信小程序引入vant-weapp组件教程

本章教程,介绍如何在微信小程序中引入vant-weapp。 vant-weapp文档:https://vant-ui.github.io/vant-weapp/#/button 一、新建一个小程序 二、npm初始化 npm init三、安装 Vant Weapp‘ npm i @vant/weapp -

C++ 作业 DAY5

作业 代码 Widtget.h class Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);~Widget();private:Ui::Widget *ui;/************************ 起始终止坐标 ************************/QPoint end;QPoint start;QVector<QPoint> per_start_lis…

Selenium 中 ActionChains 支持的鼠标和键盘操作设置及最佳实践

Selenium 中 ActionChains 支持的鼠标和键盘操作设置及最佳实践 一、引言 在使用 Selenium 进行自动化测试时&#xff0c;ActionChains 类提供了强大的功能&#xff0c;用于模拟鼠标和键盘的各种操作。通过 ActionChains&#xff0c;可以实现复杂的用户交互&#xff0c;如鼠标…

前端面试技术性场景题

87.场景面试之大数运算&#xff1a;超过js中number最大值的数怎么处理 在 JavaScript 中&#xff0c;Number.MAX_SAFE_INTEGER&#xff08;即 2^53 - 1&#xff0c;即 9007199254740991&#xff09;是能被安全表示的最大整数。超过此值时&#xff0c;普通的 Number 类型会出现…

【js逆向】iwencai国内某金融网站实战

地址&#xff1a;aHR0cHM6Ly93d3cuaXdlbmNhaS5jb20vdW5pZmllZHdhcC9ob21lL2luZGV4 在搜索框中随便输入关键词 查看请求标头&#xff0c;请求头中有一个特殊的 Hexin-V,它是加密过的&#xff1b;响应数据包中全是明文。搞清楚Hexin-V的值是怎么生成的&#xff0c;这个值和cooki…

ES Module 的 import 导入和 import () 动态导入

ES Module 的 import 导入和 import () 动态导入介绍 一、ES Module 简介 ES Module 是 JavaScript 官方提供的标准化模块系统&#xff0c;它的出现解决了长期以来 JavaScript 在模块管理方面的混乱局面。通过 ES Module&#xff0c;开发者可以更加方便地组织和复用代码&…

使用Node.js从零搭建DeepSeek本地部署(Express框架、Ollama)

目录 1.安装Node.js和npm2.初始化项目3.安装Ollama4.下载DeepSeek模型5.创建Node.js服务器6.运行服务器7.Web UI对话-Chrome插件-Page Assist 1.安装Node.js和npm 首先确保我们机器上已经安装了Node.js和npm。如果未安装&#xff0c;可以通过以下链接下载并安装适合我们操作系…

BUUCTF——[GYCTF2020]FlaskApp1 SSTI模板注入/PIN学习

目录 一、网页功能探索 二、SSTI注入 三、方法一 四、方法二 使用PIN码 &#xff08;1&#xff09;服务器运行flask登录所需的用户名 &#xff08;2&#xff09;modename &#xff08;3&#xff09;flask库下app.py的绝对路径 &#xff08;4&#xff09;当前网络的mac地…

Java基础关键_018_集合(二)

目 录 一、泛型 ※ 1.说明 2.实例 3.擦除与补偿 4.泛型的定义 &#xff08;1&#xff09;类定义 &#xff08;2&#xff09;静态方法定义 &#xff08;3&#xff09;接口定义 5.通配符 &#xff08;1&#xff09;无限定 &#xff08;2&#xff09;上限 &#xff…