Nacos集群注册服务同步

服务注册同步

客户端在调用服务端接口注册实例时,会调用ConsistencyService#put方法将实例对象添加到注册表,以及同步给其它服务端成员。注册服务的同步是通过调用DistroProtocolsync同步方法来完成的,然后调用NacosDelayTaskExecuteEngineaddTask方法将同步任务添加到ConcurrentHashMap中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
public void put(String key, Record value) throws NacosException {
onPut(key, value); // 若是ephemeral实例将其添加到DataStore缓存中,然后通过一部任务替换注册表中实例列表
// 同步实例数据到其它服务端成员列表中,是将当前服务名称下所有实例同步到其他服务端,最终是通过异步任务加队列的方式,调用HTTP接口最终在其他服务上也是通过onPut方法来完成
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}
}
public class DistroProtocol {
public void sync(DistroKey distroKey, DataOperation action, long delay) { // delay默认值为1000
for (Member each : memberManager.allMembersWithoutSelf()) { // 遍历每个除自己以外的其它成员
// DistroKey的resourceKey对应的是DataStore中dataMap的key用于获取缓存Service信息的Datum
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
}
}
}

NacosDelayTaskExecuteEngine初始化时向延时线程池中添加了一个延时任务ProcessRunnable,该任务每100ms执行一次调用processTasks将添加到ConcurrentHashMap中的任务取出调用NacosTaskProcessor#process方法进行同步操作。addTask时若已存在同步任务,则调用DistroDelayTask#merge方法进行任务合并,其实就是替换为最新任务的action和createTime。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
// 在DistroDelayTaskExecuteEngine中实例化是代用该构造函数实例化
public NacosDelayTaskExecuteEngine(String name, Logger logger) {
this(name, 32, logger, 100L);
}
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask); // 若同步任务已存在,则合并同步任务
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
if (!processor.process(task)) {
retryFailedTask(taskKey, task); // ReAdd task if process failed
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
private class ProcessRunnable implements Runnable {
@Override
public void run() { // 100毫秒执行一次
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
}
public class DistroDelayTask extends AbstractDelayTask {
public void merge(AbstractDelayTask task) {
if (!(task instanceof DistroDelayTask)) {
return;
}
DistroDelayTask newTask = (DistroDelayTask) task;
if (!action.equals(newTask.getAction()) && createTime < newTask.getCreateTime()) {
action = newTask.getAction();
createTime = newTask.getCreateTime();
}
setLastProcessTime(newTask.getLastProcessTime());
}
}

具体NacosTaskProcessor是在DistroTaskEngineHolder中实例化DistroDelayTaskExecuteEngine时设置了默认DistroDelayTaskProcessor,以及在DistroHttpRegistry中添加了具体的KeyBuilder.INSTANCE_LIST_KEY_PREFIXDistroHttpDelayTaskProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DistroTaskEngineHolder {
private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
}
}
public class DistroHttpRegistry {
@PostConstruct
public void doRegister() {
componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroDataStorageImpl(dataStore, distroMapper));
componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpCombinedKeyTaskFailedHandler(globalConfig, taskEngineHolder));
taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
componentHolder.registerDataProcessor(consistencyService);
}
}

很明显NacosTaskProcessor#process具体执行的是DistroHttpDelayTaskProcessor#process,上面DistroTaskEngineHolder中实例化了DistroExecuteTaskExecuteEngine,最终调用其父类NacosExecuteTaskExecuteEngine#addTask方法。

1
2
3
4
5
6
7
8
9
10
public class DistroHttpDelayTaskProcessor implements NacosTaskProcessor {
private final DistroTaskEngineHolder distroTaskEngineHolder;
public boolean process(NacosTask task) {
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig, distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction());
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask);
return true;
}
}

