meetup_使用RxNetty访问Meetup的流API

meetup

本文将涉及多个主题:响应式编程,HTTP,解析JSON以及与社交API集成。 完全在一个用例中:我们将通过非夸张的RxNetty库实时加载和处理新的metup.com事件,结合Netty框架的强大功能和RxJava库的灵活性。 Meetup提供了公开可用的流API ,可实时推送世界各地注册的每个Meetup。 只需浏览至stream.meetup.com/2/open_events并观察JSON块如何缓慢地出现在屏幕上。 每当有人创建新事件时,自包含的JSON就会从服务器推送到您的浏览器。 这意味着这样的请求永无止境,相反,只要需要,我们就会不断接收部分数据。 我们已经在将Twitter4J变成RxJava的Observable中研究了类似的情况。 每个新的Meetup事件都会发布一个独立的JSON文档,与此类似(省略许多细节):

{ "id" : "219088449","name" : "Silver Wings Brunch","time" : 1421609400000,"mtime" : 1417814004321,"duration" : 900000,"rsvp_limit" : 0,"status" : "upcoming","event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/","group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co","state" : "CA"...},"venue" : { "address_1" : "26860 Ortega Highway","city" : "San Juan Capistrano","country" : "US"...},"venue_visibility" : "public","visibility" : "public","yes_rsvp_count" : 1...
}

每当我们长时间轮询的HTTP连接(带有Transfer-Encoding: chunked响应标头)推送此类JSON时,我们都希望对其进行解析并以某种方式进一步传递。 我们讨厌回调,因此RxJava似乎是一个合理的选择(认为: Observable<Event> )。

步骤1:使用RxNetty接收原始数据

我们不能使用普通的HTTP客户端,因为它们专注于请求-响应语义。 这里没有任何响应,我们只是永远保持打开的连接,并在数据到达时使用它们。 RxJava具有开箱即用的RxApacheHttp库,但它假定为text/event-stream内容类型 。 相反,我们将使用底层的通用RxNetty库。 它是Netty(duh!)的包装,并且能够实现任意的 TCP / IP(包括HTTP)以及UDP客户端和服务器。 如果您不了解Netty,则它是基于数据包的,而不是面向流的,因此我们可以预期每次Meetup推送都会有一个Netty事件。 该API当然不是简单明了的,但是一旦您使用它,它就会变得有意义:

HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443).pipelineConfigurator(new HttpClientPipelineConfigurator<>()).withSslEngineFactory(DefaultFactories.trustAll()).build();final Observable<HttpClientResponse> responses = httpClient.submit(HttpClientRequest.createGet("/2/open_events"));
final Observable byteBufs = responses.flatMap(AbstractHttpContentHolder::getContent);
final Observable chunks = byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));

首先,我们创建HttpClient并设置SSL(请注意,关于服务器证书的trustAll()可能不是最佳的生产设置)。 稍后我们submit() GET请求,并返回Observable<HttpClientResponse<ByteBuf>>ByteBuf是Netty对通过网络发送或接收的一堆字节的抽象。 此观察结果将立即告诉我们从Meetup收到的每条数据。 从响应中提取ByteBuf ,我们将其转换为包含上述JSON的String 。 到目前为止,一切正常。

步骤2:将数据包与JSON文档对齐

Netty非常强大,因为它不会掩盖泄漏抽象所固有的复杂性。 每次通过TCP / IP线路接收到某些内容时,都会通知我们。 您可能会相信,当服务器发送100字节时,客户端的Netty会将收到的这100字节通知我们。 但是,TCP / IP堆栈可以自由地拆分和合并您通过有线发送的数据,尤其是因为它假定是流,因此如何将其拆分为数据包应该是无关紧要的。 Netty的文档中对此警告做了很大的解释。 对我们意味着什么? 当Meetup发送单个事件时,我们可能仅收到一个可观察到的chunks String 。 但是同样可以将其划分为任意数量的数据包,因此chunks将发出多个String 。 更糟糕的是,如果Meetup接连发送两个事件,则它们可能适合一个数据包。 在这种情况下, chunks将发出一个带有两个独立JSON文档的String 。 事实上,我们不能假设JSON字符串和收到的网络数据包之间有任何对齐。 我们所知道的是,代表事件的各个JSON文档由换行符分隔。 令人惊讶的是, RxJavaString官方附加组件RxJavaString提供了一种精确的方法:

