Nacos配置中心Client原理

客户端配置拉取

对于客户端配置的拉取主要是通过NacosConfigBootstrapConfiguration配置类中注册的NacosPropertySourceLocator来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Bootstrap configuration that registers the beans used by the Nacos config client.
 * It applies when {@code spring.cloud.nacos.config.enabled} is true or not set at all.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {

    /**
     * Properties holder for the config center; per the article it mirrors the settings
     * declared in the bootstrap properties file.
     */
    @Bean
    @ConditionalOnMissingBean
    public NacosConfigProperties nacosConfigProperties() {
        NacosConfigProperties properties = new NacosConfigProperties();
        return properties;
    }

    /**
     * Manager built from the config properties; per the article it holds both the
     * {@code NacosConfigProperties} and the {@code ConfigService}.
     */
    @Bean
    @ConditionalOnMissingBean
    public NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {
        NacosConfigManager manager = new NacosConfigManager(nacosConfigProperties);
        return manager;
    }

    /**
     * Locator that loads the configuration stored in the Nacos config center.
     */
    @Bean
    public NacosPropertySourceLocator nacosPropertySourceLocator(NacosConfigManager nacosConfigManager) {
        NacosPropertySourceLocator locator = new NacosPropertySourceLocator(nacosConfigManager);
        return locator;
    }
}

在SpringBoot容器启动时,首先通过在SpringApplication#prepareEnvironment中调用SpringApplicationRunListeners#environmentPrepared方法,从而调用NacosDefaultPropertySourceEnvironmentPostProcessor#postProcessEnvironment方法完成Nacos默认配置的加载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Environment post-processor that collects every resource matching
 * {@link #RESOURCE_LOCATION_PATTERN} into one composite property source named
 * {@link #PROPERTY_SOURCE_NAME} and appends it to the end of the environment's sources.
 */
public class NacosDefaultPropertySourceEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered {

    public static final String PROPERTY_SOURCE_NAME = "nacos-default";

    public static final String RESOURCE_LOCATION_PATTERN = CLASSPATH_ALL_URL_PREFIX + "META-INF/nacos-default.properties";

    /**
     * Entry point: resolves the resource loader of the application and registers the defaults.
     */
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        processPropertySource(environment, getResourceLoader(application));
    }

    /**
     * Builds the default property source and adds it last to the environment.
     *
     * @throws IllegalStateException wrapping any {@link IOException} raised while reading resources
     */
    private void processPropertySource(ConfigurableEnvironment environment, ResourceLoader resourceLoader) {
        PropertySource defaults;
        try {
            defaults = buildPropertySource(resourceLoader);
        } catch (IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        MutablePropertySources sources = environment.getPropertySources();
        sources.addLast(defaults);
    }

    /**
     * Creates the composite source and fills it with all matching resources.
     */
    private PropertySource buildPropertySource(ResourceLoader resourceLoader) throws IOException {
        CompositePropertySource composite = new CompositePropertySource(PROPERTY_SOURCE_NAME);
        appendPropertySource(composite, resourceLoader);
        return composite;
    }

    /**
     * Resolves {@link #RESOURCE_LOCATION_PATTERN} and adds each existing resource to the
     * composite, using the resource URL as the name of the nested source.
     */
    private void appendPropertySource(CompositePropertySource propertySource, ResourceLoader resourceLoader)
            throws IOException {
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(resourceLoader);
        for (Resource candidate : resolver.getResources(RESOURCE_LOCATION_PATTERN)) {
            if (!candidate.exists()) {
                // Only existing resources are added.
                continue;
            }
            String sourceName = String.valueOf(candidate.getURL());
            EncodedResource encoded = new EncodedResource(candidate, FILE_ENCODING);
            propertySource.addPropertySource(new ResourcePropertySource(sourceName, encoded));
        }
    }
}

然后在SpringApplication#prepareContext中的applyInitializers里,调用在spring-cloud-context包中实现的、通过SpringFactoriesLoader加载的ApplicationContextInitializer实现类PropertySourceBootstrapConfiguration,从而调用NacosPropertySourceLocator#locate,从服务端加载配置数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class SpringApplication {

    /**
     * Runs every registered {@code ApplicationContextInitializer} against the context.
     * Before each call the initializer's declared context type is resolved and the
     * context is asserted to be an instance of it.
     *
     * @param context the application context being prepared
     */
    protected void applyInitializers(ConfigurableApplicationContext context) {
        for (ApplicationContextInitializer contextInitializer : getInitializers()) {
            Class<?> expectedContextType = GenericTypeResolver.resolveTypeArgument(
                    contextInitializer.getClass(), ApplicationContextInitializer.class);
            Assert.isInstanceOf(expectedContextType, context, "Unable to call initializer.");
            contextInitializer.initialize(context);
        }
    }
}
/**
 * Context initializer that asks every {@code PropertySourceLocator} for property sources,
 * wraps them as bootstrap property sources and inserts them into the environment.
 */
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(PropertySourceBootstrapProperties.class)
public class PropertySourceBootstrapConfiguration implements ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {

    /**
     * Collects the property sources of all locators (in sorted order) and, when at least one
     * was found, replaces the existing bootstrap sources and re-applies logging and profile settings.
     */
    public void initialize(ConfigurableApplicationContext applicationContext) {
        AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
        ConfigurableEnvironment environment = applicationContext.getEnvironment();

        List<PropertySource<?>> located = new ArrayList<>();
        for (PropertySourceLocator locator : this.propertySourceLocators) {
            Collection<PropertySource<?>> found = locator.locateCollection(environment);
            if (found == null || found.isEmpty()) {
                continue;
            }
            for (PropertySource<?> candidate : found) {
                if (candidate instanceof EnumerablePropertySource) {
                    located.add(new BootstrapPropertySource<>((EnumerablePropertySource<?>) candidate));
                } else {
                    located.add(new SimpleBootstrapPropertySource(candidate));
                }
            }
        }

        if (located.isEmpty()) {
            // No locator produced anything: leave the environment untouched.
            return;
        }

        MutablePropertySources propertySources = environment.getPropertySources();
        // Capture the logging settings before the sources are changed.
        String logConfig = environment.resolvePlaceholders("${logging.config:}");
        LogFile logFile = LogFile.get(environment);
        for (PropertySource<?> existing : environment.getPropertySources()) {
            String existingName = existing.getName();
            if (existingName.startsWith(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
                propertySources.remove(existingName);
            }
        }
        insertPropertySources(propertySources, located);
        reinitializeLoggingSystem(environment, logConfig, logFile);
        setLogLevels(applicationContext, environment);
        handleIncludedProfiles(environment);
    }
}
/**
 * Strategy for locating property sources for an environment.
 */
public interface PropertySourceLocator {

    /**
     * Locates the property sources of this locator as a collection.
     */
    default Collection<PropertySource<?>> locateCollection(Environment environment) {
        return locateCollection(this, environment);
    }

    /**
     * Calls {@code locator.locate(environment)} (for Nacos this ends up in
     * {@code NacosPropertySourceLocator.locate}) and normalises the result into a collection.
     *
     * @return an empty list when nothing was located, the non-null nested sources when the
     *         result is a {@code CompositePropertySource}, otherwise a single-element list
     */
    static Collection<PropertySource<?>> locateCollection(PropertySourceLocator locator, Environment environment) {
        PropertySource<?> located = locator.locate(environment);
        if (located == null) {
            return Collections.emptyList();
        }
        if (!(located instanceof CompositePropertySource)) {
            return Arrays.asList(located);
        }
        List<PropertySource<?>> nonNullSources = new ArrayList<>();
        for (PropertySource<?> nested : ((CompositePropertySource) located).getPropertySources()) {
            if (nested != null) {
                nonNullSources.add(nested);
            }
        }
        return nonNullSources;
    }
}
public class NacosPropertySourceLocator implements PropertySourceLocator {

    /**
     * Builds the composite Nacos property source for the given environment.
     * Shared configs are loaded first, then extension configs, then the application's own configs.
     *
     * @param env the Spring environment
     * @return the composite property source, or {@code null} when no {@code ConfigService} is available
     */
    public PropertySource<?> locate(Environment env) {
        // Hand the Spring Environment over to the properties holder.
        nacosConfigProperties.setEnvironment(env);

        ConfigService configService = nacosConfigManager.getConfigService();
        if (configService == null) {
            return null;
        }

        // Timeout for fetching configs (the article notes the default is 3000).
        nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, nacosConfigProperties.getTimeout());

        // dataId prefix resolution: spring.cloud.nacos.config.prefix, then
        // spring.cloud.nacos.config.name, then spring.application.name.
        String configuredName = nacosConfigProperties.getName();
        String dataIdPrefix = nacosConfigProperties.getPrefix();
        if (StringUtils.isEmpty(dataIdPrefix)) {
            dataIdPrefix = StringUtils.isEmpty(configuredName)
                    ? env.getProperty("spring.application.name")
                    : configuredName;
        }

        CompositePropertySource composite = new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);
        // Shared configs (spring.cloud.nacos.config.sharedConfigs).
        loadSharedConfiguration(composite);
        // Extension configs (spring.cloud.nacos.config.extensionConfigs).
        loadExtConfiguration(composite);
        // Application configs, loaded as: 1. name, 2. name.extension, 3. name-profile.extension;
        // the effective priority is the reverse (3 > 2 > 1).
        loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
        return composite;
    }
}

