Sentinel限流熔断降级源码

客户端使用Sentinel可通过@SentinelResource注解以AOP增强的方式,也可通过SentinelWebInterceptor拦截器的方式。对于AOP增强的方式,spring-cloud-starter-alibaba-sentinel的spring.factories中配置了SentinelAutoConfiguration配置类,该配置类注册了SentinelResourceAspect切面,由它来完成AOP增强。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Spring auto-configuration that registers the Sentinel AOP aspect and the
 * RestTemplate post-processor. It is active when the property
 * {@code spring.cloud.sentinel.enabled} is absent (see {@code matchIfMissing}).
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelAutoConfiguration {

    @Autowired
    private SentinelProperties properties;

    /**
     * Triggers Sentinel initialization right after bean construction, but only
     * when eager initialization is switched on in the properties.
     */
    @PostConstruct
    private void init() {
        if (!properties.isEager()) {
            return;
        }
        // NOTE(review): the article describes doInit() as running the SPI-based init extensions - confirm.
        InitExecutor.doInit();
    }

    /**
     * Registers the aspect that intercepts methods annotated with {@code @SentinelResource},
     * unless the application already defines such a bean.
     */
    @Bean
    @ConditionalOnMissingBean
    public SentinelResourceAspect sentinelResourceAspect() {
        return new SentinelResourceAspect();
    }

    /**
     * Registers the bean post-processor for RestTemplate integration when
     * RestTemplate is on the classpath and {@code resttemplate.sentinel.enabled}
     * is {@code true} or missing.
     */
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnClass(name = "org.springframework.web.client.RestTemplate")
    @ConditionalOnProperty(name = "resttemplate.sentinel.enabled", havingValue = "true", matchIfMissing = true)
    public SentinelBeanPostProcessor sentinelBeanPostProcessor(ApplicationContext applicationContext) {
        // NOTE(review): presumably enhances RestTemplate beans marked with @SentinelRestTemplate - confirm.
        return new SentinelBeanPostProcessor(applicationContext);
    }
}

SentinelResourceAspect中其实就是对切点配置,以及Advice方法的配置,对所有标注@SentinelResource注解的方法进行拦截。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Aspect that wraps every method annotated with {@code @SentinelResource} in a
 * Sentinel entry and routes block / business exceptions to the configured handlers.
 */
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    /** Pointcut matching all methods that carry the {@code @SentinelResource} annotation. */
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    /**
     * Around advice: acquires an entry for the resource, runs the target method and
     * always exits the entry afterwards.
     *
     * @param pjp the intercepted join point
     * @return the target method's result, or the result of the block handler / fallback
     * @throws Throwable the original exception when it is ignored or no fallback applies
     */
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method targetMethod = resolveMethod(pjp);
        SentinelResource annotation = targetMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }

        String resourceName = getResourceName(annotation.value(), targetMethod);
        EntryType trafficType = annotation.entryType();
        int resourceType = annotation.resourceType();

        Entry acquired = null;
        try {
            // A BlockException here means a rule rejected the call; otherwise the call may proceed.
            acquired = SphU.entry(resourceName, resourceType, trafficType, pjp.getArgs());
            return pjp.proceed();
        } catch (BlockException blocked) {
            // Rule check failed: delegate to the block-handling logic.
            return handleBlockException(pjp, annotation, blocked);
        } catch (Throwable failure) {
            // The ignore list is consulted first; ignored exceptions are rethrown untouched.
            Class<? extends Throwable>[] ignoreList = annotation.exceptionsToIgnore();
            boolean ignored = ignoreList.length > 0 && exceptionBelongsTo(failure, ignoreList);
            if (!ignored && exceptionBelongsTo(failure, annotation.exceptionsToTrace())) {
                traceException(failure);
                // Business exception: delegate to the fallback logic.
                return handleFallback(pjp, annotation, failure);
            }
            // Ignored, or no fallback can handle the exception.
            throw failure;
        } finally {
            if (acquired != null) {
                acquired.exit(1, pjp.getArgs());
            }
        }
    }
}

通过SentinelWebInterceptor拦截器的方式是在spring-cloud-starter-alibaba-sentinel中的spring.factories中配置的SentinelWebAutoConfiguration配置类来完成的,该配置类实现了WebMvcConfigurer,容器启动时向Web拦截器链中添加了SentinelWebInterceptor拦截器来实现Sentinel的集成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = Type.SERVLET)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@ConditionalOnClass(SentinelWebInterceptor.class)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {
@Autowired
private Optional<SentinelWebInterceptor> sentinelWebInterceptorOptional;
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)
public SentinelWebInterceptor sentinelWebInterceptor(SentinelWebMvcConfig sentinelWebMvcConfig) {
return new SentinelWebInterceptor(sentinelWebMvcConfig);
}
public void addInterceptors(InterceptorRegistry registry) { // 容器启动时调用该方法
if (!sentinelWebInterceptorOptional.isPresent()) {
return; // 获取SentinelWebInterceptor,若为空则直接返回
}
SentinelProperties.Filter filterConfig = properties.getFilter(); // filterConfig.getUrlPatterns()获取的结果默认置为/**通配符,即所有路径
registry.addInterceptor(sentinelWebInterceptorOptional.get()).order(filterConfig.getOrder()).addPathPatterns(filterConfig.getUrlPatterns());
}
}

SentinelWebInterceptor拦截器默认使用/**匹配并拦截所有路径,最终会调用超类AbstractSentinelInterceptor的preHandle以及afterCompletion方法来完成Sentinel相关的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
 * Spring MVC interceptor that derives the Sentinel resource name from the
 * best-matching URL pattern of the current request.
 */
public class SentinelWebInterceptor extends AbstractSentinelInterceptor {

    private final SentinelWebMvcConfig config;

    /**
     * @param config interceptor settings; a default {@code SentinelWebMvcConfig} is
     *               used locally when {@code null} (the superclass still receives the
     *               argument as passed)
     */
    public SentinelWebInterceptor(SentinelWebMvcConfig config) {
        super(config);
        this.config = (config == null) ? new SentinelWebMvcConfig() : config;
    }

