百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

zookeeper的Leader选举源码解析(zookeeper选取机制)

liebian365 2025-04-07 15:42 4 浏览 0 评论

作者:京东物流 梁吉超

zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构。

01Leader选举机制

Leader选举机制采用半数选举算法。

每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。

02Leader选举集群配置

1. 重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg

2. 修改zoo.cfg文件,修改值如下:

【plain】
zoo1.cfg文件内容:
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer


zoo2.cfg文件内容:
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer


zoo3.cfg文件内容:
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer


zoo4.cfg文件内容:
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

3. server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识

  • participant默认参与选举标识,可不写. observer不参与选举

4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。

5. 启动三个zookeeper实例:

  • bin/zkServer.sh start conf/zoo1.cfg
  • bin/zkServer.sh start conf/zoo2.cfg
  • bin/zkServer.sh start conf/zoo3.cfg

6. 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举。

03Leader选举流程

图1 第一轮到第二轮投票流程

前提:

设定票据数据格式vote(sid,zxid,epoch)

  • sid是Server ID每台服务的唯一标识,是myid文件内容;
  • zxid是数据事务id号;
  • epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。

按照顺序启动sid=1,sid=2节点

第一轮投票:

1. sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;

2. sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;

3. sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);

4. sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;

5. 第一轮投票选举结束。

第二轮投票:

1. sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;

2. sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;

3. sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;

4. sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader。

这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。

3.1 Leader选举采用多层队列架构

zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。比如对某台机器发送不成功不会影响正常服务端的发送。

图2 多层队列上下关系交互流程图


04解析代码入口类

通过查看zkServer.sh文件内容找到服务启动类:

org.apache.zookeeper.server.quorum.QuorumPeerMain

05选举流程代码解析

图3 选举代码实现流程图

1. 加载配置文件QuorumPeerConfig.parse(path);

针对 Leader选举关键配置信息如下:

  • 读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。下面遇到myid变量为当前节点自己sid标识。
    • 设置peerType当前应用是否参与选举
  • new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.
【Java】
public QuorumMaj(Properties props) throws ConfigException {
        for (Entry entry : props.entrySet()) {
            String key = entry.getKey().toString();
            String value = entry.getValue().toString();
            //读取集群配置文件中的server.开头的应用实例配置信息
            if (key.startsWith("server.")) {
                int dot = key.indexOf('.');
                long sid = Long.parseLong(key.substring(dot + 1));
                QuorumServer qs = new QuorumServer(sid, value);
                allMembers.put(Long.valueOf(sid), qs);
                if (qs.type == LearnerType.PARTICIPANT)
//应用实例绑定的角色为PARTICIPANT意为参与选举
                    votingMembers.put(Long.valueOf(sid), qs);
                else {
                    //观察者成员
                    observingMembers.put(Long.valueOf(sid), qs);
                }
            } else if (key.equals("version")) {
                version = Long.parseLong(value, 16);
            }
        }
        //过半基数
        half = votingMembers.size() / 2;
    }

2.
QuorumPeerMain.runFromConfig(config) 启动服务;

3.
QuorumPeer.startLeaderElection() 开启选举服务;

  • 设置当前选票new Vote(sid,zxid,epoch)
【plain】
synchronized public void startLeaderElection(){
try {
           if (getPeerState() == ServerState.LOOKING) {
               //首轮:当前节点默认投票对象为自己
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }
//........
}
  • 创建选举管理类:QuorumCnxnManager;
  • 初始化recvQueue接收投票队列(第二层传输队列);
  • 初始化queueSendMap按sid发送投票队列(第二层传输队列);
  • 初始化senderWorkerMap发送投票工作线程容器,表示着与sid投票节点已连接;
  • 初始化选举监听线程类QuorumCnxnManager.Listener。
【Java】
//QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    //接收投票队列(第二层传输队列)
    this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY);
    //按sid发送投票队列(第二层传输队列)
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue>();
    //发送投票工作线程容器,表示着与sid投票节点已连接 
    this.senderWorkerMap = new ConcurrentHashMap();
    this.lastMessageSent = new ConcurrentHashMap();


    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }


    this.self = self;


    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;


    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);
    // Starts listener thread that waits for connection requests 
    //创建选举监听线程 接收选举投票请求
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();//启动选举监听线程
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
return le;}

