Sentinel Configuration Persistence

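Sentinel rules can be delivered in three modes: the original mode, Pull mode, and Push mode. In the original mode, rules are pushed to the client through the API and written straight into memory. It is simple and has no dependencies, but because the rules live only in memory they are lost on restart.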

Pull Mode

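Pull mode is implemented by extending the writable data source, WritableDataSource. The client polls a rule store at a fixed interval to pull the rules; the store can be a file or a database. Strong real-time delivery is not guaranteed, and pulling too frequently may cause performance problems.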

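The first time an interface is called, the static initializer of Env runs. It uses the SPI mechanism to load every InitFunc implementation declared under META-INF/services/ in the Sentinel-related dependencies.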

public class Env {
public static final Sph sph = new CtSph();
static { // If init fails, the process will exit.
InitExecutor.doInit(); // Load via SPI every InitFunc implementation declared under META-INF/services/ in the Sentinel dependencies
}
}
public final class InitExecutor {
public static void doInit() { // Sentinel extension point: loads the classes that implement InitFunc via SPI
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info(String.format("[InitExecutor] Executing %s with order %d", w.func.getClass().getCanonicalName(), w.order));
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
private static void insertSorted(List<OrderWrapper> list, InitFunc func) {
int order = resolveOrder(func); // Resolve the order; a class without @InitOrder gets the lowest precedence, Integer.MAX_VALUE
int idx = 0;
for (; idx < list.size(); idx++) {
if (list.get(idx).getOrder() > order) {
break;
}
}
list.add(idx, new OrderWrapper(order, func)); // Keep the list ascending by order, so smaller values (higher precedence) come first
}
private static int resolveOrder(InitFunc func) {
if (!func.getClass().isAnnotationPresent(InitOrder.class)) {
return InitOrder.LOWEST_PRECEDENCE;
} else {
return func.getClass().getAnnotation(InitOrder.class).value();
}
}
}
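
The ordering rule in insertSorted is easy to misread, so here is a minimal standalone sketch of it. It is not Sentinel code: Entry is a hypothetical stand-in for OrderWrapper, and the names and order values are made up for illustration. An entry is inserted before the first element whose order is strictly greater, which keeps the list ascending and preserves registration order among equal values.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone illustration of the insertSorted logic; not Sentinel code. */
class InsertSortedDemo {

    /** Hypothetical stand-in for OrderWrapper: a label plus its order value. */
    static final class Entry {
        final String name;
        final int order;

        Entry(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Inserts before the first entry whose order is strictly greater. */
    static void insertSorted(List<Entry> list, String name, int order) {
        int idx = 0;
        while (idx < list.size() && list.get(idx).order <= order) {
            idx++;
        }
        list.add(idx, new Entry(name, order));
    }

    /** Registers four entries in arbitrary order and returns the resulting names. */
    static List<String> demo() {
        List<Entry> list = new ArrayList<>();
        insertSorted(list, "noAnnotation", Integer.MAX_VALUE); // lowest precedence
        insertSorted(list, "second", 10);
        insertSorted(list, "first", -1);
        insertSorted(list, "alsoSecond", 10); // same order: stays behind "second"
        List<String> names = new ArrayList<>();
        for (Entry e : list) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, second, alsoSecond, noAnnotation]
    }
}
```

Because init() is then called in list order, an InitFunc with a smaller order value runs earlier.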

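Through the same SPI mechanism you can define a FileDataSourceInit class that implements InitFunc and persists the rule configuration to a file. A database can be used instead; the principle is the same.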

public class FileDataSourceInit implements InitFunc {
@Override
public void init() throws Exception {
RuleFileUtils.mkdirIfNotExits(PersistenceRuleConstant.STORE_PATH); // Create the storage directory
RuleFileUtils.createFileIfNotExits(PersistenceRuleConstant.RULES_MAP); // Create the rule files
dealFlowRules(); // Flow rules: set up the readable and writable data sources
dealDegradeRules(); // Degrade rules
dealSystemRules(); // System rules
dealParamFlowRules(); // Hot-parameter flow rules
dealAuthRules(); // Authority rules
}
}

FileRefreshableDataSource is first created from the file path. Its construction begins with the superclass AbstractDataSource, which stores the Converter and initializes the SentinelProperty as a DynamicSentinelProperty.

private void dealFlowRules() throws FileNotFoundException {
String ruleFilePath = PersistenceRuleConstant.RULES_MAP.get(PersistenceRuleConstant.FLOW_RULE_PATH).toString();
ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new FileRefreshableDataSource<>(ruleFilePath, RuleListConverterUtils.flowRuleListParser);
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());

WritableDataSource<List<FlowRule>> flowRuleWDS = new FileWritableDataSource<List<FlowRule>>(
ruleFilePath, RuleListConverterUtils.flowFuleEnCoding
);
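// Register the writable data source with WritableDataSourceRegistry in the transport module.
// When rules pushed by the console arrive, Sentinel first updates memory and then writes the rules to the file.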
WritableDataSourceRegistry.registerFlowDataSource(flowRuleWDS);
}
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> {
protected final Converter<S, T> parser;
protected final SentinelProperty<T> property;
public AbstractDataSource(Converter<S, T> parser) {
if (parser == null) {
throw new IllegalArgumentException("parser can't be null");
}
this.parser = parser;
this.property = new DynamicSentinelProperty<T>();
}
public T loadConfig() throws Exception {
return loadConfig(readSource());
}
public T loadConfig(S conf) throws Exception {
T value = parser.convert(conf);
return value;
}
}

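Next the AutoRefreshDataSource superclass is initialized. It schedules a task that periodically reloads the rule data from the file; the in-memory rules are updated through the listeners registered on DynamicSentinelProperty.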

public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, T> {
private ScheduledExecutorService service;
protected long recommendRefreshMs = 3000;
public AutoRefreshDataSource(Converter<S, T> configParser, final long recommendRefreshMs) {
super(configParser);
if (recommendRefreshMs <= 0) {
throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get");
}
this.recommendRefreshMs = recommendRefreshMs;
startTimerService(); // Start the auto-refresh task
}
private void startTimerService() { // Auto-refresh task: a thread checks the local file's last-modified time, every 3 s by default
service = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true));
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (!isModified()) { // Nothing changed: skip this round
return;
}
T newValue = loadConfig(); // The file changed: reload it
getProperty().updateValue(newValue); // Update the value held in memory
} catch (Throwable e) {
}
}
}, recommendRefreshMs, recommendRefreshMs, TimeUnit.MILLISECONDS); // Runs every 3 s by default
}
}
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
protected Set<PropertyListener<T>> listeners = Collections.synchronizedSet(new HashSet<PropertyListener<T>>());
private T value = null;
public void addListener(PropertyListener<T> listener) {
listeners.add(listener);
listener.configLoad(value);
}
public boolean updateValue(T newValue) {
if (isEqual(value, newValue)) {
return false;
}
value = newValue;
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue); // Notify every registered listener through configUpdate so the new rules replace the old ones
}
return true;
}
}
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
}
@Override
public void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
}
}
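
The listener flow above can be sketched in a few lines without any Sentinel dependency. The following is a simplified stand-in, not the library's implementation: Property and Listener are hypothetical names that mirror DynamicSentinelProperty and PropertyListener, and rules are plain strings. It shows the two behaviours that matter here: attaching a listener immediately triggers configLoad with the current value, and updateValue only notifies listeners when the value actually changed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Simplified stand-in for the property/listener pair; not Sentinel code. */
class PropertyDemo {

    interface Listener<T> {
        void configLoad(T value);   // called once, when the listener is attached
        void configUpdate(T value); // called on every real change
    }

    static final class Property<T> {
        private final Set<Listener<T>> listeners = new LinkedHashSet<>();
        private T value;

        synchronized void addListener(Listener<T> listener) {
            listeners.add(listener);
            listener.configLoad(value); // hand over the current value right away
        }

        synchronized boolean updateValue(T newValue) {
            if (Objects.equals(value, newValue)) {
                return false; // unchanged: listeners are not notified
            }
            value = newValue;
            for (Listener<T> listener : listeners) {
                listener.configUpdate(newValue);
            }
            return true;
        }
    }

    /** Records the callbacks triggered by one attach and three updates. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Property<List<String>> property = new Property<>();
        property.addListener(new Listener<List<String>>() {
            @Override
            public void configLoad(List<String> v) {
                events.add("load:" + v);
            }

            @Override
            public void configUpdate(List<String> v) {
                events.add("update:" + v);
            }
        });
        property.updateValue(Arrays.asList("ruleA"));
        property.updateValue(Arrays.asList("ruleA")); // equal content, ignored
        property.updateValue(Arrays.asList("ruleA", "ruleB"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [load:null, update:[ruleA], update:[ruleA, ruleB]]
    }
}
```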

When FileRefreshableDataSource is initialized, it loads the rule data into memory.

public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> {
public FileRefreshableDataSource(String fileName, Converter<String, T> configParser) throws FileNotFoundException {
this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
}
public FileRefreshableDataSource(File file, Converter<String, T> configParser, long recommendRefreshMs, int bufSize, Charset charset) throws FileNotFoundException {
super(configParser, recommendRefreshMs);
if (bufSize <= 0 || bufSize > MAX_SIZE) {
throw new IllegalArgumentException("bufSize must between (0, " + MAX_SIZE + "], but " + bufSize + " get");
}
if (file == null || file.isDirectory()) {
throw new IllegalArgumentException("File can't be null or a directory");
}
if (charset == null) {
throw new IllegalArgumentException("charset can't be null");
}
this.buf = new byte[bufSize];
this.file = file;
this.charset = charset;
// If the file does not exist, the last modified will be 0.
this.lastModified = file.lastModified();
firstLoad();
}
private void firstLoad() {
try {
T newValue = loadConfig();
getProperty().updateValue(newValue);
} catch (Throwable e) {
RecordLog.info("loadConfig exception", e);
}
}
public String readSource() throws Exception { // Called by loadConfig() in AbstractDataSource to read the data
if (!file.exists()) {// Will throw FileNotFoundException later.
RecordLog.warn(String.format("[FileRefreshableDataSource] File does not exist: %s", file.getAbsolutePath()));
}
FileInputStream inputStream = null;
try {
inputStream = new FileInputStream(file);
FileChannel channel = inputStream.getChannel();
if (channel.size() > buf.length) {
throw new IllegalStateException(file.getAbsolutePath() + " file size=" + channel.size() + ", is bigger than bufSize=" + buf.length + ". Can't read");
}
int len = inputStream.read(buf);
return new String(buf, 0, len, charset);
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception ignore) {
}
}
}
}
protected boolean isModified() { // Called by the scheduled refresh task to check whether the file has changed
long curLastModified = file.lastModified();
if (curLastModified != this.lastModified) {
this.lastModified = curLastModified;
return true;
}
return false;
}
}
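
The change detection above relies only on the file's last-modified timestamp. As a rough standalone sketch of that idea (FileChangeDetector is a hypothetical class, not part of Sentinel, and it reads the whole file with Files.readAllBytes instead of a fixed buffer), the demo below forces a timestamp change with setLastModified so the result does not depend on file system timing:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Hypothetical helper that mirrors the isModified() check; not Sentinel code. */
class FileChangeDetector {
    private final File file;
    private long lastModified;

    FileChangeDetector(File file) {
        this.file = file;
        this.lastModified = file.lastModified(); // 0 if the file does not exist
    }

    /** Returns true once per observed timestamp change. */
    boolean isModified() {
        long current = file.lastModified();
        if (current != lastModified) {
            lastModified = current;
            return true;
        }
        return false;
    }

    /** Reads the whole file as UTF-8 text. */
    String read() throws IOException {
        return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
    }

    /** Returns {changed before edit, changed after edit, changed on second check}. */
    static boolean[] demo() {
        try {
            File f = File.createTempFile("flow-rules", ".json");
            f.deleteOnExit();
            Files.write(f.toPath(), "[]".getBytes(StandardCharsets.UTF_8));
            FileChangeDetector detector = new FileChangeDetector(f);
            boolean before = detector.isModified();

            // Simulate a later edit: rewrite the file and move its timestamp forward.
            Files.write(f.toPath(), "[{\"resource\":\"demo\"}]".getBytes(StandardCharsets.UTF_8));
            f.setLastModified((f.lastModified() / 1000 + 10) * 1000);

            boolean after = detector.isModified();
            boolean again = detector.isModified();
            return new boolean[] {before, after, again};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

One consequence of this design is that an edit which leaves the timestamp unchanged (for example two writes within the file system's timestamp resolution) would go unnoticed until the next real change.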

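A listener is then registered on the SentinelProperty through FlowRuleManager so that the rule data is refreshed. Flow rules are used as the example here; the other rule types use the corresponding method of their own rule manager.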

public class FlowRuleManager {
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-metrics-record-task", true));
static {
currentProperty.addListener(LISTENER);
SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); // Periodically records the monitoring metrics
}
public static void register2Property(SentinelProperty<List<FlowRule>> property) {
AssertUtil.notNull(property, "property cannot be null");
synchronized (LISTENER) {
currentProperty.removeListener(LISTENER);
property.addListener(LISTENER);
currentProperty = property;
}
}
}
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
@Override
public void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules);
}
}

Writing rules is comparatively simple: the write method of WritableDataSource is eventually called to persist the data.

public class FileWritableDataSource<T> implements WritableDataSource<T> {
public FileWritableDataSource(String filePath, Converter<T, String> configEncoder) {
this(new File(filePath), configEncoder);
}
public FileWritableDataSource(File file, Converter<T, String> configEncoder) {
this(file, configEncoder, DEFAULT_CHARSET);
}
public FileWritableDataSource(File file, Converter<T, String> configEncoder, Charset charset) {
if (file == null || file.isDirectory()) {
throw new IllegalArgumentException("Bad file");
}
if (configEncoder == null) {
throw new IllegalArgumentException("Config encoder cannot be null");
}
if (charset == null) {
throw new IllegalArgumentException("Charset cannot be null");
}
this.configEncoder = configEncoder;
this.file = file;
this.charset = charset;
}
public void write(T value) throws Exception {
lock.lock();
try {
String convertResult = configEncoder.convert(value);
FileOutputStream outputStream = null;
try {
outputStream = new FileOutputStream(file);
byte[] bytesArray = convertResult.getBytes(charset);
RecordLog.info(String.format("[FileWritableDataSource] Writing to file %s: %s", file.toString(), convertResult));
outputStream.write(bytesArray);
outputStream.flush();
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (Exception ignore) {
}
}
}
} finally {
lock.unlock();
}
}
}

Finally, each WritableDataSource is registered with WritableDataSourceRegistry. When a CommandHandler updates rules, it calls the matching getter to obtain the WritableDataSource and performs the write.

public final class WritableDataSourceRegistry {
private static WritableDataSource<List<FlowRule>> flowDataSource = null;
private static WritableDataSource<List<AuthorityRule>> authorityDataSource = null;
private static WritableDataSource<List<DegradeRule>> degradeDataSource = null;
private static WritableDataSource<List<SystemRule>> systemSource = null;
public static synchronized void registerFlowDataSource(WritableDataSource<List<FlowRule>> datasource) {
flowDataSource = datasource;
}
public static synchronized void registerAuthorityDataSource(WritableDataSource<List<AuthorityRule>> dataSource) {
authorityDataSource = dataSource;
}
public static synchronized void registerDegradeDataSource(WritableDataSource<List<DegradeRule>> dataSource) {
degradeDataSource = dataSource;
}
public static synchronized void registerSystemDataSource(WritableDataSource<List<SystemRule>> dataSource) {
systemSource = dataSource;
}
}
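
To show how the registry is used on the write path, here is a minimal sketch of the "memory first, then persist" sequence. It is a simplified model rather than the actual CommandHandler: Writable is a hypothetical stand-in for WritableDataSource, rules are plain strings, and the choice to treat a missing data source as success is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of the write path; not Sentinel code. */
class WriteThroughDemo {

    /** Hypothetical stand-in for WritableDataSource. */
    interface Writable<T> {
        void write(T value) throws Exception;
    }

    private Writable<List<String>> flowDataSource;          // registry slot, null until registered
    private List<String> inMemoryRules = new ArrayList<>(); // what the rule manager would hold

    synchronized void registerFlowDataSource(Writable<List<String>> dataSource) {
        this.flowDataSource = dataSource;
    }

    /** Updates memory first, then persists. Returns false only if the write failed. */
    synchronized boolean handleRulePush(List<String> rules) {
        inMemoryRules = new ArrayList<>(rules);
        if (flowDataSource == null) {
            return true; // assumption: nothing registered means memory-only mode
        }
        try {
            flowDataSource.write(rules);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    synchronized List<String> rulesInMemory() {
        return new ArrayList<>(inMemoryRules);
    }

    /** Pushes rules before and after registering a fake "file" and returns its content. */
    static List<String> demo() {
        WriteThroughDemo handler = new WriteThroughDemo();
        List<String> fakeFile = new ArrayList<>();
        handler.handleRulePush(Arrays.asList("ruleA")); // not persisted: no data source yet
        handler.registerFlowDataSource(rules -> {
            fakeFile.clear();
            fakeFile.addAll(rules);
        });
        handler.handleRulePush(Arrays.asList("ruleA", "ruleB"));
        return fakeFile;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [ruleA, ruleB]
    }
}
```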

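The Sentinel console pushes rules to the client through the API and they are updated in memory; the registered writable data source then saves the new rules to a local file. A Pull-mode data source generally requires no changes to the Sentinel console. To use it, first add the sentinel-datasource-extension dependency, then add the custom persistence module.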

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-extension</artifactId>
    <version>1.8.0</version>
</dependency>
<!-- Custom Pull-mode module: the file-based persistence implemented above -->
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-extension-file-pull</artifactId>
    <version>1.8.0</version>
</dependency>

Push Mode

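In Push mode the Sentinel console pushes rules directly to the Nacos configuration center, which then propagates them to the local client. Nacos-based push requires the additional sentinel-datasource-nacos dependency and a data source configured for each rule type: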

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
    <version>1.8.0</version>
</dependency>

spring:
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        ephemeral: false
    sentinel:
      transport:
        # Address of the Sentinel console
        dashboard: 127.0.0.1:8080
        # Port the application uses to talk to the Sentinel console; a local HttpServer listens on it
        port: 8719
      datasource:
        ds-flow:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-flow
            groupId: DEFAULT_GROUP
            namespace: sentinel
            data-type: json
            rule-type: flow
        ds-degrade:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-degrade
            groupId: DEFAULT_GROUP
            namespace: sentinel
            data-type: json
            rule-type: degrade
        ds-param-flow:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-param-flow
            groupId: DEFAULT_GROUP
            namespace: sentinel
            data-type: json
            rule-type: param-flow
        ds-system:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-system
            groupId: DEFAULT_GROUP
            namespace: sentinel
            data-type: json
            rule-type: system
        ds-authority:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-authority
            groupId: DEFAULT_GROUP
            namespace: sentinel
            data-type: json
            rule-type: authority

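The Nacos integration is done in SentinelDataSourceHandler, which iterates over the datasource property of SentinelProperties. That property is a Map of DataSourcePropertiesConfiguration entries, and each DataSourcePropertiesConfiguration holds the configuration class for one concrete data source type. Finally, AbstractDataSourceProperties uses the configured RuleType to call the matching rule manager, which registers its listener on the data source's SentinelProperty.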

public class SentinelProperties {
private Map<String, DataSourcePropertiesConfiguration> datasource = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
public Map<String, DataSourcePropertiesConfiguration> getDatasource() {
return datasource;
}
}
public class DataSourcePropertiesConfiguration {
private FileDataSourceProperties file;
private NacosDataSourceProperties nacos;
private ZookeeperDataSourceProperties zk;
private ApolloDataSourceProperties apollo;
private RedisDataSourceProperties redis;
private ConsulDataSourceProperties consul;
}
public class SentinelDataSourceHandler implements SmartInitializingSingleton {
public void afterSingletonsInstantiated() {
sentinelProperties.getDatasource().forEach((dataSourceName, dataSourceProperties) -> {
try {
List<String> validFields = dataSourceProperties.getValidField();
if (validFields.size() != 1) {
log.error("[Sentinel Starter] DataSource " + dataSourceName + " multi datasource active and won't loaded: " + dataSourceProperties.getValidField());
return;
}
AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties.getValidDataSourceProperties();
abstractDataSourceProperties.setEnv(env);
abstractDataSourceProperties.preCheck(dataSourceName);
registerBean(abstractDataSourceProperties, dataSourceName + "-sentinel-" + validFields.get(0) + "-datasource");
} catch (Exception e) {
}
});
}
private void registerBean(final AbstractDataSourceProperties dataSourceProperties, String dataSourceName) {
Map<String, Object> propertyMap = Arrays.stream(dataSourceProperties.getClass().getDeclaredFields()).collect(HashMap::new, (m, v) -> {
try {
v.setAccessible(true);
m.put(v.getName(), v.get(dataSourceProperties));
} catch (IllegalAccessException e) {
throw new RuntimeException("[Sentinel Starter] DataSource " + dataSourceName + " field: " + v.getName() + " invoke error", e);
}
}, HashMap::putAll);
propertyMap.put(CONVERTER_CLASS_FIELD, dataSourceProperties.getConverterClass());
propertyMap.put(DATA_TYPE_FIELD, dataSourceProperties.getDataType());
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(dataSourceProperties.getFactoryBeanName());
propertyMap.forEach((propertyName, propertyValue) -> {
Field field = ReflectionUtils.findField(dataSourceProperties.getClass(), propertyName);
if (null == field) {
return;
}
if (DATA_TYPE_FIELD.equals(propertyName)) {
String dataType = StringUtils.trimAllWhitespace(propertyValue.toString());
if (CUSTOM_DATA_TYPE.equals(dataType)) {
try {
if (StringUtils.isEmpty(dataSourceProperties.getConverterClass())) {
throw new RuntimeException("[Sentinel Starter] DataSource " + dataSourceName + "dataType is custom, please set converter-class property");
}
String customConvertBeanName = "sentinel-" + dataSourceProperties.getConverterClass();
if (!this.beanFactory.containsBean(customConvertBeanName)) {
this.beanFactory.registerBeanDefinition(customConvertBeanName, BeanDefinitionBuilder.genericBeanDefinition(Class.forName(dataSourceProperties.getConverterClass())).getBeanDefinition());
}
builder.addPropertyReference("converter", customConvertBeanName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("[Sentinel Starter] DataSource " + dataSourceName + " handle " + dataSourceProperties.getClass().getSimpleName() + " error, class name: " + dataSourceProperties.getConverterClass(), e);
}
} else {
if (!dataTypeList.contains(StringUtils.trimAllWhitespace(propertyValue.toString()))) {
throw new RuntimeException("[Sentinel Starter] DataSource " + dataSourceName + " dataType: " + propertyValue + " is not support now. please using these types: " + dataTypeList.toString());
}
builder.addPropertyReference("converter", "sentinel-" + propertyValue.toString() + "-" + dataSourceProperties.getRuleType().getName() + "-converter");
}
} else if (CONVERTER_CLASS_FIELD.equals(propertyName)) {
return;
} else { // wired properties
Optional.ofNullable(propertyValue).ifPresent(v -> builder.addPropertyValue(propertyName, v));
}
});
this.beanFactory.registerBeanDefinition(dataSourceName, builder.getBeanDefinition());
AbstractDataSource newDataSource = (AbstractDataSource) this.beanFactory.getBean(dataSourceName);
dataSourceProperties.postRegister(newDataSource); // Register the listener with the rule manager that matches the configured rule type
}
}
public class AbstractDataSourceProperties {
public void postRegister(AbstractDataSource dataSource) {
switch (this.getRuleType()) {
case FLOW:
FlowRuleManager.register2Property(dataSource.getProperty());
break;
case DEGRADE:
DegradeRuleManager.register2Property(dataSource.getProperty());
break;
case PARAM_FLOW:
ParamFlowRuleManager.register2Property(dataSource.getProperty());
break;
case SYSTEM:
SystemRuleManager.register2Property(dataSource.getProperty());
break;
case AUTHORITY:
AuthorityRuleManager.register2Property(dataSource.getProperty());
break;
case GW_FLOW:
GatewayRuleManager.register2Property(dataSource.getProperty());
break;
case GW_API_GROUP:
GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
break;
default:
break;
}
}
}

The NacosDataSourceProperties constructor calls its superclass to set factoryBeanName to NacosDataSourceFactoryBean. The getBean call in the registerBean method of SentinelDataSourceHandler therefore invokes getObject on NacosDataSourceFactoryBean, which creates a NacosDataSource.

public class NacosDataSourceProperties extends AbstractDataSourceProperties {
public NacosDataSourceProperties() {
super(NacosDataSourceFactoryBean.class.getName());
}
}
public class AbstractDataSourceProperties {
public AbstractDataSourceProperties(String factoryBeanName) {
this.factoryBeanName = factoryBeanName;
}
}
public class NacosDataSourceFactoryBean implements FactoryBean<NacosDataSource> {
public NacosDataSource getObject() throws Exception {
Properties properties = new Properties();
if (!StringUtils.isEmpty(this.serverAddr)) {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, this.serverAddr);
}
else {
properties.setProperty(PropertyKeyConst.ACCESS_KEY, this.accessKey);
properties.setProperty(PropertyKeyConst.SECRET_KEY, this.secretKey);
properties.setProperty(PropertyKeyConst.ENDPOINT, this.endpoint);
}
if (!StringUtils.isEmpty(this.namespace)) {
properties.setProperty(PropertyKeyConst.NAMESPACE, this.namespace);
}
if (!StringUtils.isEmpty(this.username)) {
properties.setProperty(PropertyKeyConst.USERNAME, this.username);
}
if (!StringUtils.isEmpty(this.password)) {
properties.setProperty(PropertyKeyConst.PASSWORD, this.password);
}
return new NacosDataSource(properties, groupId, dataId, converter); // Create the NacosDataSource
}
}

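NacosDataSource registers a listener for the specific dataId; when the configuration changes, the listener is called back and applies the update.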

public class NacosDataSource<T> extends AbstractDataSource<String, T> {
public NacosDataSource(final Properties properties, final String groupId, final String dataId, Converter<String, T> parser) {
super(parser);
if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
throw new IllegalArgumentException(String.format("Bad argument: groupId=[%s], dataId=[%s]", groupId, dataId));
}
AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst");
this.groupId = groupId;
this.dataId = dataId;
this.properties = properties;
this.configListener = new Listener() { // Create the listener
@Override
public Executor getExecutor() {
return pool;
}
@Override
public void receiveConfigInfo(final String configInfo) { // Called back when the configuration changes
T newValue = NacosDataSource.this.parser.convert(configInfo); // Parse the configuration
getProperty().updateValue(newValue); // Update memory through the registered listeners
}
};
initNacosListener(); // Initialize the Nacos listener
loadInitialConfig();
}
private void initNacosListener() {
try {
this.configService = NacosFactory.createConfigService(this.properties);
configService.addListener(dataId, groupId, configListener); // Register the listener for this dataId; it is called back when the configuration changes
} catch (Exception e) {
e.printStackTrace();
}
}
public String readSource() throws Exception {
if (configService == null) {
throw new IllegalStateException("Nacos config service has not been initialized or error occurred");
}
return configService.getConfig(dataId, groupId, DEFAULT_TIMEOUT);
}
private void loadInitialConfig() {
try {
T newValue = loadConfig();
getProperty().updateValue(newValue);
} catch (Exception ex) {
}
}
}

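With this integration, changes made in Nacos are picked up by the Sentinel console, but changes made in the Sentinel console are not propagated to Nacos. Since Sentinel 1.4.0, the Sentinel console has provided two interfaces for application-level rule delivery: DynamicRulePublisher for pushing and DynamicRuleProvider for pulling. The console source can be modified so that it communicates with the Nacos configuration center directly; see the flow rule pull and push implementations under the test package of Sentinel Dashboard.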

Sentinel Persistence in Push Mode

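Taking flow rules as the example, implement DynamicRuleProvider and DynamicRulePublisher to pull rules from and push rules to the Nacos configuration center. Then, in the rule-editing endpoints, replace the original pull and push calls with these implementations.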

@Component(value = "flowRuleNacosProvider")
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {
@Autowired
private ConfigService configService;
@Value(value = "${sentinel.nacos.config.refreshTimeoutMs:3000}")
private Integer refreshTimeoutMs;
@Override
public List<FlowRuleEntity> getRules(String appName, String ip, Integer port) throws Exception {
// Pull the configuration from the Nacos configuration center
String rules = configService.getConfig(
appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
NacosConfigUtil.GROUP_ID, refreshTimeoutMs);
if (StringUtil.isEmpty(rules)) {
return new ArrayList<>();
}
List<FlowRule> flowRuleList = JSON.parseArray(rules, FlowRule.class);
return flowRuleList.stream()
.map(rule -> FlowRuleEntity.fromFlowRule(appName, ip, port, rule))
.collect(Collectors.toList());
}
}
@Component("flowRuleNacosPublisher")
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {
@Autowired
private ConfigService configService;
@Override
public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
AssertUtil.notEmpty(app, "app name cannot be empty");
if (rules == null) {
return;
}
configService.publishConfig(
app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
NacosConfigUtil.GROUP_ID,
NacosConfigUtil.convertToRule(rules));
}
}
@Configuration
public class NacosConfig {
@Value(value = "${sentinel.nacos.config.serverAddr:localhost:8848}")
private String serverAddr;
@Bean
public ConfigService nacosConfigService() throws Exception {
return ConfigFactory.createConfigService(serverAddr);
}
}
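
One detail worth checking when wiring this up: the console-side provider and publisher only meet the client-side Nacos data source if both sides resolve the same dataId, groupId, and namespace. The sketch below illustrates the naming side of that contract. It assumes the `${spring.application.name}-flow` and DEFAULT_GROUP convention from the client YAML shown earlier; NacosRuleKeys and its constants are hypothetical and would have to agree with whatever NacosConfigUtil actually defines in your console build.

```java
/** Hypothetical helper that derives the Nacos keys for flow rules; not part of Sentinel. */
class NacosRuleKeys {
    // Assumed values, chosen to match the client-side YAML in this article.
    static final String GROUP_ID = "DEFAULT_GROUP";
    static final String FLOW_DATA_ID_POSTFIX = "-flow";

    /** Builds the dataId the client listens on for a given application name. */
    static String flowDataId(String appName) {
        if (appName == null || appName.trim().isEmpty()) {
            throw new IllegalArgumentException("app name cannot be empty");
        }
        return appName.trim() + FLOW_DATA_ID_POSTFIX;
    }

    public static void main(String[] args) {
        // The console would publish to this dataId and group; the client must listen on the same pair.
        System.out.println(flowDataId("order-service") + " @ " + GROUP_ID); // order-service-flow @ DEFAULT_GROUP
    }
}
```

If the console publishes under a different postfix, group, or namespace than the client data source is configured with, the publish call can succeed while the client never sees the update.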