Dubbo服务引入

Spring启动过程中会将@Reference注解标注的属性赋值,赋值对象为ReferenceBeanget()方法所返回的代理对象。ReferenceBean实现了FactoryBean接口,在其getObject方法中调用的get方法。ReferenceBeanServiceBean类似也实现了ApplicationContextAware接口,在setApplicationContext方法中同样将applicationContext添加到SpringExtensionFactory中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
private transient ApplicationContext applicationContext;
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
}
public Object getObject() {
return get(); // 调用超类ReferenceConfig的get方法
}
public void afterPropertiesSet() throws Exception {// 该方法给ReferenceBean对象的属性赋值
// 此处省略了一些列属性填充代码
if (shouldInit()) {
getObject();
}
}
}

调用超类ReferenceConfigget方法时跟服务导出类似同样先调用checkAndUpdateSubConfigs检查和更新参数,将ReferenceBean中的属性值更新为优先级最高的参数值,然后调用init方法成代理对象ref。在init方法中首先将消费者引入服务设置的参数解析到一个map中,后续会根据该map中的参数从注册中心查找服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class ReferenceConfig<T> extends AbstractReferenceConfig {
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {// 入口
init();
}
return ref; // Invoke代理
}
public void checkAndUpdateSubConfigs() {
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
completeCompoundConfigs(); // 填充ReferenceConfig对象中的属性
startConfigCenter(); // 开启配置中心
checkDefault(); // get consumer's global configuration
this.refresh(); // 刷新ReferenceConfig对象的属性值
if (getGeneric() == null && getConsumer() != null) { // 设置泛化
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
resolveFile();
checkApplication();
checkMetadataReport();
}
private void init() {
if (initialized) {
return;
}
checkStubAndLocal(interfaceClass);
checkMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
appendRuntimeParameters(map);
if (!ProtocolUtils.isGeneric(getGeneric())) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 通过接口对应的Wrapper,拿到接口中所有的方法名字
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
map.put(INTERFACE_KEY, interfaceName);
appendParameters(map, metrics); // 监控中心参数
appendParameters(map, application); // 应用相关参数
appendParameters(map, module); // 模块相关参数
appendParameters(map, consumer); // 提供者相关参数
appendParameters(map, this); // 服务本身相关参数
Map<String, Object> attributes = null;
if (CollectionUtils.isNotEmpty(methods)) { // 方法参数:@Reference(methods = {@Method(name = "say", timeout = 3000)})
attributes = new HashMap<String, Object>();
for (MethodConfig methodConfig : methods) {
appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) { // 若某个方法配置存在xx.retry=false,则改成xx.retry=0
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
attributes.put(methodConfig.getName(), convertMethodConfig2AsyncInfo(methodConfig));
}
}
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
ref = createProxy(map); // 根据map中的参数创建代理对象
String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
initialized = true;
}
}

若为本地调用则生成一个本地URL,然后调用Protocolrefer方法得到一个Invoker对象,若为远程调用首先判断@Reference注解中url属性指定的注册中心地址是否存在,若存在则解析出注册中心的地址列表暂存到urls属性中,解析url判断若为注册中心地址,则在url中添加一个refer参数,将map中存储的消费端参数设置到refer中,若为服务地址,可能url中配置了参数,则需要将服务端的参数合并补全到消费端参数。若@Reference注解中url属性未设置,则加载注册中心地址列表,然后遍历加载到的地址,将消费端参数添加到注册中心地址URL中的REFER_KEY中。

