北京展示型网站建设价格xmlrpc wordpress开启
news/
2025/9/23 8:21:45/
文章来源:
北京展示型网站建设价格,xmlrpc wordpress开启,个人网站设计论文摘要关键词,网站功能优化一、前言 接下来是开展一系列的 SpringCloud 的学习之旅#xff0c;从传统的模块之间调用#xff0c;一步步的升级为 SpringCloud 模块之间的调用#xff0c;此篇文章为第九篇#xff0c;即介绍 Stream 消息驱动。
二、消息驱动概念
2.1 消息驱动是什么 官方定义 Spring …一、前言 接下来是开展一系列的 SpringCloud 的学习之旅从传统的模块之间调用一步步的升级为 SpringCloud 模块之间的调用此篇文章为第九篇即介绍 Stream 消息驱动。
二、消息驱动概念
2.1 消息驱动是什么 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。屏蔽底层消息中间件的差异降低切换成本统一消息的编程模型。 说的通俗一点就是以前你的项目中可能既用到了 rabbitmq又用到了 kafka现在只需要使用一个 Spring Cloud Stream 就可以了。
2.2 官网 官网的地址在这也可以访问它的中文指导手册但是需要注意的是目前仅支持 RabbitMQ 和 Kafka。
2.3 设计思想
2.3.1 标准 mq 对于标准的 mq 来说架构如下图生产者/消费者之间靠消息媒介传递信息内容消息必须走特定的通道 2.3.2 Stream 比方说我们用到了 RabbitMQ 和 Kafka由于这两个消息中间件的架构上的不同像 RabbitMQ 有 exchangekafka 有 Topic 和 Partitions 分区。 这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰我们如果用了两个消息队列的其中一种后面的业务需求我想往另外一种消息队列进行迁移这时候无疑就是一个灾难性的一大堆东西都要重新推倒重新做因为它跟我们的系统耦合了这时候 springcloud Stream 给我们提供了一种解耦合的方式。 2.3.3 Stream 如何实现统一底层差异 Stream 通过定义绑定器 Binder 作为中间层实现了应用程序与消息中间件细节之间的隔离。 在没有绑定器这个概念的之前我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候由于各消息中间件构建的初衷不同它们的实现细节上会有较大的差异性通过定义绑定器作为中间层完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道使得应用程序不需要再考虑各种不同的消息中间件实现。
2.3.4 绑定器 Binder 通过定义绑定器 Binder 作为中间层实现了应用程序与消息中间件细节之间的隔离。 Binder 可以生成 BindingBinding 用来绑定消息容器的生产者和消费者它有两种类型 INPUT 和 OUTPUTINPUT 对应于消费者OUTPUT 对应于生产者。 Stream 中的消息通信方式遵循了发布-订阅模式在 RabbitMQ 就是 Exchange在 Kakfa 中就是 Topic。
2.4 Stream 标准流程套路 1、Binder很方便的连接中间件屏蔽差异 2、Channel通道是队列 Queue 的一种抽象在消息通讯系统中就是实现存储和转发的媒介通过 Channel 对队列进行配置。 3、Source 和 Sink简单的可理解为参照对象是 Spring Cloud Stream 自身从 Stream 发布消息就是输出接受消息就是输入。
2.5 编码 API 和常用注解 组成说明Middleware中间件目前只支持 Rabbitmq 和 KafkaBinderBinder 是应用与消息中间件之间的封装目前实现了 Kafka 和 Rabbitmq 的 Binder通过 Binder可以很方便的连接中间件可以动态的改变消息类型对应于 Kafka 的 topicRabbitmq 的 exchange这些都可以通过配置文件来实现Input注解标识输入通道通过该输入通道接收到的消息进入应用程序Output注解标识输出通道发布的消息将通过该通道离开应用程序StreamListener监听队列用于消费者的队列的消息的接收EnableBinding指信道 channel 和 exchange 绑定在一起
三、消息驱动之生产者
3.1 工程创建 新建一个 cloud-stream-rabbitmq-consumer8801 模块用来充当消息的生产者pom.xml 内容如下所示只有一个依赖比较特殊。
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdcom.springcloud/groupIdartifactIdSpringCloud/artifactIdversion1.0-SNAPSHOT/version/parentartifactIdcloud-stream-rabbitmq-provider8801/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-netflix-eureka-client/artifactId/dependency!--引入stream整合rabbit的依赖如果是kafka则也引入对于的依赖即可--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency!--基础配置--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-devtools/artifactIdscoperuntime/scopeoptionaltrue/optional/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies/project application.yml 内容如下所示比较特殊的是引入了 stream 的相关配置binders 标签指定继承了哪种消息中间件并配置其连接地址、用户名和密码等。bindings 标签表示要对哪些服务进行整合output 表示这是一个消息的生产者destination 表示的是 rabbitmq 里面的交换机名称binder 绑定的是上面指定的消息中间件。
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为json文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔默认是30秒lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔默认是90秒instance-id: send-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址 主启动类的代码如下
SpringBootApplication
public class StreamMQMain8801
{public static void main(String[] args){SpringApplication.run(StreamMQMain8801.class,args);}
} 创建发送消息 Service 接口和其实现类代码如下
package com.springcloud.service;public interface IMessageProvider {public String send() ;
}
package com.springcloud.service.impl;import com.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;// 第一步可以理解为是一个消息的发送管道的定义消息的生产者用 source消息的消费者用 sink
EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider
{Resourceprivate MessageChannel output; // 第二步引入消息的发送管道Overridepublic String send(){String serial UUID.randomUUID().toString();// 第三步调用 send 方法发送消息this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息System.out.println(***serial: serial);return serial;}
}创建 controller 类用于远程调用发送消息代码如下
RestController
public class SendMessageController
{Resourceprivate IMessageProvider messageProvider;GetMapping(value /sendMessage)public String sendMessage(){return messageProvider.send();}
}
3.2 测试 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801 模块在浏览器输入 http://localhost:8801/sendMessage 进行测试多刷新几次浏览器。如下图 可以看到消息成功的被 rabbitmq 接收了。
四、消息驱动之消费者
4.1 工程创建 新建一个 cloud-stream-rabbitmq-consumer8802 模块用来充当消息的消费者pom.xml 内容如下所示只有一个依赖比较特殊。
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdcom.springcloud/groupIdartifactIdSpringCloud/artifactIdversion1.0-SNAPSHOT/version/parentartifactIdcloud-stream-rabbitmq-consumer8802/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-netflix-eureka-client/artifactId/dependency!--引入stream整合rabbit的依赖如果是kafka则也引入对于的依赖即可--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!--基础配置--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-devtools/artifactIdscoperuntime/scopeoptionaltrue/optional/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies/project application.yml 内容如下所示比较特殊的是引入了 stream 的相关配置binders 标签指定继承了哪种消息中间件并配置其连接地址、用户名和密码等。bindings 标签表示要对哪些服务进行整合input 表示这是一个消息的消费者destination 表示的是 rabbitmq 里面的交换机名称binder 绑定的是上面指定的消息中间件。
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为对象json如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔默认是30秒lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔默认是90秒instance-id: receive-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址 主启动类的代码如下
SpringBootApplication
public class StreamMQMain8802
{public static void main(String[] args){SpringApplication.run(StreamMQMain8802.class,args);}
} 创建 listenr 类用于监听和接收消息代码如下
Component
// 第一步可以理解为是一个消息的发送管道的定义消息的生产者用 source消息的消费者用 sink
EnableBinding(Sink.class)
public class ReceiveMessageListener
{Value(${server.port})private String serverPort;// 第二步使用 StreamListener 注解进行消息的监听和接收StreamListener(Sink.INPUT)public void input(MessageString message){System.out.println(消费者1号-------接收到的消息 message.getPayload()\t port: serverPort);}
}
4.2 测试 接下来测试使用 8801 发送消息看 8802 是否可以正常的接收消息启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801 和 cloud-stream-rabbitmq-consumer8802 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以看到 8081 发送的消息可以被 8082 正常的消费如下图 五、分组消费与持久化
5.1 工程创建 参照 cloud-stream-rabbitmq-consumer8802 模块我们重新创建一个消费者模块 cloud-stream-rabbitmq-consumer8803可能有点不一样的就是 application.yml我把它的内容粘出来如下
server:port: 8803spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为对象json如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔默认是30秒lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔默认是90秒instance-id: receive-8803.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
5.2 测试 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以看到 8081 发送的消息可以既被 8082 又被 8083 消费了如下图 5.3 重复消费问题 目前是生产者发送消息8802 和 8803 同时都收到了存在重复消费问题。 比如在如下场景中订单系统我们做集群部署都会从 RabbitMQ 中获取订单信息那如果一个订单同时被两个服务获取到那么就会造成数据错误我们得避免这种情况。这时我们就可以使用 Stream 中的消息分组来解决。 注意在 Stream 中处于同一个 group 中的多个消费者是竞争关系就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的重复消费同一组内会发生竞争关系只有其中一个可以消费。
5.4 分组
5.4.1 原理 微服务应用放置于同一个 group 中就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的同一个组内会发生竞争关系只有其中一个可以消费。
5.4.2 不同组测试 我们先把 8802 和 8803 都变成不同组进行测试首先修改 8802 的 application.yml添加一个 group 属性如下图 修改 8803 的 application.yml添加一个 group 属性如下图 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以得出结论不同组是可以全面消费的重复消费。
5.4.3 同组测试 使用 8802 和 8803 进行同组进行测试在 8802 和 8803 的 application.yml 中设置相同的 group 属性都属于 tansunA 组如下图 启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块 在浏览器输入 http://localhost:8801/sendMessage 进行测试 可以得出结论同一个组的多个微服务实例每次只会有一个拿到。
5.5 持久化 停止 8802 和 8803 并将 8802 的 group 属性去除掉然后让 8801 先发送 4 条消息到 rabbitmq。 先启动 8802无分组属性配置后台没有打出来消息如下图 再启动 8803有分组属性配置后台打出来了 MQ 上的消息 可以得出结论只要你有分组的属性你的数据就不会丢失。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/911891.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!