Nacos集群成员信息同步

成员列表初始化

Nacos以集群模式启动时,会加载nacos.home目录下conf/cluster.conf中的集群节点列表,然后通过周期性延时任务向集群节点发送心跳,检查集群成员的健康状况。集群成员由ServerMemberManager管理:该Bean在构造函数中初始化时,首先将当前机器加入集群成员列表,然后通过initAndStartLookup()加载其他集群成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Manages the member list of a Nacos cluster (excerpt of the class).
 * NOTE(review): fields such as port, localAddress, self and memberAddressInfos are declared
 * outside this excerpt.
 */
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {
private volatile ConcurrentSkipListMap<String, Member> serverList; // cluster members keyed by member address ("ip:port")
private MemberLookup lookup;
/**
 * Creates the manager, records the servlet context path in EnvUtil and initializes the member list.
 *
 * @param servletContext context whose path is stored via EnvUtil.setContextPath
 * @throws Exception if initialization fails
 */
public ServerMemberManager(ServletContext servletContext) throws Exception {
this.serverList = new ConcurrentSkipListMap<>();
EnvUtil.setContextPath(servletContext.getContextPath());
init();
}
/**
 * Adds this node to the member list, registers the cluster events and starts the member lookup.
 *
 * @throws NacosException if starting the lookup fails or the member list is empty afterwards
 */
protected void init() throws NacosException {
Loggers.CORE.info("Nacos-related cluster resource initialization");
// Listen port from the server.port property, 8848 if unset.
this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
this.localAddress = InetUtils.getSelfIP() + ":" + port;
this.self = MemberUtil.singleParse(this.localAddress);
this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
serverList.put(self.getAddress(), self); // this node is always part of its own member list
registerClusterEvent(); // register the MembersChangeEvent publisher and the IPChangeEvent subscriber
initAndStartLookup(); // Initializes the lookup mode
// NOTE(review): self was put above, so this looks unreachable unless the lookup replaces serverList with an empty map.
if (serverList.isEmpty()) {
throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
}
}
/**
 * Creates the MemberLookup for the current run mode via LookupFactory and starts it.
 *
 * @throws NacosException if the lookup cannot be created or started
 */
private void initAndStartLookup() throws NacosException {
this.lookup = LookupFactory.createLookUp(this);
this.lookup.start();
}
/**
 * Registers the MembersChangeEvent publisher and subscribes to InetUtils.IPChangeEvent so that
 * this node's own address information follows IP changes.
 */
private void registerClusterEvent() {
// Register node change events; the size argument comes from nacos.member-change-event.queue.size (default 128),
// presumably the publisher's queue capacity.
NotifyCenter.registerToPublisher(MembersChangeEvent.class, EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
// The address information of this node needs to be dynamically modified when registering the IP change of this node
NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
@Override
public void onEvent(InetUtils.IPChangeEvent event) { // move this node from its old "ip:port" key to the new one
String newAddress = event.getNewIP() + ":" + port;
ServerMemberManager.this.localAddress = newAddress;
EnvUtil.setLocalAddress(localAddress);
Member self = ServerMemberManager.this.self;
// NOTE(review): only the IP is updated on self; confirm Member.getAddress() reflects it, since serverList is re-keyed with newAddress below.
self.setIp(event.getNewIP());
String oldAddress = event.getOldIP() + ":" + port;
ServerMemberManager.this.serverList.remove(oldAddress);
ServerMemberManager.this.serverList.put(newAddress, self);
ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
ServerMemberManager.this.memberAddressInfos.add(newAddress);
}
@Override
public Class<? extends Event> subscribeType() {
return InetUtils.IPChangeEvent.class;
}
});
}
}

若以集群模式启动,会根据lookupType创建具体的MemberLookup来加载集群成员列表。lookupType默认为null(未配置时),此时若nacos.home目录下存在conf/cluster.conf集群配置文件(或EnvUtil.getMemberList()返回非空的成员列表配置),则使用FileConfigMemberLookup加载成员列表,否则回退为ADDRESS_SERVER类型,使用AddressServerMemberLookup。若是单机模式,则直接创建StandaloneMemberLookup。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Chooses and creates the {@link MemberLookup} strategy used to discover cluster members.
 */
