Nacos源码—4.Nacos集群高可用分析三

大纲

6.CAP原则与Raft协议

7.Nacos实现的Raft协议是如何写入数据的

8.Nacos实现的Raft协议是如何选举Leader节点的

9.Nacos实现的Raft协议是如何同步数据的

10.Nacos如何实现Raft协议的简版总结

6.CAP原则与Raft协议

(1)CAP分别指的是什么

(2)什么是分区以及容错

(3)为什么不能同时满足CAP原则

(4)Raft协议定义节点的三种状态

(5)Raft协议的数据同步流程

(6)Raft协议的Leader选举流程

(7)Raft协议如何解决脑裂问题

(8)总结

(1)CAP分别指的是什么

一.C指的是一致性Consistency

各个集群节点之间的数据,必须要保证一致。

二.A指的是可用性Availability

在分布式架构中,每个请求都能在合理的时间内获得符合预期的响应。

三.P指的是分区容错性Partition Tolerance

当集群节点间出现网络问题,整个系统依然能正常提供服务。

在CAP原则中,我们首先要保证P即分区容错性。

(2)什么是分区以及容错

分区指的是网络分区。如果在分布式架构中,出现了网络通信问题。比如节点A可以和节点B相互通信,但是不能和节点C、D进行通信。但是节点C、D之间是可以通信的,这种情况下就是出现了网络分区。

容错是指在分布式架构中,集群节点出现分区情况时,整个系统仍然要保持对外提供服务的能力,不能因为网络分区而导致整个系统不能对外提供服务。

在CAP原则下:由于P是首要保证的,所以C、A就不能兼得,必须要舍弃其一。因此需要根据业务来权衡,是更注重可用性、还是更加注重一致性。

(3)为什么不能同时满足CAP原则

首先前提条件是,需要满足P。

情况一:假设在分布式集群中选择使用CP架构,更加注重数据的一致性。这时出现了网络分区,节点A、B与节点C之间网络不互通。如果此时向集群写入一个数据,由于节点A、B能够网络互通,所以节点A、B写入的数据可以相互同步,但是节点C没办法做数据同步。那么在这种情况下,如何才能保证数据的一致性呢?

此时只能将节点C暂时看作不可用的状态,等网络恢复和数据同步好了,节点C才能正常地提供服务。否则下一次用户向集群请求获取数据时,请求到了节点C。但由于网络分区导致节点C并未同步数据,那么本次查询就查不到数据,这样就达不到CP架构的一致性要求了。所以肯定需要舍弃节点C的可用性。

情况二:假设在分布式集群中选择使用AP架构,更加注重数据的可用性。这时出现了网络分区,节点A、B与节点C之间网络不互通。虽然节点C暂时由于网络不通的原因,无法进行数据同步。但是由于集群更加注重服务的可用性,所以节点C还是可以正常提供服务。只是节点C和节点A、B之间的数据略有差异,但不影响节点的正常使用。所以就需要舍弃节点C的数据一致性。

在AP架构中,集群节点间的数据也需要同步。集群节点数据的同步一般都是通过一些异步任务来保证数据的最终一致性,只是同步时效没有那么及时。

在CP架构中,可以通过Raft协议实现数据一致性。Raft协议就是在分布式架构下,多节点保证数据一致性的协议。

(4)Raft协议定义节点的三种状态

Raft协议对集群节点定义了三种状态:

一.Follower追随者

这是默认的状态,所有的集群节点一开始都是Follower状态。

二.Candidate候选者

当某集群节点开始发起投票选举Leader时,首先会投给自己一票,这时就会从Follower状态变成Candidate状态。

三.Leader领导者

当某集群节点获得了大多数集群节点的投票,那么就会变成Leader状态。

(5)Raft协议的数据同步流程

一.Raft协议是如何处理数据写入请求

在Raft协议中,只有Leader节点才会处理客户端数据的写入请求。如果非Leader节点收到了写入请求,会转发到Leader节点上进行处理。

