Ribbon集成原理

主流的负载均衡方案分为两类:一类是集中式负载均衡,在消费者和服务提供方中间使用独立的代理方式进行负载,如硬件的F5、软件的Nginx;另一类是客户端负载均衡,客户端会有一个服务器地址列表,在发送请求前通过负载均衡算法选择一个服务器,然后进行访问。

Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡工具,Ribbon客户端组件提供一系列完善的配置,如超时、重试等。通过Load Balancer获取到服务提供的所有机器实例,Ribbon会自动基于某种负载策略去调用这些服务,也可以基于Ribbon实现自定义的负载均衡算法。

对于RestTemplate通过Ribbon实现负载均衡调用,需要在声明RestTemplate Bean时加上@LoadBalanced注解,调用时直接通过服务名称即可。@LoadBalanced利用@Qualifier作为restTemplates注入的筛选条件,筛选出具有负载均衡标识的RestTemplate,被@LoadBalanced注解的restTemplate会被定制,添加LoadBalancerInterceptor拦截器。

1
2
3
4
5
6
7
8
9
/**
 * Declares the application's {@link RestTemplate} bean. The {@code @LoadBalanced}
 * marker is what lets callers address a service by name in the request URL.
 *
 * @return a new RestTemplate instance
 */
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
    RestTemplate loadBalancedTemplate = new RestTemplate();
    return loadBalancedTemplate;
}
// Inject the RestTemplate bean that was declared with @LoadBalanced.
@Autowired
private RestTemplate restTemplate;
// The host part of the URL ("mall-order") is a service name, not a real host:port.
// NOTE(review): userId is not defined in this snippet — presumably supplied by the enclosing method.
String url = "http://mall-order/order/findOrderByUserId/" + userId;
// Issue a GET request and read the response body as a plain Object.
Object result = restTemplate.getForObject(url, Object.class);

Ribbon集成到RestTemplate中是通过RibbonAutoConfiguration配置类注册LoadBalancerClient,然后通过LoadBalancerAutoConfiguration配置类将LoadBalancerClient封装到LoadBalancerInterceptor拦截器中,再由SmartInitializingSingleton在所有单例Bean初始化完成后调用RestTemplateCustomizer,将该拦截器添加到RestTemplate的拦截器集合中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * Excerpt of the Ribbon auto-configuration: registers a {@code SpringClientFactory} and a
 * Ribbon-backed {@code LoadBalancerClient}. Per {@code @AutoConfigureBefore} it is ordered ahead of
 * {@code LoadBalancerAutoConfiguration}, which is conditional on a {@code LoadBalancerClient} bean.
 */
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {
// Optional injection; the empty-list initializer is the fallback when nothing is injected.
@Autowired(required = false)
private List<RibbonClientSpecification> configurations = new ArrayList<>();
/** Creates the client factory and hands it the collected client specifications. */
@Bean
@ConditionalOnMissingBean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
/** Registers the Ribbon implementation of LoadBalancerClient unless another one is already defined. */
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
// Calls the sibling @Bean method to obtain the factory.
return new RibbonLoadBalancerClient(springClientFactory());
}
}
/**
 * Excerpt of the auto-configuration that wires load balancing into RestTemplate:
 * it collects the {@code @LoadBalanced} RestTemplate beans, builds a
 * {@code LoadBalancerInterceptor} around the {@code LoadBalancerClient}, and applies every
 * {@code RestTemplateCustomizer} to each collected RestTemplate.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced // meta-annotated with @Qualifier, so only RestTemplate beans marked @LoadBalanced are injected here
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
// Optional request transformers, passed on to the request factory below.
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
/** Builds the factory that wraps an outgoing HTTP request into a LoadBalancerRequest. */
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
/** Interceptor wiring used only when RetryTemplate is not on the classpath (see the condition). */
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
@Bean // wraps the LoadBalancerClient in a LoadBalancerInterceptor
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
/** Customizer that appends the load-balancer interceptor to a RestTemplate. */
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> { // appends the interceptor to the RestTemplate's interceptor list
// Work on a copy of the current interceptors, then set the extended list back.
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
/** Applies every available customizer to every collected @LoadBalanced RestTemplate. */
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate); // runs once all singleton beans have been initialized
}
}
});
}
}