public final class LookupFactory {

    /**
     * Creates the lookup for the current run mode, stores it in {@code LOOK_UP} and binds it to
     * the given member manager.
     *
     * <p>In standalone mode a {@code StandaloneMemberLookup} is always used. In cluster mode the
     * type is taken from the {@code LOOKUP_MODE_TYPE} property, or auto-detected by
     * {@link #chooseLookup(String)} when that property is blank or unrecognized.
     *
     * @param memberManager manager that receives the members found by the lookup
     * @return the created (not yet started) lookup
     * @throws NacosException if injecting the member manager fails
     */
    public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
        if (!EnvUtil.getStandaloneMode()) {
            String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE); // null when the property is not set
            LookupType type = chooseLookup(lookupType);
            LOOK_UP = find(type);
            currentLookupType = type;
        } else {
            LOOK_UP = new StandaloneMemberLookup();
        }
        LOOK_UP.injectMemberManager(memberManager);
        return LOOK_UP;
    }

    /**
     * Resolves the lookup type.
     *
     * <p>An explicit, recognized {@code lookupType} wins. Otherwise FILE_CONFIG is chosen when
     * cluster.conf exists or a member list is configured, and ADDRESS_SERVER in all other cases.
     * An unrecognized value silently falls back to this auto-detection.
     *
     * @param lookupType configured lookup type name, may be {@code null} or blank
     * @return the lookup type to use, never {@code null}
     */
    private static LookupType chooseLookup(String lookupType) {
        if (StringUtils.isNotBlank(lookupType)) {
            LookupType type = LookupType.sourceOf(lookupType);
            if (Objects.nonNull(type)) {
                return type;
            }
        }
        File file = new File(EnvUtil.getClusterConfFilePath());
        if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) {
            return LookupType.FILE_CONFIG; // the default when cluster.conf is present
        }
        return LookupType.ADDRESS_SERVER;
    }

    /**
     * Instantiates the lookup implementation for {@code type}.
     *
     * <p>This is a pure factory: publishing the instance to {@code LOOK_UP} is done by
     * {@link #createLookUp(ServerMemberManager)}.
     *
     * @param type lookup type
     * @return a new lookup instance
     * @throws IllegalArgumentException if {@code type} has no implementation
     */
    private static MemberLookup find(LookupType type) {
        if (LookupType.FILE_CONFIG.equals(type)) {
            return new FileConfigMemberLookup();
        }
        if (LookupType.ADDRESS_SERVER.equals(type)) {
            return new AddressServerMemberLookup();
        }
        throw new IllegalArgumentException("Unsupported member lookup type: " + type);
    }
}
/**
 * Returns the location of the cluster member file, {@code conf/cluster.conf} under the
 * directory returned by {@code getNacosHome()}.
 *
 * @return the cluster.conf path as a string
 */
public static String getClusterConfFilePath() {
    return Paths.get(getNacosHome(), "conf").resolve("cluster.conf").toString();
}

最终会调用具体MemberLookup的start方法:对于单机模式,直接获取本机地址作为集群列表;对于FileConfigMemberLookup,首先从磁盘读取配置文件中的成员列表,然后通过MemberUtil工具类的singleParse方法构造Member对象(默认状态为UP),最后注册cluster.conf文件的变更监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Member lookup backed by the cluster.conf file; reloads the member list whenever that file changes.
 */
public class FileConfigMemberLookup extends AbstractMemberLookup {

    /** Reloads the member list when a file whose name contains "cluster.conf" changes. */
    private FileWatcher watcher = new FileWatcher() {
        @Override
        public void onChange(FileChangeEvent event) {
            readClusterConfFromDisk();
        }

        @Override
        public boolean interest(String context) {
            return StringUtils.contains(context, "cluster.conf");
        }
    };

