Nacos集群CP模式

RaftPeerSet组件初始化时,会将所有集群成员遍历初始化成RaftPeer,即完成peers初始化以及将ready设置为true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RaftPeerSet extends MemberChangeListener implements Closeable {

    /**
     * Registers this peer set as a MemberChangeEvent subscriber and seeds the
     * peer map from the current cluster membership.
     */
    @PostConstruct
    public void init() {
        NotifyCenter.registerSubscriber(this);
        changePeers(memberManager.allMembers());
    }

    /**
     * Rebuilds the peer map from the given members, reusing any RaftPeer
     * already known for an address, then marks the set as ready.
     */
    protected void changePeers(Collection<Member> members) {
        Map<String, RaftPeer> rebuilt = new HashMap<>(members.size());
        for (Member member : members) {
            final String address = member.getAddress();
            if (peers.containsKey(address)) {
                // Already tracked: carry the existing peer state over unchanged.
                rebuilt.put(address, peers.get(address));
            } else {
                // Unknown member: start a fresh peer for it.
                RaftPeer freshPeer = new RaftPeer();
                freshPeer.ip = address;
                // First time we meet the local server: restore the local term (starts at 0).
                if (EnvUtil.getLocalAddress().equals(address)) {
                    freshPeer.term.set(localTerm.get());
                }
                rebuilt.put(address, freshPeer);
            }
        }
        // Swap in the rebuilt raft peer set and flag it as ready.
        peers = rebuilt;
        ready = true;
        Loggers.RAFT.info("raft peers changed: " + members);
    }
}

Leader选举

RaftCore初始化时会首先从文件中加载持久化的节点数据,然后从{nacos_home}\data\naming\meta.properties文件中加载选举周期,最后注册每500ms定期执行的集群leader选举MasterElection任务和集群节点心跳检测HeartBeat任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RaftCore implements Closeable {

    /**
     * Bootstraps the old raft implementation: loads persisted datums and the
     * stored election term, schedules the election and heartbeat timers, and
     * registers the persistent notifier.
     *
     * @throws Exception if loading persisted state fails
     */
    @PostConstruct
    public void init() throws Exception {
        final long start = System.currentTimeMillis();
        // Replay datums persisted on disk into memory (notifying listeners as we go).
        raftStore.loadDatums(notifier, datums);
        // Restore the election term from {nacos_home}\data\naming\meta.properties (0 if absent).
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
        initialized = true;
        // Schedule the periodic leader-election task (ticks every 500ms).
        masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
        // Schedule the periodic cluster heartbeat task (ticks every 500ms).
        heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
        // When every member runs the new consensus implementation, shut this old protocol down.
        versionJudgement.registerObserver(isAllNewVersion -> {
            stopWork = isAllNewVersion;
            if (stopWork) {
                try {
                    shutdown();
                    raftListener.removeOldRaftMetadata();
                } catch (NacosException e) {
                    throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
                }
            }
        }, 100);
        // Subscribe the PersistentNotifier to value-change events.
        NotifyCenter.registerSubscriber(notifier);
    }

    /**
     * Loads every cached datum file from disk into {@code datums}. Datums
     * found inside sub-directories additionally publish a ValueChangeEvent so
     * the notifier can refresh the in-memory registry.
     */
    public synchronized void loadDatums(PersistentNotifier notifier, Map<String, Datum> datums) throws Exception {
        Datum datum;
        long start = System.currentTimeMillis();
        for (File cache : listCaches()) {
            if (cache.isDirectory() && cache.listFiles() != null) {
                for (File datumFile : cache.listFiles()) {
                    // Read the datum persisted in this file.
                    datum = readDatum(datumFile, cache.getName());
                    if (datum != null) {
                        datums.put(datum.key, datum);
                        if (notifier != null) {
                            // Notify the PersistentNotifier subscriber about the loaded value.
                            NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
                        }
                    }
                }
                continue;
            }
            datum = readDatum(cache, StringUtils.EMPTY);
            if (datum != null) {
                datums.put(datum.key, datum);
            }
        }
    }
}

虽然MasterElection选举任务是每500ms执行一次,但leaderDueMs初始时默认是从0到15s之间随机的一个时间,每次执行时减去500ms,直到leaderDueMs小于等于0才会继续;此时将leaderDueMs重置为15s + 0~5s的随机时间(即15s~20s),并将heartbeatDueMs重置为5s,然后发起集群leader投票。

第一个被唤醒的节点,会先将leader重置为null,并将所有节点RaftPeer的选票voteFor置为null,然后先将选票投给自己,再向其它服务端成员的/raft/vote接口发送投票数据,最后收集投票结果,计算并决定哪个节点是leader节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class MasterElection implements Runnable {
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) { // RaftPeerSet初始化方法中changePeers完成peers初始化将ready置为true
return;
}
RaftPeer local = peers.local(); // 获取本机对应的RaftPeer
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; // leaderDueMs默认是从0到15s随机一个时间,减去500ms
if (local.leaderDueMs > 0) { // 若leaderDueMs>0继续等待下次执行
return;
}
local.resetLeaderDue(); // 将leaderDueMs重置为15s + (0到5秒的随机时间) = 15s - 20s
local.resetHeartbeatDue(); // 将heartbeatDueMs重置为5s
sendVote(); // 发起集群leader投票
} catch (Exception e) {
}
}
private void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer()); // 获取本机对应的RaftPeer
peers.reset(); // 将leader和所有的RaftPeer的voteFor字段置null
local.term.incrementAndGet(); // 将本机选举周期加一
local.voteFor = local.ip; // 将本机的voteFor设置为本机IP
local.state = RaftPeer.State.CANDIDATE; // 将本机状态由FOLLOWER变更为CANDIDATE状态
Map<String, String> params = new HashMap<>(1);
params.put("vote", JacksonUtils.toJson(local));
for (final String server : peers.allServersWithoutMySelf()) { // 给所有其他成员发送投票请求
final String url = buildUrl(server, API_VOTE); // 目标服务成员的/raft/vote接口
try {
HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
return;
}
RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
peers.decideLeader(peer); // 计算并决定哪个节点是领导者,若有新的peer超过半数投票,则将leader更换为新peer
}
});
} catch (Exception e) {
}
}
}
}

