streaming api_通过Spring Integration消费Twitter Streaming API

streaming api

1.概述

众所周知, Spring Integration具有用于与外部系统交互的大量连接器。 Twitter也不例外,而且很长一段时间以来,因为Spring Social一直是一个开箱即用的解决方案,Spring Integration利用该解决方案来连接到社交网络。

1.1Spring社交停产

不幸的是, Spring Social已经到了使用寿命 ,该项目现在处于维护模式。 Spring团队决定不进一步开发Spring Social的原因是,使API绑定与社交网络的API保持同步变得很繁琐。

除此之外,Spring Framework 5发布后,开发人员希望利用其响应式编程模型,这将要求团队在现有的响应式社交绑定旁边重新实现一个响应式Spring Social绑定。

现在建议开发人员要么实现自己的绑定,要么使用专用库之一连接社交网络。

1.2 Spring Integration的Twitter模块已移至扩展

Spring Social现在处于维护模式,这迫使Spring Integration团队将Twitter支持模块从主项目移至扩展。 由于Spring Social不会接收更新,因此它将基于较早的Spring Framework版本构建。 这将导致类路径冲突,也将阻碍Spring Integration的开发。

因此, 从Spring Integration 5.1开始,Twitter模块可作为扩展使用 。

1.3有哪些替代方案?

Twitter4J是Yamas Yusuke开发和维护的,用于Twitter API的非官方Java库。 官方的HBC库(由Twitter构建)是一个Java HTTP Client,用于使用Twitter的Streaming API。 自2016年以来,后者从未见过重大更新,而Twitter4J正在定期更新。

也可以选择实现自己的API绑定。 在使用RestTemplate的基于Spring的项目中,绝对是一种选择,并且这是进行REST调用的简便方法。

本指南以流模式使用Twitter4J,可以将其集成到Spring Integration消息流中。

1.4 Twitter流如何工作?

简而言之, 您的应用打开了一个与Twitter API的单一连接,只要发生新匹配,就会通过该连接发送新结果 。 相反,另一种方法是通过向REST API重复发送请求来批量传送数据。

流提供了一种低延迟的传递机制 ,该机制可以支持非常高的吞吐量,而不必处理速率限制。

2.示例项目

该示例项目展示了Twitter的Streaming API到Spring Integration消息流的集成,可在GitHub找到 : https : //github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming 。

Maven依赖

由于Spring Social现在已经停产,因此我们不会在此基础上继续发展。 我们引入的只是spring-integration-core和twitter4j-stream 。

<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-stream</artifactId><version>4.0.1</version></dependency></dependencies>

该项目还使用了Lombok和Spring Boot测试支持,但是这些是可选的。

Spring Integration的可听消息源

Spring Integration提供了对实现入站消息组件的支持。 它们分为轮询和监听行为

最初依赖于Inbound Twitter Channel Adapter建立在Spring Social之上,现在已移至扩展, 它是轮询用户 。 也就是说,您必须提供一个轮询器配置才能使用它。 另一方面,Twitter实施速率限制,以管理应用程序获取更新的频率。 使用旧的Twitter Channel适配器时,您应该考虑速率限制,以便您配置的轮询间隔符合Twitter策略。

另一方面, 侦听入站组件更简单,通常只需要实现MessageProducerSupport 。 这样的侦听组件看起来像这样。

public class MyMessageProducer extends MessageProducerSupport {public MyMessageProducer(MessageChannel outputChannel) {// Defining an output channel is requiredsetOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();// Custom initialization - if applicable - comes here}@Overridepublic void doStart() {// Lifecycle method for starting receiving messages}@Overridepublic void doStop() {// Lifecycle method for stopping receiving messages}private void receiveMessage() {// Receive data from upstream serviceSomeData data = ...;// Convert it to a message as appropriate and send it outthis.sendMessage(MessageBuilder.withPayload(data).build());}}

只有两个必需的元素:

