Nacos源码—7.Nacos升级gRPC分析四

大纲

5.服务变动时如何通知订阅的客户端

6.微服务实例信息如何同步集群节点

6.微服务实例信息如何同步集群节点

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

(2)ClientChangedEvent事件的处理源码

(3)集群节点处理数据同步请求的源码

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

ClientChangedEvent事件的作用就是向集群节点同步服务实例数据的。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注册实例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注销实例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;//参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名//参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息//参数meta.getConnectionId():这个参数很关键,它是连接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}...
}@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {this.clientManager = clientManager;}@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {//从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));}//从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//将请求中的instance实例信息封装为InstancePublishInfo对象InstancePublishInfo instanceInfo = getPublishInfo(instance);//往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法client.addServiceInstance(singleton, instanceInfo);//设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间client.setLastUpdatedTime();//发布客户端注册服务实例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));//发布服务实例元数据的事件NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}...
}//Nacos naming client based ip and port.
//The client is bind to the ip and port users registered. It's a abstract content to simulate the tcp session client.
public class IpPortBasedClient extends AbstractClient {...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));}...
}//Abstract implementation of {@code Client}.
public abstract class AbstractClient implements Client {//publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务//存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象//key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);//subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {//服务注册时,如果是第一次put进去Service对象,会返回nullif (null == publishers.put(service, instancePublishInfo)) {//监视器记录MetricsMonitor.incrementInstanceCount();}//发布客户端改变事件,用于处理集群间的数据同步NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());return true;}...
}

(2)ClientChangedEvent事件的处理源码

DistroClientDataProcessor的onEvent()方法会响应ClientChangedEvent。该方法如果判断出事件类型为ClientChangedEvent事件,那么就会执行DistroClientDataProcessor的syncToAllServer()方法,然后调用DistroProtocol的sync()方法进行集群节点同步处理。

DistroProtocol的sync()方法会遍历集群中除自身节点外的其他节点,然后对遍历到的每个节点执行DistroProtocol的syncToTarget()方法。

在DistroProtocol的syncToTarget()方法中,首先把要同步的集群节点targetServer包装成DistroKey对象,然后根据DistroKey对象创建DistroDelayTask延迟任务,接着调用NacosDelayTaskExecuteEngine的addTask()方法,往延迟任务执行引擎的tasks中添加任务。

NacosDelayTaskExecuteEngine在初始化时会启动一个定时任务,这个定时任务会定时执行ProcessRunnable的run()方法。而ProcessRunnable的run()方法会不断从任务池tasks中取出延迟任务处理,处理DistroDelayTask任务时会调用DistroDelayTaskProcessor的process()方法。