    /**
     * Loads the members from disk and registers the file watcher on the conf directory.
     * Only the first call has any effect.
     *
     * @throws NacosException declared by the lookup contract
     */
    public void start() throws NacosException {
        if (!start.compareAndSet(false, true)) {
            return; // already started
        }
        readClusterConfFromDisk();
        try {
            WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
        } catch (Throwable ignored) {
            // Best effort: without the watcher the members loaded above stay in effect,
            // but later edits of cluster.conf are not picked up. Nothing is logged here.
        }
    }

    /**
     * Reads cluster.conf and hands the parsed members to afterLookup. On any read or parse
     * failure an empty collection is passed instead.
     */
    private void readClusterConfFromDisk() {
        Collection<Member> parsed;
        try {
            parsed = MemberUtil.readServerConf(EnvUtil.readClusterConf());
        } catch (Throwable ignored) {
            // Swallowed on purpose: an empty collection is handed on instead of failing.
            parsed = new ArrayList<>();
        }
        afterLookup(parsed);
    }
}
/**
 * Forwards the members found by a lookup to the member manager's memberChange method.
 *
 * @param members members discovered by the lookup
 */
public void afterLookup(Collection<Member> members) {
    memberManager.memberChange(members);
}
/**
 * Registry of directory watch jobs; one WatchDirJob per watched directory.
 */
public class WatchFileCenter {

    /**
     * Subscribes {@code watcher} to change events of the directory {@code paths}, starting a
     * new watch job for that directory if none exists yet.
     *
     * <p>The job limit ({@code MAX_WATCH_FILE_JOB}) applies only when a new job must be
     * created. Adding another watcher to an already-watched directory always succeeds.
     *
     * @param paths   directory to watch
     * @param watcher subscriber to notify about changes
     * @return {@code false} if a new job would exceed the job limit, {@code true} otherwise
     * @throws NacosException if the watch job cannot be created
     */
    public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException {
        checkState();
        WatchDirJob job = MANAGER.get(paths);
        if (job == null) {
            // Only a new directory job consumes a slot; use >= so an over-count can never slip through.
            if (NOW_WATCH_JOB_CNT >= MAX_WATCH_FILE_JOB) {
                return false;
            }
            job = new WatchDirJob(paths);
            job.start();
            MANAGER.put(paths, job);
            NOW_WATCH_JOB_CNT++;
        }
        job.addSubscribe(watcher);
        return true;
    }
}

通过WatchDirJob对目标目录下的文件进行监听,当发生事件溢出(OVERFLOW)、修改、创建、删除时,会调用感兴趣的FileWatcher(即interest方法返回true,对集群配置而言是文件名包含cluster.conf的文件)的onChange方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
 * Thread that watches one directory through a java.nio WatchService and dispatches the
 * resulting events to the registered FileWatcher subscribers on callBackExecutor.
 * NOTE(review): paths, callBackExecutor, watch and FILE_SYSTEM are declared outside this excerpt.
 */
