springcloud 相同服务名_SpringCloud系列之SpringCloud Stream

SpringCloud Stream

技术兴起的原因:为了解决系统中不同中间件的适配问题,出现了cloud stream,采用适配绑定的方式,自动给不同的MQ之间进行切换。

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。

应用程序通过inputs(消费者)或者outputs(生产者)来与Spring Cloud Stream中binder对象交互。通过我们配置来绑定,而Spring Cloud Stream的binder对象负责与消息中间件交互。

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动配置,引用了发布、订阅、消费、分区的三个核心概念。

官方版本目前仅仅支持RabbitMQ和Kafka。

MQ相关术语

Message:生产者/消费者之间靠消息媒介传递信息内容

MessageChannel:消息必须走特定的通道

消息通道的子接口SubscribableChannel,由MessageHandle消息处理器所订阅。

相关注解

Middleware:中间件,目前只支持RabbitMQ和Kafka

Binder:应用层和消息中间件之间的封装,实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型,这些可以通过配置文件修改。

Input:表示输入通道,消息进入该通道传到应用程序。

Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。

StreamListener:监听队列,用于消费者的队列的消息接收。

EnableBinding:将信道channel和exchange绑定在一起。

首先创建一个provider,服务提供者rabbitmq-provider8801

导入依赖

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-actuator

org.springframework.cloud

spring-cloud-starter-netflix-eureka-client

org.springframework.cloud

spring-cloud-starter-stream-rabbit

org.springframework.boot

spring-boot-devtools

runtime

true

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

编写配置文件application.yml

server:

port: 8801

spring:

application:

name: cloud-stream-provider

cloud:

stream:

binders: # 在此处配置要绑定的rabbitmq的服务信息;

defaultRabbit: # 表示定义的名称,用于于binding整合

type: rabbit # 消息组件类型

environment: # 设置rabbitmq的相关的环境配置

spring:

rabbitmq:

host: 192.168.31.52 #rabbitmq服务启动所在机器的IP地址

port: 5672

username: guest

password: guest

bindings: # 服务的整合处理

output: # 这个名字是一个通道的名称

destination: studyExchange # 表示要使用的Exchange名称定义

content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”

binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:

client: # 客户端进行Eureka注册的配置

service-url:

defaultZone: http://localhost:7001/eureka

instance:

lease-renewal-interval-in-seconds: 2# 设置心跳的时间间隔(默认是30秒)

lease-expiration-duration-in-seconds: 5# 如果现在超过了5秒的间隔(默认是90秒)

instance-id: send-8801.com # 在信息列表时显示主机名称

prefer-ip-address: true# 访问的路径变为IP地址

编写一个发送数据的接口IMessageProvider

public interface IMessageProvider {

String sendMessage();

}

接口的实现类IMessageProviderImpl

@EnableBinding(Source.class) //定义消息的推送管道

public class IMessageProviderImpl implements IMessageProvider

{

@Resource

private MessageChannel output; // 消息发送管道

@Override

public String sendMessage()

{

String serial = UUID.randomUUID().toString();

output.send(MessageBuilder.withPayload(serial).build());

System.out.println("*****serial: "+serial);

return null;

}

}

controller层下的SendMessageController

@RestController

public class SendMessageController {

@Autowired

private IMessageProvider iMessageProvider;

@GetMapping(value = "/sendMessage")

public String send(){

return iMessageProvider.sendMessage();

}

}

启动Eureka7001,启动服务提供者8801.启动虚拟机上的RabbitMQ

记得把虚拟机防火墙关了。

[hadoop@centos7 bin]$ systemctl stop firewalld

[hadoop@centos7 bin]$ systemctl status firewalld

然后测试一下服务提供者是否正常运行。

控制台输出UUID。

然后再创建一个服务消费者,在MQ的另一端进行消费消息。

创建另一个模块,cloud-stream-rabbitmq-consumer8802

导入依赖

org.springframework.boot

spring-boot-starter-web

org.springframework.cloud

spring-cloud-starter-netflix-eureka-client

org.springframework.cloud

spring-cloud-starter-stream-rabbit

org.springframework.boot

spring-boot-starter-actuator

org.springframework.boot

spring-boot-devtools

runtime

true

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

和上一个服务提供者的依赖一样。

写配置文件application.yml

server:

port: 8802

spring:

application:

name: cloud-stream-consumer

cloud:

stream:

binders: # 在此处配置要绑定的rabbitmq的服务信息;

defaultRabbit: # 表示定义的名称,用于于binding整合

type: rabbit # 消息组件类型

environment: # 设置rabbitmq的相关的环境配置