    /**
     * Resolves the resource name from the request's best-matching pattern attribute,
     * applies the optional URL cleaner and, if configured, prefixes the HTTP method.
     *
     * @return the resource name, or {@code null} when the attribute is missing or not a String
     */
    protected String getResourceName(HttpServletRequest request) {
        Object pattern = request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
        if (!(pattern instanceof String)) {
            return null;
        }
        String name = (String) pattern;

        UrlCleaner cleaner = config.getUrlCleaner();
        if (cleaner != null) {
            name = cleaner.clean(name);
        }

        // Add the method specification if necessary, e.g. "GET:/foo".
        boolean prefixWithMethod = StringUtil.isNotEmpty(name) && config.isHttpMethodSpecify();
        return prefixWithMethod ? request.getMethod().toUpperCase() + ":" + name : name;
    }

    /**
     * Uses the superclass context name when contexts are unified; otherwise the
     * resource name doubles as the context name.
     */
    protected String getContextName(HttpServletRequest request) {
        return config.isWebContextUnify() ? super.getContextName(request) : getResourceName(request);
    }
}
/**
 * Base Spring MVC interceptor: acquires a Sentinel entry in preHandle and releases
 * it in afterCompletion. Subclasses supply the resource name for a request.
 */
public abstract class AbstractSentinelInterceptor implements HandlerInterceptor {
public static final String SENTINEL_SPRING_WEB_CONTEXT_NAME = "sentinel_spring_web_context";
private static final String EMPTY_ORIGIN = "";
private final BaseWebMvcConfig baseWebMvcConfig;
public AbstractSentinelInterceptor(BaseWebMvcConfig config) {
this.baseWebMvcConfig = config;
}
/**
 * Enters the Sentinel context and acquires an entry for the request's resource.
 *
 * @return true to continue the handler chain; false when the request was blocked
 */
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
try {
String resourceName = getResourceName(request);
// No resource name: nothing to protect, let the request through.
if (StringUtil.isEmpty(resourceName)) {
return true;
}
// Only proceed when the counter returned by increaseReferece is exactly 1.
// NOTE(review): presumably a per-request reference count guarding against repeated interception - confirm.
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
String origin = parseOrigin(request); // Parse the request origin using registered origin parser.
String contextName = getContextName(request);
ContextUtil.enter(contextName, origin);
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
// Keep the entry on the request so afterCompletion can exit it.
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try { // Invoked when a rule check throws BlockException; handleBlockException is not shown in this excerpt.
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
/**
 * Releases the entry stored by preHandle and leaves the Sentinel context.
 */
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// Only continue when the counter drops back to 0.
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), -1) != 0) {
return;
}
Entry entry = getEntryInRequest(request, baseWebMvcConfig.getRequestAttributeName());
if (entry == null) { // should not happen
return;
}
traceExceptionAndExit(entry, ex); // NOTE(review): per the article this method calls entry.exit() - body not shown here.
removeEntryInRequest(request);
ContextUtil.exit();
}
/**
 * Resolves the request origin through the configured origin parser.
 *
 * @return the parsed origin, or the empty string when no parser is configured
 *         or the parser yields an empty value
 */
protected String parseOrigin(HttpServletRequest request) {
String origin = EMPTY_ORIGIN;
if (baseWebMvcConfig.getOriginParser() != null) {
origin = baseWebMvcConfig.getOriginParser().parseOrigin(request);
if (StringUtil.isEmpty(origin)) {
return EMPTY_ORIGIN;
}
}
return origin;
}
/** Returns the shared context name used for all web requests by default. */
protected String getContextName(HttpServletRequest request) {
return SENTINEL_SPRING_WEB_CONTEXT_NAME;
}
}

具体调用逻辑

首先获取规则校验链,规则链是根据其上的@SpiOrder注解中定义的值进行排序的,然后逐个调用Slot校验链中的每个校验规则的entry逻辑,若调用过程抛出BlockException异常,则逐个调用Slot校验链中的每一个校验规则的退出exit逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/** Static entry point for acquiring Sentinel entries. */
public class SphU {

    /**
     * Acquires an entry for the named resource with a batch count of one and no
     * extra arguments.
     *
     * @throws BlockException when the acquisition is rejected
     */
    public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
        final int batchCount = 1;
        return Env.sph.entryWithType(name, resourceType, trafficType, batchCount, OBJECTS0);
    }
}
/**
 * Sph implementation shown here: wraps the resource, looks up its slot chain and
 * runs the chain's entry logic.
 */
public class CtSph implements Sph {
/** Delegates to the six-argument overload with prioritized set to false. */
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException {
return entryWithType(name, resourceType, entryType, count, false, args);
}
/** Wraps the resource name in a StringResourceWrapper and delegates to entryWithPriority. */
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
return entryWithPriority(resource, count, prioritized, args); // On the call path shown in this file, prioritized is false and count is 1.
}
/**
 * Creates the entry and runs the slot chain for it.
 *
 * @return an entry without a chain (no rule checking) when the context is a NullContext,
 *         the global switch is off, or no chain is available; otherwise the checked entry
 * @throws BlockException when a slot in the chain rejects the call
 */
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
// NullContext: return an entry with no chain, so no rule checking happens.
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) { // Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
if (!Constants.ON) { // Global switch is off: skip rule checking.
return new CtEntry(resourceWrapper, null, context);
}
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); // Fetch the slot chain for this resource.
if (chain == null) { // No chain available: skip rule checking.
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try { // Run the entry logic of the slot chain.
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args); // Blocked: exit the entry before rethrowing.
throw e1;
} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.
// Logged and swallowed: the entry is still returned to the caller.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
}

