带有Java DSL的Spring Integration MongoDB适配器

1引言

这篇文章解释了如何使用Spring Integration从MongoDB数据库中保存和检索实体。 为了实现这一点,我们将使用Java DSL配置扩展来配置入站和出站MongoDB通道适配器。 例如,我们将构建一个应用程序,使您可以将订单写入MongoDB存储,然后检索它们进行处理。

应用程序流程可以分为两部分:

  • 新订单将发送到消息传递系统,在该系统中它们将被转换为实际产品,然后存储到MongoDB。
  • 另一方面,另一个组件正在连续轮询数据库并处理它找到的任何新产品。

可以在我的Spring Integration存储库中找到源代码。

2 MessagingGateway –进入消息传递系统

我们的应用程序对消息传递系统一无所知。 实际上,它只会创建新订单并将其发送到接口(OrderService):

@SpringBootApplication
@EnableIntegration
public class MongodbBasicApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication.class, args);new MongodbBasicApplication().start(context);}public void start(ConfigurableApplicationContext context) {resetDatabase(context);Order order1 = new Order("1", true);Order order2 = new Order("2", false);Order order3 = new Order("3", true);InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService.class);orderService.order(order1);orderService.order(order2);orderService.order(order3);}private void resetDatabase(ConfigurableApplicationContext context) {ProductRepository productRepository = context.getBean(ProductRepository.class);productRepository.deleteAll();}
}

首先看一下配置,我们可以看到OrderService实际上是一个消息传递网关。

@Configuration
@ComponentScan("xpadro.spring.integration.endpoint")
@IntegrationComponentScan("xpadro.spring.integration.mongodb")
public class InfrastructureConfiguration {@MessagingGatewaypublic interface OrderService {@Gateway(requestChannel = "sendOrder.input")void order(Order order);}...
}

发送到order方法的任何订单都将通过“ sendOrder.input”直接通道作为Message <Order>引入消息系统。

3第一部分-处理订单

Spring Integration消息流的第一部分由以下组件组成:

flow_firstpart

我们使用lambda创建一个IntegrationFlow定义,该定义将DirectChannel注册为其输入通道。 输入通道的名称解析为'beanName + .input'。 因此,该名称就是我们在网关中指定的名称:“ sendOrder.input”

@Bean
@Autowired
public IntegrationFlow sendOrder(MongoDbFactory mongo) {return f -> f.transform(Transformers.converter(orderToProductConverter())).handle(mongoOutboundAdapter(mongo));
}

流程在收到新订单时要做的第一件事是使用变压器将订单转换为产品。 要注册一个变压器,我们可以使用DSL API提供的Transformers工厂。 在这里,我们有不同的可能性。 我选择的是使用PayloadTypeConvertingTransformer ,它将有效负载转换为对象的委托给转换器。

public class OrderToProductConverter implements Converter<Order, Product> {@Overridepublic Product convert(Order order) {return new Product(order.getId(), order.isPremium());}
}

订单流程的下一步是将新创建的产品存储到数据库中。 在这里,我们使用MongoDB出站适配器:

@Bean
@Autowired
public MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) {MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo);mongoHandler.setCollectionNameExpression(new LiteralExpression("product"));return mongoHandler;
}

如果您想知道消息处理程序在内部实际上在做什么,它将使用mongoTemplate保存该实体:

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);Object payload = message.getPayload();this.mongoTemplate.save(payload, collectionName);
}

4第二部分–加工产品

在第二部分中,我们还有另一个用于处理产品的集成流程:

flow_secondpart

为了检索以前创建的产品,我们定义了一个入站通道适配器,它将继续轮询MongoDB数据库:

@Bean
@Autowired
public IntegrationFlow processProduct(MongoDbFactory mongo) {return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay(3, TimeUnit.SECONDS))).route(Product::isPremium, this::routeProducts).handle(mongoOutboundAdapter(mongo)).get();
}

MongoDB入站通道适配器是负责从数据库轮询产品的适配器。 我们在构造函数中指定查询。 在这种情况下,我们每次都会轮询一种未加工的产品:

@Bean
@Autowired
public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'processed' : false}"));messageSource.setExpectSingleResult(true);messageSource.setEntityClass(Product.class);messageSource.setCollectionNameExpression(new LiteralExpression("product"));return messageSource;
}

