ZooKeeper Leader Election: A Source Code Walkthrough
Author: Liang Jichao (樑吉超), JD Logistics
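ZooKeeper is a distributed coordination service that solves common data problems in distributed applications, such as cluster management and state synchronization. To do so, ZooKeeper relies on leader election to keep data consistent across the cluster and the service stable. Starting from a cluster configuration, this article walks through the leader election source code and shows how ZooKeeper combines BIO (blocking I/O) communication, multiple threads and multi-layer queues into a high-performance architecture.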
01 Leader Election Mechanism
Leader election uses a majority (more-than-half) voting algorithm.
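Each ZooKeeper server is called a node, and every voting node has a vote. Each node sends its ballot to every other node with voting rights; once one node is backed by more than half of the votes, it becomes the Leader and the other voting nodes become Followers.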
02 Cluster Configuration for Leader Election
- Copy zoo_sample.cfg to zoo1.cfg, zoo2.cfg, zoo3.cfg and zoo4.cfg.
- Edit the files with the following values:
【plain】
# zoo1.cfg
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
# zoo2.cfg
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
# zoo3.cfg
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
# zoo4.cfg
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
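- Format: server.<server number, matching the myid file>=<ip>:<data sync port>:<election port>:<role>
- participant (takes part in elections) is the default role and can be omitted; an observer does not take part in elections.
- In /export/data/zookeeper-1, /export/data/zookeeper-2, /export/data/zookeeper-3 and /export/data/zookeeper-4, create a myid file containing 1, 2, 3 and 4 respectively. This value becomes the node's sid (Server ID).
- Start three ZooKeeper instances:
- bin/zkServer.sh start conf/zoo1.cfg
- bin/zkServer.sh start conf/zoo2.cfg
- bin/zkServer.sh start conf/zoo3.cfg
- At startup each instance reads the config file passed to it (and the myid file in its dataDir), so it knows its own identity (sid) and how many instances in the cluster take part in the election.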
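To make the server.N format concrete, here is a minimal Java sketch that parses these entries and reads the myid file. ServerLineParser, ServerEntry, parse, parseAll and readMyid are hypothetical names for illustration, not ZooKeeper classes. The sketch assumes the plain host:port:port[:role] form used above; it does not handle IPv6 addresses or the ";clientPort" suffix that ZooKeeper 3.5+ also accepts. It needs Java 16+ for records.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ServerLineParser {
    // Hypothetical holder for one server.N entry (not ZooKeeper's QuorumServer).
    record ServerEntry(long sid, String host, int quorumPort, int electionPort, String role) {}

    // Parses "host:quorumPort:electionPort[:role]"; the role defaults to participant.
    static ServerEntry parse(long sid, String value) {
        String[] parts = value.trim().split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("unexpected server spec: " + value);
        }
        String role = parts.length == 4 ? parts[3] : "participant";
        if (!role.equals("participant") && !role.equals("observer")) {
            throw new IllegalArgumentException("unknown role: " + role);
        }
        return new ServerEntry(sid, parts[0], Integer.parseInt(parts[1]),
                Integer.parseInt(parts[2]), role);
    }

    // Picks every "server.N" key out of the config, sorted by sid.
    static List<ServerEntry> parseAll(Properties props) {
        List<ServerEntry> servers = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                long sid = Long.parseLong(key.substring("server.".length()));
                servers.add(parse(sid, props.getProperty(key)));
            }
        }
        servers.sort((a, b) -> Long.compare(a.sid(), b.sid()));
        return servers;
    }

    // Reads the sid written to <dataDir>/myid in the step above.
    static long readMyid(Path dataDir) throws IOException {
        return Long.parseLong(Files.readString(dataDir.resolve("myid")).trim());
    }

    public static void main(String[] args) throws IOException {
        String zoo1 = String.join("\n",
                "dataDir=/export/data/zookeeper-1",
                "clientPort=2181",
                "server.1=127.0.0.1:2001:3001",
                "server.2=127.0.0.1:2002:3002:participant",
                "server.3=127.0.0.1:2003:3003:participant",
                "server.4=127.0.0.1:2004:3004:observer");
        Properties props = new Properties();
        props.load(new StringReader(zoo1));
        for (ServerEntry e : parseAll(props)) {
            System.out.println(e);
        }
    }
}
```

Because server.1 has no role suffix, it comes back as a participant, which matches the rule that participant may be omitted.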
03 Leader Election Process
Figure 1: Voting flow from round 1 to round 2
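Assumptions:
A ballot is written as vote(sid, zxid, epoch), where:
- sid is the Server ID of the node being voted for; every server has a unique sid, taken from its myid file;
- zxid is the ID of the latest data transaction on that node;
- epoch is the election round. To keep things simple, this is the first election (epoch = 1), so the epoch is left out of the ballots below.
Start the sid=1 and sid=2 nodes, in that order.
Round 1: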
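- sid=1: its initial ballot is for itself; it sends vote(1,0) to sid=2;
- sid=2: its initial ballot is for itself; it sends vote(2,0) to sid=1;
- sid=1: receives vote(2,0) from sid=2 and compares it with its own vote(1,0). It compares zxid first: a larger zxid means newer data, so the ballot with the larger zxid wins; if the zxids are equal, the larger sid wins. The result is vote(2,0), so sid=1 changes its ballot to vote(2,0);
- sid=2: receives vote(1,0) from sid=1 and compares it with its own vote(2,0) using the same rule. The result is vote(2,0), so sid=2 keeps its ballot;
- Round 1 ends.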
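Round 2:
- sid=1: its current ballot is vote(2,0); it sends vote(2,0) to sid=2;
- sid=2: its current ballot is vote(2,0); it sends vote(2,0) to sid=1;
- sid=1: receives vote(2,0) from sid=2, which matches its own vote(2,0). Under the majority rule, 3 nodes take part in the election and 2 of them already back the same ballot, so sid=1 recognizes sid=2 as the Leader and becomes a Follower itself;
- sid=2: receives vote(2,0) from sid=1, which matches its own vote(2,0). By the same majority rule sid=2 is elected, so it switches to the Leader role.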
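The two rounds above can be replayed with a small simulation. This is a simplified sketch, not FastLeaderElection: it assumes a full mesh with no message loss, so every node sees the same ballots in a round, and it drops the epoch just as the walkthrough does. TwoRoundElection, better and elect are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

public class TwoRoundElection {
    // Simplified ballot: the epoch is omitted, as in the walkthrough.
    record Vote(long sid, long zxid) {}

    // The larger zxid wins; if the zxids are equal, the larger sid wins.
    static Vote better(Vote a, Vote b) {
        if (a.zxid() != b.zxid()) {
            return a.zxid() > b.zxid() ? a : b;
        }
        return a.sid() >= b.sid() ? a : b;
    }

    /**
     * zxidBySid maps each running node's sid to its last zxid.
     * Returns {leaderSid, round} for the first round in which more than half
     * of votingMembers sent the same ballot.
     */
    static long[] elect(Map<Long, Long> zxidBySid, int votingMembers) {
        int half = votingMembers / 2;
        Map<Long, Vote> ballots = new TreeMap<>();
        // Round 1: every node starts by voting for itself.
        zxidBySid.forEach((sid, zxid) -> ballots.put(sid, new Vote(sid, zxid)));
        for (int round = 1; round <= 10; round++) {
            // Every node receives every ballot sent this round, so all agree on the best one.
            Vote best = null;
            for (Vote v : ballots.values()) {
                best = (best == null) ? v : better(best, v);
            }
            int support = 0;
            for (Vote v : ballots.values()) {
                if (v.equals(best)) {
                    support++;
                }
            }
            if (support > half) {
                return new long[] {best.sid(), round};
            }
            // No quorum yet: every node adopts the best ballot for the next round.
            final Vote winner = best;
            ballots.replaceAll((sid, v) -> winner);
        }
        throw new IllegalStateException("no quorum: too few voting nodes are running");
    }

    public static void main(String[] args) {
        // sid=1 and sid=2 are running with zxid 0; three voting members are configured.
        long[] result = elect(Map.of(1L, 0L, 2L, 0L), 3);
        System.out.println("leader=" + result[0] + ", decided in round " + result[1]);
    }
}
```

With only sid=1 running out of three voters, support never exceeds half = 1, so the sketch throws instead of electing a leader, which is why at least two of the three participants must be up.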
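When sid=3 starts afterwards, a leader has already been elected: sid=1 and sid=2 send their ballots for the current leader back to sid=3, and the majority result is still that sid=2 is the leader.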
3.1 Leader Election Uses a Multi-Layer Queue Architecture
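Under the hood, ZooKeeper's election is split into an election application layer and a message transport queue layer. The first (application) layer has one queue that sends ballots and one that receives them. The second (transport) layer keeps a separate send queue for each server sid so that sending to one server cannot interfere with sending to the others: if delivery to one machine fails, delivery to the healthy servers continues normally.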
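A single-threaded sketch of how one application-layer queue fans out into per-sid transport queues may help. ToSend and Message here are simplified stand-ins with String payloads, not ZooKeeper's classes, and TwoLayerQueues, dispatch and drain are hypothetical names. SEND_CAPACITY = 1 and the drop-the-oldest policy for a full per-peer queue are assumptions modeled on QuorumCnxManager; check them against the version you run.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoLayerQueues {
    // Assumption: one pending message per peer; a newer ballot replaces an older one.
    static final int SEND_CAPACITY = 1;

    // Simplified messages: String payloads instead of ByteBuffers.
    record ToSend(long sid, String payload) {}
    record Message(long sid, String payload) {}

    final long mySid;
    // Layer 1: the queue the election algorithm writes ballots into.
    final BlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    // Layer 2: one bounded send queue per peer sid, plus one shared receive queue.
    final ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queueSendMap = new ConcurrentHashMap<>();
    final BlockingQueue<Message> recvQueue = new LinkedBlockingQueue<>();

    TwoLayerQueues(long mySid) {
        this.mySid = mySid;
    }

    // Routes one layer-1 message into layer 2.
    void dispatch(ToSend m) {
        if (m.sid() == mySid) {
            // Loopback: a ballot addressed to ourselves goes straight to the receive queue.
            recvQueue.offer(new Message(m.sid(), m.payload()));
            return;
        }
        ArrayBlockingQueue<String> q =
                queueSendMap.computeIfAbsent(m.sid(), sid -> new ArrayBlockingQueue<>(SEND_CAPACITY));
        // A full queue only evicts this peer's oldest message; other peers are untouched.
        while (!q.offer(m.payload())) {
            q.poll();
        }
    }

    // Drains layer 1 without blocking (the real WorkerSender polls with a timeout in its own thread).
    void drain() {
        ToSend m;
        while ((m = sendqueue.poll()) != null) {
            dispatch(m);
        }
    }

    public static void main(String[] args) {
        TwoLayerQueues node = new TwoLayerQueues(1);
        node.sendqueue.offer(new ToSend(1, "vote(1,0)"));
        node.sendqueue.offer(new ToSend(2, "vote(1,0)"));
        node.sendqueue.offer(new ToSend(2, "vote(2,0)"));
        node.sendqueue.offer(new ToSend(3, "vote(2,0)"));
        node.drain();
        System.out.println("loopback: " + node.recvQueue);
        System.out.println("per-peer: " + node.queueSendMap);
    }
}
```

Because each peer has its own bounded queue, a slow or unreachable sid=3 can only back up its own queue, while sid=2 still receives the latest ballot.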
Figure 2: Interaction between the queue layers
04 Entry Class
The server's startup class can be found in zkServer.sh:
org.apache.zookeeper.server.quorum.QuorumPeerMain
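05 Election Flow in the Code
Figure 3: Flowchart of the election code
- Load the config file: QuorumPeerConfig.parse(path);
The key election-related settings are:
- Read the myid file in dataDir and use its content as the current node's sid, i.e. its identity as a voter. Wherever the variable myid appears below, it means the current node's own sid.
- Set peerType, which determines whether the current node takes part in elections.
- new QuorumMaj() parses the entries prefixed with server. to load the cluster membership: allMembers (all members), votingMembers (members that vote) and observingMembers (observers), and sets half = votingMembers.size() / 2.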
【Java】
public QuorumMaj(Properties props) throws ConfigException {
for (Entry<Object, Object> entry : props.entrySet()) {
String key = entry.getKey().toString();
String value = entry.getValue().toString();
//Read the server.* entries (one per instance) from the cluster config
if (key.startsWith("server.")) {
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
QuorumServer qs = new QuorumServer(sid, value);
allMembers.put(Long.valueOf(sid), qs);
if (qs.type == LearnerType.PARTICIPANT)
//Instances with role PARTICIPANT take part in elections
votingMembers.put(Long.valueOf(sid), qs);
else {
//Observer members
observingMembers.put(Long.valueOf(sid), qs);
}
} else if (key.equals("version")) {
version = Long.parseLong(value, 16);
}
}
//Majority threshold: a quorum needs more than half of the voting members
half = votingMembers.size() / 2;
}
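The majority threshold is easiest to see with the cluster from section 02 (three participants and one observer). This sketch splits members the same way QuorumMaj does and applies the same "more than half" test. MajorityQuorum and quorumSize are hypothetical names, and one deliberate difference is labeled in the code: here acks from non-voting sids are ignored, while the QuorumMaj snippet above simply compares ackSet.size() with half.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MajorityQuorum {
    private final Set<Long> votingMembers = new HashSet<>();
    private final Set<Long> observingMembers = new HashSet<>();
    private final int half;

    // roles: sid -> "participant" or "observer" (same split as QuorumMaj above).
    MajorityQuorum(Map<Long, String> roles) {
        roles.forEach((sid, role) -> {
            if ("observer".equals(role)) {
                observingMembers.add(sid);
            } else {
                votingMembers.add(sid);
            }
        });
        half = votingMembers.size() / 2;
    }

    int half() {
        return half;
    }

    Set<Long> observers() {
        return observingMembers;
    }

    // Smallest number of acks that passes the "more than half" test.
    int quorumSize() {
        return half + 1;
    }

    // Assumption for this sketch: only acks from voting members are counted.
    boolean containsQuorum(Set<Long> ackSet) {
        long votes = ackSet.stream().filter(votingMembers::contains).count();
        return votes > half;
    }

    public static void main(String[] args) {
        MajorityQuorum q = new MajorityQuorum(Map.of(
                1L, "participant", 2L, "participant", 3L, "participant", 4L, "observer"));
        System.out.println("half=" + q.half() + ", quorum size=" + q.quorumSize());
        System.out.println(q.containsQuorum(Set.of(1L, 2L))); // true: two voters
        System.out.println(q.containsQuorum(Set.of(2L, 4L))); // false: one voter plus the observer
    }
}
```

The arithmetic also explains why odd voter counts are preferred: with four voters, half = 2 and a quorum needs 3, so the cluster still tolerates only one failed voter, the same as with three.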
- QuorumPeerMain.runFromConfig(config) starts the server;
- QuorumPeer.startLeaderElection() starts the election service;
- the current vote is set to new Vote(sid, zxid, epoch)
【Java】
synchronized public void startLeaderElection(){
try {
if (getPeerState() == ServerState.LOOKING) {
//First round: the node initially votes for itself
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
//........
}
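- Create the election connection manager QuorumCnxManager:
- initialize recvQueue<Message(sid, ByteBuffer)>, the queue of received ballots (second-layer transport queue);
- initialize queueSendMap<sid, queue>, one send queue per sid (second-layer transport queues);
- initialize senderWorkerMap<sid, SendWorker>, the container of send worker threads; an entry means a connection to that sid exists;
- create the election listener thread QuorumCnxManager.Listener.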
【Java】
//QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
final long mySid,
Map<Long,QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled) {
//Queue of received ballots (second-layer transport queue)
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
//One send queue per sid (second-layer transport queues)
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
//Send worker threads; an entry means a connection to that sid is up
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.self = self;
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
//Create the election listener thread, which accepts election connection requests
listener = new Listener();
listener.setName("QuorumPeerListener");
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();//Start the election listener thread
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
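- Start the election listener thread QuorumCnxManager.Listener:
- create a ServerSocket and wait for connections from nodes whose sid is larger than its own; each connection is stored in senderWorkerMap<sid, SendWorker>;
- only a node with sid > self.sid may connect in; when a node with a smaller sid connects, the connection is closed and this node dials back instead (the sid < self.getId() branch of handleConnection below).
【Java】
//After listener.start() above, the Listener thread runs this method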
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
if (self.getQuorumListenOnAllIPs()) {
int port = self.getElectionAddress().getPort();
addr = new InetSocketAddress(port);
} else {
// Resolve hostname for this server in case the
// underlying ip address has changed.
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();
}
LOG.info("My election bind port: " + addr.toString());
setName(addr.toString());
ss.bind(addr);
while (!shutdown) {
client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
//Handle the connection request
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
if (shutdown) {
break;
}
LOG.error("Exception while listening", e);
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. " +
"Ignoring exception", ie);
}
closeSocket(client);
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ self.getElectionAddress());
} else if (ss != null) {
// Clean up for shutdown.
try {
ss.close();
} catch (IOException ie) {
// Don't log an error for shutdown.
LOG.debug("Error closing server socket", ie);
}
}
}
//Call path: receiveConnection() -> handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
//... (omitted)
if (sid < self.getId()) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: {}", sid);
closeSocket(sock);
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
} else { // Otherwise start worker threads to receive data.
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
//Store the connection as <sid, SendWorker>
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid,
new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
}
}
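- Create the FastLeaderElection service:
- initialize the ballot send queue sendqueue (first-layer queue);
- initialize the ballot receive queue recvqueue (first-layer queue);
- create the WorkerSender thread;
- create the WorkerReceiver thread.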
【Java】
//FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
//Send queue sendqueue (first-layer queue)
sendqueue = new LinkedBlockingQueue<ToSend>();
//Receive queue recvqueue (first-layer queue)
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
//new Messenger(manager)
Messenger(QuorumCnxManager manager) {
//Create the WorkerSender thread
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
//Create the WorkerReceiver thread
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
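- Start the WorkerSender and WorkerReceiver threads.
The WorkerSender thread loops, taking elements from the first-layer sendqueue:
- each sendqueue element carries a ballot; see the ToSend class;
- if the target sid equals its own sid, the ballot goes straight into recvQueue (loopback); this check is done in QuorumCnxManager.toSend, shown below;
- otherwise the element is moved into the second-layer queue for that sid in queueSendMap<sid, queue>.
【Java】
//FastLeaderElection.Messenger.WorkerSender
class WorkerSender extends ZooKeeperThread{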
//...
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
//Send the ballot out
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
}
//QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
//Move it into the per-sid second-layer queue in queueSendMap<sid, queue>
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
connectOne(sid);
}
}
The WorkerReceiver thread loops, taking elements from the second-layer recvQueue and, while this node is LOOKING, moving them into the first-layer recvqueue.
【Java】
//WorkerReceiver
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
//Poll the second-layer recvQueue
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
// The current protocol and two previous generations all send at least 28 bytes
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: " + response.buffer.capacity());
continue;
}
//...
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
//Move the element into the first-layer recvqueue
recvqueue.offer(n);
//...
}
}
//...
}
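The WorkerReceiver pattern (poll with a timeout, skip empty results, drop messages that are too short, hand the rest to the next layer) can be sketched as below. The 28-byte layout here, one int state followed by three longs (leader, zxid, electionEpoch), is a hypothetical layout chosen to match the size check, not ZooKeeper's documented wire format. ReceiverLoop, Notification, encode and shutdown are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReceiverLoop implements Runnable {
    // Hypothetical decoded ballot; the field order is an assumption.
    record Notification(int state, long leader, long zxid, long electionEpoch) {}

    // 4 bytes (int state) + 3 * 8 bytes (longs) = 28 bytes.
    static final int MIN_LENGTH = 28;

    final BlockingQueue<ByteBuffer> transportQueue = new LinkedBlockingQueue<>(); // like recvQueue (layer 2)
    final BlockingQueue<Notification> appQueue = new LinkedBlockingQueue<>();      // like recvqueue (layer 1)
    private volatile boolean stop = false;

    void shutdown() {
        stop = true;
    }

    // Encodes a ballot in the hypothetical 28-byte layout.
    static ByteBuffer encode(Notification n) {
        ByteBuffer b = ByteBuffer.allocate(MIN_LENGTH);
        b.putInt(n.state()).putLong(n.leader()).putLong(n.zxid()).putLong(n.electionEpoch());
        b.flip();
        return b;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // Short timeout so that the stop flag is re-checked regularly.
                ByteBuffer buf = transportQueue.poll(100, TimeUnit.MILLISECONDS);
                if (buf == null) {
                    continue;
                }
                if (buf.remaining() < MIN_LENGTH) {
                    continue; // drop messages too short to be a ballot
                }
                // Java evaluates arguments left to right, matching the encode order.
                appQueue.offer(new Notification(buf.getInt(), buf.getLong(), buf.getLong(), buf.getLong()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReceiverLoop loop = new ReceiverLoop();
        Thread t = new Thread(loop, "WorkerReceiver-sketch");
        t.setDaemon(true);
        t.start();
        loop.transportQueue.offer(ByteBuffer.allocate(8)); // too short: dropped
        loop.transportQueue.offer(encode(new Notification(0, 2, 0, 1)));
        System.out.println(loop.appQueue.poll(1, TimeUnit.SECONDS));
        loop.shutdown();
        t.join();
    }
}
```

The timed poll is what lets a daemon worker like this shut down cleanly: it never blocks forever, so a stop flag set by another thread is noticed within one timeout.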
06 Core Election Logic
- Start the QuorumPeer thread.
Leader voting starts with makeLEStrategy().lookForLeader();
sendNotifications() sends this node's ballot to the other nodes by putting it into sendqueue, which the WorkerSender thread processes.
【Java】
//QuorumPeer.run
//...
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
//makeLEStrategy().lookForLeader() starts voting
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
//...
//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
//...
//Send our ballot to the other nodes
sendNotifications();
//...
}
private void sendNotifications() {
//For each voting member (current and next config)
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
//Queue the ballot for WorkerSender
sendqueue.offer(notmsg);
}
}
class WorkerSender extends ZooKeeperThread {
//...
public void run() {
while (!stop) {
try {
//Take a queued ballot
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
//...
}
The loop then keeps polling recvqueue for ballots sent by other nodes:
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//Take the next incoming ballot
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
//Connections are up and the previous round's messages were delivered, so broadcast the ballot again
sendNotifications();
} else {
//No ballot received and messages still pending: manager.connectAll() (re)connects to the other nodes
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
//....
}
//...
}
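The exponential backoff in this loop can be tabulated with a few lines of Java. The starting value of 200 ms and the 60 000 ms cap are assumed values for illustration; check finalizeWait and maxNotificationInterval in the FastLeaderElection version you use. NotificationBackoff, next and schedule are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class NotificationBackoff {
    // Assumed values for illustration only.
    static final int INITIAL_TIMEOUT_MS = 200;
    static final int MAX_NOTIFICATION_INTERVAL_MS = 60_000;

    // Same update rule as lookForLeader: double the timeout, capped at the maximum.
    static int next(int notTimeout, int max) {
        int tmp = notTimeout * 2;
        return tmp < max ? tmp : max;
    }

    // Timeouts used for successive empty polls of recvqueue.
    static List<Integer> schedule(int initial, int max, int polls) {
        List<Integer> out = new ArrayList<>();
        int t = initial;
        for (int i = 0; i < polls; i++) {
            out.add(t);
            t = next(t, max);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(schedule(INITIAL_TIMEOUT_MS, MAX_NOTIFICATION_INTERVAL_MS, 12));
    }
}
```

With these values the wait doubles from 200 ms to 51 200 ms over nine empty polls and then stays at the 60 000 ms cap, so an isolated node keeps retrying without flooding the network.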
【Java】
//manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}
// If lost the challenge, then drop the new connection
//Ensures there is only one channel between any two nodes in the cluster
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
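As the code shows, the dialing node keeps the Socket, and starts the SendWorker and RecvWorker threads stored in senderWorkerMap<sid, SendWorker>, only when its own sid is larger than the target sid; otherwise it closes the socket and leaves the connection to the node with the larger sid. This mirrors the sid < self.getId() branch of handleConnection in the Listener step, and together the two checks guarantee that only one channel exists between any two nodes in the cluster.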
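The tie-break can be checked mechanically. This sketch reduces the two checks to pure functions (keepOutgoing for startConnection, keepIncoming for handleConnection; both are hypothetical names) and counts how many channels survive when two nodes dial each other at the same time. It ignores timing, retries and the dial-back itself, which in the real code simply produces a connection initiated by the larger sid.

```java
public class ConnectionTieBreak {
    // Dialing side (startConnection): drop the socket if the target has the larger sid.
    static boolean keepOutgoing(long mySid, long targetSid) {
        return !(targetSid > mySid);
    }

    // Accepting side (handleConnection): drop the socket (and dial back) if the caller has the smaller sid.
    static boolean keepIncoming(long mySid, long callerSid) {
        return !(callerSid < mySid);
    }

    // Number of channels that survive when a and b dial each other simultaneously.
    static int channelsBetween(long a, long b) {
        int channels = 0;
        if (keepOutgoing(a, b) && keepIncoming(b, a)) {
            channels++; // a dials b
        }
        if (keepOutgoing(b, a) && keepIncoming(a, b)) {
            channels++; // b dials a
        }
        return channels;
    }

    public static void main(String[] args) {
        for (long a = 1; a <= 4; a++) {
            for (long b = a + 1; b <= 4; b++) {
                long initiator = (keepOutgoing(b, a) && keepIncoming(a, b)) ? b : a;
                System.out.println(a + " <-> " + b + ": " + channelsBetween(a, b)
                        + " channel, dialed by sid=" + initiator);
            }
        }
    }
}
```

For every pair exactly one direction passes both checks, and it is always the node with the larger sid that dials, which is why the connection count stays at one per pair.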
Figure 4: How nodes connect to each other
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
if (n.electionEpoch > logicalclock.get()) {
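//Our election epoch is older than the ballot's: clear the recvset ballot pool,
//then adopt the better of the incoming ballot and our own initial ballot and broadcast again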
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {//Same election epoch
//The received ballot beats the current one, so replace it
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
//...
}
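In the code above, ballots are taken from recvqueue in a loop and the election proceeds as follows:
- compare the election epoch of the received ballot with the current one;
- if the received epoch is newer, update the current ballot and broadcast it again; if it is older, the ballot is ignored;
- if the epochs are equal, compare the current ballot with the received one.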
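The epoch handling can be condensed into a small self-contained class. This is a sketch of the three branches only: EpochBranch, Proposal, Notification, beats, onNotification and the broadcasts counter (which stands in for sendNotifications()) are hypothetical names, and the ballot is recorded in recvset after the branch, following the order the excerpts suggest.

```java
import java.util.HashMap;
import java.util.Map;

public class EpochBranch {
    record Proposal(long leader, long zxid, long peerEpoch) {}
    record Notification(long sid, long leader, long zxid, long peerEpoch, long electionEpoch) {}

    long logicalclock;                                   // this node's election round
    Proposal proposal;                                   // current ballot
    final Proposal initial;                              // this node's own ballot
    final Map<Long, Proposal> recvset = new HashMap<>(); // sid -> ballot it sent
    int broadcasts = 0;                                  // stands in for sendNotifications()

    EpochBranch(long myid, long lastZxid, long peerEpoch, long logicalclock) {
        this.initial = new Proposal(myid, lastZxid, peerEpoch);
        this.proposal = initial;
        this.logicalclock = logicalclock;
    }

    // Epoch first, then zxid, then sid (the totalOrderPredicate rules).
    static boolean beats(Proposal n, Proposal cur) {
        return n.peerEpoch() > cur.peerEpoch()
                || (n.peerEpoch() == cur.peerEpoch()
                    && (n.zxid() > cur.zxid()
                        || (n.zxid() == cur.zxid() && n.leader() > cur.leader())));
    }

    // Returns false when the notification is from an older round and is ignored.
    boolean onNotification(Notification n) {
        Proposal incoming = new Proposal(n.leader(), n.zxid(), n.peerEpoch());
        if (n.electionEpoch() > logicalclock) {
            // Newer round: jump to it, forget old ballots, restart from the better of
            // the incoming ballot and this node's own initial ballot.
            logicalclock = n.electionEpoch();
            recvset.clear();
            proposal = beats(incoming, initial) ? incoming : initial;
            broadcasts++;
        } else if (n.electionEpoch() < logicalclock) {
            return false;
        } else if (beats(incoming, proposal)) {
            // Same round and the incoming ballot wins: adopt it and broadcast again.
            proposal = incoming;
            broadcasts++;
        }
        recvset.put(n.sid(), incoming);
        return true;
    }

    public static void main(String[] args) {
        EpochBranch node = new EpochBranch(1, 0, 1, 1);       // sid=1, zxid 0, round 1
        node.onNotification(new Notification(2, 2, 0, 1, 1)); // same round, vote for sid=2
        System.out.println(node.proposal + " after " + node.broadcasts + " broadcast(s)");
        System.out.println(node.onNotification(new Notification(3, 3, 0, 1, 0))); // older round
        node.onNotification(new Notification(3, 3, 0, 1, 5)); // newer round
        System.out.println("round=" + node.logicalclock + ", proposal=" + node.proposal
                + ", recvset=" + node.recvset);
    }
}
```

Clearing recvset on a newer round matters: ballots counted in an older round must not contribute to a quorum in the new one.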
【Java】
//Compare the received ballot with the current one
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
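The totalOrderPredicate method returns true in these cases:
- the candidate's epoch is greater than the current epoch;
- the epochs are equal and the candidate's zxid is greater than the current zxid;
- the epochs and zxids are equal and the candidate's sid is greater than the current sid;
- when it returns true, the current ballot is replaced by the winning ballot and the new ballot is broadcast again.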
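The three rules translate directly into a standalone method. This sketch keeps the comparison from the excerpt above but omits the quorum-weight check, since it has no QuorumVerifier; BallotOrder is a hypothetical class name.

```java
public class BallotOrder {
    // Same three rules as totalOrderPredicate, without the weight check.
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // Round 1 at sid=1: incoming vote(2,0) against its own vote(1,0), both in epoch 1.
        System.out.println(totalOrderPredicate(2, 0, 1, 1, 0, 1)); // true
        // A higher zxid beats a higher sid.
        System.out.println(totalOrderPredicate(1, 5, 1, 3, 4, 1)); // true
        // An older epoch never wins, whatever its zxid.
        System.out.println(totalOrderPredicate(3, 9, 0, 1, 0, 1)); // false
        // An identical ballot does not replace the current one.
        System.out.println(totalOrderPredicate(2, 0, 1, 2, 0, 1)); // false
    }
}
```

Because the order is strict, a node re-broadcasts only when its ballot actually changes, which keeps the number of notifications bounded.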
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
//Record each node's ballot
// key: sid of the voting node, value: the ballot it cast (the leader it proposes)
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//Majority check
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
//A leader has been elected: set this node's state (LEADING or a learner state)
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
//...
}
/**
* Termination predicate. Given a set of votes, determines if have
* sufficient to declare the end of the election round.
*
* @param votes
* Set of votes
* @param vote
* Identifier of the vote received last (the winning ballot after comparison)
*/
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
//votes comes from recvset and holds the ballot each node proposed
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
//Add the nodes whose ballot matches the proposed one to voteSet
if (vote.equals(entry.getValue())) {
//Record the sid of the supporting node
voteSet.addAck(entry.getKey());
}
}
//Check whether the supporting votes form a majority
return voteSet.hasAllQuorums();
}
//QuorumMaj#containsQuorum
public boolean containsQuorum(Set<Long> ackSet) {
return (ackSet.size() > half);
}
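In the code above, recvset holds the ballot proposed by each sid:
Round 1: sid1: vote(1,0,1), sid2: vote(2,0,1);
Round 2: sid1: vote(2,0,1), sid2: vote(2,0,1).
The final proposal is vote(2,0,1). Comparing it against recvset gives 2 matching ballots. Since 3 nodes take part in the election, half = 3 / 2 = 1, and 2 > 1: sid1 and sid2 both back sid2, which meets the majority requirement, so sid2 is confirmed as the leader.
- setPeerState updates the current node's role:
- if the elected proposedLeader equals its own sid, the node becomes the Leader (LEADING);
- otherwise it becomes a Follower or Observer (FOLLOWING or OBSERVING);
- currentVote is updated to the leader's ballot, vote(2,0,1).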
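The recvset tally and the final role assignment can be replayed in a few lines. In this sketch Vote is a simplified record compared by (leader, zxid, epoch) with record equality, which may differ from ZooKeeper's Vote.equals; TermPredicateSketch, termPredicate and finalState are hypothetical names, and the "participant" flag stands in for the learner type checked by learningState().

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TermPredicateSketch {
    // Simplified ballot: proposed leader, zxid, election epoch.
    record Vote(long leader, long zxid, long epoch) {}

    // Collects the sids whose recorded ballot equals the proposal, then applies "more than half".
    static boolean termPredicate(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        Set<Long> acks = new HashSet<>();
        for (Map.Entry<Long, Vote> e : recvset.entrySet()) {
            if (proposal.equals(e.getValue())) {
                acks.add(e.getKey());
            }
        }
        return acks.size() > votingMembers / 2;
    }

    // Mirrors the setPeerState branch: LEADING if this node won, otherwise a learner state.
    static String finalState(long proposedLeader, long myid, boolean participant) {
        if (proposedLeader == myid) {
            return "LEADING";
        }
        return participant ? "FOLLOWING" : "OBSERVING";
    }

    public static void main(String[] args) {
        Vote proposal = new Vote(2, 0, 1);
        Map<Long, Vote> recvset = new HashMap<>();
        // Round 1: sid1 -> vote(1,0,1), sid2 -> vote(2,0,1)
        recvset.put(1L, new Vote(1, 0, 1));
        recvset.put(2L, new Vote(2, 0, 1));
        System.out.println(termPredicate(recvset, proposal, 3)); // false: 1 ack, half = 1
        // Round 2: sid1 -> vote(2,0,1), sid2 -> vote(2,0,1)
        recvset.put(1L, new Vote(2, 0, 1));
        System.out.println(termPredicate(recvset, proposal, 3)); // true: 2 acks > 1
        System.out.println(finalState(2, 1, true)); // FOLLOWING (sid1)
        System.out.println(finalState(2, 2, true)); // LEADING (sid2)
    }
}
```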
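07 Summary
Walking through the leader election source code shows that:
- Nodes exchange votes over BIO (blocking socket I/O), and only one channel is kept between each pair of nodes, which saves network resources. This shows that plain BIO remains a practical choice when building distributed middleware.
- On top of BIO, threads and in-memory message queues are combined into a multi-layer queue architecture; each layer is served by its own threads, which keeps fast leader election efficient.
- Overall, the election code is a useful reference for combining BIO with multithreading.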