Observable jsonChunks = StringObservable.split(chunks, "\n");

实际上,甚至还有更简单的StringObservable.byLine(chunks) ,但它使用的是平台相关的行尾。 最好在官方文档中解释split()作用:

圣分裂

现在我们可以安全地解析jsonChunks发出的每个String了:

步骤3:解析JSON

有趣的是,这一步骤并不是那么简单。 我承认,我排序的享受WSDL时间,因为我很容易,可预见生成如下web服务的合同Java模型。 JSON,特别是在JSON模式的边缘市场渗透方面,基本上是集成的“狂野西部”。 通常,您会得到非正式的文档或请求和响应的样本。 没有类型信息或格式,无论字段是否为必填项,等等。此外,由于我不情愿使用地图映射 (在那里,Clojure程序员),为了使用基于JSON的REST服务,我必须自己编写映射POJO。 好吧,有解决方法。 首先,我举了一个由Meetup流API生成的JSON的典型示例,并将其放在src/main/json/meetup/event.json 。 然后,我使用jsonschema2pojo-maven-plugin ( 也存在Gradle和Ant版本)。 插件的名称令人困惑,它还可以与JSON示例(不仅是架构)一起使用以生成Java模型:

<plugin><groupId>org.jsonschema2pojo</groupId><artifactId>jsonschema2pojo-maven-plugin</artifactId><version>0.4.7</version><configuration><sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory><targetPackage>com.nurkiewicz.meetup.generated</targetPackage><includeHashcodeAndEquals>true</includeHashcodeAndEquals><includeToString>true</includeToString><initializeCollections>true</initializeCollections><sourceType>JSON</sourceType><useCommonsLang3>true</useCommonsLang3><useJodaDates>true</useJodaDates><useLongIntegers>true</useLongIntegers><outputDirectory>target/generated-sources</outputDirectory></configuration><executions><execution><id>generate-sources</id><phase>generate-sources</phase><goals><goal>generate</goal></goals></execution></executions>
</plugin>

此时,Maven将创建与Jackson兼容的Event.javaVenue.javaGroup.java等:

private Event parseEventJson(String jsonStr) {try {return objectMapper.readValue(jsonStr, Event.class);} catch (IOException e) {throw new UncheckedIOException(e);}
}

很好,它很好:

final Observableevents = jsonChunks.map(this::parseEventJson);

步骤5:获利!!!

有了Observable<Event>我们可以实现一些非常有趣的用例。 是否要查找刚刚创建的波兰所有聚会的名称? 当然!

events.filter(event -> event.getVenue() != null).filter(event -> event.getVenue().getCountry().equals("pl")).map(Event::getName).forEach(System.out::println);

寻找统计信息每分钟创建多少个事件? 没问题!

events.buffer(1, TimeUnit.MINUTES).map(List::size).forEach(count -> log.info("Count: {}", count));

或者,您是否想继续搜索将来最远的聚会,而跳过比已发现的聚会更近的聚会?

events.filter(event -> event.getTime() != null).scan(this::laterEventFrom).distinct().map(Event::getTime).map(Instant::ofEpochMilli).forEach(System.out::println);//...private Event laterEventFrom(Event first, Event second) {return first.getTime() > second.getTime() ?first :second;
}

此代码过滤掉没有已知时间的事件,发出当前事件或前一个事件( scan() ),具体取决于后面的事件,过滤出重复事件并显示时间。 这个运行了几分钟的微型程序已经发现一个计划于2015年11月创建的聚会,而在撰写本文时它是2014年12月。 可能性是无止境的。

希望我能对如何轻松地将各种技术融合在一起有一个很好的了解:React式编程以编写超快速的网络代码,无样板代码的类型安全的JSON解析和RxJava来快速处理事件流。 请享用!

翻译自: https://www.javacodegeeks.com/2014/12/accessing-meetups-streaming-api-with-rxnetty.html

