我在使用Netflix Rx-Java库方面有良好的工作经验,并且以前曾写过关于使用Rx-Java和Java 8 CompletableFuture解决分散式问题的博客。 在这里,我想探索使用Spring Reactor Core库应用相同的模式。
tldr –如果您熟悉Netflix Rx-Java,您已经很熟悉Spring Reactor Core,API的地图,并且我很高兴看到Spring Reactor团队在Javadoc API中勤奋地使用了Marble图。
另一个快速点是, rx.Observable根据是否要发射许多项目或是否要发射一个项目来映射到Flux或Mono 。
有了这个,我可以直接进入示例–我执行了一个简单的任务(使用延迟模拟),该任务被生成了几次,我需要同时执行这些任务,然后收集结果,使用rx表示如下。可观察的代码:
@Test
public void testScatterGather() throws Exception {ExecutorService executors = Executors.newFixedThreadPool(5);List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());Observable<List<String>> merged = Observable.merge(obs).toList();List<String> result = merged.toBlocking().first();logger.info(result.toString());}private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);s.onNext( i + "-test");s.onCompleted();}).subscribeOn(Schedulers.from(executorService));
}
请注意,我纯粹是出于测试目的。 现在,使用Spring Reactor Core的类似代码可以转换为以下代码:
@Test
public void testScatterGather() {ExecutorService executors = Executors.newFixedThreadPool(5);List<Flux<String>> fluxList = IntStream.range(0, 10).boxed().map(i -> generateTask(executors, i)).collect(Collectors.toList());Mono<List<String>> merged = Flux.merge(fluxList).toList();List<String> list = merged.get();logger.info(list.toString());}public Flux<String> generateTask(ExecutorService executorService, int i) {return Flux.<String>create(s -> {Util.delay(2000);s.onNext(i + "-test");s.onComplete();}).subscribeOn(executorService);
}
它或多或少地一对一映射。 Mono类型的区别很小,我个人认为该类型是反应式库的不错的介绍,因为它可以很清楚地表明是否发出了多个项目,而只发出了一个。样品。 这些对我来说仍然是早期的探索,我希望对这个优秀的图书馆更加熟悉。
翻译自: https://www.javacodegeeks.com/2016/04/scatter-gather-using-spring-reactor-core.html