Nacos配置中心Server原理

服务端配置加载

Nacos启动时通过DumpService的子类ExternalDumpService在初始化时调用dumpOperate来完成配置从数据库加载到缓存和写入磁盘中缓存文件中。先会通过dumpConfigInfo方法执行DumpAllProcessorprocess方法将数据库中的数据刷到内存中,且将变化的数据通知客户端。且会10分钟执行一次数据库逾期数据清理默认逾期时间30。且6小时将数据库中数据再刷一遍到内存中以及磁盘缓存文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class ExternalDumpService extends DumpService {
@PostConstruct
protected void init() throws Throwable {
dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
}
}
public abstract class DumpService {
protected DumpProcessor processor;
protected DumpAllProcessor dumpAllProcessor;
protected DumpAllBetaProcessor dumpAllBetaProcessor;
protected DumpAllTagProcessor dumpAllTagProcessor;
protected final PersistService persistService;
protected final ServerMemberManager memberManager;
public DumpService(PersistService persistService, ServerMemberManager memberManager) {
this.persistService = persistService;
this.memberManager = memberManager;
this.processor = new DumpProcessor(this);
this.dumpAllProcessor = new DumpAllProcessor(this); // 全量Dump配置信息的Processor
this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
this.dumpTaskMgr.setDefaultTaskProcessor(processor);
this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
DynamicDataSource.getInstance().getDataSource();
}
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor, DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
try {
Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
Runnable clearConfigHistory = () -> { // 清理数据库过期的数据
if (canExecute()) { // 若服务端成员列表中第一个成员是本机
try { // 默认过期时间为30天
Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
int totalCount = persistService.findConfigHistoryCountByTime(startTime); // 查询数据库逾期数据条数(超过30天)
if (totalCount > 0) { // 若逾期数据条数大于0
int pageSize = 1000;
int removeTime = (totalCount + pageSize - 1) / pageSize;
while (removeTime > 0) { // 分页处理
persistService.removeConfigHistory(startTime, pageSize); // 将数据从数据库批量删除
removeTime--;
}
}
} catch (Throwable e) {
}
}
};
try {
dumpConfigInfo(dumpAllProcessor); // 全量Dump配置信息
DiskUtil.clearAllBeta(); // update Beta cache
if (persistService.isExistTable(BETA_TABLE_NAME)) {
dumpAllBetaProcessor.process(new DumpAllBetaTask());
}
DiskUtil.clearAllTag(); // update Tag cache
if (persistService.isExistTable(TAG_TABLE_NAME)) {
dumpAllTagProcessor.process(new DumpAllTagTask());
}
List<ConfigInfoChanged> configList = persistService.findAllAggrGroup(); // add to dump aggr
if (configList != null && !configList.isEmpty()) {
total = configList.size();
List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
for (List<ConfigInfoChanged> list : splitList) {
MergeAllDataWorker work = new MergeAllDataWorker(list);
work.start();
}
}
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(), e);
}
if (!EnvUtil.getStandaloneMode()) {
Runnable heartbeat = () -> { // 更新心跳检测文件中的时间
String heartBeatTime = TimeUtils.getCurrentTime().toString(); // 或去当前心跳的时间
try { // write disk
DiskUtil.saveHeartBeatToDisk(heartBeatTime);
} catch (IOException e) {
}
};
ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS); // 10s执行一次心跳
long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10; // 第一次随机时间执行
ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); // 每6小时执行一次,将dumpAll添加到任务队列中
ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
}
ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES); // 每10分钟执行一次清理数据库过期的数据
} finally {
TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
}
}
}

不是通过isQuickStart方式启动的,或通过isQuickStart方式启动但最后心跳时间大于6小时,则先清理磁盘中的缓存文件,再通过DumpAllProcessor将数据从数据库加载到磁盘和和缓存中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public abstract class DumpService {
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
try {
if (isQuickStart()) { // 默认为false
File heartbeatFile = DiskUtil.heartBeatFile(); // 心跳文件
if (heartbeatFile.exists()) {
fis = new FileInputStream(heartbeatFile);
String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime() < timeStep * 60 * 60 * 1000) {
isAllDump = false; // 若当前时间减最近心跳时间小于6小时则不全量更新
}
}
}
if (isAllDump) { // 若当前时间减最近心跳时间大于6小时则全量更新
DiskUtil.clearAll(); // 清理磁盘缓存的配置信息文件
dumpAllProcessor.process(new DumpAllTask());
} else { // 若当前时间减最近心跳时间小于6小时则不全量更新
Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp, timeStep); // 6小时前
DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp, TimeUtils.getCurrentTime());
dumpChangeProcessor.process(new DumpChangeTask()); // 处理6小时前到现在有变更的配置
Runnable checkMd5Task = () -> {
List<String> diffList = ConfigCacheService.checkMd5();
for (String groupKey : diffList) { // 查询数据库中配置与缓存中配置对比,若发生变更,则写入磁盘缓存以及更新缓存通知客户端
String[] dg = GroupKey.parseKey(groupKey);
String dataId = dg[0];
String group = dg[1];
String tenant = dg[2];
ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
ConfigCacheService.dumpChange(dataId, group, tenant, configInfo.getContent(), configInfo.getLastModified());
}
};
ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS); // 每12小时执行一次checkMd5Task
}
} catch (IOException e) {
throw e;
}
}
}