meetup

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/338572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html约束验证的例子,HTML5利用约束验证API来检查表单的输入数据的代码实例

HTML5对于表单有着极大程度的优化&#xff0c;无论是语义&#xff0c;小部件&#xff0c;还是数据格式的验证。我猜你肯定会以浏览器兼容作为借口不愿意使用这些“新功能”&#xff0c;但这绝不应该成为使你停滞不前的原因&#xff0c;况且还有像Modernizr和ployfill这样的工具…

C语言经典题(1)

输入某年某月某日&#xff0c;判断这一天是这一年的第几天&#xff1f;程序分析&#xff1a;以3月5日为例&#xff0c;应该先把前两个月的加起来&#xff0c;然后再加上5天即本年的第几天&#xff0c;特殊情况&#xff0c;闰年且输入月份大于3时需考虑多加一天#include int mai…

jpg图片使用pil的resize后_如何使用PIL调整图像大小并保持其纵横比?

是否有一种显而易见的方法可以解决这个问题&#xff1f; 我只是想制作缩略图。#1楼PIL已经可以选择裁剪图像img ImageOps.fit(img, size, Image.ANTIALIAS)#2楼我丑陋的例子。函数获取文件如&#xff1a;“pic [0-9a-z]。[extension]”&#xff0c;将它们调整为120x120&#x…