配置文件的加载顺序是:共享配置文件 → 扩展配置文件 → 当前应用配置文件,而当前应用配置文件的加载顺序又分为:文件名 → 文件名.文件扩展名 → 文件名-profile.文件扩展名。但使用的优先顺序是反过来的。最终都是调用loadNacosDataIfPresent方法来完成配置信息的加载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Loads the shared configs declared in the Nacos properties into the composite source.
 * Does nothing when no shared configs are declared.
 */
private void loadSharedConfiguration(CompositePropertySource compositePropertySource) {
    List<NacosConfigProperties.Config> shared = nacosConfigProperties.getSharedConfigs();
    if (CollectionUtils.isEmpty(shared)) {
        return;
    }
    // Validate the dataIds of the shared configs before loading them.
    checkConfiguration(shared, "shared-configs");
    loadNacosConfiguration(compositePropertySource, shared);
}
/**
 * Loads the extension configs declared in the Nacos properties into the composite source.
 * Does nothing when no extension configs are declared.
 */
private void loadExtConfiguration(CompositePropertySource compositePropertySource) {
    List<NacosConfigProperties.Config> extensions = nacosConfigProperties.getExtensionConfigs();
    if (CollectionUtils.isEmpty(extensions)) {
        return;
    }
    checkConfiguration(extensions, "extension-configs");
    loadNacosConfiguration(compositePropertySource, extensions);
}
/**
 * Loads each given config entry in list order, deriving the file extension from its dataId
 * and honouring the entry's own refresh flag.
 */
private void loadNacosConfiguration(final CompositePropertySource composite, List<NacosConfigProperties.Config> configs) {
    for (NacosConfigProperties.Config entry : configs) {
        String dataId = entry.getDataId();
        String fileExtension = NacosDataParserHandler.getInstance().getFileExtension(dataId);
        loadNacosDataIfPresent(composite, dataId, entry.getGroup(), fileExtension, entry.isRefresh());
    }
}
/**
 * Loads the application's own configs in three steps: the bare prefix, the prefix with the
 * file extension, and one dataId per active profile. Later loads take priority over earlier
 * ones, and all of them are loaded as refreshable.
 */
private void loadApplicationConfiguration(CompositePropertySource compositePropertySource, String dataIdPrefix, NacosConfigProperties properties, Environment environment) {
    final String extension = properties.getFileExtension();
    final String group = properties.getGroup();
    // Every application config is refreshable (supports dynamic refresh).
    final boolean refreshable = true;

    // 1. Plain name (the microservice name), loaded once by default.
    loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, group, extension, refreshable);

    // 2. name.extension, which has a higher priority than the plain name.
    String dataIdWithExtension = dataIdPrefix + DOT + extension;
    loadNacosDataIfPresent(compositePropertySource, dataIdWithExtension, group, extension, refreshable);

    // 3. name-profile.extension, which has a higher priority than name.extension.
    String[] activeProfiles = environment.getActiveProfiles();
    for (String activeProfile : activeProfiles) {
        String profileDataId = dataIdPrefix + SEP1 + activeProfile + DOT + extension;
        loadNacosDataIfPresent(compositePropertySource, profileDataId, group, extension, refreshable);
    }
}

从这里也可以看出,由于每次加载的propertySource都会被添加到composite队列的首部,所以加载顺序与使用顺序是相反的。对于扩展配置和共享配置,默认不支持动态刷新,而当前应用配置是支持动态刷新的。最终通过ConfigService#getConfig方法从服务端拉取配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Loads one Nacos config and puts it at the head of the composite source.
 * The call is a no-op when either the dataId or the group is null or blank.
 */
