observable_在Completablefuture和Observable之间转换

observable

Java 8中的CompletableFuture<T>是对T类型的值将来将可用的承诺的高级抽象。 Observable<T>非常相似,但是它承诺将来会出现任意数量的项,从0到无穷大。 异步结果的这两种表示形式与仅使用一项即可使用Observable而不是CompletableFuture情况非常相似,反之亦然。 另一方面, CompletableFuture更专业,并且由于它现在是JDK的一部分,因此应该很快流行起来。 让我们用简短的文章来庆祝RxJava 1.0的发布,该文章展示了如何在不失去异步和事件驱动性质的情况下在两者之间进行转换。

CompletableFuture表示将来的一个值,因此将其变为Observable非常简单。 当Future以某个值完成时, Observable也将立即发出该值并关闭流:

class FuturesTest extends Specification {public static final String MSG = "Don't panic"def 'should convert completed Future to completed Observable'() {given:CompletableFuture<String> future = CompletableFuture.completedFuture("Abc")when:Observable<String> observable = Futures.toObservable(future)then:observable.toBlocking().toIterable().toList() == ["Abc"]}def 'should convert failed Future into Observable with failure'() {given:CompletableFuture<String> future = failedFuture(new IllegalStateException(MSG))when:Observable<String> observable = Futures.toObservable(future)then:observable.onErrorReturn({ th -> th.message } as Func1).toBlocking().toIterable().toList() == [MSG]}  CompletableFuture failedFuture(Exception error) {CompletableFuture future = new CompletableFuture()future.completeExceptionally(error)return future}}

尚未执行的 Futures.toObservable()第一个测试会将Future转换为Observable ,并确保正确传播值。 第二次测试创建了失败的Future ,将失败替换为异常的消息,并确保传播了异常。 实现要短得多:

public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));
}

注意: Observable.fromFuture()存在,但是我们想充分利用ComplatableFuture的异步运算符。

实际上,有两种将Observable转换为Future -创建CompletableFuture<List<T>>CompletableFuture<T> (如果我们假设Observable只有一项)。 让我们从前一种情况开始,用以下测试用例进行描述:

def 'should convert Observable with many items to Future of list'() {given:Observable<Integer> observable = Observable>just(1, 2, 3)when:CompletableFuture<List<Integer>> future = Futures>fromObservable(observable)then:future>get() == [1, 2, 3]
}def 'should return failed Future when after few items exception was emitted'() {given:Observable<Integer> observable = Observable>just(1, 2, 3)>concatWith(Observable>error(new IllegalStateException(MSG)))when:Futures>fromObservable(observable)then:def e = thrown(Exception)e>message == MSG
}

显然,直到源Observable信号流结束, Future才完成。 因此, Observable.never()将永远不会完成包装Future ,而是用空列表完成它。 该实现更短,更甜蜜:

public static <T> CompletableFuture<List<T>> fromObservable(Observable<T> observable) {final CompletableFuture<List<T>> future = new CompletableFuture<>();observable.doOnError(future::completeExceptionally).toList().forEach(future::complete);return future;
}

关键是Observable.toList() ,它可以方便地从Observable<T>Observable<List<T>> 。 当源Observable<T>完成时,后者发出List<T>类型的一项。

当我们知道CompletableFuture<T>将恰好返回一项时,就会发生上一次转换的特殊情况。 在这种情况下,我们可以将其直接转换为CompletableFuture<T> ,而不是仅包含一项的CompletableFuture<List<T>> 。 首先测试:

def 'should convert Observable with single item to Future'() {given:Observable<Integer> observable = Observable.just(1)when:CompletableFuture<Integer> future = Futures.fromSingleObservable(observable)then:future.get() == 1
}def 'should create failed Future when Observable fails'() {given:Observable<String> observable = Observable.<String> error(new IllegalStateException(MSG))when:Futures.fromSingleObservable(observable)then:def e = thrown(Exception)e.message == MSG
}def 'should fail when single Observable produces too many items'() {given:Observable<Integer> observable = Observable.just(1, 2)when:Futures.fromSingleObservable(observable)then:def e = thrown(Exception)e.message.contains("too many elements")
}

同样,实现非常简单并且几乎相同:

public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) {final CompletableFuture<T> future = new CompletableFuture<>();observable.doOnError(future::completeExceptionally).single().forEach(future::complete);return future;
}