校验规则链的加载是通过SPI的方式加载SlotChainBuilder接口的实现类,这里默认加载DefaultSlotChainBuilder,在该类的build方法中又通过SPI的方式加载了一系列的ProcessorSlot接口的实现类,且这些校验规则的实现类是根据类上@SpiOrder注解中定义的值来进行排序的,值越小越排在前面执行,若无该注解则默认其排序值为Integer.MAX_VALUE,即排在最后面。校验规则的优先级从高到低依次为:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、AuthoritySlot、SystemSlot、FlowSlot、DegradeSlot。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
 * Returns the slot chain for the given resource, creating and caching it on first use.
 *
 * @return the chain, or null when the cache already holds
 *         Constants.MAX_SLOT_CHAIN_SIZE chains and this resource has none
 */
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
// Re-check under the lock: another thread may have created the chain meanwhile.
chain = chainMap.get(resourceWrapper);
if (chain == null) {
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { // Entry size limit.
return null; // NOTE(review): the article gives the limit as 6000 - the constant's value is not shown here.
}
chain = SlotChainProvider.newSlotChain();
// Build a new map containing the old entries plus the new chain, then swap the reference.
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
/** Builds slot chains through a lazily resolved {@code SlotChainBuilder}. */
public final class SlotChainProvider {

    private static volatile SlotChainBuilder slotChainBuilder = null;

    /**
     * Resolves the builder on first use (via {@code SpiLoader}, falling back to
     * {@code DefaultSlotChainBuilder}) and asks it for a new chain.
     */
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder == null) {
            // Load the builder through SPI; DefaultSlotChainBuilder is the declared default.
            slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
            if (slotChainBuilder == null) {
                // Should not go through here.
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
                slotChainBuilder = new DefaultSlotChainBuilder();
            }
        }
        return slotChainBuilder.build();
    }
}
/** Default builder: assembles a chain from the SPI-loaded, order-sorted slots. */
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    /**
     * Creates a new {@code DefaultProcessorSlotChain} and appends every loaded slot
     * that is an {@code AbstractLinkedProcessorSlot}, in sorted order. Other slot
     * implementations are skipped.
     */
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        List<ProcessorSlot> orderedSlots = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot candidate : orderedSlots) {
            if (candidate instanceof AbstractLinkedProcessorSlot) {
                chain.addLast((AbstractLinkedProcessorSlot<?>) candidate);
            }
        }
        return chain;
    }
}
/** Loads SPI implementations and orders them by their {@code @SpiOrder} value. */
public final class SpiLoader {

    /**
     * Loads all implementations of {@code clazz} and returns them sorted by ascending
     * order value (smaller values first).
     *
     * @return the sorted instances, or an empty list when loading fails
     */
    public static <T> List<T> loadPrototypeInstanceListSorted(Class<T> clazz) {
        try {
            List<SpiOrderWrapper<T>> ordered = new ArrayList<>();
            for (T candidate : ServiceLoaderUtil.getServiceLoader(clazz)) {
                SpiOrderResolver.insertSorted(ordered, candidate, SpiOrderResolver.resolveOrder(candidate));
            }
            List<T> instances = new ArrayList<>(ordered.size());
            for (SpiOrderWrapper<T> wrapper : ordered) {
                instances.add(wrapper.spi);
            }
            return instances;
        } catch (Throwable t) {
            t.printStackTrace();
            return new ArrayList<>();
        }
    }

    private static class SpiOrderResolver {

        /**
         * Inserts {@code spi} before the first element whose order is strictly greater,
         * so elements with equal order keep their insertion sequence.
         */
        private static <T> void insertSorted(List<SpiOrderWrapper<T>> list, T spi, int order) {
            int position = 0;
            while (position < list.size() && list.get(position).getOrder() <= order) {
                position++;
            }
            list.add(position, new SpiOrderWrapper<>(order, spi));
        }

        /**
         * Reads the {@code @SpiOrder} value from the instance's class, falling back to
         * {@code SpiOrder.LOWEST_PRECEDENCE} when the annotation is absent.
         */
        private static <T> int resolveOrder(T spi) {
            Class<?> type = spi.getClass();
            return type.isAnnotationPresent(SpiOrder.class)
                    ? type.getAnnotation(SpiOrder.class).value()
                    : SpiOrder.LOWEST_PRECEDENCE;
        }
    }
}

对于校验链条的调用,首先通过DefaultProcessorSlotChain的entry方法调用AbstractLinkedProcessorSlot的transformEntry方法,再调用具体实现。

1
2
3
4
5
6
7
8
9
10
11
/** Slot chain whose entry call is forwarded to its first slot. */
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
/**
 * Starts chain processing by handing all arguments to first.transformEntry.
 * The field first is not declared in this excerpt.
 */
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
}
/** Base class for slots that can be linked into a processor chain. */
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    /**
     * Casts the untyped parameter to this slot's parameter type and forwards the
     * call to {@code entry}.
     */
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {
        entry(context, resourceWrapper, (T) o, count, prioritized, args);
    }
}

首先调用NodeSelectorSlot校验规则,该规则负责收集资源路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径来限流降级,然后通过fireEntry触发下一规则调用。接着调用ClusterBuilderSlot集群校验规则。LogSlot规则很简单,其实就是打印日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * First slot in the chain (order -10000): selects, per context name, the
 * DefaultNode for the resource and links it into the invocation tree.
 */
@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
// Context name -> node; replaced wholesale (never mutated in place) when a node is added.
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
/**
 * Looks up or creates the node for the current context name, sets it as the
 * context's current node and passes it on to the next slot.
 */
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
// Collects resource call paths and stores them as a tree, which later slots use for path-based limiting.
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
// Re-check under the lock before creating the node.
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
// Copy the existing map, add the new node, then swap the reference.
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
((DefaultNode) context.getLastNode()).addChild(node); // Build invocation tree

}

}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
/**
 * Slot with order -9000: attaches a ClusterNode to the resource's DefaultNode and,
 * when the context has a non-empty origin, an origin node to the current entry.
 */
@SpiOrder(-9000)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// Static registry of cluster nodes; replaced wholesale when a node is added.
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
// Cluster node of this slot instance, created on first entry.
private volatile ClusterNode clusterNode = null;
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
// NOTE(review): `lock` is not declared in this excerpt - presumably a static lock object; confirm.
synchronized (lock) {
if (clusterNode == null) { // Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
// Copy the registry, add the new node, then swap the reference.
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
// Only a non-empty origin gets its own origin node on the current entry.
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
/**
 * Slot with order -8000: passes the call down the chain and logs block exceptions
 * raised by later slots.
 */
@SpiOrder(-8000)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Fires the next slot. A {@code BlockException} is logged and rethrown; any other
     * throwable is logged as a warning and not propagated.
     */
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args) throws Throwable {
        try {
            fireEntry(context, resourceWrapper, obj, count, prioritized, args);
        } catch (BlockException blocked) {
            String resourceName = resourceWrapper.getName();
            String exceptionName = blocked.getClass().getSimpleName();
            String ruleLimitApp = blocked.getRuleLimitApp();
            String origin = context.getOrigin();
            EagleEyeLogUtil.log(resourceName, exceptionName, ruleLimitApp, origin, count);
            throw blocked;
        } catch (Throwable unexpected) {
            RecordLog.warn("Unexpected entry exception", unexpected);
        }
    }
}

