注意: 本文内容于 2024-12-28 21:22:12 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:ReactiveStreams、Reactor、SpringWebFlux。感谢您的关注与支持!
ReactiveStreams是一个处理异步流的规范,定义了Publisher、Subscriber、Subscription、Processor接口。
Reactor是ReactiveStreams的实现,对于Publisher提供了两个核心实现——Mono与Flux。
SpringWebFlux是构建在Reactor之上的响应式Web框架。
本文源码
一、Reactive Streams
Reactive Streams 是一个用于处理异步流数据的标准规范,特别适合处理非阻塞、背压控制的场景。
所谓的背压控制,是指在异步数据流中,消费者根据自身的能力向生产者获取数据进行消费,以避免数据积压导致系统过载或者崩溃。
TCP中的拥塞控制,也可以看作是背压控制的一种实现。
1.1 API规范
Reactive Streams 的四大API接口如下
org.reactivestreams.Publisher: 发布者接口,提供数据流。- void subscribe(Subscriber<? super T> subscriber)
org.reactivestreams.Subscriber: 订阅者接口,接收数据流。- void onSubscribe(Subscription subscription)
- void onNext(T item)
- void onError(Throwable throwable)
- void onComplete()
org.reactivestreams.Subscription: 订阅关系接口,提供控制机制。- void request(long n)
- void cancel()
org.reactivestreams.Processor: 继承Publisher和Subscriber的接口。
简单绘制一个时序图,加深对整个链路的理解。