private void loadNacosDataIfPresent(final CompositePropertySource composite, final String dataId, final String group, String fileExtension, boolean isRefreshable) {
    boolean dataIdMissing = dataId == null || dataId.trim().isEmpty();
    if (dataIdMissing) {
        return;
    }
    // The group normally carries a default value, so this is rarely hit.
    boolean groupMissing = group == null || group.trim().isEmpty();
    if (groupMissing) {
        return;
    }
    NacosPropertySource loaded = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);
    // Adding at the head is why load order and lookup priority are reversed.
    this.addFirstPropertySource(composite, loaded, false);
}
/**
 * Returns the property source for the given dataId and group.
 * When a refresh has already happened and the config is not refreshable (the default for
 * extension and shared configs), the copy held by the repository is returned; otherwise
 * the source is built anew through the builder.
 */
private NacosPropertySource loadNacosPropertySource(final String dataId, final String group, String fileExtension, boolean isRefreshable) {
    boolean alreadyRefreshed = NacosContextRefresher.getRefreshCount() != 0;
    if (alreadyRefreshed && !isRefreshable) {
        return NacosPropertySourceRepository.getNacosPropertySource(dataId, group);
    }
    // The article notes that fileExtension defaults to "properties".
    return nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
}
public class NacosPropertySourceBuilder {

    /**
     * Fetches and parses the config, wraps it in a {@code NacosPropertySource} stamped with
     * the current date, and records it in {@code NacosPropertySourceRepository}.
     */
    NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {
        List<PropertySource<?>> parsedSources = loadNacosData(dataId, group, fileExtension);
        NacosPropertySource result = new NacosPropertySource(parsedSources, group, dataId, new Date(), isRefreshable);
        // Keep the loaded config in the repository cache.
        NacosPropertySourceRepository.collectNacosPropertySource(result);
        return result;
    }

    /**
     * Pulls the raw config text through {@code configService} and parses it.
     *
     * @return the parsed property sources; an empty list when the config is empty or when
     *         fetching/parsing throws
     */
    private List<PropertySource<?>> loadNacosData(String dataId, String group, String fileExtension) {
        try {
            String rawConfig = configService.getConfig(dataId, group, timeout);
            if (StringUtils.isEmpty(rawConfig)) {
                return Collections.emptyList();
            }
            // Parse the data returned by the server.
            return NacosDataParserHandler.getInstance().parseNacosData(dataId, rawConfig, fileExtension);
        } catch (Exception ignored) {
            // Best effort: any failure results in an empty list.
            return Collections.emptyList();
        }
    }
}

最终调用ConfigService的实现类NacosConfigService,首先从本地缓存的配置文件中加载配置数据,加载到数据直接返回,后续是通过定时更新机制来更新数据。若从本地未获取到数据,则通过ClientWorker#getServerConfig从服务端拉取数据,若请求失败还是使用本地缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class NacosConfigService implements ConfigService {

    /**
     * Fetches a config for this service's namespace.
     */
    public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
    }

    /**
     * Resolves the config content in three steps: the local failover file, then the server,
     * and finally the local snapshot when the server call fails.
     *
     * @throws NacosException when the parameters are invalid or the server reports {@code NO_RIGHT}
     */
    private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
        // A missing group falls back to DEFAULT_GROUP.
        group = null2defaultGroup(group);
        // Validate dataId and group.
        ParamUtils.checkKeyParam(dataId, group);

        ConfigResponse cr = new ConfigResponse();
        cr.setDataId(dataId);
        cr.setTenant(tenant); // namespace
        cr.setGroup(group);

        // Local failover content takes precedence.
        String failoverContent = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        if (failoverContent != null) {
            return filterContent(cr, failoverContent);
        }

        try {
            // Nothing local: ask the config center.
            String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
            return filterContent(cr, ct[0]);
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
        }

        // The remote call failed: fall back to the locally cached snapshot file.
        String snapshotContent = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
        return filterContent(cr, snapshotContent);
    }

    /**
     * Stores the content in the response, runs the filter chain over it and returns the
     * content the response holds afterwards.
     */
    private String filterContent(ConfigResponse cr, String rawContent) throws NacosException {
        cr.setContent(rawContent);
        configFilterChainManager.doFilter(null, cr);
        return cr.getContent();
    }
}