上面的Helper方法还不够完善,但是,如果您需要在JDK 8和RxJava风格的异步计算之间进行转换,那么这篇文章应该足以帮助您入门。

翻译自: https://www.javacodegeeks.com/2014/12/converting-between-completablefuture-and-observable.html

observable

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/338681.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python idle使用anaconda中库怎么用_如何使用Anaconda学习Python

要学习Python进行数据分析&#xff0c;需要先安装Anaconda和Jupyter Notebook关于Anaconda1、为什么要用Anaconda&#xff1f;1)Anaconda自带常用数据科学包&#xff0c;你可以立即开始处理数据2)conda可帮助更好的安装和整理第三方包3)Conda可帮助你为不同的项目建立不同的运行…

C 为什么非要引入那几种类型转换?

为什么要引入这几种类型转换&#xff0c;它与C语言中的强制类型转换有什么区别&#xff1f;这四种类型转换分别应用在什么场景&#xff1f;C 为什么要引入这几种强制类型转换&#xff1f;我们都知道C 完全兼容C语言&#xff0c;C语言的转换方式很简单&#xff0c;可以在任意类型…

删除一个程序Linux,一天一个Linux基础命令删除文件或目录命令rm

rm删除文件命令1、命令格式rm [options] file...2、命令功能rm命令&#xff0c;删除文件命令&#xff0c;是Linux系统下常用命令&#xff0c;该命令的功能为删除一个文件或者整个目录或者目录中的部分文件&#xff0c;它也可以将某个目录及其下的所有文件及子目录均删除。对于…

C语言笔试两题,有坑

题目一最近遇到的一个华为笔试题题目&#xff1a;对字符串中的所有单词进行倒排。说明&#xff1a;1、构成单词的字符只有26个大写或小写英文字母&#xff1b;2、非构成单词的字符均视为单词间隔符&#xff1b;3、要求倒排后的单词间隔符以一个空格表示&#xff1b;如果原字符串…

python apply_async函数_进程池未执行apply_async中添加的函数就直接结束了

代码没有执行apply_async中添加的函数就直接结束了from bs4 import BeautifulSoupimport randomimport requestsimport pymongoimport datetimeimport randomimport timefrom multiprocessing import Pooluser_agents [Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X)…

java8 streams_Java 8 Streams API作为友好的ForkJoinPool外观

java8 streams我最喜欢Java 8的功能之一是流API。 最终&#xff0c;它消除了代码中的几乎所有循环&#xff0c;并使您可以编写更具表现力和重点的代码。 今天&#xff0c;我意识到它可以用于其他用途&#xff1a;作为ForkJoinPool一个不错的前端。 问题&#xff1a;执行器样板…

嵌入式linux文件系统启动,嵌入式Linux之文件系统启动分析【原创】

