Zookeeper集群Leader选举

Zookeeper集群的启动入口是QuorumPeerMain的main方法,其中调用initializeAndRun方法:单机模式的启动是调用ZooKeeperServerMain的main方法,集群的启动是调用本类的runFromConfig方法,最终调用QuorumPeer的start方法来完成集群的启动。ServerCnxnFactory默认使用NIO,即创建NIOServerCnxnFactory;官方推荐使用Netty,可通过指定VM参数-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory启用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Entry point for a clustered (quorum) ZooKeeper server.
// NOTE(review): abridged excerpt - main() does not show handling for the checked exceptions
// declared by initializeAndRun, and the declaration of the quorumPeer field is not shown.
public class QuorumPeerMain {
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
System.exit(0);
}
// Parses the configuration (when exactly one argument is given), starts the data-dir purge
// task, then runs in quorum mode if the config is distributed, otherwise delegates to
// ZooKeeperServerMain.
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]); // Parse the config file into memory (per the original note this includes validating the myid file)
}
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start(); // Start the snapshot-file cleanup task
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config); // Core startup path for cluster mode
} else {
ZooKeeperServerMain.main(args); // Standalone-mode startup
}
}
// Builds the QuorumPeer from the parsed configuration, starts it and blocks in join()
// until the peer thread ends.
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory(); // Create the server connection factory; ZooKeeper defaults to NIO, Netty is the officially recommended alternative
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
}
quorumPeer = getQuorumPeer(); // Obtain the object representing this node
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
quorumPeer.setElectionType(config.getElectionAlg()); // Set the election algorithm type (default is 3 per the original note)
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); // Create the in-memory database object
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory); // Hand the connection factory created above to this peer
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
quorumPeer.start(); // Start the peer
quorumPeer.join();
} catch (InterruptedException e) {
// NOTE(review): the interruption is silently swallowed in this excerpt.
}
}
}

首先判断当前myid是否在配置文件中配置的集群列表中,然后通过loadDataBase调用ZKDatabase的loadDataBase方法读取快照和事务日志,恢复服务器数据库。接着通过前面创建的ServerCnxnFactory(默认为NIOServerCnxnFactory,若按上文配置则为NettyServerCnxnFactory)的start方法启动服务并绑定客户端端口(如2181)。然后通过startLeaderElection方法初始化集群选举leader相关对象数据,最后执行QuorumPeer的run方法启动集群选举leader线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Excerpt of QuorumPeer: the start() sequence executed when the peer is launched.
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
// Validates membership, restores data, starts the client-facing and admin servers,
// prepares leader election and finally starts this thread.
public synchronized void start() {
if (!getView().containsKey(myid)) { // Check that this myid is part of the configured cluster member list
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase(); // Load on-disk data into memory: per the original note, reads snapshots and txn logs to restore the database
startServerCnxnFactory(); // Start the client connection factory (the original note calls it the Netty service)
try {
adminServer.start(); // Start the embedded admin server (Jetty on port 8080 by default, per the original note) used to inspect server status
} catch (AdminServerException e) {
System.out.println(e);
}
startLeaderElection(); // Initialise the objects used for leader election
super.start(); // Start this thread, which executes run() and drives leader election
}
}

集群选举初始化

初始化集群选举leader相关对象数据,首先将当前选票currentVote设置为本节点,然后启动QuorumCnxManager.Listener线程类开启BIO监听选举端口,然后通过FastLeaderElection启动快速选举算法相关的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Excerpt of QuorumPeer showing how leader election is initialised.
 */
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {

    /**
     * Prepares leader election: while this peer is LOOKING its current vote is initialised
     * to itself, then the election algorithm selected by {@code electionType} is created.
     *
     * @throws RuntimeException if building the initial vote fails with an IOException;
     *         the IOException is attached as the cause
     */
    synchronized public void startLeaderElection() {
        try {
            // The original note says LOOKING is the initial default state.
            if (getPeerState() == ServerState.LOOKING) {
                // Initially the peer votes for itself.
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            // Keep the original message but chain the IOException as the cause so that
            // neither its type nor its stack trace is lost.
            throw new RuntimeException(e.getMessage(), e);
        }
        // Creates the connection manager, starts the election listener and the
        // fast-leader-election threads.
        this.electionAlg = createElectionAlgorithm(electionType);
    }

    /**
     * Creates the election implementation for the given algorithm id.
     *
     * @param electionAlgorithm algorithm id; only 3 is handled in this excerpt
     * @return the started FastLeaderElection, or {@code null} when the id is not 3 or the
     *         connection manager has no listener
     */
    protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;
        switch (electionAlgorithm) {
        case 3: // per the original note, 3 is the default algorithm
            QuorumCnxManager qcm = createCnxnManager(); // create the election connection manager
            // Publish the new manager through qcmRef and obtain the previous one, if any.
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                oldQcm.halt(); // shut the previous manager down
            }
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                listener.start(); // start the election listener thread (runs Listener.run)
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start(); // start the fast-leader-election worker threads
                le = fle;
            }
            break;
        default:
            assert false;
        }
        return le;
    }
}