ClientWorker中通过HTTP请求调用服务端/v1/cs/configs接口获取最新的配置信息,服务端最终调用ConfigController#getConfig方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ClientWorker implements Closeable {
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
String[] ct = new String[2];
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP; // 若group为空则设置为默认值DEFAULT_GROUP
}
HttpRestResult<String> result = null;
try {
Map<String, String> params = new HashMap<String, String>(3);
if (StringUtils.isBlank(tenant)) {
params.put("dataId", dataId);
params.put("group", group);
} else {
params.put("dataId", dataId);
params.put("group", group);
params.put("tenant", tenant);
}
// 调用服务端/v1/cs/configs接口获取最新的配置信息
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (Exception ex) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
switch (result.getCode()) {
case HttpURLConnection.HTTP_OK: // 保存结果到本地文件中
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
ct[0] = result.getData();
if (result.getHeader().getValue(CONFIG_TYPE) != null) {
ct[1] = result.getHeader().getValue(CONFIG_TYPE);
} else {
ct[1] = ConfigType.TEXT.getType();
}
return ct;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return ct;
case HttpURLConnection.HTTP_CONFLICT: {
throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
throw new NacosException(result.getCode(), result.getMessage());
}
default: {
throw new NacosException(result.getCode(), "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
}
}

集群模式下并不是数据库查询而是通过DiskUtil#targetTagFile方法查询本地磁盘的缓存,修改配置需要发布ConfigDataChangeEvent事件,触发本地文件和内存更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/**
 * HTTP GET handler that returns a config to the client.
 * The request parameters are validated first, then the work is delegated to
 * {@code inner.doGetConfig} together with the caller's remote IP.
 */
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam("dataId") String dataId, @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag)
        throws IOException, ServletException, NacosException {
    // Validate the tenant, then normalise it as a namespace parameter.
    ParamUtils.checkTenant(tenant);
    tenant = NamespaceUtil.processNamespaceParameter(tenant);

    // Validate the remaining parameters.
    ParamUtils.checkParam(dataId, group, "datumId", "content");
    ParamUtils.checkParam(tag);

    inner.doGetConfig(request, response, dataId, group, tenant, tag, RequestUtil.getRemoteIp(request));
}
public class ConfigServletInner {
/**
 * Writes the requested config to the HTTP response.
 *
 * The outcome depends on the result of tryConfigReadLock: a positive value serves the
 * config, zero answers 404 ("config data not exist"), and a negative value answers 409
 * ("requested file is being modified, please try later.").
 *
 * When serving, the content comes from persistService if PropertyUtil.isDirectRead() is
 * true, otherwise from a file located through DiskUtil. Beta, tag and plain variants are
 * resolved separately.
 *
 * @return the HTTP status code as a string
 */
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group, String tenant, String tag, String clientIp) throws IOException, ServletException {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = request.getHeader("Vipserver-Tag");
String requestIpApp = RequestUtil.getAppName(request);
int lockResult = tryConfigReadLock(groupKey); // acquire the config read lock
final String requestIp = RequestUtil.getRemoteIp(request); // IP of the requesting client
boolean isBeta = false;
if (lockResult > 0) { // per the article: this does not go to MySQL but reads the local disk cache; changing a config publishes a ConfigDataChangeEvent that updates the local file and memory
FileInputStream fis = null;
try {
String md5 = Constants.NULL;
long lastModified = 0L;
CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
if (cacheItem != null) { // a cache entry exists
if (cacheItem.isBeta()) { // the entry is flagged as beta
if (cacheItem.getIps4Beta().contains(clientIp)) { // the beta IP list contains the requester's IP
isBeta = true;
}
}
final String configType = (null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
response.setHeader("Config-Type", configType);
FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType); // resolve the config file type
String contentTypeHeader = fileTypeEnum.getContentType();
response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);
}
File file = null;
ConfigInfoBase configInfoBase = null;
PrintWriter out = null;
if (isBeta) { // isBeta is only set inside the cacheItem != null branch, so cacheItem exists here
md5 = cacheItem.getMd54Beta();
lastModified = cacheItem.getLastModifiedTs4Beta(); // last modification time
if (PropertyUtil.isDirectRead()) { // standalone deployment, or embedded storage in use
configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
} else { // cluster deployment
file = DiskUtil.targetBetaFile(dataId, group, tenant); // locate the data file
}
response.setHeader("isBeta", "true");
} else { // not a beta request
if (StringUtils.isBlank(tag)) { // tag is usually null
if (isUseTag(cacheItem, autoTag)) {
if (cacheItem != null) {
if (cacheItem.tagMd5 != null) {
md5 = cacheItem.tagMd5.get(autoTag);
}
if (cacheItem.tagLastModifiedTs != null) {
// NOTE(review): the explicit-tag branch below null-checks this lookup before
// assigning to the primitive lastModified; this one does not — confirm the
// map always contains autoTag here.
lastModified = cacheItem.tagLastModifiedTs.get(autoTag);
}
}
if (PropertyUtil.isDirectRead()) { // standalone deployment, or embedded storage in use
configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, autoTag);
} else {
file = DiskUtil.targetTagFile(dataId, group, tenant, autoTag); // locate the data file
}
response.setHeader("Vipserver-Tag", URLEncoder.encode(autoTag, StandardCharsets.UTF_8.displayName()));
} else {
// NOTE(review): cacheItem is dereferenced without a null check on this path —
// confirm that a positive lock result guarantees a cache entry.
md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
if (PropertyUtil.isDirectRead()) { // standalone deployment, or embedded storage in use
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
file = DiskUtil.targetFile(dataId, group, tenant); // locate the data file
}
if (configInfoBase == null && fileNotExist(file)) { // the config does not exist
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
}
} else { // an explicit tag was supplied
if (cacheItem != null) {
if (cacheItem.tagMd5 != null) {
md5 = cacheItem.tagMd5.get(tag);
}
if (cacheItem.tagLastModifiedTs != null) {
Long lm = cacheItem.tagLastModifiedTs.get(tag);
if (lm != null) {
lastModified = lm;
}
}
}
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
} else {
file = DiskUtil.targetTagFile(dataId, group, tenant, tag);
}
if (configInfoBase == null && fileNotExist(file)) {
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
}
}
// Common response headers: content MD5 plus cache-disabling headers.
response.setHeader(Constants.CONTENT_MD5, md5);
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
if (PropertyUtil.isDirectRead()) { // standalone deployment, or embedded storage in use
response.setDateHeader("Last-Modified", lastModified);
} else {
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
if (PropertyUtil.isDirectRead()) { // standalone deployment, or embedded storage in use
out = response.getWriter();
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
// Copy the whole file to the response output stream through NIO channels.
fis.getChannel().transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
}
} finally {
// Always release the read lock and close the file stream, including on early returns.
releaseConfigReadLock(groupKey);
IoUtils.closeQuietly(fis);
}
} else if (lockResult == 0) {
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
} else { // the read lock could not be acquired
response.setStatus(HttpServletResponse.SC_CONFLICT);
response.getWriter().println("requested file is being modified, please try later.");
return HttpServletResponse.SC_CONFLICT + "";

}
return HttpServletResponse.SC_OK + "";
}
}

客户端数据监听

数据的监听是通过NacosConfigBootstrapConfiguration配置类中注册的NacosConfigManager中创建的NacosConfigService来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class NacosConfigManager {

    /**
     * Stores the properties and makes sure the shared {@code ConfigService} has been created.
     */
    public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
        this.nacosConfigProperties = nacosConfigProperties;
        createConfigService(nacosConfigProperties);
    }

    /**
     * Returns the shared {@code ConfigService}, creating it on first use.
     * The null check is repeated inside a block synchronized on this class, so the service
     * is only created while that lock is held.
     *
     * @throws NacosConnectionFailureException when the service cannot be created
     */
    static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {
        if (Objects.nonNull(service)) {
            return service;
        }
        synchronized (NacosConfigManager.class) {
            try {
                if (Objects.isNull(service)) {
                    // Per the article, this instantiates NacosConfigService reflectively via its constructor.
                    service = NacosFactory.createConfigService(nacosConfigProperties.assembleConfigServiceProperties());
                }
            } catch (NacosException e) {
                log.error(e.getMessage());
                throw new NacosConnectionFailureException(nacosConfigProperties.getServerAddr(), e.getMessage(), e);
            }
        }
        return service;
    }
}

NacosConfigService主要是通过构造方法创建ClientWorker客户端工作类,为需要刷新的配置创建用来监听配置更新的长轮询任务LongPollingRunnable。ClientWorker中的定时任务每隔10ms执行一次checkConfigInfo,按需创建新的长轮询任务;LongPollingRunnable正常执行完后会立即重新提交自身继续执行,若发生异常则延迟2s后再执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class NacosConfigService implements ConfigService {

    /**
     * Creates the config service; per the article this constructor is invoked reflectively by
     * {@code ConfigFactory.createConfigService}.
     * It validates the properties, resolves the encoding and namespace, starts the HTTP agent
     * and creates the {@code ClientWorker}.
     */
    public NacosConfigService(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);

        String configuredEncode = properties.getProperty(PropertyKeyConst.ENCODE);
        this.encode = StringUtils.isBlank(configuredEncode) ? Constants.ENCODE : configuredEncode.trim();

        initNamespace(properties);

        // Agent used to send requests to the Nacos server.
        this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        this.agent.start();
        // Client worker; it creates the LongPollingRunnable tasks that watch for config updates.
        this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
    }
}
public class ClientWorker implements Closeable {