private static class WatchDirJob extends Thread {
private WatchService watchService;
// Subscribers of this directory; presumably a thread-safe set because addSubscribe may run while events are dispatched.
private Set<FileWatcher> watchers = new ConcurrentHashSet<>();
/**
 * Creates the job and registers the directory with a new WatchService.
 *
 * @param paths directory to watch; must be an existing directory
 * @throws IllegalArgumentException if paths is not a directory
 * @throws NacosException if the WatchService cannot be created or the directory cannot be registered
 */
public WatchDirJob(String paths) throws NacosException {
setName(paths);
this.paths = paths;
final Path p = Paths.get(paths);
if (!p.toFile().isDirectory()) {
throw new IllegalArgumentException("Must be a file directory : " + paths);
}
// Single-threaded executor on which the subscriber callbacks are dispatched.
this.callBackExecutor = ExecutorFactory.newSingleExecutorService(new NameThreadFactory("com.alibaba.nacos.sys.file.watch-" + paths));
try { // watch the directory for overflow, modify, create and delete events (per the JDK docs OVERFLOW is delivered even if not registered)
WatchService service = FILE_SYSTEM.newWatchService();
p.register(service, StandardWatchEventKinds.OVERFLOW, StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
this.watchService = service;
} catch (Throwable ex) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
}
/** Adds a subscriber that is consulted for every subsequent event of this directory. */
void addSubscribe(final FileWatcher watcher) {
watchers.add(watcher);
}
/**
 * Event loop: blocks on the WatchService, then hands each batch of events to callBackExecutor.
 * Ends when watch becomes false or callBackExecutor has been shut down.
 */
public void run() {
while (watch) {
try {
final WatchKey watchKey = watchService.take(); // blocks until a key is signalled
final List<WatchEvent<?>> events = watchKey.pollEvents();
// NOTE(review): reset() returns false once the key is invalid (e.g. directory removed); that result is ignored here.
watchKey.reset();
if (callBackExecutor.isShutdown()) {
return;
}
if (events.isEmpty()) {
continue;
}
// Process the batch off the watch thread so a slow subscriber does not delay take().
callBackExecutor.execute(new Runnable() {
@Override public void run() {
for (WatchEvent<?> event : events) {
WatchEvent.Kind<?> kind = event.kind();
// Since the OS's event cache may be overflow, a backstop is needed
if (StandardWatchEventKinds.OVERFLOW.equals(kind)) {
eventOverflow();
} else {
eventProcess(event.context());
}
}
}
});
} catch (InterruptedException ignore) {
// NOTE(review): Thread.interrupted() clears the interrupt flag (already cleared when InterruptedException is thrown);
// the loop simply keeps running while watch is true.
Thread.interrupted();
} catch (Throwable ex) {
// Swallowed silently in this excerpt; the loop continues.
}
}
}
/**
 * Notifies every subscriber interested in the changed entry.
 *
 * @param context the event context (for ENTRY_* events the relative path of the entry) or a file name from eventOverflow
 */
private void eventProcess(Object context) {
final FileChangeEvent fileChangeEvent = FileChangeEvent.builder().paths(paths).context(context).build();
final String str = String.valueOf(context);
for (final FileWatcher watcher : watchers) {
if (watcher.interest(str)) {
Runnable job = new Runnable() {
@Override public void run() {
watcher.onChange(fileChangeEvent);
}
};
// Run inline on the callback thread unless the watcher supplies its own executor.
Executor executor = watcher.executor();
if (executor == null) {
try {
job.run();
} catch (Throwable ex) {
// A failing subscriber must not stop the remaining subscribers from being notified.
}
} else {
executor.execute(job);
}
}
}
}
/**
 * Fallback when events were lost: treats every regular file in the directory as changed.
 * Throws NullPointerException if the directory can no longer be listed.
 */
private void eventOverflow() {
File dir = Paths.get(paths).toFile();
for (File file : Objects.requireNonNull(dir.listFiles())) {
if (file.isDirectory()) {
continue;
}
eventProcess(file.getName());
}
}
}

成员列表加载完成后,会调用ServerMemberManager的memberChange方法;若成员列表发生了变化,则通过发布订阅模式将MembersChangeEvent事件放入DefaultPublisher的队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {

    /**
     * Replaces the member list with {@code members} (plus this node) and, if the set of member
     * addresses changed, syncs the list via MemberUtil.syncToFile and publishes a MembersChangeEvent.
     *
     * <p>The caller's collection is not modified: this node is added to a private copy, so
     * read-only collections are accepted.
     *
     * @param members members found by a lookup; {@code null} or empty input is ignored
     * @return {@code true} if the set of member addresses changed
     */
    synchronized boolean memberChange(Collection<Member> members) {
        if (members == null || members.isEmpty()) {
            return false;
        }
        // Work on a copy: the caller's collection may be read-only or reused elsewhere.
        Collection<Member> candidates = new java.util.ArrayList<>(members);
        boolean isContainSelfIp = candidates.stream()
                .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
        // Record whether this node's address appears in the lookup result.
        isInIpList = isContainSelfIp;
        if (!isContainSelfIp) {
            // This node is always part of its own member list.
            candidates.add(this.self);
        }
        ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
        Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
        for (Member member : candidates) {
            final String address = member.getAddress();
            tmpMap.put(address, member);
            if (NodeState.UP.equals(member.getState())) {
                tmpAddressInfo.add(address);
            }
        }
        // Compare address sets instead of raw sizes so that duplicate addresses in the input cannot hide a change.
        boolean hasChange = !tmpMap.keySet().equals(serverList.keySet());
        serverList = tmpMap;
        memberAddressInfos = tmpAddressInfo;
        Collection<Member> finalMembers = allMembers();
        if (hasChange) {
            MemberUtil.syncToFile(finalMembers); // NOTE(review): presumably persists the list to cluster.conf — confirm
            Event event = MembersChangeEvent.builder().members(finalMembers).build();
            NotifyCenter.publishEvent(event);
        }
        return hasChange;
    }
}

MembersChangeEvent事件最终由MemberChangeListener的子类DistroMapper消费:DistroMapper在init方法中将自身注册为MembersChangeEvent事件的订阅者,收到MembersChangeEvent事件后会更新健康成员列表healthyList(状态为UP或SUSPICIOUS的成员地址,按字典序排序),该列表在客户端节点做心跳时起着比较重要的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Base class for subscribers of {@code MembersChangeEvent}.
 */
public abstract class MemberChangeListener extends Subscriber<MembersChangeEvent> {

