Reactor 事件流 vs. Spring 事件 (ApplicationEvent)

Reactor 事件流 vs. Spring 事件 ApplicationEvent

  • Reactor 事件流 vs. Spring 事件 (`ApplicationEvent`)
    • 1️⃣ 核心区别
    • 2️⃣ Spring 事件 (`ApplicationEvent`)
      • ✅ 示例:Spring 事件发布 & 监听
        • 1️⃣ 定义事件
        • 2️⃣ 发布事件
        • 3️⃣ 监听事件
        • 🔹 进阶:异步监听
    • 3️⃣ Reactor 事件流(`Flux` / `Mono` / `Sinks.Many`)
      • ✅ 示例:事件流处理
        • 1️⃣ 冷流(每个订阅者独立获取数据)
          • ✅ 方法 1:使用 `Flux.create()`,手动推送数据
          • **✅ 方法 2:使用 `Flux.generate()`,按需推送数据**
          • **✅ 方法 3:使用 `Sinks.many().multicast()`,支持多个订阅者**
          • **✅ 结论**
        • 2️⃣ 热流(共享事件流)
        • **📌 ReplayProcessor:可重放历史事件的 Reactor 处理器**
        • **📌 1️⃣ 关键特点**
        • **📌 2️⃣ 基本使用**
        • **📌 运行结果**
        • **📌 3️⃣ `ReplayProcessor` vs `Sinks.Many`**
        • **📌 4️⃣ 适用场景**
    • 4️⃣ 什么时候用哪个?
    • 5️⃣ 总结

Reactor 事件流 vs. Spring 事件 (ApplicationEvent)

1️⃣ 核心区别