this.p{ m:2,b:2,loftPermalink:,id:fks_094068082086089066084084095095080087080066082082083075,blogTitle:嵌入式Linux之文件系统启动分析【原创】,blogAbstract:author&#xff1a;张继飞写在前面&#xff0c;这一切必须是在Linux内核挂载文件系统后。在Linux内核中找到/i…

C语言:如何定义一个和库函数名一样的函数,并在函数中调用该库函数

某个函数fun_1()是在lib内&#xff0c;没法修改的&#xff0c;在程序中大量的使用了该函数&#xff0c;现在想把原本fun_1失效&#xff08;现在失效的方法是#define fun_1(..)),用另外一个函数fun_2(),可是fun_2最后也需要调用fun_1,上面的失效方法感觉就不行了&#xff0c;请问…

python计算派的近似值、当任意项的值小于10^-4_编程计算e的近似值,直到最后一项的绝对值小于10^-5时为止,输出e的值并统计累加项数...

展开全部1、问62616964757a686964616fe4b893e5b19e31333433633530题主要是类型的问题。因为a 是int型的&#xff0c;在计算的时候不转型成浮点型是不对的(1/a0)因此要写成&#xff1a;pipi1.0/(2*a-1);pipi-1.0/(2*a-1);还有判断条件&#xff1a;while ((1.0/a)>1e-4);另外&…

jdk8集合类流_JDK 8中的流驱动的集合功能

jdk8集合类流这篇文章介绍了JDK 8的应用–引入了带有集合的 流 &#xff0c;以更简洁地完成通常需要的与集合相关的功能。 在此过程中&#xff0c;将演示并简要说明使用Java Streams的几个关键方面。 请注意&#xff0c;尽管JDK 8 Streams通过并行化支持提供了潜在的性能优势&a…

C语言可变参数只会用算啥本事?看我来抽丝剥茧干翻它!

看山是山&#xff0c;看山不是山&#xff0c;最终看山才是山&#xff0c;并且是无穷的山峦。当我们学习一门技术的时候&#xff0c;起初是先模仿&#xff0c;但是最终是为了超越&#xff0c;也就是得到秘籍&#xff0c;看到本质。于是&#xff0c;今天来继续看可变参数&#xf…

Linux下CMAKE编译jsoncpp,使用CMake引入jsoncpp

在jni的cpp中使用json&#xff0c;百度了一下&#xff0c;大多都是下面这个库&#xff0c;但是和之前的文档有点出入了&#xff0c;记录一下。jsoncpp库地址&#xff1a;添加jsoncpp库在cpp目录下创建一个jsoncpp目录下载jsoncpp项目把src\lib_json文件夹下的文件都拉到项目的c…

python打印星星居中_python实现while循环打印星星的四种形状

在控制台连续输出五行*,每一行星号数量一次递增***************#1.定义一个行计数器row 1while row < 5:#定义一个列计数器col 1#开始循环while col < row:print(*,end)col 1print()row 1如果想要星星倒过来呢#1.定义一个行计数器row 1while row < 5:#定义一个列…

java8升级java12_为什么现在是升级到Java 8的最佳时机

java8升级java12有兴趣了解如何通过AppDynamics充分利用Java 8的新功能吗&#xff1f; 立即开始免费试用 &#xff01; 今年3月&#xff0c;Oracle发布了近十年来最受期待的版本Java8。自发布以来&#xff0c;最新版本引起了越来越多的关注&#xff0c;各种规模的公司都渴望升…

C语言#include还有些你不知道的事

#include简介在C语言中#include是preprocessor的一条指令&#xff0c;告诉预处理器将指定头文件的内容插入到预处理器命令的相应位置。#include "xxx.h" 和 #include有两种方式可以指定插入头文件&#xff1a;#include #include "filename"如果需要包含标…

java常见的ide_在三个Java IDE中生成的三种常见方法

java常见的ide在本文中&#xff0c;我研究了NetBeans 8.0.2 &#xff0c; IntelliJ IDEA 14.0.2和Eclipse Luna 4.4.1生成的三种“通用”方法[ equals&#xff08;Object&#xff09; &#xff0c; hashCode&#xff08;&#xff09;和toString&#xff08;&#xff09; ]的区别…

深度linux安装依赖,Linux -- Ubuntu下载deepin wine依赖问题笔记

问题开始下载deepin-wine安装包, 请稍后…1.1udis86_1.72-2_i3 100%[>] 34.18K 87.3KB/s 用时 0.4s1.2deepin-fonts-win 15%[> ] 31.18K 1.72KB/s 用时 18s1.2deepin-fonts-win 100%[>] 207.88K 26.2KB/s 用时 6.7s2.1deepin-libwine_2 100%[>] 18.97M 132KB/s 用时…

什么是C语言中的隐式函数声明?

「1、什么是C语言的隐式函数声明」在C语言中&#xff0c;函数在调用前不一定非要声明。如果没有声明&#xff0c;那么编译器会自动按照一种隐式声明的规则&#xff0c;为调用函数的C代码产生汇编代码。下面是一个例子&#xff1a;int main(int argc, char** argv) {double x a…

群晖 上传 源文件不存在_群晖NAS连接百度网盘报错?原因是这样的

群晖NAS附带的云同步套件可以与国内外多个网盘连接 , 连接后可从云上下载数据亦可从本地将数据上传到云上。例如通过云同步套件连接百度网盘账号后可以便捷上传和下载数据 , 若网盘空间较大甚至可用来备份整个NAS等。不过现在看来群晖与百度网盘的合作似乎已经结束&#xff0c;…

ssl/tls服务器瞬时_SSL / TLS REST服务器–带有Spring和TomEE的客户端

ssl/tls服务器瞬时在构建系统时&#xff0c;开发人员通常会忽略安全性方面。 安全一直是令人担忧的重要问题&#xff0c;但是它比以前吸引了更高的关注。 就在今年&#xff0c;我们发生了像Heartbleed Bug或CelebrityGate丑闻这样的案件。 这与帖子无关&#xff0c;只是安全真正…