如何做全网影视网站手套外包加工网
news/
2025/9/24 20:35:05/
文章来源:
如何做全网影视网站,手套外包加工网,泊头市建设局官方网站,wordpress无法添加媒体Reactor 是一个基于响应式编程的库#xff0c;主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API#xff0c;包括创建、转换、过滤、组合等操作符#xff0c;用于处理异步数据流。以下是一些 Reactor 的主要 API 示例#xff1a; pom依赖 dependencyMan… Reactor 是一个基于响应式编程的库主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API包括创建、转换、过滤、组合等操作符用于处理异步数据流。以下是一些 Reactor 的主要 API 示例 pom依赖 dependencyManagementdependenciesdependencygroupIdio.projectreactor/groupIdartifactIdreactor-bom/artifactIdversion2023.0.0/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagementdependenciesdependencygroupIdio.projectreactor/groupIdartifactIdreactor-core/artifactId/dependencydependencygroupIdio.projectreactor/groupIdartifactIdreactor-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.junit.jupiter/groupIdartifactIdjunit-jupiter/artifactIdversion5.7.2/versionscopetest/scope/dependency/dependencies1. 创建 Mono 和 Flux
Mono: 用于表示包含零个或一个元素的异步序列。Flux: 用于表示包含零个或多个元素的异步序列。
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ReactorCreateExample {public static void main(String[] args) {// 创建包含单个元素的 MonoMonoString mono Mono.just(Hello, Reactor!);// 创建包含多个元素的 FluxFluxInteger flux Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});mono.subscribe(System.out::println); // 输出: Hello, Reactor!flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}2. 转换操作符
使用转换操作符对数据流进行转换或处理。
import reactor.core.publisher.Flux;public class ReactorTransformExample {public static void main(String[] args) {FluxInteger source Flux.range(1, 5);// 对每个元素进行平方操作FluxInteger squared source.map(x - x * x);squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25}
}3. 过滤操作符
使用过滤操作符筛选数据流中的元素。
import reactor.core.publisher.Flux;public class ReactorFilterExample {public static void main(String[] args) {FluxInteger source Flux.range(1, 5);// 筛选偶数FluxInteger evenNumbers source.filter(x - x % 2 0);evenNumbers.subscribe(System.out::println); // 输出: 2, 4}
}4. 组合操作符
使用组合操作符组合多个数据流。
import reactor.core.publisher.Flux;public class ReactorCombineExample {public static void main(String[] args) {FluxInteger source1 Flux.range(1, 3);FluxInteger source2 Flux.range(4, 3);// 合并两个数据流FluxInteger merged Flux.concat(source1, source2);merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6}
}这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合以构建出符合业务逻辑的异步处理链。
5. 错误处理
Reactor 提供了多种处理错误的方式例如使用 onErrorResume, onErrorReturn, doOnError 等方法。
import reactor.core.publisher.Flux;public class ReactorErrorHandlingExample {public static void main(String[] args) {FluxInteger source Flux.just(1, 2, 0, 4, 5);// 处理除零异常并提供默认值FluxInteger result source.map(x - 10 / x).onErrorResume(ex - Flux.just(-1));result.subscribe(System.out::println); // 输出: 10, 5, -1}
}6. 背压处理
Reactor 提供了背压处理的支持允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer 或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。
import reactor.core.publisher.Flux;public class ReactorBackpressureExample {public static void main(String[] args) {FluxInteger source Flux.range(1, 1000);// 设置缓冲区大小FluxInteger buffered source.onBackpressureBuffer(10);buffered.subscribe(data - {// 模拟慢速消费者try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(data);},error - System.err.println(Error: error),() - System.out.println(Done));}
}TODO未能实现没有背压和有背压的对比
7. 使用 Reactor WebFlux 处理 Web 请求
Reactor 还提供了 WebFlux 模块用于处理响应式的 Web 请求。以下是一个简单的示例
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;RestController
public class WebFluxController {GetMapping(/hello)public MonoResponseEntityString hello() {return Mono.just(ResponseEntity.ok(Hello, Reactor WebFlux!));}
}8. Reactor 核心概念
Reactor 中有一些核心概念了解这些概念有助于更好地使用 Reactor API。 Publisher发布者: 代表一个生产数据的源头通常是 Mono 或 Flux。 Subscriber订阅者: 用于消费数据流的组件。通过 subscribe 方法订阅 Publisher。 Subscription订阅: 代表 Subscriber 和 Publisher 之间的连接。Subscriber 可以使用 Subscription 来请求数据取消订阅等。 Processor处理器: 既是 Publisher 又是 Subscriber用于在两者之间进行转换和处理。
public class ReactorCoreConceptsExample {public static void main(String[] args) {// 创建发布者FluxInteger source Flux.range(1, 5);// 创建处理器并进行数据处理UnicastProcessorInteger processor UnicastProcessor.create();source.map(value - value * 2) // Example: doubling the values.subscribe(processor);// 创建订阅者CustomSubscriberInteger subscriber new CustomSubscriber();// 订阅并处理数据processor.subscribe(subscriber);}// 自定义订阅者static class CustomSubscriberT extends BaseSubscriberT {Overrideprotected void hookOnNext(T value) {System.out.println(Processed Value: value);}Overrideprotected void hookOnError(Throwable throwable) {System.err.println(Error: throwable);}Overrideprotected void hookOnComplete() {System.out.println(Done);}}
}UnicastProcessor.create()已弃用可以使用Sinks.many().unicast().onBackpressureBuffer()
9. Reactor 调度器
Reactor 提供了多种调度器用于控制异步操作的执行线程。例如Schedulers.boundedElastic() 创建了一个弹性线程池可以根据需要动态调整线程数。
public class ReactorSchedulersExample {public static void main(String[] args) {Flux.range(1, 5).publishOn(Schedulers.boundedElastic()) // 在弹性线程池上发布.map(x - x * x).subscribeOn(Schedulers.parallel()) // 在并行线程池上订阅.subscribe(System.out::println);}
}经测试大概率只使用了一个线程
11. 组合多个 Mono 或 Flux
使用 zip 操作符可以组合多个 Mono 或 Flux将它们的元素进行组合。
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorZipExample {public static void main(String[] args) {MonoString mono1 Mono.just(Hello);MonoString mono2 Mono.just(Reactor);// 将两个 Mono 合并为一个 FluxFluxString result Flux.zip(mono1, mono2).map(tuple - tuple.getT1() tuple.getT2());result.subscribe(System.out::println); // 输出: Hello Reactor}
}12. 超时操作
使用 timeout 操作符可以在指定的时间内等待数据流产生结果如果超时则触发错误。
public class ReactorTimeoutExample {public static void main(String[] args) throws InterruptedException {FluxInteger source Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2)); // 模拟延迟// 在指定时间内等待数据流产生结果否则触发超时source.timeout(Duration.ofSeconds(1)).subscribe(data - System.out.println(Received: data),error - System.err.println(Error: error),() - System.out.println(Done));//睡一会等待任务执行完成Thread.sleep(3333);}
}
13. 并行操作
使用 parallel 操作符可以将一个数据流并行处理提高处理速度。
public class ReactorParallelExample {public static void main(String[] args) throws InterruptedException {Flux.range(1, 10).parallel().runOn(Schedulers.parallel()).map(x - x * x).sequential().subscribe(System.out::println);//睡一会等待任务执行完成Thread.sleep(1111);}
}
14. 与 Java Stream 集成
Reactor 与 Java Stream 可以方便地进行集成。
import reactor.core.publisher.Flux;
import java.util.stream.Stream;public class ReactorJavaStreamIntegrationExample {public static void main(String[] args) {FluxInteger flux Flux.fromStream(Stream.of(1, 2, 3, 4, 5));flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}15. 使用 Mono 和 Flux 进行条件操作
Reactor 提供了条件操作符例如 switchIfEmpty 和 filter用于根据条件处理数据流。
public class ReactorConditionalOperatorsExample {public static void main(String[] args) {FluxInteger empty Flux.range(1, 0);FluxInteger source Flux.range(1, 5);// 如果数据流为空则切换到另一个数据流empty.switchIfEmpty(Flux.range(6, 3)).subscribe(System.out::println); // 输出: 6,7,8// 使用 filter 过滤元素source.filter(x - x % 2 0).subscribe(System.out::println); // 输出: 2, 4}
}16. 使用 Reactor StepVerifier 进行测试
代码需要写在test测试目录下
Reactor 提供了 StepVerifier 类用于测试异步操作的行为。
public class ReactorTestingExample {public static void main(String[] args) {FluxInteger flux Flux.range(1, 5);// 使用 StepVerifier 验证数据流的行为StepVerifier.create(flux).expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345.expectComplete().verify();}
}17. 使用 Mono 和 Flux 进行重试
Reactor 提供了 retryWhen 方法结合 Backoff 操作符用于在发生错误时进行重试。
public class ReactorRetryExample {public static void main(String[] args) throws InterruptedException {MonoObject source Mono.fromCallable(() - {throw new RuntimeException(Simulated error);})//最大重试次数为3次初始重试间隔为1秒并且采用指数回退策略直到达到最大的回退时间这里是5秒。.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));source.subscribe(data - System.out.println(Received: data),error - System.err.println(Error: error.getMessage()));//得多睡会儿让它跑完最大重试时间Thread.sleep(999999);}
}19. 使用 Reactor Context 进行上下文传递
Reactor 提供了 Context 类用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。
import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class ReactorContextExample {public static void main(String[] args) {MonoString mono Mono.deferContextual(contextView -Mono.just(Hello, contextView.get(user)));String result mono.contextWrite(Context.of(user, John)).block();System.out.println(result); // 输出: Hello, John}
}20. 使用 Reactor 的 doOn 方法进行副作用处理
doOn 系列方法允许在数据流的不同生命周期阶段执行副作用操作如日志记录、统计等。
import reactor.core.publisher.Flux;public class ReactorDoOnExample {public static void main(String[] args) {FluxInteger source Flux.range(1, 5);source.doOnNext(value - System.out.println(Processing element: value)).doOnComplete(() - System.out.println(Processing complete)).subscribe(System.out::println);}
}21. 使用 Reactor 的 transform 方法进行操作链重用
transform 方法允许对操作链进行重用将一系列操作组合为一个新的 Function。
import reactor.core.publisher.Flux;import java.util.function.Function;public class ReactorTransformExample {public static void main(String[] args) {FluxInteger source Flux.range(1, 5);// 定义一个操作链FunctionFluxInteger, FluxInteger customTransform flux -flux.filter(x - x % 2 0).map(x - x * 2);// 使用 transform 应用自定义操作链source.transform(customTransform).subscribe(System.out::println); // 输出: 4, 8}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/916209.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!