Sentinel规则发布源码

Sentinel规则发布是通过Dashboard模块的FlowControllerV1apiAddFlowRule方法,首先将数据保存到服务端缓存中,若扩展了持久化可进行先关的持久化,然后和sentinel客户端通信,异步设置流控规则将数据推送给客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@RestController
@RequestMapping(value = "/v1/flow")
public class FlowControllerV1 {
@Autowired
private InMemoryRuleRepositoryAdapter<FlowRuleEntity> repository;
@Autowired
private SentinelApiClient sentinelApiClient;
@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
if (checkResult != null) {
return checkResult;
}
entity.setId(null);
Date date = new Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
entity.setLimitApp(entity.getLimitApp().trim());
entity.setResource(entity.getResource().trim());
try {
entity = repository.save(entity); // 控制台保存规则,持久化扩展点RuleRepository
publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
return Result.ofSuccess(entity);
} catch (Throwable t) {
Throwable e = t instanceof ExecutionException ? t.getCause() : t;
return Result.ofFail(-1, e.getMessage());
}
}
}
public abstract class InMemoryRuleRepositoryAdapter<T extends RuleEntity> implements RuleRepository<T, Long> {
// 通过客户端服务名称、IP、端口以及服务端生成的ID来嵌套映射存储发布的规则
private Map<MachineInfo, Map<Long, T>> machineRules = new ConcurrentHashMap<>(16);
// 通过服务端生成的ID映射存储发布的规则
private Map<Long, T> allRules = new ConcurrentHashMap<>(16);
// 通过客户端服务名称映射存储发布的规则
private Map<String, Map<Long, T>> appRules = new ConcurrentHashMap<>(16);
public T save(T entity) {
if (entity.getId() == null) {
entity.setId(nextId());
}
T processedEntity = preProcess(entity);
if (processedEntity != null) {
allRules.put(processedEntity.getId(), processedEntity);
machineRules.computeIfAbsent(MachineInfo.of(processedEntity.getApp(), processedEntity.getIp(), processedEntity.getPort()),
e -> new ConcurrentHashMap<>(32)).put(processedEntity.getId(), processedEntity);
appRules.computeIfAbsent(processedEntity.getApp(), v -> new ConcurrentHashMap<>(32)).put(processedEntity.getId(), processedEntity);
}

return processedEntity;
}
public List<T> findAllByMachine(MachineInfo machineInfo) {
Map<Long, T> entities = machineRules.get(machineInfo);
if (entities == null) {
return new ArrayList<>();
}
return new ArrayList<>(entities.values());
}
}
public class SentinelApiClient {
private static final String SET_RULES_PATH = "setRules";
private CompletableFuture<Void> publishRules(String app, String ip, Integer port) {
List<FlowRuleEntity> rules = repository.findAllByMachine(MachineInfo.of(app, ip, port));
return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, rules); // 和sentinel客户端通信,异步设置流控规则
}
public CompletableFuture<Void> setFlowRuleOfMachineAsync(String app, String ip, int port, List<FlowRuleEntity> rules) {
return setRulesAsync(app, ip, port, FLOW_RULE_TYPE, rules);
}
private CompletableFuture<Void> setRulesAsync(String app, String ip, int port, String type, List<? extends RuleEntity> entities) {
try {
AssertUtil.notNull(entities, "rules cannot be null");
AssertUtil.notEmpty(app, "Bad app name");
AssertUtil.notEmpty(ip, "Bad machine IP");
AssertUtil.isTrue(port > 0, "Bad machine port");
String data = JSON.toJSONString(entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
Map<String, String> params = new HashMap<>(2);
params.put("type", type);
params.put("data", data);
return executeCommand(app, ip, port, SET_RULES_PATH, params, true).thenCompose(r -> {
if ("success".equalsIgnoreCase(r.trim())) {
return CompletableFuture.completedFuture(null);
}
return AsyncUtils.newFailedFuture(new CommandFailedException(r));
});
} catch (Exception e) {
return AsyncUtils.newFailedFuture(e);
}
}
private CompletableFuture<String> executeCommand(String app, String ip, int port, String api, Map<String, String> params, boolean useHttpPost) {
CompletableFuture<String> future = new CompletableFuture<>();
if (StringUtil.isBlank(ip) || StringUtil.isBlank(api)) {
future.completeExceptionally(new IllegalArgumentException("Bad URL or command name"));
return future;
}
StringBuilder urlBuilder = new StringBuilder(); // 拼接客户端推送规则配置的URL
urlBuilder.append("http://");
urlBuilder.append(ip).append(':').append(port).append('/').append(api);
if (params == null) {
params = Collections.emptyMap();
}
if (!useHttpPost || !isSupportPost(app, ip, port)) {// Using GET in older versions, append parameters after url
if (!params.isEmpty()) {
if (urlBuilder.indexOf("?") == -1) {
urlBuilder.append('?');
} else {
urlBuilder.append('&');
}
urlBuilder.append(queryString(params));
}
return executeCommand(new HttpGet(urlBuilder.toString()));
} else {// Using POST
return executeCommand(postRequest(urlBuilder.toString(), params, isSupportEnhancedContentType(app, ip, port)));
}
}
}