spring:

rabbitmq:

host: 192.168.31.52

port: 5672

username: guest

password: guest

bindings: # 服务的整合处理

input: # 这个名字是一个通道的名称

destination: studyExchange # 表示要使用的Exchange名称定义

content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”

binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:

client: # 客户端进行Eureka注册的配置

service-url:

defaultZone: http://localhost:7001/eureka

instance:

lease-renewal-interval-in-seconds: 2# 设置心跳的时间间隔(默认是30秒)

lease-expiration-duration-in-seconds: 5# 如果现在超过了5秒的间隔(默认是90秒)

instance-id: receive-8802.com # 在信息列表时显示主机名称

prefer-ip-address: true# 访问的路径变为IP地址

创建一个消费者的ReceiveMessageController

@Component

@EnableBinding(Sink.class)

public class ReceiveMessageController {

@Value("${server.port}")

private String serverPort;

@StreamListener(Sink.INPUT)

public void input(Message message){

System.out.println("message = "+message.getPayload()+"\t"+"serverPort= "+serverPort);

}

}

如果消费者成功接收消息,则在控制台输出产生的UUID和端口号。

启动Eureka7001,启动服务提供者8801,启动服务消费者8802,还有MQ。

在Eureka中可以看到两个服务已经启动。

每次请求http://localhost:8801/sendMessage;消费者都能输出结果,输出的UUID与提供者的一致。

登录RabbitMQ的web管理,可以看到我们新建的exchange,并且可以查看消息队列中的请求次数的情况。

发送的消息除了可以是字符串类型还可以发送对象,在消费者接受数据的时候,会将实体转换成JSON字符串。

配置文件中,如果你使用的消息中间件是kafka,type: kafka;environment是设置消息中间件的配置信息,端口,主机地址,用户名,密码等,可以设置多个binder,适应不同的场景。

重复消费问题

默认情况下,每个消费者的分组名都是随机的,不同的,对于不同的组会引起重复消费的问题,例如:消息提供者只向消息队列中发送了一个消息,正常情况下,消费者A从队列中拿走之后,消费者B不能再获得相同的消息,但是由于AB是不同的组,所以A和B都会获取相同的消息,这就导致了资源被重复消费。

微服务应用放置到同一个group中,就能够保证消息只会被其中应用消费一次,不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

同一个应用的不同微服务,只用在配置文件中指定相同的group。

再次发送消息时,只有消费者其中一个能消费。避免了重复消费。

消息持久化

当两个消费者A和B,A设置了group属性值,B没有设置,这时,消费者全部宕机,但是消息生产者一直响MQ中生产消息,这时候重启A和B两者有什么区别呢?

正因为B没有这时分组,B再次启动后不会再向MQ中取数据,而A启动成功后可以正常消费消息队列中的消息。

因此设置了group的消费者,可以保证消息队列中的消息持久化,group对于消费者来讲很重要,既能避免重复消费,又能在消费者重启后依然可以消费消息队列中未消费的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/259903.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机意外重启或遇错误无法继续,计算机意外地重新启动或遇到错误如何解决?...

电脑小白在重装系统后难免会遇到些问题,有的容易处理,有的会有些棘手。那么,计算机意外地重新启动或遇到错误如何解决?今天快启动小编为大家分享详细的计算机意外地重新启动或遇到错误的解决方法,献给对系统重装知识不太了解的小…

jqueryui的Tooltip使用方法

http://api.jqueryui.com/tooltip/#option-position,详细使用方法。 http://jqueryui.com/tooltip/,DEMO。 content使用 $( ".selector" ).tooltip({ content: "Awesome title!" });//div及相关标签使用样式,鼠标放上去时…

iOS 开发者账号共用发布证书 (Distribution)问题

