spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅
大型分布式系统,直接用mq解耦,那么单机系统怎么办,可以考虑用EventBus
用EventBus的好处也是异步解耦,和mq的类似,可以勉强认为是单机版的mq
先解释,后附示例代码(jdk1.8)
EventBus 处理的事情类似观察者模式,基于事件驱动,观察者们监听自己感兴趣的特定事件,进行相应的处理。
流程是这样的:
1、先自定义一个注解@EventBusListener,所有异步实现的类都加这个注解,代表自己是一个监听器
2、再定义一个实现类,就用这个@EventBusListener注解,代表自己是监听器
类中方法用@Subscribe注解,方法入参类和后面写的发送异步或同步消息的类保持一致即可,这个类也叫消息体,是这个消息的注意内容,比如订单号id,消息体最好是自己定义的不同的类
3、核心的类是EventBusCenter,负责将带有@EventBusListener注解的bean注册成一个监听器,并提供发送异步或同步消息的方法入口
4、业务service中注入EventBusCenter,并调用发送异步或同步消息的方法,方法入参是消息体,不同的消息体对应不同的发布订阅,订阅的时候也是根据这个消息体来区分不同类型的消息的
简单的源码解读
核心类是:com.google.common.eventbus.SubscriberRegistry
里面有个静态变量:subscriberMethodsCache
类在初始化的时候,会给这个静态变量复制,拿到所有的@Subscribe注释的方法和对应类的映射关系
然后我们代码中会注册监听器,就把方法和监听器对应上了
至此,方法的入参类型和方法和类的对应关系就知道了,key就是方法参数类型,value就是一组对应的方法和类
用的时候,调用asyncEventBus.post(event);
入参event就是真正的消息体类型,然后会根据这个类型去上面找对应的方法和类,然后获取bean,调用
所以说,这个发布订阅,就是通过消息体类型去唯一识别的方法的。
对比mq的主题概念,这个消息体类型,就可以看成是主题的意思。
下面附示例代码
模拟OrderService中订单创建后,发送订单创建的异步事件,再发送订单修改的异步事件,再发送订单修改的同步事件
订阅端是OrderChangeListener和OrderChangeListener02两个订阅
OrderChangeListener订阅了订单创建和订单修改事件
OrderChangeListener02订阅了订单创建事件
执行流程是:
启动springboot,注册bean的时候遇到EventBusCenter,开始注册OrderChangeListener和OrderChangeListener02为监听器
启动springboot后立即执行FistRun类,里面直接调用订单创建方法,发布订单创建和修改的消息
OrderChangeListener和OrderChangeListener02消费消息,完
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>22.0</version>
</dependency>
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EventBusListener {
}
@Component
public class EventBusCenter {// 管理同步事件private EventBus syncEventBus = new EventBus();// 管理异步事件private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());public void postSync(Object event) {syncEventBus.post(event);}public void postAsync(Object event) {asyncEventBus.post(event);}@PostConstructpublic void init() {// 获取所有带有 @EventBusListener 的 bean,将他们注册为监听者List<Object> listeners = SpringContextUtils.getBeansWithAnnotation(EventBusListener.class);for (Object listener : listeners) {asyncEventBus.register(listener);syncEventBus.register(listener);}}
}
@Component
public class SpringContextUtils implements BeanFactoryPostProcessor {private static ConfigurableListableBeanFactory beanFactory;@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {SpringContextUtils.beanFactory = configurableListableBeanFactory;}public static <T> T getBean(String name) throws BeansException {return (T) beanFactory.getBean(name);}public static <T> T getBean(Class<T> clz) throws BeansException {T result = beanFactory.getBean(clz);return result;}public static <T> List<T> getBeansOfType(Class<T> type) {return beanFactory.getBeansOfType(type).entrySet().stream().map(entry -> entry.getValue()).collect(Collectors.toList());}// 上面的例子用到了这个public static List<Object> getBeansWithAnnotation(Class<? extends Annotation> annotationType) {Map<String, Object> beansWithAnnotation = beanFactory.getBeansWithAnnotation(annotationType);// java 8 的写法,将 map 的 value 收集起来到一个 list 中return beansWithAnnotation.entrySet().stream().map(entry -> entry.getValue()).collect(Collectors.toList());// java 7
// List<Object> result = new ArrayList<>();
// for (Map.Entry<String, Object> entry : beansWithAnnotation.entrySet()) {
// result.add(entry.getValue());
// }
// return result;}
}
@Data
public class OrderCreatedEvent {private long orderId;private long userId;public OrderCreatedEvent(long orderId, long userId) {this.setOrderId(orderId);this.setUserId(userId);}
}
@Data
public class OrderChangeEvent {private long orderId;private long userId;public OrderChangeEvent(long orderId, long userId) {this.setOrderId(orderId);this.setUserId(userId);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener {@Subscribepublic void created(OrderCreatedEvent event) throws InterruptedException {long orderId = event.getOrderId();Thread.sleep(300);log.info("订单创建监听,发送短信,orderId=" + orderId);}@Subscribepublic void change(OrderChangeEvent event) throws InterruptedException {long orderId = event.getOrderId();Thread.sleep(200);log.info("订单修改监听,物流变化,orderId=" + orderId);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener02 {@Subscribepublic void created(OrderCreatedEvent event) {long orderId = event.getOrderId();long userId = event.getUserId();// 订单创建成功后的各种操作,如发短信、发邮件等等。// 注意,事件可以被订阅多次,也就是说可以有很多方法监听 OrderCreatedEvent 事件,// 所以没必要在一个方法中处理发短信、发邮件、更新库存等log.info("订单创建监听02,修改库存,orderId=" + orderId);}}
@Service
@Slf4j
public class OrderService {@Autowiredprivate EventBusCenter eventBusCenter;public void createOrder() throws InterruptedException {// 创建订单// 发送异步事件eventBusCenter.postAsync(new OrderCreatedEvent(1L, 1L));System.out.println("发送异步事件,订单创建");eventBusCenter.postAsync(new OrderChangeEvent(1L, 1L));System.out.println("发送异步事件,订单修改");//发送同步事件Thread.sleep(500);try {System.out.println("发送同步事件,订单修改,开始");eventBusCenter.postSync(new OrderChangeEvent(1L, 1L));System.out.println("发送同步事件,订单修改,结束");} catch (Exception e) {log.error("发送同步事件,抓异常");}}
}
@Component
@Slf4j
@Order(1)
public class FistRun implements CommandLineRunner {@Autowiredprivate OrderService orderService;@Overridepublic void run(String... args) throws Exception {log.info("FistRun start===============");orderService.createOrder();}
}