客户端接收数据是Sentinel通过SPI的方式在启动时加载实现了InitFunc接口的类,从而加载SPI接口CommandCenter从而加载SimpleHttpCommandCenter来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public final class InitExecutor {
private static AtomicBoolean initialized = new AtomicBoolean(false);
private static AtomicBoolean initialized = new AtomicBoolean(false);
public static void doInit() { // Sentinel扩展点,通过SPI的方式加载实现了InitFunc接口的类
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
}
} catch (Exception ex) {
ex.printStackTrace();
} catch (Error error) {
error.printStackTrace();
}
}
private static void insertSorted(List<OrderWrapper> list, InitFunc func) {
int order = resolveOrder(func); // 获取排序,若该类上没有@InitOrder注解则默认是最低优先级即Integer.MAX_VALUE
int idx = 0;
for (; idx < list.size(); idx++) {
if (list.get(idx).getOrder() > order) {
break;
}
}
list.add(idx, new OrderWrapper(order, func)); // 将其按照order从大到小即优先级越低越排在前面的顺序放入List中
}
private static int resolveOrder(InitFunc func) {
if (!func.getClass().isAnnotationPresent(InitOrder.class)) {
return InitOrder.LOWEST_PRECEDENCE;
} else {
return func.getClass().getAnnotation(InitOrder.class).value();
}
}
}

CommandCenter接口也是通过SPI方式加载目前有两个实现SimpleHttpCommandCenterNettyHttpCommandCenter。且默认是使用SimpleHttpCommandCenter,首先会调用其beforeStart方法加载遍历实现了SPI接口CommandHandler的所有类,并解析这些实现类上@CommandMapping注解的name属性作为路由使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {
@Override
public void init() throws Exception {
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
if (commandCenter == null) {
return;
}
commandCenter.beforeStart();
commandCenter.start();
}
}
public class SimpleHttpCommandCenter implements CommandCenter {
public void beforeStart() throws Exception { // Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers); // 注册所有实现了CommandHandler的SPI接口
}
}
public class CommandHandlerProvider implements Iterable<CommandHandler> {
private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoaderUtil.getServiceLoader(CommandHandler.class);
public Map<String, CommandHandler> namedHandlers() {
Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
for (CommandHandler handler : serviceLoader) {
String name = parseCommandName(handler); // 解析@CommandMapping注解中配置的接口名称
if (!StringUtil.isEmpty(name)) {
map.put(name, handler);
}
}
return map;
}
private String parseCommandName(CommandHandler handler) {
CommandMapping commandMapping = handler.getClass().getAnnotation(CommandMapping.class);
if (commandMapping != null) {
return commandMapping.name();
} else {
return null;
}
}
}