4. 开启选举监听线程
QuorumCnxnManager.Listener;

  • 创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap
  • sid>self.sid才可以连接过来。
【Java】
//上面的listener.start()执行后,选择此方法
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    while((!shutdown) && (numRetries < 3 try ss='new' serversocket ss.setreuseaddresstrue if self.getquorumlistenonallips int port='self.getElectionAddress().getPort();' addr='new' inetsocketaddressport else resolve hostname for this server in case the underlying ip address has changed. self.recreatesocketaddressesself.getid addr='self.getElectionAddress();' log.infomy election bind port: addr.tostring setnameaddr.tostring ss.bindaddr while shutdown client='ss.accept();' setsockoptsclient log.inforeceived connection request client.getremotesocketaddress receive and handle the connection request asynchronously if the quorum sasl authentication is enabled. this is required because sasl server authentication process may take few seconds to finish this may delay next peer connection requests. if quorumsaslauthenabled receiveconnectionasyncclient else receiveconnectionclient numretries='0;' catch ioexception e if shutdown break log.errorexception while listening e numretries try ss.close thread.sleep1000 catch ioexception ie log.errorerror closing server socket ie catch interruptedexception ie log.errorinterrupted while sleeping. ignoring exception ie closesocketclient log.infoleaving listener if shutdown log.erroras im leaving the listener thread i wont be able to participate in leader election any longer: self.getelectionaddress else if ss clean up for shutdown. try ss.close catch ioexception ie dont log an error for shutdown. log.debugerror closing server socket ie receiveconnection->handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
//...省略
     if (sid < self.getId()) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }


            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: {}", sid);
            closeSocket(sock);


            if (electionAddr != null) {
                connectOne(sid, electionAddr);
            } else {
                connectOne(sid);
            }


        } else { // Otherwise start worker threads to receive data.
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);


            SendWorker vsw = senderWorkerMap.get(sid);


            if (vsw != null) {
                vsw.finish();
            }
  //存储连接信息
            senderWorkerMap.put(sid, sw);


            queueSendMap.putIfAbsent(sid,
                    new ArrayBlockingQueue(SEND_CAPACITY));


            sw.start();
            rw.start();
     }
}

5. 创建FastLeaderElection快速选举服务;

  • 初始选票发送队列sendqueue(第一层队列)
  • 初始选票接收队列recvqueue(第一层队列)
  • 创建线程WorkerSender
  • 创建线程WorkerReceiver
【Java】
//FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    //发送队列sendqueue(第一层队列)
    sendqueue = new LinkedBlockingQueue();
    //接收队列recvqueue(第一层队列)
    recvqueue = new LinkedBlockingQueue();
    this.messenger = new Messenger(manager);
}
//new Messenger(manager)
Messenger(QuorumCnxManager manager) {
    //创建线程WorkerSender
    this.ws = new WorkerSender(manager);


    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    //创建线程WorkerReceiver
    this.wr = new WorkerReceiver(manager);


    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

6. 开启WorkerSender和WorkerReceiver线程。

WorkerSender线程自旋获取sendqueue第一层队列元素

  • sendqueue队列元素内容为相关选票信息详见ToSend类;
  • 首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;
  • 不相同将sendqueue队列元素转储到queueSendMap第二层传输队列中。
【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{
//...
  public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;
  //将投票信息发送出去
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
  }
}
//QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) {
         b.position(0);
         addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
         /*
          * Start a new connection if doesn't have one already.
          */
         ArrayBlockingQueue bq = new ArrayBlockingQueue(
            SEND_CAPACITY);
         ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq);
         //转储到queueSendMap第二层传输队列中
         if (oldq != null) {
             addToSendQueue(oldq, b);
         } else {
             addToSendQueue(bq, b);
         }
         connectOne(sid);     
    }
}

WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。

【Java】
//WorkerReceiver
public void run() {
    Message response;
    while (!stop) {
      // Sleeps on receive
      try {
          //自旋获取recvQueue第二层传输队列元素
          response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
          if(response == null) continue;
          // The current protocol and two previous generations all send at least 28 bytes
          if (response.buffer.capacity() < 28) {
              LOG.error("Got a short response: " + response.buffer.capacity());
              continue;
          }
          //...
  if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
         //第二层传输队列元素转存到recvqueue第一层队列中
         recvqueue.offer(n);
         //...
      }
    }
//...
}


06选举核心逻辑

1. 启动线程QuorumPeer

开始Leader选举投票makeLEStrategy().lookForLeader();

sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。sendqueue队列由WorkerSender线程处理。

【plain】
//QuorunPeer.run
//...
try {
   reconfigFlagClear();
    if (shuttingDownLE) {
       shuttingDownLE = false;
       startLeaderElection();
       }
    //makeLEStrategy().lookForLeader() 发送投票
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
    setPeerState(ServerState.LOOKING);
}  
//...
//FastLeaderElection.lookLeader
public Vote lookForLeader() throws InterruptedException {
//...
  //向其他应用发送投票
sendNotifications();
//...
}


private void sendNotifications() {
    //获取应用节点
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        //储存投票信息
        sendqueue.offer(notmsg);
    }
}


class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
    while (!stop) {
        try {
//提取已储存的投票信息
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;


            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
  }
//...
}

自旋recvqueue队列元素获取投票过来的选票信息:

【Java】
public Vote lookForLeader() throws InterruptedException {
//...
/*
 * Loop in which we exchange notifications until we find a leader
 */
while ((self.getPeerState() == ServerState.LOOKING) &&
        (!stop)){
    /*
     * Remove next notification from queue, times out after 2 times
     * the termination time
     */
    //提取投递过来的选票信息
    Notification n = recvqueue.poll(notTimeout,
            TimeUnit.MILLISECONDS);
/*
 * Sends more notifications if haven't received enough.
 * Otherwise processes new notification.
 */
if(n == null){
    if(manager.haveDelivered()){
        //已全部连接成功,并且前一轮投票都完成,需要再次发起投票
        sendNotifications();
    } else {
        //如果未收到选票信息,manager.contentAll()自动连接其它socket节点
        manager.connectAll();
    }
    /*
     * Exponential backoff
     */
    int tmpTimeOut = notTimeout*2;
    notTimeout = (tmpTimeOut < maxNotificationInterval?
            tmpTimeOut : maxNotificationInterval);
    LOG.info("Notification time out: " + notTimeout);
         }
     //....
    }
  //...
}
【Java】
//manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)


private boolean startConnection(Socket sock, Long sid)
        throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Use BufferedOutputStream to reduce the number of IP packets. This is
        // important for x-DC scenarios.
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);


        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();


        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }


    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }


    // If lost the challenge, then drop the new connection
    //保证集群中所有节点之间只有一个通道连接
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);


        SendWorker vsw = senderWorkerMap.get(sid);


        if(vsw != null)
            vsw.finish();


        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(
                SEND_CAPACITY));


        sw.start();
        rw.start();


        return true;


    }
    return false;
}

如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap中。对应第2步中的sid<self.sid逻辑,保证集群中所有节点之间只有一个通道连接。

图4 节点之间连接方式

【Java】