数据的写入一共有两个状态:uncommit和commit。这个两个状态对应于两阶段提交,可以保证数据正确写入成功。

当Leader节点接收到一个数据写入请求时:首先会在自身的节点进行数据处理,然后马上同步给集群的其他节点,此时Leader节点的这个数据的状态是uncommit状态。只有当半数以上的其他节点写入成功,Leader节点才会把数据写入成功。当Leader节点最终把数据写入成功后,会通知其他节点进行commit,此时Leader节点的这个数据的状态是commit状态。

二.非Leader节点写入失败如何处理

由于Leader节点只需要有半数以上的节点写入成功即可,所以如果有部分非Leader节点没有写入或写入失败,该如何处理?

Raft协议中的Leader节点和Follower节点会有心跳机制。在心跳传输过程中,Leader节点会把最新的数据传给其他Follower节点,以保证Follower节点中的数据和Leader节点的数据是一致的。

需要注意的是:当Follower节点没有在指定时间内接收到Leader节点发送过来的心跳包,Follower节点就会认为Leader节点挂掉了,此时Follower节点会把自身状态修改为Candidate并且重新发起投票。

https://thesecretlivesofdata.com/raft/#home
Let's say we have a single node system.
For this example, you can think of our node as a database server that stores a single value.
We also have a client that can send a value to the server.
Coming to agreement, or consensus, on that value is easy with one node.
But how do we come to consensus if we have multiple nodes?
That's the problem of distributed consensus.
Raft is a protocol for implementing distributed consensus.
Let's look at a high level overview of how it works.A node can be in 1 of 3 states: the Follower state, the Candidate state, or the Leader state.
All our nodes start in the follower state.
If followers don't hear from a leader then they can become a candidate.
The candidate then requests votes from other nodes.
Nodes will reply with their vote.
The candidate becomes the leader if it gets votes from a majority of nodes.
This process is called Leader Election.All changes to the system now go through the leader.
Each change is added as an entry in the node's log.
This log entry is currently uncommitted so it won't update the node's value.
To commit the entry the node first replicates it to the follower nodes...
then the leader waits until a majority of nodes have written the entry.
The entry is now committed on the leader node and the node state is "5".
The leader then notifies the followers that the entry is committed.
The cluster has now come to consensus about the system state.
This process is called Log Replication.

(6)Raft协议的Leader选举流程

Leader是如何选举出来的?

一.选举超时时间和选举步骤

假设使用了Raft协议的集群有3个节点:那么一开始,三个节点都会在倒计时中进行等待,此时会有一个称为Election Timeout的随机休眠时间或选举超时时间,该选举超时时间会被随机分配到150ms到300ms之间。

等待超过选举超时时间过后,节点会马上进行投票,投票分为如下几个步骤:

步骤一:先投给自己一票,并且把自己节点状态修改为Candidate

步骤二:向其他集群节点进行投票

步骤三:获取投票结果,如果过半节点投自己,则把状态修改为Leader

一旦Leader节点选举出来,其他节点的数据都要以Leader节点的为准。因此Leader节点会马上通过心跳机制,同步数据给其他节点。

https://thesecretlivesofdata.com/raft/#election
In Raft there are two timeout settings which control elections.First is the election timeout.
The election timeout is the amount of time a follower waits until becoming a candidate.
The election timeout is randomized to be between 150ms and 300ms.
After the election timeout the follower becomes a candidate and starts a new election term...
...votes for itself...
...and sends out Request Vote messages to other nodes.
If the receiving node hasn't voted yet in this term then it votes for the candidate...
...and the node resets its election timeout.
Once a candidate has a majority of votes it becomes leader.
The leader begins sending out Append Entries messages to its followers.Second is the heartbeat timeout.
These messages are sent in intervals specified by the heartbeat timeout.
Followers then respond to each Append Entries message.This election term will continue until a follower stops receiving heartbeats and becomes a candidate.
Let's stop the leader and watch a re-election happen.
Node B is now leader of term 2.
Requiring a majority of votes guarantees that only one leader can be elected per term.If two nodes become candidates at the same time then a split vote can occur.
Let's take a look at a split vote example...
Two nodes both start an election for the same term...
...and each reaches a single follower node before the other.
Now each candidate has 2 votes and can receive no more for this term.
The nodes will wait for a new election and try again.
Node A received a majority of votes in term 5 so it becomes leader.