在执行DistroDelayTaskProcessor的process()方法时,会先根据DistroDelayTask任务封装一个DistroSyncChangeTask任务,然后调用NacosExecuteTaskExecuteEngine的addTask()方法。也就是调用TaskExecuteWorker的process()方法,将DistroSyncChangeTask任务添加到TaskExecuteWorker的阻塞队列中,同时创建TaskExecuteWorker时会启动线程不断从队列中取出任务处理。因此最终会执行DistroSyncChangeTask的run()方法。

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {private final ClientManager clientManager;private final DistroProtocol distroProtocol;...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//临时实例使用Distro协议,持久化实例使用Raft协议//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客户端注销实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客户端注册实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}@Component
public class DistroProtocol {private final ServerMemberManager memberManager;private final DistroTaskEngineHolder distroTaskEngineHolder;...//Start to sync by configured delay.public void sync(DistroKey distroKey, DataOperation action) {sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());}//Start to sync data to all remote server.public void sync(DistroKey distroKey, DataOperation action, long delay) {//遍历集群中除自身节点外的其他节点for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}}//Start to sync to target server.public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {//先把要同步的集群节点targetServer包装成DistroKey对象DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);//然后根据DistroKey对象创建DistroDelayTask任务DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//接着调用NacosDelayTaskExecuteEngine.addTask()方法//往延迟任务执行引擎DistroDelayTaskExecuteEngine中添加延迟任务DistroDelayTaskdistroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);}}...
}//延迟任务执行引擎
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {private final ScheduledExecutorService processingExecutor;protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//开启定时任务,即启动ProcessRunnable线程任务processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到任务池中tasks.put(key, newTask);} finally {lock.unlock();}}protected void processTasks() {//获取tasks中所有的任务,然后进行遍历Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通过任务key,获取具体的任务,并且从任务池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//通过任务key获取对应的NacosTaskProcessor延迟任务处理器NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {//ReAdd task if process failed//调用获取到的NacosTaskProcessor延迟任务处理器的process()方法if (!processor.process(task)) {//如果失败了,会重试添加task回tasks这个map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}...private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}
}//Distro delay task processor.
public class DistroDelayTaskProcessor implements NacosTaskProcessor {...@Overridepublic boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();switch (distroDelayTask.getAction()) {case DELETE://处理客户端注销实例时的延迟任务(同步数据到集群节点)//根据DistroDelayTask任务封装一个DistroSyncTask任务DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);//调用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);return true;case CHANGE:case ADD://处理客户端注册实例时的延迟任务(同步数据到集群节点)//根据DistroDelayTask任务封装一个DistroSyncChangeTask任务DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//调用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;default:return false;}}
}//任务执行引擎
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {private final TaskExecuteWorker[] executeWorkers;public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {super(logger);executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];for (int mod = 0; mod < dispatchWorkerCount; ++mod) {executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());}}...@Overridepublic void addTask(Object tag, AbstractExecuteTask task) {//根据tag获取到TaskExecuteWorkerNacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}TaskExecuteWorker worker = getWorker(tag);//调用TaskExecuteWorker.process()方法把AbstractExecuteTask任务放入到队列当中去worker.process(task);}private TaskExecuteWorker getWorker(Object tag) {int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();return executeWorkers[idx];}    ...
}public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {private final BlockingQueue<Runnable> queue;//任务存储容器public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {...this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);new InnerWorker(name).start();}@Overridepublic boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {//把NacosTask任务放入到阻塞队列中putTask((Runnable) task);}return true;}private void putTask(Runnable task) {try {//把NacosTask任务放入到阻塞队列中queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}}...private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {//一直取阻塞队列中的任务Runnable task = queue.take();long begin = System.currentTimeMillis();//调用NacosTask中的run方法task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e.toString(), e);}}}}
}

执行DistroSyncChangeTask的run()方法,其实就是执行AbstractDistroExecuteTask的run()方法。AbstractDistroExecuteTask的run()方法会先获取请求数据,然后调用DistroClientTransportAgent的syncData()方法同步集群节点,也就是调用ClusterRpcClientProxy的sendRequest()方法发送数据同步请求,最终会调用RpcClient的request()方法 -> GrpcConnection的request()方法。

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {...@Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type = getDistroKey().getResourceType();//获取请求数据DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return;}//默认调用DistroClientTransportAgent.syncData()方法同步集群节点getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}private DistroData getDistroData(String type) {DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;}...
}public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {...@Overridepublic void run() {//Nacos:Naming:v2:ClientDataString type = getDistroKey().getResourceType();//获取DistroClientTransportAgent对象DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);if (null == transportAgent) {Loggers.DISTRO.warn("No found transport agent for type [{}]", type);return;}Loggers.DISTRO.info("[DISTRO-START] {}", toString());//默认返回trueif (transportAgent.supportCallbackTransport()) {//默认执行子类的doExecuteWithCallback()方法doExecuteWithCallback(new DistroExecuteCallback());} else {executeDistroTask();}}protected abstract void doExecuteWithCallback(DistroCallback callback);...
}public class DistroClientTransportAgent implements DistroTransportAgent {private final ClusterRpcClientProxy clusterRpcClientProxy;private final ServerMemberManager memberManager;...@Overridepublic boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}//创建请求对象DistroDataRequest request = new DistroDataRequest(data, data.getType());//找到集群节点Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {//向集群节点发送RPC异步请求Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;}...
}@Service
public class ClusterRpcClientProxy extends MemberChangeListener {...//send request to member.public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {RpcClient client = RpcClientFactory.getClient(memberClientKey(member));if (client != null) {//调用RpcClient.request()方法return client.request(request, timeoutMills);} else {throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);}}...
}public abstract class RpcClient implements Closeable {//在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,//会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性protected volatile Connection currentConnection;...//send request.public Response request(Request request, long timeoutMills) throws NacosException {int retryTimes = 0;Response response;Exception exceptionThrow = null;long start = System.currentTimeMillis();while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {...//发起gRPC请求,调用GrpcConnection.request()方法response = this.currentConnection.request(request, timeoutMills);...}...}...
}

(3)集群节点处理数据同步请求的源码

通过DistroClientTransportAgent的syncData()方法发送的数据同步请求,会被DistroDataRequestHandler的handle()方法处理。然后会调用DistroDataRequestHandler的handleSyncData()方法,接着调用DistroProtocol的onReceive()方法,于是最终会调用到DistroClientDataProcessor.processData()方法。

在执行DistroClientDataProcessor的processData()方法时,如果是同步服务实例新增、修改后的数据,则执行DistroClientDataProcessor的handlerClientSyncData()方法。该方法会和处理服务注册时一样,发布一个客户端注册服务实例的事件。如果是同步服务实例删除后的数据,则调用EphemeralIpPortClientManager的clientDisconnected()方法。首先移除客户端对象信息,然后发布一个客户端注销服务实例的事件。

其中客户端注销服务实例的事件ClientDisconnectEvent,首先会被ClientServiceIndexesManager的onEvent()方法进行处理,处理时会调用ClientServiceIndexesManager的handleClientDisconnect()方法,移除ClientServiceIndexesManager订阅者列表的元素和注册表的元素。然后会被DistroClientDataProcessor的onEvent()方法进行处理,进行集群节点之间的数据同步。

@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {private final DistroProtocol distroProtocol;...@Overridepublic DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {switch (request.getDataOperation()) {case VERIFY:return handleVerify(request.getDistroData(), meta);case SNAPSHOT:return handleSnapshot();case ADD:case CHANGE:case DELETE://服务实例新增、修改、删除的同步,都会由DistroDataRequestHandler.handleSyncData()方法处理return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);DistroDataResponse result = new DistroDataResponse();result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("handle distro request with exception");return result;}}private DistroDataResponse handleSyncData(DistroData distroData) {DistroDataResponse result = new DistroDataResponse();//调用DistroProtocol.onReceive()方法if (!distroProtocol.onReceive(distroData)) {result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("[DISTRO-FAILED] distro data handle failed");}return result;}...
}@Component
public class DistroProtocol {...//Receive synced distro data, find processor to process.public boolean onReceive(DistroData distroData) {Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());//Nacos:Naming:v2:ClientDataString resourceType = distroData.getDistroKey().getResourceType();//获取DistroClientDataProcessor处理对象DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);return false;}//调用DistroClientDataProcessor.processData()方法return dataProcessor.processData(distroData);}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE://服务实例添加和改变时的执行逻辑ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);handlerClientSyncData(clientSyncData);return true;case DELETE://服务实例删除时的执行逻辑String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);//调用EphemeralIpPortClientManager.clientDisconnected()方法clientManager.clientDisconnected(deleteClientId);return true;default:return false;}}private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());Client client = clientManager.getClient(clientSyncData.getClientId());upgradeClient(client, clientSyncData);}private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);//如果和当前不一样才发布事件if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);//发布客户端注册服务实例的事件,与客户端进行服务注册时一样NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}}...
}@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {//key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();...@Overridepublic boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);//移除客户端信息IpPortBasedClient client = clients.remove(clientId);if (null == client) {return true;}//发布客户端注销服务实例的事件NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));client.release();return true;}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {Client client = event.getClient();for (Service each : client.getAllSubscribeService()) {//移除订阅者列表的元素removeSubscriberIndexes(each, client.getClientId());}for (Service each : client.getAllPublishedService()) {//移除注册表的元素removePublisherIndexes(each, client.getClientId());}}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//临时实例使用Distro协议,持久化实例使用Raft协议//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客户端注销实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客户端注册实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}

(4)总结

一.执行引擎的总结

延时任务执行引擎的实现原理是引擎有一个Map类型的tasks任务池,这个任务池可以根据key映射对应的任务处理器。引擎会定时从任务池中获取任务,执行任务处理器的处理方法处理任务。

任务执行引擎的实现原理是会创建多个任务执行Worker,每个任务执行Worker都会有一个阻塞队列。向任务执行引擎添加任务时会将任务添加到其中一个Woker的阻塞队列中,Worker在初始化时就会启动一个线程不断取出阻塞队列中的任务来处理。所以任务执行引擎会通过阻塞队列 + 异步任务的方式来实现。

二.用于向集群节点同步数据的客户端改变事件的处理流程总结

步骤一:先创建DistroDelayTask延迟任务放入到延迟任务执行引擎的任务池,DistroDelayTask延迟任务会由DistroDelayTaskProcessor处理器处理。

步骤二:DistroDelayTaskProcessor处理时会创建DistroSyncChangeTask任务,然后再将任务分发添加到执行引擎中的任务执行Worker的阻塞队列中。

步骤三:任务执行Worker会从队列中获取并执行DistroSyncChangeTask任务,也就是执行引擎会触发调用AbstractDistroExecuteTask的run()方法,从而调用DistroSyncChangeTask的doExecuteWithCallback()方法。

步骤四:doExecuteWithCallback()方法会获取最新的微服务实例列表,然后通过DistroClientTransportAgent的syncData()方法发送数据同步请求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/82283.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Overlapping Experiment Infrastructure: More, Better, Faster》论文阅读笔记

文章目录 1 背景2 三个核心概念3 Launch层&#xff1a;特性发布的专用机制4 流量分发策略和条件筛选4.1 四种流量分发类型4.2 条件筛选机制 5 工具链与监控体系6 实验设计原则7 培训参考与推荐 1 背景 谷歌&#xff08;Google&#xff09;以数据驱动著称&#xff0c;几乎所有可…

国芯思辰| 医疗AED可使用2通道24位模拟前端SC2946(ADS1292)

生物电信号监测技术在医疗健康行业中发展迅速&#xff0c;成为评估人体生理健康状况的关键手段。心电&#xff08;ECG&#xff09;、脑电&#xff08;EEG&#xff09;和肌电&#xff08;EMG&#xff09;等信号&#xff0c;通过精密模拟前端芯片捕捉和处理&#xff0c;对医疗诊断…

数据结构【二叉搜索树(BST)】

二叉搜索树 1. 二叉搜索树的概念2. 二叉搜索树的性能分析3.二叉搜索树的插入4. 二叉搜索树的查找5. 二叉搜索树的删除6.二叉搜索树的实现代码7. 二叉搜索树key和key/value使用场景7.1 key搜索场景&#xff1a;7.2 key/value搜索场景&#xff1a; 1. 二叉搜索树的概念 二叉搜索…

RDMA高性能网络通信实践

RDMA高性能网络通信实践 一、背景介绍二、方法设计A.实现方案B.关键技术点 三、代码及注释四、注意事项 一、背景介绍 远程直接内存访问&#xff08;RDMA&#xff09;技术通过绕过操作系统内核和CPU直接访问远程内存&#xff0c;实现了超低延迟、高吞吐量的网络通信。该技术广…

ndarray数组掩码操作,True和False获取数据

#数组掩码的表示方法 def testht05():a np.arange(1,10)mask [True,False,True,True,False,True,False,True,True]print(a[mask]) 另外的用法&#xff1a; #掩码操作获取子集 def testht06():a np.arange(1,100)print(a[a%3 0 & (a%7 0)] )b np.array([A,"B&qu…

索引工具explain

EXPLAIN 是 MySQL 中一个非常有用的工具,用于分析查询的执行计划。通过 EXPLAIN,你可以了解 MySQL 是如何执行查询的,包括它如何使用索引、表的扫描方式等。这有助于优化查询性能。以下是 EXPLAIN 输出的各个字段的详细解释: 基本用法 EXPLAIN SELECT * FROM table_name …

Git回顾

参考视频:【GeekHour】一小时Git教程 一句话定义&#xff1a;Git是一个免费开源的分布式版本控制系统。 版本控制系统可以分为两种&#xff0c;1.集中式&#xff08;SVN&#xff0c;CVS&#xff09;&#xff1b;2.分布式&#xff08;git&#xff09; git的工作区域和文件状态…

python打卡day20

特征降维------特征组合&#xff08;以SVD为例&#xff09; 知识点回顾&#xff1a; 奇异值的应用&#xff1a; 特征降维&#xff1a;对高维数据减小计算量、可视化数据重构&#xff1a;比如重构信号、重构图像&#xff08;可以实现有损压缩&#xff0c;k 越小压缩率越高&#…

GuPPy-v1.2.0安装与使用-生信工具52

GuPPy&#xff1a;Python中用于光纤光度数据分析的免费开源工具 01 背景 Basecalling 是将原始测序信号转换为碱基序列的过程&#xff0c;通俗地说&#xff0c;就是“把碱基识别出来”。这一过程在不同代测序技术中各不相同&#xff1a; 一代测序是通过解析峰图实现&#xff1…

47. 全排列 II

题目 给定一个可包含重复数字的序列 nums &#xff0c;按任意顺序 返回所有不重复的全排列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,2] 输出&#xff1a; [[1,1,2],[1,2,1],[2,1,1]] 示例 2&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[1,2,3…

ERP系统操作流程,如何快速搭建流程体系

ERP流程图&#xff0c;如何搭建和建立&#xff0c;ERP系统操作流程&#xff0c;ERP系统操作流程图&#xff0c;采购流程&#xff0c;销售流程&#xff0c;仓库流程&#xff0c;MRP流程&#xff0c;PMC流程&#xff0c;财务流程&#xff0c;应收流程&#xff0c;应付流程&#x…

class path resource [] cannot be resolved to absolute file path

问题情景 java应用程序在IDE运行正常&#xff0c;打成jar包后执行却发生异常&#xff1a; java.io.FileNotFoundException: class path resource [cert/sync_signer_pri_test.key] cannot be resolved to absolute file path because it does not reside in the file system:…

19、HashTable(哈希)、位图的实现和布隆过滤器的介绍

一、了解哈希【散列表】 1、哈希的结构 在STL中&#xff0c;HashTable是一个重要的底层数据结构, 无序关联容器包括unordered_set, unordered_map内部都是基于哈希表实现 哈希表又称散列表&#xff0c;一种以「key-value」形式存储数据的数据结构。哈希函数&#xff1a;负责将…

基于 Flask的深度学习模型部署服务端详解

基于 Flask 的深度学习模型部署服务端详解 在深度学习领域&#xff0c;训练出一个高精度的模型只是第一步&#xff0c;将其部署到生产环境中&#xff0c;为实际业务提供服务才是最终目标。本文将详细解析一个基于 Flask 和 PyTorch 的深度学习模型部署服务端代码&#xff0c;帮…

Vue3 + Node.js 实现客服实时聊天系统(WebSocket + Socket.IO 详解)

Node.js 实现客服实时聊天系统&#xff08;WebSocket Socket.IO 详解&#xff09; 一、为什么选择 WebSocket&#xff1f; 想象一下淘宝客服的聊天窗口&#xff1a;你发消息&#xff0c;客服立刻就能看到并回复。这种即时通讯效果是如何实现的呢&#xff1f;我们使用 Vue3 作…

MySQL数据库与表结构操作指南

前言&#xff1a;本文系统梳理MySQL核心操作语句。内容覆盖建库建表、结构调整、数据迁移全流程&#xff08;包含创建/修改/删除/备份场景&#xff09;。希望它们能帮你快速解决问题。 库结构操作 一、库的创建 一个库的简单创建&#xff1a; create database 库名; 注意&am…

【WEB3】区块链、隐私计算、AI和Web3.0——数据民主化(1)

区块链、隐私计算、AI&#xff0c;是未来Web3.0至关重要的三项技术。 1.数据民主化问题 数据在整个生命周期&#xff08;生产、传输、处理、存储&#xff09;内的隐私安全&#xff0c;则是Web3.0在初始阶段首要解决的问题。 数据民主化旨在打破数据垄断&#xff0c;让个体能…

C语言—指针2

1. const 修饰变量 1.1 const修饰变量 变量被const修饰时&#xff0c;变量此时为常变量&#xff0c;本质为常量&#xff0c;语法上不可被修改&#xff0c;但是如果此时需要修改变量值&#xff0c;可以通过指针的方式修改。 虽然此时通过指针的方式确实修改了变量的值&#xff…

高级架构软考之网络OSI网络模型

高级架构软考之网络&#xff1a; 1.OSI网络模型&#xff1a; a.物理层&#xff1a; a.物理传输介质物理连接&#xff0c;负责数据传输&#xff0c;并监控数据 b.传输单位&#xff1a;bit c.协议&#xff1a; d:对应设备&#xff1a;中继器、集线器 b.数据链路层&#xff1a; a.…

el-table计算表头列宽,不换行显示

1、在utils.js中封装renderHeader方法 2、在el-table-column中引入&#xff1a; 3、页面展示&#xff1a;