  • 必须定义输出消息通道
  • 每当组件收到消息时,都必须调用sendMessage

(可选)您可能希望控制组件的初始化并管理其生命周期。

由于Twitter的Streaming API本质上是消息驱动的,因此监听行为自然很合适。 让我们看看如何在这样的上下文中合并Twitter4J。

使用Twitter4J连接到Twitter Streaming API

Twitter4J管理连接处理的细微差别,并从Twitter的Streaming API接收更新。 我们需要做的就是获取一个TwitterStream实例,附加一个侦听器并定义过滤。

实例化

Twitter4J网站上的流示例表明,应通过TwitterStreamFactory创建一个TwitterStream实例。 这完全有道理,但是在Spring应用程序上下文中,我们希望它成为托管bean。

Spring的FactoryBean工具是包含创建单例TwitterStream实例的详细信息的简单FactoryBean方法。

public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {@Overridepublic Class<?> getObjectType() {return TwitterStream.class;}@Overrideprotected TwitterStream createInstance() {return new twitter4j.TwitterStreamFactory().getInstance();}@Overrideprotected void destroyInstance(TwitterStream twitterStream) {twitterStream.shutdown();}}

尽管我们也可以将其公开为普通的bean,而不用由FactoryBean创建FactoryBean ,但这并不会适当地将其关闭。

附加侦听器并定义过滤

这将是我们自定义MessageProducer实现的责任。

@Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {private final TwitterStream twitterStream;private List<Long> follows;private List<String> terms;private StatusListener statusListener;private FilterQuery filterQuery;public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {this.twitterStream = twitterStream;setOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();statusListener = new StatusListener();long[] followsArray = null;if (!CollectionUtils.isEmpty(follows)) {followsArray = new long[follows.size()];for (int i = 0; i < follows.size(); i++) {followsArray[i] = follows.get(i);}}String[] termsArray = null;if (!CollectionUtils.isEmpty(terms)) {termsArray = terms.toArray(new String[0]);}filterQuery = new FilterQuery(0, followsArray, termsArray);}@Overridepublic void doStart() {twitterStream.addListener(statusListener);twitterStream.filter(filterQuery);}@Overridepublic void doStop() {twitterStream.cleanUp();twitterStream.clearListeners();}public void setFollows(List<Long> follows) {this.follows = follows;}public void setTerms(List<String> terms) {this.terms = terms;}StatusListener getStatusListener() {return statusListener;}FilterQuery getFilterQuery() {return filterQuery;}class StatusListener extends StatusAdapter {@Overridepublic void onStatus(Status status) {sendMessage(MessageBuilder.withPayload(status).build());}@Overridepublic void onException(Exception ex) {log.error(ex.getMessage(), ex);}@Overridepublic void onStallWarning(StallWarning warning) {log.warn(warning.toString());}}
}

MessageProducerSupportTwitterStream的管理界面提供的生命周期方法可以很好地配合使用。 这也将使我们能够在需要时在运行时停止和启动组件。

Java配置

尽管Spring可以自动装配组件,但我仍然更喜欢通过手动配置来控制依赖关系。

@Slf4j
@Configuration
public class TwitterConfig {@BeanTwitterStreamFactory twitterStreamFactory() {return new TwitterStreamFactory();}@BeanTwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {return twitterStreamFactory.getInstance();}@BeanMessageChannel outputChannel() {return MessageChannels.direct().get();}@BeanTwitterMessageProducer twitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {TwitterMessageProducer twitterMessageProducer =new TwitterMessageProducer(twitterStream, outputChannel);twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));return twitterMessageProducer;}@BeanIntegrationFlow twitterFlow(MessageChannel outputChannel) {return IntegrationFlows.from(outputChannel).transform(Status::getText).handle(m -> log.info(m.getPayload().toString())).get();}}

这里的重要部分是我们的自定义消息生成器如何与消息流集成。 基本上,除了在生产者的输出通道中列出消息之外,我们不需要执行任何其他操作。

测试中

只有Chuck Norris在生产中测试代码。 但是,像您和我这样的普通凡人,我们确实会编写测试用例。

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public class TwitterMessageProducerTest {@MockBeanprivate TwitterStream twitterStream;@Autowiredprivate PollableChannel outputChannel;@Autowiredprivate TwitterMessageProducer twitterMessageProducer;@Testpublic void shouldBeInitialized() {StatusListener statusListener = twitterMessageProducer.getStatusListener();verify(twitterStream).addListener(statusListener);FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();verify(twitterStream).filter(filterQuery);}@Testpublic void shouldReceiveStatus() {StatusListener statusListener = twitterMessageProducer.getStatusListener();Status status = mock(Status.class);statusListener.onStatus(status);Message<?> statusMessage = outputChannel.receive();assertSame(status, statusMessage.getPayload());}@Import(TwitterConfig.class)static class TestConfig {@BeanMessageChannel outputChannel() {return MessageChannels.queue(1).get();}}}

我喜欢Twitter4J的设计,因为它利用了界面。 该库的大多数重要部分都作为普通接口公开。 TwitterStream也不例外。 也就是说,在测试案例中可以轻松地将其嘲笑。