接下来是稍微复杂一点的StatisticSlot,用于存储资源统计信息以及调用者信息,例如该资源的RT、QPS、threadCount等等,这些信息将作为多维度限流降级的依据。其主要是在StatisticNode类中实现了滑动窗口算法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Slot with order -7000: lets the later slots run first, then records pass, block
 * or error statistics on the resource node, the origin node and (for inbound
 * traffic) the global entry node.
 */
@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
try { // Statistics recorded here are what the article describes as the basis for the later rule checks.
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args); // Run the following slots first.
// Request passed, add thread count and pass count.
node.increaseThreadNum(); // One more thread is inside this resource.
node.addPassRequest(count); // Record the passed requests in the node's counters.
if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// Inbound traffic is additionally counted on the global entry node.
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// Only thread counts are increased here; no pass count is added in this branch.
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) { // Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
node.increaseBlockQps(count); // Record the blocked requests.
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) { // Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
}

StatisticNode中定义了两个滑动时间窗口:一个是秒级别的滑动时间窗口rollingCounterInSecond,有两个窗格,每个窗格500ms;另一个是分钟级别的滑动时间窗口rollingCounterInMinute,有60个窗格,每个窗格1s。两个时间窗口底层都是通过ArrayMetric来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/** Node holding the rolling counters and the current thread count for a resource. */
public class StatisticNode implements Node {
// Second-level rolling counter, sized by SampleCountProperty.SAMPLE_COUNT and IntervalProperty.INTERVAL.
// NOTE(review): the article describes this as 2 buckets over 1000ms - the property values are not shown here.
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
// Minute-level rolling counter: 60 buckets over 60 * 1000 ms, created with enableOccupy = false.
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
private LongAdder curThreadNum = new LongAdder();
private long lastFetchTime = -1;
/** Adds the given number of passed requests to both rolling counters. */
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count); // Record in the second-level counter.
rollingCounterInMinute.addPass(count); // Record in the minute-level counter.
}
}
/** Metric backed by a {@code LeapArray} of {@code MetricBucket}s. */
public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    /**
     * Creates a metric backed by an {@code OccupiableBucketLeapArray}.
     *
     * @param sampleCount  number of buckets
     * @param intervalInMs total time span covered by the buckets, in milliseconds
     */
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    /**
     * Creates a metric backed by an {@code OccupiableBucketLeapArray} when
     * {@code enableOccupy} is set, otherwise by a plain {@code BucketLeapArray}.
     * The minute-level counter in {@code StatisticNode} uses the latter with
     * 60 buckets over 60 * 1000 ms.
     */
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        this.data = enableOccupy
                ? new OccupiableBucketLeapArray(sampleCount, intervalInMs)
                : new BucketLeapArray(sampleCount, intervalInMs);
    }
}
/** LeapArray of MetricBuckets that additionally owns a FutureBucketLeapArray of the same dimensions. */
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {
/**
 * Initializes the base array and a companion FutureBucketLeapArray with the same
 * bucket count and interval. The borrowArray field is not declared in this excerpt.
 */
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
}
/**
 * Fixed-size array of time buckets that together cover a rolling interval.
 *
 * @param <T> type of the statistic held by each bucket
 */
public abstract class LeapArray<T> {

    /** Length of a single bucket in milliseconds ({@code intervalInMs / sampleCount}). */
    protected int windowLengthInMs;
    /** Number of buckets in the array. */
    protected int sampleCount;
    /** Total time span covered by all buckets, in milliseconds. */
    protected int intervalInMs;
    /** The buckets; every slot is initially {@code null}. */
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * Creates an array of {@code sampleCount} empty buckets spanning {@code intervalInMs}.
     * For example, 2 buckets over 1000 ms gives 500 ms buckets; 60 buckets over
     * 60000 ms gives 1 s buckets.
     *
     * @param sampleCount  number of buckets, must be positive
     * @param intervalInMs total time span in milliseconds, must be positive and
     *                     evenly divisible by {@code sampleCount}
     * @throws IllegalArgumentException if any of the constraints above is violated
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        // Validate up front: a zero bucket count would otherwise fail with a division by
        // zero, and a non-divisible interval would silently truncate the bucket length.
        if (sampleCount <= 0) {
            throw new IllegalArgumentException("bucket count is invalid: " + sampleCount);
        }
        if (intervalInMs <= 0) {
            throw new IllegalArgumentException("total time interval of the sliding window should be positive");
        }
        if (intervalInMs % sampleCount != 0) {
            throw new IllegalArgumentException("time span needs to be evenly divided");
        }
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}

通过LeapArray的currentWindow方法根据当前时间获取当前对应的窗格:首先根据当前时间计算出当前请求应该处于的窗格下标,再根据当前时间计算出当前窗格起始时间。然后从时间窗口数组中获取该下标对应的old时间窗口:若old时间窗口为空,则新建一个时间窗口,放入时间窗口数组中并返回新建的时间窗格;若old时间窗口起始时间跟依据当前时间计算出的时间窗口起始时间相同,则当前时间应该落在old时间窗口内,直接返回old时间窗口;若依据当前时间算出的时间窗口起始时间大于old时间窗口的起始时间,则将old时间窗口重置,变为当前时间应该落入的时间窗口。这相当于实现了一个循环数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/** Excerpt of StatisticNode: recording passed requests. */
public class StatisticNode implements Node {

    /**
     * Adds {@code count} passed requests to the second-level and the minute-level
     * rolling counters. Both counter fields are declared elsewhere in the class.
     */
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
}

/** Excerpt of ArrayMetric: recording passed requests in the current bucket. */
public class ArrayMetric implements Metric {

    // The bucket array belongs to ArrayMetric (it is what addPass reads), not to
    // StatisticNode. It is assigned by the constructors, which this excerpt omits.
    private final LeapArray<MetricBucket> data;