解析得到urls后,若只有一个注册中心地址则直接refer得到一个invoker,若有多个注册中心地址,则遍历每个注册中心,分别调用refer方法得到Invoker且添加到invokers中,然后把invokers调用CLUSTER.join封装所有invokers得到一个invokerCLUSTER为一个SPI扩展接口ClusterCLUSTER默认实现为FailoverCluster。最终将得到的invoker对象调用PROXY_FACTORY.getProxy得到一个代理对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class ReferenceConfig<T> extends AbstractReferenceConfig {
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) { // 若是本地调用即以injvm://开头的协议
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
} else {// 之所以有urls,因为可在@Reference的url属性中配置多个url,可以是点对点的服务地址,也可以是注册中心的地址
urls.clear(); // reference retry init will add url to urls, lead to OOM
if (url != null && url.length() > 0) { // @Reference中指定了url属性
String[] us = SEMICOLON_SPLIT_PATTERN.split(url); // 用;号切分
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 若是注册中心地址,则在url中添加一个refer参数
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); // map表示消费者端配置的参数
} else {// 若是服务地址,有可能url中配置了参数,map中表示的服务消费者消费服务时的参数,所以需要合并
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // @Reference中的protocol属性表示使用哪个协议调用服务,若不是本地调用协议injvm://,则把注册中心地址找出来,对于injvm://协议已经在之前的逻辑中就已经生成invoke了
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = loadRegistries(false); // 加载注册中心地址
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); // 对于注册中心地址都添加REFER_KEY
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
if (urls.size() == 1) { // 若只有一个url则直接refer得到一个invoker
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); // RegistryProtocol.refer() 或 DubboProtocol.refer()
// MockClusterInvoker-->FailoverClusterInvoker-->RegistryDirectory
// --->RegistryDirectory$InvokerDelegate-->ListenerInvokerWrapper-->ProtocolFilterWrapper$CallbackRegistrationInvoker-->ConsumerContextFilter-->FutureFilter-->MonitorFilter-->AsyncToSyncInvoker-->DubboInvoker
// --->RegistryDirectory$InvokerDelegate-->ListenerInvokerWrapper-->ProtocolFilterWrapper$CallbackRegistrationInvoker-->ConsumerContextFilter-->FutureFilter-->MonitorFilter-->AsyncToSyncInvoker-->DubboInvoker
} else {// 若这多个urls中不存在注册中心url,则把所有invoker整合为FailoverCluster
// 若有多个url根据每个url,refer得到对应的invoker,若这多个urls中存在注册中心url,则把所有invoker整合为RegistryAwareClusterInvoker,该Invoker在调用时,会查看所有Invoker中是否有默认的,若有则使用默认Invoker,若没有则使用第一个Invoker
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null; // 用来记录urls中最后一个注册中心url
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available use RegistryAwareCluster only when register's CLUSTER is available
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME); // 若存在注册中心地址
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));// StaticDirectory表示静态服务目录,里面的invokers是不会变的, 生成一个RegistryAwareCluster
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers)); // 若不存在注册中心地址, 生成一个FailoverClusterInvoker
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
return (T) PROXY_FACTORY.getProxy(invoker);// create service proxy
}
}

和服务导出一样这里的Protocol会有ProtocolListenerWrapperProtocolFilterWrapper两个Wrapper,但这两个Wrapper没有做什么事,最终掉到RegistryProtocol,首先根据URL获取Registry,然后在doRefer中根据URL和接口生成RegistryDirectory动态服务目录,然后将消费者注册到注册中心。然后通过RegistryDirectorybuildRouterChain构造路由链,然后订阅服务当前应用对应的服务提供者目录动态配置目录路由器目录。然后将生成的RegistryDirectory作为参数调用ClusterMockClusterWrapper子类的join方法将具体的Invoker包装成MockClusterInvoker。若消费者引入了多个group中的服务则生成MergeableClusterInvoker否则生成FailoverClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ProtocolListenerWrapper implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // dubbo://
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, INVOKER_LISTENER_KEY)));
}
}
public class ProtocolFilterWrapper implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // dubbo://
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
}
public class RegistryProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 从registry://的url中获取对应的注册中心,如zookeeper,默认为dubbo,dubbo提供了自带的注册中心实现,url由registry://改变为--->zookeeper://
url = URLBuilder.from(url).setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY).build();
Registry registry = registryFactory.getRegistry(url); // 拿到注册中心实现,ZookeeperRegistry
if (RegistryService.class.equals(type)) { // 用来解决SimpleRegistry不可用的问题
return proxyFactory.getInvoker((T) registry, type, url);
}
// qs表示queryString, 表示url中的参数,表示消费者引入服务时所配置的参数
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// group="a,b" or group="*"
// https://dubbo.apache.org/zh/docs/v2.7/user/examples/group-merger/
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url); // group有多个值,这里的cluster为MergeableCluster
}
}
return doRefer(cluster, registry, type, url); // 这里的cluster是cluster的Adaptive对象,扩展点
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// RegistryDirectory表示动态服务目录,会和注册中心的数据保持同步,type表示一个服务对应一个RegistryDirectory,url表示注册中心地址
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 在消费端,最核心的就是RegistryDirectory
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY 引入服务所配置的参数
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 消费者url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl()); // 注册消费者,注册简化后的消费url
}
// 构造路由链,路由链会在引入服务时按路由条件进行过滤,路由链是动态服务目录中的一个属性,通过路由链可以过滤某些服务提供者
directory.buildRouterChain(subscribeUrl);
// 服务目录需要订阅的几个路径
// 当前应用所对应的动态配置目录:/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators
// 当前所引入的服务的动态配置目录:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators
// 当前所引入的服务的提供者目录:/dubbo/org.apache.dubbo.demo.DemoService/providers
// 当前所引入的服务的老版本动态配置目录:/dubbo/org.apache.dubbo.demo.DemoService/configurators
// 当前所引入的服务的老版本路由器目录:/dubbo/org.apache.dubbo.demo.DemoService/routers
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory); // 利用传进来的cluster,join得到invoker, MockClusterWrapper
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}

