Leader
构造方法
/**
 * Creates the leader for the given quorum peer.
 *
 * <p>Opens one server socket per quorum address. These sockets are collected in
 * {@code serverSockets} and later used by {@code LearnerCnxAcceptor} (started in lead())
 * to accept connections from learners.
 *
 * @param self the local quorum peer
 * @param zk   the leader's ZooKeeper server
 * @throws IOException as declared by the constructor
 */
public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
    this.self = self;
    this.proposalStats = new BufferStats();
    // Addresses used for leader <-> learner (quorum) communication: the wildcard
    // addresses when listening on all IPs, otherwise every configured quorum address.
    Set<InetSocketAddress> addresses;
    if (self.getQuorumListenOnAllIPs()) {
        addresses = self.getQuorumAddress().getWildcardAddresses();
    } else {
        addresses = self.getQuorumAddress().getAllAddresses();
    }
    // Create (and, per the original note, bind) one ServerSocket per address and add it to
    // serverSockets; addresses for which createServerSocket yields an empty Optional are skipped.
    addresses.stream()
            .map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .forEach(serverSockets::add);
    this.zk = zk;
}
lead方法
QuorumPeer使用lead方法启动leader节点,从lead方法入手分析leader流程并分析重要的方法:
/**
 * Runs this peer as the leader.
 *
 * <p>Phases: DISCOVERY (agree on a new epoch with a quorum and collect ACKEPOCHs),
 * SYNCHRONIZATION (wait for a quorum to ACK the NEWLEADER proposal), then BROADCAST,
 * where the loop pings every learner twice per tick and, once per tick, gives up
 * leadership if the synced learners no longer form a quorum.
 *
 * @throws IOException e.g. when the initial quorum config cannot be rebuilt
 *         (a configFromString failure is wrapped in IOException)
 * @throws InterruptedException propagated from the epoch waits, which also use this
 *         exception to signal a timeout
 */
