spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅

spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅

大型分布式系统,直接用mq解耦,那么单机系统怎么办,可以考虑用EventBus
用EventBus的好处也是异步解耦,和mq的类似,可以勉强认为是单机版的mq

先解释,后附示例代码(jdk1.8)

EventBus 处理的事情类似观察者模式,基于事件驱动,观察者们监听自己感兴趣的特定事件,进行相应的处理。
流程是这样的:
1、先自定义一个注解@EventBusListener,所有异步实现的类都加这个注解,代表自己是一个监听器
2、再定义一个实现类,就用这个@EventBusListener注解,代表自己是监听器
类中方法用@Subscribe注解,方法入参类和后面写的发送异步或同步消息的类保持一致即可,这个类也叫消息体,是这个消息的注意内容,比如订单号id,消息体最好是自己定义的不同的类
3、核心的类是EventBusCenter,负责将带有@EventBusListener注解的bean注册成一个监听器,并提供发送异步或同步消息的方法入口
4、业务service中注入EventBusCenter,并调用发送异步或同步消息的方法,方法入参是消息体,不同的消息体对应不同的发布订阅,订阅的时候也是根据这个消息体来区分不同类型的消息的

简单的源码解读

核心类是:com.google.common.eventbus.SubscriberRegistry
里面有个静态变量:subscriberMethodsCache
类在初始化的时候,会给这个静态变量复制,拿到所有的@Subscribe注释的方法和对应类的映射关系
然后我们代码中会注册监听器,就把方法和监听器对应上了
至此,方法的入参类型和方法和类的对应关系就知道了,key就是方法参数类型,value就是一组对应的方法和类

用的时候,调用asyncEventBus.post(event);
入参event就是真正的消息体类型,然后会根据这个类型去上面找对应的方法和类,然后获取bean,调用

所以说,这个发布订阅,就是通过消息体类型去唯一识别的方法的。
对比mq的主题概念,这个消息体类型,就可以看成是主题的意思。

下面附示例代码

模拟OrderService中订单创建后,发送订单创建的异步事件,再发送订单修改的异步事件,再发送订单修改的同步事件
订阅端是OrderChangeListener和OrderChangeListener02两个订阅
OrderChangeListener订阅了订单创建和订单修改事件
OrderChangeListener02订阅了订单创建事件

执行流程是:

启动springboot,注册bean的时候遇到EventBusCenter,开始注册OrderChangeListener和OrderChangeListener02为监听器
启动springboot后立即执行FistRun类,里面直接调用订单创建方法,发布订单创建和修改的消息
OrderChangeListener和OrderChangeListener02消费消息,完

