建设银行官方网站个人深圳市建设交易中心官网
news/
2025/9/22 21:01:23/
文章来源:
建设银行官方网站个人,深圳市建设交易中心官网,厦门找一家做网站的公司,洛阳网站建设 恒凯科技应用场景
有的时候#xff0c;我们对于同一通道中的消息处理#xff0c;会通过判断头信息或者消息内容来做一些差异化处理#xff0c;比如#xff1a;可能在消息头信息中带入消息版本号#xff0c;然后通过if判断来执行不同的处理逻辑#xff0c;其代码结构可能是这样的…应用场景
有的时候我们对于同一通道中的消息处理会通过判断头信息或者消息内容来做一些差异化处理比如可能在消息头信息中带入消息版本号然后通过if判断来执行不同的处理逻辑其代码结构可能是这样的
StreamListener(value TestTopic.INPUT)public void receiveV1(String payload, Header(version) String version) { if(1.0.equals(version)) { // Version 1.0 } if(2.0.equals(version)) { // Version 2.0 }}那么当消息处理逻辑复杂的时候这段逻辑就会变得特别复杂。针对这个问题在StreamListener注解中提供了一个不错的属性condition可以用来优化这样的处理结构。
动手试试
下面通过编写一个简单的例子来具体体会一下这个属性的用法
EnableBinding(TestApplication.TestTopic.class)SpringBootApplicationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } RestController static class TestController { Autowired private TestTopic testTopic; /** * 消息生产接口 * * param message * return */ GetMapping(/sendMessage) public String messageWithMQ(RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).setHeader(version, 1.0).build()); testTopic.output().send(MessageBuilder.withPayload(message).setHeader(version, 2.0).build()); return ok; } } /** * 消息消费逻辑 */ Slf4j Component static class TestListener { StreamListener(value TestTopic.INPUT, condition headers[version]1.0) public void receiveV1(String payload, Header(version) String version) { log.info(Received v1 : payload , version); } StreamListener(value TestTopic.INPUT, condition headers[version]2.0) public void receiveV2(String payload, Header(version) String version) { log.info(Received v2 : payload , version); } } interface TestTopic { String OUTPUT example-topic-output; String INPUT example-topic-input; Output(OUTPUT) MessageChannel output(); Input(INPUT) SubscribableChannel input(); }}内容很简单既包含了消息的生产也包含了消息消费。在/sendMessage接口的定义中发送了两条消息一条消息的头信息中包含version1.0另外一条消息的头信息中包含version2.0。在消息监听类TestListener中对TestTopic.INPUT通道定义了两个StreamListener这两个监听逻辑有不同的condition这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。
在启动应用之前还要记得配置一下输入输出通道对应的物理目标exchange或topic名比如
spring.cloud.stream.bindings.example-topic-input.destinationtest-topicspring.cloud.stream.bindings.example-topic-input.groupstream-content-routespring.cloud.stream.bindings.example-topic-output.destinationtest-topic完成了上面配置之后就可以启动应用并尝试访问localhost:8080/sendMessage?messagehello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志
2018-12-24 15:50:33.361 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v1 : hello, 1.02018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0从日志中可以看到两条带有不同头信息的消息分别通过不同的监听处理逻辑输出了对应的日志打印。
代码示例
本文示例读者可以通过查看下面仓库的中的stream-content-route项目
GithubGitee
如果您对这些感兴趣欢迎star、follow、收藏、转发给予支持
以下专题教程也许您会有兴趣
Spring Boot基础教程Spring Cloud基础教程
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/910360.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!