上面调用RegistryFactorygetRegistry方法实际调到AbstractRegistryFactory中,最终通过ZookeeperRegistryFactory创建一个ZookeeperRegistry实例,在doRefer方法中实际注册消费端到注册中心时最终调用ZookeeperRegistrydoRegister方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public abstract class AbstractRegistryFactory implements RegistryFactory {
public Registry getRegistry(URL url) {
url = URLBuilder.from(url).setPath(RegistryService.class.getName()).addParameter(INTERFACE_KEY, RegistryService.class.getName()).removeParameters(EXPORT_KEY, REFER_KEY).build();
String key = url.toServiceStringWithoutResolving();
LOCK.lock(); // Lock the registry access process to ensure a single instance of the registry
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url); //create registry by spi/ioc
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock(); // Release the lock
}
}
}
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
}
public class ZookeeperRegistry extends FailbackRegistry {
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}

路由链构造

在获得路由链时就要根据URL参数去匹配得到符合当前的服务的Router,最终可以拿到MockRouterFactoryTagRouterFactoryAppRouterFactoryServiceRouterFactory四个路由工厂。遍历每个RouterFactory调用其getRouter方法得到Router并存到routers中进行排序处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public void buildRouterChain(URL url) {
this.setRouterChain(RouterChain.buildChain(url));
}
}
public class RouterChain<T> {
public static <T> RouterChain<T> buildChain(URL url) {
return new RouterChain<>(url);
}
private RouterChain(URL url) {
// 拿到RouterFactory接口有哪些扩展实现类,比如默认情况下就有四个:
// 0 = {MockRouterFactory@2880}
// 1 = {TagRouterFactory@2881} // 标签路由
// 2 = {AppRouterFactory@2882} // 应用条件路由
// 3 = {ServiceRouterFactory@2883} // 服务条件路由
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class).getActivateExtension(url, (String[]) null);
// 然后利用RouterFactory根据url生成各个类型的Router,这里生产的routers已经是真实可用的了,但是有个比较特殊的:
// 对于应用条件路由和服务条件路由对应的Router对象,对象内部已经有真实可用的数据了,数据已经从配置中心得到了
// 但是对于标签路由则没有,它暂时还相当于一个没有内容的对象,还没有从配置中心获取标签路由的数据
List<Router> routers = extensionFactories.stream().map(factory -> factory.getRouter(url)).collect(Collectors.toList());
initWithRouters(routers); // 把routers按priority进行排序
}
public void initWithRouters(List<Router> builtinRouters) {
this.builtinRouters = builtinRouters;
this.routers = new ArrayList<>(builtinRouters);
this.sort();
}
}