开启BIO监听选举端口,当接收到连接后通过receiveConnection处理接收到的数据。首先读取发送选票的机器sid,即该机器myid文件中配置的值。由于Socket连接是全双工的,为了防止重复创建连接,Zookeeper只允许sid大的机器连接sid小的机器:若当前机器的sid大于发送选票机器的sid,则关闭该Socket连接,然后由当前机器主动发起socket连接到发送选票的sid较小的机器;若当前机器的sid小于发送选票机器的sid,则为其创建专属的选票发送线程(SendWorker)和选票接收线程(RecvWorker),用于将来发送和接收选票,并将选票发送器与sid对应放入senderWorkerMap中缓存起来,同时为该sid初始化一个发送选票队列放入queueSendMap中,最后启动选票发送线程和选票接收线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Thread that accepts incoming election connections on the election address.
// NOTE(review): abridged excerpt - both try blocks are shown without their catch/finally parts.
public class Listener extends ZooKeeperThread {
public void run() { // Uses plain blocking sockets (BIO)
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
Exception exitException = null;
// Retry binding until shutdown or until portBindMaxRetry attempts are used (0 means no limit).
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
try {
ss = new ServerSocket(); // Create the server socket
ss.setReuseAddress(true);
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress(); // Election ip/port of this server, i.e. the address configured for it such as server.1=127.0.0.1:3001
setName(addr.toString());
ss.bind(addr); // Bind the listening address, e.g. 127.0.0.1:3001
while (!shutdown) {
try {
client = ss.accept(); // Wait for a connection on the election port
setSockOpts(client);
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client); // Handle the incoming connection
}
numRetries = 0;
}
}
}
}
}
}
// Excerpt of QuorumCnxManager: handling of an accepted election connection.
public class QuorumCnxManager {
// Wraps the socket input in a DataInputStream and delegates to handleConnection;
// the socket is closed if an IOException occurs.
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
handleConnection(sock, din); // Process the connection
} catch (IOException e) {
closeSocket(sock);
}
}
// Reads the remote sid, then either drops the connection and dials back (remote sid is
// smaller) or starts dedicated send/receive workers for it (remote sid is larger).
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
Long sid = null, protocolVersion = null;
InetSocketAddress electionAddr = null;
try {
protocolVersion = din.readLong(); // First long: the sender's server id (the value in its myid file) when >= 0, otherwise a protocol version
if (protocolVersion >= 0) { // this is a server id and not a protocol version
sid = protocolVersion;
} else {
try {
InitialMessage init = InitialMessage.parse(protocolVersion, din);
sid = init.sid;
electionAddr = init.electionAddr;
} catch (InitialMessage.InitialMessageException ex) {
closeSocket(sock);
return;
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
// Replace the shared observer id with a value drawn from observerCounter.
sid = observerCounter.getAndDecrement();
}
} catch (IOException e) {
closeSocket(sock);
return;
}
authServer.authenticate(sock, din); // Authentication handling
if (sid < self.getId()) { // Sender's id is smaller than ours: close the connection. To avoid duplicate sockets between two peers, a smaller id may not connect to a larger id
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
closeSocket(sock); // Close the socket
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid); // This (larger-id) peer dials the smaller-id sender instead
}
} else if (sid == self.getId()) { // Should not normally happen; if it does it is probably a bug
} else { // Otherwise start worker threads to receive data.
// On receiving a connection, create a sender worker for the remote peer for future
// outgoing votes, and start both workers.
SendWorker sw = new SendWorker(sock, sid); // Sender worker dedicated to the peer identified by sid
RecvWorker rw = new RecvWorker(sock, din, sid, sw); // Receiver worker for votes from that peer
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw); // Register the sender worker under its sid
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY)); // Ensure an outgoing vote queue exists for this sid
sw.start(); // Start the vote-sending thread
rw.start(); // Start the vote-receiving thread
}
}
}

