背压加载文件– RxJava常见问题解答

事实证明,将文件作为流进行处理非常有效且方便。 许多人似乎忘记了,自Java 8(3年多!)以来,我们可以很容易地将任何文件变成一行代码:

String filePath = "foobar.txt";
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {reader.lines().filter(line -> !line.startsWith("#")).map(String::toLowerCase).flatMap(line -> Stream.of(line.split(" "))).forEach(System.out::println);
}

reader.lines()返回Stream<String> ,您可以对其进行进一步转换。 在此示例中,我们丢弃以"#"开头的行,并通过将其拆分为单词来爆炸每行。 通过这种方式,我们可以实现单词流而不是行流。 使用文本文件几乎与使用普通Java集合一样简单。 在RxJava中, 我们已经学习了generate()运算符。 它也可以在这里用于从文件创建健壮的行流:

Flowable<String> file = Flowable.generate(() -> new BufferedReader(new FileReader(filePath)),(reader, emitter) -> {final String line = reader.readLine();if (line != null) {emitter.onNext(line);} else {emitter.onComplete();}},reader -> reader.close()
);

在上述示例中, generate()运算符稍微复杂一些。 第一个参数是状态工厂。 每次有人订阅此流时,都会调用工厂并创建有状态的BufferedReader 。 然后,当下游运营商或订户希望接收某些数据时,将调用第二个lambda(带有两个参数)。 此lambda表达式尝试从文件中精确提取一行,然后将其发送到下游( onNext() )或在遇到文件结尾时完成。 这很简单。 generate()的第三个可选参数是一个lambda表达式,可以对state进行一些清理。 在我们的情况下这非常方便,因为我们不仅必须在到达文件末尾时关闭文件,而且还必须在使用者过早取消订阅时关闭文件。

认识Flowable.using()运算符

这似乎需要做很多工作,尤其是当我们已经有了来自JDK 8的一行代码时。事实证明,有一个类似的工厂运算符using()很方便。 的翻译的所有最简单的方法首先Stream从Java到Flowable是通过转换StreamIterator (checked异常处理忽略):

Flowable.fromIterable(new Iterable<String>() {@Overridepublic Iterator<String> iterator() {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();}
});

可以简化为:

Flowable.<String>fromIterable(() -> {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();
});

但是我们忘了关闭BufferedReader从而关闭FileReader从而关闭文件句柄。 因此,我们引入了资源泄漏。 在这种情况下, using()运算符的作用就像是一种魅力。 在某种程度上,它类似于try-with-resources语句。 您可以基于某些外部资源创建流。 当有人订阅或取消订阅时,将为您管理此资源的生命周期(创建和处置):

Flowable.using(() -> new BufferedReader(new FileReader(filePath)),reader -> Flowable.fromIterable(() -> reader.lines().iterator()),reader -> reader.close()
);

它与上一个generate()示例非常相似,但是中间最重要的lambda表达式却大不相同。 我们获得一个资源( reader )作为参数,并假设返回一个Flowable (而不是单个元素)。 该lambda仅被调用一次,而不是在每次下游请求新项时调用。 using()运算符给我们的是管理BufferedReaders的生命周期。 当我们有一个状态(可以一次生成整个Flowable ,而不是一次generate()一个using()时, using()很有用。

流XML文件

…或JSON。 假设您有一个非常大的XML文件,其中包含以下条目,其中包括数十万个条目:

<trkpt lat="52.23453" lon="21.01685"><ele>116</ele>
</trkpt>
<trkpt lat="52.23405" lon="21.01711"><ele>116</ele>
</trkpt>
<trkpt lat="52.23397" lon="21.0166"><ele>116</ele>
</trkpt>

这是标准GPS交换格式的片段,可以描述任意长度的地理路线。 每个<trkpt>是具有纬度,经度和海拔的单个点。 我们希望有一个跟踪点流(为简单起见忽略高程),以便可以部分使用文件,而不是一次加载所有文件。 我们有三个选择:

  • DOM / JAXB –必须将所有内容加载到内存中并映射到Java对象。 不适用于无限长的文件(甚至非常大的文件)
  • SAX –基于推送的库,一旦发现XML标签打开或关闭,就会调用回调。 似乎好一点,但可能无法支持背压–由库决定何时调用回调,并且无法减慢其速度
  • StAX –与SAX相似,但是我们必须积极地从XML文件中提取数据。 这对于支持背压至关重要-我们决定何时读取下一个数据块

让我们尝试使用StAX和RxJava实现可能很大的XML文件的解析和流传输。 首先,我们必须首先学习如何使用StAX 。 该解析器称为XMLStreamReader ,它是按照以下咒语和诅咒序列创建的:

XMLStreamReader staxReader(String name) throws XMLStreamException {final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));return XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
}