AppRouterServiceRouter是非常类似,他们的父类都是ListenableRouter,在创建AppRouterServiceRouter时会绑定一个监听器,然后主动去获取一下对应节点所配置的路由规则,然后解析得到ConditionRouterRule,再调用generateConditions方法解析出一个或多个ConditionRouter,并存入到conditionRouters中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
public class AppRouterFactory implements RouterFactory {
public static final String NAME = "app";
private volatile Router router;
@Override
public Router getRouter(URL url) {
if (router != null) {
return router;
}
synchronized (this) {
if (router == null) {
router = createRouter(url);
}
}
return router;
}
private Router createRouter(URL url) {// 内部会进行初始化
return new AppRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}
public interface DynamicConfiguration extends Configuration {
static DynamicConfiguration getDynamicConfiguration() {
Optional<Configuration> optional = Environment.getInstance().getDynamicConfiguration();
return (DynamicConfiguration) optional.orElseGet(() -> getExtensionLoader(DynamicConfigurationFactory.class).getDefaultExtension().getDynamicConfiguration(null));
}
}
public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
}
}
public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
this.url = url;
rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
zkClient = zookeeperTransporter.connect(url);
zkClient.addDataListener(rootPath, cacheListener, executor);
try {// Wait for connection
long timeout = url.getParameter("init.timeout", 5000);
boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
if (!isCountDown) {
throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url " + url + " is correct");
}
} catch (InterruptedException e) {
}
}
}
public class CacheListener implements DataListener {
public void dataChanged(String path, Object value, EventType eventType) {
if (eventType == null) {
return;
}
if (eventType == EventType.INITIALIZED) {
initializedLatch.countDown();
return;
}
if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
return;
}
if (path.split("/").length >= MIN_PATH_DEPTH) {
String key = pathToKey(path);
ConfigChangeType changeType;
switch (eventType) {
case NodeCreated:
changeType = ConfigChangeType.ADDED;
break;
case NodeDeleted:
changeType = ConfigChangeType.DELETED;
break;
case NodeDataChanged:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);
Set<ConfigurationListener> listeners = keyListeners.get(path);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}
}
public class AppRouter extends ListenableRouter {
public static final String NAME = "APP_ROUTER";
private static final int APP_ROUTER_DEFAULT_PRIORITY = 150;
public AppRouter(DynamicConfiguration configuration, URL url) {// 拿到应用名
super(configuration, url, url.getParameter(CommonConstants.APPLICATION_KEY));
this.priority = APP_ROUTER_DEFAULT_PRIORITY;
}
}
public class ServiceRouterFactory extends CacheableRouterFactory {
public static final String NAME = "service";
@Override
protected Router createRouter(URL url) {
return new ServiceRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}
public class ServiceRouter extends ListenableRouter {
public ServiceRouter(DynamicConfiguration configuration, URL url) { // 得到服务名
super(configuration, url, DynamicConfiguration.getRuleKey(url));
this.priority = SERVICE_ROUTER_DEFAULT_PRIORITY;
}
}
public abstract class ListenableRouter extends AbstractRouter implements ConfigurationListener {
public ListenableRouter(DynamicConfiguration configuration, URL url, String ruleKey) {
super(configuration, url);
this.force = false;
// ruleKey为服务名或应用名,初始化,会绑定一个监听器,负责监听配置中心条件路由的修改,并且会主动从配置中心获取一下当前条件路由的数据并做解析
this.init(ruleKey);
}
private synchronized void init(String ruleKey) {
if (StringUtils.isEmpty(ruleKey)) {
return;
}
String routerKey = ruleKey + RULE_SUFFIX; // 服务名+".condition-router",或 应用名+".condition-router"
configuration.addListener(routerKey, this); // 绑定一个监听器去监听routerKey对应的路径,当前类ListenableRouter就自带了一个监听器
// 绑定完监听器后,主动的从配置中心获取一下当前服务或消费者应用的对应的路由配置
String rule = configuration.getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP);
if (StringUtils.isNotEmpty(rule)) {// 手动调用监听器处理事件的方法process()
this.process(new ConfigChangeEvent(routerKey, rule));
}
}
public synchronized void process(ConfigChangeEvent event) {
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
// 若是一个删除时间,则清空当前Router中的conditionRouters属性,表示当前Router对象中没有路由规则
routerRule = null;
conditionRouters = Collections.emptyList();
} else {
try {
routerRule = ConditionRuleParser.parse(event.getValue()); // 解析路由规则
generateConditions(routerRule); // 根据路由规则,生成ConditionRouter-条件路由对象,并赋值给当前Router对象的conditionRouters属性
} catch (Exception e) {
}
}
}
}

TagRouter比较特殊,首先标签路由是用在当消费者在调用某个服务时,通过在请求中设置标签,然后根据所设置的标签获得可用的服务提供者地址,目前TagRouter只支持应用级别的配置。

对服务消费者而言在引用某个服务时,需知道提供该服务的应用名,然后去监听该应用名对应的.tag-router节点内容,需要先获取到当前所引入服务的服务提供者URL,从服务提供者URL中得到服务提供者应用名,拿到应用名后才能去应用名对应的.tag-router节点去绑定监听器