void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;
    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);
        // Load the database via ZooKeeperServer.loadData (per the article this loads the
        // data, cleans up sessions and takes a snapshot).
        zk.loadData();
        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
        // Start the thread that accepts learner connections; each connection gets its
        // own LearnerHandler.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        // Agree on the new epoch: blocks until a quorum of participants has reported its
        // accepted epoch (or times out).
        long epoch = getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());
        // The new epoch's zxids start with counter 0.
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        synchronized (this) {
            lastProposed = zk.getZxid();
        }
        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        QuorumVerifier curQV = self.getQuorumVerifier();
        if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
            // qv.version == 0
            try {
                QuorumVerifier newQV = self.configFromString(curQV.toString());
                newQV.setVersion(zk.getZxid());
                self.setLastSeenQuorumVerifier(newQV, true);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
            newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }
        // Wait for a quorum of ACKEPOCH packets, i.e. followers confirming the new epoch.
        // Each ACKEPOCH carries the follower's lastLoggedZxid and currentEpoch, which are
        // used to choose how that follower gets synchronized.
        waitForEpochAck(self.getMyId(), leaderStateSummary);
        self.setCurrentEpoch(epoch); // adopt the new epoch as currentEpoch
        self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
        self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
        try {
            // Wait until a quorum has ACKed the NEWLEADER proposal.
            waitForNewLeaderAck(self.getMyId(), zk.getZxid());
        } catch (InterruptedException e) {
            // (handling omitted in this excerpt)
            return;
        }
        // Start the ZooKeeper server.
        startZkServer();
        self.setZabState(QuorumPeer.ZabState.BROADCAST);
        self.adminServer.setZooKeeperServer(zk);
        // We ping twice a tick, so we only update the tick every other iteration
        boolean tickSkip = true;
        String shutdownMessage = null;
        while (true) {
            synchronized (this) {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.tickTime / 2;
                // Wait tickTime / 2 ms, re-waiting if wait() returns early.
                while (cur < end) {
                    wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }
                // Tracks which voting members are currently synced.
                SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                if (self.getLastSeenQuorumVerifier() != null
                        && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                    syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }
                syncedAckSet.addAck(self.getMyId());
                // Count every learner that reports itself synced.
                for (LearnerHandler f : getLearners()) {
                    if (f.synced()) {
                        syncedAckSet.addAck(f.getSid());
                    }
                }
                if (!this.isRunning()) { // shutdown
                    break;
                }
                // Once per tick (every other iteration): stop leading if the synced set no
                // longer forms a quorum, unless the quorum verifier overrides that decision.
                if (!tickSkip && !syncedAckSet.hasAllQuorums()
                        && !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers())
                                && self.getQuorumVerifier().revalidateOutstandingProp(
                                        this, new ArrayList<>(outstandingProposals.values()), lastCommitted))) {
                    // Lost quorum of last committed and/or last proposed
                    shutdownMessage = "Not sufficient followers synced";
                    break;
                }
                tickSkip = !tickSkip;
            }
            // Ping every learner; this happens twice per tick.
            for (LearnerHandler f : getLearners()) {
                f.ping();
            }
        }
        if (shutdownMessage != null) {
            // leader goes in looking state
            shutdown(shutdownMessage);
        }
    } finally {
        zk.unregisterJMX(this);
    }
}
getEpochToPropose方法
根据leader及各参与者上报的acceptedEpoch计算本轮要提议的新epoch(取其中最大值+1),并等待leader与过半参与者都到达后返回;lead方法随后用该epoch生成新的zxid(makeZxid(epoch, 0)):
/**
 * Agrees on the epoch of the new leadership term.
 *
 * <p>Invoked by lead() with the leader's own id and, per the article, by learner handlers
 * with each connecting learner's sid. Each call raises {@code epoch} to at least
 * {@code lastAcceptedEpoch + 1} and records participating callers. Once the leader plus a
 * quorum have called, the epoch is fixed, stored as the accepted epoch, and all waiters are
 * woken. Later calls return immediately.
 *
 * @param sid               server id of the caller
 * @param lastAcceptedEpoch the caller's last accepted epoch
 * @return the agreed epoch
 * @throws InterruptedException if no quorum is reached within initLimit * tickTime
 *         (also thrown as the timeout signal), or if the wait is interrupted
 */
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // keep the highest epoch seen so far, plus one
        }
        if (isParticipant(sid)) {
            connectingFollowers.add(sid);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        // The leader itself plus enough connected participants form a quorum.
        if (connectingFollowers.contains(self.getMyId()) && verifier.containsQuorum(connectingFollowers)) {
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch); // record the new epoch as accepted
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            if (sid == self.getMyId()) {
                timeStartWaitForEpoch = start;
            }
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            // Wait up to initLimit * tickTime ms. If the epoch is still undecided, throw; the
            // exception propagates to the caller (per the article, this leads to a new election).
            while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}
waitForEpochAck方法
等待足够数量的ACKEPOCH数据包,表示follower确认newEpoch,follower会把lastLoggedZxid、currentEpoch发送过来,leader用这些参数判断同步数据的方式:
/**
 * Waits until the leader plus a quorum have acknowledged the new epoch (ACKEPOCH).
 *
 * <p>Returns immediately once the acknowledgement phase is finished. The caller that
 * completes the quorum wakes all waiters. Others wait up to initLimit * tickTime.
 *
 * @param id caller's server id
 * @param ss caller's state summary (epoch / last zxid)
 * @throws InterruptedException on timeout (used as the timeout signal) or interruption
 */
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
            return;
        }
        // (omitted in this excerpt)
        // NOTE(review): `id` and `ss` are unused in the visible code and nothing here adds to
        // electingFollowers; presumably the omitted part validates `ss` and records `id`.
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (electingFollowers.contains(self.getMyId()) && verifier.containsQuorum(electingFollowers)) {
            electionFinished = true;
            electingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            // Re-wait after early wake-ups until the phase is finished or the deadline passes.
            while (!electionFinished && cur < end) {
                electingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!electionFinished) {
                throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
            }
        }
    }
}
waitForNewLeaderAck方法
等待过半参与者对NEWLEADER提案回复Leader.ACK(最长等待initLimit*tickTime毫秒),之后才能开始正常通信:
/**
 * Records an ACK of the NEWLEADER proposal and waits until a quorum has ACKed it.
 *
 * <p>ACKs whose zxid differs from the NEWLEADER packet's zxid are logged and ignored.
 * The caller that completes the quorum wakes all waiters. Others wait up to
 * initLimit * tickTime.
 *
 * @param sid  server id of the acknowledging peer
 * @param zxid zxid carried by the ACK
 * @throws InterruptedException on timeout (used as the timeout signal) or interruption
 */