二.不同的选举情况分析

如果集群启动时,节点C率先等待超过了选举超时时间。那么节点C会马上发起投票,并改变它自己的状态变为Candidate。等节点C获取超过半数以上的投票,那么它就会成为Leader节点。

如果在集群运行中,Leader节点突然下线。那么这时候其他的Follower节点会重新进行Leader选举。假设原本的Leader节点是B,但由于B突然下线,节点A、C会重新发起投票,最终节点C成为新的Leader节点。并且重新选举Leader后,Trem(任期)会进行递增。Term可理解为Leader的选举次数,次数越大说明数据肯定是最全的。

如果有四个节点,其中有两个Candidate节点都有2票,没有过半。在这种情况下,则会让全部节点重新进行随机睡眠,重新进行Leader选举。

(7)Raft协议如何解决脑裂问题

在Raft协议的一些情况下,可能会产生多个Leader节点。那么多个Leader节点是如何产生的?多个Leader会不会有冲突?

如果在一个集群下,出现了两个Leader节点,那么这就是脑裂问题。假设集群节点有5个,节点B是Leader,但由于发生了网络分区问题。节点A、B可以相互通信,可是节点C、D、E不能和Leader进行通信。那么节点C、D、E将会重新进行Leader选举,最终节点C也成为了Leader。此时,在原本一个集群下,就会产生两个Leader节点。

此时,如果有客户端来进行写数据:

第一个客户端请求到了节点B,由于节点B所在分区网络只有一个Follower节点,达不到半数以上要求,所以节点B的数据一直处于uncommit状态,数据也不会写入成功。

第二个客户端请求到了节点C,由于节点C所在分区网络有两个Follower节点,有半数以上支持,所以节点C的数据是能够写入成功的。

假如网络突然恢复,5个节点都可以相互通信,那么怎么处理两个Leader。这时两个Leader会相互发送心跳。节点B会发现节点C的Term比自己大,所以会认节点C为Leader并自动转换为Follower节点。

https://thesecretlivesofdata.com/raft/#replication
Once we have a leader elected we need to replicate all changes to our system to all nodes.
This is done by using the same Append Entries message that was used for heartbeats.
Let's walk through the process.First a client sends a change to the leader. Set value by "5".
The change is appended to the leader's log...
...then the change is sent to the followers on the next heartbeat.
An entry is committed once a majority of followers acknowledge it...
...and a response is sent to the client.Now let's send a command to increment the value by "2".
Our system value is now updated to "7".
Raft can even stay consistent in the face of network partitions.Let's add a partition to separate A & B from C, D & E.
Because of our partition we now have two leaders in different terms.
Let's add another client and try to update both leaders.
One client will try to set the value of node B to "3".
Node B cannot replicate to a majority so its log entry stays uncommitted.
The other client will try to set the value of node C to "8".
This will succeed because it can replicate to a majority.Now let's heal the network partition.
Node B will see the higher election term and step down.
Both nodes A & B will roll back their uncommitted entries and match the new leader's log.
Our log is now consistent across our cluster.

(8)总结

Raft协议相关论文:

https://raft.github.io/raft.pdf

Raft协议详细流程演示:

https://thesecretlivesofdata.com/raft/

Nacos既支持AP架构,也支持CP架构。前面介绍的集群源码,是属于AP架构的。在源码中可以看到很多异步任务,说明是比较看重可用性。由于是使用定时任务,那么数据会在某些特定时间出现不一致的情况,但最终还是会保证一致性。