只需闭上眼睛,并确保您始终有一个地方可以复制粘贴上面的代码片段。 情况变得更糟。 为了读取第一个<trkpt>标记及其属性,我们必须编写一些复杂的代码:

import lombok.Value;@Value
class Trackpoint {private final BigDecimal lat;private final BigDecimal lon;
}Trackpoint nextTrackpoint(XMLStreamReader r) {while (r.hasNext()) {int event = r.next();switch (event) {case XMLStreamConstants.START_ELEMENT:if (r.getLocalName().equals("trkpt")) {return parseTrackpoint(r);}break;case XMLStreamConstants.END_ELEMENT:if (r.getLocalName().equals("gpx")) {return null;}break;}}return null;
}Trackpoint parseTrackpoint(XMLStreamReader r) {return new Trackpoint(new BigDecimal(r.getAttributeValue("", "lat")),new BigDecimal(r.getAttributeValue("", "lon")));
}

API是低级报价,并且几乎是古董。 一切都发生在一个巨大的循环中,该循环读取... int类型的东西 。 此int可以是START_ELEMENTEND_ELEMENT或我们不感兴趣的其他一些东西。请记住,我们正在读取XML文件,但不是逐行或逐字符,而是通过逻辑XML标记(标记)。 因此,如果发现打开<trkpt>元素,则将其解析,否则继续。 第二个重要条件是当我们发现关闭</gpx> ,这应该是GPX文件中的最后一件事。 在这种情况下,我们返回null ,表示XML文件结束。

感觉复杂吗? 实际上,这是读取具有恒定内存使用量的大型XML(与文件大小无关)的最简单方法。 所有这些与RxJava有何关系? 在这一点上,我们可以很容易地构建Flowable<Trackpoint> 。 是的, Flowable ,没有Observable (见: ObsevableObservable )。 这样的流将完全支持背压,这意味着它将以适当的速度读取文件:

Flowable<Trackpoint> trackpoints = generate(() -> staxReader("track.gpx"),this::pushNextTrackpoint,XMLStreamReader::close);void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) {final Trackpoint trkpt = nextTrackpoint(reader);if (trkpt != null) {emitter.onNext(trkpt);} else {emitter.onComplete();}
}

哇,如此简单,如此反压! [1]我们首先创建一个XMLStreamReader ,并确保在文件结束或有人取消订阅时将其关闭。 请记住,每个订户将一次又一次打开并开始解析相同的文件。 中间的lambda表达式仅使用状态变量( XMLStreamReader )并发出另一个跟踪点。 所有这些似乎都很晦涩,事实是! 但是,现在我们有了一个使用很少的资源就可以从一个可能很大的文件中提取回溯感知流。 我们可以同时处理跟踪点,也可以将它们与其他数据源组合在一起。 在下一篇文章中,我们将学习如何以非常相似的方式加载JSON。

翻译自: https://www.javacodegeeks.com/2017/09/loading-files-backpressure-rxjava-faq.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/349837.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CISCN final 几道web题总结

