1.概述
zk作为一个内存数据库产品,启动一个zk实例对外提供服务的方式成为单机模式。既然单机模式就可以实现请求处理,那为何要引入集群模式呢,引入集群模式付出了那些代价呢,相应的有取得了那些收益呢?
首先,所谓集群模式,就是本来启动一个zk实例对外提供服务的,现在启动多个zk实例,这多个zk持久同一份数据实体,请求被某个实例处理时,收到请求的实体不仅自己要处理,而且要将请求传递给其他zk实例去处理,然后,该实体还要等待收到半数以上成员的反馈后,再进入此请求的提交阶段。
这个过程中相比单机模式,我们付出的代价有:
 (1). 资源代价。
 多个zk实例占据了比单个zk实体更多的资源。但我们提供的还是同样的服务内容。
 (2). 时间代价
 多个zk实例下请求处理需收到请求实例,传递请求到其他实例,需等待半数以上成员反馈,再提交。相比单个zk实例,大大增加了请求处理的耗时。但我们提供的还是同样的服务内容。
zk集群下,提供的还是同样的服务内容,但却付出了如此大的代价。那我们得到了什么呢?
 (1). 高可靠性
 所谓高可靠性,指的是,zk集群下,集群的每个成员都保存同一份数据。这样,即使集群的某个成员的数据被损坏了,只要还有一个成员的数据是好的。那么,数据也不会丢失。在集群同步阶段,丢失数据的成员将从主那里得到丢失的数据信息。
 (2). 高可用性
 所谓高可用性,指的是,zk集群下,集群运行过程中,即使我们将某个成员下线了,集群作为一个整体依然可以对外提供服务,允许客户端继续接入,处理客户端的请求。
在衡量收益和付出后,认为收益大于付出的地方就是分布式发挥作用的地方。
2.zk集群的选举
我们以形如:
 (1). 实例1
...
// myid=1
clientPort=2181
server.1=127.0.0.1:3888:4888
server.2=127.0.0.1:3889:4889
server.3=127.0.0.1:3890:4890
(2). 实例2
...
// myid=2
clientPort=2182
server.1=127.0.0.1:3888:4888
server.2=127.0.0.1:3889:4889
server.3=127.0.0.1:3890:4890
(3). 实例3
...
// myid=1
clientPort=2183
server.1=127.0.0.1:3888:4888
server.2=127.0.0.1:3889:4889
server.3=127.0.0.1:3890:4890
这样的集群配置为例来讨论zk集群的选举过程。
2.1.启动阶段的选举过程
 zk集群下,每个zk实例启动阶段先要经过选举阶段。
 选举结束后需要经过同步阶段。
 此后,才能以确定下来的身份运行(启动服务端请求处理流水线)。
 我们分析zk实例启动时的选举阶段。
 (1). 通过QuorumPeer的startLeaderElection开启选举阶段。
 此时zk实例的集群状态为ServerState.LOOKING。
 此时需要构建自身一个初始选票:currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());这个初始选票代表自己要选的主是自己。这个主的数据实体的情况通过经过恢复后数据实体的lastProcessedZxid来表示。值得注意的是对于zxid是一个64位数值,高32位是轮次值,低32位是轮次下序号值。
 对于zk实例的currentEpoch意义和变迁的说明,现在暂且搁置。只需明白一个空的集群初始阶段所有集群成员的currentEpoch均为0即可。
 (2). 选举过程需要集群内各个成员互相连接,互发消息。
 为此需要一个QuorumCnxManager类型实例来管理这个连接过程。
QuorumCnxManager qcm = createCnxnManager();// 集群间连接管理
QuorumCnxManager.Listener listener = qcm.listener;
listener.start();// 允许集群选举成员连接。
我们分析,listener.start();这个操作。执行该操作时,使得自身作为一个服务端开启监听,并持续等待其他集群成员的接入。
 对形如server.1=127.0.0.1:3888:4888这样的地址配置,对集群Id为1的成员来说,在选举过程,它会在4888端口上作为服务端监听其他集群成员的接入;在选举结束后,若它成为集群的主,它会在3888端口上作为服务端监听其他集群成员的接入。