AppRouter监听的是本消费者应用的路由规则ServiceRouter监听的是所引入服务的路由规则TagRouter是在引入服务时获取服务提供者URL之后,才监听.tag-router节点中的内容,并手动获取一次节点中的内容,设置TagRouter对象中tagRouterRule属性表示标签路由规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
public class TagRouterFactory extends CacheableRouterFactory {
public static final String NAME = "tag";
@Override
protected Router createRouter(URL url) {
return new TagRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}
public class TagRouter extends AbstractRouter implements ConfigurationListener {
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return invokers;
}
final TagRouterRule tagRouterRuleCopy = tagRouterRule; // since the rule can be changed by config center, we should copy one to use.
if (tagRouterRuleCopy == null || !tagRouterRuleCopy.isValid() || !tagRouterRuleCopy.isEnabled()) {
return filterUsingStaticTag(invokers, url, invocation);
}
List<Invoker<T>> result = invokers;
// 获取调用对象invocation中所设置的tag
String tag = StringUtils.isEmpty(invocation.getAttachment(Constants.TAG_KEY)) ? url.getParameter(Constants.TAG_KEY) : invocation.getAttachment(Constants.TAG_KEY);
if (StringUtils.isNotEmpty(tag)) { // 若请求具有特定标签的提供者
List<String> addresses = tagRouterRuleCopy.getTagnameToAddresses().get(tag); // 获取对应tag所设置的服务提供者address
if (CollectionUtils.isNotEmpty(addresses)) { // 首先按动态标签组过过滤
// 根据tag所对应的address对所有服务提供者invokers进行过滤
result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
// 若过滤之后还有结果,那就用过滤之后的结果,若没有结果,但是此标签路由是要强制使用的,那么则会把空结果返回(没有此tag所对应的服务提供者可用)
if (CollectionUtils.isNotEmpty(result) || tagRouterRuleCopy.isForce()) {
return result;
}
} else {// 动态标签组没有关于请求的应用程序的任何项目或它在过滤后为空,动态标签组但force=false,检查静态标签
result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
// 若没有可匹配当前标记请求的标记提供者,force.tag默认为false,这意味着它将调用任何没有标记的提供程序,除非它被明确禁止
if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
return result;
} else { // FAILOVER:返回没有任何标签的所有提供者。
List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), tagRouterRuleCopy.getAddresses()));
return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
} else {
// List<String> addresses = tagRouterRule.filter(providerApp);
List<String> addresses = tagRouterRuleCopy.getAddresses(); // 返回动态标签组中的所有地址
if (CollectionUtils.isNotEmpty(addresses)) {
result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
if (CollectionUtils.isEmpty(result)) {
return result; // 所有地址都在动态标签组中,返回空列表
}
// 如果有一些地址不在任何动态标签组中,则继续使用
}
return filterInvoker(result, invoker -> {
String localTag = invoker.getUrl().getParameter(Constants.TAG_KEY);
return StringUtils.isEmpty(localTag) || !tagRouterRuleCopy.getTagNames().contains(localTag);
});
}
}
private <T> List<Invoker<T>> filterUsingStaticTag(List<Invoker<T>> invokers, URL url, Invocation invocation) {
List<Invoker<T>> result = invokers;
String tag = StringUtils.isEmpty(invocation.getAttachment(Constants.TAG_KEY)) ? url.getParameter(Constants.TAG_KEY) : invocation.getAttachment(Constants.TAG_KEY);
if (!StringUtils.isEmpty(tag)) { // Tag request
result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
if (CollectionUtils.isEmpty(result) && !isForceUseTag(invocation)) {
result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
}
} else {
result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
}
return result;
}
}
public class MockRouterFactory implements RouterFactory {
public static final String NAME = "mock";
@Override
public Router getRouter(URL url) {
return new MockInvokersSelector();
}
}
public class MockInvokersSelector extends AbstractRouter {
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, URL url, final Invocation invocation) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return invokers;
}
if (invocation.getAttachments() == null) {
return getNormalInvokers(invokers);
} else {
String value = invocation.getAttachments().get(INVOCATION_NEED_MOCK);
if (value == null) {
return getNormalInvokers(invokers);
} else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
return getMockedInvokers(invokers);
}
}
return invokers;
}
private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
if (!hasMockProviders(invokers)) {
return null;
}
List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
for (Invoker<T> invoker : invokers) {
if (invoker.getUrl().getProtocol().equals(MOCK_PROTOCOL)) {
sInvokers.add(invoker);
}
}
return sInvokers;
}
private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
if (!hasMockProviders(invokers)) {
return invokers;
} else {
List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
for (Invoker<T> invoker : invokers) {
if (!invoker.getUrl().getProtocol().equals(MOCK_PROTOCOL)) {
sInvokers.add(invoker);
}
}
return sInvokers;
}
}
private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
boolean hasMockProvider = false;
for (Invoker<T> invoker : invokers) {
if (invoker.getUrl().getProtocol().equals(MOCK_PROTOCOL)) {
hasMockProvider = true;
break;
}
}
return hasMockProvider;
}
}