在调用RestTemplate的方法发起请求时,会被LoadBalancerInterceptor拦截器拦截,从而调用RibbonLoadBalancerClient的execute方法:先通过getLoadBalancer获取负载均衡器,再由获取到的负载均衡器根据负载均衡算法挑选一个Server。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * RestTemplate interceptor that hands each outgoing request to the
 * {@code LoadBalancerClient}, using the host part of the request URI as the service name.
 */
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    /** Convenience constructor: builds a default request factory around the same client. */
    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    /**
     * Reads the service name from the URI host and executes the wrapped request through the
     * load balancer.
     *
     * @throws IllegalStateException (via {@code Assert.state}) when the URI has no host
     * @throws IOException propagated from the load-balanced execution
     */
    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
        final URI requestUri = request.getURI();
        // The "host" of a load-balanced URL is the logical service name.
        final String targetService = requestUri.getHost();
        Assert.state(targetService != null, "Request URI does not contain a valid hostname: " + requestUri);
        return this.loadBalancer.execute(
                targetService,
                this.requestFactory.createRequest(request, body, execution));
    }
}
/**
 * Excerpt of the Ribbon-backed LoadBalancerClient: the two {@code execute} overloads that
 * resolve a service id to a concrete server before running the request.
 */
public class RibbonLoadBalancerClient implements LoadBalancerClient {

    /** Same as the three-argument overload, with no hint. */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        return execute(serviceId, request, null);
    }

    /**
     * Looks up the load balancer for {@code serviceId}, lets it choose a server, and runs
     * the request against that server.
     *
     * @throws IllegalStateException when no server could be chosen
     */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
        // Step 1: obtain the load balancer for this service.
        ILoadBalancer balancer = getLoadBalancer(serviceId);
        // Step 2: the load balancer picks one server.
        Server chosen = getServer(balancer, hint);
        if (chosen != null) {
            RibbonServer target = new RibbonServer(
                    serviceId,
                    chosen,
                    isSecure(chosen, serviceId),
                    serverIntrospector(serviceId).getMetadata(chosen));
            return execute(serviceId, target, request);
        }
        throw new IllegalStateException("No instances available for " + serviceId);
    }
}

获取负载均衡器

ILoadBalancer负载均衡器是通过RibbonClientConfiguration配置类导入的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Excerpt of the per-client Ribbon configuration: shows how the {@code ILoadBalancer} bean
 * is produced.
 */
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {

    /**
     * Returns the load balancer set through the properties factory for this client when there
     * is one; otherwise falls back to a {@code ZoneAwareLoadBalancer} built from the supplied
     * collaborators.
     */
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        boolean configuredByProperty = this.propertiesFactory.isSet(ILoadBalancer.class, name);
        if (!configuredByProperty) {
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater);
        }
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
}
/**
 * Excerpt: only the constructor is shown. It forwards every argument unchanged to
 * {@code DynamicServerListLoadBalancer}.
 */
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
}
/**
 * Excerpt showing construction: the constructor stores its collaborators and then calls
 * {@code restOfInit}, which starts the periodic refresh and loads the server list once.
 */
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
// Filters of this base type are given this balancer's stats object.
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
/**
 * Starts the updater and performs the first server-list load. Connection priming is switched
 * off for the duration and the original setting is restored at the end.
 */
void restOfInit(IClientConfig clientConfig) {
// Remember the current setting so it can be restored below.
boolean primeConnection = this.isEnablePrimingConnections();
// NOTE(review): presumably disabled so the initial list load does not trigger priming itself — confirm against BaseLoadBalancer.
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature(); // start the periodic server-list refresh (Nacos instances when Nacos supplies the list)
updateListOfServers(); // load the full server list once, right now
// Prime connections explicitly, only if it was enabled and a primer exists.
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections().primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
}
}