然后调用SimpleHttpCommandCenterstart方法中异步启动ServerThread线程开启Socket服务端监听,当监听接收到数据后通过HttpEventTask异步任务解析请求。且监听端口是从8719开始,若被占用则依次向上加一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class SimpleHttpCommandCenter implements CommandCenter {
private static final int PORT_UNINITIALIZED = -1;
private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8719;
private ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("sentinel-command-center-executor"));
private ExecutorService bizExecutor;
private ServerSocket socketReference;
public void beforeStart() throws Exception { // Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers); // 注册所有实现了CommandHandler的SPI接口
}
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), new NamedThreadFactory("sentinel-command-center-service-executor"), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
}
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
}
}
try {// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {// Indicates the task should stop.
break;
}
}
}
}
}

HttpEventTask中首先解析参数,然后通过从在SimpleHttpCommandCenterbeforeStart方法中解析的路由中匹配到具体的CommandHandler接口实现类,并调用其handle方法完成客户端规则推送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class HttpEventTask implements Runnable {
public void run() {
if (socket == null) {
return;
}
PrintWriter printWriter = null;
InputStream inputStream = null;
try {
long start = System.currentTimeMillis();
inputStream = new BufferedInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String firstLine = readLine(inputStream);
CommandRequest request = processQueryString(firstLine); // 参数解析
if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {// Deal with post method
processPostRequest(inputStream, request); // POST请求参数解析
}
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter);
} else { // No matching command handler.
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
}
long cost = System.currentTimeMillis() - start;
} catch (RequestException e) {
writeResponse(printWriter, e.getStatusCode(), e.getMessage());
} catch (Throwable e) {
try {
if (printWriter != null) {
String errorMessage = SERVER_ERROR_MESSAGE;
e.printStackTrace();
if (!writtenHead) {
writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
} else {
printWriter.println(errorMessage);
}
printWriter.flush();
}
} catch (Exception e1) {
}
} finally {
closeResource(inputStream);
closeResource(printWriter);
closeResource(socket);
}
}
}

对于规则的设置是通过ModifyRulesCommandHandler来完成的,不同的规则设置根据规则类型走不同的逻辑,都是通过对应规则ManagerloadRules方法将规则加载到内存中,然后通过writeToDataSource方法来实现持久化。也可以通过该处对持久化功能进行扩展。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
public class ModifyRulesCommandHandler implements CommandHandler<String> {
private static final int FASTJSON_MINIMAL_VER = 0x01020C00;
@Override
public CommandResponse<String> handle(CommandRequest request) {
if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {// fastjson too old
return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION + "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
}
String type = request.getParam("type");
String data = request.getParam("data"); // rule data in get parameter
if (StringUtil.isNotEmpty(data)) {
try {
data = URLDecoder.decode(data, "utf-8");
} catch (Exception e) {
return CommandResponse.ofFailure(e, "decode rule data error");
}
}
String result = "success";
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
FlowRuleManager.loadRules(flowRules);
if (!writeToDataSource(getFlowDataSource(), flowRules)) { // 获取写数据源,做持久化
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
AuthorityRuleManager.loadRules(rules);
if (!writeToDataSource(getAuthorityDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
DegradeRuleManager.loadRules(rules);
if (!writeToDataSource(getDegradeDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
SystemRuleManager.loadRules(rules);
if (!writeToDataSource(getSystemSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
private <T> boolean writeToDataSource(WritableDataSource<T> dataSource, T value) {
if (dataSource != null) {
try {
dataSource.write(value);
} catch (Exception e) {
return false;
}
}
return true;
}
}

使用Sentinel时一定要配置暴露actuator端点,否则可能在dashboard上找不到对应的服务

1
2
3
4
5
6
# 暴露actuator端点
management:
endpoints:
web:
exposure:
include: '*'