openshift_OpenShift上的Java EE工作流(技术提示#64)

openshift该网络研讨会展示了如何使用WildFly &#xff0c; JBoss Tools &#xff0c; Forge &#xff0c; Arquillian和OpenShift在OpenShift上创建Java EE工作流。 具体来说&#xff0c;它谈论&#xff1a; 如何使用JBoss Developer Studio轻松开发Java EE应用程序并将其直接…

C语言面试-指针和引用的使用场景?

先解决两个疑问◆ 指针和引用的不同之处是什么&#xff1f;◆ 何时用用指针&#xff1f;何时用引用&#xff1f;指针和引用的不同之处看如下代码&#xff1a;指针是用来表示内存地址的&#xff0c;而指针这个整数正是被指向的变量地址。而引用就是给变量重新起了一个名字&#…

排序算法html,排序算法总结.html

排序算法总结 | borens blog排序算法总结borens blog首页所有文章关于作者排序算法总结Apr 6, 2018| 技术人生| 阅读排序,顾名思义,将数据按照某种规则排列起来.这种规则可以是根据基本的数值大小排序,也可以是通过字符串长度比较来排序,又或者是优先根据两个字符串的第一个不…

win10商店下载位置_win10应用商店下载的东西在哪

win10应用商店下载的东西在哪&#xff1f;我们都知道&#xff0c;微软从win8开始&#xff0c;就推出了应用商店&#xff0c;到现在的win10&#xff0c;还是一样。现在的win10应用商店里的东西也越来越丰富了。但是有些win10新用户朋友们在win10应用商店下载了东西&#xff0c;准…

异步http 超时_具有CompletableFuture的异步超时

异步http 超时有一天&#xff0c;我重写了执行不佳的多线程代码&#xff0c;该代码在Future.get()某个时刻被阻塞&#xff1a; public void serve() throws InterruptedException, ExecutionException, TimeoutException {final Future<Response> responseFuture async…

C语言指针变量与一维数组

数组元素之间的地址是相连的&#xff1b;变量地址绝对不是相连的&#xff0c;如果找到规律那也只是一个偶然的&#xff0c;不是必然的&#xff1b;1. 指针变量和一位数组下面对指针数组进行分析&#xff0c;index(小标是从0开始)&#xff0c;array数组是int类型&#xff0c;每一…

天干地支计算公式_高大上:天干地支计算方法

天干地支是我国古代计算年月日的重要依据&#xff0c;作为现代人的我们&#xff0c;有必要去了解一下&#xff0c;他们&#xff0c;以备不时之需。首先介绍一下&#xff0c;天干和地支。共有十天干和十二地支。十天干&#xff1a;甲(jiǎ)、乙(yǐ)、丙(bǐng)、丁(dīng)、戊(…

input发送a.jax_JAX-RS 2.0:服务器端处理管道

input发送a.jax这篇文章的灵感来自JAX-RS 2.0规范文档 &#xff08;附录C&#xff09;中的Processing Pipeline部分。 我喜欢它的原因在于它提供了JAX-RS中所有模块的漂亮快照-以准备好吞咽的胶囊形式&#xff01; 礼貌– JAX-RS 2.0规范文档 因此&#xff0c;我想到了使用此…

html 登录失败,qq音乐登录失败 QQ音乐总是显示登录失败是怎么回事

urlproc.exe是什么进程?没见过&#xff0c;请前辈们指点&#xff1f;造成QQ音乐登录不上现象的原因有如下三种可能&#xff1a; 一、木马病毒对QQ音乐的必要组件或文件进行破坏&#xff0c;导致QQ音乐登录失败&#xff0c;登陆不上的情况发生。 二、Windows系统防火墙(或其他安…

C 常对象成员

C 常对象成员在C 中&#xff0c;可以将对象的成员声明为const&#xff0c;包括常数据成员和常成员函数C 常数据成员 常数据成员的作用与一般常变量相似&#xff0c;用关键字const来声明常数据成员。常数据成员的值是不能改变的&#xff0c;只能通过构造函数的参数初始化表对常数…

python gitlab_Python Gitlab Api 使用方法

简述公司使用gitlab 来托管代码,日常代码merge request 以及其他管理是交给测试&#xff0c;鉴于操作需经常打开网页,重复且繁琐,所以交给Python 管理。安装pip install python-gitlab环境: py3DEMO# -*- coding: utf-8 -*-__Author__ "xiewm"__Date__ 2017/12/26 …

tomee_Apache TomEE + JMS。 从未如此简单。

tomee我记得J2EE &#xff08;1.3和1.4&#xff09;的过去&#xff0c;使用JMS启动项目非常困难。 您需要安装JMS 代理 &#xff0c;创建主题或队列 &#xff0c;最后使用服务器配置文件和JNDI开始自己的战斗。 感谢JavaEE 6及其它&#xff0c;使用JMS确实非常简单。 但是&…

娄底二中高考2021成绩查询,2021年娄底高考状元名单公布,娄底高考状元学校资料及最高分...

2019年高考已经落下帷幕&#xff0c;高考放榜时刻就要到来&#xff0c;每年的高考状元都会被各界高度关注&#xff0c;那么今年娄底高考状元花落谁家呢&#xff1f;娄底高考状元会给人带来惊喜吗&#xff0c;让我们一起期待2019年娄底高考状元的诞生。下面小编为给为梳理下历年…

C 常指针

C 指向对象的常指针C 定义指向对象的常指针的一般形式为 类名 * const指针变量名&#xff1b;也可以在定义指针变量时使之初始化在C 中&#xff0c;虽然指向对象的常指针变量的值不能改变&#xff0c;但可以改变其所指向对象的值&#xff0c;如果想将一个指针变量固定地与一个对…

microsoftexchange邮箱容量怎样看_企业邮箱申请注册的要求有哪些?_网站建设_创客网络...

随着现在互联网信息化发展的速度不断加快&#xff0c;企业邮箱作为企业之间信息传输的重要工具&#xff0c;其重要性不亚于企业网站&#xff0c;但市场上企业邮箱种类繁多&#xff0c;包含的功能也各不相同&#xff0c;稍有差异&#xff0c;你知道怎么去选择好企业邮箱吗&#…

java 登陆验证失败_使用Java 8流进行快速失败的验证

java 登陆验证失败我已经失去了看过使用快速失败验证代码状态的代码的次数&#xff0c;方法如下&#xff1a; public class PersonValidator {public boolean validate(Person person) {boolean valid person ! null;if (valid) valid person.givenName ! null;if (valid) v…

Unity中Shader观察空间推导(在Shader中实现)

文章目录 前言一、观察空间矩阵推导1、求观察空间基向量2、求观察空间的基向量在世界空间中的矩阵 的 逆矩阵2、求平移变换矩阵3、相乘得出 观察空间转化矩阵4、得到顶点的世界空间坐标&#xff0c;然后转化到观察空间5、把观察空间坐标转化为齐次裁剪坐标输出到屏幕 二、最终效…