因为都有源码&#xff0c;所以这里直接从源码开始分析&#xff1a; 1.Easy web 这道题本来的意思应该是通过注入来load_file读取config.php来泄露cookie的加密密钥&#xff0c;从而伪造身份进行登陆再上传shell 这里本来addslashes以后就基本没法注入&#xff0c;但是这里却多了…

初一计算机下册理论知识,初中信息技术七年级下册(第七版)

内容简介暂无内容目录1  计算机图片与动画 ……………………………………………11.1  图片处理 ……………………………………………………11.1.1  活动 …………………………………………………………1活动1  制作校园风景人物画册 …………………………………1活动2  制…

南邮计算机学院是211,南京邮电大学是211还是985

南京邮电大学曾经是双非高校&#xff0c;目前进入了世界学科建设高校。而南京农业大学是211工程大学&#xff0c;现在也是世界学科建设高校。从学校的牌子上来看&#xff0c;南京农业大学要更具有影响力。而我们今天要和大家分享的是南京邮电大学&#xff0c;那问题来了南京邮电…

java 运行 .jar 文件乱码

http://yang3wei.github.io/blog/2013/02/10/java-dfile-dot-encoding-equals-utf-8-gan-diao-luan-ma/ 启动时加参数-Dfile.encodingUTF-8转载于:https://www.cnblogs.com/easyc/p/6385385.html

php cdi_涉及CDI和JSF的过期对话的定制错误页面

php cdi自上次写博客以来已经有一段时间了。 我一直在想写一些技术博客&#xff0c;但最终却忙于其他事情。 上周&#xff0c;在Coderanch论坛上进行了非常有趣的讨论。 甚至更有趣&#xff0c;因为它涉及JBoss。 熟悉Java EE Web应用程序的开发人员会知道&#xff0c;Web应用…

石家庄计算机职业学院张秋玉,九龙坡有什么大学

2019九龙坡有哪些大学&#xff1f;九龙坡所有大学名单【教育部最新8所】九龙坡有哪些大学是广大考生和家长朋友们比较关心的&#xff0c;以下是教育部最新公布的九龙坡所有大学名单&#xff0c;包含一本、二本、三本和大专总共8所大学&#xff0c;仅供参考。2019九龙坡大学名单…

struts2.3.4 问题

原问题为&#xff1a;严重: Exception starting filter struts2java.lang.ClassNotFoundException: org.apache.struts2.dispatcher.ng.filter.StrutsPrepareAndExecuteFilter 详细请参见http://q.cnblogs.com/q/38309/ 刚开始以为是struts2.3.4的问题&#xff0c;但最终发现为…

Java虚拟机(JVM)简介

什么是JVM Java虚拟机&#xff08;JVM&#xff09;是使计算机能够运行Java程序的抽象计算机。 JVM有三个概念&#xff1a; 1.规格 2.实施 3.实例。 该规范是正式描述JVM实现要求的文档。 具有单一规范可确保所有实现都可互操作。 JVM实现是满足JVM规范要求的计算机程序。 …

武汉数字工程研究所计算机软件分数,武汉数字工程研究所2017考研成绩查询时间:2月16日...

2017考研复试调剂群 460216643一、成绩公布湖北省2017年全国硕士研究生招生考试成绩由各招生单位公布&#xff0c;湖北省教育考试院于2017年2月16日在官网上公布统考科目成绩。武汉数字工程研究所2017考研成绩查询时间&#xff1a;2月16日二、成绩复核考生如对成绩有异议&#…

符号英语

plus 加号&#xff1b;正号 -  minus 减号&#xff1b;负号  plus or minus 正负号  is multiplied by 乘号  is divided by 除号 &#xff1d; is equal to 等于号 ≠ is not equal to 不等于号 ≡ is equivalent to 全等于号 ≌ …

408计算机考研大纲 doc,2020计算机专业408基础综合考研大纲