定时更新Nacos实例列表最终调用PollingServerListUpdater的start方法,将更新任务添加到周期延时线程池中,首次延迟1s执行,之后每30s执行一次该任务。该任务最终调用DynamicServerListLoadBalancer的updateListOfServers方法完成实例列表的更新,最终通过updateAllServerList方法将服务实例列表设置到父类BaseLoadBalancer的allServerList中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
 * Excerpt showing the refresh path: the updater callback, the fetch-and-filter step, and
 * how the resulting list is pushed into the parent balancer and grouped by zone.
 */
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    /** Callback handed to the ServerListUpdater; every invocation refreshes the server list. */
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            // This is the call that performs each scheduled refresh.
            updateListOfServers();
        }
    };

    /** Starts the updater with the refresh callback. */
    public void enableAndInitLearnNewServersFeature() {
        serverListUpdater.start(updateAction);
    }

    /**
     * Fetches the current servers from the ServerList (if any), runs them through the filter
     * (if any), and publishes the result.
     */
    public void updateListOfServers() {
        List<T> refreshed;
        if (serverListImpl == null) {
            refreshed = new ArrayList<T>();
        } else {
            List<T> fetched = serverListImpl.getUpdatedListOfServers();
            refreshed = (filter == null) ? fetched : filter.getFilteredListOfServers(fetched);
        }
        // Push the list into the parent BaseLoadBalancer.
        updateAllServerList(refreshed);
    }

    /**
     * Publishes {@code ls} as the new server list. Does nothing when the in-progress flag is
     * already set; the flag is always cleared again afterwards.
     */
    protected void updateAllServerList(List<T> ls) {
        if (!serverListUpdateInProgress.compareAndSet(false, true)) {
            return;
        }
        try {
            for (T candidate : ls) {
                // Mark alive up front so that clients can start using these servers.
                candidate.setAlive(true);
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }

    /**
     * Stores the list in the parent and additionally groups the servers by lower-cased zone
     * name; servers without a zone are left out of the grouping.
     */
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> typedServers = (List<T>) lsrv;
        Map<String, List<Server>> byZone = new HashMap<String, List<Server>>();
        for (Server each : typedServers) {
            // Return value unused; presumably this creates the per-server stats entry eagerly.
            getLoadBalancerStats().getSingleServerStat(each);
            String zone = each.getZone();
            if (zone == null) {
                continue;
            }
            byZone.computeIfAbsent(zone.toLowerCase(), z -> new ArrayList<Server>()).add(each);
        }
        setServerListForZones(byZone);
    }
}
/**
 * Excerpt of the polling updater: {@code start} schedules a repeating task that runs the
 * supplied update action.
 */
public class PollingServerListUpdater implements ServerListUpdater {
/**
 * Schedules the repeating refresh task. Only the first call has an effect: the
 * compare-and-set on {@code isActive} makes later calls no-ops while the updater is active.
 *
 * @param updateAction action to run on every refresh cycle
 */
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) { // updater is no longer active
// Cancel the scheduled task (if it was recorded) and skip this run.
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate(); // runs the action passed to start(); for DynamicServerListLoadBalancer that is the anonymous UpdateAction calling updateListOfServers()
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
// NOTE(review): the exception is silently dropped in this excerpt, so a failed refresh leaves no trace.
}
}
};
// First run after initialDelayMs, then again refreshIntervalMs after each run finishes
// (the accompanying article cites 1s and 30s; the actual values are not shown in this excerpt).
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
}
}
}

若集成了Nacos,最终会调用NacosServerList的getServers方法,从Nacos服务获取最新的实例列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Excerpt of the Nacos-backed ServerList: every refresh asks the Nacos naming service for
 * the current instances of {@code serviceId}.
 */
public class NacosServerList extends AbstractServerList<NacosServer> {

    /** Returns the freshly fetched server list. */
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    /**
     * Queries Nacos for the instances of this service in the configured group and converts
     * them to servers.
     *
     * @throws IllegalStateException wrapping any failure during the lookup
     */
    private List<NacosServer> getServers() {
        try {
            String nacosGroup = discoveryProperties.getGroup();
            // Ask the Nacos naming service for the latest instance list (third argument: true).
            return instancesToServerList(
                    discoveryProperties.namingServiceInstance().selectInstances(serviceId, nacosGroup, true));
        } catch (Exception e) {
            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }
}
/**
 * Excerpt of the Nacos naming service: two of the {@code selectInstances} overloads.
 */
public class NacosNamingService implements NamingService {

    /**
     * Delegates to a four-argument overload with {@code true} as the last argument.
     * NOTE(review): that overload is not shown in this excerpt; the trailing {@code true} is
     * presumably the subscribe flag.
     */
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }

    /**
     * Resolves the service info either through {@code hostReactor.getServiceInfo} (when
     * {@code subscribe} is true) or directly from the server, then selects instances from it
     * according to {@code healthy}.
     */
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        String groupedName = NamingUtils.getGroupedName(serviceName, groupName);
        String clusterCsv = StringUtils.join(clusters, ",");
        ServiceInfo serviceInfo = subscribe
                ? hostReactor.getServiceInfo(groupedName, clusterCsv)
                : hostReactor.getServiceInfoDirectlyFromServer(groupedName, clusterCsv);
        return selectInstances(serviceInfo, healthy);
    }
}

负载均衡算法选择