    /**
     * Returns {@code true}. NOTE(review): presumably tells the notify center to skip events
     * that are already stale — confirm against Subscriber.
     */
    @Override
    public boolean ignoreExpireEvent() {
        return true;
    }

    /** Subscribes to {@code MembersChangeEvent}. */
    @Override
    public Class<? extends Event> subscribeType() {
        return MembersChangeEvent.class;
    }
}
/**
 * Keeps the list of healthy cluster member addresses up to date from MembersChangeEvent notifications.
 */
public class DistroMapper extends MemberChangeListener {

    // Addresses of members considered healthy; after the first event, a sorted unmodifiable list of UP/SUSPICIOUS members.
    private volatile List<String> healthyList = new ArrayList<>();

    /**
     * Subscribes to member change events and seeds healthyList from all members known to memberManager.
     * NOTE(review): the initial list is neither filtered by state nor sorted, unlike lists built in onEvent.
     */
    @PostConstruct
    public void init() {
        NotifyCenter.registerSubscriber(this);
        this.healthyList = MemberUtil.simpleMembers(memberManager.allMembers());
    }

    /**
     * Rebuilds healthyList from the event's members, keeping those whose state is UP or SUSPICIOUS.
     *
     * @param event the member change event
     */
    public void onEvent(MembersChangeEvent event) {
        List<String> list = MemberUtil.simpleMembers(MemberUtil.selectTargetMembers(event.getMembers(),
                member -> NodeState.UP.equals(member.getState()) || NodeState.SUSPICIOUS.equals(member.getState())));
        Collections.sort(list);
        healthyList = Collections.unmodifiableList(list);
    }
}

成员状态检查

通过周期性延时任务向集群成员发送心跳,检查并更新集群节点状态,该任务由MemberInfoReportTask完成。ServerMemberManager实现了ApplicationListener<WebServerInitializedEvent>接口,在onApplicationEvent方法中(Web服务器初始化完成后)将本机状态置为UP,并在集群模式下调度该任务。

1
2
3
4
5
6
7
8
9
10
11
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {

    // Periodic task that reports this node to one peer per round and updates that peer's state.
    private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();

    /**
     * Called once the web server is initialized. Marks this node UP, starts peer reporting in
     * cluster mode and stores the web server port and the local address in EnvUtil.
     *
     * @param event the web server initialized event
     */
    public void onApplicationEvent(WebServerInitializedEvent event) {
        getSelf().setState(NodeState.UP);
        final boolean clusterMode = !EnvUtil.getStandaloneMode();
        if (clusterMode) {
            // First report 5 seconds from now; the task reschedules itself from then on.
            GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
        }
        final int webServerPort = event.getWebServer().getPort();
        EnvUtil.setPort(webServerPort);
        EnvUtil.setLocalAddress(this.localAddress);
    }
}

MemberInfoReportTask继承自Task,Task实现了Runnable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Base class for background jobs that may reschedule themselves.
 *
 * <p>{@link #run()} executes {@link #executeBody()} unless the task has been shut down. Any
 * exception from the body is swallowed, and {@link #after()} is then invoked as long as the
 * task is still not shut down. The after() hook can be used, for example, to schedule the
 * next run.
 */
public abstract class Task implements Runnable {

    /** Once {@code true}, neither the body nor the {@link #after()} hook is invoked any more. */
    protected volatile boolean shutdown = false;

    @Override
    public void run() {
        if (!shutdown) {
            runBodyThenAfter();
        }
    }

    /** Runs the body, swallowing its failures, then the after() hook unless shut down meanwhile. */
    private void runBodyThenAfter() {
        try {
            executeBody();
        } catch (Throwable ignored) {
            // Deliberately swallowed so the failure never reaches the caller/executor; nothing is logged here.
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }

    /** The actual work of one run. */
    protected abstract void executeBody();

    /** Hook invoked after each run while the task is not shut down; does nothing by default. */
    protected void after() {
    }
}

循环向除自己以外的其他成员的HTTP接口/v1/core/cluster/report发送自己的Member信息,且每个任务周期只向一个成员发送数据,再根据接口的返回情况更新该成员的状态。其实这里做了两件事:既检查并更新了其他成员的状态,也更新了其他成员节点上自己的状态。executeBody执行完后,再通过after方法将任务重新放回延时线程池中(是重新调度而非真正的递归)。第一次在5s后执行,后续每2s执行一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Inner task of ServerMemberManager: each run posts this node's Member info to one other member
 * (round robin) and updates that member's state from the response. It reschedules itself every 2 s.
 */
class MemberInfoReportTask extends Task {
private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
};
// Round-robin position in the list returned by allMembersWithoutSelf().
private int cursor = 0;
@Override
protected void executeBody() { // report to one other member via its /cluster/report endpoint
List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
if (members.isEmpty()) {
return;
}
// Advance before use (so the very first target is index 1 % size); the modulo keeps it valid if the list shrank.
this.cursor = (this.cursor + 1) % members.size(); // one member is contacted per run
Member target = members.get(cursor);
final String url = HttpUtils.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT, "/cluster/report");
try {
asyncRestTemplate.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version), Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value() || result.getCode() == HttpStatus.NOT_FOUND.value()) {
return; // 501 (NOT_IMPLEMENTED) or 404: leave the target's state unchanged
}
if (result.ok()) { // request succeeded
MemberUtil.onSuccess(ServerMemberManager.this, target); // mark the target UP
} else { // request failed
MemberUtil.onFail(ServerMemberManager.this, target); // mark the target SUSPICIOUS/DOWN
}
}
@Override
public void onError(Throwable throwable) { // transport error
MemberUtil.onFail(ServerMemberManager.this, target, throwable); // mark the target SUSPICIOUS/DOWN
}
@Override
public void onCancel() {
}
});
} catch (Throwable ex) {
// Swallowed silently in this excerpt.
}
}
@Override
protected void after() { // after each run, schedule the next run in 2 s (rescheduling, not recursion)
GlobalExecutor.scheduleByCommon(this, 2_000L);
}
}

