做视频网站广告收费定制网站
做视频网站广告收费,定制网站,乐陵关键词seo,做网站 sql 用哪种【JavaWeb】Spring非阻塞通信 - Spring Reactive之WebFlux的使用 文章目录 【JavaWeb】Spring非阻塞通信 - Spring Reactive之WebFlux的使用参考资料一、初识WebFlux1、什么是函数式编程1#xff09;面向对象编程思维 VS 函数式编程思维#xff08;封装、继承和多态描述事物间…【JavaWeb】Spring非阻塞通信 - Spring Reactive之WebFlux的使用 文章目录 【JavaWeb】Spring非阻塞通信 - Spring Reactive之WebFlux的使用参考资料一、初识WebFlux1、什么是函数式编程1面向对象编程思维 VS 函数式编程思维封装、继承和多态描述事物间的联系 VS 对运算过程函数即变量间的映射关系进行抽象2Java中的函数式编程流式编程 2、对ChatGPT进行关于WebFlux的提问1基本概念理解问题2代码理解问题 二、WebFlux基本概念1、WebFlux核心理念Reactive 响应式宣言快速响应弹性伸缩消息驱动2、WebFlux特点与MVC的比较异步非阻塞去Servlet函数式编程3、WebFlux模块的整体架构响应式模式 观察者模式上下游数据传递Subscriber调用Publisher 的 subscribe() 订阅数据Publisher 调用 Subscriber 的 onSubscribe()传递数据订阅媒介SubscriptionSubscriber可以通过Subscription#request请求数据通过Subscription#cancel取消数据发布等4、WebFlux中关于Flux和Mono的介绍Flux和 Mono 都是数据流的发布者可以发出元素值错误信号完成信号 二、WebFlux的基本使用1、WebFlux的使用 - Flux / Mono / 操作符1WebFlux如何接入到Springboot项目2快速上手Reactive3Flux的使用常用方法just()fromArray()fromIterable()和 fromStream()empty()error()never()range(int start, int count)interval(Duration period) 和 interval(Duration delay, Duration period)4Mono的使用常用方法fromCallable()、fromCompletionStage()、delay(Duration duration)、ignoreElements(Publisher source)、justOrEmpty(Optional? extends T data)5、操作符的使用bufferbufferTimeoutbufferUntilbufferWhilefilterwindowzipWithBiFunctiontakereducereduceWithmergemergeSequentialflatMapflatMapSequentialconcatMapcombineLatest6其他说明 2、WebFlux响应式编程说明容易踩的坑、以及编程规范代码待实践验证1基本概念2选择合适的操作符响应式/非响应式方法比如flatMap是响应式方法doOnNext不是响应式方法因为返回值类型是ConsumerT3null处理4非阻塞与阻塞5上下文6常见问题a问题一我写的操作看上去是正确的但是没有执行b问题二**我想获取流中的元素怎么办 - 不要试图取值**c问题三我需要在非响应式方法中使用响应式怎么办 参考资料
官方文档
Spring Reactive官方文档Reactor 3 Reference Guide关于为什么会有这个WebFlux框架有很详细的介绍Spring Reactor-core 3.6.4官方文档Reactive Streams 官方介绍Spring Reactive Stream APISpring WebFlux 官方文档
博客
WebFlux响应式框架快速入门Flux中的map、flatMap、concatMap的区别WebFlux 响应式编程介绍及简单实现响应式编程入门之 Project ReactorProject Reactor 响应式编程响应式编程说明
视频
【Spring Webflux】深入Spring5新特性Webflux模块详解
一、初识WebFlux
1、什么是函数式编程
1面向对象编程思维 VS 函数式编程思维封装、继承和多态描述事物间的联系 VS 对运算过程函数即变量间的映射关系进行抽象
参考 https://blog.csdn.net/weixin_38255079/article/details/122437312
函数式编程(Functional Programming, FP)FP 是编程范式之一我们常听说的编程范式还有面向过程编程、面向对象编程。
面向对象编程的思维方式把现实世界中的事物抽象成程序世界中的类和对象通过封装、继承和多态来演示事物事件的联系函数式编程的思维方式把现实世界的事物和事物之间的联系抽象到程序世界对运算过程进行抽象 程序的本质根据输入通过某种运算获得相应的输出程序开发过程中会涉及很多有输入和输出的函数 x - f(联系、映射) - yyf(x)函数式编程中的函数指的不是程序中的函数(方法)而是数学中的函数即映射关系例如y sin(x)x和y的关系相同的输入始终要得到相同的输出(纯函数)函数式编程用来描述数据(函数)之间的映射
2Java中的函数式编程流式编程 常见的函数式编程接口SupplierConsumerFunctionUnaryOperatorBiFunction public class LambdaDemo2 {public static void main(String[] args) {/*** 常见的函数式编程接口*//*** Supplier: 没有输入 只有一个输出*/SupplierString supplier () - {return Supplier: Hello world;};System.out.println(supplier.get());/*** Consumer: 只有一个输入 没有输出*/ConsumerString consumer (e) - {System.out.println(Consumer: e);};consumer.accept(Hello world);/*** Function: 输入T 输出R*/FunctionString,String function (e) - {return Function: e;};System.out.println(function.apply(Hello world));/*** UnaryOperator: 输出输入都是T*/UnaryOperatorString unaryOperator (e) - {return UnaryOperator: e;};System.out.println(unaryOperator.apply(Hello world));/*** BiFunction: 输入T,U 输出R*/BiFunctionInteger,Integer,String BiFunction (x,y) - {return BiFunction: (x y);};System.out.println(BiFunction.apply(1,2));}
} 流式编程的中间操作可以执行多次和终止操作只能执行1次 函数式编程小例子 public class LambdaDemo1 {public static void main(String[] args) {/*** 通过流式编程从如下数组中提取得到 belo* 流程如下bo_le - bole - b,o,l,e - belo*/String arr[] new String[]{bo_le, bo_le, ba*ll, ba(e, ba*e,};OptionalString first Stream.of(arr).filter(e - {return e.equals(bo_le);}).distinct().map(e - e.replaceAll(_, ))
// .map(e - Stream.of(e.split())).flatMap(e - Stream.of(e.split())).sorted().findFirst();System.out.println(first.get());/*** Note:* map(Function f)* 接收一个函数作为参数该函数会被应用到每个元素上并将其映射成一个新的元素返回的数据还是一个流。* flatMap(Function f)* 接收一个函数作为参数将流中的每个值都换成另一个流然后把所有流连接成一个流。** 比如上面的flatMap是将StreamString[]转换为StreamString* map是将StreamString转换成StreamString[]** 参考 https://www.cnblogs.com/bigjor/p/17009630.html*/}
}2、对ChatGPT进行关于WebFlux的提问
1基本概念理解问题 问题1什么是背压- 处理生产者消费者速度不匹配的问题 背压Backpressure是在响应式编程中用来处理生产者和消费者之间速度不匹配的情况的一种策略。在异步数据流中如果生产者产生的事件或数据速度过快而消费者的处理速度跟不上就可能导致数据积压最终引发问题。 背压机制的主要目标是协调生产者和消费者之间的速度以避免资源耗尽、内存溢出等问题。有几种常见的处理背压的策略 缓冲Buffering 数据流中的元素被缓存在一个有限大小的缓冲区中当缓冲区满时生产者可能需要等待或者新的元素可能会被抛弃。丢弃Dropping 当消费者处理速度不足以跟上生产者时可以选择丢弃一些元素以保持整体的流畅性。这可能会导致部分数据的丢失。最新元素Latest Element 只保留最新产生的元素丢弃之前的元素。这样确保消费者获得的是最新的数据。异常Error 当出现背压情况时直接抛出异常中止数据流。 在背压的处理中要根据具体的应用场景和需求选择合适的策略。不同的响应式编程框架和库提供了不同的背压处理机制如 Reactor 中的 onBackpressureBuffer、RxJava 中的 onBackpressureBuffer 等。 问题2那生产者是如何产生元素的呢产生的元素是怎样被订阅到的呢- 通过回调函数来订阅或通知 在响应式编程中生产者通常是通过生成异步事件或者数据来产生元素。这些异步事件可以是来自外部系统、用户输入、定时器、网络请求等。 在 Reactor 中生产者主要使用 Mono 和 Flux 这两个类来表示。 Mono单值 用于表示包含零个或一个元素的数据流。可以通过 Mono.just(value) 创建一个包含单个元素的 Mono。 MonoString mono Mono.just(Hello, World!);Flux多值 用于表示包含零个、一个或多个元素的数据流。可以通过 Flux.just(value1, value2, ...) 创建一个包含多个元素的 Flux。 FluxInteger flux Flux.just(1, 2, 3, 4, 5);这些生产者创建的数据流可以通过订阅者Subscriber进行订阅。订阅者是实现了 Subscriber 接口的对象通过实现接口的方法可以定义对数据流中元素的处理方式。 以下是一个简单的例子展示了如何使用 Reactor 中的 Mono 和 Flux 创建生产者并通过订阅者进行订阅 import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorExample {public static void main(String[] args) {// 创建一个包含单个元素的 MonoMonoString mono Mono.just(Hello, World!);// 创建一个包含多个元素的 FluxFluxInteger flux Flux.just(1, 2, 3, 4, 5);// 订阅 Mono并定义订阅者的行为mono.subscribe(value - System.out.println(Mono Received value: value),error - System.err.println(Mono Error: error),() - System.out.println(Mono Completed));// 订阅 Flux并定义订阅者的行为flux.subscribe(value - System.out.println(Flux Received value: value),error - System.err.println(Flux Error: error),() - System.out.println(Flux Completed));}
}在这个例子中通过 Mono.just 创建了一个包含单个元素的 Mono通过 Flux.just 创建了一个包含多个元素的 Flux。然后通过 subscribe 方法订阅这两个生产者传递了实现了 Subscriber 接口的对象定义了对数据流中元素的处理方式。 问题3响应式编程和传统编程是不是只在设计理念上有所不同在压测下最终的效果都差不多- 响应式编程适合并发实时性要求高、事件驱动的场景而传统编程更适合简单、同步的应用场景 响应式编程和传统编程在设计理念和编码模型上确实有很大的不同主要是由于响应式编程旨在处理异步和事件驱动的场景。虽然在某些情况下它们可以达到相似的效果但在特定的应用场景中响应式编程具有独特的优势。 异步处理 响应式编程更强调异步和非阻塞的操作适用于处理事件流、用户界面交互、网络请求等异步场景。传统编程往往以同步的方式进行数据处理可能需要使用回调或者多线程来实现异步操作。背压处理 响应式编程通过背压机制来处理生产者和消费者速度不匹配的问题确保系统在高负载情况下能够优雅地处理数据流。传统编程中可能需要手动处理这些情况而响应式编程提供了一些内置的机制。组合性 响应式编程强调数据流的组合性通过操作符来构建复杂的数据处理逻辑。这使得编写具有高度可组合性的代码更为容易。传统编程通常采用命令式的风格可能需要更多的代码来实现相同的逻辑。反应式流式处理 响应式编程通过流式处理的方式更容易处理数据流使得代码更具表达性和简洁性。在传统编程中可能需要使用循环、迭代等结构来处理流程。 在一些高并发、实时性要求高、事件驱动的场景下响应式编程的优势更为明显。然而在某些简单、同步的应用场景中传统编程可能更为直观和容易理解。对于性能问题具体的影响可能要根据具体的实现和使用场景来评估而不是简单地归结为响应式编程和传统编程的选择。 问题4那reactor projector是如何实现异步处理的是不使用Tomcat服务器使用Netty服务器的缘故吗 Reactor Projector 是 Reactor 生态系统中的一个模块用于提供反应式流Reactive Streams的实现。它的异步处理是通过使用 Reactor 的核心模块提供的反应式编程特性来实现的。 Reactor Projector 主要建立在 Reactor Core 之上而 Reactor Core 是 Reactor 生态系统的核心库提供了一套用于构建反应式应用程序的基础设施。Reactor Core 中包含了 Flux、Mono 等类用于表示异步数据流和异步单值。 在 Reactor 的异步处理中关键的组件是 Scheduler它负责管理并发执行异步任务。Reactor 使用 Netty、Project Reactor 的 Reactor Netty、以及其他一些调度器来提供异步执行的能力。这些调度器允许你指定在哪个线程上执行数据流的处理以及如何处理并发的问题。 在 Web 应用程序中Netty 是一个高性能的异步事件驱动的网络框架被用于构建响应式的、非阻塞的网络应用。因此当使用 Reactor Projector 来构建反应式 Web 应用时它可能倾向于使用 Netty 作为底层服务器以便更好地适应异步和非阻塞的要求。 在传统的 Servlet 容器如 Tomcat中处理方式可能更偏向于阻塞式的同步处理因此 Reactor 在这样的环境中可能不如在异步的 Netty 中表现出色。不过也可以在传统的 Servlet 容器中使用 Reactor但可能需要一些适配工作。 总体来说Reactor Projector 和 Reactor Core 提供了一套用于构建异步、响应式应用程序的工具和模块而底层的异步处理是由调度器和特定的底层实现如 Netty来支持的。
2代码理解问题 问题1Reactor projector中关于flux.subscribe订阅的是什么数据 在 Reactor Projector 中Flux 是一个表示异步多值序列的类。通过 Flux你可以创建一个数据流并通过 subscribe 方法添加订阅者让其对数据流中的元素进行消费。 Flux 表示的数据流可以包含来自不同源的多个元素并且可以在异步的情况下进行处理。当你调用 subscribe 方法时你可以传递一个订阅者对象该订阅者对象定义了如何处理数据流中的元素、错误以及完成信号。 具体来说在 Reactor 中Flux.subscribe 方法的订阅者Subscriber接口如下 public interface SubscriberT {void onSubscribe(Subscription s);void onNext(T t);void onError(Throwable t);void onComplete();
}onSubscribe(Subscription s): 当订阅发生时会调用这个方法传递一个 Subscription 对象通过它可以请求数据或者取消订阅。onNext(T t): 当新的元素可用时会调用这个方法传递新的元素。onError(Throwable t): 当发生错误时会调用这个方法传递错误信息。onComplete(): 当数据流完成时会调用这个方法。 当你使用 Flux.subscribe 订阅一个 Flux 时你需要提供一个实现了 Subscriber 接口的对象该对象定义了对数据流中不同事件的处理方式。 下面是一个简单的示例展示了如何使用 Flux 创建一个数据流并通过订阅者处理其中的元素 import reactor.core.publisher.Flux;public class FluxExample {public static void main(String[] args) {FluxInteger flux Flux.just(1, 2, 3, 4, 5);// 订阅并定义订阅者的行为flux.subscribe(value - System.out.println(Received value: value),error - System.err.println(Error: error),() - System.out.println(Completed));}
}在这个例子中flux.subscribe 的三个参数分别对应订阅者的 onNext、onError 和 onComplete 方法定义了对数据流中元素的处理方式。 问题2在 Reactor Projector 库中sinks有什么作用 在 Reactor Projector 库中Sinks 是用于创建不同种类的 Sinks可观察对象的生产者的工厂类。Sinks 提供了一组方法允许你创建不同类型的 Sinks包括单值和多值的 Sinks。这些 Sinks 可以被用于创建 Flux 或 Mono 对象从而构建响应式数据流。 其中Sinks.many() 方法用于创建多值的 Sink而 Sinks.one() 方法用于创建单值的 Sink。这两种类型的 Sink 分别对应于多值的数据流和单值的数据流。在这些 Sinks 上你可以进行一系列的操作例如添加元素、触发错误、完成流等。 下面是一个简单的例子演示了如何使用 Sinks.many() 创建一个 Flux import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;public class SinksExample {public static void main(String[] args) {// 创建一个多值的 SinkSinks.ManyString sink Sinks.many().multicast().onBackpressureBuffer();// 使用 Sink 创建 FluxFluxString flux sink.asFlux();// 向 Sink 中添加元素sink.tryEmitNext(Element 1);sink.tryEmitNext(Element 2);// 订阅 Flux处理元素flux.subscribe(System.out::println);// 向 Sink 中添加更多元素sink.tryEmitNext(Element 3);sink.tryEmitNext(Element 4);// 完成 Sink结束数据流sink.tryEmitComplete();}
}在这个例子中通过 Sinks.many() 创建了一个多值的 Sink然后通过 sink.asFlux() 将其转换为 Flux。之后通过 sink.tryEmitNext() 向 Sink 中添加元素通过订阅 Flux 来处理这些元素。最后通过 sink.tryEmitComplete() 完成 Sink结束数据流。 问题3在react projector库中下面这代码是什么意思Sinks.Many processor Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); 在 Reactor Projector 库中这段代码是用于创建 FluxSink 的配置。让我们逐步解释 Sinks.many(): 这是用于创建多值多元素FluxSink的方法。Sinks 是 Reactor 提供的用于创建不同种类的 Sinks可观察对象的生产者的工厂类。many() 表示创建一个多值的 Sink。multicast(): 这是一个操作符用于将 FluxSink 转换为一个可以被多个订阅者Subscribers共享的多播multicast的 Flux。这样多个订阅者可以共享相同的数据流。onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false): 这是配置背压backpressure策略的部分。在反应式编程中当生产者速度超过消费者速度时就会出现背压。在这里onBackpressureBuffer 用于定义缓冲区的大小和是否使用异步线程来处理溢出。 Queues.SMALL_BUFFER_SIZE: 这是用于定义缓冲区大小的常量。在这里使用了一个小型缓冲区大小表示缓冲区能够存储的元素数量有限。false: 表示不使用异步线程来处理溢出。当缓冲区满了新的元素就会被抛弃而不是等待缓冲区有空间。 综合起来这段代码的目的是创建一个多值的 FluxSink并将其转换为一个多播的 Flux同时配置了一个缓冲区当订阅者处理速度不足以跟上生产者时使用缓冲区来缓存元素当缓冲区满了时新的元素会被抛弃。 问题4那processor.asFlux();这个方法有什么作用 processor.asFlux() 方法用于将创建的 Sinks.Many 转换为 Reactor 的 Flux 对象。在 Reactor 中Flux 是表示多值异步序列的主要类而 Sinks.Many 是用于生产多值的数据流的生产者。 通过 processor.asFlux()你可以将 Sinks.Many 转换为 Flux然后将其用于订阅者消费数据流。这样你可以使用 Reactor 提供的各种操作符和方法来处理这个数据流例如 map、filter、flatMap 等。 举例来说可以像下面这样使用 processor.asFlux() import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;public class ProcessorExample {public static void main(String[] args) {// 创建一个多值的 SinkSinks.ManyString processor Sinks.many().multicast().onBackpressureBuffer();// 转换为 FluxFluxString flux processor.asFlux();// 订阅 Flux处理元素flux.subscribe(System.out::println);// 向 Sink 中添加元素processor.tryEmitNext(Element 1);processor.tryEmitNext(Element 2);// 完成 Sink结束数据流processor.tryEmitComplete();}
}在这个例子中通过 processor.asFlux() 将 Sinks.Many 转换为 Flux然后通过 flux.subscribe(System.out::println) 订阅这个 Flux最后通过 processor.tryEmitNext 向 Sinks.Many 添加元素。这样就可以通过 Flux 的订阅者机制来消费数据流。
二、WebFlux基本概念
Spring Reactive官方介绍如下https://spring.io/reactive Reactive systems have certain characteristics that make them ideal for low-latency, high-throughput workloads. Project Reactor and the Spring portfolio work together to enable developers to build enterprise-grade reactive systems that are responsive, resilient, elastic, and message-driven. Reactive的技术栈如下 The Spring portfolio provides two parallel stacks. One is based on a Servlet API with Spring MVC and Spring Data constructs. The other is a fully reactive stack that takes advantage of Spring WebFlux and Spring Data’s reactive repositories. In both cases, Spring Security has you covered with native support for both stacks. 反应式技术组件的关系WebFlux SpringBoot r2dbc底层用到了JDK 8的函数式编程
在IOT平台中使用webFlux非阻塞通信使用**Netty容器**而非Tomcat容器不使用Servlet APISpring WebFlux与Spring MVC是同级别的如果共存则优先使用SpringMVC
1、WebFlux核心理念Reactive 响应式宣言快速响应弹性伸缩消息驱动
参考WebFlux响应式框架快速入门
响应式编程顾名思义就是在于响应二字我们需要在某个事件发生时做出响应。
这里提一个著名的设计原则好莱坞原则(Hollywood principle)
Dont call us, we will call you.
演员提交简历之后回家等着就好演艺公司会主动打电话给你。 传统的Spring MVC是基于Servlet API的框架Spring WebFlux是一套全新的Reactive Web技术栈实现完全非阻塞支持Reactive Streams背压等特性并且运行环境不限于Servlet容器Tomcat、Jetty、Undertow如Netty等
“Reactive”响应式宣言是WebFlux的核心目标它是指应用对变化负载的变化、外部服务可用性的变化等做出即时响应使得应用保持正常。 根据响应式宣言(The Reactive Manifesto)响应式系统具备以下特点
快速响应(Responsive) 系统在各种情况下都会尽全力保证及时响应。它是可用性和实用性的基础, 还意味着问题能被迅速发现,并得到有效处理。回弹性(Resilient) 系统在面临故障时依然保持快速响应。它是通过**复制(replication)、抑制(containment)、隔离(isolation)和委托(delegation)**来实现的故障被抑制在单个组件中, 且各组件相互隔离, 使系统在部分失败和恢复时, 可以不影响整体的功能。每个组件的恢复都委托给另一个(外部)组件, 高可用在必要时通过复制来保证。 3.可伸缩性(Elastic) 系统在不同的负载下都能保持快速的响应。响应式系统可以根据服务的请求量, 动态增加或减少相应的资源。消息驱动(Message Driven) 响应式系统依赖异步的消息传递信号量 以确定各种组件的边界, 并确保松耦合(loose coupling)、隔离性(isolation)、位置透明性(location transparency), 并提供将错误封装为消息的手段。
2、WebFlux特点与MVC的比较异步非阻塞去Servlet函数式编程 异步和非阻塞支持了应用层的异步和底层的IO非阻塞 函数式编程使用Lambda表达式和函数式接口来定义请求处理程序。 去servlet允许可以不基于servlet API。默认的Netty容器不基于Servlet API。
Spring WebFlux与Spring MVC的比较通义千问 Spring MVC和Spring WebFlux是Spring框架中用于构建Web应用程序的两种不同模型它们在设计理念、并发模型以及对资源利用上有所不同。 Spring MVC (Model-View-Controller) 同步阻塞模型基于Servlet API遵循传统的请求/响应模式每个请求会创建一个新的线程来处理直到响应被完全生成并发送给客户端。架构MVC架构清晰易于理解和实现。它包含控制器Controller、模型对象Model和视图View适合构建基于HTTP的同步RESTful服务。资源利用对于短时且密集型I/O操作不是特别高效因为每个请求都会占用一个线程直到完成可能会导致大量的线程消耗。适用场景适用于大部分传统Java Web应用尤其是需要与大量现有Servlet容器集成、依赖同步API或者对异步编程不敏感的应用。 Spring WebFlux 非阻塞、反应式编程模型基于Reactor或Project Reactor库实现支持异步非阻塞I/O可以在单个线程上处理多个请求从而提高服务器的并发性能和资源利用率。架构采用函数式编程风格使用HandlerFunction、RouterFunction等组件进行路由和处理请求。可以结合Reactive Streams规范处理流式数据。资源利用非常适用于高并发、低延迟的场景如实时分析、物联网设备数据处理等因为它可以避免不必要的线程上下文切换并在较少的线程上处理更多的请求。适用场景适用于云原生环境、微服务架构中的高并发场景尤其当系统设计倾向于事件驱动和反应式编程时。 总结 Spring MVC更适合传统的多线程同步服务开发兼容性好生态广泛易用性强。Spring WebFlux则适用于现代的、异步和非阻塞的服务设计能够充分利用底层操作系统和硬件的性能在处理大量并发请求时表现更优但可能需要开发者具备一定的响应式编程基础。 3、WebFlux模块的整体架构响应式模式 观察者模式上下游数据传递Subscriber调用Publisher 的 subscribe() 订阅数据Publisher 调用 Subscriber 的 onSubscribe()传递数据订阅媒介SubscriptionSubscriber可以通过Subscription#request请求数据通过Subscription#cancel取消数据发布等
Project Reactor模块架构 参考 响应式编程入门之 Project Reactor 响应式可以理解为观察者模式通过订阅和发布数据流中的数据对数据进行处理。 在 Reactor 中大部分实现都是按照下图的逻辑来执行的
1首先是Subscriber订阅者通过调用 Publisher 的 subscribe 方法主动订阅 Publisher发布者2Publisher 在向下游发送数据之前会先调用 Subscriber 的 onSubscribe 方法传递的参数为 Subscription订阅媒介3Subscriber 通过 Subscription#request 来请求数据或者 Subscription#cancel 来取消数据发布这就是响应式编程中的背压订阅者可以控制数据发布4Subscription 在接收到订阅者的调用后通过 Subscriber#onNext 向下游订阅者传递数据。5在数据发布完成后调用 Subscriber#onComplete 结束本次流如果数据发布或者处理遇到错误会调用 Subscriber#onError 调用 Subscriber#onNextonCompleteonError 这三个方法可能是在 Publisher 中做的也可能是在 Subscription 中做的根据不同的场景有不同的实现方式并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布 Notejava中生产消费者模式和观察者模式有啥区别 参考https://worktile.com/kb/ask/38120.html 概念不同 生产消费者模式就是一个多线程并发协作的模式。在这个模式中一部分线程被用于去生产数据另一部分线程去处理数据于是便有了形象的生产者与消费者了。观察者模式是一种对象行为模式它定义对象间的一种一对多的依赖关系当一个对象的状态发生改变时所有依赖于它的对象都得到通知并被自动更新。 编程范式不同 生产者消费者模式本身并不属于设计模式中的任何一种而观察者模式属于Gang of Four提出的23种设计模式中的一种也是面向对象的设计模式中的一种。 4、WebFlux中关于Flux和Mono的介绍Flux和 Mono 都是数据流的发布者可以发出元素值错误信号完成信号
参考WebFlux 响应式编程介绍及简单实现
Reactor提供了2种返回类型Mono和Flux。
Mono0…1个数据Flux 0…N个数据
使用 Flux 和 Mono 都可以发出三种数据信号Flux 和 Mono 都是数据流的发布者使用 Flux 和 Mono 都可以发出三种数据信号元素值错误信号完成信号错误信号和完成信号都代表终止信号终止信号用于告诉订阅者数据流结束了错误信号终止数据流同时把错误信息传递给订阅者。 三种信号的特点
1错误信号和完成信号都是终止信号不能共存2如果没有发送任何元素值而是直接发送错误或者完成信号表示是空数据流3如果没有错误信号也没有完成信号表示是无限数据流
jdk8想使用flux需要引入依赖org.projectreactor.reactor-core
just()创建Flux序列并声明指定数据流subscribe()订阅Flux序列只有进行订阅后才回触发数据流不订阅就什么都不会发生
关于Project Reactor 响应式编程中MonoFlux操作符的使用 参考 Project Reactor 响应式编程 反应式技术组件的关系WebFlux SpringBoot r2dbc底层用到了JDK 8的函数式编程 响应式编程是观察者模式订阅者和生产者之间通过设置缓存队列的大小来实现背压 Reactive Stream组件模型中Processor即作为上游节点的订阅者也作为下游节点的发布者起到数据转换、过滤的作用。
二、WebFlux的基本使用
1、WebFlux的使用 - Flux / Mono / 操作符
参考 Project Reactor 响应式编程
1WebFlux如何接入到Springboot项目 如果你是springboot的WebMVC项目很容易就可以改为WebFlux项目 pom中引入spring-boot-starter-webflux即可使用WebFlux注意不要引入webMVC否则会走spring MVC如果走webflux启动会打印NettyWebServer 和springMVC的用法一定程度兼容 1支持spring的各种注解 Controller
RequestMapping
ResponseBody2允许返回非Mono、非Flux类型但这样就是阻塞代码了。 3非阻塞返回Mono、Flux类型 4HttpServletRequest变成了ServerHttpRequestresponse也是类似的变化。
2快速上手Reactive
参考【Spring Webflux】深入Spring5新特性Webflux模块详解 例子1 - 响应式编程基本流程创建发布者创建订阅者建立两者关系发布者发布数据 public static void main(String[] args) {// Step1: 创建发布者 (直接创建Flux/Mono等发布者同时会以流的形式发送数据
// Publisher publisher new Publisher() {
// Override
// public void subscribe(Subscriber subscriber) {
// System.out.println(订阅者完成订阅);
// }
//
// public void submit(String data){
// System.out.println(开始发送数据: data);
// }
// };// Step1: 创建发布者并发布数据找不到SubmissionPublisher类直接创建Flux/Mono等发布者同时会以流的形式发送数据FluxString stringFlux Flux.just(Hello,World);// Step2: 创建订阅者Subscriber subscriber new Subscriber() {Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {System.out.println(开始建立订阅关系);this.subscription subscription;subscription.request(1); // 第一次需要}Overridepublic void onNext(Object o) {System.out.println(开始接收数据: o);// 业务处理subscription.request(10); // 背压}Overridepublic void onError(Throwable throwable) {System.out.println(接收了错误数据);}Overridepublic void onComplete() {System.out.println(已成功接收完数据);}};// Step3: 建立订阅关系stringFlux.subscribe(subscriber);}---
开始建立订阅关系
开始接收数据:Hello
开始接收数据:World
已成功接收完数据 例子2 - 使用Processor中间处理角色的响应式编程Processor即作为上游节点的订阅者也作为下游节点的发布者起到数据转换、过滤的作用 /*** Processor和发布者建立订阅关系并将信息转发给订阅者*/public static void main(String[] args) {// Step1: 创建订阅者找不到SubmissionPublisher类直接创建Flux/Mono等发布者同时会以流的形式发送数据Flux stringFlux Flux.just(Hello, World);// Step2: 创建ProcessorProcessor processor new ProcessorString,String(){Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {System.out.println(【Processor】开始建立与发布者的订阅关系);this.subscription subscription;subscription.request(1); // 第一次需要}Overridepublic void onNext(String s) {System.out.println(【Processor】接收数据: s);subscription.request(10); // 背压}Overridepublic void onError(Throwable throwable) {System.out.println(【Processor】接收了错误数据);}Overridepublic void onComplete() {System.out.println(【Processor】已成功接收完数据);}Overridepublic void subscribe(Subscriber subscriber) {System.out.println(【Processor】订阅者完成订阅);}};// Step3: 创建订阅者Subscriber subscriber new Subscriber() {Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {System.out.println(【Subscriber】开始建立订阅关系);this.subscription subscription;subscription.request(1); // 第一次需要}Overridepublic void onNext(Object o) {System.out.println(【Subscriber】接收数据: o);// 业务处理subscription.request(10); // 背压}Overridepublic void onError(Throwable throwable) {System.out.println(【Subscriber】接收了错误数据);}Overridepublic void onComplete() {System.out.println(【Subscriber】已成功接收完数据);}};// Step5: 订阅者和Processor建立订阅关系processor.subscribe(subscriber);// Step4: Processor和发布者建立订阅关系stringFlux.subscribe(processor);// Step6: 发布者发布数据Flux/Mono一开始就发布了}---
【Processor】订阅者完成订阅
【Processor】开始建立与发布者的订阅关系
【Processor】接收数据:Hello
【Processor】接收数据:World
【Processor】已成功接收完数据 例子3 -使用响应式编程进行简单业务处理 /*** 输入: hello guys i am bole welcome to normal school jdk quick fox prizev* 输出: abcdefghijklmnopqrstuvwxyz*/public static void main(String[] args) {String src hello guys i am bole welcome to normal school jdk quick fox prizev;Flux.fromArray(src.split()).filter(e - !e.equals( )).flatMap(e - Flux.just(e.split()))
// .doOnNext(e - {
// System.out.println(doOnNext: e);
// }).distinct().sort().subscribe(e - {System.out.print(e);});}---abcdefghijklmnopqrstuvwxyz3Flux的使用常用方法just()fromArray()fromIterable()和 fromStream()empty()error()never()range(int start, int count)interval(Duration period) 和 interval(Duration delay, Duration period)
Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时订阅者中对应的方法 onNext(), onComplete() 和 onError() 会被调用。 just()的使用 可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。 Flux.just(hello, world).doOnNext((i) - {System.out.println([doOnNext] i);}).doOnComplete(() - System.out.println([doOnComplete])).subscribe(i - System.out.println([subscribe] i));// 执行结果[doOnNext] hello
[subscribe] hello
[doOnNext] world
[subscribe] world
[doOnComplete]fromArray()fromIterable()和 fromStream() 可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。 ListString arr Arrays.asList(flux, mono, reactor, core);
Flux.fromIterable(arr).doOnNext((i) - {System.out.println([doOnNext] i);}).subscribe((i) - {System.out.println([subscribe] i);});
//执行结果
[doOnNext] flux
[subscribe] flux
[doOnNext] mono
[subscribe] mono
[doOnNext] reactor
[subscribe] reactor
[doOnNext] core
[subscribe] core empty() 创建一个不包含任何元素只发布结束消息的序列。 Flux.empty().doOnNext(i - {System.out.println([doOnNext] i);}).doOnComplete(() - {System.out.println([DoOnComplete] );}).subscribe(i - {System.out.println([subscribe] i);});
//执行结果
[DoOnComplete] error(Throwable error) 创建一个只包含错误消息的序列。 try {int []arr new int[5];arr[10] 2;
} catch (Exception e) {Flux.error(e).subscribe(i - {System.out.println(error subscribe);});
}
//执行结果never() 创建一个不包含任何消息通知的序列 Flux.never().doOnNext(i - {System.out.println(doOnNext i);}).doOnComplete(() - {System.out.println(doOnComplete);}).subscribe((i) - {System.out.println(subscribe i);});
//执行结果
空 range(int start, int count) 创建包含从 start 起始的 count个数量的 Integer 对象的序列。 Flux.range(5, 10).doOnNext(i - {System.out.println(doOnNext i);}).doOnComplete(() - {System.out.println(doOnComplete);}).subscribe((i) - {System.out.println(subscribe i);});
//执行结果
doOnNext 5
subscribe 5
doOnNext 6
subscribe 6
doOnNext 7
subscribe 7
doOnNext 8
subscribe 8
doOnNext 9
subscribe 9
doOnNext 10
subscribe 10
doOnNext 11
subscribe 11
doOnNext 12
subscribe 12
doOnNext 13
subscribe 13
doOnNext 14
subscribe 14
doOnComplete interval(Duration period)和 interval(Duration delay, Duration period) 创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外还可以指定起始元素发布之前的延迟时间。 Flux.interval(Duration.ofSeconds(4), Duration.ofSeconds(2)).doOnNext(i - {System.out.println(doOnNext i);}).doOnComplete(() - {System.out.println(doOnComplete new Date());}).subscribe((i) - {System.out.println(subscribe i , date: new Date());});
try {Thread.sleep(10000);
} catch (InterruptedException e) {e.printStackTrace();
}
//执行结果
doOnNext 0
subscribe 0, date: Fri Jun 25 10:17:56 CST 2021
doOnNext 1
subscribe 1, date: Fri Jun 25 10:17:58 CST 2021
doOnNext 2
subscribe 2, date: Fri Jun 25 10:18:00 CST 2021
doOnNext 3
subscribe 3, date: Fri Jun 25 10:18:02 CST 2021 intervalMillis(long period)和 intervalMillis(long delay, long period) 与 interval()方法的作用相同只不过该方法通过毫秒数来指定时间间隔和延迟时间。
4Mono的使用常用方法fromCallable()、fromCompletionStage()、delay(Duration duration)、ignoreElements(Publisher source)、justOrEmpty(Optional? extends T data)
Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。 fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier() 分别从Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。 Mono.fromCallable(() - {System.out.println(begin callable);return Hello;}).subscribeOn(Schedulers.elastic()).doOnNext((i) - System.out.println(doOnNext i , thread : Thread.currentThread().getName())).subscribe(System.out::println);
Thread.sleep(10000);
//执行结果
begin callable
doOnNext Hello, thread :elastic-2
HelloMono.fromFuture(CompletableFuture.supplyAsync(() - {System.out.println(begin);return hello;})).subscribeOn(Schedulers.elastic()).doOnNext((i) - System.out.println(doOnNext i , thread : Thread.currentThread().getName())).subscribe(System.out::println);
Thread.sleep(10000);
//执行结果
begin
doOnNext hello, thread :elastic-2
hellodelay(Duration duration)和 delayMillis(long duration) 创建一个 Mono 序列在指定的延迟时间之后产生数字 0 作为唯一值。 Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
Thread.sleep(3000);
//执行结果, 延迟一秒后打印
0ignoreElements(Publisher source) 创建一个 Mono 序列忽略作为源的 Publisher 中的所有元素只产生结束消息。 Mono.ignoreElements((i) - {System.out.println(ignoreElements);}).doOnNext((i) - System.out.println(doOnNext i)).subscribe(System.out::println);
//执行结果
ignoreElementsjustOrEmpty(Optional? extends T data)和 justOrEmpty(T data) 从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时Mono 序列才产生对应的元素 OptionalInteger optional Optional.empty();
Mono.justOrEmpty(optional).doOnNext((i) - System.out.println(doOnNext i)).subscribe(System.out::println);System.out.println();optional Optional.of(100);
Mono.justOrEmpty(optional).doOnNext((i) - System.out.println(doOnNext i)).subscribe(System.out::println);
//执行结果doOnNext 100
100
5、操作符的使用bufferbufferTimeoutbufferUntilbufferWhilefilterwindowzipWithBiFunctiontakereducereduceWithmergemergeSequentialflatMapflatMapSequentialconcatMapcombineLatest buffer 和 bufferTimeoutbufferUntil 和 bufferWhile 这两个操作符的作用是把当前流中的元素收集到集合中并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件所包含的元素的最大数量或收集的时间间隔。 方法 buffer()仅使用一个条件而 bufferTimeout()可以同时指定两个条件。指定时间间隔时可以使用 Duration 对象或毫秒数即使用 bufferMillis() 或 bufferTimeoutMillis()两个方法。 除了元素数量和时间间隔之外还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象。 bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中bufferWhile则只有当 Predicate 返回 true 时才会收集。一旦值为false会立即开始下一次收集。 Flux.range(1, 100).buffer(20).subscribe(System.out::println);//执行结果
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]filter 对流中包含的元素进行过滤只留下满足 Predicate 指定条件的元素。 Flux.range(1, 10).filter(i - i%20).doOnNext(i - {System.out.println([doOnNext] i);}).subscribe(i - {System.out.println(subscribe i);});
//执行结果
[doOnNext] 2
subscribe 2
[doOnNext] 4
subscribe 4
[doOnNext] 6
subscribe 6
[doOnNext] 8
subscribe 8
[doOnNext] 10
subscribe 10window window 操作符的作用类似于 buffer所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中因此返回值类型是 Flux。 Flux.range(1, 15).window(5).doOnNext((flux - {})).subscribe(flux - {flux.doOnNext((item) - {System.out.println([window] flux: item);}).doOnComplete(() - System.out.println(flux item complete)).subscribe();
});
// 执行结果
[window] flux: 1
[window] flux: 2
[window] flux: 3
[window] flux: 4
[window] flux: 5
flux item complete
[window] flux: 6
[window] flux: 7
[window] flux: 8
[window] flux: 9
[window] flux: 10
flux item complete
[window] flux: 11
[window] flux: 12
[window] flux: 13
[window] flux: 14
[window] flux: 15
flux item complete zipWith zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理由此得到的是一个元素类型为 Tuple2 的流 也可以通过一个 BiFunction 函数对合并的元素进行处理所得到的流的元素类型为该函数的返回值。 Flux.just(Hello, Project).zipWith(Flux.just(World, Reactor)).subscribe(System.out::println);System.out.println();Flux.just(Hello, Project).zipWith(Flux.just(World, Reactor), (s1, s2) - String.format(%s!%s!, s1, s2)).subscribe(System.out::println);
// 执行结果
Hello,World
Project,ReactorHello!World!
Project!Reactor! take take 系列操作符用来从当前流中提取元素。提取的方式可以有很多种。 take(long n)take(Duration timespan)和 takeMillis(long timespan)按照指定的数量或时间间隔来提取。 1take从当前流中提取元素。 Flux.range(1, 10).take(2).subscribe(System.out::println);
// 执行结果
1
22takeLast(long n)提取流中的最后 N 个元素。 Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
// 执行结果
9
103takeUntil(Predicate? super T predicate)提取元素直到 Predicate 返回 true。 Flux.range(1, 10).takeUntil(i - i 6).subscribe(System.out::println);
// 执行结果
1
2
3
4
5
6 4takeWhile(Predicate? super T continuePredicate) 当 Predicate 返回 true 时才进行提取。 Flux.range(1, 10).takeWhile(i - i 5).subscribe(System.out::println);
// 执行结果
1
2
3
45takeUntilOther(Publisher? other)提取元素直到另外一个流开始产生元素。 Flux.range(1, 5).takeUntilOther((i) - {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}
}).subscribe(System.out::println);
// 执行结果暂停1000ms后开始输出
1
2
3
4
5 reduce 和 reduceWith reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值则序列的第一个元素作为初始值。 Flux.range(1, 10).reduce((x, y) - {System.out.println(x: x , y: y);return xy;}).subscribe(System.out::println);
// 执行结果
x:1, y:2
x:3, y:3
x:6, y:4
x:10, y:5
x:15, y:6
x:21, y:7
x:28, y:8
x:36, y:9
x:45, y:10
55 Flux.range(1, 10).reduceWith(() - 100, (x, y) - {System.out.println(x: x , y: y);return xy;}).subscribe(System.out::println);
// 执行结果
x:100, y:1
x:101, y:2
x:103, y:3
x:106, y:4
x:110, y:5
x:115, y:6
x:121, y:7
x:128, y:8
x:136, y:9
x:145, y:10
155 merge 和 mergeSequential merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并而 mergeSequential 则按照所有流被订阅的顺序以流为单位进行合并。 Flux.merge(Flux.interval(Duration.of(0, ChronoUnit.MILLIS),Duration.of(100, ChronoUnit.MILLIS)).take(2),Flux.interval(Duration.of(50, ChronoUnit.MILLIS),Duration.of(100, ChronoUnit.MILLIS)).take(2)).toStream().forEach(System.out::println);
System.out.println();
Flux.mergeSequential(Flux.interval(Duration.of(0, ChronoUnit.MILLIS),Duration.of(100, ChronoUnit.MILLIS)).take(2),Flux.interval(Duration.of(50, ChronoUnit.MILLIS),Duration.of(100, ChronoUnit.MILLIS)).take(2)).toStream().forEach(System.out::println);
// 执行结果
0
0
1
10
1
0
1 flatMap 和 flatMapSequential flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。 Flux.just(1, 2).flatMap(x - Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x)).toStream().forEach(System.out::println);
// 执行结果
0
0
1concatMap 和 combineLatest concatMap 操作符的作用也是把流中的每个元素转换成一个流再把所有流进行合并。 与 flatMap 不同的是concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并与 flatMapSequential 不同的是concatMap 对转换之后的流的订阅是动态进行的而 flatMapSequential 在合并之前就已经订阅了所有的流。 Flux.just(5, 10).concatMap(x - Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);Flux.combineLatest(Arrays::toString,Flux.intervalMillis(100).take(5),Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);6其他说明 Flux中的map、flatMap、concatMap的区别 参考Flux中的map、flatMap、concatMap的区别 1map用于一对一的转换返回一个新的Flux元素顺序不变。 2flatMap用于一对多的转换返回一个新的Flux元素顺序可能发生变化。 3concatMap用于一对多的转换返回一个新的Flux元素顺序与原始Flux中的元素顺序保持一致。 选择使用哪种方法取决于具体的业务需求和对元素顺序的要求。如果不关心元素顺序可以考虑使用flatMap它的并行执行可以提高性能。如果要保持元素顺序可以使用concatMap但要注意可能会影响性能。而map适用于简单的一对一转换场景。
2、WebFlux响应式编程说明容易踩的坑、以及编程规范代码待实践验证
参考响应式编程说明
1基本概念
Publisher发布者(数据流)表示数据的生产者。
Subscriber订阅者表示数据的消费者。
Mono: 包含0-1个数据的发布者实现了Publisher。
Flux: 包含0-n个数据的发布者实现了Publisher。
Operator: 操作符表示对数据流中的数据的操作描述。用于改变发布者的行为。 当发布者被订阅时发布者才开始生产消息 编写代码实际上是使用操作符来一个描述数据处理逻辑当发布者被订阅时才会执行这些处理逻辑。 2选择合适的操作符响应式/非响应式方法比如flatMap是响应式方法doOnNext不是响应式方法因为返回值类型是ConsumerT
系统中大量使用到了reactor其核心类只有2个FluxMono。
常用操作符:
map: 转换上游数据: flux.map(UserEntity::getId)mapNotNull: 转换上游数据, 并忽略null值.(reactor 3.4提供)flatMap: 转换上游数据但是结果是一个数据流并将这个数据流平铺: flux.flatMap(this::findById)flatMapMany: 转换Mono中的元素为Flux(1个转多个): mono.flatMapMany(this::findChildren)concat: 将多个流连接在一起组成一个流(按顺序订阅) : Flux.concat(header,body)merge: 将多个流合并在一起, 同时订阅流: Flux.merge(save(info),saveDetail(detail))zip: 压缩多个流中的元素: Mono.zip(getData(id), getDetail(id), UserInfo::of)then: 上游流完成后执行其他的操作.doOnNext: 流中产生数据时执行.doOnError: 发送错误时执行.doOnCancel: 流被取消时执行.如: http未响应前, 客户端断开了连接.onErrorContinue: 流发生错误时,继续处理数据而不是终止整个流.defaultIfEmpty: 当流为空时,使用默认值.switchIfEmpty: 当流为空时,切换为另外一个流.as: 将流作为参数,转为另外一个结果:flux.as(this::save)
完整文档请查看官方文档
3null处理
数据流中到元素不允许为null因此在进行数据转换的时候要注意null处理。
//存在缺陷
return this.findById(id)//getDescription可能返回null,为null时会抛出空指针,.map(UserEntity::getDescription); //使用以下方式替代
return this.findById(id).mapNotNull(UserEntity::getDescription); 4非阻塞与阻塞
默认情况下reactor的调度器由数据的生产者(Publisher)决定。在WebFlux中则是netty的工作线程。 为了防止工作线程被阻塞导致服务崩溃在一个请求的流中禁止执行存在阻塞(如执行JDBC)可能的操作的。如果无法避免阻塞操作应该指定调度器。
Note之前在通过定时任务处理请求流时出现任务阻塞未执行的问题最后的解决方法是通过指定的调度器去执行阻塞的操作。
paramMono.publishOn(Schedulers.boundedElastic()) //指定调度器去执行下面的操作.map(param- jdbcService.select(param))5上下文
在响应式中大部分情况是禁止使用ThreadLocal的(可能造成内存泄漏)。因此基于ThreadLocal的功能都无法使用。reactor中引入了上下文在一个流中可共享此上下文 。通过上下文进行变量共享例如事务, 权限等功能。
//从上下文中获取
GetMapping
public MonoUserInfo getCurrentUser(){return Mono.deferContextual(ctx-userService.findById(ctx.getOrEmpty(userId).orElseThrow(IllegalArgumentException::new));
}//定义过滤器设置数据到上下文中
class MyFilter implements WebFilter{public MonoVoid filter(ServerWebExchange exchange, WebFilterChain chain){return chain.filter(exchange).contextWrite(Context.of(userId,...))}
}6常见问题
a问题一我写的操作看上去是正确的但是没有执行
有3种可能上游流为空多个流未组合在一起在不支持响应式的地方使用了响应式 a没有使用return关键字返回 错误示例 public MonoResponse handleRequest(Request request){
// 没有returnthis.findOldData(request);
}正确示例 public MonoResponse handleRequest(Request request){return this.findOldData(request);
}b上游流为空 public MonoResponse handleRequest(Request request){return this.findOldData(request).flatMap(old - {//这里为什么不执行? return ....})
}当findOldData返回的流为空时下游的flatMap等操作符需要操作流中元素的操作符是不会执行的。 可以通过switchIfEmpty操作符来处理空流的情况。 c多个流未组合在一起 只要方法返回值是Mono或者Flux都不能单独行动只要方法中调用了任何响应式操作那这个方法也应该是响应式。(返回Mono或者Flux class Service{MonoVoid handleRequest(request);
}错误示例 //错误示例handleRequest是响应式的但是此方法没有使用响应式操作。
public Result handleRequest(Request request){service.handleRequest(request);return ok;
}正确示例 //正确示例
public MonoResult handleRequest(Request request){return service//处理请求.handleRequest(request)//返回结果.thenReturn(ok);
}d在不支持响应式的操作符中使用响应式 错误示例saveLog是响应式的但是doOnNext并不支持响应式操作
public MonoVoid saveLog(Request req,Response resp){...
}public MonoResult handleRequest(Request request){return service//处理请求.handleRequest(request)//记录日志 此为错误的用法saveLog是响应式的但是doOnNext并不支持响应式操作.doOnNext(response- saveLog(request,response) )//返回结果.thenReturn(ok);
}doOnNext()方法并不支持响应式从doOnNext方法的语义以及参数ConsumerT可知此方法是不支持响应式的ConsumerT只有参数没有返回值而saveLog是响应式的方法因此这就不能在doOnNext()方法中使用响应式操作。 正确示例 return service//处理请求.handleRequest(request)//记录日志.flatMap(response- saveLog(request,response) )//返回结果.thenReturn(ok);e在流内部订阅终止了整个流
public MonoVoid saveLog(Request req,Response resp){...
}错误示例 //错误
public MonoResponse handleRequest(Request request){return service//处理请求.handleRequest(request)//记录日志 此为错误的用法.flatMap(response- {saveLog(request,response).subscribe();return Mono.emtpy();})//返回结果.thenReturn(ok);
}正确示例 //正确
public MonoResponse handleRequest(Request request){return service//处理请求.handleRequest(request)//记录日志.flatMap(response- {return saveLog(request,response);})//返回结果.thenReturn(ok);
}b问题二我想获取流中的元素怎么办 - 不要试图取值 不要试图从流中获取数据出来而是先思考需要对流中元素做什么。 需要对流中的数据进行操作时都应该使用对应操作符来处理根据Flux或者Mono提供的操作符API进行组合操作。 传统编程方式 public ListBook getAllBooks(){ListBookEntity bookEntities repository.findAll();ListBook books new ArrayList(bookEntities.size());for(BookEntity entity : bookEntities){Book book entity.copyTo(new Book());books.add(book);}return books;
}错误示例响应式编程错误取值示例 public Book getAllBooks(){return getRepository().createQuery().where(id,1).fetchOne().block();
} 警告 在响应式编程中在任何时候执行业务代码时都不要使用block()方法使用block()方法可能引发以下问题 阻塞线程调用block()方法会阻塞当前线程导致无法处理其他并发请求。这会降低系统的吞吐量和响应性能。死锁风险如果在处理响应式流时使用了block()方法而其中某些操作也依赖于同一个线程的结果则可能导致死锁。内存资源浪费阻塞调用将持续占用线程而每个线程都需要额外的内存资源。如果应用程序中同时有大量的阻塞操作可能导致线程池耗尽和内存资源浪费。 正确示例响应式编程正确取值示例 为了避免使用block()我们应该尽可能地使用响应式操作符如map、flatMap、filter等对数据流进行转换和处理并使用其他响应式方法如subscribe()来订阅数据流并触发异步处理。 public FluxBook getAllBooks(){return repository.findAll().map(entity- entity.copyTo(new Book()))
}
c问题三我需要在非响应式方法中使用响应式怎么办 尽量减少在响应式方法中使用block() public void handleRequest(Request request){//不到万不得已请勿使用block方法//logService.saveLog(request).block()//logService.saveLog(request).subscribe(result-log.debug(保存成功 {},request),error-log.warn(保存失败 {},request,error))
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/87710.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!