负载均衡策略IRule,默认采用ZoneAvoidanceRule实现,在多区域环境下选出最佳区域的实例进行访问。实例检查策略IPing,默认采用DummyPing实现,是一个特殊的实现,始终返回true,并不会检查实例是否可用。服务实例清单的维护机制ServerList,默认采用ConfigurationBasedServerList实现。服务实例清单过滤机制ServerListFilter,默认采用ZonePreferenceServerListFilter,该策略能够优先过滤出与请求方处于同区域的服务实例。负载均衡器ILoadBalancer,默认采用ZoneAwareLoadBalancer实现,它具备了区域感知的能力。

  • RandomRule: 随机选择一个Server。
  • RetryRule: 对选定的负载均衡策略加上重试机制,在一个配置时间段内当选择Server不成功,则一直尝试使用subRule的方式选择一个可用的server。
  • RoundRobinRule: 轮询选择,轮询index,选择index对应位置的Server。
  • AvailabilityFilteringRule: 过滤一直连接失败被标记为Circuit Tripped的后端Server,并过滤高并发的后端Server或使用一个AvailabilityPredicate来包含过滤Server的逻辑,其实就是检查status里记录的各个Server的运行状态。
  • BestAvailableRule: 选择一个最小的并发请求的Server,逐个考察Server若Server被tripped了则跳过。
  • WeightedResponseTimeRule: 根据响应时间加权,响应时间越长,权重越小,被选中的可能性越低
  • ZoneAvoidanceRule: 默认的负载均衡策略,复合判断Server所在区域的性能和Server的可用性选择Server,在没有区域的环境下,类似于RoundRobinRule轮询
  • NacosRule: 同集群优先调用

可通过主动注入IRule Bean的方式修改全局的负载均衡策略

1
2
3
4
/**
 * Registers {@code NacosRule} as the {@link IRule} bean, which replaces the load-balancing
 * strategy globally.
 *
 * @return a new NacosRule instance
 */
@Bean
public IRule ribbonRule() {
    IRule globalRule = new NacosRule();
    return globalRule;
}

也可以在配置文件中为指定的微服务单独配置调用它时使用的负载均衡算法

1
2
3
# Per-client Ribbon setting: applies only to calls made to the mall-order service.
mall-order: # name of the microservice being called
  ribbon:
    # Fully-qualified class name of the IRule implementation to use for this client.
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
 * Excerpt of the Ribbon-backed LoadBalancerClient: server selection.
 */
public class RibbonLoadBalancerClient implements LoadBalancerClient {

    /**
     * Asks the load balancer to choose a server.
     *
     * @param loadBalancer balancer to query; may be null
     * @param hint selection key; the literal "default" is used when null
     * @return the chosen server, or null when no load balancer was supplied
     */
    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer != null) {
            Object selectionKey = (hint == null) ? "default" : hint;
            return loadBalancer.chooseServer(selectionKey);
        }
        return null;
    }
}
/**
 * Excerpt of the base load balancer: server selection is delegated to the configured rule.
 */
public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware {

    /**
     * Increments the selection counter (creating it on first use) and asks the rule for a
     * server.
     *
     * @param key selection key passed through to the rule
     * @return the rule's choice, or null when there is no rule or the rule throws
     */
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();

        if (rule == null) {
            return null;
        }
        try {
            // Delegate to the concrete rule's choose method.
            return rule.choose(key);
        } catch (Exception e) {
            return null;
        }
    }
}
/**
 * Load-balancing rule that prefers instances in the caller's own Nacos cluster and picks
 * among the candidates via {@code ExtendBalancer.getHostByRandomWeight2}.
 */
public class NacosRule extends AbstractLoadBalancerRule {

    /**
     * Chooses a server for the service behind this rule's load balancer.
     *
     * @param key selection key; not used by this implementation
     * @return the chosen server, or null when there are no instances or any step fails
     */
    public Server choose(Object key) {
        try {
            String localCluster = this.nacosDiscoveryProperties.getClusterName();
            String nacosGroup = this.nacosDiscoveryProperties.getGroup();
            DynamicServerListLoadBalancer balancer = (DynamicServerListLoadBalancer) getLoadBalancer();
            String serviceName = balancer.getName();
            NamingService naming = nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());

            List<Instance> available = naming.selectInstances(serviceName, nacosGroup, true);
            if (CollectionUtils.isEmpty(available)) {
                return null;
            }

            // Start from all instances; narrow to the local cluster only if that leaves any.
            List<Instance> candidates = available;
            if (StringUtils.isNotBlank(localCluster)) {
                List<Instance> inLocalCluster = available.stream()
                        .filter(candidate -> Objects.equals(localCluster, candidate.getClusterName()))
                        .collect(Collectors.toList());
                if (!CollectionUtils.isEmpty(inLocalCluster)) {
                    candidates = inLocalCluster;
                }
            }

            Instance picked = ExtendBalancer.getHostByRandomWeight2(candidates);
            return new NacosServer(picked);
        } catch (Exception e) {
            return null;
        }
    }
}