在快速选举类FastLeaderElection的start方法中,通过Messenger类启动了发送选票线程WorkerSender和接收选票线程WorkerReceiver。这两个线程与QuorumCnxManager.Listener中开启的SendWorker和RecvWorker一起组成了Leader选举的多层队列架构。

这里可以将选举底层分为由WorkerSender和WorkerReceiver组成的选举应用层,以及由SendWorker和RecvWorker组成的选举传输层。应用层有自己的队列统一接收和发送选票,传输层也有自己的队列,且发送队列是按照目标机器分别分配的,避免给每台机器发送消息时相互影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Excerpt of FastLeaderElection: construction and start of the application-layer workers.
// NOTE(review): start() refers to wsThread/wrThread, which are assigned in Messenger's
// constructor in this excerpt; their declarations are not shown.
public class FastLeaderElection implements Election {
// Application-layer queue of votes waiting to be sent.
LinkedBlockingQueue<ToSend> sendqueue;
// Application-layer queue of received notifications.
LinkedBlockingQueue<Notification> recvqueue;
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
this.stop = false;
this.manager = manager;
starter(self, manager);
}
// Resets the proposal to "none" (-1), creates both queues and the Messenger.
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
void start() {
this.wsThread.start(); // Run the vote-sending thread
this.wrThread.start(); // Run the vote-receiving thread
}
}
// Creates the WorkerSender and WorkerReceiver and wraps each in a named daemon thread.
protected class Messenger {
WorkerSender ws;
WorkerReceiver wr;
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager); // Asynchronous worker used to send votes
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);

this.wr = new WorkerReceiver(manager); // Asynchronous worker used to receive votes
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
}

应用层发送选票线程WorkerSender,首先从应用层发送阻塞队列里获取选票信息,然后判断选票的接收方是否为本节点:若为本节点,则将选票信息放入传输层接收选票的阻塞队列recvQueue,由WorkerReceiver异步处理;若不为本节点,则将选票信息放入传输层待发送队列,由SendWorker异步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Application-layer sender: drains sendqueue and hands each vote to the connection manager.
class WorkerSender extends ZooKeeperThread {
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); // Take a vote from the application-layer send queue (waits up to 3 s)
if (m == null) continue;
process(m); // Handle the vote that was taken
} catch (InterruptedException e) {
break;
}
}
}
// Serialises the vote with buildMsg and passes it to manager.toSend for the target sid.
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
manager.toSend(m.sid, requestBuffer);
}
}
/**
 * Excerpt of QuorumCnxManager: routing of outgoing votes and the transport-layer
 * receive queue.
 */
public class QuorumCnxManager {

    /**
     * Routes a serialised vote: a vote addressed to this server goes straight onto the
     * local receive queue, anything else is queued for the target sid and a connection
     * to that peer is requested.
     *
     * @param sid id of the server the vote is addressed to
     * @param b   serialised vote
     */
    public void toSend(Long sid, ByteBuffer b) {
        if (this.mySid != sid) {
            // Make sure a send queue exists for the target, reusing one that is already registered.
            ArrayBlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
            ArrayBlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, fresh);
            addToSendQueue(existing == null ? fresh : existing, b);
            // Establish a connection to the target peer.
            connectOne(sid);
            return;
        }
        // The recipient is this server: loop the vote back into the local receive queue.
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    }

    /**
     * Adds a message to the transport-layer receive queue; when the queue has no
     * remaining capacity the head element is removed first.
     *
     * @param msg message to enqueue
     */
    public void addToRecvQueue(Message msg) {
        synchronized (recvQLock) {
            final boolean full = recvQueue.remainingCapacity() == 0;
            if (full) {
                try {
                    recvQueue.remove();
                } catch (NoSuchElementException ignored) {
                }
            }
            try {
                // Picked up later from recvQueue for asynchronous processing.
                recvQueue.add(msg);
            } catch (IllegalStateException ignored) {
            }
        }
    }
}