六,结论

  • Spring Social现在已经停产了 -它不会收到新功能
  • Spring Integration的Twitter模块可作为扩展使用 -已从主项目中移出。
  • Twitter入站通道适配器是一个轮询用户 -选择轮询间隔时必须处理速率限制
  • Twitter的Streaming API符合入站通道适配器的监听行为

翻译自: https://www.javacodegeeks.com/2018/12/streaming-api-spring-integration.html

streaming api

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/332724.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux命令之 mount -- 文件系统挂载

文章目录简介参考实例加载指定的分区参考简介 mount 命令用于加载文件系统到指定的加载点。此命令的最常用于挂载 cdrom&#xff0c;使我们可以访问 cdrom 中的数据&#xff0c;因为你将光盘插入 cdrom 中&#xff0c;Linux 并不会自动挂载&#xff0c;必须使用 Linux mount 命…

java 舍,java 4舍六入五成双

java 四舍六入五成双1. 小于5舍去&#xff0c;即舍去部分的数值小于保留部分的末位的半个单位&#xff0c;则末位不变&#xff1b;2. 大于5进1&#xff0c;即舍去部分的数值大于保留部分的末位的半个单位&#xff0c;则末位加1&#xff1b;3. 等于5时取偶数&#xff0c;即舍去部…

Linux命令之 umount -- 卸载文件系统

文章目录介绍参考介绍 umount 是“unmount”的缩写&#xff0c;译为“不挂载。所以它的”的作用是卸载已挂载的文件系统、目录或文件。 利用设备名或挂载点都能umount文件系统&#xff0c;不过最好还是通过挂载点卸载&#xff0c;以免使用绑定挂载&#xff08;一个设备&#x…

aws技术峰会2018_AWS re:Invent 2018的5大公告

aws技术峰会2018AWS re&#xff1a;Invent刚刚完成。 这是一个巨大的活动&#xff0c;在拉斯维加斯7家最大的酒店中&#xff0c;有50,000多名与会者&#xff0c;并发布了许多新的服务公告。 无服务器端通过新的lambda增强功能和更好的容器支持继续受到很多关注。 AWS通过新的“…

php 降低图像大小,PHP图像重新调整大小

我有一个PHP脚本,可以重新调整JPEG图像的大小.但是,由于某种原因,图像被扭曲,即使我将其编程为按比例计算x或y(取决于照片方向).质量是100,所以我不明白为什么它会使它们扭曲.我究竟做错了什么&#xff1f;编辑原始图像为3264px x 2448px谢谢代码&#xff1a;$im ImageCreateF…

Linux命令之 mke2fs -- 格式化分区(为分区写入文件系统)

文章目录简介命令选项参考实例格式化指定的分区简介 在磁盘分区上创建 ext2、ext3、ext4 等文件系统&#xff0c;默认情况下会创建 ext2。 虽然 mkfs 命令非常简单易用&#xff0c;但其不能调整分区的默认参数&#xff08;比如块大小是 4096 Bytes&#xff09;&#xff0c;这…

Linux格式化分区的命令

查看系统所有的文件系统&#xff1a; [~]$ df -ah Filesystem Size Used Avail Capacity iused ifree %iused Mounted on /dev/disk1s5s1 234Gi 15Gi 65Gi 19% 575614 682553320 0% / devfs 190Ki 190Ki 0Bi 100% 658 …

java jwt 验证_教程:用Java创建和验证JWT

java jwt 验证“我喜欢编写身份验证和授权代码。” 〜从来没有Java开发人员。 厌倦了一次又一次地建立相同的登录屏幕&#xff1f; 尝试使用Okta API进行托管身份验证&#xff0c;授权和多因素身份验证。 Java对JWT&#xff08;JSON Web令牌&#xff09;的支持过去需要进行大量…

php网页布局边框,用CSS来设置网页当中的边框

摘要&#xff1a;跟着老师视频做的一个css边框小练习 css——边框 &nbs跟着老师视频做的一个css边框小练习 css——边框 .box{width:100px;height:100px;border:1px solid #ccc;border-radius:20px;}.main{width:100px;height:100px;border-top: 1px s…

Linux命令之 mkfs -- 在特定的分区创建 Linux 文件系统