    /** Locates the bucket for the current time and adds {@code count} to its pass counter. */
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
}
/** Excerpt of LeapArray: locating the bucket that covers a point in time. */
public abstract class LeapArray<T> {
/** Returns the bucket for the current time as reported by TimeUtil. */
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
/**
 * Returns the bucket covering the given timestamp, creating or resetting it as needed.
 *
 * @param timeMillis timestamp in milliseconds
 * @return the bucket, or null when timeMillis is negative
 */
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis); // Array slot the timestamp maps to.
long windowStart = calculateWindowStart(timeMillis); // Start time of the bucket containing the timestamp.
while (true) {
WindowWrap<T> old = array.get(idx); // Bucket currently stored in that slot.
if (old == null) { // Empty slot: create a bucket and try to install it.
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
// Another thread installed a bucket first; yield and retry the loop.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old; // Same start time: the timestamp falls into the stored bucket.
} else if (windowStart > old.windowStart()) {
// The stored bucket is older than the computed one: reset it while holding updateLock.
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Lock is held by another thread; yield and retry the loop.
Thread.yield();
}
} else if (windowStart < old.windowStart()) { // Stored bucket is newer than the timestamp; per the article only possible after the clock moves backwards.
// Return a detached bucket that is not stored in the array.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
/** Maps a timestamp to its slot index: (timeMillis / windowLengthInMs) modulo the array length. */
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
/** Rounds a timestamp down to the start of its bucket. */
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
}

在统计计数时根据请求调用结果事件类型创建一个数组,不同的类型将计数加到其对应的数组下标的LongAdder中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/** One bucket of statistics: a counter per {@code MetricEvent} plus a minimum RT. */
public class MetricBucket {

    // One counter per MetricEvent, indexed by the event's ordinal.
    private final LongAdder[] counters;
    // NOTE(review): the article states a default of 5000 - initMinRt() is not shown here.
    private volatile long minRt;

    /** Creates a bucket with a zeroed counter for every {@code MetricEvent}. */
    public MetricBucket() {
        int eventKinds = MetricEvent.values().length;
        this.counters = new LongAdder[eventKinds];
        for (int slot = 0; slot < eventKinds; slot++) {
            counters[slot] = new LongAdder();
        }
        initMinRt();
    }

    /** Adds {@code n} to the PASS counter. */
    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    /**
     * Adds {@code n} to the counter of the given event.
     *
     * @return this bucket
     */
    public MetricBucket add(MetricEvent event, long n) {
        LongAdder counter = counters[event.ordinal()];
        counter.add(n);
        return this;
    }
}
/**
 * Kinds of events counted per bucket. The constant descriptions below follow the
 * article's explanation; the code that records them is not all shown here.
 */
public enum MetricEvent {
PASS, // Call passed all rule checks.
BLOCK, // Call failed a rule check, i.e. a BlockException was thrown.
EXCEPTION, // Call that raised a business exception.
SUCCESS, // Completed call, whether or not it threw.
RT, // Accumulated response time of all SUCCESS calls.
OCCUPIED_PASS
}

授权规则也相对简单,其实就是对配置的黑白名单进行校验:若授权规则为黑名单且请求者包含在黑名单中,或授权规则为白名单且请求者不包含在白名单中,则抛出授权规则异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/** Slot with order -6000: enforces the black/white list authority rules of a resource. */
@SpiOrder(-6000)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /** Checks the authority rules, then passes the call on to the next slot. */
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    /**
     * Evaluates every authority rule registered for the resource's name.
     *
     * @throws AuthorityException for the first rule the context does not pass
     */
    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        Map<String, Set<AuthorityRule>> rulesByResource = AuthorityRuleManager.getAuthorityRules();
        Set<AuthorityRule> applicable = (rulesByResource == null) ? null : rulesByResource.get(resource.getName());
        if (applicable == null) {
            // No rules at all, or none for this resource.
            return;
        }
        for (AuthorityRule candidate : applicable) {
            boolean permitted = AuthorityRuleChecker.passCheck(candidate, context);
            if (!permitted) {
                throw new AuthorityException(context.getOrigin(), candidate);
            }
        }
    }
}
/** Decides whether a request origin passes a single black/white list rule. */
final class AuthorityRuleChecker {

    /**
     * @return {@code false} when the rule is a black list containing the origin, or a
     *         white list not containing it; {@code true} otherwise, including when the
     *         origin or the rule's limit-app list is empty
     */
    static boolean passCheck(AuthorityRule rule, Context context) {
        String requester = context.getOrigin();
        // Empty origin or empty limitApp will pass.
        if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
            return true;
        }

        // A substring hit is only a pre-filter; membership requires an exact match
        // against one of the comma-separated names.
        boolean listed = false;
        if (rule.getLimitApp().indexOf(requester) > -1) {
            for (String app : rule.getLimitApp().split(",")) {
                if (requester.equals(app)) {
                    listed = true;
                    break;
                }
            }
        }

        int strategy = rule.getStrategy();
        boolean rejectedByBlackList = strategy == RuleConstant.AUTHORITY_BLACK && listed;
        boolean rejectedByWhiteList = strategy == RuleConstant.AUTHORITY_WHITE && !listed;
        return !(rejectedByBlackList || rejectedByWhiteList);
    }
}

SystemSlot系统规则中的QPS、RT、线程数都是使用统计规则StatisticSlot中统计的数据,然后进行了相关的计算判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/** Slot with order -5000: applies the system-level rules before continuing the chain. */
@SpiOrder(-5000)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
/** Runs SystemRuleManager.checkSystem for the resource, then fires the next slot. */
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
/** Holds the system-level thresholds and checks inbound traffic against them. */
public final class SystemRuleManager {

    /**
     * Checks the global entry node's statistics against the configured thresholds, in
     * this order: QPS, thread count, average RT, system load, CPU usage. Nothing is
     * checked for a {@code null} resource, when the check switch is off, or for
     * traffic that is not {@code EntryType.IN}.
     *
     * @throws BlockException a {@code SystemBlockException} naming the first exceeded limit
     */
    public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
        // Skip when there is no resource, the switch is off, or the traffic is not inbound.
        if (resourceWrapper == null
                || !checkSystemStatus.get()
                || resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }

        // Total QPS (uses the entry node's success QPS).
        double inboundQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
        if (inboundQps > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // Total thread count.
        int activeThreads = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (activeThreads > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        // Average response time.
        double averageRt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (averageRt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // Load: only blocks when the load limit is exceeded and the BBR check fails.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad && !checkBbr(activeThreads)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }

        // CPU usage.
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
}

FlowSlot流控规则首先获取所有该资源配置的流控规则,若未配置直接返回,否则根据流控模式获取具体的目标资源,然后通过canPass方法调用具体的流控效果类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/** Slot with order -2000: applies the flow rules of a resource before continuing the chain. */
@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    // Looks up the flow rules registered for a resource name; the rule map should not be null.
    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        @Override
        public Collection<FlowRule> apply(String resource) {
            return FlowRuleManager.getFlowRuleMap().get(resource);
        }
    };

    /** Checks the flow rules, then passes the call on to the next slot. */
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    /**
     * Delegates the rule check to the checker, supplying this slot's rule provider.
     * The {@code checker} field is declared outside this excerpt.
     *
     * @throws BlockException when a flow rule rejects the call
     */
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }
}
/** Evaluates flow rules: picks the statistic node a rule applies to and asks the rule's rater. */
public class FlowRuleChecker {

    /**
     * Checks every flow rule the provider returns for the resource's name.
     * Returns silently when the provider or resource is {@code null} or no rules exist.
     *
     * @throws BlockException a {@code FlowException} for the first rule that does not pass
     */
    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> applicable = ruleProvider.apply(resource.getName());
        if (applicable == null) {
            return;
        }
        for (FlowRule candidate : applicable) {
            if (!canPassCheck(candidate, context, node, count, prioritized)) {
                throw new FlowException(candidate.getLimitApp(), candidate);
            }
        }
    }

    /**
     * Checks a single rule. A rule without a limit app always passes; otherwise the
     * cluster or the local check is used depending on the rule's mode.
     */
    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        if (rule.getLimitApp() == null) {
            return true;
        }
        return rule.isClusterMode()
                ? passClusterCheck(rule, context, node, acquireCount, prioritized)
                : passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    /** Local check: passes when no node is selected, otherwise defers to the rule's rater. */
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        Node target = selectNodeByRequesterAndStrategy(rule, context, node);
        if (target == null) {
            return true;
        }
        return rule.getRater().canPass(target, acquireCount, prioritized);
    }

    /**
     * Selects the statistic node the rule is measured against, based on the rule's
     * limit app, the request origin and the rule's strategy. In each matching case the
     * DIRECT strategy yields a node immediately; any other strategy is resolved by
     * {@link #selectReferenceNode}.
     *
     * @return the selected node, or {@code null} when the rule does not apply
     */
    static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // The limit app should not be empty.
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
        boolean direct = strategy == RuleConstant.STRATEGY_DIRECT;

        if (limitApp.equals(origin) && filterOrigin(origin)) {
            // Rule targets exactly this origin: DIRECT uses the origin's statistic node.
            return direct ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        }
        if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            // Rule targets the default limit app: DIRECT uses the resource's cluster node.
            return direct ? node.getClusterNode() : selectReferenceNode(rule, context, node);
        }
        if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            // Rule targets "other" origins: DIRECT uses the origin's statistic node.
            return direct ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        }
        return null;
    }

    /**
     * Resolves the node for the RELATE and CHAIN strategies.
     *
     * @return the cluster node of the referenced resource (RELATE); the current node when
     *         the referenced resource equals the context name (CHAIN); otherwise {@code null}
     */
    static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();
        if (StringUtil.isEmpty(refResource)) {
            return null;
        }
        if (strategy == RuleConstant.STRATEGY_RELATE) {
            // Flow control mode: relate.
            return ClusterBuilderSlot.getClusterNode(refResource);
        }
        if (strategy == RuleConstant.STRATEGY_CHAIN) {
            // Flow control mode: chain - only applies when the call came through the referenced context.
            return refResource.equals(context.getName()) ? node : null;
        }
        // No node.
        return null;
    }
}

快速失败(直接拒绝)流控效果相对简单,其实现类是DefaultController:从当前秒级时间窗口中遍历所有小窗格,统计已通过的请求数,即当前QPS(若阈值类型为线程数,则取当前并发线程数);若当前值加上本次请求数大于配置的阈值,则canPass返回false,随后由FlowRuleChecker抛出FlowException。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Traffic-shaping controller for the fail-fast effect: a request is rejected as soon as
 * the current usage plus the requested amount exceeds the threshold.
 */
public class DefaultController implements TrafficShapingController {

    /**
     * Fail-fast check.
     *
     * @return {@code true} when usage plus {@code acquireCount} does not exceed the threshold,
     *         {@code false} otherwise
     * @throws PriorityWaitException for a prioritized request under a QPS-grade rule, after it
     *         has waited for its occupied slot (the wait was shorter than the occupy timeout)
     */
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Usage is read from the node's current statistics.
        final boolean overThreshold = avgUsedTokens(node) + acquireCount > count;
        if (!overThreshold) {
            return true;
        }
        // Per the original author's note, callers usually pass prioritized = false.
        if (!prioritized || grade != RuleConstant.FLOW_GRADE_QPS) {
            return false;
        }
        final long now = TimeUtil.currentTimeMillis();
        final long waitInMs = node.tryOccupyNext(now, acquireCount, count);
        if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
            node.addWaitingRequest(now + waitInMs, acquireCount);
            node.addOccupiedPass(acquireCount);
            sleep(waitInMs);
            // PriorityWaitException indicates that the request will pass after waiting for waitInMs.
            throw new PriorityWaitException(waitInMs);
        }
        return false;
    }

    /**
     * Current usage of the node: the thread count for a thread-grade rule, otherwise the
     * pass QPS truncated to an int. A {@code null} node yields the default value.
     */
    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        if (grade == RuleConstant.FLOW_GRADE_THREAD) {
            return node.curThreadNum();
        }
        return (int)(node.passQps());
    }
}

预热(Warm Up)流控效果是使用令牌桶算法实现的,其实现类是WarmUpController。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Traffic-shaping controller for the warm-up flow-control effect, built on a
 * token-bucket scheme: while many tokens are stored (at or above the warning line)
 * the allowed QPS is reduced; once the stored tokens drop below the warning line
 * the full configured threshold applies.
 */