会通过分页查询数据库将数据批量加载到磁盘和内存中,通过ConfigCacheServicedump方法将数据写入磁盘,且缓存配置信息的md5到内存中,并发布LocalDataChangeEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class DumpAllProcessor implements NacosTaskProcessor {
public boolean process(NacosTask task) {
long currentMaxId = persistService.findConfigMaxId(); // 查询MySQL获取最大的主键值,用于分页查询
long lastMaxId = 0;
while (lastMaxId < currentMaxId) { // 从MySQL分页查询所有配置
Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE); // 每页大小默认为1000条配置
if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
for (ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = id > lastMaxId ? id : lastMaxId;
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) { // dataId == com.alibaba.nacos.metadata.aggrIDs
AggrWhitelist.load(cf.getContent());
}
if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) { // com.alibaba.nacos.metadata.clientIpWhitelist
ClientIpWhiteList.load(cf.getContent());
}
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(cf.getContent());
}
// 将数据写入磁盘,缓存配置信息的md5到内存中,并发布LocalDataChangeEvent
boolean result = ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(), cf.getType());
final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
}
} else {
lastMaxId += PAGE_SIZE;
}
}
return true;
}
}
public class ConfigCacheService {
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs, String type) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey);
ci.setType(type);
final int lockResult = tryWriteLock(groupKey); // 获取写锁
assert (lockResult != 0);
if (lockResult < 0) {
return false;
}
try {
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
} else if (!PropertyUtil.isDirectRead()) { // 非Standalone模式启动且非内嵌数据库
DiskUtil.saveToDisk(dataId, group, tenant, content); // 将配置保存到磁盘文件中
}
updateMd5(groupKey, md5, lastModifiedTs);// 缓存配置信息的md5到内存中,并发布LocalDataChangeEvent
return true;
} catch (IOException ioe) {
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg.contains(DISK_QUATA_EN)) {
System.exit(0); // 磁盘满自杀退出
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) { // 为空说明没有,不相等说明更新了
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs; // 更新最新更新时间
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); // 发布更新事件,最终被LongPollingService订阅者监听
}
}
}

配置变更监听器触发

发布的LocalDataChangeEvent事件会被LongPollingService监听到,最终异步执行DataChangeTask任务,遍历所有的客户端长连接,这些长连接是客户启动时通过监听数据变化而创建的,每10ms执行一次,长连接超时时间30s,服务端会持有长连接29.5s后才执行,比较客户端长链接请求的数据中否包含该groupKey,若包含直接响应长连接。客户端长连接请求时间间隔很短,即使更新时漏掉了某个长连接,但当常链接请求过来时,会发现数据变更了会立即返回。从而达到不漏掉任务客户端,且接近实时的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public LongPollingService() { // 初始化LocalDataChangeEvent事件订阅者
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
// Register LocalDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe LocalDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return LocalDataChangeEvent.class;
}
});
}
class DataChangeTask implements Runnable {
@Override
public void run() { // 服务的push模式
try {
ConfigCacheService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
clientSub.sendResponse(Arrays.asList(groupKey)); // 响应配置发送变化的key,调用长连接中的sendResponse方法给客户响应数据
}
}
} catch (Throwable t) {
}
}
}

服务端配置发布

服务端配置的发布是通过控制界面修改了配置点击发布,从而调用ConfigControllerpublishConfig方法完成配置的发布。首先会通过PersistServiceinsertOrUpdate方法将数据持久化到数据库。然后发布ConfigDataChangeEvent事件被订阅者AsyncNotifyService感知,从而通知其他成员节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@PostMapping // 用户通过nacos-console控制界面修改了配置,点击发布调用该接口
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
final String srcIp = RequestUtil.getRemoteIp(request); // 获取远程调用者IP
final String requestIpApp = RequestUtil.getAppName(request);
srcUser = RequestUtil.getSrcUserName(request);
if (!ConfigType.isValidType(type)) { //check type
type = ConfigType.getDefaultType().getType();
}
ParamUtils.checkTenant(tenant); // check tenant
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(dataId)) {
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setType(type);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true); // 持久化数据到MySQL
// ConfigDataChangeEvent事件被订阅者AsyncNotifyService感知
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true); // 持久化数据到MySQL
// ConfigDataChangeEvent事件被订阅者AsyncNotifyService感知
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else {
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true); // beta publish
// ConfigDataChangeEvent事件被订阅者AsyncNotifyService感知
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}