<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>22.0</version>
</dependency>
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EventBusListener {
}
@Component
public class EventBusCenter {// 管理同步事件private EventBus syncEventBus = new EventBus();// 管理异步事件private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());public void postSync(Object event) {syncEventBus.post(event);}public void postAsync(Object event) {asyncEventBus.post(event);}@PostConstructpublic void init() {// 获取所有带有 @EventBusListener 的 bean,将他们注册为监听者List<Object> listeners = SpringContextUtils.getBeansWithAnnotation(EventBusListener.class);for (Object listener : listeners) {asyncEventBus.register(listener);syncEventBus.register(listener);}}
}
@Component
public class SpringContextUtils implements BeanFactoryPostProcessor {private static ConfigurableListableBeanFactory beanFactory;@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {SpringContextUtils.beanFactory = configurableListableBeanFactory;}public static <T> T getBean(String name) throws BeansException {return (T) beanFactory.getBean(name);}public static <T> T getBean(Class<T> clz) throws BeansException {T result = beanFactory.getBean(clz);return result;}public static <T> List<T> getBeansOfType(Class<T> type) {return beanFactory.getBeansOfType(type).entrySet().stream().map(entry -> entry.getValue()).collect(Collectors.toList());}// 上面的例子用到了这个public static List<Object> getBeansWithAnnotation(Class<? extends Annotation> annotationType) {Map<String, Object> beansWithAnnotation = beanFactory.getBeansWithAnnotation(annotationType);// java 8 的写法,将 map 的 value 收集起来到一个 list 中return beansWithAnnotation.entrySet().stream().map(entry -> entry.getValue()).collect(Collectors.toList());// java 7
//        List<Object> result = new ArrayList<>();
//        for (Map.Entry<String, Object> entry : beansWithAnnotation.entrySet()) {
//            result.add(entry.getValue());
//        }
//        return result;}
}
@Data
public class OrderCreatedEvent {private long orderId;private long userId;public OrderCreatedEvent(long orderId, long userId) {this.setOrderId(orderId);this.setUserId(userId);}
}
@Data
public class OrderChangeEvent {private long orderId;private long userId;public OrderChangeEvent(long orderId, long userId) {this.setOrderId(orderId);this.setUserId(userId);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener {@Subscribepublic void created(OrderCreatedEvent event) throws InterruptedException {long orderId = event.getOrderId();Thread.sleep(300);log.info("订单创建监听,发送短信,orderId=" + orderId);}@Subscribepublic void change(OrderChangeEvent event) throws InterruptedException {long orderId = event.getOrderId();Thread.sleep(200);log.info("订单修改监听,物流变化,orderId=" + orderId);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener02 {@Subscribepublic void created(OrderCreatedEvent event) {long orderId = event.getOrderId();long userId = event.getUserId();// 订单创建成功后的各种操作,如发短信、发邮件等等。// 注意,事件可以被订阅多次,也就是说可以有很多方法监听 OrderCreatedEvent 事件,// 所以没必要在一个方法中处理发短信、发邮件、更新库存等log.info("订单创建监听02,修改库存,orderId=" + orderId);}}
@Service
@Slf4j
public class OrderService {@Autowiredprivate EventBusCenter eventBusCenter;public void createOrder() throws InterruptedException {// 创建订单// 发送异步事件eventBusCenter.postAsync(new OrderCreatedEvent(1L, 1L));System.out.println("发送异步事件,订单创建");eventBusCenter.postAsync(new OrderChangeEvent(1L, 1L));System.out.println("发送异步事件,订单修改");//发送同步事件Thread.sleep(500);try {System.out.println("发送同步事件,订单修改,开始");eventBusCenter.postSync(new OrderChangeEvent(1L, 1L));System.out.println("发送同步事件,订单修改,结束");} catch (Exception e) {log.error("发送同步事件,抓异常");}}
}
@Component
@Slf4j
@Order(1)
public class FistRun implements CommandLineRunner {@Autowiredprivate OrderService orderService;@Overridepublic void run(String... args) throws Exception {log.info("FistRun start===============");orderService.createOrder();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/499810.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

我们的java项目,要不要废弃switch

java项目中要不要废弃使用switch 先看switch是怎么产生的&#xff1a; 很久以前&#xff0c;计算能力很昂贵&#xff0c;电脑性能很差&#xff0c;人们便想着法子的提高执行效率 先看看if(test0)的处理逻辑&#xff1a; 先是把test的值放在一个寄存器中&#xff0c;然后把0放…

限流算法(漏桶算法、令牌桶算法)对比

限流算法&#xff08;漏桶算法、令牌桶算法&#xff09; 漏桶算法&#xff1a; 有个桶&#xff0c;比如最大能进2个单位的水&#xff08;请求&#xff09;&#xff0c;桶底有个洞&#xff0c;每个单位的水都会在桶里待3秒后漏下去。 那么这个桶就可以同时处理2个单位的水。 如…

guns框架

分享一个框架guns https://gitee.com/naan1993/guns/ 这算是国内比较优秀的框架&#xff0c;简单的套路都有。 权限啊&#xff0c;代码自动生成啊等等 快速搭建一套后台管理项目 下面是作者介绍&#xff1a; Guns基于Spring Boot2&#xff0c;致力于做更简洁的后台管理系统。…

URLDecoder: Illegal hex characters in escape (%) pattern ...

URL中含有%&#xff0c;报错如下&#xff1a; URLDecoder: Illegal hex characters in escape (%) pattern … 解决&#xff1a; uriStr uriStr.replaceAll("%","%25"); 这种情况一般是出现在连接mongoDB数据库的时候&#xff0c;因为要把用户名密码写…

[设计模式] ------ 策略模式实战:java中替代if-else的大段逻辑

java中用策略模式替代if-else的大段逻辑 问题&#xff1a; java本来是一门以面向对象为主的语言&#xff0c;但很多人嘴上说着java面向对象&#xff0c;然后除了表映射实体之外&#xff0c;其他的还是面向过程的思路。 就比如今天要说的&#xff0c;代码中大段大段的if-else判…

mongodb 索引详解

使用springboot连接mongodb的时候&#xff0c;涉及到索引的使用 举例&#xff1a; Document(collection"book") //注释的是复合索引 //CompoundIndexes( // { // CompoundIndex(name "复合索引名字",def "{字段01:1,字段02:…

[转载] --- 让线程按顺序执行8种方法

看到一篇比较用心的总结&#xff0c;涉及到很多知识点&#xff0c;转来保存&#xff0c;而且我把里面的每个方法都试了一遍&#xff0c;亲测没问题 此次转载&#xff0c;还新增了一些说明和结构 我的总结&#xff1a; 其实&#xff0c;让线程按顺序执行&#xff0c;其实就是…

mongodb数据库,批量插入性能测试记录

spring boot 框架下&#xff0c;操作mongodb数据库 maven&#xff1a;spring-data-mongodb:2.1.3.RELEASE mongo数据库用的是本地的mongo&#xff0c;所以环境不一样&#xff0c;可能结果不一样。但趋势应该是一样的。 测试保证每次批量插入时&#xff0c;库里的数据量都是一…

[转载] --- 数据库基本知识

里面的很多点&#xff0c;我之前都总结过&#xff0c;但是感觉这篇把这些都连起来了&#xff0c;总结的挺好&#xff0c;转载保存一下 【从入门到入土】令人脱发的数据库底层设计前言 说到数据库这个词&#xff0c;我只能用爱恨交加这个词来形容它。两年前在自己还单纯懵懂的时…

java中使用lua脚本

第一步&#xff1a; windows下&#xff0c;先下载安装lua&#xff08;其他操作系统自行百度&#xff0c;我只说主要基本的流程&#xff09; 下载地址 我选了lua-5.3.4_Win64_bin.zip为例 第二步&#xff1a; 解压到D盘根路径的lua文件夹中 配置环境变量&#xff0c;增加D:\l…

java中使用lua操作redis

java中使用lua脚本参见我的上一篇文章 lua基础 本篇简单说下java中使用lua操作redis的示例&#xff0c;如下&#xff1a; 先引入jedis <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</…

spring-boot发送邮件失败 AuthenticationFailedException: 535 Authentication Failed

发送邮件失败&#xff0c;平时一直是好的&#xff0c;突然有天开始失败了&#xff0c;最后是发现邮箱密码失效了。。。 有的邮箱&#xff0c;需要定期更改密码。

windows本地项目开机自启动设置

记录下&#xff0c;本地项目开机自启动 vue项目 新建vue.bat文件 echo off :: nodejs安装目录下的nodevars.bat set nodevars "D:\Program Files\nodejs\nodevars.bat" :: 切换到D盘 d: :: 移动到需要启动的目录 cd D:\Users\curry.zhang\IdeaProjects\data-chec…

互联网广告行业(01)------ 初识了解DSP、SSP、ADX

最近有幸接触到公司的一个实时竞价系统&#xff0c;也算是公司的核心系统之一了&#xff0c;增加了很多新的知识&#xff0c;可能有点乱&#xff0c;先总结一波&#xff1a; 广告行业&#xff0c;先介绍概念 广告主&#xff1a;需要打广告的站点&#xff0c;一般就是卖东西的…

互联网广告行业(02)------OpenRTB(实时竞价)规范解读

RTB&#xff1a;(Real Time Bidding实时竞价)&#xff0c;RTB是一种广告交易的方式 OpenRTB&#xff1a;简单理解就是一个行业规范&#xff0c;是一个为了促进RTB方式广告的标准&#xff0c;有对应的api文档&#xff0c;大家都按照这个规范去传参数&#xff0c;那么发送方和接收…

[go]---从java到go(01)---基础与入门上手

为什么用go&#xff0c;就是为了快速响应并且高并发。 一样的逻辑&#xff0c;用java也能实现&#xff0c;但用go可能就比java快点。 如果你很熟练java了&#xff0c;那么学习go就会很快。 go的社区环境相比java没那么大&#xff0c;但一般问题都足够了。 go是谷歌出品&#xf…

[go]---从java到go(02)---一个简单的handler模式的实现

类似于责任链模式吧&#xff0c;不同类实现相同的入参&#xff0c;执行不同的操作&#xff0c;一个执行完再确定要不要执行下一个。 用go实现&#xff1a; 1.定义一个接口 后面所有的handler都要实现这个接口的handler方法 type IHandler interface {/**true 表示通过 false…

[数据库] --- clickhouse

clickhouse是一个列式数据库&#xff08;系统&#xff09;。 官方文档 官网比较全&#xff0c;但也可以说比较杂&#xff0c;下面就是我个人的一些总结&#xff0c;以及在实际工作中的应用场景。 1.clickhouse适用场景 clickhouse主要适合那种大量数据做分析的场景。 一般数据…

错误记录:expected single matching bean but found 2

springboot项目&#xff0c;之前有mysql数据源&#xff0c;现在又新增了clickhouse数据源&#xff0c;于是 新增了一个clickhouseDatasource的配置bean&#xff0c;如下&#xff1a; Beanpublic DataSource dataSource() throws PropertyVetoException {HikariConfig config …

消息队列(5):RocketMQ

介绍 RocketMQ是一款成熟的分布式消息中间件。 由阿里2012年开源&#xff0c;2017年成为Apache顶级项目。 源码是java写的。 高性能&#xff0c;低延迟&#xff0c;高可靠。历经多次双十一大促&#xff0c;整体很稳定。 RocketMQ对比其他mq的优势 对比kafka和Rabbitmq&#…