调用TaskExecuteWorker#processDistroHttpCombinedKeyExecuteTask添加到阻塞队列中TaskExecuteWorker实例化时创建并启动了InnerWorker线程,该线程对阻塞队列消费取出DistroDelayTask并执行其run方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
public void addTask(Object tag, AbstractExecuteTask task) {
NacosTaskProcessor processor = getProcessor(tag);
if (null != processor) {
processor.process(task);
return;
}
TaskExecuteWorker worker = getWorker(tag);
worker.process(task);
}
}
public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
private final BlockingQueue<Runnable> queue;
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
this.name = name + "_" + mod + "%" + total;
this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
new InnerWorker(name).start();
}
public boolean process(NacosTask task) {
if (task instanceof AbstractExecuteTask) {
putTask((Runnable) task); // 将任务添加到阻塞对列中
}
return true;
}
private void putTask(Runnable task) {
try {
queue.put(task); // 该对列最终通过InnerWorker类消费
} catch (InterruptedException ire) {
}
}
private class InnerWorker extends Thread {
@Override
public void run() {
while (!closed.get()) {
try {
Runnable task = queue.take();
task.run();
} catch (Throwable e) {
}
}
}
}
}

最终会将任务封装成DistroHttpCombinedKeyDelayTask,并重新生成一个DistroKey,其实就是再一次执行了上面的入队操作,然后调用DistroDelayTaskProcessor#process方法,最终调用DistroSyncChangeTask#run方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class DistroHttpCombinedKeyExecuteTask extends AbstractExecuteTask {
public void run() {
try {
DistroKey newKey = new DistroKey(DistroHttpCombinedKey.getSequenceKey(), DistroHttpCombinedKeyDelayTask.class.getSimpleName(), singleDistroKey.getTargetServer());
DistroHttpCombinedKeyDelayTask combinedTask = new DistroHttpCombinedKeyDelayTask(newKey, taskAction, globalConfig.getTaskDispatchPeriod() / 2, globalConfig.getBatchSyncKeyCount());
combinedTask.getActualResourceKeys().add(singleDistroKey.getResourceKey());
distroDelayTaskExecuteEngine.addTask(newKey, combinedTask);
} catch (Exception e) {
}
}
}
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
}
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
public void run() { // 最终客户端服务同步是通过该线程来完成的
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE); // DistroData其实就是序列化好的Datum
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
handleFailedTask(); // 失败重试
}
} catch (Exception e) {
handleFailedTask();
}
}
}

最终通过调用HTTP接口/v1/ns/distro/datum来实现数据的同步,这里是直接将整个serviceName对应的实例数据列表全部进行了同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class DistroHttpAgent implements DistroTransportAgent {
public boolean syncData(DistroData data, String targetServer) {
if (!memberManager.hasMember(targetServer)) {
return true; // 若不存在该成员,则直接跳过
}
byte[] dataContent = data.getContent();
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
}
public class NamingProxy {
public static boolean syncData(byte[] data, String curServer) { // 调用/v1/ns/distro/datum接口即DistroController#onSyncDatum
Map<String, String> headers = new HashMap<>(128);
headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
headers.put(HttpHeaderConsts.ACCEPT_ENCODING, "gzip,deflate,sdch");
headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
headers.put(HttpHeaderConsts.CONTENT_ENCODING, "gzip");
try {
RestResult<String> result = HttpClient.httpPutLarge("http://" + curServer + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data);
if (result.ok()) {
return true;
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.getCode()) {
return true;
}
throw new IOException("failed to req API:" + "http://" + curServer + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.getCode() + " msg: " + result.getData());
} catch (Exception e) {
}
return false;
}
}

在其他成员的HTTP接口接收到数据后进行遍历将Datum中的数据通过onPut方法将数据设置到自身的DataStore中,以及更新到注册表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
public class DistroController {
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
if (dataMap.isEmpty()) {
Loggers.DISTRO.error("[onSync] receive empty entity!");
throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
}
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
String serviceName = KeyBuilder.getServiceName(entry.getKey());
if (!serviceManager.containService(namespaceId, serviceName) && switchDomain.isDefaultInstanceEphemeral()) {
serviceManager.createEmptyService(namespaceId, serviceName, true);
}
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
distroProtocol.onReceive(distroHttpData);
}
}
return ResponseEntity.ok("ok");
}
}
public class DistroProtocol {
public boolean onReceive(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
return dataProcessor.processData(distroData);
}
}
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
public boolean processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
onPut(datum.key, datum.value);
return true;
}
}

服务启动同步