public class WarmUpController implements TrafficShapingController {
// Configured threshold, assigned from the constructor's count argument.
protected double count;
// Cold factor; construct() rejects values <= 1.
private int coldFactor;
// Token level at or above which canPass() applies the reduced warm-up limit.
protected int warningToken = 0;
// Upper bound for the stored tokens (enforced in coolDownTokens()).
private int maxToken;
// Increase of the per-request interval for each token above warningToken.
protected double slope;
// Number of tokens currently stored.
protected AtomicLong storedTokens = new AtomicLong(0);
// Timestamp in ms, truncated to the second, of the last successful refill in syncToken().
protected AtomicLong lastFilledTime = new AtomicLong(0);
/**
 * Creates a controller and derives warningToken, maxToken and slope from the arguments.
 *
 * @throws IllegalArgumentException if coldFactor is not larger than 1
 */
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
// Validates coldFactor and computes the derived token-bucket parameters.
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval (the original note gives warningToken = 100 as an example).
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval) (original example: maxToken = 200).
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
// Convenience overload: same check with prioritized = false.
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
/**
 * Warm-up check. Synchronizes the stored tokens using the node's previous pass QPS,
 * then compares the node's current pass QPS plus acquireCount with the applicable limit.
 * The prioritized argument is not used by this implementation.
 *
 * @return true if the request fits under the applicable limit, false otherwise
 */
public boolean canPass(Node node, int acquireCount, boolean prioritized) { // warm-up implemented with a token-bucket scheme
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// At or above the warning line the allowed QPS is derived from the slope instead of the plain threshold.
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// Current interval = aboveToken * slope + 1 / count; the allowed QPS is its reciprocal (nudged up by one ulp).
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// Below the warning line: the full configured threshold applies.
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
/**
 * Refills and consumes tokens at most once per second: the current time is truncated to
 * the second and nothing happens unless it is later than the last fill time.
 *
 * @param passQps amount subtracted from the stored tokens after a successful refill
 */
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
// Truncate to the start of the current second.
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
// Only the caller whose compare-and-set succeeds consumes tokens and advances the fill time.
if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
// Never leave a negative token count.
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
/**
 * Computes the refilled token count, capped at maxToken. Tokens are added at
 * count tokens per second of elapsed time since lastFilledTime.
 */
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// Below the warning line tokens are always refilled.
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
// Above the warning line tokens are refilled only while the pass QPS is below (int)count / coldFactor.
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
// When oldValue equals warningToken neither branch runs and the count is left unchanged.
return Math.min(newValue, maxToken);
}
}

排队等待流控效果是使用漏桶算法实现的,其实现类是RateLimiterController:Sentinel根据配置的QPS计算出请求之间的间隔时间,再根据该间隔时间判断请求是直接通过、休眠等待还是被直接拒绝。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Traffic-shaping controller for the queue-and-wait effect (leaky-bucket style): requests
 * are spaced out by an interval derived from the configured threshold, and a request
 * either passes at once, sleeps until its slot, or is rejected when the wait would
 * exceed the maximum queueing time.
 */
public class RateLimiterController implements TrafficShapingController {
    /** Longest time in milliseconds a request may wait before it is rejected. */
    private final int maxQueueingTimeMs;
    /** Configured threshold, used as requests per second when computing the spacing. */
    private final double count;
    /** Time in ms at which the latest admitted request passed or is scheduled to pass; -1 initially. */
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    /**
     * Leaky-bucket check. The {@code node} and {@code prioritized} arguments are not used.
     *
     * @return {@code true} if the request may pass (possibly after sleeping); {@code false}
     *         if the threshold is not positive, the wait would exceed the maximum queueing
     *         time, or the waiting thread was interrupted
     */
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }
        long currentTime = TimeUtil.currentTimeMillis();
        // Interval between two requests, derived from the configured threshold.
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the time to wait.
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                // The wait would exceed the queueing timeout: reject right away.
                return false;
            } else {
                // Reserve the next slot, then re-check the wait against the reserved time.
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        // Give the reserved slot back before rejecting.
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // In race condition waitTime may <= 0.
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: restore the flag so callers further up
                    // the stack can observe it, then fall through and reject the request.
                    Thread.currentThread().interrupt();
                }
            }
        }
        return false;
    }
}

对于资源降级规则:若熔断器处于关闭状态,请求直接通过;若处于打开状态,则先判断是否已到达熔断结束时间,若已到达则将熔断器置为半开状态,并放行一个请求去探测;若该探测请求执行失败,则重新将熔断器置为打开状态。在exit操作时会统计失败(或慢调用)比例,并据此更新熔断器状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
 * Processor slot that consults the circuit breakers of a resource before passing the
 * request on to the next slot in the chain.
 */
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Runs the circuit-breaker check, then fires the entry of the next slot.
     *
     * @throws Throwable whatever the check or the downstream slots throw
     */
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    /**
     * Asks every circuit breaker registered for the resource whether the request may pass.
     *
     * @throws BlockException a {@link DegradeException} for the first breaker that refuses
     */
    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        final List<CircuitBreaker> breakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        final boolean nothingToCheck = breakers == null || breakers.isEmpty();
        if (nothingToCheck) {
            return;
        }
        for (CircuitBreaker breaker : breakers) {
            if (breaker.tryPass(context)) {
                continue;
            }
            throw new DegradeException(breaker.getRule().getLimitApp(), breaker.getRule());
        }
    }
}
/**
 * Base class for circuit breakers: holds the CLOSED / OPEN / HALF_OPEN state machine and
 * the compare-and-set transitions between the states. Fields and helpers such as
 * currentState, retryTimeoutArrived(), fromCloseToOpen(), resetStat() and
 * notifyObservers() are referenced here but declared outside this excerpt.
 */
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
/**
 * Decides whether a request may pass the breaker.
 *
 * @return true when the breaker is CLOSED, or when it is OPEN, the retry timeout has
 *         arrived and this call wins the transition to HALF_OPEN; false otherwise
 */