使用Publisher、Subscriber、Subscription实现一个简单的订阅功能,示例如下
以下代码,并没有异步相关的内容。只是为了学习整个API流转链路。
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;public class Example01 {private static final Logger log = LoggerFactory.getLogger(Example01.class);/*** 订阅关系*/public static Subscription getSubscription(Subscriber<? super String> subscriber, String... items) {return new Subscription() {private final AtomicBoolean canceled = new AtomicBoolean(false);private final AtomicInteger sendItems = new AtomicInteger(0);/*** request数据* 内部onNext会request后面的数据,而onComplete应该要等所有的数据消费完毕后,才会执行。* 故需要加锁保证线程安全,此处采取CAS。源码参考reactor.core.publisher.Operators.ScalarSubscription#request(long)*/@Overridepublic void request(long n) {if (n > 0) {if (canceled.get()) {return;}if (sendItems.get() >= items.length) {subscriber.onComplete();} else {subscriber.onNext(items[sendItems.getAndIncrement()]);}}}@Overridepublic void cancel() {canceled.compareAndSet(true, true);}};}/*** 发布者*/private static Publisher<String> getPublisher(String... items) {return new Publisher<String>() {@Overridepublic void subscribe(Subscriber<? super String> subscriber) {subscriber.onSubscribe(getSubscription(subscriber, items));}};}/*** 订阅者*/private static Subscriber<String> getSubscriber() {return new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;log.info("Subscribed to {}", s);// 请求第一个元素subscription.request(1);}@Overridepublic void onNext(String s) {log.info("Received {}", s);// 请求下一个元素subscription.request(1);}@Overridepublic void onError(Throwable t) {log.error("Error occurred", t);}@Overridepublic void onComplete() {log.info("All items received");}};}public static void main(String[] args) {// 订阅Flux// Flux.just("first", "second", "third").delayElements(Duration.ofSeconds(2))// .subscribe(getSubscriber());/*** org.reactivestreams.Publisher: 发布者* org.reactivestreams.Subscriber: 订阅者* org.reactivestreams.Subscription: 发布者和订阅者之间的桥梁,数据流控制的核心机制。*/// 订阅自定义PublishergetPublisher("first", "second", "third", "fourth", "fifth").subscribe(getSubscriber());while (true) {}}
}
运行结果

1.2 API实现库
Reactive Streams实现如下
- Java9+ java.util.concurrent.Flow
- RxJava: Reactive Extension Java
- Reactor: Reactor Framework
Java9+中提供了java.util.concurrent.Flow,在标准库中提供ReactiveStreams规范的接口。

ReactiveStreams内部也提供了适配JDK中Flow的适配器org.reactivestreams.FlowAdapters。
RxJava以及Reactor,分别用于Java开发中不同领域。RxJava一般用于Android开发,Reactor一般用于Spring开发。
二、Reactor
Reactor提供了两个核心类
- reactor.core.publisher.Flux:发布0或N个元素的异步数据流
- reactor.core.publisher.Mono:发布0或1个元素的异步数据流
这两者都是Publisher,主要区别在于发送数据的数量。因此在使用上,相关的API都是差不多的。
2.1 Mono
Mono中的静态方法,用于创建Mono实例。
Mono实例中的成员方法如下
| 方法名 | 说明 |
|---|---|
| and | 合并多个 Mono 实例,等所有 Mono 完成后返回一个新的 Mono。 |
| as | 用指定的类型包裹当前 Mono,通常用于类型转换。 |
| block | 阻塞并获取 Mono 的结果,直到执行完成。 |
| blockOptional | 类似于 block,但返回 Optional 包裹的结果。 |
| cache | 缓存当前 Mono 的值,使得未来的订阅者可以共享相同的结果。 |
| cacheInvalidateIf | 缓存失效条件满足时重新缓存,适用于动态失效策略。 |
| cacheInvalidateWhen | 在指定条件下使缓存失效。 |
| cancelOn | 当给定的 Publisher 发出信号时,取消当前 Mono。 |
| cast | 强制类型转换为指定的类型。 |
| checkpoint | 在流的执行过程中插入检查点,用于调试。 |
| concatWith | 与另一个 Mono 或 Flux 连接,按顺序执行。 |
| contextWrite | 修改 Mono 的上下文。 |
| defaultIfEmpty | 如果 Mono 为空,返回默认值。 |
| delayElement | 延迟发出元素的时间。 |
| delaySubscription | 延迟订阅,等到指定的时间或事件发生才开始订阅。 |
| delayUntil | 延迟直到指定的 Publisher 发出信号时才开始执行。 |
| dematerialize | 将一个包含 Signal 的 Mono 转换为原始值的 Mono。 |
| doAfterSuccessOrError | 在执行成功或出错后执行的操作。 |
| doAfterTerminate | 在 Mono 结束时执行的操作,不论成功或失败。 |
| doFinally | 在 Mono 完成时执行的最终操作。 |
| doFirst | 在 Mono 执行前执行的操作。 |
| doOnCancel | 当订阅者取消时执行的操作。 |
| doOnDiscard | 当元素被丢弃时执行的操作。 |
| doOnEach | 对每个发出的信号执行操作。 |
| doOnError | 当发生错误时执行的操作。 |
| doOnNext | 每次元素发出时执行的操作。 |
| doOnRequest | 在请求信号到达时执行的操作。 |
| doOnSubscribe | 在订阅时执行的操作。 |
| doOnSuccess | 当成功完成时执行的操作。 |
| doOnSuccessOrError | 无论成功还是失败,都执行的操作。 |
| doOnTerminate | 在终止时执行的操作。 |
| elapsed | 返回每个信号的时间戳。 |
| expand | 展开 Mono,生成新的 Mono,直到满足某个条件。 |
| expandDeep | 深度展开 Mono,通常递归调用直到满足条件。 |
| filter | 过滤元素,只有符合条件的元素才会发出。 |
| filterWhen | 使用 Publisher 的元素条件来过滤当前 Mono。 |
| flatMap | 转换元素,返回新的 Mono 或 Flux。 |
| flatMapIterable | 将每个元素转换为一个可迭代的元素。 |
| flatMapMany | 将元素转换为 Flux。 |
| flux | 将 Mono 转换为 Flux。 |
| handle | 基于元素的条件来决定如何处理流。 |
| hasElement | 判断是否包含元素。 |
| hide | 隐藏 Mono 的实现细节,返回一个不可观察的 Mono。 |
| ignoreElement | 忽略元素,只关心是否完成。 |
| log | 记录 Mono 中的信号,便于调试。 |
| map | 将元素映射为另一个元素。 |
| mapNotNull | 映射并排除空值。 |
| materialize | 将信号转化为一个 Signal 对象。 |
| mergeWith | 合并当前 Mono 和另一个 Mono。 |
| metrics | 获取流的度量信息。 |
| name | 为 Mono 设置名称,用于调试和监控。 |
| ofType | 根据类型过滤信号。 |
| onErrorContinue | 在发生错误时继续执行。 |
| onErrorMap | 将错误映射为其他类型。 |
| onErrorResume | 在发生错误时恢复操作。 |
| onErrorReturn | 在发生错误时返回默认值。 |
| onErrorStop | 在发生错误时终止流。 |
| onTerminateDetach | 在终止时解除与订阅者的连接。 |
| or | 连接另一个 Mono,如果当前 Mono 没有值或为空时执行。 |
| publish | 启动 Mono 并返回一个共享的流。 |
| publishOn | 指定在哪个线程调度上下文中执行 Mono。 |
| repeat | 重复执行 Mono,直到满足某个条件。 |
| repeatWhen | 基于另一个 Publisher 的信号来控制重复。 |
| repeatWhenEmpty | 当 Mono 为空时重复执行。 |
| retry | 在发生错误时重试操作。 |
| retryWhen | 基于另一个 Publisher 来控制重试。 |
| share | 共享执行的结果,避免重复执行。 |
| single | 获取 Mono 中唯一的元素。 |
| subscribe | 启动流的执行并订阅。 |
| subscribeOn | 指定在哪个线程调度上下文中订阅 Mono。 |
| subscribeWith | 通过指定的 Subscriber 订阅 Mono。 |
| subscriberContext | 获取或修改订阅时的上下文。 |
| switchIfEmpty | 如果 Mono 为空,则切换到另一个 Mono。 |
| tag | 为 Mono 打上标签,用于调试和日志。 |
| take | 限制只获取前 N 个元素。 |
| takeUntilOther | 当另一个 Publisher 发出信号时停止当前 Mono。 |
| then | 在当前 Mono 执行完后执行另一个操作。 |
| thenEmpty | 在当前 Mono 执行完后返回一个空的 Mono。 |
| thenMany | 在当前 Mono 执行完后返回一个 Flux。 |
| thenReturn | 在当前 Mono 执行完后返回指定的值。 |
| timed | 返回元素和其时间戳。 |
| timeout | 如果 Mono 在指定时间内没有发出信号,则触发超时。 |
| timestamp | 返回元素及其时间戳。 |
| toFuture | 将 Mono 转换为 Future。 |
| toProcessor | 将 Mono 转换为 Processor,适用于与 Flux 的结合。 |
| toString | 返回 Mono 的字符串表示。 |
| transform | 使用转换函数修改 Mono。 |
| transformDeferred | 延迟转换,直到订阅发生。 |
| transformDeferredContextual | 延迟转换并访问上下文。 |
| zipWhen | 与另一个 Mono 的信号配对,形成 Mono 的组合。 |
| zipWith | 与另一个 Mono 的信号进行合并,形成 Mono 的组合。 |
2.2 Flux
Flux中的静态方法,用于创建Flux实例。
Flux实例中的成员方法如下
| 方法名 | 说明 |
|---|---|
| all | 判断 Flux 中的所有元素是否满足给定条件。 |
| any | 判断 Flux 中是否有任何一个元素满足给定条件。 |
| as | 将 Flux 转换为指定类型的 Publisher。 |
| blockFirst | 阻塞并返回 Flux 中的第一个元素。 |
| blockLast | 阻塞并返回 Flux 中的最后一个元素。 |
| buffer | 将 Flux 中的元素分成固定大小的缓冲区。 |
| bufferTimeout | 按照指定的时间或缓冲区大小将元素分块。 |
| bufferUntil | 在满足某个条件时开始一个新的缓冲区。 |
| bufferUntilChanged | 将相邻相同的元素合并到同一个缓冲区。 |
| bufferWhen | 根据外部 Publisher 切换缓冲区。 |
| bufferWhile | 按照指定条件将元素分组为缓冲区。 |
| cache | 缓存 Flux 的值,使得未来的订阅者可以共享相同的结果。 |
| cancelOn | 当另一个 Publisher 发出信号时取消当前的 Flux。 |
| cast | 将 Flux 强制转换为指定的类型。 |
| checkpoint | 在执行流中插入检查点,用于调试和分析。 |
| collect | 收集流中的元素,按给定规则生成结果。 |
| collectList | 收集 Flux 中的所有元素并返回一个 List。 |
| collectMap | 将 Flux 中的元素收集为一个 Map。 |
| collectMultimap | 将 Flux 中的元素收集为一个多值 Map。 |
| collectSortedList | 将 Flux 中的元素收集为排序的 List。 |
| concatMap | 将元素转换为 Mono,按顺序处理。 |
| concatMapDelayError | 与 concatMap 类似,但在错误发生时延迟处理。 |
| concatMapIterable | 将每个元素转换为可迭代的元素,并按顺序合并。 |
| concatWith | 与另一个 Flux 连接,按顺序执行。 |
| concatWithValues | 连接多个值作为新的 Flux。 |
| contextWrite | 修改 Flux 的上下文。 |
| count | 统计 Flux 中元素的数量。 |
| defaultIfEmpty | 如果 Flux 为空,则返回默认值。 |
| delayElements | 延迟元素的发出。 |
| delaySequence | 延迟整个序列的发出。 |
| delaySubscription | 延迟订阅,直到指定的时间或事件发生。 |
| delayUntil | 延迟直到另一个 Publisher 发出信号。 |
| dematerialize | 将一个包含 Signal 的 Flux 转换为原始元素的 Flux。 |
| distinct | 过滤掉重复的元素,保持唯一性。 |
| distinctUntilChanged | 过滤掉相邻重复的元素。 |
| doAfterTerminate | 在 Flux 完成后执行的操作。 |
| doFinally | 在 Flux 终止时执行的操作。 |
| doFirst | 在 Flux 执行前执行的操作。 |
| doOnCancel | 在 Flux 被取消时执行的操作。 |
| doOnComplete | 在 Flux 完成时执行的操作。 |
| doOnDiscard | 在元素被丢弃时执行的操作。 |
| doOnEach | 对 Flux 发出的每个元素执行操作。 |
| doOnError | 在发生错误时执行的操作。 |
| doOnNext | 每次 Flux 发出元素时执行的操作。 |
| doOnRequest | 在请求信号到达时执行的操作。 |
| doOnSubscribe | 在订阅时执行的操作。 |
| doOnTerminate | 在 Flux 终止时执行的操作。 |
| elapsed | 获取每个元素的时间戳和持续时间。 |
| elementAt | 获取指定索引处的元素。 |
| expand | 对每个元素进行展开,生成新的元素流。 |
| expandDeep | 深度展开 Flux,通常递归展开元素。 |
| filter | 过滤出符合条件的元素。 |
| filterWhen | 使用外部 Publisher 的信号过滤 Flux 中的元素。 |
| flatMap | 将元素转换为 Flux,并合并其发出的所有元素。 |
| flatMapDelayError | 在发生错误时延迟元素的转换。 |
| flatMapIterable | 将元素转换为可迭代的 Flux。 |
| flatMapSequential | 顺序地将元素转换为 Flux。 |
| flatMapSequentialDelayError | 顺序转换,并在发生错误时延迟。 |
| getPrefetch | 获取 Flux 的预取量。 |
| groupBy | 将元素按指定的键分组。 |
| groupJoin | 类似 groupBy,但用于联接多个流。 |
| handle | 根据元素的条件进行流的处理。 |
| hasElement | 判断 Flux 中是否包含某个元素。 |
| hasElements | 判断 Flux 中是否包含多个元素。 |
| hide | 隐藏 Flux 的实现细节,返回不可观察的流。 |
| ignoreElements | 忽略 Flux 中的所有元素,只关心终止信号。 |
| index | 返回元素在流中的索引。 |
| join | 将多个 Flux 中的元素合并为一个字符串。 |
| last | 获取 Flux 中的最后一个元素。 |
| limitRate | 限制从流中请求的元素数量。 |
| limitRequest | 限制从流中请求的最大元素数量。 |
| log | 记录流中的元素,用于调试。 |
| map | 将元素映射为新的类型。 |
| mapNotNull | 映射并排除空值。 |
| materialize | 将信号转换为 Signal 对象。 |
| mergeComparingWith | 将两个 Flux 合并并根据比较条件排序。 |
| mergeOrderedWith | 将两个有序的 Flux 合并。 |
| mergeWith | 合并当前 Flux 和另一个 Flux。 |
| metrics | 获取流的度量信息。 |
| name | 为 Flux 设置名称,便于调试。 |
| next | 获取 Flux 中的下一个元素。 |
| ofType | 根据类型过滤信号。 |
| onBackpressureBuffer | 在背压时缓存元素。 |
| onBackpressureDrop | 在背压时丢弃元素。 |
| onBackpressureError | 在背压时触发错误。 |
| onBackpressureLatest | 在背压时保留最新的元素。 |
| onErrorContinue | 在发生错误时继续执行。 |
| onErrorMap | 在错误时将其映射为其他类型。 |
| onErrorResume | 在错误时恢复操作。 |
| onErrorReturn | 在错误时返回默认值。 |
| onErrorStop | 在错误时终止流。 |
| onTerminateDetach | 在终止时分离与订阅者的连接。 |
| or | 连接另一个 Flux,如果当前 Flux 为空时执行。 |
| parallel | 将 Flux 分发到多个线程进行并行处理。 |
| publish | 启动 Flux 并返回一个共享流。 |
| publishNext | 在流的每个元素发出时开始新的发布。 |
| publishOn | 指定在哪个线程调度上下文中执行流。 |
| reduce | 将流中的所有元素合并为单一值。 |
| reduceWith | 使用指定初始值对元素进行合并。 |
| repeat | 重复执行 Flux 直到满足某个条件。 |
| repeatWhen | 基于另一个 Publisher 的信号来控制重复。 |
| replay | 缓存并重播流中的元素。 |
| retry | 在发生错误时重试操作。 |
| retryWhen | 基于另一个 Publisher 来控制重试。 |
| sample | 每隔指定时间间隔取一个元素。 |
| sampleFirst | 获取流中的第一个元素。 |
| sampleTimeout | 超过指定时间间隔时触发超时操作。 |
| scan | 对流中的元素执行累加操作。 |
| scanWith | 使用给定的初始值对元素执行累加操作。 |
| share | 共享流的执行,避免重复执行。 |
| shareNext | 将下一个发出的元素共享给多个订阅者。 |
| single | 获取 Flux 中唯一的元素。 |
| singleOrEmpty | 获取 Flux 中唯一的元素,如果为空返回空。 |
| skip | 跳过流中的前 N 个元素。 |
| skipLast | 跳过流中的最后 N 个元素。 |
| skipUntil | 跳过直到满足某个条件的元素。 |
| skipUntilOther | 跳过直到另一个 Flux 发出信号时的元素。 |
| skipWhile | 跳过直到满足条件的元素。 |
| sort | 对流中的元素进行排序。 |
| startWith | 在流的开始处添加额外元素。 |
| subscribe | 订阅并启动 Flux。 |
| subscribeOn | 指定在哪个线程调度上下文中订阅流。 |
| subscribeWith | 通过指定的 Subscriber 订阅流。 |
| subscriberContext | 获取或修改订阅时的上下文。 |
| switchIfEmpty | 如果 Flux 为空,则切换到另一个 Flux。 |
| switchMap | 将元素转换为另一个 Flux 并切换执行。 |
| switchOnFirst | 在流开始时选择一个 Flux 进行切换。 |
| tag | 为 Flux 打标签,便于调试和日志。 |
| take | 限制只获取前 N 个元素。 |
| takeLast | 获取流中的最后 N 个元素。 |
| takeUntil | 获取直到满足条件为止的元素。 |
| takeUntilOther | 获取直到另一个 Flux 发出信号时的元素。 |
| takeWhile | 获取满足条件的元素,直到条件不满足为止。 |
| then | 在当前流完成后执行另一个操作。 |
| thenEmpty | 在当前流完成后返回一个空流。 |
| thenMany | 在当前流完成后返回另一个 Flux。 |
| timed | 返回每个元素的时间戳和持续时间。 |
| timeout | 如果 Flux 在指定时间 |
三、SpringWebFlux
3.1 WebHandler与WebFilter
在SpringMVC中,有Servlet、Filter。
在SpringWebFlux中,有WebHandler、WebFilter,对标的其实就是Servlet API中的Servlet、Filter。甚至执行链也是相似的设计。
Servlet相关知识阅读Servlet - 言成言成啊
Filter相关知识阅读Filter和Listener - 言成言成啊

WebFilter的注册如下
@Bean
@Order(0) // 值越小,优先级越高
@ConditionalOnProperty(name = "allowAllCors.learnFilter", havingValue = "true")
public WebFilter aFilter() {/*** 在servlet中。请求的扭转是 aFilter-->bFilter-->servlet-->bFilter-->aFilter* 在webflux中同理。Filter对应WebFilter,Servlet对应WebHandler*/return (exchange, chain) -> {log.info("aFilter start");return chain.filter(exchange).doOnSuccess(t -> log.info("aFilter end"));};
}
3.2 实际案例
跨域配置
@Bean
@Order(Integer.MIN_VALUE)
@ConditionalOnProperty(name = "allowAllCors.personal", havingValue = "true")
public WebFilter personalCorsFilter(WebSocketHandlerAdapter webFluxWebSocketHandlerAdapter) {WebFilter webFilter = (exchange, chain) -> {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();HttpHeaders headers = response.getHeaders();//用*会导致范围过大,浏览器出于安全考虑,在allowCredentials为true时会不认*这个操作,因此可以使用如下代码,间接实现允许跨域headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, request.getHeaders().getFirst("origin"));headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "*");headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, "*");//允许跨域发送cookieheaders.set(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");if ("OPTIONS".equalsIgnoreCase(request.getMethod().name())) {response.setStatusCode(HttpStatus.OK);return Mono.empty();} else {return chain.filter(exchange);}};log.info("allowAllCors.personal is set to true");return webFilter;}
全局异常拦截/定义响应格式
首先,定义通用响应格式
import lombok.Data;
import reactor.core.publisher.Mono;@Data
public class Resp<T> {private int code;private String msg;private T data;public static <T> Resp<T> ok(T t) {Resp<T> resp = new Resp<>();resp.setCode(0);resp.setMsg("成功");resp.setData(t);return resp;}public static Resp<Void> failure(String msg) {Resp<Void> resp = new Resp<>();resp.setCode(1);resp.setMsg("失败: " + msg);return resp;}public static Resp<Void> error() {Resp<Void> resp = new Resp<>();resp.setCode(500);resp.setMsg("服务器内部错误");return resp;}public static <T> Mono<Resp<T>> getSuccessResp(Mono<T> mono) {return mono.map(Resp::ok);}public static Mono<Resp<Void>> getFailureResp(String msg) {return Mono.just(failure(msg));}public static Mono<Resp<Void>> getErrorResp() {return Mono.just(error());}
}
其次,定义自定义异常DIYException。
最后,配置全局异常拦截。
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import reactor.core.publisher.Mono;
import top.meethigher.utils.Resp;@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {@ExceptionHandler(Exception.class)public Mono<Resp<Void>> handleException(Exception e) {log.error("api occurred exception", e);return Resp.getErrorResp();}@ExceptionHandler(DIYException.class)public Mono<Resp<Void>> handleDiyException(DIYException e) {log.error("api occurred exception", e);return Resp.getFailureResp(e.getMessage());}
}