应用层接收选票线程WorkerReceiver首先从传输层接收队列recvQueue中取出选票,然后判断接收到的选票是否来自无投票权的节点,若是则将当前节点投出的选票信息放入应用层发送队列中回发给发起方。否则判断当前节点状态:若为LOOKING状态,则将选票信息放入应用层接收队列recvqueue,由FastLeaderElection的lookForLeader方法取出处理;且若发送选票方也是选举状态且其选举周期小于本节点,则把本节点投出的选票回发给发送选票方,该情况可能是发送选票方节点后启动。若本节点状态不是LOOKING而发送选票的节点状态为LOOKING,则说明Leader已经明确,currentVote中存储的就是Leader节点的票据信息,只需要将Leader的票据信息回发即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Application-layer receiver: takes raw messages from the connection manager's receive
// queue, decodes them into Notifications and either queues them for the election loop or
// answers the sender with this node's current vote.
// NOTE(review): abridged excerpt - rpeerepoch is never assigned here (the decoding of peer
// epoch / version / config appears to have been elided) and the final catch has no body.
class WorkerReceiver extends ZooKeeperThread {
public void run() {
Message response;
while (!stop) {
try {
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); // Take a vote from the transport-layer receive queue recvQueue
if (response == null) continue; // Nothing received: keep waiting for the next vote
final int capacity = response.buffer.capacity();
if (capacity < 28) {
continue; // The current protocol and the two previous generations send at least 28 bytes
}
response.buffer.clear();
Notification n = new Notification();
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
int version = 0x0;
QuorumVerifier rqv = null;
if (!validVoter(response.sid)) { // Sender has no voting rights: reply with this node's current vote via the application-layer send queue
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes());
sendqueue.offer(notmsg); // Put the current vote on the application-layer send queue
} else {
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
// Map the numeric state received on the wire to a ServerState; unknown values are skipped.
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
recvqueue.offer(n); // This node is still electing: put the notification on the application-layer receive queue (polled by lookForLeader)
// If the sender is also LOOKING and its election epoch is behind ours, send it the vote this node currently holds
if ((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())) {
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes());
sendqueue.offer(notmsg); // Send this node's current proposal back to the sender
}
} else { // This node is not LOOKING but the sender is: the leader is already known, so only the leader's vote needs to be returned
Vote current = self.getCurrentVote(); // This node's current vote
if (ackstate == QuorumPeer.ServerState.LOOKING) {
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes());
sendqueue.offer(notmsg); // Send this node's vote back to the sender
}
}
}
} catch (InterruptedException e)
}
}
}
// Takes the next message from the transport-layer receive queue, waiting up to the given
// timeout; delegates directly to recvQueue.poll.
public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
return recvQueue.poll(timeout, unit); // Take a vote from the transport-layer receive queue
}

传输层接收选票线程RecvWorker,将接收到的选票信息放入传输层接收选票队列recvQueue中,由应用层WorkerReceiver线程异步处理。这里接收到的选票信息是由其他节点的SendWorker线程发来的。每个已建立连接的节点都对应创建一个RecvWorker线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Transport-layer receiver for one peer connection: reads length-prefixed packets from the
// socket stream and puts them on the shared receive queue.
class RecvWorker extends ZooKeeperThread {
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
int length = din.readInt();
// Reject non-positive or oversized lengths; the IOException ends the loop.
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException("Received packet with invalid packet: " + length);
}
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
addToRecvQueue(new Message(message.duplicate(), sid)); // Put the received vote on recvQueue for asynchronous processing
}
} catch (Exception e) {
// NOTE(review): the exception is swallowed in this excerpt; cleanup happens in finally.
} finally {
sw.finish();
closeSocket(sock);
}
}
}
// Adds a message to the transport-layer receive queue under recvQLock. When the queue has
// no remaining capacity, the head element is removed first to make room.
public void addToRecvQueue(Message msg) {
synchronized (recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// Ignored: the queue was emptied in the meantime.
}
}
try {
recvQueue.add(msg); // Put the received vote on recvQueue; the application-layer WorkerReceiver thread processes it asynchronously
} catch (IllegalStateException ie) {
// Ignored: the add was rejected by the queue.
}
}
}

传输层发送选票线程SendWorker,这里队列中的数据是应用层发送选票线程WorkerSender放入队列中的,将选票数据从队列中取出发送给对应的节点,这里的SendWorker线程是每个连接的节点都创建了一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Transport-layer sender for one peer connection: takes serialised votes from that peer's
// send queue and writes them to the socket stream.
class SendWorker extends ZooKeeperThread {
public void run() {
threadCnt.incrementAndGet();
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); // Look up the send queue for this sid
// If nothing is queued, resend the last message recorded for this sid (if any).
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
send(b); // Send the vote
}
}
} catch (IOException e) {
this.finish();
}
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); // Look up the blocking send queue for this sid
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); // Take a vote from the blocking queue
} else {
break;
}
if (b != null) {
lastMessageSent.put(sid, b);
send(b); // Send the vote
}
} catch (InterruptedException e) {
// NOTE(review): interruption is ignored here; the loop condition is re-checked.
}
}
} catch (Exception e) {
// NOTE(review): the exception is swallowed in this excerpt; finish() below still runs.
}
this.finish();
}
// Writes one packet: a 4-byte length (the buffer capacity) followed by the buffer's bytes.
// NOTE(review): msgBytes is filled from the buffer, but the bytes actually written are
// b.array(), so msgBytes only serves to detect a BufferUnderflowException.
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
b.position(0);
b.get(msgBytes);
} catch (BufferUnderflowException be) {
return;
}
dout.writeInt(b.capacity());
dout.write(b.array());
dout.flush();
}
}

