ZooKeeper Source Code (07): Leader, Follower and Observer

Leader

Constructor

public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
    this.self = self;
    this.proposalStats = new BufferStats();

    // Get the addresses used for server-to-server (quorum) communication
    Set<InetSocketAddress> addresses;
    if (self.getQuorumListenOnAllIPs()) {
        addresses = self.getQuorumAddress().getWildcardAddresses();
    } else {
        addresses = self.getQuorumAddress().getAllAddresses();
    }

    // Create a ServerSocket bound to each address and add it to serverSockets;
    // these sockets are used when LearnerCnxAcceptor starts
    addresses.stream()
            .map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .forEach(serverSockets::add);

    this.zk = zk;
}

The lead method

QuorumPeer calls lead() to run the node as the leader. We start from lead() to follow the leader workflow, then look at the important methods it calls:

void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);
        // Load the database with ZooKeeperServer.loadData():
        // load data, clean up sessions and take a snapshot (takeSnapshot)
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start the thread that accepts learner connections;
        // a LearnerHandler is created to talk to each learner
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        // Collect the last accepted epochs and compute this term's epoch and zxid
        long epoch = getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());

        // Set the new zxid
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

        synchronized (this) {
            lastProposed = zk.getZxid();
        }

        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        QuorumVerifier curQV = self.getQuorumVerifier();
        if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
            // qv.version == 0
            try {
                QuorumVerifier newQV = self.configFromString(curQV.toString());
                newQV.setVersion(zk.getZxid());
                self.setLastSeenQuorumVerifier(newQV, true);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
            newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        // Wait for enough ACKEPOCH packets, i.e. followers acknowledging the new epoch.
        // Each follower sends its lastLoggedZxid and currentEpoch, which the leader
        // uses to decide how to sync data with it
        waitForEpochAck(self.getMyId(), leaderStateSummary);
        self.setCurrentEpoch(epoch); // set the new currentEpoch
        self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
        self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);

        try {
            // Wait for the followers to ACK NEWLEADER
            waitForNewLeaderAck(self.getMyId(), zk.getZxid());
        } catch (InterruptedException e) {
            // omitted
            return;
        }

        // Start the ZooKeeperServer
        startZkServer();

        self.setZabState(QuorumPeer.ZabState.BROADCAST);
        self.adminServer.setZooKeeperServer(zk);

        // We ping twice a tick, so we only update the tick every other iteration
        boolean tickSkip = true;
        String shutdownMessage = null;

        while (true) {
            synchronized (this) {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.tickTime / 2;
                // Wait tickTime / 2 milliseconds
                while (cur < end) {
                    wait(end - cur);
                    cur = Time.currentElapsedTime();
                }

                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }

                // Tracks which learners are in sync
                SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                if (self.getLastSeenQuorumVerifier() != null
                        && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                    syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }

                syncedAckSet.addAck(self.getMyId());

                // Check each learner's sync status
                for (LearnerHandler f : getLearners()) {
                    if (f.synced()) {
                        syncedAckSet.addAck(f.getSid());
                    }
                }

                if (!this.isRunning()) { // shutdown
                    break;
                }

                // Check that the synced learners still form a quorum;
                // this check runs once per tickTime
                if (!tickSkip && !syncedAckSet.hasAllQuorums()
                        && !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers())
                             && self.getQuorumVerifier().revalidateOutstandingProp(this, new ArrayList<>(outstandingProposals.values()), lastCommitted))) {
                    // Lost quorum of last committed and/or last proposed
                    shutdownMessage = "Not sufficient followers synced";
                    break;
                }
                tickSkip = !tickSkip;
            }
            // Ping the learners: twice per tickTime
            for (LearnerHandler f : getLearners()) {
                f.ping();
            }
        }
        if (shutdownMessage != null) {
            // leader goes in looking state
            shutdown(shutdownMessage);
        }
    } finally {
        zk.unregisterJMX(this);
    }
}
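The tick loop above re-checks, once per tickTime, whether the leader plus its synced learners still form a quorum. A minimal sketch of that check, assuming a plain majority rule in the spirit of ZooKeeper's majority quorum verifier. `MajorityQuorum` and `TickCheckDemo` are hypothetical stand-ins for the QuorumVerifier and SyncedLearnerTracker machinery:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified majority quorum verifier.
class MajorityQuorum {
    private final Set<Long> voters;

    MajorityQuorum(Set<Long> voters) {
        this.voters = new HashSet<>(voters);
    }

    // True if the acked ids that belong to voters form a strict majority.
    boolean containsQuorum(Set<Long> ackSet) {
        long votes = ackSet.stream().filter(voters::contains).count();
        return votes > voters.size() / 2;
    }
}

class TickCheckDemo {
    // Mirrors the loop body: the leader always acks itself, then each synced learner is added.
    static boolean stillLeading(MajorityQuorum qv, long leaderId, Set<Long> syncedLearners) {
        Set<Long> acks = new HashSet<>(syncedLearners);
        acks.add(leaderId);
        return qv.containsQuorum(acks);
    }
}
```

With three voters, the leader keeps leading as long as one follower stays synced (2 of 3). If no follower is synced, the check fails, and lead() would break out with "Not sufficient followers synced".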

The getEpochToPropose method

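Collects the last accepted epoch (acceptedEpoch) from the leader and from each connecting follower, and computes the new epoch from which the new zxid is derived: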

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // update to the newest epoch
        }
        if (isParticipant(sid)) {
            connectingFollowers.add(sid);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        // The leader itself plus the connected followers form a quorum
        if (connectingFollowers.contains(self.getMyId()) && verifier.containsQuorum(connectingFollowers)) {
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch); // set the new epoch
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            if (sid == self.getMyId()) {
                timeStartWaitForEpoch = start;
            }
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            // Wait up to initLimit * tickTime ms; if still waitingForNewEpoch,
            // throw, which triggers a new leader election
            while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}
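The epoch arithmetic can be separated from the locking. The sketch below is minimal and rests on two assumptions: each participant reports its acceptedEpoch once, and the leader's epoch field starts at -1 (that initial value is not shown above). The hypothetical `EpochProposer` only reproduces the `if (lastAcceptedEpoch >= epoch) epoch = lastAcceptedEpoch + 1` rule and the "leader included and quorum reached" condition, using a simple majority count:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper reproducing the epoch rule of getEpochToPropose without threads.
class EpochProposer {
    private final long leaderId;
    private final int voterCount;
    private final Set<Long> connected = new HashSet<>();
    private long epoch = -1; // assumed starting value

    EpochProposer(long leaderId, int voterCount) {
        this.leaderId = leaderId;
        this.voterCount = voterCount;
    }

    // Called once per participant (the leader included) with its last accepted epoch.
    // Returns true once the leader and a strict majority have reported.
    boolean report(long sid, long lastAcceptedEpoch) {
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // stays strictly above every reported epoch
        }
        connected.add(sid);
        return connected.contains(leaderId) && connected.size() > voterCount / 2;
    }

    long epoch() {
        return epoch;
    }
}
```

For example, with three voters, follower 2 reporting epoch 5 followed by leader 1 reporting epoch 4 yields a new epoch of 6. The quorum is only declared once the leader itself has reported.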

The waitForEpochAck method

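Waits for enough ACKEPOCH packets, which show that followers have acknowledged the new epoch. Each follower sends its lastLoggedZxid and currentEpoch, and the leader uses these values to decide how to sync data with it: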

public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
            return;
        }
        // omitted
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (electingFollowers.contains(self.getMyId()) && verifier.containsQuorum(electingFollowers)) {
            electionFinished = true;
            electingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            while (!electionFinished && cur < end) {
                electingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!electionFinished) {
                throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
            }
        }
    }
}
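The StateSummary a follower sends is essentially a (currentEpoch, lastZxid) pair. The following minimal sketch orders two such pairs. It assumes the epoch is compared first and the zxid only breaks ties. The class is hypothetical, not ZooKeeper's StateSummary, and this ordering is an assumption, since the comparison logic is omitted above:

```java
// Hypothetical (currentEpoch, lastZxid) pair, modelled on the values sent in ACKEPOCH.
final class EpochZxidSummary {
    final long currentEpoch;
    final long lastZxid;

    EpochZxidSummary(long currentEpoch, long lastZxid) {
        this.currentEpoch = currentEpoch;
        this.lastZxid = lastZxid;
    }

    // Epoch decides first; within the same epoch, the higher zxid is more recent.
    boolean isMoreRecentThan(EpochZxidSummary other) {
        return currentEpoch > other.currentEpoch
                || (currentEpoch == other.currentEpoch && lastZxid > other.lastZxid);
    }
}
```

Under this ordering, a follower whose summary is more recent than the leader's would hold data the leader lacks. A leader would therefore be expected to reject it rather than sync it.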

The waitForNewLeaderAck method

Waits until enough Leader.ACK packets acknowledging NEWLEADER have arrived; only then can normal communication begin:

public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
    synchronized (newLeaderProposal.qvAcksetPairs) {
        if (quorumFormed) {
            return;
        }

        long currentZxid = newLeaderProposal.packet.getZxid();
        if (zxid != currentZxid) {
            LOG.error("NEWLEADER ACK from sid: {} is from a different epoch - current 0x{} received 0x{}",
                    sid, Long.toHexString(currentZxid), Long.toHexString(zxid));
            return;
        }

        // Note that addAck already checks that the learner is a PARTICIPANT.
        newLeaderProposal.addAck(sid);

        if (newLeaderProposal.hasAllQuorums()) {
            quorumFormed = true;
            newLeaderProposal.qvAcksetPairs.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            while (!quorumFormed && cur < end) {
                newLeaderProposal.qvAcksetPairs.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!quorumFormed) {
                throw new InterruptedException("Timeout while waiting for NEWLEADER to be acked by quorum");
            }
        }
    }
}
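getEpochToPropose, waitForEpochAck and waitForNewLeaderAck share the same shape. Each caller registers under a monitor. The call that completes the quorum sets a flag and calls notifyAll. Every other caller waits until the flag is set or initLimit * tickTime elapses. Below is a minimal, self-contained sketch of that pattern. `QuorumBarrier` is hypothetical: it uses a simple majority count instead of a QuorumVerifier, and it returns false on timeout instead of throwing:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical barrier: returns once a majority of voters has arrived, or gives up on timeout.
class QuorumBarrier {
    private final Object lock = new Object();
    private final Set<Long> arrived = new HashSet<>();
    private final int voterCount;
    private boolean quorumFormed = false;

    QuorumBarrier(int voterCount) {
        this.voterCount = voterCount;
    }

    // Monotonic milliseconds, similar in spirit to Time.currentElapsedTime().
    private static long nowMs() {
        return System.nanoTime() / 1_000_000;
    }

    // True if the quorum formed within timeoutMs; false on timeout or interrupt.
    boolean arriveAndAwait(long sid, long timeoutMs) {
        synchronized (lock) {
            if (quorumFormed) {
                return true;
            }
            arrived.add(sid);
            if (arrived.size() > voterCount / 2) {
                quorumFormed = true;
                lock.notifyAll(); // wake every caller still waiting
                return true;
            }
            long cur = nowMs();
            long end = cur + timeoutMs;
            try {
                // Loop to tolerate spurious wake-ups, recomputing the remaining time
                while (!quorumFormed && cur < end) {
                    lock.wait(end - cur);
                    cur = nowMs();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            return quorumFormed;
        }
    }
}
```

The `while` loop around `wait` matters because `Object.wait` can return early. Recomputing `end - cur` on each pass keeps the total wait bounded by the original timeout, just as the three Leader methods do.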

The LearnerCnxAcceptor class

Starting the LearnerCnxAcceptor thread:

// Start thread that waits for connection requests from new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

The LearnerCnxAcceptor run method:

public void run() {
    if (!stop.get() && !serverSockets.isEmpty()) {
        ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
        CountDownLatch latch = new CountDownLatch(serverSockets.size());

        // Start one LearnerCnxAcceptorHandler per server socket
        serverSockets.forEach(serverSocket ->
                executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));

        try {
            latch.await();
        } catch (InterruptedException ie) {
        } finally {
            // close the sockets and shut down the thread pool
        }
    }
}

LearnerCnxAcceptorHandler listens on its socket and accepts connections:

class LearnerCnxAcceptorHandler implements Runnable {
    private ServerSocket serverSocket;
    private CountDownLatch latch;

    LearnerCnxAcceptorHandler(ServerSocket serverSocket, CountDownLatch latch) {
        this.serverSocket = serverSocket;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            while (!stop.get()) {
                acceptConnections(); // accept connections
            }
        } catch (Exception e) {
            // close
        } finally {
            latch.countDown(); // once the count reaches 0, LearnerCnxAcceptor is woken up
        }
    }

    private void acceptConnections() throws IOException {
        Socket socket = null;
        boolean error = false;
        try {
            socket = serverSocket.accept(); // accept a learner connection
            socket.setSoTimeout(self.tickTime * self.initLimit); // timeout
            socket.setTcpNoDelay(nodelay);

            BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
            // Wrap the socket in a LearnerHandler, which talks to the learner
            LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
            fh.start();
        } catch (Exception e) {
            // omitted
        } finally {
            // omitted
        }
    }
}
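LearnerCnxAcceptor runs one accept loop per listening socket and uses a CountDownLatch, so it only finishes after every loop has exited. Below is a minimal sketch of that structure using plain java.net sockets. `MultiSocketAcceptor` and its `onConnection` callback are hypothetical; the callback stands in for `new LearnerHandler(...).start()`:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical acceptor: one accept loop per ServerSocket, all tracked by one latch.
class MultiSocketAcceptor {
    private final AtomicBoolean stop = new AtomicBoolean(false);
    private final List<ServerSocket> serverSockets;
    private final Consumer<Socket> onConnection;

    MultiSocketAcceptor(List<ServerSocket> serverSockets, Consumer<Socket> onConnection) {
        this.serverSockets = serverSockets;
        this.onConnection = onConnection;
    }

    // Blocks until every accept loop has exited (after halt() or a socket error).
    void run() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
        CountDownLatch latch = new CountDownLatch(serverSockets.size());
        for (ServerSocket ss : serverSockets) {
            executor.submit(() -> {
                try {
                    while (!stop.get()) {
                        Socket s = ss.accept();   // blocks until a learner connects
                        onConnection.accept(s);   // hand off, e.g. start a handler thread
                    }
                } catch (IOException e) {
                    // accept() throws once halt() has closed the socket
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } finally {
            executor.shutdownNow();
        }
    }

    // Closing the sockets unblocks accept() in every loop.
    void halt() {
        stop.set(true);
        for (ServerSocket ss : serverSockets) {
            try {
                ss.close();
            } catch (IOException ignored) {
                // already closed
            }
        }
    }
}
```

Closing the socket is the usual way to stop a thread blocked in `accept()`, because setting the stop flag alone cannot wake it.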

LearnerHandler

Handles communication with one learner (follower or observer).

Key fields

protected final Socket sock; // socket connected to the learner
// the Leader object
final LearnerMaster learnerMaster;
// unique ID of the learner
protected long sid = 0;
// outbound packet queue
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
// the learner's last zxid
protected volatile long lastZxid = -1;
// input and output streams
private BinaryInputArchive ia;
private BinaryOutputArchive oa;
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;
// learner type: PARTICIPANT/OBSERVER
private LearnerType learnerType = LearnerType.PARTICIPANT;
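queuedPackets is the hand-off point between the code that produces packets (sync data, proposals, commits, pings) and the single sendPackets thread that writes them to the learner. Below is a minimal producer/consumer sketch of that design. Three things in it are hypothetical: the `PacketSender` class, the `String` packet type and the poison-pill stop value. The real class uses QuorumPacket and its own shutdown logic:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical per-learner sender: producers enqueue, one thread drains and "writes".
class PacketSender {
    static final String STOP = "__STOP__"; // hypothetical poison pill

    private final LinkedBlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();
    final List<String> written = new CopyOnWriteArrayList<>(); // stands in for the socket

    private final Thread sender = new Thread(() -> {
        try {
            while (true) {
                String p = queuedPackets.take(); // blocks until a packet is available
                if (STOP.equals(p)) {
                    break;
                }
                written.add(p); // in LearnerHandler this would be a write to the output archive
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "Sender");

    void start() {
        sender.start();
    }

    void queue(String packet) {
        queuedPackets.add(packet); // never blocks: the queue is unbounded
    }

    void shutdownAndWait() throws InterruptedException {
        queuedPackets.add(STOP);
        sender.join();
    }
}
```

A single consumer thread preserves the enqueue order on the wire, which is what lets PROPOSAL and COMMIT packets reach the learner in zxid order.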

The run method

  1. Receive the Leader.FOLLOWERINFO or Leader.OBSERVERINFO packet, parse key fields such as type and sid, and compute newEpoch and newLeaderZxid

  2. Send a Leader.LEADERINFO packet containing newLeaderZxid

  3. Read the Leader.ACKEPOCH packet and parse the peer's epoch and zxid

  4. Based on the peer's zxid, decide whether data must be synced and how (txnlog, committedlog or snapshot)

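    peerLastZxid = ss.getLastZxid(); // the peer's latest processed zxid
    // Sync from the txnlog or committedlog, or return true to send a snapshot via SNAP.
    // syncFollower compares maxCommittedLog and minCommittedLog with peerLastZxid
    // to choose between txnlog/committedlog sync and SNAP;
    // the committedlog is kept in memory, so syncing from it is faster
    boolean needSnap = syncFollower(peerLastZxid, learnerMaster);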
    
  5. Sync txnlog and committedlog data (excerpt from syncFollower)

    if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
        // The peer's lastZxid lies between minCommittedLog and maxCommittedLog:
        // sync directly from the committedlog
        Iterator<Proposal> itr = db.getCommittedLog().iterator();
        currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
        needSnap = false;
    } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
        // Sync using the txnlog plus the committedlog
        // Defaults to "size of the latest snapshot file in bytes * 0.33"
        long sizeLimit = db.calculateTxnLogSizeLimit();
        // Look up proposals in the txnlog; if their total size exceeds sizeLimit,
        // an empty iterator is returned, which forces a SNAP sync
        Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
        if (txnLogItr.hasNext()) {
            // Sync from the txnlog
            currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);

            // If the txnlog sync did not reach minCommittedLog, data is missing between
            // the txnlog and the committedlog, so fall back to SNAP
            if (currentZxid < minCommittedLog) {
                currentZxid = peerLastZxid;
                // Clear out currently queued requests and revert to sending a snapshot
                queuedPackets.clear();
                needOpPacket = true;
            } else {
                // Continue syncing from the committedlog
                Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                needSnap = false;
            }
        }
        // omitted
    }
    
  6. Start forwarding

    // Start forwarding
    leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
    // Sends out the toBeApplied proposals (those awaiting commit) and
    // adds this handler to the forwardingFollowers/observingLearners set
    
  7. If needSnap is true, send a SNAP packet so the learner reads the snapshot from the stream and loads its DataTree

    long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
    // Send the SNAP packet
    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
    messageTracker.trackSent(Leader.SNAP);
    bufferedOutput.flush();
    // Serialize the DataTree and send it to the learner
    learnerMaster.getZKDatabase().serializeSnapshot(oa);
    oa.writeString("BenWasHere", "signature");
    bufferedOutput.flush();
    
  8. Send the NEWLEADER packet

    if (getVersion() < 0x10000) {
        QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
        oa.writeRecord(newLeaderQP, "packet");
    } else {
        QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
        queuedPackets.add(newLeaderQP);
    }
    
  9. Start the sendPackets thread, which takes packets from queuedPackets and sends them to the learner

  10. Wait for the ACK of NEWLEADER

    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    messageTracker.trackReceived(qp.getType());
    if (qp.getType() != Leader.ACK) {
        return;
    }
    learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());

  11. Wait for the ZooKeeperServer to finish starting

  12. Send an UPTODATE packet, telling the follower that it is up to date and may start serving clients

    queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    
  13. Enter a while loop that keeps communicating with the learner, handling ACK, PING, REVALIDATE, REQUEST and other packets
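Steps 4 and 5 reduce to choosing a sync mode from peerLastZxid. The sketch below is simplified and rests on several assumptions. The committedLog window is [minCommittedLog, maxCommittedLog]. The txnlog lookup and size-limit check have already been folded into one flag, `txnLogUsable`. A peer that is ahead of maxCommittedLog is truncated (TRUNC), which is an assumption about the branches omitted above. `PlannedSync` and `SyncPlanner` are hypothetical names, and many edge cases handled by syncFollower are left out:

```java
// Hypothetical result type; not ZooKeeper's QuorumPeer.SyncMode.
enum PlannedSync { COMMITTED_LOG, TXN_LOG_THEN_COMMITTED_LOG, TRUNC, SNAP }

class SyncPlanner {
    // txnLogUsable is assumed to mean: the txnlog lookup returned proposals within
    // sizeLimit and they reach minCommittedLog (otherwise the code above falls back to SNAP).
    static PlannedSync choose(long peerLastZxid, long minCommittedLog, long maxCommittedLog,
                              boolean txnLogSyncEnabled, boolean txnLogUsable) {
        if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
            return PlannedSync.COMMITTED_LOG; // the in-memory committedLog covers the gap
        }
        if (peerLastZxid > maxCommittedLog) {
            return PlannedSync.TRUNC; // assumption: the peer is ahead and must roll back
        }
        // From here on, peerLastZxid < minCommittedLog
        if (txnLogSyncEnabled && txnLogUsable) {
            return PlannedSync.TXN_LOG_THEN_COMMITTED_LOG; // the on-disk txnlog bridges to the committedLog
        }
        return PlannedSync.SNAP; // gap too large, data missing, or txnlog sync disabled
    }
}
```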

Follower

Contains the follower logic.

The followLeader method

the main method called by the follower to follow the leader.

void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);

    long connectionTime = 0;
    boolean completedSync = false;

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // Find the leader server
        QuorumServer leaderServer = findLeader();
        try {
            // Connect to the leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            // Register with the leader; returns the zxid of the new epoch
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            // zxid >> 32L yields the epoch
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            // Sync data with the leader
            syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
            if (self.getObserverMasterPort() > 0) {
                // Create an ObserverMaster for chained replication (not analyzed here)
                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // Keep communicating with the leader
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp); // handle packets from the leader
            }
        } catch (Exception e) {
            // ...
        }
    } finally {
        // ...
    }
}
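A zxid packs the epoch into its high 32 bits and a per-epoch counter into its low 32 bits. That is why `zxid >> 32L` yields the epoch, and why `(zxid & 0xffffffffL) != 0` in logRequest tests the counter. Below is a minimal sketch of such helpers in the spirit of ZxidUtils. The `Zxids` class is a stand-in, not the ZooKeeper class:

```java
// Stand-in for ZxidUtils: high 32 bits = epoch, low 32 bits = counter.
final class Zxids {
    private Zxids() {
    }

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }
}
```

For example, `makeZxid(5, 0)` is `0x500000000`, the kind of zxid lead() starts a new epoch from via `ZxidUtils.makeZxid(epoch, 0)`. The counter of such a zxid is 0.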

The connectToLeader method

protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
    this.leaderAddr = multiAddr;
    Set<InetSocketAddress> addresses;
    if (self.isMultiAddressReachabilityCheckEnabled()) {
        addresses = multiAddr.getAllReachableAddressesOrAll();
    } else {
        addresses = multiAddr.getAllAddresses();
    }
    ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
    CountDownLatch latch = new CountDownLatch(addresses.size());
    AtomicReference<Socket> socket = new AtomicReference<>(null);
    // Connect asynchronously with one LeaderConnector per address (multi-address support)
    addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
    try {
        latch.await();
    } catch (InterruptedException e) {
    } finally {
        // shut down the executor
    }
    if (socket.get() == null) {
        throw new IOException("Failed connect to " + multiAddr);
    } else {
        sock = socket.get();
        sockBeingClosed.set(false);
    }
    // Authentication (details omitted)
    self.authLearner.authenticate(sock, hostname);
    // Get the input and output streams
    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    // Start the sending thread (producer/consumer on a BlockingQueue)
    if (asyncSending) {
        startSendingThread();
    }
}
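connectToLeader tries all of the leader's addresses in parallel and keeps a single socket. Below is a minimal sketch of that idea. It assumes "first successful connection wins" semantics, implemented with `AtomicReference.compareAndSet`, and that losing sockets are closed. The exact policy of the real LeaderConnector is not shown above, so treat this as an approximation; `ParallelConnect` is hypothetical:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ParallelConnect {
    // Returns a connected socket, or throws if no address accepted the connection.
    static Socket connectAny(List<InetSocketAddress> addresses, int timeoutMs)
            throws IOException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        CountDownLatch latch = new CountDownLatch(addresses.size());
        AtomicReference<Socket> winner = new AtomicReference<>(null);
        for (InetSocketAddress addr : addresses) {
            executor.submit(() -> {
                Socket s = new Socket();
                try {
                    s.connect(addr, timeoutMs);
                    if (!winner.compareAndSet(null, s)) {
                        s.close(); // another address already won
                    }
                } catch (IOException e) {
                    closeQuietly(s); // unreachable address: give up on it
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await(); // like connectToLeader, wait for every attempt to finish
        } finally {
            executor.shutdown();
        }
        Socket s = winner.get();
        if (s == null) {
            throw new IOException("Failed to connect to " + addresses);
        }
        return s;
    }

    private static void closeQuietly(Socket s) {
        try {
            s.close();
        } catch (IOException ignored) {
            // nothing else to do
        }
    }
}
```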

The registerWithLeader method

protected long registerWithLeader(int pktType) throws IOException {
    // 1. First send a Leader.FOLLOWERINFO packet:
    //    Leader.FOLLOWERINFO, zxid, sid, protocolVersion, quorumVersion
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

    LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());

    writePacket(qp, true); // write the packet out

    // 2. Read the leader's Leader.LEADERINFO packet
    readPacket(qp);
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); // parse newEpoch
    if (qp.getType() == Leader.LEADERINFO) { // protocol version 1.0
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte[] epochBytes = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch); // set acceptedEpoch
        } else if (newEpoch == self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("...");
        }
        // 3. Send an ACKEPOCH packet containing self.lastLoggedZxid and self.currentEpoch
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
        // older protocol branch, omitted
    }
}
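The 4-byte ACKEPOCH payload tells the leader the follower's currentEpoch, or -1 if the follower had already accepted this epoch. Below is a minimal sketch of just that branch, assuming the same three-way comparison as registerWithLeader. `AckEpochPayload` is a hypothetical helper: it does not update acceptedEpoch, and it throws IllegalStateException where the real method throws IOException:

```java
import java.nio.ByteBuffer;

class AckEpochPayload {
    // Builds the ACKEPOCH data for a learner that received newEpoch in LEADERINFO.
    static byte[] build(long newEpoch, long acceptedEpoch, long currentEpoch) {
        byte[] epochBytes = new byte[4];
        ByteBuffer wrapped = ByteBuffer.wrap(epochBytes); // big-endian by default
        if (newEpoch > acceptedEpoch) {
            wrapped.putInt((int) currentEpoch); // report our current epoch
        } else if (newEpoch == acceptedEpoch) {
            wrapped.putInt(-1); // this epoch was already accepted earlier
        } else {
            throw new IllegalStateException("leader epoch " + newEpoch + " < accepted " + acceptedEpoch);
        }
        return epochBytes;
    }

    // Leader-side decoding of the same 4 bytes.
    static int read(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }
}
```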

The syncWithLeader method

  1. Read the leader's first sync packet, which determines the sync mode

    • DIFF - no snapshot is needed: the follower's data is close enough to the leader's that only the missing proposals are sent

    • SNAP - deserialize the leader's input stream (a snapshot of the leader's DataTree) into zkDb

      zk.getZKDatabase().deserializeSnapshot(leaderIs);
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
      
    • TRUNC - truncate the log to the given zxid

      boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
      
  2. Keep reading packets from the leader; the leader may sync data from the txnlog or the committedlog

  3. Sync and commit data:

    • PROPOSAL - the proposal is added to packetsNotCommitted for later processing

    • COMMIT/COMMITANDACTIVATE - the committed zxid is added to packetsCommitted for later processing

    • INFORM/INFORMANDACTIVATE - same as above

    • NEWLEADER - the leader has finished sending sync data; the follower takes a snapshot (takeSnapshot), sets currentEpoch, hands everything in packetsNotCommitted to zk, and replies with an ACK

      // Called as fzk.logRequest(p.hdr, p.rec, p.digest);
      public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
          Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
          request.setTxnDigest(digest);
          if ((request.zxid & 0xffffffffL) != 0) {
              pendingTxns.add(request); // queue of pending transactions
          }
          syncProcessor.processRequest(request); // persist to disk
      }
      
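    • UPTODATE - once the leader has received enough ACKs from followers and confirmed that its components have started, it sends an UPTODATE packet; the follower is now in sync, stops syncing and breaks out of the loop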

  4. Process packetsNotCommitted and packetsCommitted: log the transactions to disk and commit them

    FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
    for (PacketInFlight p : packetsNotCommitted) {
        fzk.logRequest(p.hdr, p.rec, p.digest);
    }
    for (Long zxid : packetsCommitted) {
        fzk.commit(zxid);
    }
    // Requests are processed by the RequestProcessor chain,
    // which will be covered in detail later
    
  5. An observer runs the following code instead

    ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
    for (PacketInFlight p : packetsNotCommitted) {
        Long zxid = packetsCommitted.peekFirst();
        if (p.hdr.getZxid() != zxid) {
            // log warning message if there is no matching commit
            // old leader send outstanding proposal to observer
            continue;
        }
        packetsCommitted.remove();
        Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
        request.setTxnDigest(p.digest);
        ozk.commitRequest(request);
    }
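The observer loop above commits a buffered proposal only when its zxid matches the head of packetsCommitted. Proposals with no matching commit are skipped. Below is a minimal sketch of that matching. It assumes plain zxids stand in for the full PacketInFlight objects, and `CommitMatcher` is hypothetical. Unlike the excerpt, it also guards against an empty packetsCommitted:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class CommitMatcher {
    // Returns the zxids that would be committed, in order; consumes matched commits.
    static List<Long> match(List<Long> notCommitted, Deque<Long> committed) {
        List<Long> applied = new ArrayList<>();
        for (long proposalZxid : notCommitted) {
            Long head = committed.peekFirst();
            if (head == null || head != proposalZxid) {
                continue; // no matching commit, e.g. an outstanding proposal from an old leader
            }
            committed.removeFirst();
            applied.add(proposalZxid);
        }
        return applied;
    }
}
```

For example, with proposals 0x100000001 to 0x100000003 and commits for only 0x100000001 and 0x100000003, the middle proposal is skipped and the other two are applied in order.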
    

The processPacket method

Once the connection is established and the data is in sync, the follower blocks reading packets from the leader and handles each one with processPacket:

// create a reusable packet to reduce gc impact
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}

The processPacket method:

protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        TxnHeader hdr = logEntry.getHeader();
        Record txn = logEntry.getTxn();
        TxnDigest digest = logEntry.getDigest();
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x{} expected 0x{}",
                    Long.toHexString(hdr.getZxid()), Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();

        if (hdr.getType() == OpCode.reconfig) {
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
            self.setLastSeenQuorumVerifier(qv, true);
        }

        // Wrap it in a Request and write it to disk via syncProcessor.processRequest(request)
        fzk.logRequest(hdr, txn, digest);
        // omitted
        break;
    case Leader.COMMIT:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        // Commit the request via commitProcessor.commit(request)
        fzk.commit(qp.getZxid());
        // omitted
        break;
    case Leader.COMMITANDACTIVATE:
        // get the new configuration from the request
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));

        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        final long zxid = qp.getZxid();
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
        // commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(zxid);
        // omitted
        break;
    case Leader.UPTODATE:
        // This packet type should not appear here during normal replication
        break;
    case Leader.REVALIDATE:
        if (om == null || !om.revalidateLearnerSession(qp)) {
            revalidate(qp);
        }
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}
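On a follower, PROPOSAL appends the request to pendingTxns (through logRequest), and COMMIT is expected to refer to the oldest pending request, because the leader commits in zxid order. Below is a minimal sketch of that FIFO check together with the `lastQueued + 1` gap warning. `PendingTxns` is hypothetical, `lastQueued` starts at 0 purely for illustration, and IllegalStateException stands in for whatever the real server does on a mismatched commit:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PendingTxns {
    private final Deque<Long> pending = new ArrayDeque<>();
    private long lastQueued = 0; // hypothetical starting point

    // PROPOSAL: returns false if the zxid is not the expected next one
    // (processPacket only logs a warning in that case and still queues it).
    boolean onProposal(long zxid) {
        boolean inOrder = zxid == lastQueued + 1;
        lastQueued = zxid;
        pending.addLast(zxid);
        return inOrder;
    }

    // COMMIT: must match the oldest pending proposal; returns the committed zxid.
    long onCommit(long zxid) {
        Long head = pending.peekFirst();
        if (head == null || head != zxid) {
            throw new IllegalStateException("Unexpected commit 0x" + Long.toHexString(zxid));
        }
        return pending.removeFirst();
    }
}
```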

Observer

The observeLeader method

void observeLeader() throws Exception {
    zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
    long connectTime = 0;
    boolean completedSync = false;
    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // Find the leader or an observerMaster server
        QuorumServer master = findLearnerMaster();
        try {
            // Connect to the leader or observerMaster
            connectToLeader(master.addr, master.hostname);
            connectTime = System.currentTimeMillis();
            long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }

            final long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(master.addr, master.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            syncWithLeader(newLeaderZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            final long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().OBSERVER_SYNC_TIME.add(syncTime);
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning() && nextLearnerMaster.get() == null) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            closeSocket();
            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        currentLearnerMaster = null;
        zk.unregisterJMX(this);
        if (connectTime != 0) {
            long connectionDuration = System.currentTimeMillis() - connectTime;
            messageTracker.dumpToLog(leaderAddr.toString());
        }
    }
}

The processPacket method

protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        ((ObserverZooKeeperServer) zk).sync();
        break;
    case Leader.INFORM:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry = SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
        obs.commitRequest(request); // commit
        break;
    case Leader.INFORMANDACTIVATE:
        // used by the reconfig feature
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}

Summary of Leader and Follower communication

Leader                                                              Follower

          FOLLOWERINFO/OBSERVERINFO packet: sends acceptedEpoch
   <------------------------------------------------------------
leader computes newEpoch and newZxid
          LEADERINFO packet: sends the latest zxid
   ------------------------------------------------------------>
                                               follower accepts newEpoch
          ACKEPOCH packet: sends lastLoggedZxid and currentEpoch
   <------------------------------------------------------------
leader decides the sync mode
          DIFF/TRUNC/SNAP, or sync data (loop)
   ------------------------------------------------------------>
          NEWLEADER packet
   ------------------------------------------------------------>
          ACK packet
   <------------------------------------------------------------
          UPTODATE packet
   ------------------------------------------------------------>
          PROPOSAL packet
   ------------------------------------------------------------>
          ACK packet
   <------------------------------------------------------------
          COMMIT packet
   ------------------------------------------------------------>
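From the follower's side, the diagram is a fixed sequence of phases. Below is a minimal sketch that checks the order of packets a follower receives, under simplifying assumptions. DIFF/TRUNC/SNAP and the synced proposals and commits are treated as one sync stage. Proposals and commits are still accepted between NEWLEADER and UPTODATE, since syncWithLeader keeps reading until UPTODATE. `LeaderPacket` and `FollowerHandshake` are hypothetical, not ZooKeeper classes, though the phase names mirror ZabState:

```java
// Hypothetical subset of packet types a follower receives from the leader.
enum LeaderPacket { LEADERINFO, DIFF, TRUNC, SNAP, PROPOSAL, COMMIT, NEWLEADER, UPTODATE }

class FollowerHandshake {
    enum Phase { DISCOVERY, SYNCHRONIZATION, AWAIT_UPTODATE, BROADCAST }

    private Phase phase = Phase.DISCOVERY;

    Phase phase() {
        return phase;
    }

    // Advances on a packet from the leader; returns false if the packet is unexpected.
    boolean receive(LeaderPacket p) {
        switch (phase) {
            case DISCOVERY:
                if (p == LeaderPacket.LEADERINFO) {
                    phase = Phase.SYNCHRONIZATION; // follower replies with ACKEPOCH
                    return true;
                }
                return false;
            case SYNCHRONIZATION:
                if (p == LeaderPacket.NEWLEADER) {
                    phase = Phase.AWAIT_UPTODATE; // follower replies with ACK
                    return true;
                }
                return p == LeaderPacket.DIFF || p == LeaderPacket.TRUNC || p == LeaderPacket.SNAP
                        || p == LeaderPacket.PROPOSAL || p == LeaderPacket.COMMIT;
            case AWAIT_UPTODATE:
                if (p == LeaderPacket.UPTODATE) {
                    phase = Phase.BROADCAST;
                    return true;
                }
                return p == LeaderPacket.PROPOSAL || p == LeaderPacket.COMMIT;
            default: // BROADCAST: proposals are ACKed, commits are applied
                return p == LeaderPacket.PROPOSAL || p == LeaderPacket.COMMIT;
        }
    }
}
```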

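This article was submitted by an internet user; the views expressed are the author's own and do not represent this site. The site only provides information storage, does not own the content and bears no legal responsibility for it. When reposting, please cite the source: http://www.mzph.cn/news/655974.shtml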