(3). 有了qcm,集群成员间选举阶段的相互通讯就有了保证。
 为了选举逻辑的实现我们还需要一个FastLeaderElection 类型实例来实现选举的逻辑。
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start()
{this.messenger.start(){this.wsThread.start();this.wrThread.start();}
}
我们分析,fle.start();这个操作。执行该操作时,会开启两个线程。
 一个线程是WorkerReceiver,该线程持续执行:
 a. 从接收队列获取来自集群其他成员发来的选举包
 b. 处理包
一个线程是WorkerSender,该线程持续执行:
 a. 从fle实例中LinkedBlockingQueue<ToSend> sendqueue;队列取待发送给其他集群成员的包。
 b. 基于包构建消息,并通过manager.toSend(m.sid, requestBuffer);将包发送给指定的集群成员。
(4). 之后我们将开启主线程,主线程逻辑处理是QuorumPeer::run()。
 启动阶段,这里判断自身状态为LOOKING时,将执行一个关键操作:setCurrentVote(makeLEStrategy().lookForLeader());
 这个关键操作里makeLEStrategy().lookForLeader()将通过(3)中的fle的lookForLeader结束自身的选举过程,并返回选举结束后自身的选票。这个选票会包含选举的主的信息。
我们着重分析FastLeaderElection::lookForLeader来解析选举过程如何进行,如何结束。
 a. 初始时,更新自身选票信息。
 选择自己,自己的zxid,自己的currentEpoch。
 b.执行sendNotifications()
 具体为,针对集群每个选举成员构建一个new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes(UTF_8));对象放入sendqueue。(3)中的WorkerSender线程最终会对每个放入的ToSend执行manager.toSend(m.sid, requestBuffer);
我们首先分析下构造的ToSend。其类别为notification,其leader,zxid被设置为我们选票里主的sid和zxid。其electionEpoch被设置为logicalclock,这个logicalclock每个zk实例启动时为0,初次执行lookForLeader是递增为1。其state被设置为LOOKING,表示发出消息一方现在是LOOKING。其sid被设置为sid,表示消息要发给那个集群成员。其peerEpoch被设置为我们选票里的currentEpoch。其configData被设置为包含集群完整信息的集群配置对象。
接下来我们分析下WorkerSender中对每个ToSend的处理。
 先是buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);基于ToSend序列化一个包。
 我们将以以下方式得到一个包的数据流
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
requestBuffer.clear();
requestBuffer.putInt(state);// 消息发送者的状态
requestBuffer.putLong(leader);// 发送者投票中的主
requestBuffer.putLong(zxid);// 发送者投票中的主的zxid
requestBuffer.putLong(electionEpoch);// 发送者的logicalclock
requestBuffer.putLong(epoch);// 发送者投票中的主的currentEpoch
requestBuffer.putInt(Notification.CURRENTVERSION);// 0x2
requestBuffer.putInt(configData.length);// 集群配置对象尺寸
requestBuffer.put(configData);// 完整的集群配置信息对象
再是manager.toSend(m.sid, requestBuffer);
 若发向自己:
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
这是直接向qcm的recvQueue放入一个构建的Message。这个Message的buffer被设置为收到的包,sid被设置为此包来自哪个集群成员。
 若发向其他集群成员:
BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
addToSendQueue(bq, b);
connectOne(sid);
这是先为发送目标分配一个包的队列,将目标id,其包队列加入map管理。
 向其包队列放入此包。
 为了将此包发给目标,我们需要执行connectOne与其建立连接。
对于connectOne:
 我们需要先分析与sid的连接是否已经存在。
 若连接已经存在,则直接返回。
 若连接不存在,则会执行如下流程:
 b.1. 通过sid查询集群信息获取目标的选举监听端口。
 b.2. 创建套接字,发起到sid代表的服务端的连接。
 b.3. 若对端此时尚未启动,则会触发连接超时,结束连接。
 b.4. 若对端此时启动了,触发连接成功。
 这里连接成功后,会发出首个包
// 我们不考虑多地址
dout.writeLong(protocolVersion);// PROTOCOL_VERSION_V1
dout.writeLong(self.getId());// 自身的Id
Collection<InetSocketAddress> addressesToSend = protocolVersion == Arrays.asList(self.getElectionAddress().getOne());
String addr = addressesToSend.stream().map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);// 自身的用于集群选举的ip:port
接下来判断,
 b.1. 若是自身Id小于所连目标的Id,
 主动连接的一方:断开连接。表示不允许小Id作为客户端去连大Id。
 被动连接的一方:
