The ZooKeeper Client and ZAB

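ZooKeeper as a whole is an implementation of a multi-node distributed consensus algorithm. The underlying protocol is ZAB (ZooKeeper Atomic Broadcast), an atomic broadcast protocol with crash-recovery support that was designed specifically for the distributed coordination service ZooKeeper.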

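ZAB's message broadcast phase uses an atomic broadcast protocol that resembles a two-phase commit. All write requests from clients are handled by the Leader. The Leader wraps each request in a transaction Proposal and sends it to every Follower. Based on the Followers' acknowledgements, once more than half of the servers (counting the Leader itself) have responded successfully, the Leader performs the commit.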

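  • After receiving a client request, the Leader wraps it in a transaction and assigns it a globally unique, monotonically increasing ID called the ZXID. ZAB must preserve transaction order, so every transaction is sorted and processed by ZXID; this is achieved mainly through message queues.
  • A message queue sits between the Leader and each Follower to decouple them and to avoid synchronous blocking.
  • To ensure that all processes execute in a consistent order, only the Leader accepts write requests. A Follower that receives a write request from a client forwards it to the Leader and serves only read requests itself.
  • ZAB guarantees that a transaction that has been committed on one server is eventually committed on all servers, even if servers crash.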
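The propose/acknowledge/commit flow described above can be sketched in memory. This is a minimal illustration and not ZooKeeper code: the class QuorumBroadcastSketch, the Follower type, and the demo helper are all hypothetical names, and networking, logging to disk, and recovery are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical in-memory sketch of ZAB-style broadcast; not ZooKeeper code. */
class QuorumBroadcastSketch {

    /** A follower that acknowledges proposals and commits them in FIFO order. */
    static class Follower {
        final boolean alive;
        final Queue<Long> pending = new ArrayDeque<>();   // proposed, not yet committed
        final List<Long> committed = new ArrayList<>();   // committed, in order

        Follower(boolean alive) { this.alive = alive; }

        /** Returns true (an ACK) if the proposal was accepted. */
        boolean propose(long zxid) {
            if (!alive) return false;
            pending.add(zxid);
            return true;
        }

        /** Commits only the proposal at the head of the queue, preserving order. */
        void commit(long zxid) {
            if (alive && !pending.isEmpty() && pending.peek() == zxid) {
                committed.add(pending.poll());
            }
        }
    }

    /** Phase 1: propose to all followers. Phase 2: commit if a majority acknowledged. */
    static boolean broadcast(long zxid, List<Follower> followers) {
        int ensembleSize = followers.size() + 1; // followers plus the leader
        int acks = 1;                            // the leader counts its own vote
        for (Follower f : followers) {
            if (f.propose(zxid)) acks++;
        }
        if (acks <= ensembleSize / 2) return false; // no majority, do not commit
        for (Follower f : followers) f.commit(zxid);
        return true;
    }

    /** Builds an ensemble with the given follower counts and broadcasts one proposal. */
    static boolean demo(int aliveFollowers, int deadFollowers) {
        List<Follower> followers = new ArrayList<>();
        for (int i = 0; i < aliveFollowers; i++) followers.add(new Follower(true));
        for (int i = 0; i < deadFollowers; i++) followers.add(new Follower(false));
        return broadcast(1L, followers);
    }

    public static void main(String[] args) {
        System.out.println("5 servers, 2 followers down: committed = " + demo(2, 2));
        System.out.println("5 servers, 3 followers down: committed = " + demo(1, 3));
    }
}
```

With five servers the proposal still commits when two followers are down (three acknowledgements out of five), but not when three are down.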

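After crash recovery, and before it starts accepting client requests, the Leader first verifies that all transactions have been committed by more than half of the Followers, that is, that data synchronization has completed. The goal is to keep the data consistent.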

Once a Follower has synchronized successfully, the Leader adds it to the list of available servers.

Whether the Leader processes or discards a transaction depends on the ZXID. So how is a ZXID generated?

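In ZAB, the ZXID is a 64-bit number. The low 32 bits act as a simple monotonically increasing counter: for every client transaction request, the Leader creates a new transaction Proposal and increments this counter by 1. The high 32 bits hold the Leader's election epoch: the newly elected Leader takes the ZXID of the latest transaction Proposal in its local log, extracts the epoch from it, and adds 1 to that value once the election round has finished. The transaction counter in the low 32 bits then starts again from 0.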

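The high 32 bits uniquely identify each generation of Leader, and the low 32 bits uniquely identify each transaction within that generation. Followers can also use the high 32 bits to tell different Leaders apart, which simplifies data recovery. With this scheme, when a Follower connects to the Leader, the Leader compares the last committed ZXID on its own server with the ZXID on the Follower; depending on the result, the Follower either rolls back or synchronizes with the Leader.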
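The bit layout described above can be made concrete with a few helpers. This is a sketch under stated assumptions: ZxidSketch and its method names are hypothetical, and syncAction reduces the Leader's comparison to three coarse outcomes, which is a simplification of the real synchronization logic.

```java
/** Hypothetical helpers for the 64-bit ZXID layout: high 32 bits epoch, low 32 bits counter. */
class ZxidSketch {

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Next transaction within the same epoch: the counter is incremented by 1. */
    static long nextInEpoch(long zxid) {
        return zxid + 1;
    }

    /** After an election: the epoch is incremented and the counter restarts at 0. */
    static long firstOfNextEpoch(long lastLoggedZxid) {
        return makeZxid(epochOf(lastLoggedZxid) + 1, 0);
    }

    /** Simplified decision a Leader could take after comparing ZXIDs with a Follower. */
    static String syncAction(long leaderLastCommitted, long followerLast) {
        if (followerLast > leaderLastCommitted) return "ROLLBACK";
        if (followerLast < leaderLastCommitted) return "SYNC";
        return "UP_TO_DATE";
    }

    public static void main(String[] args) {
        long zxid = makeZxid(2, 5);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
        System.out.println("after election: 0x" + Long.toHexString(firstOfNextEpoch(zxid)));
        System.out.println(syncAction(makeZxid(3, 0), zxid));
    }
}
```

Because the epoch occupies the high bits, any ZXID from a newer epoch compares greater than every ZXID from an older epoch, regardless of the counter.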

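import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkServiceProviderV implements Watcher {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zk = null;
    private static String rootPath = "/GroupMembers";

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper("localhost:2181", 5000, new ZkServiceProviderV());
        countDownLatch.await(); // block until the session reaches SyncConnected
        // The parent node /GroupMembers must already exist, otherwise create() throws NoNodeException.
        zk.create(rootPath + "/test", "test create".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("Created cluster node test: " + rootPath + "/test");
        Thread.sleep(Integer.MAX_VALUE); // keep the session (and the ephemeral node) alive
    }

    @Override
    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            // A None event with no path signals a session state change, here: connected.
            if (Event.EventType.None == event.getType() && event.getPath() == null) {
                countDownLatch.countDown();
            }
        }
    }
}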

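When a ZooKeeper instance is created through its constructor, defaultWatchManager first creates a ZKWatchManager object, and the Watcher passed in is assigned to its defaultWatcher field. The constructor then creates the connection through the ClientCnxn constructor, which in turn creates two important threads, SendThread and EventThread. Finally, the ZooKeeper constructor calls ClientCnxn's start method to start both threads.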

public class ZooKeeper implements AutoCloseable {
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
this(connectString, sessionTimeout, watcher, false);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString));
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, null);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);
}
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
private boolean disableAutoWatchReset;
protected volatile Watcher defaultWatcher;
ZKWatchManager(boolean disableAutoWatchReset) {
this.disableAutoWatchReset = disableAutoWatchReset;
}
}
}
public class ClientCnxn {
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {
this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
public void start() {
sendThread.start();
eventThread.start();
}
}
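The two timeouts set in the ClientCnxn constructor above are plain integer arithmetic. As a worked example, the sketch below repeats the two formulas for the 5000 ms session timeout used earlier and an assumed connect string with three hosts; the class name ClientTimeoutSketch and the host count are illustrative assumptions.

```java
/** Worked example of the timeout formulas from the ClientCnxn constructor. */
class ClientTimeoutSketch {

    /** connectTimeout = sessionTimeout / number of hosts in the connect string. */
    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;
    }

    /** readTimeout = two thirds of the session timeout (integer division). */
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeout = 5000; // milliseconds, as in the example client
        int hostCount = 3;         // assumed number of servers in the connect string
        System.out.println("connectTimeout = " + connectTimeout(sessionTimeout, hostCount)); // 1666
        System.out.println("readTimeout    = " + readTimeout(sessionTimeout));               // 3333
    }
}
```

Dividing by the host count gives each server an equal share of the session timeout for connection attempts, so the client can try every host before the session would expire.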

SendThread

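SendThread is mainly responsible for establishing the connection to the server and for listening to and handling read and write events. First, ClientCnxn's startConnect method calls ClientCnxnSocketNIO's connect method, which creates a SocketChannel and registers it with the Selector. Read and write events are then monitored in ClientCnxnSocketNIO's doTransport method.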

class SendThread extends ZooKeeperThread {
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}

if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
if (state == States.CONNECTEDREADONLY) { // read-only mode handling
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
break;
} else {// this is ugly, you have a better way speak up
cleanAndNotifyState();
}
}
}
synchronized (state) {// When it comes to this point, it guarantees that later queued packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
}
}
public class ClientCnxn {
private void startConnect(InetSocketAddress addr) throws IOException {// initializing it for new connection
saslLoginFailed = false;
if (!isFirstConnect) {
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
}
}
state = States.CONNECTING;
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
if (clientConfig.isSaslClientEnabled()) {
try {
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);
} catch (LoginException e) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
clientCnxnSocket.connect(addr);
}
}
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);
} catch (IOException e) {
sock.close();
throw e;
}
initialized = false;
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
SocketChannel createSock() throws IOException {
SocketChannel sock;
sock = SocketChannel.open();
sock.configureBlocking(false);
sock.socket().setSoLinger(false, -1);
sock.socket().setTcpNoDelay(true);
return sock;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
}
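The heartbeat decision inside SendThread's run method above also comes down to a small piece of arithmetic. The sketch below copies that expression into two standalone helpers so that it can be tried with concrete numbers; PingScheduleSketch and its method names are hypothetical, and the inputs are made-up idle times.

```java
/** Standalone copy of the ping-scheduling arithmetic from SendThread.run(). */
class PingScheduleSketch {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in the listing

    /** Milliseconds until the next ping is due; zero or negative means "ping now". */
    static int timeToNextPing(int readTimeout, int idleSend) {
        // Once more than a second has passed since the last send, one extra second
        // is subtracted so that the ping goes out earlier.
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    /** True if a ping should be sent in this loop iteration. */
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int readTimeout = 3333; // two thirds of a 5000 ms session timeout
        for (int idleSend : new int[] {500, 1200}) {
            System.out.println("idleSend=" + idleSend
                    + " timeToNextPing=" + timeToNextPing(readTimeout, idleSend)
                    + " ping=" + shouldPing(readTimeout, idleSend));
        }
    }
}
```

The second condition caps the interval: even with a very long read timeout, a ping is sent once nothing has been sent for more than ten seconds.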

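After SendThread has connected to the server through startConnect, it calls ClientCnxnSocketNIO's doTransport to listen for read and write events. When such an event arrives, the doIO method handles the read and write logic separately. For a write event, it takes a packet from the outgoingQueue, serializes it with the createBB method of ClientCnxn's Packet class, and writes the resulting buffer to the server. On the server side, the request is ultimately handled by CnxnChannelHandler's channelRead method.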

public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard(); // update the last-send and last-heard timestamps
updateSocketAddresses();
sendThread.primeConnection(); // set up the session, re-register earlier watches, and send auth info
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn); // an NIO read or write event occurred
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) { // handle the server's response
int rc = sock.read(incomingBuffer);
if (rc < 0) { // end of stream: the server has most likely closed the socket
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
if (sockKey.isWritable()) { // send a message to the server
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()); // take a packet from outgoingQueue
if (p != null) {
updateLastSend();
if (p.bb == null) {
if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB(); // serialize the outgoing data with jute and wrap it in a ByteBuffer
}
sock.write(p.bb); // send the data to the server, which reads the client request from its SocketChannel
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else { // Just in case
enableWrite();
}
}
}
}
public class ClientCnxn { // createBB() below is declared on the nested static class ClientCnxn.Packet
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {}
}
}
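The createBB method above uses a common framing trick: write a placeholder length, serialize the payload, then go back and patch the real length into the first four bytes. The sketch below shows the same pattern with only JDK classes. It is an illustration, not the jute wire format: FrameSketch, the field order, and the sample values are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical length-prefixed frame: [len][xid][type][pathLen][path bytes]. */
class FrameSketch {

    static ByteBuffer encode(int xid, int type, String path) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);                     // placeholder, patched below
            out.writeInt(xid);                    // request header
            out.writeInt(type);
            byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
            out.writeInt(pathBytes.length);       // request body
            out.write(pathBytes);
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4);         // real length, excluding the prefix itself
            bb.rewind();                          // ready to be written to the socket
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the frame back the way a receiver would: length first, then the body. */
    static String decodePath(ByteBuffer frame) {
        int len = frame.getInt();
        if (len != frame.remaining()) {
            throw new IllegalStateException("length prefix does not match body size");
        }
        frame.getInt();                           // xid, ignored here
        frame.getInt();                           // type, ignored here
        byte[] pathBytes = new byte[frame.getInt()];
        frame.get(pathBytes);
        return new String(pathBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode(1, 1, "/GroupMembers/test");
        System.out.println("frame bytes = " + frame.capacity() + ", body length = " + frame.getInt(0));
        System.out.println("path = " + decodePath(frame));
    }
}
```

The receiving side in doIO mirrors this: it first fills the 4-byte lenBuffer, then allocates a buffer of exactly that length for the body.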

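For a read event, if the server has sent a data-change notification, the client first deserializes the response into a WatcherEvent, then converts the server path to a client path, and finally adds the event to the EventThread's waitingEvents blocking queue for asynchronous processing. For a normal response, finishPacket is called in the finally block; if getData was called with watch set to true, finishPacket adds the Watcher to the set of Watchers registered for that path.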

class SendThread extends ZooKeeperThread {
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) { // -2 is the xid for pings
return;
}
if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
}
return;
}
if (replyHdr.getXid() == -1) { // -1 is the xid for notifications: the server reports a data change
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
if (chrootPath != null) { // convert the server path to a client path
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
}
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent(we);
return;
}
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + +replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
} finally {
finishPacket(packet); // the server's reply to a request has been received
}
}
}
public class ClientCnxn {
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) { // set when getData was called with watch == true
p.watchRegistration.register(err); // add the watcher to the set of watchers for this path
}
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll(); // wake up the client thread waiting on this packet
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
}
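The path conversion in readResponse above is the inverse of what the client does when it sends a request. A minimal sketch of both directions follows, assuming a chroot of /app taken from a connect string such as localhost:2181/app; ChrootPathSketch and its method names are hypothetical.

```java
/** Hypothetical helpers for translating between client paths and server paths under a chroot. */
class ChrootPathSketch {

    /** Server path to client path, following the branch structure in readResponse. */
    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) return serverPath;
        if (serverPath.equals(chrootPath)) return "/";
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath; // shorter than the chroot: left unchanged
    }

    /** Client path to server path: the chroot is prepended before a request is sent. */
    static String toServerPath(String chrootPath, String clientPath) {
        if (chrootPath == null) return clientPath;
        if (clientPath.length() == 1) return chrootPath; // the client's root "/"
        return chrootPath + clientPath;
    }

    public static void main(String[] args) {
        String chroot = "/app";
        System.out.println(toServerPath(chroot, "/GroupMembers/test"));
        System.out.println(toClientPath(chroot, "/app/GroupMembers/test"));
        System.out.println(toClientPath(chroot, "/app"));
    }
}
```

Watchers are registered under client paths, so the event path has to be converted back before the EventThread can look up the matching watchers.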

EventThread

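EventThread is mainly responsible for executing watch events asynchronously. The queueEvent method adds an event to the waitingEvents blocking queue, and the processEvent method invokes the callback of each matching watcher.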

class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {// materialize the watchers based on the event
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
waitingEvents.add(pair);
}
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {}
}
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event); // invoke the process method of the concrete Watcher
} catch (Throwable t) {}
}
} else if (event instanceof LocalCallback) {
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof DataCallback) {
((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ACLCallback) {
((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ChildrenCallback) {
((ChildrenCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof Children2Callback) {
((Children2Callback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof StringCallback) {
((StringCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else {
((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx);
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
} else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response).getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response).getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response).getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath().substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof Create2Response) {
Create2Callback cb = (Create2Callback) p.cb;
Create2Response rsp = (Create2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath().substring(chrootPath.length())), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
if (rc == 0) {
List<OpResult> results = rsp.getResultList();
int newRc = rc;
for (OpResult result : results) {
if (result instanceof ErrorResult && KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result).getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {}
}
}
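Stripped of the callback dispatch, the EventThread above is a consumer loop over a blocking queue that is shut down by a sentinel object. The sketch below shows only that skeleton. EventLoopSketch, the use of String as the event type, and the demo helper are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical skeleton of an event thread: blocking queue plus an "event of death" sentinel. */
class EventLoopSketch {
    private static final Object EVENT_OF_DEATH = new Object();

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final Consumer<String> watcher;
    private final Thread worker;

    EventLoopSketch(Consumer<String> watcher) {
        this.watcher = watcher;
        this.worker = new Thread(this::run, "event-thread-sketch");
        this.worker.setDaemon(true);
    }

    void start() { worker.start(); }
    void queueEvent(String event) { waitingEvents.add(event); }
    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }
    void join() throws InterruptedException { worker.join(); }

    private void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take();   // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    try {
                        watcher.accept((String) event); // run the callback off the I/O thread
                    } catch (RuntimeException e) {
                        // a failing callback must not stop the loop
                    }
                }
                if (wasKilled && waitingEvents.isEmpty()) break; // drain first, then exit
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues two events and the sentinel, waits for the thread, and returns what was delivered. */
    static List<String> demo() {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        EventLoopSketch loop = new EventLoopSketch(seen::add);
        loop.start();
        loop.queueEvent("NodeCreated:/a");
        loop.queueEvent("NodeDataChanged:/a");
        loop.queueEventOfDeath();
        try {
            loop.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running callbacks on a separate thread means a slow watcher delays other watchers but never blocks the socket I/O in SendThread.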

Create

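To create a node, the client wraps the data, serverPath, and other fields in a CreateRequest and calls ClientCnxn's submitRequest method. The request is wrapped in a Packet, and the Packet is placed in the outgoingQueue to wait to be sent. ClientCnxnSocketNIO's packetAdded method then wakes up the thread blocked in the selector's select method so that the queued commands in outgoingQueue are sent to the server. The caller then blocks in Packet's wait method until the server replies. It is eventually woken by the notifyAll call on the Packet inside finishPacket, which runs in the isReadable branch of SendThread's doIO.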

public class ZooKeeper implements AutoCloseable {
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
EphemeralType.validateTTL(createMode, -1);
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
}
public class ClientCnxn {
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {
return submitRequest(h, request, response, watchRegistration, null);
}
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// wrap the request in a Packet and put the Packet into outgoingQueue to wait to be sent
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) { // Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else { // Wait for request completion infinitely
while (!packet.finished) {
packet.wait(); // wait for the server's reply; woken by finishPacket, which runs in the isReadable branch of SendThread's doIO
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
Packet packet = null;
// the request carries a watch flag to the server, which registers the watch accordingly
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet); // put the outgoing packet into the outgoingQueue
}
}
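// wake up the thread blocked in select() in order to trigger a write; under the hood a byte is written to a pipe, and once the write event fires the queued commands are sent to the server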
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
}
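The synchronous call path above turns an asynchronous socket into a blocking API with wait and notifyAll on the Packet object. The sketch below isolates that handshake. It is a simplification under stated assumptions: BlockingRequestSketch is a hypothetical class, the "server" is simulated inside the sender thread, and timeouts are omitted.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a blocking request on top of a queue and a sender thread. */
class BlockingRequestSketch {

    static class Packet {
        final String request;
        String response;
        boolean finished;
        Packet(String request) { this.request = request; }
    }

    private final LinkedBlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<>();

    /** Caller side: enqueue the packet, then block until the sender thread finishes it. */
    String submitRequest(String request) throws InterruptedException {
        Packet packet = new Packet(request);
        outgoingQueue.add(packet);
        synchronized (packet) {
            while (!packet.finished) {   // loop guards against spurious wake-ups
                packet.wait();
            }
        }
        return packet.response;
    }

    /** Sender side: take packets, obtain a (simulated) reply, and wake the waiting caller. */
    void startSendThread() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Packet p = outgoingQueue.take();
                    String reply = "ok:" + p.request; // stands in for the server's response
                    synchronized (p) {
                        p.response = reply;
                        p.finished = true;
                        p.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "send-thread-sketch");
        t.setDaemon(true);
        t.start();
    }

    static String demo(String request) {
        BlockingRequestSketch client = new BlockingRequestSketch();
        client.startSendThread();
        try {
            return client.submitRequest(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("create /GroupMembers/test"));
    }
}
```

Checking the finished flag inside the synchronized block matters: if the reply arrives before the caller starts waiting, the loop is skipped and the caller does not block forever.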

getData

public class ZooKeeper implements AutoCloseable {
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
return getData(path, watch ? watchManager.defaultWatcher : null, stat);
}
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null); // whether to register a watch
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
}

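On the server side, FinalRequestProcessor's processRequest eventually calls ZKDatabase's getData method to fetch the data. If the client set watch to true, the Watcher is not null and is added to dataWatches. When the data later changes, WatchManager's triggerWatch method fires the watch mechanism.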

public class ZKDatabase {
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
return dataTree.getData(path, stat, watcher);
}
}
public class DataTree {
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
}
class WatchManager {
synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);

HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
}

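triggerWatch first removes the watchers registered for the path, then calls the process method of the NettyServerCnxn that belongs to each watcher's client to notify that client of the node change. On receiving the notification, the client invokes the watch callback. The callback itself is never registered on the server; only the watched path is. When data changes on the server, the server sends the affected path back to the client, and the client uses the path to find the matching watcher and run its callback.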

class WatchManager {
Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path); // this removal is what makes watches one-shot
if (watchers == null || watchers.isEmpty()) {
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e); // calls NettyServerCnxn's process method to notify the client of the node change; the client then invokes its watch callback
}
return watchers;
}
}
public class NettyServerCnxn extends ServerCnxn {
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
try {
sendResponse(h, e, "notification");
} catch (IOException e1) {
close();
}
}
}
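The one-shot behaviour of watches follows directly from the two maps in WatchManager: triggering a path removes its entry before any watcher is notified. The sketch below reproduces that bookkeeping with JDK collections only. OneShotWatchSketch and its nested Watcher interface are hypothetical stand-ins and not the ZooKeeper types.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of one-shot watch bookkeeping; not the ZooKeeper WatchManager. */
class OneShotWatchSketch {

    interface Watcher {
        void process(String eventType, String path);
    }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();   // path -> watchers
    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();  // watcher -> paths

    synchronized void addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    /** Fires and removes all watchers for the path; returns how many were notified. */
    int triggerWatch(String path, String eventType) {
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);        // removed before notifying: one-shot
            if (watchers == null || watchers.isEmpty()) return 0;
            for (Watcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) paths.remove(path);
            }
        }
        for (Watcher w : watchers) {
            w.process(eventType, path);                // notify outside the lock
        }
        return watchers.size();
    }

    /** Returns {notified on first trigger, notified on second trigger, total callback calls}. */
    static int[] demo() {
        OneShotWatchSketch manager = new OneShotWatchSketch();
        int[] calls = {0};
        manager.addWatch("/GroupMembers/test", (type, path) -> calls[0]++);
        int first = manager.triggerWatch("/GroupMembers/test", "NodeDataChanged");
        int second = manager.triggerWatch("/GroupMembers/test", "NodeDataChanged");
        return new int[] {first, second, calls[0]};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("first=" + r[0] + " second=" + r[1] + " calls=" + r[2]);
    }
}
```

A client that wants to keep observing a node therefore has to register the watch again, typically by calling getData with a watch from inside the callback.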