Zookeeper服务端之ZAB

服务端处理客户端请求的入口，是NettyServerCnxnFactory的无参构造函数在启动Netty服务端时绑定的CnxnChannelHandler的channelRead方法。该构造方法在集群启动时，由QuorumPeerMain的runFromConfig方法中调用ServerCnxnFactory.createFactory()通过反射调用。

当收到客户端请求后，CnxnChannelHandler的channelRead方法会调用NettyServerCnxn的processMessage方法，进而调用receiveMessage方法将数据读到ByteBuffer中，然后调用ZooKeeperServer的processPacket处理数据包，最终在submitRequest方法中通过执行firstProcessor的processRequest方法进入请求处理链。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
public class NettyServerCnxnFactory extends ServerCnxnFactory {
    /** Business handler appended at the end of every child channel pipeline. */
    CnxnChannelHandler channelHandler = new CnxnChannelHandler();

    /**
     * Creates the Netty-based connection factory.
     *
     * <p>Decides whether port unification is enabled, creates the boss and worker event loop
     * groups and configures (and validates) the {@link ServerBootstrap}. Every accepted child
     * channel gets an optional SSL stage followed by {@link #channelHandler}, whose
     * {@code channelRead} is the entry point for client requests. The bootstrap is not bound here.
     */
    NettyServerCnxnFactory() {
        x509Util = new ClientX509Util();
        this.shouldUsePortUnification = portUnificationRequested();

        // Boss group is sized by the number of client-reachable local addresses;
        // the worker group uses the no-argument overload.
        EventLoopGroup acceptorGroup =
                NettyUtils.newNioOrEpollEventLoopGroup(NettyUtils.getClientReachableLocalInetAddressCount());
        EventLoopGroup ioGroup = NettyUtils.newNioOrEpollEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(acceptorGroup, ioGroup);
        serverBootstrap.channel(NettyUtils.nioOrEpollServerSocketChannel());
        // Option for the listening (parent) channel.
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
        // Options for each accepted (child) channel.
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.SO_LINGER, -1);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline channelPipeline = ch.pipeline();
                // A secure factory always requires TLS; port unification allows TLS or plaintext,
                // which is the second argument passed to initSSL.
                if (secure || shouldUsePortUnification) {
                    initSSL(channelPipeline, !secure);
                }
                channelPipeline.addLast("servercnxnfactory", channelHandler);
            }
        });

        this.bootstrap = configureBootstrapAllocator(serverBootstrap);
        this.bootstrap.validate();
    }

    /**
     * Returns whether port unification should be used: the system property named by
     * {@code PORT_UNIFICATION_KEY} must be true and SSL auth must configure without error.
     */
    private boolean portUnificationRequested() {
        if (!Boolean.getBoolean(PORT_UNIFICATION_KEY)) {
            return false;
        }
        try {
            QuorumPeerConfig.configureSSLAuth();
            return true;
        } catch (QuorumPeerConfig.ConfigException e) {
            // SSL auth could not be configured: fall back to no port unification.
            return false;
        }
    }
}
class CnxnChannelHandler extends ChannelDuplexHandler {
    /**
     * Netty inbound callback: hands each received buffer to the {@code NettyServerCnxn} stored in
     * the channel's {@code CONNECTION_ATTRIBUTE}.
     *
     * <p>{@code msg} is always released in {@code finally}, so this method owns it. Anything
     * downstream that keeps the bytes must take its own retained reference.
     *
     * @param ctx channel context holding the connection attribute
     * @param msg inbound message, cast to {@code ByteBuf}
     * @throws Exception whatever processing throws, propagated unchanged
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
            if (cnxn == null) {
                // No connection bound to this channel: the message is dropped (released below).
                return;
            }
            cnxn.processMessage((ByteBuf) msg); // every client command enters the server here
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}
/**
 * Netty-backed server-side client connection (excerpt): accumulates the inbound byte stream
 * and splits it into length-prefixed packets for ZooKeeperServer.
 */
public class NettyServerCnxn extends ServerCnxn {
/**
 * Called from CnxnChannelHandler.channelRead for each inbound buffer; must run on the
 * channel's event loop (checked below).
 * If the connection is throttled, or older bytes are still waiting in queuedBuffer, the new
 * bytes are appended to queuedBuffer so ordering is preserved. Otherwise they are decoded
 * directly and any unconsumed tail is queued. Queued data uses retained duplicates/slices
 * because the handler releases buf after this method returns.
 */
void processMessage(ByteBuf buf) {
checkIsInEventLoop("processMessage");
if (throttled.get()) {
// Throttled: do not decode now, just keep the bytes for later.
if (queuedBuffer == null) {
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedDuplicate());
} else {
if (queuedBuffer != null) {
// Older bytes are pending: append behind them and decode from the queue to keep order.
appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
receiveMessage(buf); // decode and dispatch the client's packets
if (!closingChannel && buf.isReadable()) {
// receiveMessage returned with bytes left (e.g. throttling started): queue the unread tail.
if (queuedBuffer == null) {
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
}
}
}
}
/**
 * Decodes as many packets from message as possible while the connection is not throttled.
 * Wire format: a 4-byte length (accumulated in bbLen) followed by a body of that length
 * (accumulated in bb). Partial reads are kept across calls in bbLen/bb.
 * The first complete packet is handed to processConnectRequest; later ones to processPacket.
 * Any IOException (server down, bad length, ...) closes the connection.
 */
private void receiveMessage(ByteBuf message) {
try {
while(message.isReadable() && !throttled.get()) {
if (bb != null) {
// Reading a packet body whose length is already known.
if (LOG.isTraceEnabled()) {
// NOTE(review): trace logging was stripped from this excerpt; dat is unused.
ByteBuffer dat = bb.duplicate();
dat.flip();
}
if (bb.remaining() > message.readableBytes()) {
// readBytes(ByteBuffer) needs message to hold bb.remaining() bytes, so cap the limit to what is available.
int newLimit = bb.position() + message.readableBytes();
bb.limit(newLimit);
}
message.readBytes(bb);
bb.limit(bb.capacity());
if (LOG.isTraceEnabled()) {
ByteBuffer dat = bb.duplicate();
dat.flip();
}
if (bb.remaining() == 0) {
// Body complete: dispatch it.
packetReceived();
bb.flip();
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
throw new IOException("ZK down");
}
if (initialized) {
zks.processPacket(this, bb);
// Stop reading from this connection when too many requests are outstanding.
if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
disableRecvNoWait();
}
} else {
// First packet on the connection is the connect request.
zks.processConnectRequest(this, bb);
initialized = true;
}
bb = null;
}
} else {
// Reading the 4-byte length prefix.
if (LOG.isTraceEnabled()) {
ByteBuffer dat = bbLen.duplicate();
dat.flip();
}
if (message.readableBytes() < bbLen.remaining()) {
bbLen.limit(bbLen.position() + message.readableBytes());
}
message.readBytes(bbLen);
bbLen.limit(bbLen.capacity());
if (bbLen.remaining() == 0) {
bbLen.flip();
int len = bbLen.getInt();
bbLen.clear();
if (!initialized) {
// Before connecting, the 4 bytes may be a four-letter admin command rather than a length.
if (checkFourLetterWord(channel, message, len)) {
return;
}
}
// Reject negative or oversized packet lengths.
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
bb = ByteBuffer.allocate(len);
}
}
}
} catch(IOException e) {
close();
}
}
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    /**
     * Decodes one client packet (request header followed by the body) and dispatches it.
     *
     * <p>{@code auth} and {@code sasl} packets are answered directly on the connection. Every
     * other type is wrapped in a {@link Request}, submitted to the request-processor chain via
     * {@link #submitRequest(Request)}, and then counted as outstanding on the connection.
     *
     * @param cnxn the connection the packet arrived on
     * @param incomingBuffer packet bytes, starting with the serialized {@code RequestHeader}
     * @throws IOException if the header or the auth/sasl body cannot be deserialized
     */
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");
        // The header has been consumed; slice so the body starts at position 0.
        incomingBuffer = incomingBuffer.slice();

        if (h.getType() == OpCode.auth) {
            AuthPacket authPacket = new AuthPacket();
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
                } catch (RuntimeException e) {
                    // A failing provider counts as an authentication failure.
                    authReturn = KeeperException.Code.AUTHFAILED;
                }
            }
            if (authReturn == KeeperException.Code.OK) {
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                // Ask the client to close and stop reading from this connection.
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                cnxn.disableRecv();
            }
            return;
        }

        if (h.getType() == OpCode.sasl) {
            Record rsp = processSasl(incomingBuffer, cnxn);
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            // Third argument: tag under which rsp is serialized (NOTE(review): per ServerCnxn.sendResponse).
            cnxn.sendResponse(rh, rsp, "response");
            return;
        }

        Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
        si.setOwner(ServerCnxn.me);
        setLocalSessionFlag(si);
        submitRequest(si);
        cnxn.incrOutstandingRequests(h);
    }

    /**
     * Hands a request to the first processor of the chain.
     *
     * <p>If the chain has not been built yet, waits on this server's monitor (re-checking every
     * second) while the state is {@code State.INITIAL}. Invalid request types go to
     * {@code UnimplementedRequestProcessor}.
     *
     * @param si the request to process
     * @throws RuntimeException "Not started" if, after waiting, there is no first processor or the
     *     server is not {@code RUNNING}
     */
    public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status instead of losing it; the state check below
                    // still decides whether the request can proceed.
                    Thread.currentThread().interrupt();
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            // NOTE(review): swallowed in this excerpt; the request is neither processed nor answered.
        } catch (RequestProcessorException e) {
            // NOTE(review): swallowed in this excerpt.
        }
    }
}

若是单机模式，则由ZooKeeperServerMain的runFromConfig调用ServerCnxnFactory的startup方法，最终调用子类NettyServerCnxnFactory的startup方法，进而调用ZooKeeperServer的startup方法。此时执行的是ZooKeeperServer自身的setupRequestProcessors，加载的是单机请求处理链（PrepRequestProcessor → SyncRequestProcessor → FinalRequestProcessor），而不是Leader请求处理链。若为集群模式，则在Leader选举完成后，由Leader的lead方法调用startZkServer，进而调用ZooKeeperServer子类LeaderZooKeeperServer的startup方法，从而调用其setupRequestProcessors方法完成Leader请求处理链的加载。

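最终产生的Leader请求处理链为：LeaderRequestProcessor → PrepRequestProcessor → ProposalRequestProcessor → CommitProcessor → ToBeAppliedRequestProcessor → FinalRequestProcessor。此外，ProposalRequestProcessor内部还有一条SyncRequestProcessor → AckRequestProcessor的分支，用于写本地事务日志并由Leader自己ACK。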

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Leader {
/**
 * Starts the leader's ZooKeeperServer: applies the last-seen quorum configuration, runs
 * zk.startup() (which builds the request-processor chain), then updates the election vote
 * and the database's last processed zxid. Runs under the Leader monitor.
 */
private synchronized void startZkServer() {
// Everything up to the server's current zxid is treated as already committed.
lastCommitted = zk.getZxid();
QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
// Long vs long: designatedLeader is unboxed here (would throw NPE if null).
// If the configuration designates another server as leader, this one must stop committing.
if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
zk.startup(); // builds and starts the request-processor chain
self.updateElectionVote(getEpoch());
zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
}
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
    /**
     * Runs the inherited startup (presumably ZooKeeperServer.startup, which calls
     * {@link #setupRequestProcessors()}), then starts the container manager if one exists.
     */
    public synchronized void startup() {
        super.startup();
        if (containerManager == null) {
            return;
        }
        containerManager.start();
    }

    /**
     * Builds the leader's processor chain back to front:
     * LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor
     * -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor.
     * Threaded stages are started as soon as they are created.
     */
    protected void setupRequestProcessors() {
        RequestProcessor tail = new FinalRequestProcessor(this);
        RequestProcessor applyStage = new Leader.ToBeAppliedRequestProcessor(tail, getLeader());

        commitProcessor = new CommitProcessor(applyStage, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();

        ProposalRequestProcessor proposalStage = new ProposalRequestProcessor(this, commitProcessor);
        proposalStage.initialize();

        prepRequestProcessor = new PrepRequestProcessor(this, proposalStage);
        prepRequestProcessor.start();

        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        setupContainerManager();
    }
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
/**
 * Starts the server: creates the session tracker if needed and starts it, builds the
 * request-processor chain, calls registerJMX(), then switches to RUNNING and calls notifyAll()
 * so threads waiting on this server (e.g. submitRequest waiting for startup) re-check the state.
 */
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors(); // overridden by subclasses such as LeaderZooKeeperServer to build their own chain
registerJMX();
setState(State.RUNNING);
notifyAll();
}
}

LeaderRequestProcessor

首先执行LeaderRequestProcessor的processRequest方法，完成Session的检查与处理，然后继续调用PrepRequestProcessor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LeaderRequestProcessor implements RequestProcessor {
    /**
     * First processor of the leader chain. Asks the server whether the request's session needs an
     * upgrade (lzks.checkUpgradeSession). Any resulting upgrade request is forwarded ahead of the
     * request that triggered it, and then the original request is forwarded.
     */
    public void processRequest(Request request) throws RequestProcessorException {
        Request upgradeRequest = tryUpgradeSession(request);
        if (upgradeRequest != null) {
            nextProcessor.processRequest(upgradeRequest);
        }
        nextProcessor.processRequest(request);
    }

    /**
     * Returns the upgrade request produced by checkUpgradeSession, or null when there is none or
     * the check failed. A KeeperException is recorded on {@code request}; if the request already
     * has a header, it also becomes an error txn. An IOException is ignored.
     */
    private Request tryUpgradeSession(Request request) {
        try {
            return lzks.checkUpgradeSession(request);
        } catch (KeeperException ke) {
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(ke.code().intValue()));
            }
            request.setException(ke);
        } catch (IOException ie) {
            // NOTE(review): swallowed in this excerpt; the request is still forwarded unchanged.
        }
        return null;
    }
}

PrepRequestProcessor

调用PrepRequestProcessor的processRequest方法只是将请求放入submittedRequests阻塞队列中。PrepRequestProcessor本身是一个线程类，在LeaderZooKeeperServer中创建时就被启动。当队列中有数据时，通过pRequest方法根据OpCode执行对应的逻辑，主要是做一些校验；若有数据变更，则将其添加到ZooKeeperServer的outstandingChanges队列中，然后调用下一个处理器ProposalRequestProcessor。对于事务请求，会调用ZooKeeperServer的getNextZxid方法，通过AtomicLong自增生成事务zxid，且zxid只在主节点上生成。由于处理客户端命令的逻辑是单线程从队列中取数据处理的，从而保证了事务处理的顺序一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/**
 * Request-preparation stage. processRequest only enqueues; the run() thread takes one request
 * at a time, allocates zxids for write operations (zks.getNextZxid()), converts them to
 * transactions via pRequest2Txn (not shown), and forwards every request to nextProcessor.
 */
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
/** Enqueues the request for the processing thread; does not block. */
public void processRequest(Request request) {
submittedRequests.add(request);
}
/** Processing loop: runs until Request.requestOfDeath is dequeued or an exception occurs. */
public void run() {
try {
while (true) { // single consumer thread: requests are prepared strictly in queue order
Request request = submittedRequests.take();
// NOTE(review): traceMask is computed but unused in this excerpt (trace logging stripped).
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (Request.requestOfDeath == request) {
break;
}
pRequest(request);
}
} catch (RequestProcessorException e) {
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
}
/**
 * Prepares one request and forwards it.
 * Write ops get a fresh zxid and a txn (hdr/txn set). Read and watch ops only have their session
 * checked and keep hdr == null. A KeeperException is recorded on the request (hdr type set to
 * error when a hdr exists). Any other exception turns an existing hdr into a MARSHALLINGERROR
 * error txn. request.zxid is then set from zks.getZxid() and the request is forwarded.
 */
protected void pRequest(Request request) throws RequestProcessorException {
request.setHdr(null);
request.setTxn(null);
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch(IOException e) {
// Give the request a header before rethrowing so the generic handler below can mark it as an error txn.
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
// All sub-operations of a multi share a single zxid.
long zxid = zks.getNextZxid();
KeeperException ke = null;
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
for(Op op: multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn;
if (ke != null) {
// An earlier sub-op failed: the remaining ones become RUNTIMEINCONSISTENCY errors.
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else { /* Prep the request and convert to a Txn */
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = request.getHdr().getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue());
request.setException(e);
// Presumably undoes the outstanding changes recorded for this zxid by earlier sub-ops.
rollbackPendingChanges(zxid, pendingChanges);
}
}
// Serialize each sub-txn and collect it into the MultiTxn.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request") ;
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
txns.add(new Txn(type, bb.array()));
}
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
request.setTxn(new MultiTxn(txns));
break;
case OpCode.createSession:
case OpCode.closeSession:
// Local sessions skip the txn; only non-local sessions are turned into a transaction.
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break;
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.checkWatches:
case OpCode.removeWatches:
// No txn: only validate the session. hdr stays null, so later stages skip proposing/logging.
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}
request.setException(e);
} catch (Exception e) {
// NOTE(review): sb (hex dump of the request bytes) is built but unused here; upstream presumably logs it.
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if(bb != null){
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
}
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    /** Highest zxid handed out so far (starts at 0 in this excerpt). */
    private final AtomicLong hzxid = new AtomicLong(0);

    /**
     * Atomically allocates and returns the next transaction id.
     *
     * @return the previous highest zxid plus one
     */
    long getNextZxid() {
        return hzxid.addAndGet(1L);
    }
}

ProposalRequestProcessor

ProposalRequestProcessor主要完成三件事：首先调用CommitProcessor，然后给所有follower发送proposal，最后将数据存储到本机日志文件中。若是getData等不涉及事务变更的请求，其Request的hdr属性为空，则直接跳过给follower发送proposal以及将数据保存到本机日志文件这两步。

给所有follower发送proposal是通过调用Leader的propose方法完成的：首先封装要发送给follower的proposal，然后遍历所有follower发送packet，实际上是将packet放入各个follower对应的LearnerHandler的queuedPackets阻塞队列中。队列中的数据最终由各LearnerHandler线程调用startSendingPackets方法启动的一个新线程来消费，从而将proposal发送给follower。LearnerHandler线程则是在选举完成后，由Leader的lead方法中启动的LearnerCnxAcceptor线程负责启动的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public class ProposalRequestProcessor implements RequestProcessor {
    /**
     * Places this processor in front of {@code nextProcessor} (the CommitProcessor in the leader
     * chain). It also creates the side branch SyncRequestProcessor -> AckRequestProcessor, which
     * writes proposals to the local log and then acks them for the leader.
     */
    public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        syncProcessor = new SyncRequestProcessor(zks, new AckRequestProcessor(zks.getLeader()));
    }

    /** Starts the SyncRequestProcessor thread of the local-logging branch. */
    public void initialize() {
        syncProcessor.start();
    }

    /**
     * Learner sync requests are handed directly to the leader. Any other request is first queued
     * in the next processor. If it carries a txn header, it is then proposed to the followers and
     * written to the local log.
     *
     * @throws RequestProcessorException wrapping an XidRolloverException thrown by propose
     */
    public void processRequest(Request request) throws RequestProcessorException {
        if (request instanceof LearnerSyncRequest) {
            zks.getLeader().processSync((LearnerSyncRequest) request);
            return;
        }
        // Queue it in the CommitProcessor first; it waits there for the commit.
        nextProcessor.processRequest(request);
        if (request.getHdr() == null) {
            // Reads such as getData carry no txn header: nothing to propose or log.
            return;
        }
        try {
            zks.getLeader().propose(request);
        } catch (XidRolloverException e) {
            throw new RequestProcessorException(e.getMessage(), e);
        }
        // Log locally; AckRequestProcessor then acks on behalf of the leader.
        syncProcessor.processRequest(request);
    }
}
public class Leader {
/**
 * Wraps a txn request in a PROPOSAL packet, registers it in outstandingProposals (keyed by
 * zxid) and queues it to every forwarding follower.
 *
 * @return the registered Proposal
 * @throws XidRolloverException when the low 32 bits of the zxid are exhausted; the leader is
 *         shut down first to force a new election and epoch
 */
public Proposal propose(Request request) throws XidRolloverException {
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
// Registration and queuing happen under the Leader monitor (processAck/tryToCommit are synchronized on it too).
synchronized(this) {
// The proposal must be acked by the current quorum...
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig){
self.setLastSeenQuorumVerifier(request.qv, true);
}
// ...and, while a newer configuration is pending, by that quorum as well.
if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
sendPacket(pp); // queue to every follower's LearnerHandler
}
return p;
}
/** Queues qp on each LearnerHandler in forwardingFollowers; each handler's sender thread writes it out. */
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp); // append to that follower's queuedPackets
}
}
}
}
public class LearnerHandler extends ZooKeeperThread {
    /**
     * Starts the "Sender-&lt;remote address&gt;" thread that drains queuedPackets to this learner,
     * unless sendingThreadStarted shows it was already started.
     */
    protected void startSendingPackets() {
        if (!sendingThreadStarted) {
            // Start sending packets
            new Thread() {
                public void run() {
                    Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        // The sender thread is exiting; keep its interrupt status visible.
                        Thread.currentThread().interrupt();
                    }
                }
            }.start();
            sendingThreadStarted = true;
        }
    }

    /**
     * Writes queued packets to the learner until proposalOfDeath is dequeued or a write fails.
     * Buffered output is flushed whenever the queue runs empty, before blocking on take().
     *
     * @throws InterruptedException if interrupted while waiting for a packet
     */
    private void sendPackets() throws InterruptedException {
        while (true) {
            try {
                QuorumPacket p = queuedPackets.poll();
                if (p == null) {
                    // Queue drained: push buffered bytes out before blocking for the next packet.
                    bufferedOutput.flush();
                    p = queuedPackets.take();
                }
                if (p == proposalOfDeath) {
                    // Packet of death: stop the sender.
                    break;
                }
                // Choose the trace mask per packet. Previously the mask was sticky, so after the
                // first PING every later packet (proposals, commits) was traced as a ping.
                long traceMask = p.getType() == Leader.PING
                        ? ZooTrace.SERVER_PING_TRACE_MASK
                        : ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (p.getType() == Leader.PROPOSAL) {
                    syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                }
                oa.writeRecord(p, "packet"); // jute-serialize the packet to the learner
            } catch (IOException e) {
                // Write failed: close the socket (best effort) and stop sending.
                if (!sock.isClosed()) {
                    try {
                        sock.close();
                    } catch (IOException ie) {
                        // Ignored: the connection is being torn down anyway.
                    }
                }
                break;
            }
        }
    }
}

将数据存储到本机日志文件中，是通过在ProposalRequestProcessor构造方法中创建的SyncRequestProcessor和AckRequestProcessor来完成的。SyncRequestProcessor也是线程类，通过ProposalRequestProcessor的initialize方法启动。

调用SyncRequestProcessor的processRequest方法只是将Request放入阻塞队列中，再通过其run方法异步处理，将其写入事务日志文件。若是Leader，则调用AckRequestProcessor，进而调用Leader的processAck方法。该方法先通过addAck将当前节点的ACK放入qvAcksetPairs中，然后调用tryToCommit判断收到的ACK是否超过半数：若未超过则直接返回；若超过，则向所有follower发送COMMIT命令，并向Observer发送INFORM（携带proposal数据），最后唤醒处于wait等待中的CommitProcessor线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/**
 * Writes requests to the transaction log and periodically rolls the log and starts a snapshot.
 * Requests appended to the log are forwarded to nextProcessor (AckRequestProcessor on the
 * leader) only after the batch containing them has been flushed.
 */
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
/** Enqueues the request for the logging thread; does not block. */
public void processRequest(Request request) {
queuedRequests.add(request);
}
/** Logging loop; runs until requestOfDeath is dequeued or an error is raised. */
public void run() {
try {
int logCount = 0;
// Roll/snapshot after snapCount/2 + randRoll appends, randRoll in [0, snapCount/2); presumably randomized so servers don't all snapshot at once.
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
// Nothing awaiting flush: block for the next request.
si = queuedRequests.take();
} else {
// A batch is pending: if no new request is immediately available, flush the batch now.
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) { // shutdown sentinel: stop the loop
break;
}
if (si != null) {// track the number of records written to the log (always true here: null was handled above)
if (zks.getZKDatabase().append(si)) {// appended to the transaction log
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
zks.getZKDatabase().rollLog(); // roll the log
if (snapInProcess != null && snapInProcess.isAlive()) { // take a snapshot
// Previous snapshot still running: skip this one.
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {} // NOTE(review): snapshot failures are silently ignored in this excerpt
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// Not appended (presumably a non-txn request) and nothing pending: forward immediately.
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
// Otherwise keep it in the batch so requests are forwarded in order after the flush.
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
}
/** Commits the txn log (presumably forcing it to disk), then forwards every batched request in order. */
private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException {
if (toFlush.isEmpty()) return;
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
if (nextProcessor != null) {
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
}
class AckRequestProcessor implements RequestProcessor {
    /**
     * Acks a request on behalf of the leader itself by calling {@code leader.processAck} with this
     * server's id and the request's zxid. Does nothing when {@code leader.self} is null.
     */
    public void processRequest(Request request) {
        QuorumPeer peer = leader.self;
        if (peer == null) {
            return;
        }
        leader.processAck(peer.getId(), request.zxid, null);
    }
}
public class Leader {
/**
 * Records an ACK from server sid for the proposal with the given zxid and tries to commit it.
 * Ignored when committing is disabled, the zxid's low 32 bits are 0, nothing is outstanding,
 * the zxid is already committed, or no outstanding proposal has that zxid.
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { // also called from LearnerHandler when the leader receives a follower's ACK
if (!allowedToCommit) return; // last op committed was a leader change - from now on only the new leader commits
// Low 32 bits == 0 marks the start of an epoch, not a regular proposal.
if ((zxid & 0xffffffffL) == 0) {
return;
}
if (outstandingProposals.size() == 0) {
return;
}
if (lastCommitted >= zxid) { // The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid); // registered by propose() before the PROPOSAL was queued to followers
if (p == null) {
return;
}
p.addAck(sid); // record sid's ack (presumably via SyncedLearnerTracker.addAck into qvAcksetPairs)
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// After a committed reconfig, try to commit the following proposals, which may have been blocked behind it.
if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p != null) {
curZxid++;
p = outstandingProposals.get(curZxid);
if (p != null) hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
/**
 * Commits p if the proposal before it is no longer outstanding (commits happen in zxid order)
 * and p has acks from all required quorums.
 *
 * @return true if p was committed
 */
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
if (outstandingProposals.containsKey(zxid - 1)) return false;
if (!p.hasAllQuorums()) {
return false; // not yet acked by a quorum
}
outstandingProposals.remove(zxid);
if (p.request != null) {
toBeApplied.add(p);
}
if (p.request == null) {
} else if (p.request.getHdr().getType() == OpCode.reconfig) {
Long designatedLeader = getDesignatedLeader(p, zxid);
QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
commitAndActivate(zxid, designatedLeader);
informAndActivate(p, designatedLeader);
} else {
commit(zxid); // send COMMIT to followers
inform(p); // send INFORM (with the proposal data) to observers
}
zk.commitProcessor.commit(p.request); // hand the request to the local CommitProcessor, waking its thread
// Answer learner sync requests that were waiting for this zxid.
if (pendingSyncs.containsKey(zxid)) {
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
return true;
}
/** Updates lastCommitted and queues a COMMIT packet for zxid to every forwarding follower. */
public void commit(long zxid) {
synchronized (this) {
lastCommitted = zxid;
}
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp); // queue COMMIT to all followers
}
/** Sends an INFORM packet carrying the proposal's data to all observers. */
public void inform(Proposal proposal) {
QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
sendObserverPacket(qp);
}
/** Queues qp on every observing learner's handler. */
void sendObserverPacket(QuorumPacket qp) {
for (LearnerHandler f : getObservingLearners()) {
f.queuePacket(qp);
}
}
}
public class SyncedLearnerTracker {
    /**
     * Adds {@code sid} to the ack set of every tracked quorum verifier in which it is a voting
     * member.
     *
     * @param sid id of the acking server
     * @return {@code true} if sid is a voting member of at least one tracked verifier
     */
    public boolean addAck(Long sid) {
        boolean recorded = false;
        for (QuorumVerifierAcksetPair pair : qvAcksetPairs) {
            if (!pair.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
                continue;
            }
            pair.getAckset().add(sid);
            recorded = true;
        }
        return recorded;
    }
}

CommitProcessor

调用CommitProcessor的processRequest方法时，会将Request添加到queuedRequests阻塞队列中，再通过run方法异步处理。此时queuedRequests不为空，正在等待提交的请求（nextPending）为null，正在提交的请求（currentlyCommitting）也为null，故不会执行wait方法。若Request是getData等不需要commit的命令，则直接交给下一个RequestProcessor；否则将其放入nextPending中。接着执行processCommitted方法，由于committedRequests为空，所以什么都不做。当再次执行while循环时，由于正在等待提交的请求不为null且committedRequests为空，线程会被wait住。

当Leader收到的ACK超过半数时，会在tryToCommit中调用CommitProcessor的commit方法，将request放入committedRequests阻塞队列中并唤醒CommitProcessor线程。随后执行processCommitted方法，进而执行sendToNextProcessor，在CommitWorkRequest的doWork方法中调用下一个请求处理器ToBeAppliedRequestProcessor，并清除当前正在提交的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
 * Holds each request that needs a commit until the matching commit arrives, then hands it
 * (like requests that need no commit) to the worker pool, which calls nextProcessor.
 * At most one request waits for its commit at a time (nextPending).
 */
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// Requests that we are holding until the commit comes in.
protected final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
// Requests that have been committed.
protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<Request>();
// The request currently waiting for its commit (a single slot, not a queue).
protected final AtomicReference<Request> nextPending = new AtomicReference<Request>();
// The committed request currently being processed (i.e. sent to the next processor).
private final AtomicReference<Request> currentlyCommitting = new AtomicReference<Request>();
// Number of requests currently being processed by the worker pool.
protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
/** Enqueues an incoming request and wakes run() unless a request is already waiting for its commit. */
public void processRequest(Request request) {
if (stopped) {
return;
}
queuedRequests.add(request);
if (!isWaitingForCommit()) {
wakeup(); // nothing is waiting for a commit, so run() can take the new request
}
}
public void commit(Request request) { // called by Leader.tryToCommit, or by LearnerZooKeeperServer.commit on learners
if (stopped || request == null) {
return;
}
committedRequests.add(request);
if (!isProcessingCommit()) { // no commit currently in progress
wakeup(); // wake the run() thread
}
}
/** Main loop: waits until there is work, takes queued requests, then applies at most one commit per iteration. */
public void run() {
Request request;
try {
while (!stopped) {
synchronized (this) {
while (!stopped && ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) && (committedRequests.isEmpty() || isProcessingRequest()))) {
wait();// wait while (no queued request can be taken: queue empty, or one is waiting for its commit, or a commit is in progress) && (no commit can be applied: none committed, or requests are still being processed)
}
}
// Take queued requests while nothing is waiting for a commit and no commit is in progress.
// poll() does NOT block: it returns null when queuedRequests is empty, which ends this loop.
while (!stopped && !isWaitingForCommit() && !isProcessingCommit() && (request = queuedRequests.poll()) != null) {
if (needCommit(request)) {
nextPending.set(request); // park it as the request waiting for its commit
} else { // requests that need no commit (e.g. getData) go straight to the next processor
sendToNextProcessor(request);
}
}
processCommitted(); // apply at most one committed request, if possible
}
} catch (Throwable e) {
handleException(this.getName(), e);
}
}
/** Counts the request as in flight and schedules it on the worker pool, keyed by its sessionId. */
private void sendToNextProcessor(Request request) {
numRequestsProcessing.incrementAndGet(); // one more request in flight
workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
}
/**
 * If no requests are in flight and a committed request is available, takes one commit and
 * sends it on. If it matches nextPending, the pending request (with its connection) is used.
 */
protected void processCommitted() {
Request request;
if (!stopped && !isProcessingRequest() && (committedRequests.peek() != null)) { // nothing in flight and a committed request is available
if (!isWaitingForCommit() && !queuedRequests.isEmpty()) {
return; // nothing is waiting for a commit AND new requests are queued: let run() take those first
}
request = committedRequests.poll();
// Match against nextPending so we can move to the next request once this one is committed. We also want to use nextPending because it has the cnxn member set correctly.
Request pending = nextPending.get();
if (pending != null && pending.sessionId == request.sessionId && pending.cxid == request.cxid) {
pending.setHdr(request.getHdr());
pending.setTxn(request.getTxn());
pending.zxid = request.zxid;
currentlyCommitting.set(pending);
nextPending.set(null);
sendToNextProcessor(pending);
} else { // this request came from someone else, so just send the commit packet
currentlyCommitting.set(request);
sendToNextProcessor(request);
}
}
}
/** Worker-pool task that runs one request through nextProcessor and updates the bookkeeping afterwards. */
private class CommitWorkRequest extends WorkerService.WorkRequest {
public void doWork() throws RequestProcessorException {
try {
nextProcessor.processRequest(request); // call the next processor (ToBeAppliedRequestProcessor on the leader)
} finally {
currentlyCommitting.compareAndSet(request, null); // if this was the blocking commit, clear currentlyCommitting
if (numRequestsProcessing.decrementAndGet() == 0) { // one fewer request in flight
if (!queuedRequests.isEmpty() || !committedRequests.isEmpty()) {
wakeup(); // run() may be blocked waiting for the pipeline to drain; wake it if work is pending
}
}
}
}
}
}

ToBeAppliedRequestProcessor

ToBeAppliedRequestProcessor的下一个RequestProcessor必须是FinalRequestProcessor。它本身没有做太多事：先调用FinalRequestProcessor，然后将该请求对应的Proposal从toBeApplied中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static class ToBeAppliedRequestProcessor implements RequestProcessor {
    /**
     * @param next the processor to delegate to; must be a FinalRequestProcessor
     * @param leader the leader whose toBeApplied queue is trimmed after each applied request
     * @throws RuntimeException if {@code next} is not a FinalRequestProcessor
     */
    ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
        boolean isFinalStage = next instanceof FinalRequestProcessor;
        if (!isFinalStage) {
            throw new RuntimeException(ToBeAppliedRequestProcessor.class.getName() + " must be connected to " + FinalRequestProcessor.class.getName() + " not " + next.getClass().getName());
        }
        this.leader = leader;
        this.next = next;
    }

    /**
     * Applies the request via FinalRequestProcessor. Then, if the request has a txn header, removes
     * the head of leader.toBeApplied when that head's request has the same zxid.
     */
    public void processRequest(Request request) throws RequestProcessorException {
        next.processRequest(request);
        if (request.getHdr() == null) {
            return;
        }
        long zxid = request.getHdr().getZxid();
        // Only the head is examined; commits are presumably applied in zxid order.
        Iterator<Proposal> it = leader.toBeApplied.iterator();
        if (!it.hasNext()) {
            return;
        }
        Proposal head = it.next();
        if (head.request != null && head.request.zxid == zxid) {
            it.remove();
        }
    }
}

FinalRequestProcessor

FinalRequestProcessor负责将数据写入内存，此时客户端才能真正查到该数据。它调用ZKDatabase的processTxn，根据请求类型调用具体的方法将数据更新到DataTree中，并触发已设置的监听器（Watcher），最后将结果响应给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
public class FinalRequestProcessor implements RequestProcessor {
public void processRequest(Request request) {
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) { // outstandingChanges数据是在PrepRequestProcessor中添加
rc = zks.processTxn(request); // 写数据到内存中,这这之后客户端节点才可以操作该数据
if (request.getHdr() != null) { // request.hdr是为写请求设置的,这是唯一添加到outstandingChanges的请求
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
long zxid = hdr.getZxid();
while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove();
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
}
if (request.isQuorum()) { // do not add non quorum packets to the queue.
zks.getZKDatabase().addCommittedProposal(request);
}
}
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
if (closeSession(zks.serverCnxnFactory, request.sessionId) || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return;
}
}
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
String lastOp = "NA";
zks.decInProcess();
Code err = Code.OK;
Record rsp = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
}
KeeperException ke = request.getException();
if (ke != null && request.type != OpCode.multi) {
throw ke;
}
switch (request.type) { // 根据request.type生成对应的rsp
case OpCode.ping: {
zks.serverStats().updateLatency(request.createTime);
lastOp = "PING";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, Time.currentElapsedTime());
cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
return;
}
case OpCode.createSession: {
zks.serverStats().updateLatency(request.createTime);
lastOp = "SESS";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, Time.currentElapsedTime());
zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
err = Code.get(rc.err);
break;
}
case OpCode.setData: {
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// XXX We really should NOT need this!!!!
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), cnxn);
break;
}
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo);
List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
break;
}
}
}
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
try {
cnxn.sendResponse(hdr, rsp, "response"); // 响应客户端
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {}
}
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public ProcessTxnResult processTxn(Request request) {
return processTxn(request, request.getHdr(), request.getTxn());
}
private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = request != null ? request.type : hdr.getType();
long sessionId = request != null ? request.sessionId : hdr.getClientId();
if (hdr != null) {
rc = getZKDatabase().processTxn(hdr, txn); // 写数据到内存中
} else {
rc = new ProcessTxnResult();
}
if (opCode == OpCode.createSession) {
if (hdr != null && txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
} else if (request != null && request.isLocalSession()) {
request.request.rewind();
int timeout = request.request.getInt();
request.request.rewind();
sessionTracker.addSession(request.sessionId, timeout);
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
}
public class ZKDatabase {
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
return this.processTxn(header, txn, false);
}
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime(), null);
break;
case OpCode.create2:
CreateTxn create2Txn = (CreateTxn) txn;
rc.path = create2Txn.getPath();
Stat stat = new Stat();
createNode(create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(), create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.createTTL:
CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
rc.path = createTtlTxn.getPath();
stat = new Stat();
createNode(createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(), EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.createContainer:
CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
rc.path = createContainerTxn.getPath();
stat = new Stat();
createNode(createContainerTxn.getPath(), createContainerTxn.getData(), createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER, createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.delete:
case OpCode.deleteContainer:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case OpCode.reconfig:
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(), header.getZxid(), header.getTime());
break;
case OpCode.closeSession:
killSession(header.getClientId(), header.getZxid());
break;
}
} catch (KeeperException e) {
rc.err = e.code().intValue();
} catch (IOException e) {}
if (!isSubTxn) {
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid;
}
}
if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
int lastSlash = rc.path.lastIndexOf('/');
String parentName = rc.path.substring(0, lastSlash);
CreateTxn cTxn = (CreateTxn)txn;
try {
setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
} catch (KeeperException.NoNodeException e) {
rc.err = e.code().intValue();
}
}
return rc;
}
}
public class DataTree {
    /**
     * Creates the znode {@code path} under its already-existing parent.
     *
     * <p>Under the parent's monitor this sets the parent's cversion and pzxid, adds the
     * child, and indexes the node as a container, a TTL node, or an ephemeral of
     * {@code ephemeralOwner}. It then updates quota bookkeeping and fires the NodeCreated
     * watch on {@code path} and the NodeChildrenChanged watch on the parent ("/" for the root).
     *
     * @param parentCVersion cversion to store on the parent; {@code -1} means "current + 1"
     * @param outputStat if non-null, receives a copy of the new node's stat
     * @throws KeeperException.NoNodeException if the parent does not exist
     * @throws KeeperException.NodeExistsException if the parent already has this child
     */
    public void createNode(final String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
        final int slash = path.lastIndexOf('/');
        final String parentName = path.substring(0, slash);
        final String childName = path.substring(slash + 1);

        // A brand-new node: every time and zxid field starts at the creating transaction.
        final StatPersisted newStat = new StatPersisted();
        newStat.setCtime(time);
        newStat.setMtime(time);
        newStat.setCzxid(zxid);
        newStat.setMzxid(zxid);
        newStat.setPzxid(zxid);
        newStat.setVersion(0);
        newStat.setAversion(0);
        newStat.setEphemeralOwner(ephemeralOwner);

        final DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            if (parent.getChildren().contains(childName)) {
                throw new KeeperException.NodeExistsException();
            }
            final int newCVersion = (parentCVersion == -1)
                    ? parent.stat.getCversion() + 1
                    : parentCVersion;
            parent.stat.setCversion(newCVersion);
            parent.stat.setPzxid(zxid);
            final DataNode child = new DataNode(data, aclCache.convertAcls(acl), newStat);
            parent.addChild(childName);
            nodes.put(path, child);
            indexByOwner(path, ephemeralOwner);
            if (outputStat != null) {
                child.copyStat(outputStat);
            }
        }

        // Children of the quota subtree maintain the quota bookkeeping for the path they describe.
        if (parentName.startsWith(quotaZookeeper)) {
            final String quotaTarget = parentName.substring(quotaZookeeper.length());
            if (Quotas.limitNode.equals(childName)) {
                pTrie.addPath(quotaTarget);
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(quotaTarget);
            }
        }
        final String quotaPrefix = getMaxPrefixWithQuota(path);
        if (quotaPrefix != null) { // ok we have some match and need to update
            updateCount(quotaPrefix, 1);
            updateBytes(quotaPrefix, data == null ? 0 : data.length);
        }

        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        final String watchedParent = parentName.isEmpty() ? "/" : parentName;
        childWatches.triggerWatch(watchedParent, Event.EventType.NodeChildrenChanged);
    }

    /** Adds {@code path} to the container, TTL, or per-owner ephemeral index as its owner dictates. */
    private void indexByOwner(String path, long ephemeralOwner) {
        final EphemeralType kind = EphemeralType.get(ephemeralOwner);
        if (kind == EphemeralType.CONTAINER) {
            containers.add(path);
            return;
        }
        if (kind == EphemeralType.TTL) {
            ttls.add(path);
            return;
        }
        if (ephemeralOwner == 0) {
            return; // persistent node: not indexed
        }
        HashSet<String> owned = ephemerals.get(ephemeralOwner);
        if (owned == null) {
            owned = new HashSet<String>();
            ephemerals.put(ephemeralOwner, owned);
        }
        synchronized (owned) {
            owned.add(path);
        }
    }
}

与Follower交互

LearnerHandler的run方法中会死循环接收从节点发来的数据,然后根据请求类型执行相应的逻辑:当收到ACK请求时,调用Leader的processAck方法收集ACK投票,当票数超过半数时唤醒在wait中等待的CommitProcessor线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
public class LearnerHandler extends ZooKeeperThread {
public void run() {
try {
leader.addLearnerHandler(this);
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
return;
}
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
}
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
if (this.getVersion() < 0x10000) {
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
if (needSnap) {
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
LearnerSnapshot snapshot = leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
try {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
snapshot.close();
}
}
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
startSendingPackets(); // 启动线程将队列中的数据包发送给Follower
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if (qp.getType() != Leader.ACK) {
return;
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid());
syncLimitCheck.start();
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
synchronized (leader.zk) {
while (!leader.zk.isRunning() && !this.isInterrupted()) {
leader.zk.wait(20);
}
}
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
ByteBuffer bb;
long sessionId;
int cxid;
int type;
switch (qp.getType()) {
case Leader.ACK: // 处理Follower发送给Leader的ACK
syncLimitCheck.updateAck(qp.getZxid());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
leader.zk.touch(sess, to);
}
break;
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitLearnerRequest(si);
break;
default:
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
try { //close the socket to make sure the other side can see it being close
sock.close();
}
}
} finally {
shutdown();
}
}
}

LearnerHandler的run方法通过startSendingPackets方法启动了一个线程,该线程从阻塞队列queuedPackets中取出(poll/take)数据包并发送给Follower。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class LearnerHandler extends ZooKeeperThread {
    /**
     * Starts, at most once, a background thread named "Sender-<remote address>" that
     * drains {@code queuedPackets} to the learner via {@link #sendPackets()}.
     */
    protected void startSendingPackets() {
        if (!sendingThreadStarted) {// Start sending packets
            new Thread() {
                @Override
                public void run() {
                    Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        LOG.warn("Sender thread interrupted", e);
                        // Preserve the interrupt status; the thread ends right after this.
                        Thread.currentThread().interrupt();
                    }
                }
            }.start();
            sendingThreadStarted = true;
        }
    }

    /**
     * Writes queued packets to the learner until {@code proposalOfDeath} is dequeued or a
     * write fails.
     *
     * <p>The output is flushed whenever the queue is momentarily empty, before blocking on
     * {@code take()}. On an I/O error the socket is closed (if still open) and the loop ends.
     *
     * @throws InterruptedException if interrupted while waiting for the next packet
     */
    private void sendPackets() throws InterruptedException {
        while (true) {
            try {
                QuorumPacket p = queuedPackets.poll();
                if (p == null) {
                    // Queue drained: push out what was written, then block for the next packet.
                    bufferedOutput.flush();
                    p = queuedPackets.take();
                }
                if (p == proposalOfDeath) {// Packet of death!
                    break;
                }
                // Choose the trace mask per packet so that one PING does not switch
                // every later packet to the ping mask.
                long traceMask = (p.getType() == Leader.PING)
                        ? ZooTrace.SERVER_PING_TRACE_MASK
                        : ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (p.getType() == Leader.PROPOSAL) {
                    syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                }
                oa.writeRecord(p, "packet");
            } catch (IOException e) {
                if (!sock.isClosed()) {
                    try {// this will cause everything to shutdown on this learner handler and will help notify the learner/observer instantaneously
                        sock.close();
                    } catch (IOException ignored) {
                        // Already shutting this handler down; nothing more to do.
                    }
                }
                break;
            }
        }
    }
}

从节点

对于从节点,选举完成后会调用Follower的followLeader方法,其中通过syncWithLeader调用ZooKeeperServer的子类FollowerZooKeeperServer的setupRequestProcessors方法加载请求处理链:FollowerRequestProcessor → CommitProcessor → FinalRequestProcessor,以及SyncRequestProcessor → SendAckRequestProcessor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class Follower extends Learner{
    /**
     * Runs the follower role.
     *
     * <p>The method proceeds in these steps:
     * <ol>
     *   <li>Record the election duration and register JMX.</li>
     *   <li>Connect and register with the leader, rejecting a role change or a leader whose
     *       epoch is lower than the accepted one.</li>
     *   <li>Sync state with the leader.</li>
     *   <li>Read and process leader packets while running.</li>
     * </ol>
     *
     * <p>Any exception while following closes the socket and clears
     * {@code pendingRevalidations}. JMX is always unregistered on exit.
     *
     * @throws InterruptedException as declared; its source is not visible in this excerpt
     */
    void followLeader() throws InterruptedException {
        // Record how long the election took, then reset the markers.
        self.end_fle = Time.currentElapsedTime();
        self.setElectionTimeTaken(self.end_fle - self.start_fle);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
        try {
            final QuorumServer leaderPeer = findLeader();
            try {
                // Open the socket to the leader and register as a follower.
                connectToLeader(leaderPeer.addr, leaderPeer.hostname);
                final long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange()) {
                    throw new Exception("learned about role change");
                }
                if (ZxidUtils.getEpochFromZxid(newEpochZxid) < self.getAcceptedEpoch()) {
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);
                final QuorumPacket packet = new QuorumPacket();
                // A leader failure surfaces as an exception from readPacket and ends this loop.
                while (this.isRunning()) {
                    readPacket(packet);
                    processPacket(packet);
                }
            } catch (Exception e) {
                try {
                    sock.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }
}
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
    /**
     * Builds and starts the follower's two processor pipelines.
     *
     * <p>Client requests flow through
     * FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor. Proposals from the
     * leader flow through SyncRequestProcessor -> SendAckRequestProcessor.
     */
    protected void setupRequestProcessors() {
        // Commit pipeline, built back to front.
        final RequestProcessor applyToMemory = new FinalRequestProcessor(this);
        final String serverIdText = Long.toString(getServerId());
        commitProcessor = new CommitProcessor(applyToMemory, serverIdText, true, getZooKeeperServerListener());
        commitProcessor.start();
        final FollowerRequestProcessor entry = new FollowerRequestProcessor(this, commitProcessor);
        firstProcessor = entry;
        entry.start();
        // Proposal pipeline: log the proposal, then ACK it to the leader.
        final SendAckRequestProcessor ackSender = new SendAckRequestProcessor((Learner) getFollower());
        syncProcessor = new SyncRequestProcessor(this, ackSender);
        syncProcessor.start();
    }
}

当接收到Leader发送的数据时,首先调用readPacket使用jute从输入流中反序列化出数据包,然后调用processPacket根据请求类型调用具体的方法处理请求数据。对于Leader.PROPOSAL类型的请求,调用FollowerZooKeeperServer的logRequest方法,从而异步执行SyncRequestProcessor将数据写入事务日志文件。

对于Follower来说,SyncRequestProcessor的nextProcessor是SendAckRequestProcessor,故将数据同步到事务日志文件后,会执行SendAckRequestProcessor给Leader回复ACK命令;当Leader收到的ACK票数超过半数时执行Commit操作,并给Follower发送Commit命令。Follower收到Leader的Commit命令后,调用FollowerZooKeeperServer的commit方法,进而执行CommitProcessor的commit关键方法,然后执行FinalRequestProcessor将数据同步到内存中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class Learner {
    /**
     * Reads the next packet from the leader into {@code pp}.
     *
     * <p>The record is deserialized (QuorumPacket via jute, a protobuf-like serializer)
     * while holding the {@code leaderIs} monitor, so concurrent readers cannot interleave.
     *
     * @param pp packet to fill in place
     * @throws IOException if reading from the leader stream fails
     */
    void readPacket(QuorumPacket pp) throws IOException {
        final String tag = "packet";
        synchronized (leaderIs) {
            leaderIs.readRecord(pp, tag);
        }
    }
}
public class Follower extends Learner{
    /**
     * Dispatches one packet received from the leader.
     *
     * <ul>
     *   <li>PROPOSAL: logged via {@code fzk.logRequest}.</li>
     *   <li>COMMIT: committed via {@code fzk.commit}.</li>
     *   <li>COMMITANDACTIVATE: additionally applies a reconfiguration.</li>
     *   <li>PING, REVALIDATE and SYNC: delegated to their handlers.</li>
     *   <li>UPTODATE and unknown types: ignored.</li>
     * </ul>
     *
     * @param qp the packet to handle
     * @throws Exception on deserialization failure, or when a reconfig is a major change
     */
    protected void processPacket(QuorumPacket qp) throws Exception{
        switch (qp.getType()) {
            case Leader.PING:
                ping(qp);
                break;
            case Leader.PROPOSAL:
                handleProposal(qp);
                break;
            case Leader.COMMIT:
                fzk.commit(qp.getZxid());
                break;
            case Leader.COMMITANDACTIVATE:
                handleCommitAndActivate(qp);
                break;
            case Leader.REVALIDATE:
                revalidate(qp);
                break;
            case Leader.SYNC:
                fzk.sync();
                break;
            case Leader.UPTODATE:
            default:
                break;
        }
    }

    /** Deserializes a PROPOSAL, records its zxid in lastQueued, and passes it to fzk.logRequest. */
    private void handleProposal(QuorumPacket qp) throws Exception {
        final TxnHeader hdr = new TxnHeader();
        final Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        lastQueued = hdr.getZxid();
        if (hdr.getType() == OpCode.reconfig) {
            // A reconfig proposal carries the new configuration; remember it as last seen.
            final SetDataTxn reconfigTxn = (SetDataTxn) txn;
            final QuorumVerifier proposedQv = self.configFromString(new String(reconfigTxn.getData()));
            self.setLastSeenQuorumVerifier(proposedQv, true);
        }
        fzk.logRequest(hdr, txn);
    }

    /**
     * Commits the oldest pending txn and applies the reconfiguration it carries.
     *
     * <p>The packet payload starts with the suggested leader id (a long).
     *
     * @throws Exception if the reconfiguration is a major change
     */
    private void handleCommitAndActivate(QuorumPacket qp) throws Exception {
        final Request pending = fzk.pendingTxns.element();
        final SetDataTxn reconfigTxn = (SetDataTxn) pending.getTxn();
        final QuorumVerifier newQv = self.configFromString(new String(reconfigTxn.getData()));
        final long suggestedLeaderId = ByteBuffer.wrap(qp.getData()).getLong();
        final boolean majorChange = self.processReconfig(newQv, suggestedLeaderId, qp.getZxid(), true);
        fzk.commit(qp.getZxid());
        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
    }
}
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
    /**
     * Wraps a proposed transaction in a Request and passes it to the sync (logging) processor.
     *
     * <p>The request is also queued in {@code pendingTxns} when the low 32 bits of its zxid
     * are non-zero. Those bits are presumably the per-epoch counter — confirm.
     *
     * @param hdr transaction header of the proposal
     * @param txn transaction body of the proposal
     */
    public void logRequest(TxnHeader hdr, Record txn) {
        final Request proposal = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        final boolean counterNonZero = (proposal.zxid & 0xffffffffL) != 0;
        if (counterNonZero) {
            pendingTxns.add(proposal);
        }
        syncProcessor.processRequest(proposal);
    }

    /**
     * Hands the oldest pending transaction to the CommitProcessor.
     *
     * <p>Does nothing when no transaction is pending. If the oldest pending zxid differs
     * from {@code zxid}, the JVM exits with status 12.
     *
     * @param zxid zxid the leader has committed
     */
    public void commit(long zxid) {
        if (pendingTxns.isEmpty()) {
            return;
        }
        final long oldestPending = pendingTxns.element().zxid;
        if (oldestPending != zxid) {
            // Commits must match proposal order; a mismatch is treated as fatal.
            System.exit(12);
        }
        commitProcessor.commit(pendingTxns.remove());
    }
}
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
    /**
     * Sends the leader an ACK carrying the request's zxid; sync requests are not acknowledged.
     *
     * <p>If the write fails, the socket to the leader is closed (if it is still open).
     *
     * @param si the request that has just been logged
     */
    public void processRequest(Request si) {
        if (si.type == OpCode.sync) {
            return;
        }
        final QuorumPacket ack = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
        try {
            // The leader's LearnerHandler.run receives this and calls Leader.processAck.
            learner.writePacket(ack, false);
        } catch (IOException e) {
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException ignored) {
                // Best-effort close; nothing further to do.
            }
        }
    }
}