前言
在软件系统开发中,有时需要将系统元数据放在数据库中,使用时再从数据库中查询。为避免频繁访问数据库,提升系统性能,需要将更新不频繁的数据放到本地缓存中。在元数据变动的时候再更新本地缓存。如果单节点时不存在问题,但如果是在集群环境下,就需要同步更新中所有节点的本地缓存。如何做到多节点缓存同步呢,可使用redis消息队列广播功能,使用Redis订阅一个主题,注册监听,当有数据变更的时候往这个主题发布一个消息,集群中的各个节点都会收到这个消息执行本地缓存的更新操作。
一、订阅主题,注册监听
示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, UpdateMetaListener listener,HotDataConsumer consumer,HotDataDisabledConsumer disabledConsumer) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);if(cluster){//集群部署元数据缓存同步队列container.addMessageListener((message, pattern) ->listener.processMsg(new String(message.getBody())), new PatternTopic(DsConstant.QUEUE_BROADCAST_META));}log.info("RedisMessageListenerContainer create");return container;}
@Component
@Slf4j
public class UpdateMetaListener {@Autowiredprivate UpdateMetaCacheService metaCacheService;/*** 消费broadcastMeta 队列数据* @param msg*/public void processMsg(String msg){JSONObject json=JSONObject.parseObject(msg);String operator=json.getString("operator");String serviceId=json.getString("id");if("removeServiceCache".equals(operator)){removeServiceCache(serviceId);}if("removeDatasourceCache".equals(operator)){removeDatasourceCache(serviceId);}if("addDatasourceCache".equals(operator)){addDatasourceCache(serviceId);}}public void removeServiceCache(String serviceId) {ServiceMetaVo serviceMetaVo=metaCacheService.removeServiceMetaVo(serviceId);}public void removeDatasourceCache(String dsId) {metaCacheService.removeDataSource(dsId);}public void addDatasourceCache(String dsId) {metaCacheService.addDataSource(dsId);}
}
二、发布消息
该处使用的url网络请求的数据。
public void sendMessage(@PathVariable("serviceId") String serviceId) {JSONObject jsonObject=new JSONObject();jsonObject.put("operator","removeServiceCache");jsonObject.put("id",serviceId);redisTemplate.convertAndSend(DsConstant.QUEUE_BROADCAST_META,jsonObject.toJSONString());}
三、更新缓存
if("removeServiceCache".equals(operator)){removeServiceCache(serviceId);}if("removeDatasourceCache".equals(operator)){removeDatasourceCache(serviceId);}if("addDatasourceCache".equals(operator)){addDatasourceCache(serviceId);}
总结
也可以使用其它消息队列来实现,具体代码请访问
具体代码下载地址