    /**
     * Sets up the worker: a single-thread scheduler that runs {@link #checkConfigInfo()} with a
     * 10 ms fixed delay (after a 1 ms initial delay), and a pool sized to the number of
     * available processors that runs the {@code LongPollingRunnable} tasks.
     */
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        // Initialize the timeout parameter.
        init(properties);

        this.executor = Executors.newScheduledThreadPool(1, task -> {
            Thread workerThread = new Thread(task);
            workerThread.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            workerThread.setDaemon(true);
            return workerThread;
        });

        // Pool with one thread per processor; checkConfigInfo submits LongPollingRunnable tasks to it.
        int pollingThreads = Runtime.getRuntime().availableProcessors();
        this.executorService = Executors.newScheduledThreadPool(pollingThreads, task -> {
            Thread pollingThread = new Thread(task);
            pollingThread.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            pollingThread.setDaemon(true);
            return pollingThread;
        });

        this.executor.scheduleWithFixedDelay(() -> {
            try {
                // Runs every 10 ms and creates additional long-polling tasks when needed.
                checkConfigInfo();
            } catch (Throwable ignored) {
                // Failures are swallowed so that the periodic task keeps being scheduled.
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

    /**
     * Dispatches long-polling tasks. The entries of {@code cacheMap} (the configs to watch) are
     * grouped by {@code ParamUtil.getPerTaskConfigSize()} (3000 by default per the article), and
     * one {@code LongPollingRunnable} is started for each group that does not have a task yet.
     */
    public void checkConfigInfo() {
        int listenerSize = cacheMap.size();
        int requiredTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (requiredTaskCount <= currentLongingTaskCount) {
            return;
        }
        for (int taskId = (int) currentLongingTaskCount; taskId < requiredTaskCount; taskId++) {
            // Start one more long-polling task.
            executorService.execute(new LongPollingRunnable(taskId));
        }
        currentLongingTaskCount = requiredTaskCount;
    }
}

长轮询首先检查本地配置,若存在本地配置,且与缓存中的本地配置版本不一致,则把本地配置内容更新到缓存中,并触发事件;然后向服务端发起长轮询请求(30s超时),服务端会返回发生变化的dataId列表;之后客户端根据变化的dataId,从服务端拉取最新的配置内容,并更新本地快照和缓存,对有变化的配置触发监听器进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * Long-polling task for the cache entries whose task id equals this task's id.
 * After a successful round the task resubmits itself right away through
 * {@code executorService.execute}; after a failure it is rescheduled with a delay of
 * {@code taskPenaltyTime} milliseconds (2 s per the article).
 */
class LongPollingRunnable implements Runnable {

    private final int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        List<CacheData> ownedCaches = new ArrayList<CacheData>();
        List<String> initializingKeys = new ArrayList<String>();
        try {
            // Step 1: failover check — pick up changes from local config files.
            for (CacheData candidate : cacheMap.values()) {
                if (candidate.getTaskId() != taskId) {
                    continue;
                }
                ownedCaches.add(candidate);
                try {
                    checkLocalConfig(candidate);
                    if (candidate.isUseLocalConfigInfo()) {
                        // Notify listeners of configs whose content changed.
                        candidate.checkListenerMd5();
                    }
                } catch (Exception ignored) {
                    // A failing local check must not stop the remaining entries.
                }
            }

            // Step 2: long-poll the server (30 s timeout per the article) for changed group keys.
            List<String> changedGroupKeys = checkUpdateDataIds(ownedCaches, initializingKeys);

            // Step 3: pull the latest content of every changed config and update the cache.
            for (String changedKey : changedGroupKeys) {
                String[] parts = GroupKey.parseKey(changedKey);
                String dataId = parts[0];
                String group = parts[1];
                String tenant = (parts.length == 3) ? parts[2] : null;
                try {
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                } catch (NacosException ignored) {
                    // A failed fetch of one config is skipped.
                }
            }

            // Step 4: notify listeners, skipping entries that are still initializing and
            // were not part of this round's initializing list.
            for (CacheData owned : ownedCaches) {
                boolean stillInitializing = owned.isInitializing()
                        && !initializingKeys.contains(GroupKey.getKeyTenant(owned.dataId, owned.group, owned.tenant));
                if (stillInitializing) {
                    continue;
                }
                owned.checkListenerMd5();
                owned.setInitializing(false);
            }
            initializingKeys.clear();

            // Success: resubmit this task immediately.
            executorService.execute(this);
        } catch (Throwable e) {
            // Failure: retry after the penalty delay.
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}

最终通过调用Nacos服务端的/v1/cs/configs/listener接口来发起长轮询请求,服务端最终调用ConfigController#listener方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ClientWorker implements Closeable {

    /**
     * Builds the probe string for all cache entries that do not use local config and asks the
     * server which of them changed. Each entry is encoded as dataId, group, md5 and (when
     * present) tenant, joined by {@code WORD_SEPARATOR} and ended with {@code LINE_SEPARATOR}.
     * Keys of entries that are still initializing are added to {@code inInitializingCacheList}.
     *
     * @return the group keys reported as changed by the server
     */
    List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
        StringBuilder probe = new StringBuilder();
        for (CacheData entry : cacheDatas) {
            if (entry.isUseLocalConfigInfo()) {
                continue;
            }
            probe.append(entry.dataId).append(WORD_SEPARATOR);
            probe.append(entry.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(entry.tenant)) {
                // No namespace: md5 is the last field.
                probe.append(entry.getMd5()).append(LINE_SEPARATOR);
            } else {
                probe.append(entry.getMd5()).append(WORD_SEPARATOR);
                probe.append(entry.getTenant()).append(LINE_SEPARATOR);
            }
            if (entry.isInitializing()) {
                // The entry appears in cacheMap for the first time (isInitializing defaults to true).
                inInitializingCacheList.add(GroupKey.getKeyTenant(entry.dataId, entry.group, entry.tenant));
            }
        }
        boolean hasInitializingEntries = !inInitializingCacheList.isEmpty();
        return checkUpdateConfigStr(probe.toString(), hasInitializingEntries);
    }

    /**
     * Posts the probe string to the server's {@code /v1/cs/configs/listener} endpoint and
     * parses the changed keys out of the response. The server health flag is updated
     * according to the outcome.
     *
     * @return the changed group keys; an empty list when the probe is blank or the response is not ok
     * @throws Exception when the request itself fails
     */
    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
        if (StringUtils.isBlank(probeUpdateString)) {
            // Nothing to ask the server about.
            return Collections.emptyList();
        }

        Map<String, String> params = new HashMap<String, String>(2);
        params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); // Listening-Configs

        Map<String, String> headers = new HashMap<String, String>(2);
        // timeout defaults to 30000 ms (30 s) per the article.
        headers.put("Long-Pulling-Timeout", String.valueOf(timeout));
        if (isInitializingCacheList) {
            headers.put("Long-Pulling-Timeout-No-Hangup", "true");
        }

        try {
            // Read timeout is 1.5x the long-polling timeout (45 s with the default).
            long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
            HttpRestResult<String> result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs);
            if (!result.ok()) {
                setHealthServer(false);
                return Collections.emptyList();
            }
            setHealthServer(true);
            // Parse the data returned by the server.
            return parseUpdateDataIdResponse(result.getData());
        } catch (Exception e) {
            setHealthServer(false);
            throw e;
        }
    }
}