7.Nacos实现的Raft协议是如何写入数据的

(1)Nacos 1.4.1版本实现Raft协议说明

(2)Nacos实现的Raft协议是如何写入数据的

(3)RaftCore的signalPublish()方法总结

(1)Nacos 1.4.1版本实现Raft协议说明

Nacos 1.4.1版本并没有完全按照标准的Raft协议所定义的流程来实现,所以该版本的实现中会存在一些问题。并且Nacos 1.4.1版本,已标注后期会删除这套实现。

Nacos 2.x版本会采用JRaft来实现Raft协议,JRaft就是完全按照Raft协议定义的流程来实现的。所以早期版本实现的Raft协议,没必要仔细研究,大概知道流程即可。

(2)Nacos实现的Raft协议是如何写入数据的

在Raft协议里只有Leader节点才会操作数据,并且会有两阶段提交的动作,所以可以通过服务实例注册的处理为入口进行分析。

在进行服务实例注册时:会通过一个key来选择调用不同ConsistencyService实现类的put()方法。而这个key中会包含一个很关键的属性叫做ephemeral,ephemeral默认是true,所以最终会执行AP架构下的服务注册。我们可以在yml配置文件中,把ephemeral属性设置为false,那么在服务实例注册时,就会执行CP架构下的服务注册。不过,注册中心一般很少使用CP架构。

如果执行的是CP架构下的服务注册,那么最终会调用RaftConsistencyServiceImpl的put()方法,从而触发调用Raft协议的核心方法:RaftCore的signalPublish()方法。