若返回码为501或404,则直接忽略、不更新成员状态。若请求接口成功,则将成员状态设置为UP并重置失败访问次数;若请求接口失败,则先将当前成员状态设置为SUSPICIOUS并将失败访问次数加1,若失败访问次数大于最大失败访问次数(默认3次,可通过nacos.core.member.fail-access-cnt配置),或错误信息包含Connection refused,则将当前成员状态置为DOWN。若状态发生变化,则发送MembersChangeEvent事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Helpers that update a member's health state after a report attempt.
 */
public class MemberUtil {

    /**
     * Records a successful report: adds the member's address to the manager's address set,
     * marks it UP and resets its failure counter. Notifies the manager if the state changed.
     *
     * @param manager owning member manager
     * @param member  member that answered successfully
     */
    public static void onSuccess(final ServerMemberManager manager, final Member member) {
        final NodeState before = member.getState();
        manager.getMemberAddressInfos().add(member.getAddress());
        member.setState(NodeState.UP);
        member.setFailAccessCnt(0);
        notifyIfStateChanged(manager, member, before);
    }

    /**
     * Records a failed report without a specific cause.
     *
     * @param manager owning member manager
     * @param member  member that failed to answer
     */
    public static void onFail(final ServerMemberManager manager, final Member member) {
        onFail(manager, member, ExceptionUtil.NONE_EXCEPTION);
    }