当收到响应的投票信息后遍历所有节点统计投票结果,并记录被选举次数最多的节点和次数,若某个节点票数超过一半,则将该节点置为leader节点,并发布选举结果事件,在RaftListener中更新本机对应节点的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RaftPeerSet extends MemberChangeListener implements Closeable {

    /**
     * Records the ballot carried by {@code candidate} and tallies all known
     * ballots; if some peer holds a majority of the votes it becomes the
     * leader and a LeaderElectFinishedEvent is published.
     *
     * @param candidate peer whose ballot was just received
     * @return the current leader (possibly unchanged)
     */
    public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);
        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        // Count each peer's voteFor, tracking the most-voted peer and its tally.
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue; // this peer has not voted yet
            }
            ips.add(peer.voteFor); // add the ballot to the tally
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }
        if (maxApproveCount >= majorityCount()) { // a peer reached a majority
            RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER; // promote it to leader
            if (!Objects.equals(leader, peer)) {
                leader = peer; // record the new leader
                // Publish the election result; handled by RaftListener.
                ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
            }
        }
        return leader;
    }
}

当其它成员接收到投票信息后,若remote节点的选举周期小于或等于本机节点的选举周期,且本机的选票为空,则将选票投给自己;本机选票不为空则直接返回已投出的选票。若remote节点选举周期大于本机节点选举周期,则将选票投给remote节点,并将本机的选举周期设置为remote节点的选举周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RaftController {

    /**
     * Handles an incoming vote request from a candidate peer and returns the
     * ballot decided by the local raft core.
     */
    @PostMapping("/vote")
    public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
        if (versionJudgement.allMemberIsNewVersion()) {
            throw new IllegalStateException("old raft protocol already stop");
        }
        final String rawVote = WebUtils.required(request, "vote");
        final RaftPeer candidate = JacksonUtils.toObj(rawVote, RaftPeer.class);
        final RaftPeer ballot = raftCore.receivedVote(candidate);
        return JacksonUtils.transferToJsonNode(ballot);
    }
}
public class RaftCore implements Closeable {

    /**
     * Handles a vote request from {@code remote}. If the remote term is not
     * newer than ours, we keep (or cast for ourselves) the current ballot;
     * otherwise we step down to FOLLOWER, vote for the remote peer and adopt
     * its term.
     *
     * @param remote the candidate requesting the vote
     * @return the local peer carrying the ballot decision
     */
    public synchronized RaftPeer receivedVote(RaftPeer remote) {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        if (!peers.contains(remote)) {
            throw new IllegalStateException("can not find peer: " + remote.ip);
        }
        RaftPeer local = peers.get(NetUtils.localServer());
        if (remote.term.get() <= local.term.get()) { // remote's term is not newer than ours
            if (StringUtils.isEmpty(local.voteFor)) {
                local.voteFor = local.ip; // no ballot cast yet: vote for ourselves
            }
            // Return the ballot already cast (possibly for another peer).
            return local;
        }
        // Remote has a newer term: defer to it.
        local.resetLeaderDue(); // reset leaderDueMs to 15s + rand(0,5s), i.e. 15s~20s
        local.state = RaftPeer.State.FOLLOWER; // step down to FOLLOWER
        local.voteFor = remote.ip; // cast the ballot for the remote peer
        local.term.set(remote.term.get()); // adopt the remote term
        return local;
    }
}

心跳检查

虽然HeartBeat是每500ms执行一次,但跟MasterElection选举任务一样,其内部进行了逻辑处理:第一次是在0到5s之间随机一个时间后开始执行,后续是每5s执行一次;若leader没有被选举出来也不会发送心跳包。当leader被选举出来后,每一次执行心跳检查时都将leaderDueMs重置为15s + 0~5s的随机时间(即15s~20s),使leaderDueMs始终大于heartbeatDueMs,从而保证了选举不会重复执行。由于只有leader节点能往其它成员节点发送心跳,且其它成员节点收到心跳数据后会重置leaderDueMs时间,若leader节点挂掉了,其他节点的leaderDueMs得不到重置,则会再次发起投票选举新的leader。