路由器定义显示了如何根据“溢价”字段将产品发送到其他服务激活器方法:

private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) {return mapping.subFlowMapping(true, sf -> sf.handle(productProcessor(), "fastProcess")).subFlowMapping(false, sf -> sf.handle(productProcessor(), "process"));
}

作为服务激活器,我们有一个简单的bean,它记录一条消息并将产品设置为已处理。 然后,它将返回产品,以便流程中的下一个端点可以处理它。

public class ProductProcessor {public Product process(Product product) {return doProcess(product, String.format("Processing product %s", product.getId()));}public Product fastProcess(Product product) {return doProcess(product, String.format("Fast processing product %s", product.getId()));}private Product doProcess(Product product, String message) {System.out.println(message);product.setProcessed(true);return product;}
}

将产品设置为已处理的原因是因为下一步是更新其在数据库中的状态,以便不再对其进行轮询。 我们通过将流再次重定向到mongoDb出站通道适配器来保存它。

5结论

您已经了解了必须使用哪些端点才能使用Spring Integration与MongoDB数据库进行交互。 出站通道适配器将产品被动保存到数据库中,而入站通道适配器则主动轮询数据库以检索新产品。

如果您发现此帖子有用,请分享或给我的存储库加注星标。 我很感激 :)

我正在Google Plus和Twitter上发布我的新帖子。 如果您要更新新内容,请关注我。

翻译自: https://www.javacodegeeks.com/2016/11/spring-integration-mongodb-adapters-java-dsl.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/351047.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matlab linspace

用法&#xff1a;linspace(x1,x2,N)   功能&#xff1a;linspace是Matlab中的一个指令&#xff0c;用于产生x1,x2之间的N点行矢量。其中x1、x2、N分别为起始值、中止值、元素个数。若缺省N&#xff0c;默认点数为100。在matlab的命令窗口下输入help linspace或者doc linspac…

Linux strace命令

简介 strace常用来跟踪进程执行时的系统调用和所接收的信号。 在Linux世界&#xff0c;进程不能直接访问硬件设备&#xff0c;当进程需要访问硬件设备(比如读取磁盘文件&#xff0c;接收网络数据等等)时&#xff0c;必须由用户态模式切换至内核态模式&#xff0c;通 过系统调用…

网站发布

1.文件发布 右击工程&#xff0c;选择发布 发布方法选择文件发布&#xff0c;打开你的程式路径&#xff0c;然后一步步操作即可。 转载于:https://www.cnblogs.com/alannxu/p/10613453.html

什么是javax.ws.rs.core.context? [第4部分]

如何使用Context批注 在什么是javax.ws.rs.core.context的第3部分中&#xff1f; 您学习了如何在请求和配置&#xff0c;提供程序和应用程序实例中使用Context批注。 在本文中&#xff0c;您将学习如何使用Context批注注入HttpServletResponse和HttpServletRequest类。 获取对…

matlab im2double

im2double函数&#xff0c;如果输入是 uint8 unit16 或者是二值的logical类型&#xff0c;则函数im2double 将其值归一化到0&#xff5e;1之间。

重学前端(一)

前端知识框架&#xff1a;自己觉得很不错的一个前端知识框架 转载于:https://www.cnblogs.com/angel1254/p/10616065.html

couchbase_Couchbase:使用Twitter和Java创建大型数据集

couchbase在播放/演示Couchbase或任何其他NoSQL引擎时&#xff0c;创建大型数据集的一种简单方法是将Twitter feed注入数据库。 对于这个小应用程序&#xff0c;我正在使用&#xff1a; Couchbase Server 2.0服务器 Couchbase Java SDK &#xff08;将由Maven安装&#xff0…

C编译器、链接器、加载器详解

一、概述 C语言的编译链接过程要把我们编写的一个c程序&#xff08;源代码&#xff09;转换成可以在硬件上运行的程序&#xff08;可执行代码&#xff09;&#xff0c;需要进行编译和链接。编译就是把文本形式源代码翻译为机器语言形式的目标文件的过程。链接是把目标文件、操作…

matlab bwdist

bwdist函数用于计算元素之间的距离。 举个例子&#xff1a; 如果a 0 0 0 0 0 0 1 1 1 0 0 1 1 1 0 0 1 1 1 0 0 0 0 0 0 那么&#xff1a; [D,L]bwdist(a); D 1.4142 1.0000 1.0000 1.0000 1.4142 1.0000 0 0 0 1.0000 1.0000 0 0 0 1.0000 1.0000 0 0 0 1.0000 1.4142 1.000…