集群选举启动

QuorumPeer就是一个Thread线程类,启动集群选举就是调用该类的run方法来完成的。若本节点为LOOKING状态,不论是不是只读模式,最终都会通过makeLEStrategy().lookForLeader()调用FastLeaderElection的lookForLeader方法来完成选举功能。若本节点为LEADING状态,在Leader的lead()方法中会同步数据给从节点,且会每隔一段时间给从节点发送ping消息保持长连接;若本节点为OBSERVING状态或FOLLOWING状态,在Observer的observeLeader()方法和Follower的followLeader()方法中会与leader建立连接并接收leader数据同步,且一直保持连接。不论是这三种状态中的哪一种,若连接断开都将通过updateServerState()方法将本节点更新为LOOKING状态,然后重新进行选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Excerpt of QuorumPeer: the main loop of the peer thread, dispatching on the peer state.
// NOTE(review): abridged excerpt - the outer try is shown without its catch/finally part.
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
public void run() {
try {
while (running) { // Loop, doing the work that corresponds to the current peer state
switch (getPeerState()) { // Current state of this node
case LOOKING:
if (Boolean.getBoolean("readonlymode.enabled")) { // Read-only mode
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Helper thread: after a delay, starts the read-only server if the peer is still LOOKING.
Thread roZkMgr = new Thread() {
public void run() {
try {
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
} catch (Exception e) {
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try { // The leader-election strategy here is FastLeaderElection by default
reconfigFlagClear(); // Reset reconfigFlag to false; it is read in updateServerState()
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader()); // Calls FastLeaderElection.lookForLeader
} catch (Exception e) {
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
} finally {
// Tear the observer down and pick the next state.
observer.shutdown();
setObserver(null);
updateServerState();
}
break;
case FOLLOWING:
try {
setFollower(makeFollower(logFactory));
follower.followLeader(); // Connect to the leader and receive its data synchronisation
} catch (Exception e) {
} finally {
follower.shutdown();
setFollower(null);
updateServerState(); // Switch this node back to LOOKING (unless a reconfig is pending) for the next election round
}
break;
case LEADING:
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
} finally {
// leader is still set only when lead() ended with an exception.
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
}
}
}
/**
 * Chooses the peer state to enter after the current role has ended.
 *
 * Without a pending reconfiguration the peer simply returns to LOOKING (the election path
 * clears {@code reconfigFlag} through reconfigFlagClear() before it starts). With the flag
 * set, the state is derived from the current vote and the learner type, and the flag is
 * cleared afterwards.
 */
private synchronized void updateServerState() {
    if (reconfigFlag) {
        final ServerState next;
        if (getId() == getCurrentVote().getId()) {
            next = ServerState.LEADING;
        } else if (getLearnerType() == LearnerType.PARTICIPANT) {
            next = ServerState.FOLLOWING;
        } else if (getLearnerType() == LearnerType.OBSERVER) {
            next = ServerState.OBSERVING;
        } else {
            // currently shouldn't happen since there are only 2 learner types
            next = ServerState.LOOKING;
        }
        setPeerState(next);
        reconfigFlag = false;
    } else {
        setPeerState(ServerState.LOOKING);
    }
}

Leader选举首先会将本节点选举周期加一,并将选票初始化为投给本节点,然后通过sendNotifications将投给自己的选票放入应用层发送队列,由WorkerSender线程异步处理。然后通过while循环对接收到的选票进行处理:首先从应用层接收队列中取出接收到的选票,第一次启动时没有选票,此时会与需要发送选票的机器建立连接;若获取到选票,则先判断发送选票方的选票是否有效,即判断发送选票方的sid和选票所投的leader是否都在votingMembers中,然后根据发送方的状态走不同的逻辑。

发送选票方为LOOKING状态(选举主线):若发送选票方选举周期大于本节点选举周期,该情况可能是本节点后启动加入集群选举,或网络中断恢复后加入集群选举,其他机器都已经选举好几轮了,故需要将本节点选举周期更新到最新,并清空之前的选票箱,然后通过totalOrderPredicate进行选票PK,最终通过updateProposal方法更新为PK获胜的选票,最后将获胜的选票发送给所有其他参与投票的节点。若发送选票方选举周期小于本节点选举周期,一般是发送选票的机器刚加入到集群选举,发起投它自己的选票,这种选票一般废弃掉;若选举周期相等,则将发送方节点投递的选票与本节点投递的选票进行PK,若发送方节点投递的选票获胜,则更新选票并发送给所有参与选举的节点。

将收到的选票放入选票箱中,然后通过过半数选举leader逻辑,判断是否已经产生了leader,若已产生会查看是否有新选票加入,若有再做一次PK,若新选票获胜则重新选举,若没有新选票加入则返回最终的Leader设置到currentVote属性中。

若发送方节点为OBSERVING状态,则直接忽略该选票(Observer不参与选举);若发送选票方节点为FOLLOWING或LEADING状态,一般是本节点去加入已经选出leader的集群:本节点处于LOOKING状态,会先将选票投给自己,其他机器接收到后会回发已选出的集群leader选票给本节点,此时直接对收到的集群选票执行过半数选举leader逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Excerpt of FastLeaderElection: the election loop.
// NOTE(review): abridged excerpt - the try block is shown without its finally part.
public class FastLeaderElection implements Election {
// Starts a new election round: bumps the logical clock, proposes this node, broadcasts the
// proposal and then processes incoming notifications until a leader is determined.
// Returns the final vote, or null if the loop ends without a decision.
public Vote lookForLeader() throws InterruptedException {
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
// Votes received in the current election round, keyed by sender sid.
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// Votes received from peers that are already FOLLOWING/LEADING, keyed by sender sid.
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized (this) {
logicalclock.incrementAndGet(); // Advance the election epoch by one
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); // Initial proposal: vote for ourselves
}
sendNotifications(); // Broadcast the vote for ourselves
// While this node is LOOKING, keep taking votes from the application-layer receive queue
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// Take a received vote from the application-layer receive queue
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
if (n == null) { // Nothing arrives on first start, so connections to the target peers get established here
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
// Double the poll timeout, capped at maxNotificationInterval.
int tmpTimeOut = notTimeout * 2;
notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
} else if (validVoter(n.sid) && validVoter(n.leader)) {// Check that the vote is valid, i.e. both the sender sid and the proposed leader pass validVoter
switch (n.state) { // Branch on the sender's state
case LOOKING: // Main election path
// The received election epoch is ahead of ours: either we started late, or we rejoined after a network
// outage while the others went through several rounds, so catch up to the latest election epoch
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch); // Catch up to the latest election epoch
recvset.clear(); // Empty the ballot box from earlier rounds
// Compare votes: since our epoch was behind (we may have just joined), compare the received vote against a vote for ourselves
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch); // The received vote wins: adopt it
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); // This node wins: keep voting for ourselves
}
sendNotifications(); // Broadcast the vote to all participating nodes
} else if (n.electionEpoch < logicalclock.get()) {
break;// The received election epoch is behind ours: typically the sender just joined and voted for itself; such votes are discarded
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
// Same election epoch: everyone has been taking part, so compare the received vote against our current proposal
updateProposal(n.leader, n.zxid, n.peerEpoch); // The received vote wins: adopt it
sendNotifications(); // Broadcast the vote to all participating nodes
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // Record the received vote in the ballot box
if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Quorum check for our current proposal
// termPredicate found a quorum, but check whether new votes arrive; if a better one does, continue the election
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break; // A better vote arrived: leave this loop and keep electing
}
}
if (n == null) {
// If the elected leader is this node become LEADING, otherwise FOLLOWING or OBSERVING
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING : learningState());
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote; // Return the elected leader; the caller stores it as this node's currentVote
}
}
break;
case OBSERVING: // Votes from OBSERVING nodes are ignored
break;
case FOLLOWING:
case LEADING:
// FOLLOWING and LEADING senders share this path. Typically a new node joins a cluster that already has a leader:
// the new node is LOOKING and first votes for itself; the other nodes reply with the vote for the elected leader,
// and the sender state of such a reply is LEADING or FOLLOWING
if (n.electionEpoch == logicalclock.get()) { // Same election epoch
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Check whether n's vote is backed by a quorum
if (termPredicate(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
// Otherwise decide based on votes from peers that already left the election.
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
break;
}
}
}
return null;
}
}
}
// Replaces this node's current proposal (leader id, zxid and epoch) with the given values.
synchronized void updateProposal(long leader, long zxid, long epoch) {
proposedLeader = leader;
proposedZxid = zxid;
proposedEpoch = epoch;
}
// Queues one notification carrying the current proposal for every sid returned by
// getCurrentAndNextConfigVoters(); the sender state is always reported as LOOKING.
private void sendNotifications() {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
// Per the original note, the max zxid carried in the vote is taken from the in-memory data tree
ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes());
sendqueue.offer(notmsg); // Put the vote for each voter on the application-layer send queue
}
}

