莱芜建设网站图片合成器在线制作
莱芜建设网站,图片合成器在线制作,建设工程资质证书二维码扫描网站,软件开发模型有哪些各有什么特点概述
不同中间件#xff0c;有各自的使用方法#xff0c;代码也不一样。 可以使用Spring Cloud Stream解耦#xff0c;切换中间件时#xff0c;不需要修改代码。实现方式为使用绑定层#xff0c;绑定层对生产者和消费者提供统一的编码方式#xff0c;需要连接不同的中间…概述
不同中间件有各自的使用方法代码也不一样。 可以使用Spring Cloud Stream解耦切换中间件时不需要修改代码。实现方式为使用绑定层绑定层对生产者和消费者提供统一的编码方式需要连接不同的中间件时绑定层使用不同的绑定器即可也就是把切换中间件需要做相应的修改工作交给绑定层来做。 本文的操作是在 微服务调用链路追踪 的基础上进行。 环境说明
jdk1.8
maven3.6.3
mysql8
spring cloud2021.0.8
spring boot2.7.12
idea2022
rabbitmq3.12.4 步骤
消息生产者 创建子模块stream_producer 添加依赖 dependenciesdependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency/dependencies
刷新依赖 配置application.yml
server:port: 7001
spring:application:name: stream_producerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:destination: my-default #指定消息发送的目的地值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit 查看Source.class源码 编写生产者代码发送一条消息hello world到rabbitmq的my-default exchange中
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;EnableBinding(Source.class)
SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {Autowiredprivate MessageChannel output;Overridepublic void run(String... args) throws Exception {//发送消息// messageBuilder 工具类创建消息output.send(MessageBuilder.withPayload(hello world).build());}public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}} 查看rabbitmq web UI http://localhost:15672/ 看到Exchanges中还没有my-default 运行StreamProductApplication 刷新rabbitmq Web UI看到了my-dafault的exchange 消息消费者
创建子模块stream_consumer
添加依赖 dependenciesdependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency/dependencies 配置application.yml
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit查看内置通道名称为input 编写消息消费者启动类在启动类监听接收消息
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;SpringBootApplication
EnableBinding(Sink.class)
public class StreamConsumerApplication {StreamListener(Sink.INPUT)public void input(MessageString message){System.out.println(监听收到 message.getPayload());}public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}
}
运行stream_consumer消费者服务监听消息 运行stream_producer生产者服务发送消息 查看消费者服务控制台日志接收到了消息 优化代码
之前把生产和消费的消息都写在启动类中了代码耦合高。
优化思路是把不同功能的代码分开放。 消息生产者
stream_producer 代码结构如下 package org.example.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
Component
EnableBinding(Source.class)
public class MessageSender {Autowiredprivate MessageChannel output;//通道//发送消息public void send(Object obj){output.send(MessageBuilder.withPayload(obj).build());}
}修改启动类
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;SpringBootApplication
public class StreamProductApplication {public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}} pom.xml添加junit依赖
dependencygroupIdjunit/groupIdartifactIdjunit/artifactIdscopetest/scope
/dependency
刷新依赖 编写测试类
在stream_producerm模块的src/test目录下新建org.example.stream包再建出ProducerTest类代码如下
package org.example.stream;import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;RunWith(SpringJUnit4ClassRunner.class)
SpringBootTest
public class ProducerTest {Autowiredprivate MessageSender messageSender;//注入发送消息工具类Testpublic void testSend(){messageSender.send(hello world);}
} 消息消费者
stream_consumer代码结构如下 添加MessageListener类获取消息
package org.example.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;Component
EnableBinding(Sink.class)
public class MessageListener {// 监听binding中的信息StreamListener(Sink.INPUT)public void input(String message){System.out.println(获取信息 message);}
} 修改启动类
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;SpringBootApplication
public class StreamConsumerApplication {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}} 启动consumer接收消息
执行producer单元测试类ProducerTest的testSend()方法发送消息
查看consumer控制台输出接收到信息了 代码解耦后同样能成功生产消息和消费消息。 自定义消息通道
此前使用默认的消息通道output和input。
也可以自己定义消息通道例如myoutput和myinput
消息生产者
在org.example.stream包下新建channel包在channel包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产这的配置*/String MYOUTPUT myoutput;Output(myoutput)MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT myinput;Input(myinput)SubscribableChannel myinput();
}
修改MessageSender类
package org.example.stream.producer;import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
Component
EnableBinding(MyProcessor.class)
public class MessageSender {Autowiredprivate MessageChannel myoutput;//通道//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload(obj).build());}
}
修改application.yml
cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地myoutput:destination: custom-output 消息消费者
在stream_consumer服务的org.example.stream包下新建channel包在channel包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产者的配置*/String MYOUTPUT myoutput;Output(myoutput)MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT myinput;Input(myinput)SubscribableChannel myinput();
}
修改MessageListener类
package org.example.stream.stream;import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;Component
EnableBinding(MyProcessor.class)
public class MessageListener {// 监听binding中的信息StreamListener(MyProcessor.MYINPUT)public void input(String message){System.out.println(获取信息 message);}
}
修改application.yml配置
cloud:stream:bindings:input: #内置获取消息的通道从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地myinput:destination: custom-output 测试
启动stream_consumer
运行单元测试的testSend()方法生产消息
查看stream_consumer控制台能看到生产的消息如下
获取信息hello world 消息分组
采用复制配置方式运行两个消费者
启动第一个消费者端口为7002
修改端口为7003copy configuration再启动另一个消费者
执行生产者单元测试生产消息看到两个消费者都接收到了信息 说明如果有两个消费者生产一条消息后两个消费者均能收到信息。 但当我们发送一条消息只需要其中一个消费者消费消息时这时候就需要用到消息分组发送一条消息消费者组内只有一个消费者消费到。
我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可 重启两个消费者
修改端口号为7002重新启动第一个消费者
修改端口号为7003重新启动第二个消费者 生产者生产一条消息
查看消费者接收消息情况只有一个消费者接收到信息。 消息分区
消息分区就是实现特定消息只往特定机器发送。 修改生产者配置 cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地值为rabbit的exchange的名称myoutput:destination: custom-outputproducer:partition-key-expression: payload #分区关键字 可以是对象中的id或对象partition-count: 2 #分区数量 修改消费者1的application.yml配置
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组有多个消费者时只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 0 #当前消费者的索引 启动消费者1 修改消费者2的配置
server:port: 7003
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组有多个消费者时只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 1 #当前消费者的索引
修改端口号为7003当前消费者的索引instance-index的值修改为1 启动消费者2 生产者发送消息看到只有Application(2)接收到消息 再用生产者发送一次消息也是Application(2)接收到消息 说明实现了消息分区
也可以更改发送的数据看是否能发送到不同消费者
修改生产者发送数据由hello world变为hello world1同时发送5次 public void testSend(){for (int i 0; i 5; i) {messageSender.send(hello world1);}}
看到hello world1全部被Application消费 所以消息分区是根据发送的消息不同发送到不同消费者中。 完成enjoy it!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/89513.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!