服务端接收到请求后,首先遍历从缓存中获取数据,比较缓存内容的MD5值是否相等,若存在MD5不一致则生成响应信息直接返回给客户端,否则最终在ClientLongPolling异步任务中保持长连接。服务端最多处理时长为29.5s,需留0.5s来返回,以免客户端超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
 * HTTP POST handler for config change listening.
 * Reads and URL-decodes the {@code Listening-Configs} parameter, parses it into the
 * client's MD5 map and delegates to {@code inner.doPollingConfig}.
 *
 * @throws IllegalArgumentException when the parameter is blank or cannot be parsed; in the
 *         latter case the parse failure is attached as the cause
 */
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        // Chain the original failure so the reason the probe string was rejected is not lost.
        throw new IllegalArgumentException("invalid probeModify", e);
    }
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
public class ConfigServletInner {

    /**
     * Handles a config polling request.
     * Requests that support long polling are handed to {@code longPollingService}. Otherwise
     * the client's MD5 values are compared right away and the changed group keys are returned
     * either as response headers (older clients) or as the {@code content} request attribute.
     *
     * @return the HTTP status code as a string
     */
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
        final String okStatus = HttpServletResponse.SC_OK + "";

        if (LongPollingService.isSupportLongPolling(request)) {
            // Long polling is supported.
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return okStatus;
        }

        // Compare the cached MD5 values with the client's and collect the group keys that
        // differ; those changed configs are reported back to the requester.
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
        String oldResult = MD5Util.compareMd5OldResult(changedGroups); // Compatible with short polling result.
        String newResult = MD5Util.compareMd5ResultString(changedGroups);

        String clientVersion = request.getHeader(Constants.CLIENT_VERSION_HEADER);
        if (clientVersion == null) {
            clientVersion = "2.0.0";
        }
        int versionNum = Protocol.getVersionNumber(clientVersion);
        if (versionNum >= START_LONG_POLLING_VERSION_NUM) {
            request.setAttribute("content", newResult);
        } else {
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
        }

        Loggers.AUTH.info("new content:" + newResult);

        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return okStatus;
    }
}
/**
 * Server-side service that parks long-polling clients until a config changes or the
 * polling timeout elapses.
 */
public class LongPollingService {

    /**
     * Registers a long-polling client.
     *
     * <p>Outside fixed-polling mode the client's MD5 map is checked first: if anything has
     * changed the response is generated immediately, and if the no-hang-up header is set the
     * method returns without parking the request. Otherwise the request is switched to async
     * mode and a {@code ClientLongPolling} task is submitted.
     *
     * @param req              the polling request; must carry the long-polling timeout header
     * @param rsp              the response to write to
     * @param clientMd5Map     the MD5 values reported by the client
     * @param probeRequestSize length of the decoded probe string
     * @throws NumberFormatException if the long-polling timeout header is missing or not a number
     */
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) {
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");

        // The server answers delayTime ms (500 unless the switch overrides it) before the
        // client's own timeout, so the reply arrives before the client gives up. Per the
        // article, a client timeout of 30000 ms yields 30000 - 500 = 29500 ms here.
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);

        if (isFixedPolling()) { // false by default, per the article
            timeout = Math.max(10000, getFixedPollingInterval());
        } else {
            // Compare cached MD5 values with the client's and collect the changed groups.
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                // Something already differs: answer right away instead of parking the request.
                generateResponse(req, rsp, changedGroups);
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                return;
            }
        }

        String ip = RequestUtil.getRemoteIp(req);
        final AsyncContext asyncContext = req.startAsync(); // Must be called by http thread, or send response.
        asyncContext.setTimeout(0L); // AsyncContext.setTimeout() is incorrect, Control by oneself
        ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
}

首先将任务添加到延时线程池中,延时时间为29.5s,若存在变化的数据则直接返回变化的数据,客户端再发起下一次长连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Task representing one parked long-polling client.
 */
class ClientLongPolling implements Runnable {

    /**
     * Schedules the timeout task for this client and then adds the client to {@code allSubs}.
     */
    @Override
    public void run() {
        // 1. Schedule the timeout task; it fires after timeoutTime ms (29.5s per the article).
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() {
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    // 3. Remove this client from the subscriber queue.
                    allSubs.remove(ClientLongPolling.this);

                    // Only fixed-polling mode re-checks the MD5 values at timeout; every other
                    // case answers with null (no change).
                    List<String> payload = null;
                    if (isFixedPolling()) {
                        List<String> changedGroups = MD5Util.compareMd5(
                                (HttpServletRequest) asyncContext.getRequest(),
                                (HttpServletResponse) asyncContext.getResponse(),
                                clientMd5Map);
                        if (changedGroups.size() > 0) {
                            payload = changedGroups;
                        }
                    }
                    sendResponse(payload);
                } catch (Throwable t) {
                    // NOTE(review): failures are silently swallowed in this excerpt.
                }
            }
        }, timeoutTime, TimeUnit.MILLISECONDS);

        // 2. Queue this client; whether or not a config changes, it is eventually answered.
        allSubs.add(this);
    }
}

客户端数据刷新

SpringApplication的run方法中调用SpringApplicationRunListeners的running方法,从而调用EventPublishingRunListener的running方法发布ApplicationReadyEvent事件,在NacosConfigAutoConfiguration配置类中注入的NacosContextRefresher实现了ApplicationListener接口且监听了ApplicationReadyEvent事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Run listener that publishes application lifecycle events to the context.
 */
public class EventPublishingRunListener implements SpringApplicationRunListener, Ordered {
/**
 * Publishes an {@code ApplicationReadyEvent} to the given context, then publishes an
 * availability change with {@code ReadinessState.ACCEPTING_TRAFFIC}.
 *
 * @param context the application context the events are published to
 */
public void running(ConfigurableApplicationContext context) {
context.publishEvent(new ApplicationReadyEvent(this.application, this.args, context));
AvailabilityChangeEvent.publish(context, ReadinessState.ACCEPTING_TRAFFIC);
}
}
/**
 * Configuration class that registers the {@code NacosContextRefresher} bean.
 * It is conditional on {@code spring.cloud.nacos.config.enabled}, with
 * {@code matchIfMissing = true}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigAutoConfiguration {
/**
 * Creates the {@code NacosContextRefresher} from the config manager and the refresh history.
 */