public boolean tryPass(Context context) {
if (currentState.get() == State.CLOSED) { // Template implementation.
return true; // breaker is CLOSED: let the request through
}
if (currentState.get() == State.OPEN) { // breaker is OPEN
return retryTimeoutArrived() && fromOpenToHalfOpen(context); // after the retry timeout, move to HALF_OPEN and let this request probe
}
// Neither CLOSED nor OPEN at the time of the checks above (e.g. HALF_OPEN): reject.
return false;
}
/**
 * Attempts the OPEN -> HALF_OPEN transition. On success, observers are notified and a
 * termination hook is registered on the current entry that falls back to OPEN when the
 * entry ends with a block error.
 *
 * @return true if this call performed the transition, false if the compare-and-set lost
 */
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
if (entry.getBlockError() != null) { // the probing entry carries a block error: fall back to OPEN
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
// NOTE(review): observers are notified regardless of whether the compare-and-set above changed the state.
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}
/**
 * Attempts the HALF_OPEN -> OPEN transition.
 *
 * @param snapshotValue value passed on to the observers
 * @return true if this call performed the transition
 */
protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp(); // each re-open refreshes the next retry timestamp (presumably from the configured break duration; helper not shown)
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
return true;
}
return false;
}
/**
 * Attempts the HALF_OPEN -> CLOSED transition and resets the statistics on success.
 *
 * @return true if this call performed the transition
 */
protected boolean fromHalfOpenToClose() {
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
resetStat(); // reset the collected statistics (presumably the slow/error and total call counters of the subclass; helper not shown)
notifyObservers(State.HALF_OPEN, State.CLOSED, null);
return true;
}
return false;
}
/**
 * Moves the breaker to OPEN from whichever state it is in: CLOSED and HALF_OPEN use
 * their dedicated transitions, any other state is left untouched.
 *
 * @param triggerValue value that triggered the transition, passed on to the transition method
 */
protected void transformToOpen(double triggerValue) {
State cs = currentState.get();
switch (cs) {
case CLOSED:
fromCloseToOpen(triggerValue);
break;
case HALF_OPEN:
fromHalfOpenToOpen(triggerValue);
break;
default:
break;
}
}
}

Sentinel中的断路器有两种实现:根据异常数或异常比例降级的ExceptionCircuitBreaker,以及根据慢调用比例降级的ResponseTimeCircuitBreaker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
 * Circuit breaker driven by the exception count or the exception ratio of completed requests.
 */
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    /**
     * Records the outcome of the current entry in the current statistic window and then
     * re-evaluates the breaker state. Does nothing when the context has no current entry.
     */
    public void onRequestComplete(Context context) {
        final Entry current = context.getCurEntry();
        if (current == null) {
            return;
        }
        final Throwable failure = current.getError();
        final SimpleErrorCounter window = stat.currentWindow().value();
        if (failure != null) {
            window.getErrorCount().add(1);
        }
        window.getTotalCount().add(1);
        handleStateChangeWhenThresholdExceeded(failure);
    }

    /**
     * Updates the breaker state after a request completed.
     * OPEN: nothing to do. HALF_OPEN: the probing request alone decides (no error closes
     * the breaker, an error re-opens it). Otherwise the counters of all windows are summed
     * and the breaker opens when the error count (or ratio, depending on the strategy)
     * exceeds the threshold, provided the minimum request amount was reached.
     */
    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request.
            if (error != null) {
                fromHalfOpenToOpen(1.0d);
            } else {
                fromHalfOpenToClose();
            }
            return;
        }

        long failed = 0;
        long total = 0;
        for (SimpleErrorCounter bucket : stat.values()) {
            failed += bucket.errorCount.sum();
            total += bucket.totalCount.sum();
        }
        if (total < minRequestAmount) {
            return;
        }
        // Ratio strategy compares the error ratio; otherwise the raw error count is used.
        final boolean byRatio = strategy == DEGRADE_GRADE_EXCEPTION_RATIO;
        final double observed = byRatio ? failed * 1.0d / total : failed;
        if (observed > threshold) {
            transformToOpen(observed);
        }
    }
}
/**
 * Circuit breaker driven by the ratio of slow calls, i.e. calls whose response time
 * exceeds the maximum allowed RT.
 */
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

    /**
     * Records the response time of the current entry in the current statistic window and
     * then re-evaluates the breaker state. Does nothing when the context has no current entry.
     */
    public void onRequestComplete(Context context) {
        // The current window is resolved before the entry check, as in the original flow.
        final SlowRequestCounter window = slidingCounter.currentWindow().value();
        final Entry current = context.getCurEntry();
        if (current == null) {
            return;
        }
        long finishedAt = current.getCompleteTimestamp();
        if (finishedAt <= 0) {
            // No completion timestamp recorded on the entry: use the current time instead.
            finishedAt = TimeUtil.currentTimeMillis();
        }
        final long rt = finishedAt - current.getCreateTimestamp();
        if (rt > maxAllowedRt) {
            // The response time exceeds the maximum RT: count one more slow call.
            window.slowCount.add(1);
        }
        window.totalCount.add(1);
        handleStateChangeWhenThresholdExceeded(rt);
    }

    /**
     * Updates the breaker state after a request completed.
     * OPEN: nothing to do. HALF_OPEN: the probing request alone decides (a slow call
     * re-opens the breaker, otherwise it is closed). Otherwise the counters of all windows
     * are summed and the breaker opens when the slow-call ratio exceeds the configured
     * ratio, provided the minimum request amount was reached.
     */
    private void handleStateChangeWhenThresholdExceeded(long rt) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        if (currentState.get() == State.HALF_OPEN) {
            // Half-open normally lets a single probing request through; its RT decides the next state.
            if (rt > maxAllowedRt) {
                fromHalfOpenToOpen(1.0d);
            } else {
                fromHalfOpenToClose();
            }
            return;
        }

        // Reaching this point means the breaker was not OPEN or HALF_OPEN at the checks above.
        long slow = 0;
        long total = 0;
        for (SlowRequestCounter bucket : slidingCounter.values()) {
            slow += bucket.slowCount.sum();
            total += bucket.totalCount.sum();
        }
        if (total < minRequestAmount) {
            // Too few requests to judge: leave the state unchanged.
            return;
        }
        final double slowRatio = slow * 1.0d / total;
        if (slowRatio > maxSlowRequestRatio) {
            transformToOpen(slowRatio);
        }
    }
}