进行心跳检测时,会将本节点(leader节点)上所有数据的key以及本节点对应的RaftPeer数据通过Gzip压缩后发送给其它所有从节点的/raft/beat接口;从节点会检查其数据是否为最新,若不是则调用leader的/raft/datum接口拉取最新数据进行更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) { // RaftPeerSet初始化方法中changePeers完成peers初始化将ready置为true
return;
}
RaftPeer local = peers.local(); // 获取本机对应的RaftPeer
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS; // heartbeatDueMs默认是从0到5s随机一个时间,减去500ms
if (local.heartbeatDueMs > 0) { // 若leaderDueMs>0继续等待下次执行
return;
}
local.resetHeartbeatDue(); // 将heartbeatDueMs重置为5s
sendBeat();
} catch (Exception e) {
}
}
private void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
return; // 若是Standalone模式启动或本机不是leader则不发送心跳检查,若Leader没有选出来也不会发送
}
local.resetLeaderDue(); // 将leaderDueMs重置为15s + (0到5秒的随机时间) = 15s - 20s
ObjectNode packet = JacksonUtils.createEmptyJsonNode(); // build data
packet.replace("peer", JacksonUtils.transferToJsonNode(local));
ArrayNode array = JacksonUtils.createEmptyArrayNode();
if (!switchDomain.isSendBeatOnly()) { // 不仅仅只发送心跳包
for (Datum datum : datums.values()) { // 将KEY和对应的版本放入element中,最终添加到array中
ObjectNode element = JacksonUtils.createEmptyJsonNode();
if (KeyBuilder.matchServiceMetaKey(datum.key)) { // key以com.alibaba.nacos.naming.domains.meta.或meta.开头
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) { // key以com.alibaba.nacos.naming.iplist.或iplist.开头
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp.get());
array.add(element);
}
}
packet.replace("datums", array); // 将array放入数据包
Map<String, String> params = new HashMap<String, String>(1); // broadcast
params.put("beat", JacksonUtils.toJson(packet));
String content = JacksonUtils.toJson(params);
ByteArrayOutputStream out = new ByteArrayOutputStream(); // 使用GZIP将数据进行压缩
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
for (final String server : peers.allServersWithoutMySelf()) { // 遍历每一个从节点(除开自己,自己是leader)
try {
final String url = buildUrl(server, API_BEAT); // 调用从节点的/raft/beat接口发送心跳数据
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) { // 若发送失败
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return;
}
peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class)); // 更新对应IP的RaftPeer
}
});
} catch (Exception e) {
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
}

当其它成员接收到心跳数据后,首先会对leader信息进行校验:判断发送心跳的节点是否是leader节点,以及本机节点的选举周期是否大于leader节点的选举周期;若本机状态还不是FOLLOWER(例如第一次收到心跳),还会将其voteFor设置为leader的ip,然后通过RaftPeerSet#makeLeader方法更新本节点上的leader信息。

然后遍历心跳发送过来的数据,若收到的key在本节点上没有,或本节点上该数据的版本小于收到的版本,则将其放入List,再调用/raft/datum接口批量从leader中拉取最新的数据,将其写入磁盘文件并覆盖datums中的数据,同时更新本节点的选举周期。最后对于未被心跳数据覆盖到的key,说明其已被删除,将其从datums中删除且将对应磁盘文件也删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
public class RaftController {

    /**
     * Receives a gzip-compressed heartbeat from the leader, decodes it and
     * forwards it to the raft core; returns the updated local peer.
     */
    @PostMapping("/beat")
    public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
        if (versionJudgement.allMemberIsNewVersion()) {
            throw new IllegalStateException("old raft protocol already stop");
        }
        // The payload is gzip-compressed and URL-encoded twice by the sender.
        final byte[] decompressed = IoUtils.tryDecompress(request.getInputStream());
        String payload = new String(decompressed, StandardCharsets.UTF_8);
        payload = URLDecoder.decode(payload, "UTF-8");
        payload = URLDecoder.decode(payload, "UTF-8");
        final JsonNode body = JacksonUtils.toObj(payload);
        final RaftPeer localPeer = raftCore.receivedBeat(JacksonUtils.toObj(body.get("beat").asText()));
        return JacksonUtils.transferToJsonNode(localPeer);
    }
}
public class RaftCore implements Closeable {