服务目录

消费端每个服务对应一个服务目录RegistryDirectory,一个服务目录中包含serviceType服务接口serviceKey引入的服务keyserviceclass+version+groupqueryMap引入服务参数配置configurators动态配置routerChain路由链、invokers服务目录当前缓存的服务提供者InvokerConsumerConfigurationListener监听本应用的动态配置ReferenceConfigurationListener监听所引入的服务的动态配置

ConsumerConfigurationListener接收到消费者应用动态配置数据变化后,调用当前消费者应用中所有RegistryDirectoryrefreshInvoker方法,刷新消费者应用中引入的每个服务对应的Invoker。当ReferenceConfigurationListener接收到某个服务动态配置数据变化后,调用该服务对应的RegistryDirectoryrefreshInvoker方法,刷新该服务对应的Invoker。

refreshInvoker方法中会从注册中心获取到的providers节点下的服务URL,然后调用toInvokers方法得到Invoker,先按Protocol进行过滤且调用DubboProtocol.refer方法得到Invoker,将得到的invokers设置到RouterChain,且调用RouterChain上所有的routersnotify方法,实际只有TagRouter的notify方法有用,再把属于同一个group中的invoker合并起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
private ReferenceConfigurationListener serviceConfigurationListener;
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 监听consumer应用
serviceConfigurationListener = new ReferenceConfigurationListener(this, url); // 监听所引入的服务的动态配置
registry.subscribe(url, this);
}
private void refreshInvoker(List<URL> invokerUrls) { //http:// dubbo://
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// 这里会先按Protocol进行过滤,并且调用DubboProtocol.refer方法得到DubboInvoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
routerChain.setInvokers(newInvokers); // 得到了所引入的服务Invoker之后,把它们设置到路由链中去,在调用时使用,并且会调用TagRouter的notify方法
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
}
}
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) { // 遍历当前服务所有的服务提供者URL
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) { // 当前消费者如果手动配置了Protocol,那么则进行匹配
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 当前Protocol是否在应用中存在对应的扩展点
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try { // 如果当前服务提供者URL没有生产过Invoker
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {// 调用Protocol的refer方法得到一个Invoker DubboProtocol.refer()
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
List<String> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<>();
}
deleted.add(entry.getKey());
}
}
}

if (deleted != null) {
for (String url : deleted) {
if (url != null) {
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
try {
invoker.destroy();
} catch (Exception e) {
}
}
}
}
}
}
}
private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
List<RegistryDirectory> listeners = new ArrayList<>();
ConsumerConfigurationListener() {
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
void addNotifyListener(RegistryDirectory listener) {
this.listeners.add(listener);
}
@Override
protected void notifyOverrides() {// 调用RegistryDirectory的refreshInvoker方法
listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
}
}
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private RegistryDirectory directory;
private URL url;
ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
this.directory = directory;
this.url = url;
this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
}
@Override
protected void notifyOverrides() {
directory.refreshInvoker(Collections.emptyList());
}
}
public abstract class AbstractConfiguratorListener implements ConfigurationListener {
protected final void initWith(String key) {
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
dynamicConfiguration.addListener(key, this); // 添加Listener,进行了订阅
// 从配置中心ConfigCenter获取属于当前应用的动态配置数据,从zk中拿到原始数据(主动从配置中心获取数据)
String rawConfig = dynamicConfiguration.getRule(key, DynamicConfiguration.DEFAULT_GROUP);
if (!StringUtils.isEmpty(rawConfig)) { // 如果存在应用配置信息则根据配置信息生成Configurator
genConfiguratorsFromRawRule(rawConfig);
}
}
private boolean genConfiguratorsFromRawRule(String rawConfig) {
boolean parseSuccess = true;
try { // 先把应用或服务配置转成url,再根据url生成对应的Configurator
configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(rawConfig)).orElse(configurators);
} catch (Exception e) {
parseSuccess = false;
}
return parseSuccess;
}
}