js函数库-D3

推荐&#xff1a; https://www.cnblogs.com/createGod/p/6884629.html转载于:https://www.cnblogs.com/john-hwd/p/10616166.html

配置Ubuntu Server高速apt-get源

今天刚装上Ubuntu Server 12&#xff0c;默认的apt-get源比较慢。更改一下源地址。 方法&#xff1a; 1、修改源地址&#xff1a; cp /etc/apt/sources.list /etc/apt/sources.list.bak vim /etc/apt/sources.list 加入如下内容&#xff08;中科大的&#xff09;&#xff1a; …

matlab find

find函数用于返回所需要元素的所在位置 (位置的判定&#xff1a;在矩阵中&#xff0c;第一列开始&#xff0c;自上而下&#xff0c;依次为1&#xff0c;2&#xff0c;3...,然后再从第二列&#xff0c;第三列依次往后数)find&#xff08;A&#xff09;返回矩阵A中非零元素所在位…

红黑树操作详解——很形象的过程

红黑树是一种很好的自平衡二叉排序树&#xff0c;在此&#xff0c;给出一个网友给出的红黑树操作详解&#xff1a; https://segmentfault.com/a/1190000012728513 里面给出了红黑树的详细操作&#xff0c;过程很形象&#xff01;&#xff01;&#xff01; 结合可视化数据结构网…

地图事件触发_使用地图触发功能处理相干事件

地图事件触发本文介绍如何通过使用映射触发器来处理一致性事件。 基本上&#xff0c;建议使用Oracle Coherence中的分布式数据管理来研究Oracle Coherence API的基本配置和实现。 映射触发器是Oracle Coherence提供最高度定制的缓存管理系统的最重要功能之一。 MapTrigger表示…

C++ 内存对齐

注&#xff1a;本文代码测试环境为win7 X64 cpu, 编译器为gcc4.7.1 和 vs2010 内存对齐是编译器为了便于CPU快速访问而采用的一项技术 我们先从一个例子开始&#xff0c;对下面的类(或者结构体) class node { char c; int i; short s; }no; sizeof(no)的值是多少呢&#xff0c;…

matlab sub2ind与ind2sub

sub2ind与ind2sub函数 A [1 2 3; 4 5 6;7,8,9]; >> fsub2ind(size(A), 2, 3) f 8 即把矩阵A中第二行第三列的元素的全下标标识&#xff08;2,3&#xff09;转换为对应的单下标标识8&#xff0c;即该元素从第一列顺次数过去是第八号元素。 而ind2sub则用于把矩阵中…

Spring Boot和Angular 2入门食谱

我主要是一名服务开发人员&#xff0c;必须不时创建一些可传递的UI。 我精通基于AngularJS1的基本UI&#xff0c;并且可以使用之前概述的方法来完成工作。 遗憾的是&#xff0c;随着Angular 2的出现&#xff0c;我不得不将以前的方法抛诸脑后&#xff0c;而现在使用Spring Boot…

Robbers' watch CodeForces - 685A (暴力)

大意: 一天n小时, m分钟, 表以7进制显示, 求表显示数字不同的方案数 注意到小时和分钟部分总长不超过7, 可以直接暴力枚举. 关键要特判0, 0的位数要当做1来处理 #include <iostream> #include <algorithm> #include <cstdio> #include <math.h> #inclu…

什么是javax.ws.rs.core.context? [第5部分]

如何使用Context批注 在什么是javax.ws.rs.core.context的第4部分中&#xff1f; 您学习了如何使用Context批注将HttpServletResponse和HttpServletRequest类注入资源方法。 在本文中&#xff0c;您将学习如何使用其余两个仅在servlet容器中可用的类&#xff0c;它们是&#x…

Linux字符界面和图形界面

Ubuntu图形界面和字符界面的切换 Ubuntu和其他的Linux系统一样&#xff0c;有图形界面和字符界面&#xff0c;同时能够设置默认的启动界面。 linux的显示界面分为命令行的字符界面和图形界面&#xff0c;我们可以设置linux的默认启动的显示界面。然后也可以手动的来回的切换。 …