文章目录简介选项参考示例在 /dev/hda5 上建一个 msdos 的文件系统&#xff0c;同时检查是否有坏轨存在&#xff0c;并且将过程详细列出来&#xff1a;将指定分区格式化成各种类型的文件系统简介 该命令用来在特定的分区创建 Linux 文件系统&#xff0c;常见的文件系统有 ext2…

本地线程分配缓冲_线程本地分配缓冲区

本地线程分配缓冲最近&#xff0c;我一直在研究遭受严重性能问题的Java应用程序。 在许多问题中&#xff0c;真正引起我注意的一个问题是新对象的分配速度相对较慢&#xff08;应用程序分配了大量的相当大的对象&#xff09;。 后来发现&#xff0c;原因是大量的分配发生在TLAB…

mysql 触发器计算总价,mysql’插入’触发器根据其他字段计算字段

我正在尝试创建一个触发器,它将根据用户输入的lat / lng列更新GEOMETRY列.我的触发器看起来像这样 –CREATE TRIGGER tbl.fooAFTER INSERT ON tbl FOR EACH ROWBEGINUPDATE tblSET coord Point(lng, lat)WHERE id NEW.id;END但是,当我插入一个带有lng,lat值的新行时,我收到以…

oracle连接外部数据库_使用Oracle验证外部数据

oracle连接外部数据库我经常在Corda Slack频道中闲逛&#xff0c;并尽可能回答问题。 我尝试回答的合理数量的问题与Oracle有关。 更具体地说&#xff0c;何时使用。 我觉得我可以回答&#xff0c;“当您需要验证可能经常更改的外部数据时使用Oracle”。 我大概在某个时候写了一…

macOS如何格式化移动硬盘和U盘

1.打开磁盘工具 2.在左侧选择要格式化的外置磁盘设备&#xff0c;接着在右上角点击【抹掉】 3.点击【抹掉】会弹出如下的对话框&#xff0c;在格式中建议选择 ExFAT 格式&#xff0c;这是一个在 Windows 和 macOS 都可以使用的文件系统格式 选择好要格式化的文件系统格式后&…

macOS查看IP地址的命令

查看内网的 IP 地址&#xff1a; [~]$ ipconfig getifaddr en0 192.168.30.25 # 或者 [~]$ ifconfig en0 en0: flags8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500options400<CHANNEL_IO>ether d4:61:9d:11:c2:94 inet6 fe80::cbc:309b:57a4:5cf6…

mysql注入漏洞语句,web安全之sql注入漏洞

概念通过把SQL命令插入到Web表单提交或输入域名或页面请求的查询字符串&#xff0c;最终达到欺骗服务器执行恶意的SQL命令。通俗地讲&#xff0c;它是利用现有应用程序&#xff0c;将(恶意)的SQL命令注入到后台数据库引擎执行的能力&#xff0c;它可以通过在Web表单中输入(恶意…

ssrs 基于表达式显示_基于表达式的访问控制

ssrs 基于表达式显示1.概述 今天&#xff0c;我们将回顾基于表达式的访问控制&#xff08;EBAC&#xff09;&#xff0c;基于角色的访问控制&#xff08;RBAC&#xff09;和基于属性的访问控制&#xff08;ABAC&#xff09;之间的区别&#xff0c;并将重点放在EBAC上。 2.什么…

macOS下载、安装、使用tomcat服务器及IntelliJ IDEA for Mac 如何集成、配置、运行tomcat

文章目录web 服务器软件tomcat如何下载安装 tomcatmacOS 下如何启动 tomcatWindows 启动 tomcat部署项目的方式直接将项目放到 webapps 目录下即可在 tomcat 的配置文件 server.xml 中进行配置部署在 tomcat 的 localhost 目录下通过 xml 文件完成部署IntelliJ IDEA 集成 tomca…

run spark pi_Spark Run本地设计模式

run spark pi现在&#xff0c;许多Spark应用程序已成为遗留应用程序&#xff0c;很难在本地进行增强&#xff0c;测试和运行。 Spark具有很好的测试支持&#xff0c;但仍有许多Spark应用程序不可测试。 当您尝试运行一些旧的Spark应用程序时&#xff0c;我将分享一个常见错误…

php 表单 同步,Jquery点击按钮 异步和同步提交表单

最近在开发一个jsp学生信息管理系统&#xff0c;由于刚刚接触jsp&#xff0c;遇到问题比较多&#xff0c;特此记录与大家分享。Jquery ajax提交表单到servlet示例前台部分代码&#xff1a;姓名学号 ajax提交表单代码&#xff1a;//增加学生&#xff0c;异步提交学生表单$(&q…