    /**
     * Handles a heartbeat sent by the leader. Validates the sender (must be a
     * LEADER with a term >= ours), steps the local peer down to FOLLOWER,
     * resets the election/heartbeat timers, adopts the sender as leader, and
     * then reconciles the local datum set against the key/timestamp list in
     * the beat: missing or stale datums are pulled in batches from the
     * leader's /raft/datum endpoint, and datums absent from the beat are
     * deleted locally.
     *
     * @param beat JSON payload containing "peer" (the leader's RaftPeer) and "datums"
     * @return the updated local RaftPeer
     * @throws Exception on invalid beats or I/O failures
     */
    public RaftPeer receivedBeat(JsonNode beat) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        final RaftPeer local = peers.local();
        final RaftPeer remote = new RaftPeer();
        JsonNode peer = beat.get("peer"); // the leader's RaftPeer data
        remote.ip = peer.get("ip").asText(); // leader ip
        remote.state = RaftPeer.State.valueOf(peer.get("state").asText()); // leader state
        remote.term.set(peer.get("term").asLong()); // leader election term
        remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
        remote.leaderDueMs = peer.get("leaderDueMs").asLong();
        remote.voteFor = peer.get("voteFor").asText();
        if (remote.state != RaftPeer.State.LEADER) { // reject beats not sent by a leader
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }
        if (local.term.get() > remote.term.get()) { // reject beats from an out-of-date term
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
        }
        if (local.state != RaftPeer.State.FOLLOWER) { // step down and vote for the current leader
            local.state = RaftPeer.State.FOLLOWER; // mk follower
            local.voteFor = remote.ip;
        }
        final JsonNode beatDatums = beat.get("datums");
        local.resetLeaderDue(); // reset leaderDueMs to 15s + rand(0,5s), i.e. 15s~20s
        local.resetHeartbeatDue(); // reset heartbeatDueMs to 5s
        peers.makeLeader(remote); // adopt remote as the new leader, refreshing any stale leader info
        if (!switchDomain.isSendBeatOnly()) {
            // Mark every locally-known key with 0; keys confirmed by the beat flip to 1.
            Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
            for (Map.Entry<String, Datum> entry : datums.entrySet()) {
                receivedKeysMap.put(entry.getKey(), 0);
            }
            List<String> batch = new ArrayList<>(); // now check datums
            int processedCount = 0;
            for (Object object : beatDatums) { // iterate the beat's key/timestamp entries
                processedCount = processedCount + 1;
                JsonNode entry = (JsonNode) object;
                String key = entry.get("key").asText();
                final String datumKey;
                if (KeyBuilder.matchServiceMetaKey(key)) { // service meta key (com.alibaba.nacos.naming.domains.meta. / meta. prefix)
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else if (KeyBuilder.matchInstanceListKey(key)) { // instance list key (com.alibaba.nacos.naming.iplist. / iplist. prefix)
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                } else {
                    continue; // ignore corrupted key:
                }
                long timestamp = entry.get("timestamp").asLong();
                receivedKeysMap.put(datumKey, 1); // the leader still owns this key
                try {
                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                        continue; // local copy is up to date and more entries remain: skip
                    }
                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                        batch.add(datumKey); // missing locally or local copy is stale: queue for fetch
                    }
                    if (batch.size() < 50 && processedCount < beatDatums.size()) {
                        continue; // only fetch once 50 keys are queued or the beat is fully processed
                    }
                    String keys = StringUtils.join(batch, ",");
                    if (batch.size() <= 0) { // nothing queued: skip
                        continue;
                    }
                    String url = buildUrl(remote.ip, API_GET); // leader's /raft/datum endpoint for the latest data
                    Map<String, String> queryParam = new HashMap<>(1);
                    queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                    // Fetch the queued keys from the leader and merge them into the local store.
                    HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                return;
                            }
                            List<JsonNode> datumList = JacksonUtils.toObj(result.getData(), new TypeReference<List<JsonNode>>() {
                            });
                            for (JsonNode datumJson : datumList) { // achieves eventual consistency across cluster members
                                Datum newDatum = null;
                                OPERATE_LOCK.lock();
                                try {
                                    Datum oldDatum = getDatum(datumJson.get("key").asText()); // fetch the old local datum
                                    if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp.get()) {
                                        continue; // local copy is already as new as (or newer than) the fetched one
                                    }
                                    if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) { // service meta key
                                        Datum<Service> serviceDatum = new Datum<>();
                                        serviceDatum.key = datumJson.get("key").asText();
                                        serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                        serviceDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(), Service.class);
                                        newDatum = serviceDatum;
                                    }
                                    if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) { // instance list key
                                        Datum<Instances> instancesDatum = new Datum<>();
                                        instancesDatum.key = datumJson.get("key").asText();
                                        instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                        instancesDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(), Instances.class);
                                        newDatum = instancesDatum;
                                    }
                                    if (newDatum == null || newDatum.value == null) {
                                        continue; // skip empty datums
                                    }
                                    raftStore.write(newDatum); // persist the datum to disk
                                    datums.put(newDatum.key, newDatum); // update the in-memory cache
                                    notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value); // refresh the registry
                                    local.resetLeaderDue(); // reset leaderDueMs to 15s + rand(0,5s), i.e. 15s~20s
                                    if (local.term.get() + 100 > remote.term.get()) { // local term + 100 would overtake the leader's term
                                        getLeader().term.set(remote.term.get()); // sync the leader's term
                                        local.term.set(getLeader().term.get()); // adopt the leader's term locally
                                    } else {
                                        local.term.addAndGet(100); // advance the local term by 100
                                    }
                                    raftStore.updateTerm(local.term.get()); // persist the term to meta.properties
                                } catch (Throwable e) {
                                } finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            try {
                                TimeUnit.MILLISECONDS.sleep(200);
                            } catch (InterruptedException e) {
                            }
                            return;
                        }
                        @Override
                        public void onError(Throwable throwable) {
                        }
                        @Override
                        public void onCancel() {
                        }
                    });
                    batch.clear();
                } catch (Exception e) {
                }
            }
            // Keys never confirmed by the beat were deleted on the leader.
            List<String> deadKeys = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                if (entry.getValue() == 0) { // never confirmed: the datum was deleted
                    deadKeys.add(entry.getKey());
                }
            }
            for (String deadKey : deadKeys) {
                try {
                    deleteDatum(deadKey); // drop the datum and its cache file
                } catch (Exception e) {
                }
            }

        }
        return local;
    }

    /**
     * Removes the datum for {@code key} from the in-memory cache, deletes its
     * cache file if it existed, and publishes a DELETE ValueChangeEvent.
     */
    private void deleteDatum(String key) {
        Datum deleted;
        try {
            deleted = datums.remove(URLDecoder.decode(key, "UTF-8"));
            if (deleted != null) { // the datum existed: remove its cache file too
                raftStore.delete(deleted);
            }
            NotifyCenter.publishEvent(ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(DataOperation.DELETE).build());
        } catch (UnsupportedEncodingException e) {
        }
    }
}
public class RaftPeerSet extends MemberChangeListener implements Closeable {

    /**
     * Adopts {@code candidate} as the current leader and refreshes the state
     * of any other peer still marked LEADER by querying its /raft/peer
     * endpoint.
     *
     * @param candidate the peer to install as leader
     * @return the candidate after being stored into the peer map
     */
    public RaftPeer makeLeader(RaftPeer candidate) {
        if (!Objects.equals(leader, candidate)) { // leader changed: record and announce it
            leader = candidate;
            // Handled by RaftListener#onApplicationEvent.
            ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local()));
            Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()), JacksonUtils.toJson(leader));
        }
        for (final RaftPeer peer : peers.values()) {
            Map<String, String> params = new HashMap<>(1);
            if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) { // stale leader found: refresh its info
                try {
                    String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER); // that member's /raft/peer endpoint
                    HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) { // query failed: demote it to FOLLOWER
                                Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}", result.getCode(), peer.ip);
                                peer.state = RaftPeer.State.FOLLOWER;
                                return;
                            }
                            update(JacksonUtils.toObj(result.getData(), RaftPeer.class)); // refresh this peer's info
                        }
                        @Override
                        public void onError(Throwable throwable) {
                        }
                        @Override
                        public void onCancel() {
                        }
                    });
                } catch (Exception e) {
                    peer.state = RaftPeer.State.FOLLOWER;
                    Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
                }
            }
        }
        return update(candidate); // store the leader's info
    }

    /**
     * Stores (or replaces) the given peer in the peer map, keyed by ip.
     */
    public RaftPeer update(RaftPeer peer) {
        peers.put(peer.ip, peer);
        return peer;
    }
}
public class RaftController {