当接收到ConfigDataChangeEvent变更事件后,首先遍历所有服务端成员,将变更配置的KEY以及成员信息封装到NotifySingleTask中,并放入阻塞队列中,通过AsyncTask中执行执行executeAsyncInvoke方法将更变数据通过调用服务端成员的/v1/cs/communication/dataChange接口同步给对方。若成员不是健康状态则延迟执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class AsyncNotifyService {
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
Collection<Member> ipList = memberManager.allMembers();
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (Member member : ipList) { // 将变更的配置信息同步给集群其它节点,包括自己
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta));
}
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
}
static class NotifySingleTask extends NotifyTask {
private String target;
public String url;
private boolean isBeta;
private static final String URL_PATTERN = "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}";
private static final String URL_PATTERN_TENANT = "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange?dataId={2}&group={3}&tenant={4}";
private int failCount;
public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified, String target, boolean isBeta) {
super(dataId, group, tenant, lastModified);
this.target = target;
this.isBeta = isBeta;
try {
dataId = URLEncoder.encode(dataId, Constants.ENCODE);
group = URLEncoder.encode(group, Constants.ENCODE);
} catch (UnsupportedEncodingException e) {
}
if (StringUtils.isBlank(tenant)) {
this.url = MessageFormat.format(URL_PATTERN, target, EnvUtil.getContextPath(), dataId, group);
} else {
this.url = MessageFormat.format(URL_PATTERN_TENANT, target, EnvUtil.getContextPath(), dataId, group, tenant);
}
if (StringUtils.isNotEmpty(tag)) {
url = url + "&tag=" + tag;
}
failCount = 0;
}
}
class AsyncTask implements Runnable {
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() { // 将更变数据通过调用其他成员的/v1/cs/communication/dataChange接口同步给对方
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (memberManager.hasMember(targetIp)) {
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
if (unHealthNeedDelay) { // 若成员不健康
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
asyncTaskExecute(task);
} else {
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
AuthHeaderUtil.addIdentityToHeader(header);
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
}
}
}
}
}

最终调用CommunicationControllernotifyConfigInfo方法接收变更数据,最终调用DumpService的dump方法将任务放入NacosDelayTaskExecuteEngine的任务队列中。最终通过在DumpService构造方法中设置好的DumpProcessor处理器来处理该任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
public abstract class DumpService {
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp) {
dump(dataId, group, tenant, lastModified, handleIp, false);
}
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
}
}

DumpProcessor处理器中,首先通过PersistServicefindConfigInfo方法从数据库查询最新的数据,然后将变更信息即最新数据封装成了ConfigDumpEvent事件,通过DumpConfigHandlerconfigDump方法最终调用ConfigCacheServicedump方法将最新的数据更新到磁盘缓存文件中,以及缓存中并发布LocalDataChangeEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class DumpProcessor implements NacosTaskProcessor {
public boolean process(NacosTask task) {
final PersistService persistService = dumpService.getPersistService();
DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.getLastModified();
String handleIp = dumpTask.getHandleIp();
boolean isBeta = dumpTask.isBeta();
String tag = dumpTask.getTag();
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
if (isBeta) {
// beta发布,则dump数据,更新beta缓存,从数据库加载最新数据
ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
build.content(Objects.isNull(cf) ? null : cf.getContent());
return DumpConfigHandler.configDump(build.build());
} else {
if (StringUtils.isBlank(tag)) {
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant); // 从数据库加载最新数据
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());
return DumpConfigHandler.configDump(build.build());
} else {
ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag); // 从数据库加载最新数据
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
return DumpConfigHandler.configDump(build.build());
}
}
}
}
public class DumpConfigHandler extends Subscriber<ConfigDumpEvent> {
public static boolean configDump(ConfigDumpEvent event) { // 该方法删除了多余代码逻辑
final String dataId = event.getDataId();
final String group = event.getGroup();
final String namespaceId = event.getNamespaceId();
final String content = event.getContent();
final String type = event.getType();
final long lastModified = event.getLastModifiedTs();
if (StringUtils.isBlank(event.getTag())) {
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(content);
}
if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(content);
}
if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(content);
}
boolean result;
if (!event.isRemove()) {
result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, content.length());
}
} else {
result = ConfigCacheService.remove(dataId, group, namespaceId);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
}
}
}