@Bean
public NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {
return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
}
}

在接收到ApplicationReadyEvent事件后,遍历所有的配置,为每个dataId注册一个AbstractSharedListener监听器,通过NacosConfigService的addListener方法最终将监听器添加到CacheData中,这里监听器的作用是在配置发生变更时会调用该监听器进行配置的刷新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * Listens for {@code ApplicationReadyEvent} and registers a Nacos listener for every
 * refreshable property source.
 */
public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {

    /**
     * Captures the config properties, the config service and the refresh history.
     *
     * @param nacosConfigManager supplies the Nacos config properties and the config service
     * @param refreshHistory     record of performed refreshes
     */
    public NacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory refreshHistory) {
        this.nacosConfigProperties = nacosConfigManager.getNacosConfigProperties();
        this.nacosRefreshHistory = refreshHistory;
        this.configService = nacosConfigManager.getConfigService();
        // Master switch for config refresh; on by default per the article.
        this.isRefreshEnabled = this.nacosConfigProperties.isRefreshEnabled();
    }

    /**
     * Registers the Nacos listeners the first time the ready event arrives.
     */
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // With many Spring contexts the event can arrive repeatedly; only the first one wins.
        if (!this.ready.compareAndSet(false, true)) {
            return;
        }
        this.registerNacosListenersForApplications();
    }

    /**
     * Walks all known Nacos property sources and registers a listener for each refreshable one.
     */
    private void registerNacosListenersForApplications() {
        if (!isRefreshEnabled()) { // true by default per the article
            return;
        }
        for (NacosPropertySource source : NacosPropertySourceRepository.getAll()) {
            // Sources that opt out of refresh are skipped.
            if (source.isRefreshable()) {
                String dataId = source.getDataId();
                registerNacosListener(source.getGroup(), dataId);
            }
        }
    }

    /**
     * Creates the listener for the given group and dataId, or reuses the one already in
     * {@code listenerMap}, and adds it to the config service.
     */
    private void registerNacosListener(final String groupKey, final String dataKey) {
        String mapKey = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        Listener sharedListener = listenerMap.computeIfAbsent(mapKey, unused -> new AbstractSharedListener() {
            @Override
            public void innerReceive(String dataId, String group, String configInfo) {
                // Bump the refresh counter and record the refresh.
                refreshCountIncrement();
                nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                // Publish a RefreshEvent; the article names RefreshEventListener as its listener.
                applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));
            }
        });
        try {
            configService.addListener(dataKey, groupKey, sharedListener);
        } catch (NacosException e) {
            // NOTE(review): registration failures are silently ignored in this excerpt.
        }
    }
}
/**
 * {@code ConfigService} implementation that delegates listener registration to its
 * {@code ClientWorker}.
 */
public class NacosConfigService implements ConfigService {
private final ClientWorker worker;
/**
 * Adds a single listener for the given dataId and group by delegating to
 * {@code worker.addTenantListeners}.
 *
 * @throws NacosException propagated from the worker
 */
public void addListener(String dataId, String group, Listener listener) throws NacosException {
// Called with the listener that NacosContextRefresher creates for each dataId when it handles ApplicationReadyEvent in onApplicationEvent.
worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
}
/**
 * Client-side worker that owns the {@code CacheData} entries and their listeners.
 */
public class ClientWorker implements Closeable {

    /**
     * Attaches the given listeners to the cache entry for {@code dataId}, {@code group} and the
     * agent's tenant, creating the entry if it is absent. Called from
     * {@code NacosConfigService.addListener}.
     *
     * @param dataId    config data id
     * @param group     config group; passed through {@code null2defaultGroup} first
     * @param listeners listeners to attach; per the article these are the
     *                  {@code AbstractSharedListener}s registered by {@code NacosContextRefresher}
     * @throws NacosException declared by the signature
     */
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        String resolvedGroup = null2defaultGroup(group);
        String tenant = agent.getTenant();
        CacheData cacheEntry = addCacheDataIfAbsent(dataId, resolvedGroup, tenant);
        listeners.forEach(cacheEntry::addListener);
    }
}
/**
 * Client-side cache entry for one config, holding the listeners attached to it.
 */
public class CacheData {

    /**
     * Wraps the listener and adds it to {@code listeners} if an equal wrapper is not already present.
     *
     * @param listener the listener to attach; per the article this is the
     *                 {@code AbstractSharedListener} registered by {@code NacosContextRefresher}
     * @throws IllegalArgumentException if {@code listener} is null
     */
    public void addListener(Listener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener is null");
        }

        // Config-change listeners additionally receive the current content in their wrapper.
        ManagerListenerWrap wrapper;
        if (listener instanceof AbstractConfigChangeListener) {
            wrapper = new ManagerListenerWrap(listener, md5, content);
        } else {
            wrapper = new ManagerListenerWrap(listener, md5);
        }

        listeners.addIfAbsent(wrapper);
    }
}

发布的RefreshEvent事件会被监听器RefreshEventListener监听到,把原来的Environment中的参数放到一个临时的Spring容器中重新加载,加载完毕后关闭临时容器,这样就将最新的参数加载到了容器中;然后将新值与之前得到的值进行对比,找出变化的参数,并将变化的数据通过EnvironmentChangeEvent事件通知监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Application listener that reacts to {@code ApplicationReadyEvent} and {@code RefreshEvent}.
 */
public class RefreshEventListener implements SmartApplicationListener {

    /**
     * Dispatches the event to the matching {@code handle} overload; other event types are ignored.
     */
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ApplicationReadyEvent) {
            handle((ApplicationReadyEvent) event);
        } else if (event instanceof RefreshEvent) {
            handle((RefreshEvent) event);
        }
    }

    /**
     * Triggers a context refresh once the application is ready.
     *
     * @param event the refresh request
     */
    public void handle(RefreshEvent event) {
        if (this.ready.get()) { // don't handle events before app is ready
            // Per the article this refreshes the beans marked with @RefreshScope. The returned
            // set of keys is not used in this excerpt, so it is not kept.
            this.refresh.refresh();
        }
    }
}
/**
 * Reloads the environment and refreshes the refresh scope.
 */
public class ContextRefresher {

    /**
     * Refreshes the environment, then calls {@code scope.refreshAll()}.
     *
     * @return the keys returned by {@link #refreshEnvironment()}
     */
    public synchronized Set<String> refresh() {
        Set<String> changedKeys = refreshEnvironment();
        // Per the article this destroys every bean annotated with @RefreshScope.
        this.scope.refreshAll();
        return changedKeys;
    }