public Vote lookForLeader() throws InterruptedException {
//...
    if (n.electionEpoch > logicalclock.get()) {
        //当前选举周期小于选票周期,重置recvset选票池
        //大于当前周期更新当前选票信息,再次发送投票
        logicalclock.set(n.electionEpoch);
        recvset.clear();
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
            updateProposal(getInitId(),
                    getInitLastLoggedZxid(),
                    getPeerEpoch());
        }
        sendNotifications();
    } else if (n.electionEpoch < logicalclock.get()) {
        if(LOG.isDebugEnabled()){
            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                    + Long.toHexString(n.electionEpoch)
                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
        }
        break;
    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)) {//相同选举周期
        //接收的选票与当前选票PK成功后,替换当前选票
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        sendNotifications();
    }
//...


}

在上代码中,自旋从recvqueue队列中获取到选票信息。开始进行选举:

  • 判断当前选票和接收过来的选票周期是否一致
  • 大于当前周期更新当前选票信息,再次发送投票
  • 周期相等:当前选票信息和接收的选票信息进行PK
【Java】
//接收的选票与当前选票PK
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }


        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId)))));
  }

在上述代码中的totalOrderPredicate方法逻辑如下:

  • 竞选周期大于当前周期为true
  • 竞选周期相等,竞选zxid大于当前zxid为true
  • 竞选周期相等,竞选zxid等于当前zxid,竞选sid大于当前sid为true
  • 经过上述条件判断为true将当前选票信息替换为竞选成功的选票,同时再次将新的选票投出去。
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
   //存储节点对应的选票信息
    // key:选票来源sid  value:选票推举的Leader sid
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));


    //半数选举开始
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // Verify if there is any change in the proposed leader
        while((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null){
            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)){
                recvqueue.put(n);
                break;
            }
        }
        /*WorkerSender
         * This predicate is true once we don't read any new
         * relevant message from the reception queue
         */
        if (n == null) {
            //已选举出leader 更新当前节点是否为leader 
            self.setPeerState((proposedLeader == self.getId()) ?
                    ServerState.LEADING: learningState());


            Vote endVote = new Vote(proposedLeader,
                    proposedZxid, proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
//...
}
/**
     * Termination predicate. Given a set of votes, determines if have
     * sufficient to declare the end of the election round.
     *
     * @param votes
     *            Set of votes
     * @param vote
     *            Identifier of the vote received last  PK后的选票
     */
private boolean termPredicate(HashMap votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    //votes 来源于recvset 存储各个节点推举出来的选票信息
    for (Map.Entry entry : votes.entrySet()) {
//选举出的sid和其它节点选择的sid相同存储到voteSet变量中。
        if (vote.equals(entry.getValue())) {
//保存推举出来的sid
            voteSet.addAck(entry.getKey());
        }
    }
    //判断选举出来的选票数量是否过半
    return voteSet.hasAllQuorums();
}
//QuorumMaj#containsQuorum
public boolean containsQuorum(Set ackSet) {
    return (ackSet.size() > half);
   }

在上述代码中:recvset是存储每个sid推举的选票信息。

第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1);

第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。

最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个。因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。

  • setPeerState更新当前节点角色;
  • proposedLeader选举出来的sid和自己sid相等,设置为Leader;
  • 上述条件不相等,设置为Follower或Observing;
  • 更新currentVote当前选票为Leader的选票vote(2,0,1)。


07总结

通过对Leader选举源码的解析,可以了解到:

1. 多个应用节点之间网络通信采用BIO方式进行相互投票,同时保证每个节点之间只使用一个通道,减少网络资源的消耗,足以见得在BIO分布式中间件开发中的技术重要性。

2. 基于BIO的基础上,灵活运用多线程和内存消息队列完好实现多层队列架构,每层队列由不同的线程分工协作,提高快速选举性能目的。

3. 为BIO在多线程技术上的实践带来了宝贵的经验。

相关推荐

zookeeper的Leader选举源码解析(zookeeper选取机制)

作者:京东物流梁吉超zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致...

接待外国人英文口语(接待外国人英文口语翻译)