@Component
public class ServiceManager implements RecordListener<Service> {...//Add instance to service. 添加服务实例public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {//构建要注册的服务实例对应的服务的keyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//根据命名空间以及服务名获取要注册的服务实例对应的服务Service service = getService(namespaceId, serviceName);//使用synchronized锁住要注册的服务实例对应的服务synchronized (service) {//由于一个服务可能存在多个服务实例,所以需要根据当前注册请求的服务实例ips,获取对应服务的最新服务实例列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);//Instances实现了用于在Nacos集群进行网络传输的Record接口Instances instances = new Instances();instances.setInstanceList(instanceList);//执行DelegateConsistencyServiceImpl的put()方法consistencyService.put(key, instances);}}...
}@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {    private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;private final EphemeralConsistencyService ephemeralConsistencyService;public DelegateConsistencyServiceImpl(PersistentConsistencyServiceDelegateImpl persistentConsistencyService, EphemeralConsistencyService ephemeralConsistencyService) {this.persistentConsistencyService = persistentConsistencyService;this.ephemeralConsistencyService = ephemeralConsistencyService;}@Overridepublic void put(String key, Record value) throws NacosException {//如果是临时实例,则调用DistroConsistencyServiceImpl.put()方法//如果是持久化实例,则调用PersistentConsistencyServiceDelegateImpl.put()方法mapConsistencyService(key).put(key, value);}...private ConsistencyService mapConsistencyService(String key) {//根据不同的key选择不同的ConsistencyServicereturn KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;}
}@Component("persistentConsistencyServiceDelegate")
public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService {private final RaftConsistencyServiceImpl oldPersistentConsistencyService;private final BasePersistentServiceProcessor newPersistentConsistencyService;private volatile boolean switchNewPersistentService = false;...@Overridepublic void put(String key, Record value) throws NacosException {switchOne().put(key, value);}private PersistentConsistencyService switchOne() {return switchNewPersistentService ? newPersistentConsistencyService : oldPersistentConsistencyService;}...
}//Use simplified Raft protocol to maintain the consistency status of Nacos cluster.
@Deprecated
@DependsOn("ProtocolManager")
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {private final RaftCore raftCore;...@Overridepublic void put(String key, Record value) throws NacosException {checkIsStopWork();try {//Raft协议的核心实现raftCore.signalPublish(key, value);} catch (Exception e) {Loggers.RAFT.error("Raft put failed.", e);throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);}}...
}

RaftCore的signalPublish()方法中的逻辑大概分成三部分:

第一部分:方法一开始就会判断自身节点是不是Leader节点,如果不是则会通过HTTP方式转发给Leader节点进行处理。

第二部分:RaftCore的signalPublish()方法中有一行核心代码onPublish(),即如果是Leader节点则会执行RaftCore的onPublish()方法来处理数据。该方法会先把数据写入到本地文件,然后马上同步给内存注册表。

RaftStore的write(datum)方法会把服务实例信息持久化到本地文件,即把Instance服务实例信息以JSON格式持久化到Nacos服务端目录下,并且存储的文件是以命名空间##分组@@服务名来命名的。而持久化的服务实例信息,在下一次服务端重启时会重新加载到内存注册表中。

服务实例信息持久化后,会通过NotifyCenter发布ValueChangeEvent事件更新注册表。RaftCore的init()方法会向NotifyCenter注册一个订阅者PersistentNotifier。所以NotifyCenter发布ValueChangeEvent事件时,就会被PersistentNotifier的onEvent()方法监听到,然后执行PersistentNotifier的notify()方法,最后会执行Service的onChange()方法来更新内存注册表。

第三部分:主要就是遍历集群节点,向每个节点发起通知请求来进行数据同步,这里会使用CountDownLatch闭锁来实现控制集群半数节点同步成功。

在创建CountDownLatch闭锁时,会获取集群半数的数量来创建闭锁。每当有一个集群节点同步成功,就对CountDownLatch闭锁进行减1。最后使用闭锁的await()方法进行等待,直到闭锁减完或超时才继续执行。这样通过CountDownLatch并发工具类就能实现需要过半节点成功的功能。

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {private final RaftProxy raftProxy;private final RaftStore raftStore;public final PersistentNotifier notifier;...@PostConstructpublic void init() throws Exception {...//注册订阅者NotifyCenter.registerSubscriber(notifier);...}...//Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.public void signalPublish(String key, Record value) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}//第一部分:判断自己是不是Leader节点if (!isLeader()) {ObjectNode params = JacksonUtils.createEmptyJsonNode();params.put("key", key);params.replace("value", JacksonUtils.transferToJsonNode(value));Map<String, String> parameters = new HashMap<>(1);parameters.put("key", key);//获取Leader节点final RaftPeer leader = getLeader();//将写请求转发给Leader节点raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);return;}OPERATE_LOCK.lock();try {final long start = System.currentTimeMillis();final Datum datum = new Datum();datum.key = key;datum.value = value;if (getDatum(key) == null) {datum.timestamp.set(1L);} else {datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());}ObjectNode json = JacksonUtils.createEmptyJsonNode();json.replace("datum", JacksonUtils.transferToJsonNode(datum));json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));//第二部分:Leader节点会执行到这里进行数据处理,把服务实例信息写入磁盘以及内存onPublish(datum, peers.local());//第三部分:final String content = json.toString();//通过闭锁来控制半数以上节点,peers.majorityCount()就是获取集群半数以上的节点数量final CountDownLatch latch = new CountDownLatch(peers.majorityCount());//同步给其他节点for (final String server : peers.allServersIncludeMyself()) {if (isLeader(server)) {latch.countDown();continue;}final String url = buildUrl(server, API_ON_PUB);//通过HTTP方式通知其他节点HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, result.getCode());return;}//某个节点成功,闭锁-1latch.countDown();}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);}@Overridepublic void onCancel() {}});}//通过闭锁的await()方法来等待半数集群节点同步成功if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {//only majority servers return success can we consider this update successLoggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);}long end = System.currentTimeMillis();Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);} finally {OPERATE_LOCK.unlock();}}...//Do publish. If leader, commit publish to store. If not leader, stop publish because should signal to leader.public void onPublish(Datum datum, RaftPeer source) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}RaftPeer local = peers.local();if (datum.value == null) {Loggers.RAFT.warn("received empty datum");throw new IllegalStateException("received empty datum");}if (!peers.isLeader(source.ip)) {Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(getLeader()));throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");}if (source.term.get() < local.term.get()) {Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(local));throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());}local.resetLeaderDue();//if data should be persisted, usually this is true:if (KeyBuilder.matchPersistentKey(datum.key)) {//先把数据写到本地文件中raftStore.write(datum);}//同步缓存datums.put(datum.key, datum);if (isLeader()) {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);} else {if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {//set leader term:getLeader().term.set(source.term.get());local.term.set(getLeader().term.get());} else {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);}}raftStore.updateTerm(local.term.get());//通过发布ValueChangeEvent事件来同步内存注册表NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);}
}@Deprecated
@Component
public class RaftStore implements Closeable {...//Write datum to cache file.public synchronized void write(final Datum datum) throws Exception {String namespaceId = KeyBuilder.getNamespace(datum.key);File cacheFile = new File(cacheFileName(namespaceId, datum.key));if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {MetricsMonitor.getDiskException().increment();throw new IllegalStateException("can not make cache file: " + cacheFile.getName());}ByteBuffer data;data = ByteBuffer.wrap(JacksonUtils.toJson(datum).getBytes(StandardCharsets.UTF_8));try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {fc.write(data, data.position());fc.force(true);} catch (Exception e) {MetricsMonitor.getDiskException().increment();throw e;}//remove old format file:if (StringUtils.isNoneBlank(namespaceId)) {if (datum.key.contains(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER)) {String oldDatumKey = datum.key.replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));if (cacheFile.exists() && !cacheFile.delete()) {Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key, datum.value);throw new IllegalStateException("failed to delete old format datum: " + datum.key);}}}}...
}//事件发布中心:事件发布机制的实现
public class NotifyCenter {...//注册订阅者public static <T> void registerSubscriber(final Subscriber consumer) {...addSubscriber(consumer, subscribeType);}private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);//执行DefaultPublisher.addSubscriber()方法publisher.addSubscriber(consumer);}//发布事件public static boolean publishEvent(final Event event) {return publishEvent(event.getClass(), event);}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {//执行DefaultPublisher.publish()方法return publisher.publish(event);}...}...
}public class DefaultPublisher extends Thread implements EventPublisher {protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();...@Overridepublic void addSubscriber(Subscriber subscriber) {subscribers.add(subscriber);}@Overridepublic boolean publish(Event event) {...  receiveEvent(event);...}void receiveEvent(Event event) {...for (Subscriber subscriber : subscribers) {...notifySubscriber(subscriber, event);}}@Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {final Runnable job = new Runnable() {@Overridepublic void run() {subscriber.onEvent(event);}};final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception : {}", e);}}}...
}

(3)RaftCore的signalPublish()方法总结

首先会判断自身节点是不是Leader,如果不是,则会转发给Leader处理。如果是Leader,则会对数据进行处理,先是写入到本地文件,然后同步到内存注册表,最后会通知其他Follower节点进行数据同步。

可见Nacos 1.4.1版本在数据的写入实现上,并没有两阶段提交的处理。而是Leader自身处理数据完成后,直接就去同步给其他集群节点。哪怕集群节点同步失败或没有过半节点成功,Leader的数据也不会回滚而只抛出异常。所以,Nacos 1.4.1版本只是实现了Raft的简化版,后续也会被废弃掉的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/80175.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

普通IT的股票交易成长史--20250509晚复盘

声明&#xff1a; 本文章的内容只是自己学习的总结&#xff0c;不构成投资建议。价格行为理论学习可参考简介中的几位&#xff0c;感谢他们的无私奉献。 送给自己的话&#xff1a; 仓位就是生命&#xff0c;绝对不能满仓&#xff01;&#xff01;&#xff01;&#xff01;&…

python实现点餐系统

使用python实现点餐系统的增加菜品及价格&#xff0c;删除菜品&#xff0c;查询菜单&#xff0c;点菜以及会员折扣价等功能。 代码&#xff1a; 下面展示一些 内联代码片。 # coding utf-8menu {拍黄瓜: 6, 小炒肉: 28, 西红柿炒蛋: 18, 烤鱼: 30, 红烧肉: 38, 手撕鸡: 45,…

从ellisys空口分析蓝牙耳机回连手机失败案例

问题背景&#xff1a; 前两天同事发现我们现在做的项目&#xff0c;耳机在跟某些特定类型安卓手机&#xff08;尤其是比较新的手机&#xff09;回连会失败&#xff0c;然后我帮他分析了一些log&#xff0c;记录如下&#xff1a; 回连失败所做步骤如下&#xff1a; 手机和耳机…

教育+AI:个性化学习能否颠覆传统课堂?

近年来&#xff0c;人工智能&#xff08;AI&#xff09;技术迅猛发展&#xff0c;逐渐渗透到各行各业&#xff0c;教育领域也不例外。从智能辅导系统到自适应学习平台&#xff0c;AI正在改变传统的教学模式&#xff0c;使个性化学习成为可能。然而&#xff0c;这种变革能否真正…

【C++设计模式之Strategy策略模式】

C设计模式之Strategy策略模式 模式定义核心思想动机(Motivation)结构(Structure)实现步骤1. 定义策略接口&#xff08;基于继承&#xff09;2.实现具体策略3.上下文类(Context)4. 在main中调用 应用场景&#xff08;基于继承&#xff09;1.定义策略接口2.实现具体策略3.上下文类…

Python企业级MySQL数据库开发实战指南

简介 Python与MySQL的完美结合是现代Web应用和数据分析系统的基石,能够创建高效稳定的企业级数据库解决方案。本文将从零开始,全面介绍如何使用Python连接MySQL数据库,设计健壮的表结构,实现CRUD操作,并掌握连接池管理、事务处理、批量操作和防止SQL注入等企业级开发核心…

matlab转python

1 matlab2python开源程序 https://blog.csdn.net/qq_43426078/article/details/123384265 2 网址 转换网址&#xff1a;https://app.codeconvert.ai/code-converter?inputLangMatlab&outputLangPython 文件比较网址&#xff1a;https://www.diffchecker.com/text-comp…

Vue 3 中编译时和运行时的概念区别

文章目录 前言Vue 3 中的编译时 vs 运行时区别模板在编译时转化为渲染函数编译时的优化处理运行时的工作:创建组件实例与渲染流程前言 详细整理 Vue 3 中编译时和运行时的概念区别,并重点解释为什么组件实例是在运行时创建的。 我会结合官方文档、源码分析和社区解释,确保内…

Spring 框架实战:如何实现高效的依赖注入,优化项目结构?

Spring 框架实战&#xff1a;如何实现高效的依赖注入&#xff0c;优化项目结构&#xff1f; 在当今的 Java 开发领域&#xff0c;Spring 框架占据着举足轻重的地位。而依赖注入作为 Spring 的核心概念之一&#xff0c;对于构建高效、灵活且易于维护的项目结构有着关键作用。本…

创建虚拟服务时实现持久连接。

在调度器中配置虚拟服务&#xff0c;实现持久性连接&#xff0c;解决会话保持问题。 -p 【timeout】 -p 300 这5分钟之内调度器会把来自同一个客户端的请求转发到同一个后端服务器。【不管使用的调度算法是什么。】【称为持久性连接。】 作用&#xff1a;将客户端一段时间…

说下RabbitMQ的整体架构

RabbitMQ 是一个基于 AMQP&#xff08;Advanced Message Queuing Protocol&#xff09; 协议的开源消息中间件&#xff0c;RabbitMQ的整体架构围绕消息的生产、路由、存储和消费设计&#xff0c;旨在实现高效、可靠的消息传递&#xff0c;它由多个核心组件协同工作。 核心组件 …

STM32--GPIO

教程 视频 博主教程 STM32系统结构图 GPIO GPIO&#xff08;General Purpose Input/Output&#xff09;是STM32内部的一种外设。 一个STM32芯片内存在多个GPIO外设&#xff0c;每个GPIO外设有16个引脚&#xff1b; 比如GPIOA&#xff1a;PA0~PA15; GPIOB&#xff1a;PB0~…

QUIC协议优化:HTTP_3环境下的超高速异步抓取方案

摘要 随着 QUIC 和 HTTP/3 的普及&#xff0c;基于 UDP 的连接复用与内置加密带来了远超 HTTP/2 的性能提升&#xff0c;可显著降低连接握手与拥塞恢复的开销。本文以爬取知乎热榜数据为目标&#xff0c;提出一种基于 HTTPX aioquic 的异步抓取方案&#xff0c;并结合代理 IP设…

[论文阅读]MCP Guardian: A Security-First Layer for Safeguarding MCP-Based AI System

MCP Guardian: A Security-First Layer for Safeguarding MCP-Based AI System http://arxiv.org/abs/2504.12757 推出了 MCP Guardian&#xff0c;这是一个框架&#xff0c;通过身份验证、速率限制、日志记录、跟踪和 Web 应用程序防火墙 &#xff08;WAF&#xff09; 扫描来…

Redis客户端缓存的4种实现方式

Redis作为当今最流行的内存数据库和缓存系统&#xff0c;被广泛应用于各类应用场景。然而&#xff0c;即使Redis本身性能卓越&#xff0c;在高并发场景下&#xff0c;应用与Redis服务器之间的网络通信仍可能成为性能瓶颈。 这时&#xff0c;客户端缓存技术便显得尤为重要。 客…

eNSP中路由器OSPF协议配置完整实验和命令解释

本实验使用三台华为路由器&#xff08;R1、R2和R3&#xff09;相连&#xff0c;配置OSPF协议实现网络互通。拓扑结构如下&#xff1a; 实验IP规划 R1: GE0/0/0: 192.168.12.1/24 (Area 0)Loopback0: 1.1.1.1/32 (Area 0) R2: GE0/0/0: 192.168.12.2/24 (Area 0)GE0/0/1: 192.…

内网渗透——红日靶场三

目录 一、前期准备 二、外网探测 1.使用nmap进行扫描 2.网站信息收集 3.漏洞复现(CVE-2021-23132) 4.disable_function绕过 5.反弹shell&#xff08;也&#xff0c;并不是&#xff09; 6.SSH登录 7.权限提升&#xff08;脏牛漏洞&#xff09; 8.信息收集 9.上线msf 三…

解决Win11下MySQL服务无法开机自启动问题

问题描述 在win11系统中&#xff0c;明明将MySQL服务设置成了自动启动&#xff0c;但在重启电脑后MySQL服务还是无法自动启动&#xff0c;每次都要重新到计算机管理的服务中找到服务再手动启动。 解决方式 首先确保mysql服务的启动类型为自动。 设置方法&#xff1a;找到此电…

后端项目进度汇报

项目概述 本项目致力于构建一个先进的智能任务自动化平台。其核心技术是一套由大型语言模型&#xff08;LLM&#xff09;驱动的后端系统。该系统能够模拟一个多角色协作的团队&#xff0c;通过一系列精心设计或动态生成的处理阶段&#xff0c;来高效完成各种复杂任务&#xff…

深度学习中学习率调整:提升食物图像分类模型性能的关键实践

深度学习中学习率调整&#xff1a;提升食物图像分类模型性能的关键实践 接上篇保存最优模型&#xff0c;在深度学习模型训练过程中&#xff0c;学习率作为核心超参数&#xff0c;其设置直接影响模型的收敛速度与最终性能。本文将结合食物图像分类项目&#xff0c;深入探讨学习…