public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
    synchronized (newLeaderProposal.qvAcksetPairs) {
        if (quorumFormed) {
            return;
        }
        long currentZxid = newLeaderProposal.packet.getZxid();
        if (zxid != currentZxid) {
            LOG.error("NEWLEADER ACK from sid: {} is from a different epoch - current 0x{} received 0x{}",
                    sid, Long.toHexString(currentZxid), Long.toHexString(zxid));
            return;
        }
        // Note that addAck already checks that the learner is a PARTICIPANT.
        newLeaderProposal.addAck(sid);
        if (newLeaderProposal.hasAllQuorums()) {
            quorumFormed = true;
            newLeaderProposal.qvAcksetPairs.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            // Re-wait after early wake-ups until the quorum forms or the deadline passes.
            while (!quorumFormed && cur < end) {
                newLeaderProposal.qvAcksetPairs.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!quorumFormed) {
                throw new InterruptedException("Timeout while waiting for NEWLEADER to be acked by quorum");
            }
        }
    }
}
LearnerCnxAcceptor类
启动LearnerCnxAcceptor线程:
// Start thread that waits for connection requests from new followers.
// (Excerpt from lead(): the acceptor hands each accepted connection to a new LearnerHandler.)
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
LearnerCnxAcceptor类的run方法:
public void run() {if (!stop.get() && !serverSockets.isEmpty()) {ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());CountDownLatch latch = new CountDownLatch(serverSockets.size());// 启动LearnerCnxAcceptorHandlerserverSockets.forEach(serverSocket ->executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));try {latch.await();} catch (InterruptedException ie) {} finally {// 关闭连接、线程池}}
}
LearnerCnxAcceptorHandler类启动监听,接受连接:
/**
 * Accept loop for a single leader server socket.
 *
 * <p>Accepts learner connections until {@code stop} is set and starts a
 * {@code LearnerHandler} thread for each one. When the loop exits, it counts down the
 * latch shared with {@code LearnerCnxAcceptor}.
 */
class LearnerCnxAcceptorHandler implements Runnable {
    private ServerSocket serverSocket;
    private CountDownLatch latch;