在服务启动时会去其它成员服务中同步注册的实例数据,通过DistroProtocol实例化时通过startLoadTask方法将同步任务封装成DistroLoadDataTask对象异步来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class DistroProtocol {
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startDistroTask();
}
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask(); // 注册的客户端服务校验5s后开始,每5s执行一次
startLoadTask(); // 注册的客户端服务数据同步
}
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
}

若数据加载失败会再次将任务放入线程池中延时30s后再重试同步任务,通过调用目标成员的HTTP接口获取目标成员中全量的注册服务数据。并解析更新到缓存和注册表中,只要有一个成员更新成功则退出循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class DistroLoadDataTask implements Runnable {
public void run() {
try {
load(); // 加载数据
if (!checkCompleted()) { // 若数据加载失败则重试
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess(); // 加载成功将isInitialized设置为true
}
} catch (Exception e) {
loadCallback.onFailed(e); // 异常将isInitialized设置为false
}
}
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) { // 等待成员数据加载完成
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) { // 等待缓存实例加载完成
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); // 从其他服务端成员加载数据镜像
}
}
}
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
return false;
}
for (Member each : memberManager.allMembersWithoutSelf()) { // 遍历除自己以外的所有其它服务端成员
try {
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress()); // 通过HTTP接口获取目标成员中的注册服务数据
boolean result = dataProcessor.processSnapshot(distroData); // 解析数据并更新到缓存和注册表中
if (result) {
return true; // 只要有一个成功则退出循环
}
} catch (Exception e) {
}
}
return false;
}
private boolean checkCompleted() {
if (distroComponentHolder.getDataStorageTypes().size() != loadCompletedMap.size()) {
return false;
}
for (Boolean each : loadCompletedMap.values()) {
if (!each) {
return false;
}
}
return true;
}
}

最终通过DistroHttpAgent调用NamingProxy#getAllData方法从而调用目标成员的/v1/ns/distro/datums接口即DistroController#getAllDatums获取数据列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class DistroHttpAgent implements DistroTransportAgent {
public DistroData getDatumSnapshot(String targetServer) { // 通过HTTP接口获取目标成员中的注册服务数据
try {
byte[] allDatum = NamingProxy.getAllData(targetServer);
return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
} catch (Exception e) {
throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
}
}
}
public class NamingProxy {
public static byte[] getAllData(String server) throws Exception { // 调用目标成员的HTTP接口/v1/ns/distro/datums获取数据列表
Map<String, String> params = new HashMap<>(8);
RestResult<String> result = HttpClient.httpGet("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL, new ArrayList<>(), params);
if (result.ok()) {
return result.getData().getBytes();
}
throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: " + result.getMessage());
}
}

其它成员接收到请求后,直接从DataStore缓存中获取到数据进行序列化后返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
public class DistroController {
@GetMapping("/datums")
public ResponseEntity getAllDatums() {
DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return ResponseEntity.ok(distroData.getContent());
}
}
public class DistroProtocol {
public DistroData onSnapshot(String type) {
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
if (null == distroDataStorage) {
return new DistroData(new DistroKey("snapshot", type), new byte[0]);
}
return distroDataStorage.getDatumSnapshot();
}
}
public class DistroDataStorageImpl implements DistroDataStorage {
public DistroData getDatumSnapshot() {
Map<String, Datum> result = dataStore.getDataMap();
byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return new DistroData(distroKey, dataContent);
}
}

对于ephemeraltrue实例数据,会首先直接添加到缓存中,然后判断若当前没有则创建添加更新到注册表中,对于ephemeral为false的实例数据,若不存在则跳过,若存在则更新注册表数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
public boolean processSnapshot(DistroData distroData) {
try {
return processData(distroData.getContent());
} catch (Exception e) {
return false;
}
}
private boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class); // 反序列化
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue()); // 替换缓存中的数据
if (!listeners.containsKey(entry.getKey())) { // 若serviceList中不包含该数据则创建一个并更新到注册表中
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) { // ephemeral为true的实例,默认为true
// create empty service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// The Listener corresponding to the key value must not be empty
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); // 更新注册表数据
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { // 处理ephemeral为false的实例
if (!listeners.containsKey(entry.getKey())) {// Should not happen: 若实例不存在则直接跳过
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value); // 更新注册表数据
}
} catch (Exception e) {
continue;
}
dataStore.put(entry.getKey(), entry.getValue()); // 更新缓存中的数据
}
}
return true;
}
}