    /**
     * Returns the datums for the comma-separated "keys" request parameter as
     * a JSON array.
     */
    @GetMapping("/datum")
    public String get(HttpServletRequest request, HttpServletResponse response) throws Exception {
        if (versionJudgement.allMemberIsNewVersion()) {
            throw new IllegalStateException("old raft protocol already stop");
        }
        response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Content-Encode", "gzip");
        final String decodedKeys = URLDecoder.decode(WebUtils.required(request, "keys"), "UTF-8");
        final List<Datum> result = new ArrayList<Datum>();
        for (String key : decodedKeys.split(",")) {
            result.add(raftCore.getDatum(key));
        }
        return JacksonUtils.toJson(result);
    }
}

实例注册

与AP模式注册实例的区别在于:调用ConsistencyService#put方法时,实际调用的是RaftConsistencyServiceImpl(或PersistentServiceProcessor)的put方法。

1
2
3
4
5
6
7
8
9
10
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {

    /**
     * Publishes a record through the raft core; any failure is wrapped into a
     * NacosException with SERVER_ERROR.
     */
    public void put(String key, Record value) throws NacosException {
        checkIsStopWork();
        try {
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            final String message = "Raft put failed, key:" + key + ", value:" + value;
            throw new NacosException(NacosException.SERVER_ERROR, message, e);
        }
    }
}

若当前节点不是leader节点,会将请求转发给leader节点的/raft/datum接口;若当前节点是leader节点,则首先将数据更新到缓存datums,并通过发布ValueChangeEvent事件(最终订阅者PersistentNotifier会收到该事件并执行onEvent方法异步更新注册表),然后遍历调用各从节点的/raft/datum/commit接口同步数据,且利用CountDownLatch实现一个简单的raft协议写入逻辑:必须集群半数以上节点写入成功才会给客户端返回成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class RaftCore implements Closeable {

    /**
     * Publishes a key/value pair through the old raft protocol. A follower
     * forwards the request to the leader; the leader applies the datum locally
     * (via onPublish) and replicates it to every other member's
     * /raft/datum/commit endpoint, succeeding only when a majority of the
     * cluster acknowledges within RAFT_PUBLISH_TIMEOUT.
     *
     * @param key   datum key
     * @param value record to publish
     * @throws Exception when replication fails or times out
     */
    public void signalPublish(String key, Record value) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        if (!isLeader()) { // not the leader: forward the publish to the leader
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            final RaftPeer leader = getLeader();
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); // leader's /raft/datum endpoint
            return;
        }
        OPERATE_LOCK.lock();
        try {
            final long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) { // new key
                datum.timestamp.set(1L); // start its version at 1
            } else { // existing key: bump its version
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum)); // the datum to replicate
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local())); // the leader peer info
            onPublish(datum, peers.local());
            final String content = json.toString();
            // Simple raft-style write: block until a majority of the cluster has acknowledged.
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) { // the leader already applied the datum above
                    latch.countDown();
                    continue;
                }
                final String url = buildUrl(server, API_ON_PUB); // follower's /raft/datum/commit endpoint
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            return;
                        }
                        latch.countDown();
                    }
                });
            }
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
        } finally {
            OPERATE_LOCK.unlock();
        }
    }
}

其它从节点收到leader节点发送的数据后,调用RaftCore的onPublish方法将数据更新到本节点的缓存datums,并通过发布ValueChangeEvent事件,最终订阅者PersistentNotifier会收到该事件变更并执行onEvent方法异步更新注册表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class RaftController {

    /**
     * Commit endpoint used by the leader to replicate a datum to followers.
     * Deserializes the datum into its concrete type based on the key prefix
     * and hands it to the consistency service.
     */
    @PostMapping("/datum/commit")
    public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
        if (versionJudgement.allMemberIsNewVersion()) {
            throw new IllegalStateException("old raft protocol already stop");
        }
        response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Content-Encode", "gzip");
        final String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
        final JsonNode jsonObject = JacksonUtils.toObj(URLDecoder.decode(entity, "UTF-8"));
        final RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
        final JsonNode datumJson = jsonObject.get("datum");
        final String datumKey = datumJson.get("key").asText();
        final String rawDatum = datumJson.toString();
        // Pick the concrete Datum payload type from the key prefix.
        Datum datum = null;
        if (KeyBuilder.matchInstanceListKey(datumKey)) {
            datum = JacksonUtils.toObj(rawDatum, new TypeReference<Datum<Instances>>() {
            });
        } else if (KeyBuilder.matchSwitchKey(datumKey)) {
            datum = JacksonUtils.toObj(rawDatum, new TypeReference<Datum<SwitchDomain>>() {
            });
        } else if (KeyBuilder.matchServiceMetaKey(datumKey)) {
            datum = JacksonUtils.toObj(rawDatum, new TypeReference<Datum<Service>>() {
            });
        }
        raftConsistencyService.onPut(datum, source);
        return "ok";
    }
}
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {

    /**
     * Applies a datum replicated from the leader via the raft core; failures
     * are wrapped into a NacosException with SERVER_ERROR.
     */
    public void onPut(Datum datum, RaftPeer source) throws NacosException {
        try {
            raftCore.onPublish(datum, source);
        } catch (Exception e) {
            final String message = "Raft onPut failed, datum:" + datum + ", source: " + source;
            throw new NacosException(NacosException.SERVER_ERROR, message, e);
        }
    }
}