protocolVersion = din.readLong();
InitialMessage init = InitialMessage.parse(protocolVersion, din){sid = din.readLong();读取后续内容得到一个List<InetSocketAddress> addresses对象return new InitialMessage(sid, addresses);
}
此后判断连接发起者的sid小于自身sid时,执行:
closeSocket(sock);
connectOne(sid, electionAddr);
即可,小Id主动连接大Id在连接成功后,连接方,被动连接方双向断开。再由被动连接方主动发起大Id到小Id的主动连接。
b.2. 若是自身Id大于所连目标的Id,
 对主动连接方,执行:
SendWorker sw = new SendWorker(sock, sid);// sid是目标方sid
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
sw.start();
rw.start();
即客户端方面为连接分配一个SendWorker 实例,一个RecvWorker 实例。并建立<sid,sw>,<sid, 发送队列>的映射。值得注意的是,这里的<sid, 发送队列>映射在连接发起前已经建立好了,且已经放入了消息包。
 其中SendWorker是一个独立线程,负责不停的从sid关联的发送队列去消息并发送此消息到对端。
 其中RecvWorker是一个独立线程,负责不停从连接收取数据包,对每个收取的数据包构建new Message(ByteBuffer.wrap(msgArray), sid)对象放入隶属的QuorumCnxManager对象的recvQueue中。
对被动连接方,
protocolVersion = din.readLong();
InitialMessage init = InitialMessage.parse(protocolVersion, din){sid = din.readLong();读取后续内容得到一个List<InetSocketAddress> addresses对象return new InitialMessage(sid, addresses);
}
此后判断连接发起者的sid大于自身sid时,执行:
SendWorker sw = new SendWorker(sock, sid);// sid是另一端的sid
RecvWorker rw = new RecvWorker(sock, din, sid, sw);// 一个RecvWorker实例负责连接数据接收
sw.setRecv(rw);// 设置关联的rw
senderWorkerMap.put(sid, sw);// 放入map
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
sw.start();// 启动线程
rw.start();// 启动线程
这里和客户端方面为新建连接做的事情是一致的。
总结上述过程,我们知道执行sendNotifications()的效果是,会与集群其他在线且开启了选举监听的成员彼此间建立一个连接通道。通过此通道,我们向其发出我们构建的一个选举信息包。对于集群中未上线的成员,选举信息包暂时存在于其sid所对应的发送队列里。
c. 接下来将是一个循环处理逻辑
 在这个循环里面最终将完成主节点的选举并将选举得到的关于主节点的自身投票信息作为lookForLeader的结果返回出去。
 下面我们分析循环处理逻辑。
 c.1. 从fle 的recvqueue取出一个Notification实例。
 这个recvqueue的生产者是fle下messenger的wrThread线程。这个wrThread线程做的事情(3)里描述过。
 这里再次描述下,并分析生产过程。
 这个线程也是一个循环,它的流程是:
 c.1.1.从fle持有的qcm的recvQueue队列中取出一个Message实例。
 c.1.2.对包进行分析