    LearnerCnxAcceptorHandler(ServerSocket serverSocket, CountDownLatch latch) {
        this.serverSocket = serverSocket;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            while (!stop.get()) {
                acceptConnections(); // accept (at most) one learner connection per iteration
            }
        } catch (Exception e) {
            // (shutdown / error handling omitted in this excerpt)
        } finally {
            latch.countDown(); // when the count reaches 0, LearnerCnxAcceptor's latch.await() returns
        }
    }

    /** Accepts one learner connection and starts a LearnerHandler thread for it. */
    private void acceptConnections() throws IOException {
        Socket socket = null;
        // NOTE(review): `error` is unused in the visible code; presumably used by the omitted parts.
        boolean error = false;
        try {
            socket = serverSocket.accept(); // blocks until a learner connects
            socket.setSoTimeout(self.tickTime * self.initLimit); // read timeout: tickTime * initLimit ms
            socket.setTcpNoDelay(nodelay);
            BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
            // Wrap the connection in a LearnerHandler thread that talks to this learner.
            LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
            fh.start();
        } catch (Exception e) {
            // (omitted in this excerpt)
        } finally {
            // (omitted in this excerpt)
        }
    }
}
LearnerHandler
负责与learner(follower/observer)通信,每个learner连接对应一个LearnerHandler线程。
关键字段
// Key fields of LearnerHandler (excerpt)
protected final Socket sock; // socket to the connected learner (follower/observer)
// Master this handler works for (the Leader in the flow described here)
final LearnerMaster learnerMaster;
// Server id (sid) of the connected learner; defaults to 0
protected long sid = 0;
// Outgoing packets for the learner, drained by the sendPackets thread
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
// A zxid tracked for this learner, initialized to -1 — NOTE(review): semantics not shown in this excerpt
protected volatile long lastZxid = -1;
// Input/output archives and the buffered streams over the socket
private BinaryInputArchive ia;
private BinaryOutputArchive oa;
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;
// Learner type: PARTICIPANT or OBSERVER; defaults to PARTICIPANT
private LearnerType learnerType = LearnerType.PARTICIPANT;
run方法
-
接收Leader.FOLLOWERINFO或Leader.OBSERVERINFO数据包,解析type、sid等关键字段,计算newEpoch和newLeaderZxid
-
发送Leader.LEADERINFO数据包,包含newLeaderZxid值
-
读取Leader.ACKEPOCH数据包,解析对端的epoch、zxid
-
根据对端zxid判断是否需要同步数据、如何同步数据(txnlog/committedlog/snapshot)
// Latest zxid the learner reported (from its ACKEPOCH state summary)
peerLastZxid = ss.getLastZxid();
// Queue the learner's missing transactions from the txnlog and/or committedLog, or return
// true to request a full snapshot (SNAP) sync instead. syncFollower compares peerLastZxid
// with minCommittedLog / maxCommittedLog to decide. (Per the original note, the committedLog
// is kept in memory, so syncing from it is cheaper.)
boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
-
同步txnlog和committedlog数据
if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
    // The learner's lastZxid lies within [minCommittedLog, maxCommittedLog]:
    // sync directly from the committedLog.
    Iterator<Proposal> itr = db.getCommittedLog().iterator();
    currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
    needSnap = false;
} else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
    // The learner is behind the committedLog window: sync from the txnlog, then the committedLog.
    // Per the article, the limit defaults to "size of the latest snapshot file * 0.33".
    long sizeLimit = db.calculateTxnLogSizeLimit();
    // Read proposals from the txnlog. Per the article, an empty iterator is returned when they
    // would exceed sizeLimit, which forces a SNAP sync.
    Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
    if (txnLogItr.hasNext()) {
        // Sync from the txnlog first.
        currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
        // If the txnlog did not reach minCommittedLog, there is a gap between the txnlog
        // and the committedLog, so fall back to a SNAP sync.
        if (currentZxid < minCommittedLog) {
            currentZxid = peerLastZxid;
            // Clear out currently queued requests and revert to sending a snapshot
            queuedPackets.clear();
            needOpPacket = true;
        } else {
            // Continue from where the txnlog stopped using the committedLog.
            Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
            currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
            needSnap = false;
        }
    }
    // (remaining handling omitted in this excerpt)
}
-
启动转发功能
// Start forwarding leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);// 把toBeApplied数据(待commit状态)发出去 // 添加到forwardingFollowers/observingLearners集
-
如果needSnap为true则需要发送SNAP请求让learner读取输入流加载dataTree
// Only when needSnap is true: stream a full snapshot of the data tree to the learner.
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
// Send the SNAP header carrying the data tree's last processed zxid
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush();
// Serialize the data tree directly onto the learner's stream, followed by a fixed
// signature string (presumably checked by the learner after deserializing)
learnerMaster.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
-
发送NEWLEADER请求
if (getVersion() < 0x10000) {
    // Learner older than protocol version 0x10000: write NEWLEADER (no quorum config)
    // directly to the stream.
    QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
    oa.writeRecord(newLeaderQP, "packet");
} else {
    // Newer learner: NEWLEADER carries the serialized quorum verifier and is queued for
    // the sendPackets thread.
    QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
    queuedPackets.add(newLeaderQP);
}
-
启动sendPackets线程:从queuedPackets取消息发给learner节点
-
等待NEWLEADER ACK响应
// Read the learner's next packet; it must be the ACK of NEWLEADER.
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
    // Unexpected packet type: return without registering an ACK.
    return;
}
// Register this learner's ACK; waits until a quorum has ACKed NEWLEADER or times out.
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
-
等待zookeeperServer启动完成
-
发送UPTODATE请求,告知follower处于最新状态,并且可以开始响应客户端
// Tell the learner it is up to date; queued for the sendPackets thread.
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
-
启动while循环与learner保持通信,处理ACK、PING、REVALIDATE、REQUEST等请求
Follower
包含了follower的逻辑。
followLeader方法
the main method called by the follower to follow the leader.
/**
 * Main method of a follower.
 *
 * <p>Finds and connects to the leader, registers with FOLLOWERINFO, synchronizes
 * (DISCOVERY → SYNCHRONIZATION → BROADCAST), and then reads and processes leader packets
 * while running.
 *
 * @throws InterruptedException as declared
 */