    /**
     * Records a failed report: removes the member's address from the manager's address set,
     * marks it SUSPICIOUS and increments its failure counter. It becomes DOWN when the counter
     * exceeds nacos.core.member.fail-access-cnt (default 3) or when the error message contains
     * the connection-refused text. Notifies the manager if the state changed.
     *
     * @param manager owning member manager
     * @param member  member that failed to answer
     * @param ex      failure cause; its message is only inspected when the failure limit is not exceeded
     */
    public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
        manager.getMemberAddressInfos().remove(member.getAddress());
        final NodeState before = member.getState();
        member.setState(NodeState.SUSPICIOUS);
        member.setFailAccessCnt(member.getFailAccessCnt() + 1);
        final int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);
        final boolean tooManyFailures = member.getFailAccessCnt() > maxFailAccessCnt;
        if (tooManyFailures || StringUtils.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
            member.setState(NodeState.DOWN);
        }
        notifyIfStateChanged(manager, member, before);
    }

    /** Triggers the manager's member-change notification when the member's state differs from {@code before}. */
    private static void notifyIfStateChanged(final ServerMemberManager manager, final Member member,
            final NodeState before) {
        if (!Objects.equals(before, member.getState())) {
            manager.notifyMemberChange();
        }
    }
}

其他服务端成员的NacosClusterController接收到该信息后,会将收到的成员状态置为UP、失败访问次数置为0。之所以这样处理,是因为发送该请求的正是这个成员本身,相当于收到了一次心跳。然后调用ServerMemberManager的update方法将信息更新到serverList中;若该成员的基本信息发生变化,则发布成员变更事件,由DistroMapper更新healthyList。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class NacosClusterController {

    /**
     * Receives a peer's self-report. A report is treated as a heartbeat from the sender, so the
     * reported member is marked UP with its failure counter reset before it is merged into the
     * member list.
     *
     * @param node the reporting member
     * @return 400 if the node information is invalid; otherwise success with "true"/"false",
     *         the result of the member manager's update
     */
    @PostMapping(value = {"/report"})
    public RestResult<String> report(@RequestBody Member node) {
        if (node.check()) {
            node.setState(NodeState.UP);
            node.setFailAccessCnt(0);
            final boolean updated = memberManager.update(node);
            return RestResultUtils.success(String.valueOf(updated));
        }
        return RestResultUtils.failedWithMsg(400, "Node information is illegal");
    }
}
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {

    /**
     * Merges information reported by a peer into the matching entry of the member list.
     *
     * <p>The existing Member instance is updated in place via MemberUtil.copy, after stamping
     * LAST_REFRESH_TIME on the reported member. A DOWN report removes the address from
     * memberAddressInfos. A member change is published only when MemberUtil.isBasicInfoChanged
     * reports a difference.
     *
     * <p>The side effects run exactly once, outside of any map compute function.
     * ConcurrentSkipListMap.computeIfPresent may apply its function more than once, and the
     * mapping itself never changes here because the same instance is kept.
     *
     * @param newMember member information received from the peer
     * @return {@code false} if the address is not a known member, {@code true} otherwise
     */
    public boolean update(Member newMember) {
        Loggers.CLUSTER.debug("member information update : {}", newMember);
        String address = newMember.getAddress();
        // Single read of the (wholesale-replaced) map avoids a check-then-act between two snapshots.
        Member member = serverList.get(address);
        if (member == null) {
            return false; // unknown address
        }
        if (NodeState.DOWN.equals(newMember.getState())) {
            memberAddressInfos.remove(newMember.getAddress()); // a DOWN member is no longer an available address
        }
        boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member);
        newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
        MemberUtil.copy(newMember, member); // update the existing instance in place
        if (isPublishChangeEvent) {
            notifyMemberChange(); // basic info changed: let subscribers such as DistroMapper refresh
        }
        return true;
    }
}