研究生考试信息&#xff0c;欢迎访问北京研究生招生信息网。考试大纲是我们每个考研的学生所强烈关注的&#xff0c;因为考试大纲里所公布的内容&#xff0c;是我们复习过程中必须掌握的。随着考研大军逐年壮大&#xff0c;竞争力越来越强。我们可以发现&#xff0c;虽然几乎每…

兰花三七

中文名称&#xff1a;兰花三七 拉丁学名&#xff1a;Liriope cymbidiomorpha (ined) 别称&#xff1a;小叶麦冬 科目&#xff1a;百合科、植物界 生长环境&#xff1a; 耐寒、耐热性均好&#xff0c;可生长于微碱性土壤&#xff0c;对光照适应性强。 主要价值&#xff1a; 夏季…

play!框架_在Play上使用twitter4j! 框架和安全社交很容易

play!框架在昨天的个人黑客马拉松期间&#xff0c;我启动了一个项目&#xff0c;我可能会在这里介绍。 但是&#xff0c;最酷的启示是&#xff08;再次&#xff09;启动和运行起来有多么容易。 创建一个新的Play项目 添加Secure Social并为Twitter配置它&#xff0c;并使用示…

目前个人计算机使用的电子电路主要是,计算机基础知识理论题库.xls

文档介绍&#xff1a;计算机基础知识理论题库_题源章题目 ABCD答案提示 2007ZN MK1 对两个二进制数 1与1分别进行算术加、逻辑加运算,其结果用二进制形式分别表示为________ 。1、 101、1 10、1 10、 10 C 2007 秋 JYBD 1 移动通信系统中关于移动台的叙述正确的是_____ 。移动台…

消失之物

几种方法 容斥&#xff0c;时间二分&#xff0c;dp数据结构强艹&#xff0c;主席树 不要拘泥自己思路&#xff0c;思路放开其实不难 主要讲容斥和时间二分 容斥&#xff08;其实不是容斥&#xff0c;只是类似于容斥&#xff09; 我们算出来总值&#xff0c;然后减去当前贡献即可…

JSON处理1.1:100DaysOfJavaEE8

JSON指针挑战 给定下面代码片段中的JSON文档&#xff0c;使用JSON-P 1.1中的JSON指针功能会得到什么结果&#xff1f; JSON对象的新形状如下&#xff1a; {"name": "Duke","likes": ["Java EE 8","Java","Ice Cream…

航天信息计算机设备如何入账,​收到航天信息发票账务处理

收到航天信息发票账务处理航天信息给开的发票是用于税控软件的,可以进入管理费用(不入固定资产)并予以抵扣.1、税控设备不管金额大小可以直接列入管理费用,不用列入固定资产;应缴税费科目没有待抵扣进项税子目,习惯上在待摊费用科目核算,分录借:管理费用-办公费待摊费用-待抵扣…

公用机房计算机主板选购,才知道,IDC机房运维工作居然可以如此简单!

原标题&#xff1a;才知道&#xff0c;IDC机房运维工作居然可以如此简单&#xff01;机房运维事情比较多且繁琐&#xff0c;每一个细节都要做到标准、专业和完美&#xff01;如何将繁琐的机房运维简化&#xff0c;就从这几点开始&#xff01;机房运维关于电力定期检测机房内市电…

postman接口自动化

文档&#xff1a;postman接口自动化.note链接&#xff1a;http://note.youdao.com/noteshare?id3b00660bc661504c622c147dc5437af6&sub3FF2993B85AB46C0976CA2E76E594461转载于:https://www.cnblogs.com/Mezhou/p/11279465.html

primefaces_轻量级Web应用程序:PrimeFaces(JSF)+ Guice + MyBatis(第1部分)

primefaces最近&#xff0c;我的朋友问我如何构建轻量级的Java Web应用程序。 许多Java Web开发人员会选择Spring和Hibernate来构建传统的Web应用程序。 但是&#xff0c;它可能不够轻巧。 我建议他尝试使用Guice和MyBatis构建应用程序框架。 尽管Spring比Guice功能更丰富&…