RegistryDirectory接收到providers节点数据变化后,会调用refreshOverrideAndInvoker方法,该方法用来针对每个服务提供者来生成InvokerrefreshOverrideAndInvoker方法中首先调用overrideDirectoryUrl方法利用Configurators重写目录地址,在注册中心URL基础上把当前引入服务的参数作为URL的Parameters,所以该地址既包括了注册中心信息,也包括了当前引入服务的信息,重写往目录地址后,调用refreshInvoker方法去刷新Invoker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull).filter(this::isValidCategory).filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
// 获取动态配置URL,生成configurators
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 获取老版本路由URL,生成Router,并添加到路由链中
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 获取服务提供者URL
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}

private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
refreshInvoker(urls);
}
private void overrideDirectoryUrl() {
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
List<Configurator> localConfigurators = this.configurators; // local reference
doOverrideUrl(localConfigurators);
List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
doOverrideUrl(localAppDynamicConfigurators);
if (serviceConfigurationListener != null) {
List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference
doOverrideUrl(localDynamicConfigurators);
}
}
private void doOverrideUrl(List<Configurator> configurators) {
if (CollectionUtils.isNotEmpty(configurators)) {
for (Configurator configurator : configurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
}
}

服务引入

调用Protocolrefer方法首先超类AbstractProtocolrefer方法,然后调用DubboProtocolprotocolBindingRefer方法,该方法中主要是通过getClients获取client列表,为了提高效率一个DubboInvoker会有多个client每个client和server之间都会有一个socket,多个client连的是同一个server,在DubboInvoker发送请求时轮询clients去发送数据

在获取Client时首先判断配置的连接数connections,若未配置则使用共享socket连配置shareConnectionsStr默认为1,非共享连接若消费者应用引用了两个服务A和B,这两个服务都部署在了应用C上,若connections2则消费者应用会与应用C建立4个Socket连接,若为共享连接shareConnectionsStr2,那么消费者应用会与应用C建立2个Socket连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
public abstract class AbstractProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 异步转同步Invoker, type是接口,url是服务地址,DubboInvoker是异步的,而AsyncToSyncInvoker会封装为同步的
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
}
public class DubboProtocol extends AbstractProtocol {
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// clients很重要,为了提高效率一个DubboInvoker会有多个clients,因为每个client和server之间都会有一个socket, 多个client连的是同一个server
// 在DubboInvoker发送请求时会轮询clients去发送数据
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
boolean useShareConnect = false;
// connections表示对当前服务提供者建立connections个socket连接
// 消费者应用引用了两个服务A和B,这两个服务都部署在了应用C上,如果connections为2,那么消费者应用会与应用C建立4个Socket连接
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// 如果没有配置connections,那么则取shareConnectionsStr(默认为1),表示共享socket连接个数
// 消费者应用引用了两个服务A和B,这两个服务都部署在了应用C上,如果shareConnectionsStr为2,那么消费者应用会与应用C建立2个Socket连接
if (connections == 0) {
useShareConnect = true;
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) { // 如果使用共享的,则利用shareClients
clients[i] = shareClients.get(i);
} else { // 不然就初始化,在初始化client时会去连接服务端
clients[i] = initClient(url);
}
}
return clients;
}
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
// 这个方法返回的是可以共享的client,要么已经生成过了,要么需要重新生成
String key = url.getAddress(); // 对于已经生成过的client,都会存在referenceClientMap中,key为所调用的服务IP+PORT
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
// 根据当前引入的服务对应的ip+port,看看是否已经存在clients了,
if (checkClientCanUse(clients)) {// 如果每个client都可用,那就对每个client的计数+1,表示这些client被引用了多少次
batchClientRefIncr(clients);
return clients;
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
if (checkClientCanUse(clients)) { // dubbo check
batchClientRefIncr(clients);
return clients;
}
connectNum = Math.max(connectNum, 1); // 至少一个
if (CollectionUtils.isEmpty(clients)) {// 如果clients为空,则按指定的connectNum生成client
clients = buildReferenceCountExchangeClientList(url, connectNum);
referenceClientMap.put(key, clients);
} else {// 如果clients不为空,则遍历这些client,对于不可用的client,则重新生成一个client
for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
locks.remove(key);
return clients;
}
}
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url); // 生成一个ExchangeClient
return new ReferenceCountExchangeClient(exchangeClient); // 包装成ReferenceCountExchangeClient
}
private ExchangeClient initClient(URL url) {
// 拿设置的client,默认为netty
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
// 编码方式
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// 心跳, 默认60 * 1000,60秒一个心跳
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// 如果没有指定的client扩展,则抛异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + ", supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
if (url.getParameter(LAZY_CONNECT_KEY, false)) { // connection should be lazy
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler); // 建立连接
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message; // 转成Invocation对象,要开始用反射执行方法了
Invoker<?> invoker = getInvoker(channel, inv); // 服务实现者
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());// 这里设置了,service中才能拿到remoteAddress
Result result = invoker.invoke(inv);// 执行服务,得到结果
return result.completionFuture().thenApply(Function.identity()); // 返回一个CompletableFuture
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message); // 这是服务端接收到Invocation时的处理逻辑
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
invoke(channel, ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
}