不仅从节点更新数据调用该方法,上面leader节点更新数据同样是通过该方法:首先校验source是否为leader节点,然后重置leaderDueMs,再将数据写入磁盘并更新到缓存datums中,接着更新选举周期并将其写入磁盘文件meta.properties,最后发布ValueChangeEvent事件异步更新注册表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RaftCore implements Closeable {

    /**
     * Applies a datum replicated from the leader (also invoked by the leader
     * itself): validates the source, persists the datum, updates the
     * in-memory cache, advances the election term, and publishes a
     * ValueChangeEvent so the PersistentNotifier updates the registry
     * asynchronously.
     *
     * @param datum  datum to apply (value must be non-null)
     * @param source the publishing peer (must be the current leader)
     */
    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = peers.local();
        if (datum.value == null) { // reject empty datums
            throw new IllegalStateException("received empty datum");
        }
        if (!peers.isLeader(source.ip)) { // only the leader may publish
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
        }
        if (source.term.get() < local.term.get()) { // reject publishes from an out-of-date term
            throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
        }
        local.resetLeaderDue(); // reset leaderDueMs to 15s + rand(0,5s), i.e. 15s~20s
        // if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) { // skip ephemeral keys (com.alibaba.nacos.naming.iplist.ephemeral.*)
            raftStore.write(datum); // persist the datum to disk synchronously
        }
        datums.put(datum.key, datum); // update the in-memory cache
        if (isLeader()) { // on the leader
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); // advance the leader's term by 100
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else { // local term + 100 still behind the leader's term
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); // advance the local term by 100
            }
        }
        raftStore.updateTerm(local.term.get()); // persist the term to meta.properties
        // The PersistentNotifier subscriber handles this event and updates the registry asynchronously.
        NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
    }
}

PersistentNotifier订阅者中最终还是调用Service的onChange方法将实例数据更新到注册表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {

    /**
     * Entry point for ValueChangeEvent: resolves the current value for the
     * key and fans the change out to the registered listeners.
     */
    public void onEvent(ValueChangeEvent event) {
        notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
    }

    /**
     * Notifies listeners registered under the service-meta prefix (when the
     * key is a non-switch service meta key) and listeners registered for the
     * exact key.
     */
    public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
        if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
            // Service meta keys (excluding the switch-domain key) also go to
            // the prefix-level listeners.
            if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
                for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                    dispatch(listener, key, action, value);
                }
            }
        }
        if (!listenerMap.containsKey(key)) {
            return;
        }
        for (RecordListener listener : listenerMap.get(key)) {
            dispatch(listener, key, action, value);
        }
    }

    // Invokes the proper callback on one listener, swallowing listener errors
    // so a faulty listener cannot break delivery to the others. For CHANGE the
    // listener's onChange (e.g. Service#onChange) updates the registry; for
    // DELETE onDelete is invoked (Service#onDelete is a no-op).
    private <T extends Record> void dispatch(RecordListener listener, String key, DataOperation action, T value) {
        try {
            if (action == DataOperation.CHANGE) {
                listener.onChange(key, value);
                return;
            }
            if (action == DataOperation.DELETE) {
                listener.onDelete(key);
            }
        } catch (Throwable e) {
        }
    }
}

心跳检测

对于CP模式服务实例的健康检查是通过HealthCheckTask来完成的,该任务是在Cluster初始化时注册的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    /**
     * Schedules the client-beat check task and initializes every cluster,
     * wiring each cluster back to this service.
     */
    public void init() {
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Cluster cluster : clusterMap.values()) {
            cluster.setService(this);
            cluster.init();
        }
    }
}
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    /**
     * Idempotently creates and schedules the CP-mode health-check task for
     * this cluster; subsequent calls are no-ops.
     */
    public void init() {
        if (!inited) {
            // Create and register the CP-mode health check task exactly once.
            checkTask = new HealthCheckTask(this);
            HealthCheckReactor.scheduleCheck(checkTask);
            inited = true;
        }
    }
}

和AP模式的健康检查一样,只有注册在本机上的实例才执行健康检查,最终调用TcpSuperSenseProcessor的process方法,且执行完成后会再次将任务放入延时任务中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class HealthCheckTask implements Runnable {

    /**
     * Creates the CP-mode health-check task for the given cluster and wires
     * in the beans it depends on.
     */
    public HealthCheckTask(Cluster cluster) {
        this.cluster = cluster;
        distroMapper = ApplicationUtils.getBean(DistroMapper.class);
        switchDomain = ApplicationUtils.getBean(SwitchDomain.class);
        healthCheckProcessor = ApplicationUtils.getBean(HealthCheckProcessorDelegate.class);
        initCheckRT();
    }

    // Seeds the round-trip-time statistics before the first check round.
    private void initCheckRT() {
        // First-round delay: 2000 + rand(0, rand(0, maxTcpHealthParam)) ms — the
        // nested random draw biases toward shorter delays.
        checkRtNormalized = 2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
        checkRtBest = Long.MAX_VALUE;
        checkRtWorst = 0L;
    }

    /**
     * Runs one health-check round for instances this member is responsible
     * for, then re-schedules itself unless cancelled.
     */
    @Override
    public void run() {
        try {
            // Only the responsible member checks, and only when checks are enabled.
            if (distroMapper.responsible(cluster.getService().getName())
                    && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
                healthCheckProcessor.process(this); // delegates to TcpSuperSenseProcessor#process
            }
        } catch (Throwable e) {
            // Intentionally swallowed: one failing round must not kill the schedule.
        } finally {
            if (!cancelled) {
                HealthCheckReactor.scheduleCheck(this); // re-arm for the next round
                // Record this round's RT for the next comparison. The previous
                // implementation also computed a relative RT diff and fetched the
                // cluster into unused locals here; the diff divided by
                // checkRtLastLast, which can be zero, risking an
                // ArithmeticException from the finally block — dead code removed.
                if (this.getCheckRtWorst() > 0
                        && switchDomain.isHealthCheckEnabled(cluster.getService().getName())
                        && distroMapper.responsible(cluster.getService().getName())) {
                    this.setCheckRtLastLast(this.getCheckRtLast());
                }
            }
        }
    }
}