苹果客服回复: 1.第一台申请发布证书的电脑,从钥匙串中导出发布证书(Distribution)颁发的request文件?然后在第二台电脑上用request文件新生成一个Distribution证书,这个是可以共用的?(不理解还是理解错了&…

JMeter web 测试

2019独角兽企业重金招聘Python工程师标准>>> JMeter web 测试 http://jmeter.apache.org/usermanual/build-web-test-plan.html 转载于:https://my.oschina.net/276172622/blog/808957

scala 连接oracle_一分钟教你学会用java连接Oracle数据库

package java_jdbc;//java连接Oracle数据库import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement;public class JdbcOracleTest {public static void main(String[] args) {// TODO Auto-generated method stub//1.…

计算机软件记不住设置,想知道电脑密码记不住了怎么办

635509517回答数:23216 | 被采纳数:32017-01-09 17:51:10方法一:(1)启动电脑,使用DOS启动盘(比如:Windows 98启动盘)进入纯DOS状态。(2)在DOS提示符下,根据下面步骤操作:cd\\ (切换到根目录)c…

vue-cli#2.0 webpack 配置分析

目录结构: ├── README.md ├── build │ ├── build.js │ ├── check-versions.js │ ├── dev-client.js │ ├── dev-server.js │ ├── utils.js │ ├── webpack.base.conf.js │ ├── webpack.dev.conf.js │ └── webpack.prod.conf.js…

initWithNibName与viewDidLoad的执行关系以及顺序

一个ViewController,一般通过init或initWithNibName来加载。二者没有什么不同,init最终还是要调用initWithNibName方法(除非这个ViewController没有nib文件)。 我们经常在initWithNibName方法中初始化视图,变量或者其他…

120xa正反转参数_你知道变频器的“正反转死区时间”吗?它的“停机方式”有几种?...

若你我之间有缘,关注作者又何妨?两情若是久长时,又岂在朝朝暮暮。大家好!我是江郎,一个踏踏实实的维修工。本期我们仍然探讨两个问题,如标题所述,#变频器#“死区时间”和“停机方式”&#xff0…

【转】游戏编程中的人工智能技术--神经网络

原文:http://blog.csdn.net/ecitnet/article/details/1799444 游戏编程中的人工智能技术.>. (连载之一)用平常语言介绍神经网络(Neural Networks in Plain English)因为我们没有很好了解大脑,我们经常试图用最新的技术作为一种模型来解释它。在我童年…

w8计算机配置要求,win8系统最低配置要求有哪些|win8系统是否有最低配置要求-系统城...

2013-10-17 17:08:08  浏览量:5753小编这里要为大家带来的是win8系统最低配置要求和部分安装截图,很多用户想要将自己的电脑装上win8,但也不是每一台电脑都可以安装win8系统的,为了避免一些低配置的用户安装了win8之后却无法运行…

Session 丢失问题

项目从.NET Framework3.5 升级 .NET Framework4.0后,如果用Response.Redirect();进行页面的跳转,服务端会把这个跳转动作当作是一个“新”的用户去访问网页。 而这个时候,会给这个“新”的用户一个SessionID,那造成的结果是&#…

财务管理专业应该报计算机二级哪个科目,我是应该报计算机二级还是三级呢

2008-12-01怎样学好财务管理?“五步”学好财务管理:学习这门课程前,首先就不要认为它“很难”,只要相信“难而不会,会而不难”,充满信心一定就能学好。我在学习过程中总结了几条经验,以供各位学友参考&…

libsvm java 调用说明

libsvm是著名的SVM开源组件,目前有JAVA.C/C,.NET 等多个版本,本人使用的是2.9libsvm命名空间下主要使用类:svm_model 为模型类,通过训练或加载训练好的模型文件获得svm_parameter 为参数类,主要为支持向量机设定参数&a…

java字符串排序_对字符串排序持一种宽容的心态

在Java中一涉及中文处理就会冒出很多问题来,其中排序也是一个让人头疼的课题,我们来看下面的代码:上面的代码定义一个数组,然后进行升序排序,我们期望的结果是按照拼音升序排列,即为李四、王五、张三&#…

rails开发随手记-0

helper默认是只在view中可用的,如果在controller中也要使用,要在ApplicationController中 include 如果model中如果有叫做type的列的话,会触发rails的Single Table Inheritance ,放弃它吧,不好用,还是安心使…

nagios 监控配置介绍(二)

#配置服务端监控客户端[rootnagios etc]# cd objects/[rootnagios objects]# vi hosts.cfg# Define a host for the local machinedefine host{use linux-serverhost_name 1.3-sambaalias 1.3-sambaaddress …

spoj SUBLEX (Lexicographical Substring Search) RE的欢迎来看看

SPOJ.com - Problem SUBLEX 这么裸的一个SAM,放在了死破OJ上面就是个坑。 注意用SAM做的时候输出要用一个数组存下来,然后再puts,不然一个一个字符输出会更慢。 还有一个就是不要多数据输入,估计最后多了几个没用的数字&#xff0…

mt4双线macd_3年内从亏损90多万到获利近760万,我只坚持我的:60分钟MACD双回拉战法!附选股公式...

MACD指标被普遍认为是最经典实用的技术指标之一。其实并不是因为MACD有多么精妙的算法,而是MACD遵循了最基本的“均线指导原则”,形象的将经典双均线系统换了一种更加直观的表达方式。在MT4中,默认应用的是单线MACD指标,而在证券市…