对于选票的PK逻辑,首先比较选票的epoch(调用方传入的是peerEpoch),若前者的epoch大于后者则返回true;若epoch相等则比较最大事务zxid,若前者大返回true;若最大事务zxid也相等,则比较sid(即myid),若前者大返回true,否则返回false。另外,若前者所投节点的权重为0,则直接返回false。

1
2
3
4
5
6
7
/**
 * Decides whether the "new" vote beats the "current" vote.
 *
 * A candidate whose quorum weight is 0 never wins. Otherwise the votes are ordered by
 * epoch, then by zxid, then by server id; the new vote wins only when it is strictly
 * greater in that order.
 *
 * @return {@code true} if the new vote should replace the current one
 */
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }
    // Highest epoch wins.
    if (newEpoch != curEpoch) {
        return newEpoch > curEpoch;
    }
    // Same epoch: highest zxid wins.
    if (newZxid != curZxid) {
        return newZxid > curZxid;
    }
    // Same epoch and zxid: highest server id wins.
    return newId > curId;
}

过半数选举leader逻辑,是判断选票箱中vote的选票是否超过参与选举机器的半数,没有超过半数则返回false;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Quorum check: reports whether the senders whose recorded vote equals {@code vote}
 * satisfy every quorum verifier in play.
 *
 * @param votes recorded votes keyed by sender sid
 * @param vote  the vote being tested
 * @return the result of {@code hasAllQuorums()} on the collected acknowledgements
 */
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker tracker = new SyncedLearnerTracker();
    tracker.addQuorumVerifier(self.getQuorumVerifier());
    // Also track the last-seen verifier when it is newer than the current one.
    if (self.getLastSeenQuorumVerifier() != null && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        tracker.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    // Every sender whose recorded vote equals the tested vote counts as an acknowledgement.
    for (Map.Entry<Long, Vote> ballot : votes.entrySet()) {
        if (!vote.equals(ballot.getValue())) {
            continue;
        }
        tracker.addAck(ballot.getKey());
    }
    return tracker.hasAllQuorums();
}
/**
 * Excerpt of SyncedLearnerTracker: acknowledgement sets paired with quorum verifiers.
 */
