Gateway源码

网关作为流量的入口,常用的功能包括路由转发权限校验限流等,Spring Cloud Gateway是Spring Cloud官方推出的由WebFlux + Netty + Reactor实现的响应式第二代API网关框架,定位于取代Netflix Zuul。

Spring Cloud Gateway的核心概念:路由Route断言过滤器。路由是网关中最基础的部分,路由信息包括一个ID、一个目的URI、一组断言工厂、一组Filter组成,若断言为真则说明请求的URL和配置的路由匹配;Spring Cloud Gateway中的断言函数类型是Spring5.0框架中的ServerWebExchange,允许开发者去定义匹配Http Request中的任何信息,如请求头和参数等;Spring Cloud Gateway的过滤器分为GatewayFilIerGlobalFilter可对请求和响应进行处理

Spring Cloud Gateway工作原理跟Zuul的差不多,最大的区别就是Gateway的Filter只有prepost两种,客户端向Spring Cloud Gateway发出请求,若请求与网关定义的路由匹配,则该请求会被发送到网关Web处理程序,此时处理程序运行特定的请求过滤器链。过滤器之间用虚线分开的原因是过滤器可能会在发送代理请求的前后执行逻辑。所有pre过滤器逻辑先执行,然后执行代理请求;代理请求完成后执行post过滤器逻辑

Gateway对请求处理的核心逻辑是在DispatcherHandler中,在DispatcherHandler中依次调用HandlerMappingHandlerAdapterHandlerResultHandler三个核心接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange)) // 获取具体的HandlerMapping,这里返回FilteringWebHandler
.next()
.switchIfEmpty(createNotFoundError()) // 若路由断言匹配未匹配到,则返回Empty,这里对Empty进行处理
.flatMap(handler -> invokeHandler(exchange, handler)) // 调用具体的HandlerAdapter的handle
.flatMap(result -> handleResult(exchange, result));
}
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]")
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exResult -> {
String text = "Exception handler " + exResult.getHandler() + ", error=\"" + ex.getMessage() + "\" [DispatcherHandler]";
return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text);
}));
}
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
if (this.resultHandlers != null) {
for (HandlerResultHandler resultHandler : this.resultHandlers) {
if (resultHandler.supports(handlerResult)) {
return resultHandler;
}
}
}
throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}
private <R> Mono<R> createNotFoundError() {
return Mono.defer(() -> {
Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND, "No matching handler");
return Mono.error(ex);
});
}
}

HandlerMapping

HandlerMapping负责路径到Handler的映射,Gateway中RoutePredicateHandlerMapping实现了AbstractHandlerMapping,其作用是执行所有的Route的断言工厂PredicateFactory匹配路由信息通过断言判断路由是否可用,且将路由信息绑定到请求上下文中,最终返回FilteringWebHandler

也可自定义断言工厂需继承AbstractRoutePredicateFactory类重写apply方法的逻辑。在apply方法中可以通过exchange.getRequest()拿到ServerHttpRequest对象,从而可获取到请求的参数、请求方式、请求头等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport implements HandlerMapping, Ordered, BeanNameAware {
public Mono<Object> getHandler(ServerWebExchange exchange) {
return getHandlerInternal(exchange).map(handler -> {
ServerHttpRequest request = exchange.getRequest();
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) { // 处理跨域问题
CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return REQUEST_HANDLED_HANDLER;
}
}
return handler;
});
}
}

首先通过lookupRoute方法找出所有与当前请求匹配的Route,在匹配之前从RouteLocator的实现类CachingRouteLocator中已经转换好的Route,在应用启动时会通过RouteLocator的实现类RouteDefinitionRouteLocator通过PropertiesRouteDefinitionLocatorGatewayProperties中读取路由配置RouteDefinition且将其转换为Route并缓存到CachingRouteLocator中。除此之外若在DiscoveryClientRouteDefinitionLocator会获取集群中所有的实例并将其构建成RouteDefinition,最终转换并合并到CachingRouteLocator中。

lookupRoute中通过遍历所有的Route,并遍历调用其具体的PredicateFactorytest方法,过滤出其test方法放回trueroute。然后将匹配的路由绑定到请求上下文中。最终返回FilteringWebHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
if (this.managementPortType == DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange).flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); // 将匹配的路由绑定到请求上下文中,以便FilteringWebHandler的handle方法中使用
return Mono.just(webHandler); // 最终返回FilteringWebHandler
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { // 未找到匹配的路由
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
})));
}
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes().concatMap(route -> Mono.just(route).filterWhen(r -> {
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange); // 调用具体的PredicateFactory的test方法,过滤出test方法放回true的route
}).doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e)).onErrorResume(e -> Mono.empty()))
.next()
.map(route -> {
validateRoute(route, exchange);
return route;
});
}
}

HandlerAdapter调用

具体的HandlerAdapter的调用,在DelegatingWebFluxConfiguration配置类的超类WebFluxConfigurationSupport中注入了SimpleHandlerAdapter。而FilteringWebHandlerWebHandler的子类。在SimpleHandlerAdapterhandle方法中调用FilteringWebHandlerhandle方法。由于SimpleHandlerAdapter返回的是Mono.empty()故不会触发handleResult方法。

1
2
3
4
5
6
7
8
9
10
11
12
public class SimpleHandlerAdapter implements HandlerAdapter {
@Override
public boolean supports(Object handler) {
return WebHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebHandler webHandler = (WebHandler) handler;
Mono<Void> mono = webHandler.handle(exchange);
return mono.then(Mono.empty());
}
}

GatewayAutoConfiguration配置类中注入了FilteringWebHandler,由于全局的过滤器GlobalFilterGatewayFilter故在其构造方法中通过适配器模式将GlobalFilter转换成了GatewayFilter。然后通过责任链模式挨个调用GatewayFilterfilter方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class })
@AutoConfigureAfter({ GatewayLoadBalancerClientAutoConfiguration.class, GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
return new FilteringWebHandler(globalFilters);
}
}
public class FilteringWebHandler implements WebHandler {
private final List<GatewayFilter> globalFilters;
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);// 通过适配器模式将GlobalFilter转换为GatewayFilter
}
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); // 从请求上下文中取出前面绑定的Route
List<GatewayFilter> gatewayFilters = route.getFilters(); // 获取Route中配置的filters
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters); // 合并配置的filters和自动注入的全局的filters
AnnotationAwareOrderComparator.sort(combined); // 对GatewayFilter列表排序
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
}
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
private final int index;

private final List<GatewayFilter> filters;

DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
return filter.filter(exchange, chain);
} else {
return Mono.empty(); // complete
}
});
}
}

也可自定义GatewayFilter,自定义GatewayFilter是通过自定义过滤器工厂来完成的,自定义工厂可集成一些列的AbstractGatewayFilterFactory来完成响应的功能,还可通过实现GlobalFilter来自定义全局的过滤器。对于uri支持lb://的方式类配置目标微服务的请求地址,就是通过LoadBalancerClientFilter过滤器来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class LoadBalancerClientFilter implements GlobalFilter, Ordered {
public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;
protected final LoadBalancerClient loadBalancer;
private LoadBalancerProperties properties;
public LoadBalancerClientFilter(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {
this.loadBalancer = loadBalancer;
this.properties = properties;
}
public int getOrder() {
return LOAD_BALANCER_CLIENT_FILTER_ORDER;
}

@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
addOriginalRequestUrl(exchange, url);
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}

protected ServiceInstance choose(ServerWebExchange exchange) { // 通过负载均衡算法获取具体的实例对象
return loadBalancer.choose(((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
}
}