/*
requestBuffer.clear();
requestBuffer.putInt(state);// 消息发送者的状态
requestBuffer.putLong(leader);// 发送者投票中的主
requestBuffer.putLong(zxid);// 发送者投票中的主的zxid
requestBuffer.putLong(electionEpoch);// 发送者的logicalclock
requestBuffer.putLong(epoch);// 发送者投票中的主的currentEpoch
requestBuffer.putInt(Notification.CURRENTVERSION);// 0x2
requestBuffer.putInt(configData.length);// 集群配置对象尺寸
requestBuffer.put(configData);// 完整的集群配置信息对象
*/
response.buffer.clear();
Notification n = new Notification();
int rstate = response.buffer.getInt();// 消息发送者状态
long rleader = response.buffer.getLong();// 发送者投票中的主
long rzxid = response.buffer.getLong();// 发送者投票中的主的zxid
long relectionEpoch = response.buffer.getLong();// 发送者的logicalclock
long rpeerepoch;
int version = 0x0;
QuorumVerifier rqv = null;
rpeerepoch = response.buffer.getLong();// 发送者投票中的主的currentEpoch
version = response.buffer.getInt();
接下来:
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {case 0:ackstate = QuorumPeer.ServerState.LOOKING;break;case 1:ackstate = QuorumPeer.ServerState.FOLLOWING;break;case 2:ackstate = QuorumPeer.ServerState.LEADING;break;case 3:ackstate = QuorumPeer.ServerState.OBSERVING;break;default:continue;}Notification n = new Notification();n.leader = rleader;n.zxid = rzxid;n.electionEpoch = relectionEpoch;n.state = ackstate;n.sid = response.sid;n.peerEpoch = rpeerepoch;n.version = version;n.qv = rqv;
c.1.3.若自身目前处于选举阶段
// 基于收到的集群成员的包构建Notification 实例并放入fle的recvqueue
recvqueue.offer(n);// 记录下来
if ((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())) {Vote v = getVote();QuorumVerifier qv = self.getQuorumVerifier();ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes());sendqueue.offer(notmsg);
}
基于收到包构建一个Notification实例,放入fle的recvqueue。
 当我们发现发送者此时也在选举且发送者的logicalclock小于我们时,我们才需要将自己的选举信息发送给对方。
 什么时候会发生这种情况呢?
 考虑一个1,2,3三个成员的集群。
 一开始1,2启动并完成选举,此时1,2的logicalclock均为1。
 假设1是主。后续1下线,引发2再次选举,此时2的logicalclock为2,此时3上线,也开始选举。
 那么当2收到3的包时,就会发现3的logicalclock小于我们,此时我们需要把自己的选举信息立即发给3帮助3修正自己。即使这里不发,由于一开始我们执行了sendNotifications(),在3与我们的通信建立后,也会把之前要发给3的投票信息发给它。不过考虑到我们的投票信息可能在选举中发生改变,这里发现3的投票是落后的,再发一次也是安全的。
c.1.4.若自身目前没在选举且发送者此时在选举
 则,认为自身一定是已经和集群多数成员已经就主达成一致了。
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes());
sendqueue.offer(notmsg);
此时我们需要将集群主的选票信息告知对方。
 什么时候会发生这种情况呢?
 考虑一个1,2,3三个成员的集群。
 一开始1,2启动并完成选举。
 假设1是主。此时3上线,也开始选举。
 那么当1收到3的包时,会发现3在选举,我们自身已经完成选取,知道由多数成员选出的主的信息。此时我们需要把自己的选举信息立即发给3帮助3修正自己。即使这里不发,由于一开始我们执行了sendNotifications(),在3与我们的通信建立后,也会把之前要发给3的投票信息发给它。不过考虑到我们的投票信息可能在选举中发生改变,这里发现3的投票是落后的,再发一次也是安全的。
总结以上分析,我们可知fle的recvqueue在我们处于选举阶段时,会容纳来自集群成员给我们的选票信息(包括自己给自己的)。
c.2.若无法取到Notification。
 比如集群中上线成员数量不足以完成选举过程。这时,我们需要再次向集群所有成员发送我们当前的选票信息。发送的动作对尚未连接到的对端会引发和此对端的连接建立尝试。对已经建立连接的对端,再次向其发送重复的选票信息也是无害的。
c.3.若取到了Notification。
 此时需进一步分析发出这个Notification的成员的状态。
 c.3.1.若发出者此时是LOOKING状态:
 比如我们一开始启动了1,2。则1执行lookForLeader时,就会收到来自2的Nofication,且2此时是LOOKING状态。
 进一步:
 c.3.1.1.若n.electionEpoch > logicalclock.get(),即发送者的logicalclock大于我们的logicalclock。
 这表明消息发送者从启动以来经历过的选举次数比我们多。