public class SyncedLearnerTracker {

    /**
     * @return {@code true} only if, for every tracked pair, the verifier reports that its
     *         acknowledgement set contains a quorum
     */
    public boolean hasAllQuorums() {
        boolean everyQuorumMet = true;
        for (QuorumVerifierAcksetPair pair : qvAcksetPairs) {
            // containsQuorum decides whether the acknowledging servers form a quorum.
            if (!pair.getQuorumVerifier().containsQuorum(pair.getAckset())) {
                everyQuorumMet = false;
                break;
            }
        }
        return everyQuorumMet;
    }
}
/**
 * Excerpt of QuorumMaj: majority-based quorum verification.
 */
public class QuorumMaj implements QuorumVerifier {

    /**
     * @param ackSet ids of the servers that acknowledged
     * @return {@code true} when the number of acknowledging servers is strictly greater
     *         than {@code half}
     */
    public boolean containsQuorum(Set<Long> ackSet) {
        final int acknowledged = ackSet.size();
        return acknowledged > half;
    }
}

若本节点为OBSERVING状态,则通过makeObserver新建一个Observer类然后调用其observeLeader()方法,与leader建立连接并接收leader数据同步且一直保持连接,若连接断开会将observer关闭后置空且将当前节点状态置为LOOKING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Excerpt of Observer: connects to the leader, synchronises with it and then processes
// packets until the connection fails.
// NOTE(review): abridged excerpt - the outer try is shown without its finally part.
public class Observer extends Learner{
void observeLeader() throws Exception {
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader();
try { // Connect to the leader's quorum (data exchange) port, e.g. a configured 2001
connectToLeader(leaderServer.addr, leaderServer.hostname);// Open a socket connection to the leader
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);// Register this node with the leader
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
syncWithLeader(newLeaderZxid);// Synchronise data from the leader
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) { // Loop: keep reading packets, which also keeps the connection alive
readPacket(qp);// If the leader is down, reading from it throws and the loop is left
processPacket(qp);
}
} catch (Exception e) {
// Close the socket and drop pending revalidations; the exception itself is not rethrown.
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}
pendingRevalidations.clear();
}
}
}
}

若本节点为FOLLOWING状态,其逻辑和OBSERVING状态的节点类似。若连接断开会将follower关闭后置空,且将当前节点状态置为LOOKING。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Excerpt of Follower: connects to the leader, synchronises with it and then processes
// packets until the connection fails.
public class Follower extends Learner{
void followLeader() throws InterruptedException {
// Record how long the election took, then reset the election timers.
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader(); // Locate the leader server
try {
connectToLeader(leaderServer.addr, leaderServer.hostname); // Open a socket connection to the leader
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); // Register this node with the leader
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
// Refuse a leader whose epoch is older than the epoch this node already accepted.
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid); // Synchronise data from the leader
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) { // Loop receiving the data the leader sends
readPacket(qp); // If the leader is down, reading from it throws and the loop is left
processPacket(qp);
}
} catch (Exception e) {
// Close the socket and drop pending revalidations; the exception itself is not rethrown.
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
}

若本节点为LEADING状态,其逻辑与OBSERVING、FOLLOWING状态的节点有一点区别:这里是调用Leader的lead()方法,首先会通过LearnerCnxAcceptor同步数据给从节点,然后每隔一段时间给从节点发送ping消息保持长连接。若lead()异常退出,会将leader关闭后置空,且将当前节点状态置为LOOKING。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Excerpt of Leader: the main routine executed while this node is the leader.
// NOTE(review): abridged excerpt - the try block is shown without its finally part.
public class Leader {
// Prepares the leader (data load, learner acceptor, new epoch, quorum acknowledgements),
// starts serving, then loops pinging learners until it stops running or loses its quorum.
void lead() throws IOException, InterruptedException {
// Record how long the election took, then reset the election timers.
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
self.start_fle = 0;
self.end_fle = 0;
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick.set(0);
zk.loadData(); // Initialise the LeaderZooKeeperServer data
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start(); // Start the learner connection acceptor (per the original note, used to sync data to the followers)
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// The first zxid of the new epoch has counter 0.
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized(this){
lastProposed = zk.getZxid();
}
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
// When both verifiers are still at version 0, install a copy of the current config
// whose version is the new zxid as the last-seen verifier.
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
try {
QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV, true);
} catch (Exception e) {
throw new IOException(e);
}
}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
// Interrupted while waiting for the NEWLEADER acknowledgements: shut down and return.
shutdown("Waiting for a quorum of followers, only synced with sids: [ " + newLeaderProposal.ackSetsToString() + " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for(LearnerHandler f : getLearners()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
followerSet.add(f.getSid());
}
}
// NOTE(review): initTicksShouldBeIncreased is computed but not used in this excerpt.
boolean initTicksShouldBeIncreased = true;
for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
initTicksShouldBeIncreased = false;
break;
}
}
return;
}
startZkServer();
// Testing-only override of the low 32 bits of the zxid.
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.setZooKeeperServer(zk);
}
self.adminServer.setZooKeeperServer(zk);
// tickSkip toggles on every pass, so the tick counter and the quorum check run on
// every other pass; each pass waits half of tickTime.
boolean tickSkip = true;
String shutdownMessage = null;
while (true) {
synchronized (this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!tickSkip) {
self.tick.incrementAndGet();
}
// Collect this leader plus every synced learner as acknowledgements.
SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
syncedAckSet.addAck(self.getId());
for (LearnerHandler f : getLearners()) {
if (f.synced()) {
syncedAckSet.addAck(f.getSid());
}
}
if (!this.isRunning()) {
shutdownMessage = "Unexpected internal error";
break;
}
// Leave the loop when the synced servers no longer form a quorum.
if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
shutdownMessage = "Not sufficient followers synced, only synced with sids: [ " + syncedAckSet.ackSetsToString() + " ]";
break;
}
tickSkip = !tickSkip;
}
for (LearnerHandler f : getLearners()) {
f.ping(); // The leader periodically pings every learner to keep the long-lived connections alive
}
}
if (shutdownMessage != null) {
shutdown(shutdownMessage);
}
}
}
}