    /**
     * Reloads the environment and publishes an {@code EnvironmentChangeEvent} carrying the keys
     * that differ between the old and new property values.
     *
     * @return the changed keys
     */
    public synchronized Set<String> refreshEnvironment() {
        // Snapshot the current properties (per the article: everything except system, jndi and servlet sources).
        Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());

        // Per the article: the existing environment parameters are reloaded in a temporary
        // Spring container, which is closed afterwards, leaving the latest values loaded.
        addConfigFilesToEnvironment();

        // Snapshot again and diff against the earlier values to find what changed.
        Map<String, Object> after = extract(this.context.getEnvironment().getPropertySources());
        Set<String> changedKeys = changes(before, after).keySet();

        // Publish the change event together with the changed keys.
        this.context.publishEvent(new EnvironmentChangeEvent(this.context, changedKeys));
        return changedKeys;
    }
}

最终EnvironmentChangeEvent事件会被ConfigurationPropertiesRebinder监听,最终会将所有带有@ConfigurationProperties注解的配置类的Bean销毁,然后重新创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Rebinds the tracked configuration-properties beans when the environment changes.
 */
public class ConfigurationPropertiesRebinder implements ApplicationContextAware, ApplicationListener<EnvironmentChangeEvent> {

    /**
     * Rebinds all tracked beans when the event's source equals this application context or
     * equals the event's own key set.
     */
    public void onApplicationEvent(EnvironmentChangeEvent event) {
        boolean fromThisContext = this.applicationContext.equals(event.getSource());
        if (fromThisContext || event.getKeys().equals(event.getSource())) {
            rebind();
        }
    }

    /**
     * Clears the recorded errors and rebinds every tracked bean.
     */
    public void rebind() {
        this.errors.clear();
        for (String beanName : this.beans.getBeanNames()) {
            rebind(beanName);
        }
    }

    /**
     * Rebinds one bean by destroying and re-initializing it through the bean factory.
     *
     * @param name the bean name
     * @return {@code true} if the bean was re-initialized; {@code false} if the name is not
     *         tracked, no application context is set, the bean is null, or its class is listed
     *         as never refreshable
     * @throws RuntimeException      rethrown as-is after being recorded in {@code errors}
     * @throws IllegalStateException wrapping any other exception, also recorded in {@code errors}
     */
    public boolean rebind(String name) {
        if (!this.beans.getBeanNames().contains(name)) {
            return false;
        }
        if (this.applicationContext == null) {
            return false;
        }
        try {
            Object target = this.applicationContext.getBean(name);
            // Work on the proxied target rather than the AOP proxy itself.
            if (AopUtils.isAopProxy(target)) {
                target = ProxyUtils.getTargetObject(target);
            }
            if (target == null) {
                return false;
            }
            if (getNeverRefreshable().contains(target.getClass().getName())) {
                return false; // ignore
            }
            // Order matters: destroy the bean first, then initialize the same instance again.
            this.applicationContext.getAutowireCapableBeanFactory().destroyBean(target);
            this.applicationContext.getAutowireCapableBeanFactory().initializeBean(target, name);
            return true;
        } catch (RuntimeException e) {
            this.errors.put(name, e);
            throw e;
        } catch (Exception e) {
            this.errors.put(name, e);
            throw new IllegalStateException("Cannot rebind to " + name, e);
        }
    }
}
/**
 * Bean post-processor that collects configuration-properties beans into {@code beans}.
 */
public class ConfigurationPropertiesBeans implements BeanPostProcessor, ApplicationContextAware {

    /**
     * Records the bean under its name when {@code ConfigurationPropertiesBean.get} yields a
     * result; refresh-scoped beans are skipped.
     *
     * @return the bean, always unchanged
     */
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (!isRefreshScoped(beanName)) {
            ConfigurationPropertiesBean candidate = ConfigurationPropertiesBean.get(this.applicationContext, bean, beanName);
            if (candidate != null) {
                this.beans.put(beanName, candidate);
            }
        }
        return bean;
    }
}

对于@RefreshScope注解标注的Bean,在RefreshAutoConfiguration配置类中注册了RefreshScopeBeanDefinitionEnhancer后置处理器,在其postProcessBeanDefinitionRegistry方法中将Bean的scope作用域设置为了refresh,在通过getBean创建Bean时,会通过判断Bean的作用域走相应的逻辑。最终会调用GenericScope的get方法,将Bean放入BeanLifecycleWrapperCache中,在调用RefreshScope的refreshAll方法时会被销毁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Scope implementation that keeps its beans in a {@code BeanLifecycleWrapperCache} and can
 * destroy all of them at once.
 */
public class GenericScope implements Scope, BeanFactoryPostProcessor, BeanDefinitionRegistryPostProcessor, DisposableBean {

    private BeanLifecycleWrapperCache cache = new BeanLifecycleWrapperCache(new StandardScopeCache());

    /**
     * Puts a wrapper for {@code name} into the cache, makes sure a lock exists for the name,
     * and returns the bean of the wrapper that {@code cache.put} hands back.
     *
     * @param name          the bean name
     * @param objectFactory factory given to the new wrapper
     * @return the bean obtained from the wrapper
     * @throws RuntimeException rethrown from {@code getBean()} after being recorded in {@code errors}
     */
    public Object get(String name, ObjectFactory<?> objectFactory) {
        BeanLifecycleWrapper wrapper = this.cache.put(name, new BeanLifecycleWrapper(name, objectFactory));
        this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
        try {
            return wrapper.getBean();
        } catch (RuntimeException e) {
            this.errors.put(name, e);
            throw e;
        }
    }

    /**
     * Clears the cache and destroys every wrapper it held, each under the write lock for its name.
     *
     * <p>Runtime failures are collected so the remaining wrappers are still processed; if any
     * occurred, the first one is thrown (via {@code wrapIfNecessary}) after the loop.
     */
    public void destroy() {
        List<Throwable> failures = new ArrayList<>();
        for (BeanLifecycleWrapper wrapper : this.cache.clear()) {
            try {
                Lock writeLock = this.locks.get(wrapper.getName()).writeLock();
                writeLock.lock();
                try {
                    wrapper.destroy();
                } finally {
                    writeLock.unlock();
                }
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        if (!failures.isEmpty()) {
            throw wrapIfNecessary(failures.get(0));
        }
        this.errors.clear();
    }
}
/**
 * Refresh scope built on {@code GenericScope}.
 */
public class RefreshScope extends GenericScope implements ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, Ordered {
/**
 * Destroys the scope's beans through the superclass and then publishes a
 * {@code RefreshScopeRefreshedEvent} to the context.
 */
public void refreshAll() {
super.destroy(); // Call destroy() of the superclass GenericScope
this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
}