特性Spring ApplicationEventReactor Flux /Sinks.Many
数据处理方式一次性事件(同步或异步)流式处理(持续事件流)
是否支持多个订阅者✅ 支持(多个 @EventListener✅ 支持(Sinks.Many 广播)
是否支持流式操作❌ 不支持支持map(), filter(), zip()
是否支持回放历史事件❌ 不支持❌(默认不支持,但可用 ReplayProcessor
适用场景业务事件通知(用户注册、订单支付)高吞吐数据流处理(日志、消息队列、WebFlux)

2️⃣ Spring 事件 (ApplicationEvent)

🔹 特点

  • 适用于应用内部组件通信,解耦业务逻辑。
  • 默认同步,可使用 @Async 进行异步处理。
  • 一次性事件,无法流式处理。

✅ 示例:Spring 事件发布 & 监听

1️⃣ 定义事件
public class UserRegisteredEvent extends ApplicationEvent {private final String username;public UserRegisteredEvent(Object source, String username) {super(source);this.username = username;}public String getUsername() { return username; }
}
2️⃣ 发布事件
@Component
public class UserService {@Autowiredprivate ApplicationEventPublisher eventPublisher;public void registerUser(String username) {eventPublisher.publishEvent(new UserRegisteredEvent(this, username));}
}
3️⃣ 监听事件
@Component
public class WelcomeEmailListener {@EventListenerpublic void handleUserRegisteredEvent(UserRegisteredEvent event) {System.out.println("📧 发送欢迎邮件给: " + event.getUsername());}
}

可多个 @EventListener 监听同一个事件,同时触发

@Component
public class LoggingListener {@EventListenerpublic void logUserRegisteredEvent(UserRegisteredEvent event) {System.out.println("📜 记录日志: 用户 " + event.getUsername() + " 已注册");}
}
@Component
public class PointsRewardListener {@EventListenerpublic void giveWelcomePoints(UserRegisteredEvent event) {System.out.println("🎁 赠送 100 积分给: " + event.getUsername());}
}
🔹 进阶:异步监听

🔹 1️⃣ 监听器可以指定异步 需要启用 Spring 异步,@EnableAsync

@Async
@EventListener
public void sendWelcomeEmail(UserRegisteredEvent event) {System.out.println("📧 发送欢迎邮件给: " + event.getUsername() + " [异步]");
}

🔹 2️⃣ 监听多个事件 如果多个事件需要相同的处理逻辑,你可以用 classes 监听多个事件:

@EventListener(classes = {UserRegisteredEvent.class, OrderPlacedEvent.class})
public void handleMultipleEvents(Object event) {System.out.println("📢 事件触发: " + event.getClass().getSimpleName());
}

🔹 3️⃣ 条件监听 可以使用 condition 属性,让监听器只处理 符合条件 的事件:

@EventListener(condition = "#event.username.startsWith('A')")
public void handleUserStartingWithA(UserRegisteredEvent event) {System.out.println("🎯 处理用户名以 A 开头的用户: " + event.getUsername());
}

🔹 适用场景 ✅ 适用于业务事件通知(如用户注册、订单支付)。 ❌ 不适用于流式数据处理

3️⃣ Reactor 事件流(Flux / Mono / Sinks.Many

🔹 特点

  • 异步 & 流式 处理,可以并行、合并、过滤、转换数据。
  • 冷流(Flux、Mono) 每个订阅者独立处理数据
  • 热流(Sinks.Many) 可用于事件广播

✅ 示例:事件流处理

1️⃣ 冷流(每个订阅者独立获取数据)
Flux<String> flux = Flux.just("事件1", "事件2", "事件3");
flux.subscribe(event -> System.out.println("订阅者 1 收到: " + event));
flux.subscribe(event -> System.out.println("订阅者 2 收到: " + event));
✅ 方法 1:使用 Flux.create(),手动推送数据
 import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CustomFluxExample {public static void main(String[] args) throws InterruptedException {Flux<String> customFlux = Flux.create(emitter -> {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {String event = "自定义事件:" + System.currentTimeMillis();System.out.println("发布:" + event);emitter.next(event);}, 0, 1, TimeUnit.SECONDS);}, FluxSink.OverflowStrategy.BUFFER);// 订阅者 1customFlux.subscribe(event -> System.out.println("订阅者 1 收到:" + event));Thread.sleep(5000); // 5 秒后再添加订阅者// 订阅者 2customFlux.subscribe(event -> System.out.println("订阅者 2 收到:" + event));Thread.sleep(10000); // 让主线程等待一会儿,看效果}
}

🔹 运行结果

python-repl复制编辑发布:自定义事件:1712101234567
订阅者 1 收到:自定义事件:1712101234567
发布:自定义事件:1712101235567
订阅者 1 收到:自定义事件:1712101235567
发布:自定义事件:1712101236567
订阅者 1 收到:自定义事件:1712101236567
发布:自定义事件:1712101237567
订阅者 1 收到:自定义事件:1712101237567
发布:自定义事件:1712101238567
订阅者 1 收到:自定义事件:1712101238567
(5 秒后,订阅者 2 加入)
订阅者 2 收到:自定义事件:1712101239567
订阅者 1 收到:自定义事件:1712101239567
发布:自定义事件:1712101240567
订阅者 2 收到:自定义事件:1712101240567
订阅者 1 收到:自定义事件:1712101240567
...

📌 特点

  1. 你可以随时手动推送数据(每 1 秒发布一次)。

  2. 新订阅者不会收到历史数据,只会接收到之后的事件(如果你想让新订阅者也能收到历史数据,可以用 .replay())。

    示例:

    Flux<String> flux = Flux.just("事件A", "事件B", "事件C").replay(2)  // 缓存最后 2 个事件.autoConnect();  // 至少一个订阅者连接后开始发布flux.subscribe(event -> System.out.println("订阅者 1 收到: " + event));// 新的订阅者会从缓存中接收事件
    flux.subscribe(event -> System.out.println("订阅者 2 收到: " + event));

    输出:

    订阅者 1 收到: 事件A
    订阅者 1 收到: 事件B
    订阅者 1 收到: 事件C
    订阅者 2 收到: 事件B
    订阅者 2 收到: 事件C
    
  3. 不会自动结束,Flux 会一直运行

✅ 方法 2:使用 Flux.generate(),按需推送数据

如果你的数据是基于前一个数据计算出来的,可以使用 Flux.generate()

import reactor.core.publisher.Flux;import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;public class GenerateFluxExample {public static void main(String[] args) throws InterruptedException {Flux<String> generatedFlux = Flux.generate(() -> new AtomicInteger(1),  // 初始状态(state, sink) -> {String event = "事件 " + state.getAndIncrement();System.out.println("发布:" + event);sink.next(event);try { Thread.sleep(1000); } catch (InterruptedException e) {}return state;});// 订阅者 1generatedFlux.subscribe(event -> System.out.println("订阅者 1 收到:" + event));Thread.sleep(5000); // 5 秒后再添加订阅者// 订阅者 2generatedFlux.subscribe(event -> System.out.println("订阅者 2 收到:" + event));Thread.sleep(10000);}
}

🔹 运行结果

python-repl复制编辑发布:事件 1
订阅者 1 收到:事件 1
发布:事件 2
订阅者 1 收到:事件 2
发布:事件 3
订阅者 1 收到:事件 3
发布:事件 4
订阅者 1 收到:事件 4
发布:事件 5
订阅者 1 收到:事件 5
(5 秒后,订阅者 2 加入)
发布:事件 6
订阅者 1 收到:事件 6
订阅者 2 收到:事件 6
发布:事件 7
订阅者 1 收到:事件 7
订阅者 2 收到:事件 7
...

📌 区别

  • Flux.generate() 一次只能推送一个数据,适合基于状态逐步生成数据
  • Flux.create() 可以异步推送多个数据,适合事件流、网络请求等异步数据
✅ 方法 3:使用 Sinks.many().multicast(),支持多个订阅者

如果你希望多个订阅者可以同时消费,并且可以随时加入

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;import java.time.Duration;public class SinkExample {public static void main(String[] args) throws InterruptedException {Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();Flux<String> flux = sink.asFlux();// 订阅者 1flux.subscribe(event -> System.out.println("订阅者 1 收到:" + event));// 模拟定时推送数据new Thread(() -> {int i = 1;while (true) {String event = "事件 " + i++;System.out.println("发布:" + event);sink.tryEmitNext(event);try { Thread.sleep(1000); } catch (InterruptedException e) { }}}).start();Thread.sleep(5000); // 5 秒后再添加订阅者// 订阅者 2flux.subscribe(event -> System.out.println("订阅者 2 收到:" + event));Thread.sleep(10000);}
}

🔹 运行结果

python-repl复制编辑发布:事件 1
订阅者 1 收到:事件 1
发布:事件 2
订阅者 1 收到:事件 2
发布:事件 3
订阅者 1 收到:事件 3
发布:事件 4
订阅者 1 收到:事件 4
发布:事件 5
订阅者 1 收到:事件 5
(5 秒后,订阅者 2 加入)
发布:事件 6
订阅者 1 收到:事件 6
订阅者 2 收到:事件 6
发布:事件 7
订阅者 1 收到:事件 7
订阅者 2 收到:事件 7
...

📌 特点

  • Sinks.many().multicast() 允许多个订阅者同时消费
  • 适用于 WebSocket、事件总线、消息队列等场景

✅ 结论
方式特点适用场景
Flux.create()手动推送数据,支持异步适合事件流、消息队列、WebSocket
Flux.generate()按需推送,每次一个适合基于前一个状态生成新数据
Sinks.many().multicast()支持多个订阅者,实时推送适合多订阅者共享数据
2️⃣ 热流(共享事件流)
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hotFlux = sink.asFlux();hotFlux.subscribe(event -> System.out.println("订阅者 1 收到: " + event));
hotFlux.subscribe(event -> System.out.println("订阅者 2 收到: " + event));sink.tryEmitNext("全局事件 1");
sink.tryEmitNext("全局事件 2");

🔹 适用场景 ✅ 适用于高吞吐、异步、多订阅者的事件流。 ✅ 适用于数据流式处理(如 WebFlux、消息队列、日志流)。 ❌ 不适用于简单的业务事件通知

📌 ReplayProcessor:可重放历史事件的 Reactor 处理器

ReplayProcessor<T> 是 Reactor 提供的一种 热流(Hot Publisher),它允许新的订阅者 回放之前发送的事件。适用于 日志系统、消息队列、数据缓存 等场景。


📌 1️⃣ 关键特点

存储历史事件,新的订阅者可以看到之前的事件。
类似 RxJava 的 ReplaySubject,可以 指定缓存大小
适用于需要回放数据的场景,如 日志系统、WebSocket 消息队列


📌 2️⃣ 基本使用
import reactor.core.publisher.ReplayProcessor;public class ReplayProcessorExample {public static void main(String[] args) {// 创建 ReplayProcessor,缓存 2 条数据ReplayProcessor<String> processor = ReplayProcessor.create(2);// 订阅者 1 订阅processor.subscribe(data -> System.out.println("订阅者 1 收到:" + data));// 发送数据processor.onNext("事件 A");processor.onNext("事件 B");processor.onNext("事件 C");// 订阅者 2 订阅processor.subscribe(data -> System.out.println("订阅者 2 收到:" + data));// 发送更多数据processor.onNext("事件 D");}
}
📌 运行结果
mathematica复制编辑订阅者 1 收到:事件 A
订阅者 1 收到:事件 B
订阅者 1 收到:事件 C
订阅者 2 收到:事件 B  // 只回放最近 2 条(事件 B 和 C)
订阅者 2 收到:事件 C
订阅者 1 收到:事件 D
订阅者 2 收到:事件 D

📌 说明

  • ReplayProcessor.create(2) 最多缓存 2 条数据,新订阅者只能收到最近的 2 条事件。
  • 订阅者 1 先订阅,它会收到所有事件。
  • 订阅者 2 后订阅,但它可以收到最近的 2 条缓存(事件 B 和 C)。

📌 3️⃣ ReplayProcessor vs Sinks.Many
特性ReplayProcessorSinks.Many(Multicast)
是否缓存历史数据,可指定缓存大小,不会缓存历史数据
新订阅者是否能收到旧数据可以不能
适用场景回放数据,如日志、历史消息实时消息推送,不存储历史

📌 4️⃣ 适用场景

日志回放(新订阅者也能看到之前的日志)。
聊天系统(新加入的用户可以看到最近的聊天记录)。
数据缓存(保存最近 N 条数据,避免重复请求)。

🚀 如果你需要 缓存历史数据,并让新的订阅者能收到过去的事件ReplayProcessor 是一个很好的选择!

4️⃣ 什么时候用哪个?

使用 ApplicationEvent(Spring 事件)

  • 简单的应用事件通知(如用户注册、邮件通知、订单完成)。
  • 解耦业务逻辑,但不需要流式处理

使用 Reactor 事件流

  • 高吞吐、并发的数据流(日志流、消息流、WebFlux)。
  • 多个订阅者同时消费事件,如广播消息、实时数据推送

结合使用(最佳实践)

@EventListener
public void handleUserRegisteredEvent(UserRegisteredEvent event) {Flux.just(event).map(e -> "处理事件: " + e.getUsername()).doOnNext(System.out::println).subscribe();
}

5️⃣ 总结

方案订阅者是否独立事件是否广播适用场景
Spring ApplicationEvent✅ 每个订阅者独立❌ 不是广播业务事件通知
Reactor Flux(冷流)✅ 每个订阅者独立❌ 不是广播流式数据处理
Reactor Sinks.Many(热流)❌ 共享数据流✅ 所有订阅者都收到事件驱动架构

🚀 如果你是做 WebFlux、日志流、消息队列,选 Reactor!如果是应用内部事件解耦,选 Spring ApplicationEvent 🎯

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/75376.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM生产环境问题定位与解决实战(六):总结篇——问题定位思路与工具选择策略

本文已收录于《JVM生产环境问题定位与解决实战》专栏&#xff0c;完整系列见文末目录 引言 在前五篇文章中&#xff0c;我们深入探讨了JVM生产环境问题定位与解决的实战技巧&#xff0c;从基础的jps、jmap、jstat、jstack、jcmd等工具&#xff0c;到JConsole、VisualVM、MAT的…

【5090d】配置运行和微调大模型所需基础环境【一】

RuntimeError: Failed to import transformers.integrations.bitsandbytes because of the following error (look up to see its traceback): No module named triton.ops 原因&#xff1a;是因为在导入 transformers.integrations.bitsandbytes 时缺少必要的依赖项 triton.op…

华为交换综合实验——VRRP、MSTP、Eth-trunk、NAT、DHCP等技术应用

一、实验拓扑 二、实验需求 1,内网Ip地址使用172.16.0.0/16分配 2,sw1和SW2之间互为备份 3, VRRP/STP/VLAN/Eth-trunk均使用 4,所有Pc均通过DHCP获取IP地址 5,ISP只能配置IP地址 6,所有电脑可以正常访问IsP路由器环回 三、需求分析 1、设备连接需求 二层交换机&#xff08;LS…

DeepSeek 开源的 3FS 如何?

DeepSeek 3FS&#xff08;Fire-Flyer File System&#xff09;是一款由深度求索&#xff08;DeepSeek&#xff09;于2025年2月28日开源的高性能并行文件系统&#xff0c;专为人工智能训练和推理任务设计。以下从多个维度详细解析其核心特性、技术架构、应用场景及行业影响&…

Qt实现HTTP GET/POST/PUT/DELETE请求

引言 在现代应用程序开发中&#xff0c;HTTP请求是与服务器交互的核心方式。Qt作为跨平台的C框架&#xff0c;提供了强大的网络模块&#xff08;QNetworkAccessManager&#xff09;&#xff0c;支持GET、POST、PUT、DELETE等HTTP方法。本文将手把手教你如何用Qt实现这些请求&a…

echarts+HTML 绘制3d地图,加载散点+散点点击事件

首先&#xff0c;确保了解如何本地引入ECharts库。 html 文件中引入本地 echarts.min.js 和 echarts-gl.min.js。 可以通过官网下载或npm安装&#xff0c;但这里直接下载JS文件更简单。需要引入 echarts.js 和 echarts-gl.js&#xff0c;因为3D地图需要GL模块。 接下来是HTM…

深度剖析 MySQL 与 Redis 缓存一致性:理论、方案与实战

在当今的互联网应用开发中&#xff0c;MySQL 作为可靠的关系型数据库&#xff0c;与 Redis 这一高性能的缓存系统常常协同工作。然而&#xff0c;如何确保它们之间的数据一致性&#xff0c;成为了开发者们面临的重要挑战。本文将深入探讨 MySQL 与 Redis 缓存一致性的相关问题&…

DAO 类的职责与设计原则

1. DAO 的核心职责 DAO&#xff08;Data Access Object&#xff0c;数据访问对象&#xff09;的主要职责是封装对数据的访问逻辑&#xff0c;但它与纯粹的数据实体类&#xff08;如 DTO、POJO&#xff09;不同&#xff0c;也与 Service 业务逻辑层不同。 DAO 应该做什么&…

【Kubernetes】如何使用 kubeadm 搭建 Kubernetes 集群?还有哪些部署工具?

使用 kubeadm 搭建 Kubernetes 集群是一个比较常见的方式。kubeadm 是 Kubernetes 提供的一个命令行工具&#xff0c;它可以简化 Kubernetes 集群的初始化和管理。下面是使用 kubeadm 搭建 Kubernetes 集群的基本步骤&#xff1a; 1. 准备工作 确保你的环境中有两台或更多的机…

Pycharm(十二)列表练习题

一、门和钥匙 小X在一片大陆上探险&#xff0c;有一天他发现了一个洞穴&#xff0c;洞穴里面有n道门&#xff0c; 打开每道门都需要对应的钥匙&#xff0c;编号为i的钥匙能用于打开第i道门&#xff0c; 而且只有在打开了第i(i>1)道门之后&#xff0c;才能打开第i1道门&#…

在未归一化的线性回归模型中,特征的尺度差异可能导致模型对特征重要性的误判

通过数学公式来更清晰地说明归一化对模型的影响&#xff0c;以及它如何改变特征的重要性评估。 1. 未归一化的情况 假设我们有一个线性回归模型&#xff1a; y β 0 β 1 x 1 β 2 x 2 ϵ y \beta_0 \beta_1 x_1 \beta_2 x_2 \epsilon yβ0​β1​x1​β2​x2​ϵ 其…

JS—页面渲染:1分钟掌握页面渲染过程

个人博客&#xff1a;haichenyi.com。感谢关注 一. 目录 一–目录二–页面渲染过程三–DOM树和渲染树 二. 页面渲染过程 浏览器的渲染过程可以分解为以下几个关键步骤 2.1 解析HTML&#xff0c;形成DOM树 浏览器从上往下解析HTML文档&#xff0c;将标签转成DOM节点&#…

niuhe插件, 在 go 中渲染网页内容

思路 niuhe 插件生成的 go 代码是基于 github.com/ma-guo/niuhe 库进行组织管理的, niuhe 库 是对 go gin 库的一个封装&#xff0c;因此要显示网页, 可通过给 gin.Engine 指定 HTMLRender 来实现。 实现 HTMLRender 我们使用 gitee.com/cnmade/pongo2gin 实现 1. main.go …

openEuler24.03 LTS下安装HBase集群

前提条件 安装好Hadoop完全分布式集群&#xff0c;可参考&#xff1a;openEuler24.03 LTS下安装Hadoop3完全分布式 安装好ZooKeeper集群&#xff0c;可参考&#xff1a;openEuler24.03 LTS下安装ZooKeeper集群 HBase集群规划 node2node3node4MasterBackup MasterRegionServ…

LVGL移植说明

https://www.cnblogs.com/FlurryHeart/p/18104596 参考&#xff0c;里面说明了裸机移植以及freeRTOS系统移植。 移植到linux https://blog.csdn.net/sunchao124/article/details/144952514

ubuntu虚拟机裁剪img文件系统

1. 定制文件系统前期准备 将rootfs.img文件准备好&#xff0c;并创建target文件夹2. 挂载文件系统 sudo mount rootfs.img target #挂载文件系统 sudo chroot target #进入chroot环境3. 内裁剪文件系统 增删裁剪文件系统 exit #退出chroot环境 sudo umount target…

esp826601s固件烧录方法(ch340+面包板)

esp826601s固件烧录方法&#xff08;ch340面包板&#xff09; 硬件 stm32f10c8t6&#xff0c;esp826601s,面包板&#xff0c;ch340(usb转ttl),st_link&#xff08;供电&#xff09; 接线 烧录时&#xff1a; stm32f10c8t6&#xff1a;gnd->负极&#xff0c; 3.3->正极…

Servlet 点击计数器

Servlet 点击计数器 引言 Servlet 是 Java 企业版&#xff08;Java EE&#xff09;技术中的一种服务器端组件&#xff0c;用于处理客户端请求并生成动态内容。本文将详细介绍如何使用 Servlet 实现一个简单的点击计数器&#xff0c;帮助读者了解 Servlet 的基本用法和原理。 …

LangChain vs. LlamaIndex:深入对比与实战应用

目录 引言LangChain 与 LlamaIndex 概述 什么是 LangChain&#xff1f;什么是 LlamaIndex&#xff1f;两者的核心目标与适用场景 架构与设计理念 LangChain 的架构设计LlamaIndex 的架构设计关键技术差异 核心功能对比 数据连接与处理查询与检索机制上下文管理能力插件与扩展性…

【Java中级】10章、内部类、局部内部类、匿名内部类、成员内部类、静态内部类的基本语法和细节讲解配套例题巩固理解【5】

❤️ 【内部类】干货满满&#xff0c;本章内容有点难理解&#xff0c;需要明白类的实例化&#xff0c;学完本篇文章你会对内部类有个清晰的认知 &#x1f495; 内容涉及内部类的介绍、局部内部类、匿名内部类(重点)、成员内部类、静态内部类 &#x1f308; 跟着B站一位老师学习…