客户端的启动和服务端的启动类似,在connect方法中调用HeaderExchanger的connect方法去建立socket连接并得到一个HeaderExchangeClient,构造HeaderExchangeClient时先执行Transporters.*connect*()方法得到一个Client,从而调用调用NettyTransporter的connect方法构造一个NettyClient,构造NettyClient的过程中会初始化Netty客户端,然后连接Server端建立一个Socket连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public class Exchangers {
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler); // 得到一个HeaderExchanger去connect
}
}
public class HeaderExchanger implements Exchanger {
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 利用NettyTransporter去connect,为什么在connect和bind时都是DecodeHandler,解码解的是把InputStream解析成AppResponse对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
public class Transporters {
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler); // NettyTransporter
}
}
public class NettyTransporter implements Transporter {
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener); // 生成一个NettyClient, 这个内部会去进行连接
}
}
public class NettyClient extends AbstractClient {
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(CHANNEL_FACTORY);
// config @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout " + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
future.cancel();
}
}
}
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
try {
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {// connect.
connect();
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
// 得到消费端的线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
}
public class ChannelHandlers {
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 先通过ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
// 得到一个AllChannelHandler(handler, url)把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
// 当Netty接收到一个数据时,会经历MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler,而AllChannelHandler会调用handler
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
}

创建代理

最终创建好Invoker对象后会通过ProxyFactory给Invoker创建代理对象,ProxyFactory是通过SPI机制加载的默认为JavassistProxyFactory。最终创建代理对象InvokerInvocationHandler即为最终ReferenceBeangetObject方法返回对象,在发起服务调用时首先执行InvokerInvocationHandlerinvoke方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 若现在被代理对象proxy本身就是一个已经被代理过的对象,则取代理类的Wrapper,否则取type接口的Wrapper
// Wrapper是针对某个类或某个接口的包装类,通过wrapper对象可以更方便的去执行某个类或某个接口的方法
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {// proxy是服务实现类 type是服务接口 url是一个注册中心url
@Override
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
// 执行proxy的method方法,执行的proxy实例的方法,若没有wrapper,则要通过原生的反射技术去获取Method对象,然后执行
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// recreate方法会调用AppResponse的recreate方法,若AppResponse对象中存在exception信息,则此方法中会throw该异常
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}

Invoker调用链

1
2
@Reference(url = "dubbo://ip:20881/package.DemoService;registry://ip:2181/package.RegistryService?registry=zookeeper")
private DemoService demoService;

最复杂情况下Invoker链@Reference注解中设置URL,且指定服务提供者URLdubbo://ip:20881/package.DemoService注册中心URLregistry://ip:2181/package.RegistryService?registry=zookeeper,最终refer处理的invoker链路为:

  1. MockClusterInvoker
  2. invoker=RegistryAwareClusterInvoker
  3. directory=StaticDirectory
  4. 0=ProtocolFilterWrapper$CallbackRegistrationInvoke子流程
  5. 1=MockClusterInvoker
  6. FailoverClusterInvoker
  7. RegistryDirectory
  8. invokers=UnmodifiableRandomAccessListsize=1
  9. 0=RegistryDirectory$InvokerDelegat
  10. ProtocolFilterWrapper$CallbackRegistrationInvoke子流程
  11. filterInvoker=ProtocolFilterWrapper$1
  12. filter=ConsumerContextFilter
  13. next=ProtocolFilterWrapper$1
  14. filter=FutureFilter
  15. next=ProtocolFilterWrapper$1
  16. filter=MonitorFilter
  17. next=ListenerInvokerWrapper
  18. invoker=AsyncToSyncInvoker
  19. invoker=DubboInvoker