logicalclock.set(n.electionEpoch);// 更新自身logicalclock
recvset.clear();// 清空自身收到的投票
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);// n中请求处理阶段靠前。
} else {updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();// 给集群所有参与者(包括自己)发投票信息
// 将来自sid的最新Notification收集到容器
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
这时我们更新自身的logicalclock,这样选举结束后,参加选举的每个成员的logicalclock会保持一致。且logicalclock表示的意思是这个集群从启动开始,到当前已经走过的选举次数。
选举达成一致时,需要选举的成员都在进行集群的同一轮的选举。
 所以,我们清理掉之前收到的选票信息,更新自身logicalclock和选票后,再次向集群所有成员发出自身选票信息。相当于重新开始选举,但我们此时是在进行集群的第logicalclock轮选举。
正常情况下,来自大logicalclock的成员应该投票信息反映的请求处理阶段应该比小的靠前。
 但考虑人为干预数据下,比如还是,1,2,3三个成员的集群。
 阶段1,1,2,3全部开启,且经过了长时间的运行。
 此后,1,2,3,全部停止。
 然后,清除1,2的快照和redo后,启动1,2。
 此时,1,2可以完成选举,对外提供服务。假如此时2下线,触发1的选举,而3此时上线。
 此时就会发生对于3,收到的来自1的Notification中n.electionEpoch > logicalclock.get(),但自身的getInitId(), getInitLastLoggedZxid(), getPeerEpoch()反映的请求处理更靠前的情况。
 其实这时我们无论以1中为准,还是以3中为准,总是存在由于多数集群成员数据被手动删除或故障损坏而产生的数据丢失问题。
 只是上述处理中,我们尽可能采取会使得数据丢失更少的一种方案。
c.3.1.2.若n.electionEpoch < logicalclock.get(),即发送者的logicalclock小于我们的logicalclock。
 直接忽略即可。发送者会在收到我们给它的通知后,再次发给我们和我们集群选举轮次一致的Notification的。到那时再处理即可。
 c.3.1.3.若n.electionEpoch == logicalclock.get()
 这是我们收到了其他成员的关于本轮集群选举的Notification。
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch);sendNotifications();
}
这时我们需要比较确定最靠前的请求处理阶段,此拥有此阶段的成员的sid。
 若我们发现了新的更靠前的sid,则我们需要更新自身的选票信息,并把自身最新的选择告知集群的每个成员。
 这样通过选举,参与本轮选举的所有成员将会达成一致的选票信息。
 接下来
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {// 是的。过半参与者与自己达成一致。while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {// 继续分析队列中其余选票if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {recvqueue.put(n);// 若存在,更优的主节点推荐break;}}// 若接收队列里也没有更优的主节点推荐。且已经有过半参与者达成一致。if (n == null) {setPeerState(proposedLeader, voteSet);// 进入自身角色Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);leaveInstance(endVote);return endVote;// 主节点信息}}
由于我们总是可以保证参与选举的集群成员就选举信息达成一致。
 所以,我们只需在每次收集到来自集群选举成员的Notification后,收集起来。
 然后,验证自身的选举信息是否已经和多数成员达成一致,达成一致后。
由于可能存在的c.3.1.1中的情况。为了这类情况下,尽可能少的减少数据丢失。我们继续分析recvqueue,直到现有的分析完,且继续等待也没有新的Notification,我们结束选举阶段。
 依据投票信息决定自身成为主,还是备。
 并将自身关于主的选票信息作为lookForLeader的结果返回。
c.3.2.若发出者此时是FOLLOWING状态:
 比如,1,2,3三个成员的集群。
 1,2先启动并完成选举。
 此时3启动,则3会从1,2各自收到Notification其中一个发出者是FOLLOWING,一个发出者是LEADING。
 这时,若自身的logicalclock和Notification中的electionEpoch 一致。比如,1,2,启动完成选举。3启动选举过程收到来自1,2的Notification的情况。我们这时期望的是收集这个已经结束选举的成员集合里每个成员的Notification,以便我们可以对此集群主节点达成一致认识。然后,以集群主节点作为我们自身的选票信息,并结束选举,设定自身身份。
考虑,1,2,3三个成员的集群经过长时间运行后全部停止。
 手动清理,1,2的快照和redo。再启动1,2,使其完成选举,假设2为主节点,形成集群对外服务。
 此时启动3,按上述策略。3最终将以2作为集群主节点,并设置自身为从节点,构建关于2的选票作为lookForLeader的结果返回。
 这时,策略下,其实导致原来集群的大量数据信息都丢失了。
若自身的logicalclock和Notification中的electionEpoch 不一致。比如,1,2启动并较长时间运行。此时3启动并在选举过程收到来自1,2的Notification的情况。这样也无妨,我们用一个专门的Map<Long, Vote>实例来收集已经结束选举的成员集合里每个成员的Notification,以便我们可以对此集群主节点达成一致认识。然后,以集群主节点作为我们自身的选票信息,并结束选举,设定自身身份,同时更新自身的logicalclock和集群的electionEpoch 一致。
c.3.3.若发出者此时是LEADING状态:
 参考c.3.2中的情况。
总结,集群每个实例在启动阶段均通过上述过程达成对集群主节点的一致认识。然后,确定自身身份,进入集群同步阶段。