process中会将任务封装为Beat,然后放入阻塞队列中。TcpSuperSenseProcessor本身就是一个线程类,其初始化时就会被启动,在run方法中调用processTask方法对阻塞队列进行消费,再将beat任务封装成TaskProcessor提交到线程池执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
 * TCP health-check processor for CP-mode (persistent) instances.
 *
 * Producer/consumer design: {@link #process(HealthCheckTask)} turns each instance that
 * needs checking into a {@code Beat} and enqueues it; the {@link #run()} loop (this class
 * submits itself to GlobalExecutor in the constructor) drains the queue, opens
 * non-blocking connections, and hands ready NIO selection keys to PostProcessor tasks.
 */
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

    /** Beats waiting to be turned into connection attempts; consumed by processTask(). */
    private BlockingQueue<Beat> taskQueue = new LinkedBlockingQueue<Beat>();

    public TcpSuperSenseProcessor() {
        try {
            selector = Selector.open();
            // NOTE(review): 'this' escapes the constructor here; acceptable only because
            // run() touches fields already initialized above — confirm before reordering.
            GlobalExecutor.submitTcpCheck(this);
        } catch (Exception e) {
            // Fix: chain the original exception so the real init failure is not discarded.
            throw new IllegalStateException("Error while initializing SuperSense(TM).", e);
        }
    }

    /**
     * Enqueues a Beat for every eligible persistent instance of the task's cluster.
     */
    public void process(HealthCheckTask task) {
        List<Instance> ips = task.getCluster().allIPs(false); // all persistent instances
        if (CollectionUtils.isEmpty(ips)) {
            return; // no persistent instances registered for this cluster
        }
        for (Instance ip : ips) {
            if (ip.isMarked()) {
                continue; // marked instances are exempt from health checks
            }
            if (!ip.markChecking()) {
                // A previous check is still in flight: widen the RT estimate and skip.
                healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams());
                continue;
            }
            Beat beat = new Beat(ip, task);
            taskQueue.add(beat); // hand off to the NIO loop via the blocking queue
            MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
        }
    }

    /**
     * Drains up to NIO_THREAD_COUNT * 64 beats from the queue, wraps each in a
     * TaskProcessor, and runs the batch to completion.
     */
    private void processTask() throws Exception {
        Collection<Callable<Void>> tasks = new LinkedList<>();
        do {
            // Bounded wait so run() can still service the selector regularly.
            Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
            if (beat == null) {
                return;
            }
            tasks.add(new TaskProcessor(beat));
        } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
        for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
            f.get(); // propagate any task failure
        }
    }

    /** Event loop: drain pending beats, then dispatch every ready selection key. */
    public void run() {
        while (true) {
            try {
                processTask();
                int readyCount = selector.selectNow(); // non-blocking poll
                if (readyCount <= 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove(); // selectedKeys() is not auto-cleared; must remove by hand
                    GlobalExecutor.executeTcpSuperSense(new PostProcessor(key));
                }
            } catch (Throwable e) {
                // NOTE(review): swallowed to keep the loop alive; original source presumably logs here.
            }
        }
    }
}