服务状态定时同步

注册的实例数据的定时同步是通过DistroProtocol构造方法中startDistroTask中调用startVerifyTask将同步任务封装成DistroVerifyTask添加到周期定时执行线程中。

1
2
3
4
5
public class DistroProtocol {
private void startVerifyTask() {
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder), distroConfig.getVerifyIntervalMillis());
}
}

首先会获取注册在本机上的所有客户端注册数据计算的checksum放入列表中封装成DistroKey,然后将该数据发送到每一个服务端成员进行校验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class DistroVerifyTask implements Runnable {
public void run() {
try {
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf(); // 获取所有除开自己的其它服务端成员
for (String each : distroComponentHolder.getDataStorageTypes()) {
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
}
}
private void verifyForDataStorage(String type, List<Member> targetServer) {
DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData(); // 获取本机上所有客户端服务的注册keyChecksums
if (null == distroData) {
return; // 若没有任何客户端注册在本实例上
}
distroData.setType(DataOperation.VERIFY);
for (Member member : targetServer) {
try {
distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
} catch (Exception e) {
}
}
}
}
public class DistroDataStorageImpl implements DistroDataStorage {
public DistroData getVerifyData() {
Map<String, String> keyChecksums = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue; // 若key对应的客户端服务不是注册在当前的服务端上,则跳过
}
Datum datum = dataStore.get(key);
if (datum == null) {
continue; // 若当前Key获取到的Service缓存数据为null
}
keyChecksums.put(key, datum.value.getChecksum());
}
if (keyChecksums.isEmpty()) {
return null; // 若没有找到注册的服务信息
}
DistroKey distroKey = new DistroKey("checksum", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(keyChecksums));
}
}

