Nacos源码—5.Nacos配置中心实现分析二

大纲

1.关于Nacos配置中心的几个问题

2.Nacos如何整合SpringBoot读取远程配置

3.Nacos加载读取远程配置数据的源码分析

4.客户端如何感知远程配置数据的变更

5.集群架构下节点间如何同步配置数据

4.客户端如何感知远程配置数据的变更

(1)ConfigService对象使用介绍

(2)客户端注册监听器的源码

(3)回调监听器的方法的源码

(1)ConfigService对象使用介绍

ConfigService是一个接口,定义了获取配置、发布配置、移除配置等方法。ConfigService只有一个实现类NacosConfigService,Nacos配置中心源码的核心其实就是这个NacosConfigService对象。

步骤一:手动创建ConfigService对象

首先定义好基本的Nacos信息,然后利用NacosFactory工厂类来创建ConfigService对象。

public class Demo {public static void main(String[] args) throws Exception {//步骤一:配置信息String serverAddr = "124.223.102.236:8848";String dataId = "stock-service-test.yaml";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//步骤一:获取配置中心服务ConfigService configService = NacosFactory.createConfigService(properties);}
}

步骤二:获取配置、发布配置

创建好ConfigService对象后,就可以使用ConfigService对象的getConfig()方法来获取配置信息,还可以使用ConfigService对象的publishConfig()方法来发布配置信息。

如下Demo先获取一次配置数据,然后发布新配置,紧接着重新获取数据。发现第二次获取的配置数据已发生变化,从而也说明发布配置成功了。

public class Demo {public static void main(String[] args) throws Exception {//步骤一:配置信息String serverAddr = "124.223.102.236:8848";String dataId = "stock-service-test.yaml";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//步骤一:获取配置中心服务ConfigService configService = NacosFactory.createConfigService(properties);//步骤二:从配置中心获取配置String content = configService.getConfig(dataId, group, 5000);System.out.println("发布配置前" + content);//步骤二:发布配置configService.publishConfig(dataId, group, "userName: userName被修改了", ConfigType.PROPERTIES.getType());Thread.sleep(300L);//步骤二:从配置中心获取配置content = configService.getConfig(dataId, group, 5000);System.out.println("发布配置后" + content);}
}

步骤三:添加监听器

可以使用ConfigService对象的addListener()方法来添加监听器。通过dataId + group这两个参数,就可以注册一个监听器。当dataId + group对应的配置在服务端发生改变时,客户端的监听器就可以马上感知并对配置数据进行刷新。

public class Demo {public static void main(String[] args) throws Exception {//步骤一:配置信息String serverAddr = "124.223.102.236:8848";String dataId = "stock-service-test.yaml";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//步骤一:获取配置中心服务ConfigService configService = NacosFactory.createConfigService(properties);//步骤二:从配置中心获取配置String content = configService.getConfig(dataId, group, 5000);System.out.println("发布配置前" + content);//步骤二:发布配置configService.publishConfig(dataId, group, "userName: userName被修改了", ConfigType.PROPERTIES.getType());Thread.sleep(300L);//步骤二:从配置中心获取配置content = configService.getConfig(dataId, group, 5000);System.out.println("发布配置后" + content);//步骤三:注册监听器configService.addListener(dataId, group, new Listener() {@Overridepublic void receiveConfigInfo(String configInfo) {System.out.println("感知配置变化:" + configInfo);}@Overridepublic Executor getExecutor() {return null;}});//阻断进程关闭Thread.sleep(Integer.MAX_VALUE);}
}

(2)客户端注册监听器的源码

Nacos客户端是什么时候为dataId + group注册监听器的?

在nacos-config下的spring.factories文件中,有一个自动装配的配置类NacosConfigAutoConfiguration,在该配置类中定义了一个NacosContextRefresher对象,而NacosContextRefresher对象会监听ApplicationReadyEvent事件。

在NacosContextRefresher的onApplicationEvent()方法中,会执行registerNacosListenersForApplications()方法,这个方法中会遍历每一个dataId + group注册Nacos监听器。

对于每一个dataId + group,则通过调用registerNacosListener()方法来进行Nacos监听器的注册,也就是最终调用ConfigService对象的addListener()方法来注册监听器。

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigAutoConfiguration {...@Beanpublic NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);}...
}public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {private final ConfigService configService;...@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {//many Spring contextif (this.ready.compareAndSet(false, true)) {this.registerNacosListenersForApplications();}}//register Nacos Listeners.private void registerNacosListenersForApplications() {if (isRefreshEnabled()) {//获取全部的配置for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {//判断当前配置是否需要刷新if (!propertySource.isRefreshable()) {continue;}String dataId = propertySource.getDataId();//注册监听器registerNacosListener(propertySource.getGroup(), dataId);}}}private void registerNacosListener(final String groupKey, final String dataKey) {String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() {@Overridepublic void innerReceive(String dataId, String group, String configInfo) {//监听器的回调方法处理逻辑refreshCountIncrement();//记录刷新历史nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);//发布RefreshEvent刷新事件applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));if (log.isDebugEnabled()) {log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));}}});try {//注册监听器configService.addListener(dataKey, groupKey, listener);} catch (NacosException e) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), e);}}...
}

(3)回调监听器的方法的源码

给每一个dataId + group注册Nacos监听器后,当Nacos服务端的配置文件发生变更时,就会回调监听器的方法,也就是会触发调用AbstractSharedListener的innerReceive()方法。然后调用applicationContext.publishEvent()发布RefreshEvent刷新事件,而发布的RefreshEvent刷新事件会被RefreshEventListener类来处理。

RefreshEventListener类不是Nacos中的类了,而是SpringCloud的类。它在处理刷新事件时,会销毁被@RefreshScope注解修饰的类的Bean,也就是会调用添加了@RefreshScope注解的类的destroy()方法。把Bean实例销毁后,后面需要用到这个Bean时才重新进行创建。重新进行创建的时候,就会获取最新的配置文件,从而完成刷新效果。

(4)总结

客户端注册Nacos监听器,服务端修改配置后,客户端刷新配置的流程:

5.集群架构下节点间如何同步配置数据

(1)Nacos控制台的配置管理模块

(2)变更配置数据时的源码

(3)集群节点间的配置数据变更同步

(4)服务端通知客户端配置数据已变更

(5)总结

(1)Nacos控制台的配置管理模块

在这个模块中,可以通过配置列表维护我们的配置文件,可以通过历史版本找到配置的发布记录,并且支持回滚操作。当编辑配置文件时,客户端可以及时感知变化并刷新其配置文件。当服务端通知客户端配置变更时,也会通知集群节点进行数据同步。

当用户在Nacos控制台点击确认发布按钮时,Nacos会大概进行如下处理:

一.修改配置文件数据

二.保存配置发布历史

三.通知并触发客户端监听事件进行配置文件变更

四.通知集群对配置文件进行变更

点击确认发布按钮时,会发起HTTP请求,地址为"/nacos/v1/cs/configs"。通过请求地址可知处理入口是ConfigController的publishConfig()方法。

(2)变更配置数据时的源码

ConfigController的publishConfig()方法中的两行核心代码是:一.新增或修改配置数据的PersistService的insertOrUpdate()方法,二.发布配置变更事件的ConfigChangePublisher的notifyConfigChange()方法。

一.新增或者修改配置数据

其中PersistService有两个实现类:一是EmbeddedStoragePersistServiceImpl,它是Nacos内置的Derby数据库。二是ExternalStoragePersistServiceImpl,它是Nacos外置数据库如MySQL。

在ExternalStoragePersistServiceImpl的insertOrUpdate()方法中,如果执行ExternalStoragePersistServiceImpl的updateConfigInfo()方法,那么会先查询对应的配置,然后更新配置,最后保存配置历史。

@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {private final PersistService persistService;...@PostMapping@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,@RequestParam(value = "appName", required = false) String appName,@RequestParam(value = "src_user", required = false) String srcUser,@RequestParam(value = "config_tags", required = false) String configTags,@RequestParam(value = "desc", required = false) String desc,@RequestParam(value = "use", required = false) String use,@RequestParam(value = "effect", required = false) String effect,@RequestParam(value = "type", required = false) String type,@RequestParam(value = "schema", required = false) String schema) throws NacosException {final String srcIp = RequestUtil.getRemoteIp(request);final String requestIpApp = RequestUtil.getAppName(request);srcUser = RequestUtil.getSrcUserName(request);//check typeif (!ConfigType.isValidType(type)) {type = ConfigType.getDefaultType().getType();}//check tenantParamUtils.checkTenant(tenant);ParamUtils.checkParam(dataId, group, "datumId", content);ParamUtils.checkParam(tag);Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);ParamUtils.checkParam(configAdvanceInfo);if (AggrWhitelist.isAggrDataId(dataId)) {LOGGER.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request), dataId, group);throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");}final Timestamp time = TimeUtils.getCurrentTime();String betaIps = request.getHeader("betaIps");ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);configInfo.setType(type);if (StringUtils.isBlank(betaIps)) {if (StringUtils.isBlank(tag)) {//新增配置或者修改配置persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);//发布配置改变事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));} else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);//发布配置改变事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));}} else {//beta publishpersistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);//发布配置改变事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));}ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);return true;}...
}//External Storage Persist Service.
@SuppressWarnings(value = {"PMD.MethodReturnWrapperTypeRule", "checkstyle:linelength"})
@Conditional(value = ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {private DataSourceService dataSourceService;...@Overridepublic void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time, Map<String, Object> configAdvanceInfo, boolean notify) {try {addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);} catch (DataIntegrityViolationException ive) { // Unique constraint conflictupdateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);}}@Overridepublic void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser, final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {boolean result = tjt.execute(status -> {try {//查询已存在的配置数据ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());String appNameTmp = oldConfigInfo.getAppName();if (configInfo.getAppName() == null) {configInfo.setAppName(appNameTmp);}//更新配置数据updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");if (configTags != null) {// delete all tags and then recreateremoveTagByIdAtomic(oldConfigInfo.getId());addConfigTagsRelation(oldConfigInfo.getId(), configTags, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());}//保存到发布配置历史表insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);throw e;}return Boolean.TRUE;});}@Overridepublic ConfigInfo findConfigInfo(final String dataId, final String group, final String tenant) {final String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;try {return this.jt.queryForObject("SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?", new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_ROW_MAPPER);} catch (EmptyResultDataAccessException e) { // Indicates that the data does not exist, returns null.return null;} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);throw e;}}...
}

二.发布配置变更事件

执行ConfigChangePublisher的notifyConfigChange()方法发布配置变更事件时,最终会把事件添加到DefaultPublisher.queue阻塞队列中,完成事件发布。

NotifyCenter在其静态方法中,会创建DefaultPublisher并进行初始化。在执行DefaultPublisher的init()方法时,就会开启一个异步任务。该异步任务便会不断从阻塞队列DefaultPublisher.queue中获取事件,然后调用DefaultPublisher的receiveEvent()方法处理配置变更事件。

在DefaultPublisher的receiveEvent()方法中,会循环遍历事件订阅者。其中就会包括来自客户端,以及来自集群节点的两个订阅者。前者会通知客户端发生了配置变更事件,后者会通知各集群节点发生了配置变更事件。而且进行事件通知时,都会调用DefaultPublisher的notifySubscriber()方法。该方法会异步执行订阅者的监听逻辑,也就是subscriber.onEvent()方法。

具体的subscriber订阅者有:用来通知集群节点进行数据同步的订阅者AsyncNotifyService,用来通知客户端处理配置文件变更的订阅者LongPollingService。

事件发布机制的实现简单总结:发布者需要一个Set存放注册的订阅者,发布者发布事件时,需要遍历调用订阅者处理事件的方法。

public class ConfigChangePublisher {//Notify ConfigChange.public static void notifyConfigChange(ConfigDataChangeEvent event) {if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {return;}NotifyCenter.publishEvent(event);}
}//Unified Event Notify Center.
public class NotifyCenter {static {...try {// Create and init DefaultSharePublisher instance.INSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);        } catch (Throwable ex) {LOGGER.error("Service class newInstance has error : {}", ex);}ThreadUtils.addShutdownHook(new Runnable() {@Overridepublic void run() {shutdown();}});}//注册订阅者public static <T> void registerSubscriber(final Subscriber consumer) {...addSubscriber(consumer, subscribeType);}private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);//执行DefaultPublisher.addSubscriber()方法publisher.addSubscriber(consumer);}...//Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is actually published.public static boolean publishEvent(final Event event) {try {return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : {}", ex);return false;}}//Request publisher publish event Publishers load lazily, calling publisher.private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {//执行DefaultPublisher.publish()方法return publisher.publish(event);}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}...
}//The default event publisher implementation.
public class DefaultPublisher extends Thread implements EventPublisher {protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();private BlockingQueue<Event> queue;...@Overridepublic void addSubscriber(Subscriber subscriber) {//注册事件订阅者subscribers.add(subscriber);}@Overridepublic boolean publish(Event event) {checkIsStart();//将事件添加到阻塞队列,则表示已完成事件发布boolean success = this.queue.offer(event);if (!success) {LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);receiveEvent(event);return true;}return true;}@Overridepublic void init(Class<? extends Event> type, int bufferSize) {setDaemon(true);setName("nacos.publisher-" + type.getName());this.eventType = type;this.queueMaxSize = bufferSize;this.queue = new ArrayBlockingQueue<Event>(bufferSize);start();}@Overridepublic synchronized void start() {if (!initialized) {//执行线程的run()方法,start just called oncesuper.start();if (queueMaxSize == -1) {queueMaxSize = ringBufferSize;}initialized = true;}}@Overridepublic void run() {openEventHandler();}void openEventHandler() {try {//This variable is defined to resolve the problem which message overstock in the queue.int waitTimes = 60;//To ensure that messages are not lost, enable EventHandler when waiting for the first Subscriber to registerfor (; ;) {if (shutdown || hasSubscriber() || waitTimes <= 0) {break;}ThreadUtils.sleep(1000L);waitTimes--;}for (; ;) {if (shutdown) {break;}final Event event = queue.take();receiveEvent(event);UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error("Event listener exception : {}", ex);}}//Receive and notifySubscriber to process the event.void receiveEvent(Event event) {final long currentEventSequence = event.sequence();//循环遍历事件的订阅者for (Subscriber subscriber : subscribers) {// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());continue;}//通知事件订阅者notifySubscriber(subscriber, event);}}@Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);final Runnable job = new Runnable() {@Overridepublic void run() {//异步执行订阅者的监听逻辑subscriber.onEvent(event);}};final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception : {}", e);}}}...
}

(3)集群节点间的配置数据变更同步

核心处理方法便是AsyncNotifyService的onEvent()方法。该方法首先会获取集群节点列表,然后遍历集群列表构造通知任务NotifySingleTask,接着把通知任务NotifySingleTask添加到队列queue当中,最后根据通知任务队列queue封装一个异步任务提交到线程池去处理,也就是异步任务AsyncTask的run()方法会处理通知任务NotifySingleTask。

在异步任务AsyncTask的run()方法中,会一直从queue中获取通知任务,以便将配置数据同步到对应的集群节点。具体就是在while循环中,首先获得通知任务中对应的集群节点的IP地址。然后判断该集群节点的IP是否在当前节点的配置中,并且是否是健康状态。如果该集群节点不健康,则放入队列并将队列提交给异步任务来延迟处理。如果该集群节点是健康状态,则通过HTTP方式发起配置数据的同步,地址是"/v1/cs/communication/dataChange"。

@Service
public class AsyncNotifyService {...@Autowiredpublic AsyncNotifyService(ServerMemberManager memberManager) {this.memberManager = memberManager;//Register ConfigDataChangeEvent to NotifyCenter.NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);//Register A Subscriber to subscribe ConfigDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {//配置中心数据变更,同步其他集群节点数据if (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;//获取集群节点列表Collection<Member> ipList = memberManager.allMembers();Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();//遍历集群列表构造通知任务NotifySingleTask去同步数据for (Member member : ipList) {//把通知任务NotifySingleTask添加到队列queue当中queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta));}//根据通知任务队列Queue<NotifySingleTask>,封装一个异步任务AsyncTask,提交到线程池执行ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));}}@Overridepublic Class<? extends Event> subscribeType() {return ConfigDataChangeEvent.class;}});}...class AsyncTask implements Runnable {private Queue<NotifySingleTask> queue;private NacosAsyncRestTemplate restTemplate;public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {this.restTemplate = restTemplate;this.queue = queue;}@Overridepublic void run() {executeAsyncInvoke();}private void executeAsyncInvoke() {while (!queue.isEmpty()) {//一直从queue队列中获取通知任务,以便将配置数据同步到对应的集群节点NotifySingleTask task = queue.poll();//获取通知任务中对应的集群节点的IP地址String targetIp = task.getTargetIP();if (memberManager.hasMember(targetIp)) {//start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify//判断该集群节点的ip是否在当前节点的配置中,并且是否是健康状态boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);if (unHealthNeedDelay) {//target ip is unhealthy, then put it in the notification list//如果该集群节点不健康,则放入另外一个队列,同样会将队列提交给异步任务,然后延迟处理ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, task.target);//get delay time and set fail count to the taskasyncTaskExecute(task);} else {//如果该集群节点是健康状态,则通过HTTP方式发起配置数据的同步Header header = Header.newInstance();header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());if (task.isBeta) {header.addParam("isBeta", "true");}AuthHeaderUtil.addIdentityToHeader(header);//通过HTTP方式发起配置数据的同步,请求的HTTP地址:/v1/cs/communication/dataChangerestTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));}}}}}private void asyncTaskExecute(NotifySingleTask task) {int delay = getDelayTime(task);Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();queue.add(task);AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);//提交异步任务给线程池延迟执行ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);}
}

当集群节点处理"/v1/cs/communication/dataChange"这个HTTP请求时,会调用CommunicationController的notifyConfigInfo()方法,接着调用DumpService的dump()方法将请求包装成DumpTask同步数据任务,然后调用TaskManager的addTask()方法将DumpTask同步数据任务放入map。

TaskManager的父类NacosDelayTaskExecuteEngine在初始化时,会开启一个异步任务执行ProcessRunnable的run()方法,也就是会不断从map中取出DumpTask同步数据任务,然后调用DumpProcessor的process()方法处理具体的配置数据同步逻辑。也就是查询数据库最新的配置,然后持久化配置数据到磁盘上,从而完成集群之间配置数据的同步。

@RestController
@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)
public class CommunicationController {private final DumpService dumpService;...@GetMapping("/dataChange")public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,@RequestParam("group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "tag", required = false) String tag) {dataId = dataId.trim();group = group.trim();String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);String isBetaStr = request.getHeader("isBeta");if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);} else {dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);}return true;}...
}public abstract class DumpService {private TaskManager dumpTaskMgr;public DumpService(PersistService persistService, ServerMemberManager memberManager) {...this.processor = new DumpProcessor(this);this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");this.dumpTaskMgr.setDefaultTaskProcessor(processor);...}...public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {String groupKey = GroupKey2.getKey(dataId, group, tenant);dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));}...
}public final class TaskManager extends NacosDelayTaskExecuteEngine implements TaskManagerMBean {...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {super.addTask(key, newTask);MetricsMonitor.getDumpTaskMonitor().set(tasks.size());}...
}public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//开启延时任务processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到ConcurrentHashMap中tasks.put(key, newTask);} finally {lock.unlock();}}...private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}@Overridepublic Collection<Object> getAllTaskKeys() {Collection<Object> keys = new HashSet<Object>();lock.lock();try {keys.addAll(tasks.keySet());} finally {lock.unlock();}return keys;}protected void processTasks() {//获取tasks中所有的任务,然后进行遍历Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通过任务key,获取具体的任务,并且从任务池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//DumpService在初始化时会设置TaskManager的默认processor是DumpProcessor//根据taskKey获取NacosTaskProcessor延迟任务处理器:DumpProcessorNacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {//ReAdd task if process failed//调用DumpProcessor.process()方法if (!processor.process(task)) {//如果失败了,会重试添加task回tasks这个map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error : " + e.toString(), e);retryFailedTask(taskKey, task);}}}
}

(4)服务端通知客户端配置数据已变更

服务端通知客户端配置文件变更的方法是LongPollingService.onEvent()。

由前面客户端如何感知远程配置数据的变更可知,Nacos客户端启动时:会调用ConfigService的addListener()方法为每个dataId + group添加一个监听器。而NacosConfigService初始化时会创建ClientWorker对象,此时会开启多个长连接任务即执行LongPollingRunnable的run()方法。

执行LongPollingRunnable的run()方法时,会触发执行ClientWorker的checkUpdateDataIds()方法,该方法最后会调用服务端的"/v1/cs/configs/listener"接口,将当前客户端添加到LongPollingService的allSubs属性中。

这样当以后dataId + group的配置发生变更时,服务端会触发执行LongPollingService的onEvent()方法,然后遍历LongPollingService.allSubs属性通知客户端配置已变更。

客户端收到变更事件通知后,会将最新的配置刷新到容器中,同时将@RefreshScope注解修饰的Bean从缓存中删除。这样再次访问这些Bean,就会重新创建Bean,从而读取到最新的配置。

public class NacosConfigService implements ConfigService {//long polling.private final ClientWorker worker;...public NacosConfigService(Properties properties) throws NacosException {...this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);}...
}//Long polling.
public class ClientWorker implements Closeable {public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {...this.executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);}//Check config info.public void checkConfigInfo() {//Dispatch taskes.int listenerSize = cacheMap.size();//Round up the longingTaskCount.int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {//The task list is no order.So it maybe has issues when changing.//执行长连接任务:LongPollingRunnable.run()executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}...class LongPollingRunnable implements Runnable {...@Overridepublic void run() {...//check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);...}}//Fetch the dataId list from server.List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {...return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}//Fetch the updated dataId list from server.List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {...try {//In order to prevent the server from handling the delay of the client's long task, increase the client's read timeout to avoid this problem.long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);//发起HTTP请求:/v1/cs/configs/listener,将客户端添加到LongPollingService.allSubs属性中HttpRestResult<String> result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs);...} catch (Exception e) {...}return Collections.emptyList();}
}@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {private final ConfigServletInner inner;...@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {...//do long-pollinginner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}...
}@Service
public class ConfigServletInner {...//轮询接口.public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {//Long polling.if (LongPollingService.isSupportLongPolling(request)) {longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);return HttpServletResponse.SC_OK + "";}...}...
}@Service
public class LongPollingService {//客户端长轮询订阅者final Queue<ClientLongPolling> allSubs;...public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) {...//添加订阅者ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}class ClientLongPolling implements Runnable {@Overridepublic void run() {...allSubs.add(this);}...}...public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();...NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// Ignore.} else {if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent) event;//触发执行DataChangeTask.run()方法ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}...});}class DataChangeTask implements Runnable {@Overridepublic void run() {try {ConfigCacheService.getContentBetaMd5(groupKey);//遍历订阅了配置变更事件的客户端for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {ClientLongPolling clientSub = iter.next();if (clientSub.clientMd5Map.containsKey(groupKey)) {...getRetainIps().put(clientSub.ip, System.currentTimeMillis());iter.remove(); // Delete subscribers' relationships.//发送服务端数据变更的响应给客户端clientSub.sendResponse(Arrays.asList(groupKey));}}} catch (Throwable t) {LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));}}}...
}

(5)总结

一.配置中心数据变更同步集群节点的整体逻辑

当在Nacos后台变更配置数据后:首先自身节点会把最新的配置数据更新到数据库中,并且添加变更历史。然后利用事件发布订阅机制来通知订阅者,其中订阅者AsyncNotifyService会通过HTTP方式来通知其他集群节点。当其他集群节点收到通知后,会重新查询数据库最新的配置数据。然后持久化到磁盘上,因为获取配置数据的接口是直接读磁盘文件的。集群节点的配置数据同步完成后,还要通知客户端配置数据已变更。

二.服务端通知客户端配置数据已变更

在客户端给dataId + group添加监听器后,会和服务端建立一个长轮询,所以另外一个订阅者LongPollingService会通过长轮询通知客户端。也就是会遍历每一个客户端,通过长轮询向客户端进行响应。最终会调用到客户端监听器的回调方法,从而去刷新客户端的配置Bean。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/904491.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电力MOSFET的专用集成驱动电路IR2233

IR2233是IR2133/IR2233/IR2235 系列驱动芯片中的一种,是专为高电压、高速度的电力MOSFET和IGBT驱动而设计的。该系列驱动芯片内部集成了互相独立的三组板桥驱动电路,可对上下桥臂提供死区时间,特别适合于三相电源变换等方面的应用。其内部集成了独立的运算放大器可通过外部桥…

六级阅读———2024.12卷一 仔细阅读2

文章 An awakening has been taking place in the physical world against the beauty model that has been dictated to us for years.But in the digital arena,social media determines what is considered beautiful.(51) The two opposing struggles are taking place i…

【C/C++】errno/strerror 和 GetLastError()/FormatMessage 的区别

strerror 和 errno 详解 printf("Error: %s\n", strerror(errno));这行代码用于在 C 语言中输出系统错误信息&#xff0c;但它与 Windows 的 GetLastError() 有重要区别。下面我将详细解释每个部分及其工作原理。 1. 组件解析 errno 定义&#xff1a;errno 是一个…

Unicode和UTF - 8主要有以下区别

Unicode和UTF - 8主要有以下区别 概念范畴 Unicode:是字符集 。它为世界上几乎所有的字符(包括各国文字、标点符号、特殊符号等)分配了唯一的编号,这个编号也叫码位、码点,比如“中”字的Unicode码点是U+4E2D 。它规定了字符的抽象表示,只关注字符与数字编号的对应关系,…

企业数字化转型第二课:接受不完美(1/2)

一.引言 先看一组中国企业数字化转型相关的数据&#xff1a; 战略认知层面&#xff1a;92%中国企业将数字化纳入战略核心&#xff08;麦肯锡2023&#xff09;执行困境层面&#xff1a;63%企业转型首年遭遇重大挫折&#xff08;BCG 2024追踪&#xff09;价值释放周期&#xff1…

OSCP - Proving Grounds - Sumo

主要知识点 ShellShock漏洞dirtycow提权 具体步骤 执行nmap扫描,比较直观&#xff0c;22和80端口开放&#xff0c;但是80端口没有什么内容 Nmap scan report for 192.168.210.87 Host is up (0.44s latency). Not shown: 65533 closed tcp ports (reset) PORT STATE SERV…

pyqt写一个TCP(UDP)检测工具

先用电脑连接到目标WIFI&#xff0c;再运行以下代码。 import sys from PyQt5.QtWidgets import * from PyQt5.QtCore import * from PyQt5.QtNetwork import *class NetTestTool(QWidget):def __init__(self):super().__init__()self.init_ui()self.tcp_socket QTcpSocket()…

趣味编程:梦幻万花筒

目录 1.效果展示 2.源码展示 3.代码逻辑详解 3.1 头文件与宏定义 3.2 HSV函数转RGB颜色函数 3.3 主函数 初始化部分 循环部分 线条绘制部分 刷新和延时部分 结束部分 4.小结 本篇博客主要介绍趣味编程用C语言实现万花筒小程序。 1.效果展示 2.源码展示 #define…

软件开发各阶段的自动化测试技术详解

引言 在当今快速迭代的软件开发环境中&#xff0c;自动化测试已成为保证软件质量、提高测试效率的重要手段。本文将深入探讨软件开发生命周期各个阶段的自动化测试技术&#xff0c;包括单元测试、代码级集成测试、Web Service测试和GUI测试的自动化实现方法。 单元测试的自动…

Elasticsearch:我们如何在全球范围内实现支付基础设施的现代化?

作者&#xff1a;来自 Elastic Kelly Manrique SWIFT 和 Elastic 如何应对基础设施复杂性、误报问题以及日益增长的合规要求。 金融服务公司在全球范围内管理实时支付方面面临前所未有的挑战。SWIFT&#xff08;Society for Worldwide Interbank Financial Telecommunication -…

day009-用户管理专题

文章目录 1. 创建包含时间的文件2. 与用户相关的文件3. 用户分类4. 与用户相关的命令4.1 添加用户4.2 删除用户4.3 查看用户4.4 修改用户密码 5. sudo6. 思维导图7. 老男孩思想-学习方法 1. 创建包含时间的文件 或$()是替换符号&#xff0c;可以将命令的结果作为字符串或变量的…

shell脚本实现远程重启多个服务器

直接deepseek帮写脚本 remoteReboot.sh #!/bin/bash # 配置文件路径&#xff08;格式&#xff1a;每行一个服务器地址&#xff09; SERVER_FILE"servers.list" # 读取服务器列表 mapfile -t SERVERS < "$SERVER_FILE" for server in "${SERVER…

如何利用 QuickAPI 生成 PostgreSQL 样本测试数据:全面解析与实用指南

目录 一、什么是 QuickAPI&#xff1f; 二、为什么需要生成样本测试数据&#xff1f; 三、如何在 QuickAPI 中生成 PostgreSQL 样本测试数据&#xff1f; 1. 登录 QuickAPI 平台 2. 选择 PostgreSQL 数据库和目标表 3. 配置样本数据生成规则 4. 导出或直接插入数据 四、…

黑马点评day04(分布式锁-setnx)

4、分布式锁 4.1 、基本原理和实现方式对比 分布式锁&#xff1a;满足分布式系统或集群模式下多进程可见并且互斥的锁。 分布式锁的核心思想就是让大家都使用同一把锁&#xff0c;只要大家使用的是同一把锁&#xff0c;那么我们就能锁住线程&#xff0c;不让线程并行&#x…

‌人工智能在农作物病虫害识别中的应用前景分析

近年来&#xff0c;全球气候变化加剧、农业种植规模化发展&#xff0c;农作物病虫害对粮食安全的威胁日益凸显。据统计&#xff0c;全球每年因病虫害造成的农作物损失约占总产量的20%-40%&#xff0c;而传统依赖人工经验的防治方式效率低、成本高&#xff0c;难以满足现代农业需…

C++ 完美转发

C 完美转发逐步详解 1. 问题背景与核心目标 在 C 模板编程中&#xff0c;若直接将参数传递给其他函数&#xff0c;参数的 值类别&#xff08;左值/右值&#xff09;和 类型信息&#xff08;如 const&#xff09;可能会丢失。例如&#xff1a; template<typename T> voi…

Midjourney 绘画 + AI 配音:组合玩法打造爆款短视频!

一、引言:AI 重构短视频创作范式 在某短视频工作室的深夜剪辑室里,资深编导正在为一条古风剧情视频发愁:预算有限无法实拍敦煌场景,人工绘制分镜耗时 3 天,配音演员档期排到一周后。而使用 Midjourney 生成敦煌壁画风格的场景图仅需 15 分钟,AI 配音工具实时生成多角色台…

AI基础知识(02):机器学习的任务类型、学习方式、工作流程

03 机器学习(Machine Learning)的任务类型与学习方式 广义的机器学习主要是一个研究如何让计算机通过数据学习规律,并利用这些规律进行预测和决策的过程。这里的Machine并非物理意义上的机器,可以理解为计算机软硬件组织;Learning可以理解为一个系统或平台经历了某些过程…

数据结构、刷leetcode返航版--二分5/7

1.排序 快排&#xff1a; 第一章 基础算法&#xff08;一&#xff09; - AcWing 如何调整范围 经典二分 递归结束条件&#xff1b;条件满足时&#xff0c;进行处理&#xff1b;递归左边&#xff0c;递归右边 分界点划分可以是l,r,(lr)/2,但是如果是选l&#xff0c;比如是1…

LeetCode 267:回文排列 II —— Swift 解法全解析

文章目录 摘要描述题解答案题解代码分析统计字符频率判断是否可能构成回文构建半边字符数组回溯生成半边排列 示例测试及结果时间复杂度空间复杂度实际使用场景&#xff1a;回文排列在真实项目里能干啥&#xff1f;文本处理、数据清洗类系统游戏开发&#xff1a;名字合法性验证…