void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    long connectionTime = 0;
    boolean completedSync = false;
    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // Locate the leader server.
        QuorumServer leaderServer = findLeader();
        try {
            // Connect to the leader.
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            // Handshake (FOLLOWERINFO / LEADERINFO / ACKEPOCH); returns the first zxid of the new epoch.
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            // The epoch is the high 32 bits of the zxid (zxid >> 32).
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            // Synchronize data with the leader.
            syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
            if (self.getObserverMasterPort() > 0) {
                // Create an ObserverMaster for chained replication to observers (not analyzed here).
                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // Broadcast phase: keep reading packets from the leader.
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp); // handle the leader's packet
            }
        } catch (Exception e) {
            // ... (omitted in this excerpt)
        }
    } finally {
        // ... (omitted in this excerpt)
    }
}
connectToLeader方法
/**
 * Connects to the leader, trying all of its addresses in parallel.
 *
 * <p>One LeaderConnector task per address is submitted. The first socket stored in the
 * shared AtomicReference is used. After authentication, the archive streams over the
 * socket are set up and, if asyncSending is enabled, the sending thread is started.
 *
 * @param multiAddr the leader's addresses
 * @param hostname  leader host name passed to the authenticator
 * @throws IOException if no connection could be established (or from stream setup / auth)
 */
protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
    this.leaderAddr = multiAddr;
    Set<InetSocketAddress> addresses;
    if (self.isMultiAddressReachabilityCheckEnabled()) {
        addresses = multiAddr.getAllReachableAddressesOrAll();
    } else {
        addresses = multiAddr.getAllAddresses();
    }
    // NOTE(review): Executors.newFixedThreadPool throws IllegalArgumentException for 0 threads;
    // presumably `addresses` is never empty — confirm.
    ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
    CountDownLatch latch = new CountDownLatch(addresses.size());
    AtomicReference<Socket> socket = new AtomicReference<>(null);
    // Connect asynchronously with one LeaderConnector per address (multi-address support).
    addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
    try {
        latch.await();
    } catch (InterruptedException e) {
        // NOTE(review): the interrupt is swallowed here (the interrupt flag is not restored).
    } finally {
        // Shut down the executor (omitted in this excerpt).
    }
    if (socket.get() == null) {
        throw new IOException("Failed connect to " + multiAddr);
    } else {
        sock = socket.get();
        sockBeingClosed.set(false);
    }
    // Authenticate with the leader (details omitted in this excerpt).
    self.authLearner.authenticate(sock, hostname);
    // Set up the input/output archives over the socket.
    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    // Start the sending thread (per the article, a BlockingQueue-based producer/consumer).
    if (asyncSending) {
        startSendingThread();
    }
}
registerWithLeader方法
/**
 * Performs the registration handshake with the leader.
 *
 * <p>Steps: send the registration packet, read LEADERINFO, then send ACKEPOCH.
 *
 * @param pktType FOLLOWERINFO (followers) or OBSERVERINFO (observers)
 * @return the first zxid of the leader's new epoch (counter 0)
 * @throws IOException if the leader's epoch is lower than this peer's accepted epoch,
 *         or on I/O failure
 */