最终再通过PostProcessor、TaskProcessor、TimeOutTask等异步任务发送TCP请求来完成健康检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Handles a ready NIO selection key: completes the connect (success) or reads/cleans up.
// The SelectionKey 'key' is supplied by the enclosing processor's run() loop.
public class PostProcessor implements Runnable {
public void run() {
// The Beat was attached to the key when the channel was registered.
Beat beat = (Beat) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
try {
if (!beat.isHealthy()) {
// invalid beat means this server is no longer responsible for the current service
key.cancel();
key.channel().close();
beat.finishCheck();
return;
}
if (key.isValid() && key.isConnectable()) {
// Non-blocking connect completed: the instance is reachable — mark the check OK.
channel.finishConnect();
beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(), "tcp:ok+");
}
if (key.isValid() && key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(128);
if (channel.read(buffer) == -1) {
// Peer closed the connection (EOF): release the key and channel.
key.cancel();
key.channel().close();
}
}
} catch (ConnectException e) {
// Connection refused: fail the check immediately (now=true) with the max RT penalty.
beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(), "tcp:unable2connect:" + e.getMessage());
} catch (Exception e) {
// Any other error: count as a normal failure and best-effort clean up the key.
beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());
try {
key.cancel();
key.channel().close();
} catch (Exception ignore) {
}
}
}
}
// Turns one Beat into a non-blocking TCP connection attempt registered with the selector.
// The 'beat' field is set by the constructor (not shown in this excerpt).
private class TaskProcessor implements Callable<Void> {
public Void call() {
// How long the beat sat in the queue before being processed.
long waited = System.currentTimeMillis() - beat.getStartTime();
if (waited > MAX_WAIT_TIME_MILLISECONDS) {
Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
}
SocketChannel channel = null;
try {
Instance instance = beat.getIp();
// Reuse a still-valid, young-enough connection instead of opening a new one.
BeatKey beatKey = keyMap.get(beat.toString());
if (beatKey != null && beatKey.key.isValid()) {
if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
instance.setBeingChecked(false);
return null;
}
// Existing connection too old: drop it and reconnect below.
beatKey.key.cancel();
beatKey.key.channel().close();
}
channel = SocketChannel.open();
channel.configureBlocking(false);
// only by setting this can we make the socket close event asynchronous
channel.socket().setSoLinger(false, -1);
channel.socket().setReuseAddress(true);
channel.socket().setKeepAlive(true);
channel.socket().setTcpNoDelay(true);
Cluster cluster = beat.getTask().getCluster();
// Check either the instance's own port or the cluster's configured check port.
int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
channel.connect(new InetSocketAddress(instance.getIp(), port));
// Register for both connect-completion and read events; attach the beat for PostProcessor.
SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
key.attach(beat);
keyMap.put(beat.toString(), new BeatKey(key));
beat.setStartTime(System.currentTimeMillis());
// Schedule a timeout so a connect that never completes still fails the check.
GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());
if (channel != null) {
try {
channel.close();
} catch (Exception ignore) {
}
}
}
return null;
}
}
// Fails a beat whose connection has not completed within CONNECT_TIMEOUT_MS.
// The 'key' field is set by the constructor (not shown in this excerpt).
private static class TimeOutTask implements Runnable {
public void run() {
if (key != null && key.isValid()) {
SocketChannel channel = (SocketChannel) key.channel();
Beat beat = (Beat) key.attachment();
if (channel.isConnected()) {
// Connection completed in time — nothing to do.
return;
}
try {
// Force connect resolution; failure is expected and ignored here.
channel.finishConnect();
} catch (Exception ignore) {
}
try {
// Mark the check failed with a timeout-scaled RT, then release key and channel.
beat.finishCheck(false, false, beat.getTask().getCheckRtNormalized() * 2, "tcp:timeout");
key.cancel();
key.channel().close();
} catch (Exception ignore) {
}
}
}
}

最终调用Beat的finishCheck方法更新实例的健康状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * One in-flight TCP health check for a single instance.
 */
private class Beat {
    /**
     * Records the outcome of this check and updates the instance's health state.
     *
     * @param success whether the TCP probe succeeded
     * @param now     on failure, whether to mark the instance unhealthy immediately
     * @param rt      observed (or penalty) round-trip time used to re-tune check intervals
     * @param msg     diagnostic message describing the result
     */
    public void finishCheck(boolean success, boolean now, long rt, String msg) {
        ip.setCheckRt(System.currentTimeMillis() - startTime);
        if (success) {
            healthCheckCommon.checkOK(ip, task, msg);
        } else {
            if (now) {
                healthCheckCommon.checkFailNow(ip, task, msg);
            } else {
                healthCheckCommon.checkFail(ip, task, msg);
            }
            // Fix: keyMap entries are inserted under the beat's own key
            // (keyMap.put(beat.toString(), ...)), so remove with that same key.
            // The previous task.toString() never matched and left stale entries.
            keyMap.remove(toString());
        }
        healthCheckCommon.reEvaluateCheckRT(rt, task, switchDomain.getTcpHealthParams());
    }
}
// Shared bookkeeping for health-check outcomes: flips an instance's healthy flag only
// after enough consecutive successes/failures, and publishes a service-changed event.
public class HealthCheckCommon {
// Called on a successful probe; marks the instance healthy after checkTimes consecutive OKs.
public void checkOK(Instance ip, HealthCheckTask task, String msg) {
Cluster cluster = task.getCluster();
try {
if (!ip.isHealthy() || !ip.isMockValid()) {
// Require checkTimes consecutive successes before flipping to healthy.
if (ip.getOkCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
if (distroMapper.responsible(cluster, ip)) {
// This node owns the instance: flip state, bump version, notify subscribers.
ip.setHealthy(true);
ip.setMockValid(true);
Service service = cluster.getService();
service.setLastModifiedMillis(System.currentTimeMillis());
pushService.serviceChanged(service);
addResult(new HealthCheckResult(service.getName(), ip));
} else {
// Not responsible for this instance: only restore the mock-valid flag.
if (!ip.isMockValid()) {
ip.setMockValid(true);
}
}
}
}
} catch (Throwable t) {
// NOTE(review): swallowed — the original source presumably logs here; excerpt may be truncated.
}
// A success resets the consecutive-failure counter and releases the checking flag.
ip.getFailCount().set(0);
ip.setBeingChecked(false);
}
// Called on a failed probe; marks the instance unhealthy after checkTimes consecutive failures.
public void checkFail(Instance ip, HealthCheckTask task, String msg) {
Cluster cluster = task.getCluster();
try {
if (ip.isHealthy() || ip.isMockValid()) {
// Require checkTimes consecutive failures before flipping to unhealthy.
if (ip.getFailCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
if (distroMapper.responsible(cluster, ip)) {
ip.setHealthy(false);
ip.setMockValid(false);
Service service = cluster.getService();
service.setLastModifiedMillis(System.currentTimeMillis());
addResult(new HealthCheckResult(service.getName(), ip));
pushService.serviceChanged(service);
}

}
}
} catch (Throwable t) {
// NOTE(review): swallowed — the original source presumably logs here; excerpt may be truncated.
}
// A failure resets the consecutive-success counter and releases the checking flag.
ip.getOkCount().set(0);
ip.setBeingChecked(false);
}
}