最终挨个调用目标成员的/v1/ns/distro/checksum接口进行数据校验,这里localServer的目的是用于目标成员回调,获取更新的服务实例数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class DistroHttpAgent implements DistroTransportAgent {
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
if (!memberManager.hasMember(targetServer)) {
return true; // 若当前服务端实例不在成员列表中
}
NamingProxy.syncCheckSums(verifyData.getContent(), targetServer);
return true;
}
}
public class NamingProxy {
public static void syncCheckSums(byte[] checksums, String server) {
try { // 调用/v1/ns/distro/checksum?source=localServer接口校验
Map<String, String> headers = new HashMap<>(128);
headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
HttpClient.asyncHttpPutLarge("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(), headers, checksums, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
}
}
}

上一次任务还没有执行完成又接收到了新的任务会直接跳过,若接收的实例列表存在注册在当前机器上发送者是本机则直接中断,让后将新增和更新的服务实例的Key添加到toUpdateKeys中,然后遍历本机中注册在发送请求验证的服务成员上的实例列表,判断实例是否已删除。最后通过onRemove异步执行更新注册表,对于需要更新的数据会回调/v1/ns/distro/datum接口获取最新的实例数据然后解析更新缓存和注册表中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
public class DistroController {
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) {
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap);
distroProtocol.onVerify(distroHttpData);
return ResponseEntity.ok("ok");
}
}
public class DistroProtocol {
public boolean onVerify(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
return false;
}
return dataProcessor.processVerifyData(distroData);
}
}
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
public boolean processVerifyData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
String sourceServer = distroData.getDistroKey().getResourceKey();
Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
onReceiveChecksums(verifyData, sourceServer);
return true;
}
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
if (syncChecksumTasks.containsKey(server)) { // Already in process of this server:
return;
}
syncChecksumTasks.put(server, "1"); // 将任务添加到syncChecksumTasks中
try {
List<String> toUpdateKeys = new ArrayList<>(); // 需要更新的列表,包括更新和添加的数据
List<String> toRemoveKeys = new ArrayList<>(); // 需要移除的列表
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {// this key should not be sent from remote server:
return; // abort the procedure: 当前客户端服务注册在当前的服务端上
}
if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).value == null || !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey()); // 若DataStore缓存中key不存在或key对应的值不存在或值的checksum不相等
}
}
for (String key : dataStore.keys()) {
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue; // 若当前客户端服务不是注册在发送校验请求的服务端成员上
}
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key); // 若当前客户端服务不在接收列表中,说明已被删除
}
}
for (String key : toRemoveKeys) {
onRemove(key); // 将需要删除的service数据从缓存中移除
}
if (toUpdateKeys.isEmpty()) {
return; // 若更新列表为空
}
try {
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
distroKey.getActualResourceTypes().addAll(toUpdateKeys);
DistroData remoteData = distroProtocol.queryFromRemote(distroKey); // 调用对应的服务端成员查询数据
if (null != remoteData) {
processData(remoteData.getContent());
}
} catch (Exception e) {
}
} finally {// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}
}
public DistroData queryFromRemote(DistroKey distroKey) {
if (null == distroKey.getTargetServer()) {
Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
return null;
}
String resourceType = distroKey.getResourceType();
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
if (null == transportAgent) {
Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
return null;
}
return transportAgent.getData(distroKey, distroKey.getTargetServer());
}
public void onRemove(String key) {
dataStore.remove(key); // 从缓存中移除
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.DELETE); // 更注册表
}
}
public class DistroHttpAgent implements DistroTransportAgent {
public DistroData getData(DistroKey key, String targetServer) {
try {
List<String> toUpdateKeys = null;
if (key instanceof DistroHttpCombinedKey) {
toUpdateKeys = ((DistroHttpCombinedKey) key).getActualResourceTypes();
} else {
toUpdateKeys = new ArrayList<>(1);
toUpdateKeys.add(key.getResourceKey());
}
byte[] queriedData = NamingProxy.getData(toUpdateKeys, key.getTargetServer());
return new DistroData(key, queriedData);
} catch (Exception e) {
}
}
}
public class NamingProxy {
public static byte[] getData(List<String> keys, String server) throws Exception { // 回调/v1/ns/distro/datum接口获取最新的实例数据
Map<String, String> params = new HashMap<>(8);
params.put("keys", StringUtils.join(keys, ","));
RestResult<String> result = HttpClient.httpGetLarge("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL, new HashMap<>(8), JacksonUtils.toJson(params));
if (result.ok()) {
return result.getData().getBytes();
}
throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: " + result.getCode() + " msg: " + result.getMessage());
}
}

最终会将最新的数据批量从缓存中获取然后返给请求方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
public class DistroController {
@GetMapping("/datum")
public ResponseEntity get(@RequestBody String body) throws Exception {
JsonNode bodyNode = JacksonUtils.toObj(body);
String keys = bodyNode.get("keys").asText();
String keySplitter = ",";
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, "");
for (String key : keys.split(keySplitter)) {
distroKey.getActualResourceTypes().add(key);
}
DistroData distroData = distroProtocol.onQuery(distroKey);
return ResponseEntity.ok(distroData.getContent());
}
}
public class DistroProtocol {
public DistroData onQuery(DistroKey distroKey) {
String resourceType = distroKey.getResourceType();
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
if (null == distroDataStorage) {
return new DistroData(distroKey, new byte[0]);
}
return distroDataStorage.getDistroData(distroKey);
}
}
public class DistroDataStorageImpl implements DistroDataStorage {
public DistroData getDistroData(DistroKey distroKey) {
Map<String, Datum> result = new HashMap<>(1);
if (distroKey instanceof DistroHttpCombinedKey) {
result = dataStore.batchGet(((DistroHttpCombinedKey) distroKey).getActualResourceTypes());
} else {
Datum datum = dataStore.get(distroKey.getResourceKey());
result.put(distroKey.getResourceKey(), datum);
}
byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
return new DistroData(distroKey, dataContent);
}
}
public class DataStore {
public Map<String, Datum> batchGet(List<String> keys) {
Map<String, Datum> map = new HashMap<>(128);
for (String key : keys) {
Datum datum = dataMap.get(key);
if (datum == null) {
continue;
}
map.put(key, datum);
}
return map;
}
}