protected long registerWithLeader(int pktType) throws IOException {
    // 1. Send the registration packet (pktType). The zxid carries the accepted epoch; the
    //    data is a LearnerInfo(sid, protocolVersion 0x10000, quorum config version).
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    writePacket(qp, true); // write the packet out
    // 2. Read the leader's reply (LEADERINFO).
    readPacket(qp);
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); // extract newEpoch
    if (qp.getType() == Leader.LEADERINFO) { // protocol 1.0 leader
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte[] epochBytes = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        if (newEpoch > self.getAcceptedEpoch()) {
            // New epoch: report our currentEpoch and accept the new one.
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch); // record the accepted epoch
        } else if (newEpoch == self.getAcceptedEpoch()) {
            // Epoch already accepted: report -1 instead of currentEpoch.
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("...");
        }
        // 3. Send ACKEPOCH carrying lastLoggedZxid (as the zxid) and currentEpoch (or -1) as data.
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
        // Branch for older-protocol leaders (omitted in this excerpt; as shown it has no return).
    }
}
syncWithLeader方法
-
读leader数据包
-
DIFF - 表示采用增量同步:leader随后会发送learner缺失的提案(PROPOSAL/COMMIT),无需发送快照;如果learner已经是最新状态,则不会有需要补发的提案
-
SNAP - 将leader输入流(leader的dataTree快照数据)反序列化到zkDb
// SNAP: rebuild the local database from the snapshot streamed by the leader, then record
// the SNAP packet's zxid as the last processed zxid.
zk.getZKDatabase().deserializeSnapshot(leaderIs);
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
TRUNC - 将数据truncate到指定位置
// TRUNC: truncate the local log to the zxid in the TRUNC packet and record it as the last
// processed zxid. NOTE(review): `truncated` is not checked in this excerpt.
boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
-
继续读leader数据包,leader可能使用txnlog或committedlog同步数据
-
同步数据并提交:
-
PROPOSAL - 提案数据会放入packetsNotCommitted集待处理
-
COMMIT/COMMITANDACTIVATE - 提交数据会放入packetsCommitted集待处理
-
INFORM/INFORMANDACTIVATE - 同上
-
NEWLEADER - leader已经发送完同步数据,follower会takeSnapshot、setCurrentEpoch、将packetsNotCommitted中的提案通过logRequest写入事务日志(此时尚未提交),然后响应ACK
// fzk.logRequest(p.hdr, p.rec, p.digest);public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());request.setTxnDigest(digest);if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request); // 待处理的事务集}syncProcessor.processRequest(request); // 持久化磁盘 }
-
UPTODATE - leader会等待足够的follower响应ACK并且确定各种组件已启动之后,发送一个UPTODATE数据包,表示follower已经处于同步状态,停止同步,跳出循环
-
-
处理packetsNotCommitted和packetsCommitted集,处理事务或写磁盘
// Follower: log every proposal received during sync, then commit the zxids the leader has
// already committed. Both go through the RequestProcessor chain (covered in a later article).
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
    fzk.logRequest(p.hdr, p.rec, p.digest);
}
for (Long zxid : packetsCommitted) {
    fzk.commit(zxid);
}
-
Observer会执行下面代码处理
// Observer: apply only the proposals whose commit was also received during sync.
// packetsCommitted is consumed in order; a proposal whose zxid does not match the next
// committed zxid is skipped.
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
    Long zxid = packetsCommitted.peekFirst();
    // peekFirst() returns null once every commit has been consumed. Comparing the primitive
    // zxid against a null Long would auto-unbox it and throw NullPointerException, so treat
    // "no commit left" the same as "no matching commit".
    if (zxid == null || p.hdr.getZxid() != zxid) {
        // log warning message if there is no matching commit
        // old leader send outstanding proposal to observer
        continue;
    }
    packetsCommitted.remove();
    Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
    request.setTxnDigest(p.digest);
    ozk.commitRequest(request);
}
processPacket方法
在连接建立、数据处于同步状态后,follower会阻塞读取来自leader的数据包,之后使用processPacket方法处理:
// create a reusable packet to reduce gc impact
QuorumPacket qp = new QuorumPacket();
// Read and dispatch packets from the leader while this learner is running.
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}
processPacket方法:
/**
 * Handles one packet from the leader during the follower's broadcast phase.
 *
 * @param qp the packet read from the leader
 * @throws Exception propagated from packet handling
 */
protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL:
            ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
            TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
            TxnHeader hdr = logEntry.getHeader();
            Record txn = logEntry.getTxn();
            TxnDigest digest = logEntry.getDigest();
            // Proposals are expected with consecutive zxids; a gap is only logged.
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x{} expected 0x{}",
                        Long.toHexString(hdr.getZxid()), Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            if (hdr.getType() == OpCode.reconfig) {
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
                self.setLastSeenQuorumVerifier(qv, true);
            }
            // Wrap in a Request and pass it to syncProcessor.processRequest (write to disk).
            fzk.logRequest(hdr, txn, digest);
            // (remaining handling omitted in this excerpt)
            break;
        case Leader.COMMIT:
            ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
            // Commit the request (per the article, via commitProcessor.commit(request)).
            fzk.commit(qp.getZxid());
            // (remaining handling omitted in this excerpt)
            break;
        case Leader.COMMITANDACTIVATE:
            // get the new configuration from the request
            Request request = fzk.pendingTxns.element();
            SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
            // get new designated leader from (current) leader's message
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
            long suggestedLeaderId = buffer.getLong();
            final long zxid = qp.getZxid();
            // NOTE(review): majorChange is unused in the visible code; presumably used by the omitted part.
            boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
            // commit (writes the new config to ZK tree (/zookeeper/config)
            fzk.commit(zxid);
            // (remaining handling omitted in this excerpt)
            break;
        case Leader.UPTODATE:
            // Not expected during normal replication after sync; ignored here.
            break;
        case Leader.REVALIDATE:
            if (om == null || !om.revalidateLearnerSession(qp)) {
                revalidate(qp);
            }
            break;
        case Leader.SYNC:
            fzk.sync();
            break;
        default:
            LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
            break;
    }
}
Observer
observeLeader方法
/**
 * Main method of an observer.
 *
 * <p>Like followLeader, but it connects to the leader or an observer master and registers
 * with OBSERVERINFO. The read loop also stops once {@code nextLearnerMaster} is set.
 *
 * @throws Exception as declared
 */
void observeLeader() throws Exception {
    zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
    long connectTime = 0;
    boolean completedSync = false;
    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // Find the leader or an observer master to learn from.
        QuorumServer master = findLearnerMaster();
        try {
            // Connect to the leader or observer master.
            connectToLeader(master.addr, master.hostname);
            connectTime = System.currentTimeMillis();
            long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            final long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(master.addr, master.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            syncWithLeader(newLeaderZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            final long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().OBSERVER_SYNC_TIME.add(syncTime);
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning() && nextLearnerMaster.get() == null) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            // NOTE(review): the exception is not logged in this excerpt.
            closeSocket();
            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        currentLearnerMaster = null;
        zk.unregisterJMX(this);
        if (connectTime != 0) {
            // NOTE(review): connectionDuration (and completedSync) are unused in the visible
            // code; presumably they were logged in the full source.
            long connectionDuration = System.currentTimeMillis() - connectTime;
            messageTracker.dumpToLog(leaderAddr.toString());
        }
    }
}
processPacket方法
/**
 * Handles one packet from the leader (or observer master) on an observer.
 *
 * <p>PROPOSAL and COMMIT are ignored. Committed transactions arrive as INFORM and are
 * committed directly.
 *
 * @param qp the packet read from the master
 * @throws Exception propagated from packet handling
 */
protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL:
            LOG.warn("Ignoring proposal");
            break;
        case Leader.COMMIT:
            LOG.warn("Ignoring commit");
            break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Observer started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        case Leader.SYNC:
            ((ObserverZooKeeperServer) zk).sync();
            break;
        case Leader.INFORM:
            ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
            logEntry = SerializeUtils.deserializeTxn(qp.getData());
            hdr = logEntry.getHeader();
            txn = logEntry.getTxn();
            digest = logEntry.getDigest();
            Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
            request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
            request.setTxnDigest(digest);
            ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
            obs.commitRequest(request); // commit the informed transaction
            break;
        case Leader.INFORMANDACTIVATE:
            // Used by dynamic reconfiguration (handling omitted in this excerpt)
            break;
        default:
            LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
            break;
    }
}
Leader与Follower通信总结
Leader                                                          Follower
     <------ FOLLOWERINFO/OBSERVERINFO数据包:发送acceptEpoch ------
leader计算newEpoch、newZxid
     ------ LEADERINFO数据包:发送最新的zxid ------>
                                                                follower接受newEpoch
     <------ ACKEPOCH数据包:发送lastLoggedZxid、currentEpoch ------
leader确定数据同步方式
     ------ DIFF/TRUNC/SNAP 或者同步数据(loop) ------>
     ------ NEWLEADER数据包 ------>
     <------ ACK数据包 ------
     ------ UPTODATE数据包 ------>
     ------ PROPOSAL数据包 ------>
     <------ ACK数据包 ------
     ------ COMMIT数据包 ------>
     <------ ACK数据包 ------
注:ZAB协议中follower对PROPOSAL回复ACK,收到COMMIT后不需要回复ACK;另外follower在收到UPTODATE后也会回复一个ACK(leader会忽略该ACK)。