接待外国人英文口语询问访客身份:  MayIhaveyourname,please?  请问您贵姓?  Whatcompanyareyoufrom?  您是哪个公司的?  Could...

一文深入理解AP架构Nacos注册原理

Nacos简介Nacos是一款阿里巴巴开源用于管理分布式微服务的中间件,能够帮助开发人员快速实现动态服务发现、服务配置、服务元数据及流量管理等。这篇文章主要剖析一下Nacos作为注册中心时其服务注册与...

Android面试宝典之终极大招(android 面试宝典)

以下内容来自兆隆IT云学院就业部,根据多年成功就业服务经验,以及职业素养课程部分内容,归纳总结:18.请描述一下Intent和IntentFilter。Android中通过Intent...

除了Crontab,Swoole Timer也可以实现定时任务的

一般的定时器是怎么实现的呢?我总结如下:1.使用Crontab工具,写一个shell脚本,在脚本中调用PHP文件,然后定期执行该脚本;2.ignore_user_abort()和set_time_li...

Spark源码阅读:DataFrame.collect 作业提交流程思维导图

本文分为两个部分:作业提交流程思维导图关键函数列表作业提交流程思维导图collect后Job的提交流程点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。关键函数列表Data...

Arduino通过串口透传ESP 13板与java程序交互

ESP13---是一个无线板子,配置通过热点通信Arduino通过串口透传ESP13板与java程序交互这个程序最基本的想法是用java把Arduino抽象出来,忙活了好几天,虽然没有达到最后的...

Arduino与两个或多个Arduino板之间的通信

问题您希望让两个或多个Arduino板一起工作。您可能希望增加I/O能力或执行比单个板上能够实现的更多处理。您可以使用I2C在板间传递数据,以便它们可以共享工作负载。解决方案本示例中的两段代码展示了如...

Android开发者必知的5个开源库(安卓开源库)

过去的时间里,Android开发逐步走向成熟,一个个与Android相关的开发工具也层出不穷。不过,在面对各种新鲜事物时,不要忘了那些我们每天使用的大量开源库。在这里,向大家介绍的就是,在这个任劳任怨...

Android 开发中文引导-应用小部件

应用小部件是可以嵌入其它应用(例如主屏幕)并收到定期更新的微型应用视图。这些视图在用户界面中被叫做小部件,并可以用应用小部件提供者发布。可以容纳其他应用部件的应用组件叫做应用部件的宿主(1)。下面的截...

Android | 如何在设备启动完成后打开应用

读完这篇文章大概需要1分钟最近,在做一个应用(暂且称之为MyApp),里面需要有provisioning(配置)的部分,也就是在应用启动前有个配置的过程,由好几个Activity组成,一个...

2021款欧版标致3008 1.5T柴油版:更为舒适的家用SUV

自文艺复兴时期及以后以来闻名的法国人的外向性为现代汽车文化注入了另一块石头,但如今已被大量技术所装饰。这标致3008出生时是畅销书。这是您在看到新船体时就知道的情况之一,那是在2016年,当年几乎...

媒库文选估计波士顿动力公司机器狗大军的拉力

EstimatethePullingForceofBostonDynamics'Robo-DogArmy估计波士顿动力公司机器狗大军的拉力RhettAllain雷特·阿兰When...

Three.js建模基础(threejs3d)

在Three.js中,一个可见的物体是由几何体和材料构成的。在这个教程中,我们将学习如何从头开始创建新的网格几何体,研究Three.js为处理几何对象和材质所提供的相关支持。1、索引面集/Indexe...

如何用2 KB代码实现3D赛车游戏?2kPlus Jam大赛了解一下

选自frankforce作者:Frank机器之心编译参与:王子嘉、GeekAI控制复杂度一直是软件开发的核心问题之一,一代代的计算机从业者纷纷贡献着自己的智慧,试图降低程序的计算复杂度。然而,将一款...

取消回复欢迎 发表评论: