Dubbo服务导出

服务导出首先确定服务参数,确定服务支持的协议,构造服务最终的URL,将服务URL注册到注册中心向注册中心注册监听器,监听Dubbo的中的动态配置信息变更,主要根据服务支持的不同协议启动不同的Server用来接收和处理请求如netty、tomcat、jetty等。

服务的参数除了可以在@Service注解中配置AbstractConfig,还会继承Dubbo服务所属应用Application上的配置,还可在配置中心配置且分为应用配置AppExternalConfiguration和全局配置ExternalConfigurationJVM环境变量中去配置某个服务的参数SystemConfiguration,还可以通过dubbo.properties文件配置PropertiesConfiguration。优先级从高到低为SystemConfigurationAppExternalConfigurationExternalConfigurationAbstractConfigPropertiesConfiguration

服务导出入口ServiceBeanexport方法,ServiceBean继承了ApplicationContextAware,在setApplicationContext方法中会把applicationContext添加到SpringExtensionFactory中便于Dubbo的SPI机制引入的类中完成Spring容器中Bean的注入,且ServiceBean继承了ApplicationListener接口,将ServiceBean作为监听器注册到Spring监听器列表中。当Spring启动完之后通过接收ContextRefreshedEvent事件从而调用onApplicationEvent来触发export方法的执行。且若注册监听器失败在afterPropertiesSet完成一系列属性填充后,将直接调用export方法来完成导出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware {
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
// 若某一个Service是通过Spring暴露的,则当需要获取该服务时就要从Spring容器中进行获取,所以需要把applicationContext添加到SpringExtensionFactory中去
SpringExtensionFactory.addApplicationContext(applicationContext);
// 一定要有这一步,不然ServiceBean将接收不到ContextRefreshedEvent事件
supportedApplicationListener = addApplicationListener(applicationContext, this);
}
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) { // 当前服务没有被导出并且没有卸载,才导出服务
export(); // 服务导出(服务注册)
}
}
public void export() {
super.export();
// Publish ServiceBeanExportedEvent,Spring启动完发布ContextRefreshedEvent事件--->服务导出--->发布ServiceBeanExportedEvent,可通过Spring中的ApplicationListener来监听服务导出是否完成
publishExportEvent();
}
private void publishExportEvent() {
ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
applicationEventPublisher.publishEvent(exportEvent);
}
public void afterPropertiesSet() throws Exception {
// 此处省略了一些列属性填充代码
if (!supportedApplicationListener) { // 在setApplicationContext方法中添加监听器成功,则该参数会被置为true
export();
}
}
}

最终调用超类ServiceConfig中的export方法,首先调用checkAndUpdateSubConfigs完成参数补全刷新等操作,刷新完后会检查stublocalmock等参数是否配置正确。

若ServiceConfig中某些属性为空,则从ProviderConfigModuleConfigApplicationConfig中获取,补全ServiceConfig属性,从配置中心获取配置,包括应用配置和全局配置,把获取到的配置放入到Environment中的externalConfigurationMapappExternalConfigurationMap中,并刷新所有除开ServiceConfigXxConfig的属性,即将配置中心的配置覆盖XxConfig中的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
public class ServiceConfig<T> extends AbstractServiceConfig {
public synchronized void export() {
checkAndUpdateSubConfigs();
if (!shouldExport()) { // 检查服务是否需要导出
return;
}
if (shouldDelay()) { // 检查是否需要延迟发布
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport(); // 导出服务
}
}
public void checkAndUpdateSubConfigs() {
// ServiceConfig中的某些属性如果是空的,那么就从ProviderConfig、ModuleConfig、ApplicationConfig中获取,补全ServiceConfig中的属性
completeCompoundConfigs();
// 从配置中心获取配置,包括应用配置和全局配置,把获取到的配置放入到Environment中的externalConfigurationMap和appExternalConfigurationMap中
// 并刷新所有除开ServiceConfig的XxConfig的属性,即将配置中心的配置覆盖XxConfig中的属性
startConfigCenter();
checkDefault();
checkProtocol();
checkApplication();
if (!isOnlyInJvm()) {
checkRegistry();// 如果protocol不是只有injvm协议,表示服务调用不是只在本机jvm里面调用,那就需要用到注册中心
}
this.refresh(); // 刷新ServiceConfig
checkMetadataReport(); // 如果配了metadataReportConfig,那么就刷新配置
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
if (ref instanceof GenericService) { // 当前服务对应的实现类是一个GenericService,表示没有特定的接口
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try { // 加载接口
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods); // 刷新MethodConfig,并判断MethodConfig中对应的方法在接口中是否存在
checkRef(); // 实现类是不是该接口类型
generic = Boolean.FALSE.toString();
}
if (local != null) { // local和stub一样,不建议使用了
if (Boolean.TRUE.toString().equals(local)) { // 如果本地存根为true,则存根类为interfaceName + "Local"
local = interfaceName + "Local";
}
Class<?> localClass; // 加载本地存根类
try {
localClass = ClassUtils.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) { // 本地存根
if (Boolean.TRUE.toString().equals(stub)) {
stub = interfaceName + "Stub"; // 若本地存根为true,则存根类为interfaceName + "Stub"
}
Class<?> stubClass;
try {
stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
checkStubAndLocal(interfaceClass); // 检查local和stub
checkMock(interfaceClass); // 检查mock
}
private void completeCompoundConfigs() {
if (provider != null) { // 如果配置了provider,那么则从provider中获取信息赋值其他属性,在这些属性为空的情况下
if (application == null) {
setApplication(provider.getApplication());
}
if (module == null) {
setModule(provider.getModule());
}
if (registries == null) {
setRegistries(provider.getRegistries());
}
if (monitor == null) {
setMonitor(provider.getMonitor());
}
if (protocols == null) {
setProtocols(provider.getProtocols());
}
if (configCenter == null) {
setConfigCenter(provider.getConfigCenter());
}
}
if (module != null) { // 如果配置了module,那么则从module中获取信息赋值其他属性,在这些属性为空的情况下
if (registries == null) {
setRegistries(module.getRegistries());
}
if (monitor == null) {
setMonitor(module.getMonitor());
}
}
if (application != null) { // 如果配置了application,那么则从application中获取信息赋值其他属性,在这些属性为空的情况下
if (registries == null) {
setRegistries(application.getRegistries());
}
if (monitor == null) {
setMonitor(application.getMonitor());
}
}
}
}
public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {
void startConfigCenter() {
if (configCenter == null) {
ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
}
if (this.configCenter != null) { // 如果配置了ConfigCenter
this.configCenter.refresh(); // 从其他位置获取配置中心的相关属性信息,比如配置中心地址
prepareEnvironment(); // 属性更新后,从远程配置中心获取数据(应用配置,全局配置)
}
ConfigManager.getInstance().refreshAll(); // 从配置中心取到配置数据后,刷新所有的XxConfig中的属性,除开ServiceConfig
}
}
public abstract class AbstractConfig implements Serializable {
public void refresh() {
try {
CompositeConfiguration compositeConfiguration = Environment.getInstance().getConfiguration(getPrefix(), getId());
// 表示XxConfig对象本身- AbstractConfig
Configuration config = new ConfigConfigurationAdapter(this); // ServiceConfig
if (Environment.getInstance().isConfigCenterFirst()) {
// The sequence would be: SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
compositeConfiguration.addConfiguration(4, config);
} else {
// The sequence would be: SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration
compositeConfiguration.addConfiguration(2, config);
}
// loop methods, get override value and set the new value back to method
Method[] methods = getClass().getMethods(); //ServiceBean
for (Method method : methods) {
if (MethodUtils.isSetter(method)) { // 是不是setXX()方法
// 获取xx配置项的value
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
// isTypeMatch() is called to avoid duplicate and incorrect update, for example, we have two 'setGeneric' methods in ReferenceConfig.
if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) {
method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value));
}
} else if (isParametersSetter(method)) { // 是不是setParameters()方法
// 获取parameter配置项的value
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
if (StringUtils.isNotEmpty(value)) {
Map<String, String> map = invokeGetParameters(getClass(), this);
map = map == null ? new HashMap<>() : map;
map.putAll(convert(StringUtils.parseParameters(value), ""));
invokeSetParameters(getClass(), this, map);
}
}
}
}
}
}
public class ConfigManager {
public void refreshAll() {
getApplication().ifPresent(ApplicationConfig::refresh);
getMonitor().ifPresent(MonitorConfig::refresh);
getModule().ifPresent(ModuleConfig::refresh);
getProtocols().values().forEach(ProtocolConfig::refresh);
getRegistries().values().forEach(RegistryConfig::refresh);
getProviders().values().forEach(ProviderConfig::refresh);
getConsumers().values().forEach(ConsumerConfig::refresh);
}
}

通过loadRegistries方法获得所配置的注册中心URL可配多个配置中心且当前导出服务注册到每个配置中心去,注册中心类型地址端口配置参数等,都会存在以registry://开头的URL上。

遍历当前服务所有的ProtocolConfig,且针对每个ProtocolConfig生成一个服务名称pathKey,然后将通过ProviderModel封装服务提供者访问路径实现类接口,以及接口中的各个方法对应的ProviderMethodModel,然后将解析好的ProviderModel与服务名称映射放入PROVIDED_SERVICES。然后调用doExportUrlsFor1Protocol方法把当前服务按每个协议每个注册中心分别导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ServiceConfig<T> extends AbstractServiceConfig {
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) { // 已经导出了,就不再导出了
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true); // registryURL 表示一个注册中心
for (ProtocolConfig protocolConfig : protocols) {
// pathKey = group/contextpath/path:version
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// ProviderModel中存在服务提供者访问路径,实现类,接口,以及接口中的各个方法对应的ProviderMethodModel,ProviderMethodModel表示某一个方法,方法名,所属的服务的,
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
// ApplicationModel表示应用中有哪些服务提供者和引用了哪些服务
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs); // 重点
}
}
}
public class ProviderModel {
private final String serviceName;
private final Object serviceInstance;
private final Class<?> serviceInterfaceClass;
private final Map<String, List<ProviderMethodModel>> methods = new HashMap<String, List<ProviderMethodModel>>();

public ProviderModel(String serviceName, Object serviceInstance, Class<?> serviceInterfaceClass) {
if (null == serviceInstance) {
throw new IllegalArgumentException("Service[" + serviceName + "]Target is NULL.");
}
this.serviceName = serviceName;
this.serviceInstance = serviceInstance;
this.serviceInterfaceClass = serviceInterfaceClass;
initMethod();
}
private void initMethod() {
Method[] methodsToExport = this.serviceInterfaceClass.getMethods();
for (Method method : methodsToExport) { // 遍历接口所有的方法
method.setAccessible(true);
// methods表示是某个方法对应的ProviderMethodModel,method.getName返回的仅仅只有方法名,不包括方法参数列表,有可能存在重载
List<ProviderMethodModel> methodModels = methods.get(method.getName());
if (methodModels == null) {
methodModels = new ArrayList<ProviderMethodModel>(1);
methods.put(method.getName(), methodModels);
}
methodModels.add(new ProviderMethodModel(method, serviceName));
}
}
}
public class ApplicationModel {
private static final ConcurrentMap<String, ProviderModel> PROVIDED_SERVICES = new ConcurrentHashMap<>();
public static void initProviderModel(String serviceName, ProviderModel providerModel) {
if (PROVIDED_SERVICES.putIfAbsent(serviceName, providerModel) != null) {
}
}
}

首先从ProtocolConfig中获取协议名称默认为dubbo协议,然后通过一系列的appendParameters方法将服务参数转存到Map中,若@Service注解中配置了methods参数,则遍历解析methods参数配置且补充到Map中。然后解析出提供的方法列表以及Token,然后通过所有的参数最终构造一个服务的URL,再根据scope执行本地导出和远程导出

把服务URL作为参数添加到registryURL中,然后把registryURL服务接口当前服务实现类ref调用ProxyFactorygetInvoker方法,使用代理生成一个当前服务接口服务提供者的代理对象Invoker,再把该代理对象Invoker当前ServiceConfig对象包装成一个DelegateProviderMetaDataInvoker对象。

最后通过具体Protocol的export方法对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个ExporterRegistryProtocol中进行服务注册,注册完之后使用DubboProtocol进行导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
public class ServiceConfig<T> extends AbstractServiceConfig {
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName(); // protocolConfig表示某个协议,registryURLs表示所有的注册中心
if (StringUtils.isEmpty(name)) {
name = DUBBO; // 若配置的某个协议没有配置name,则默认为dubbo
}
Map<String, String> map = new HashMap<String, String>(); // 表示服务url参数
map.put(SIDE_KEY, PROVIDER_SIDE);
appendRuntimeParameters(map);
appendParameters(map, metrics); // 监控中心参数
appendParameters(map, application); // 应用相关参数
appendParameters(map, module); // 模块相关参数
appendParameters(map, provider); // 提供者相关参数
appendParameters(map, protocolConfig); // 协议相关参数
appendParameters(map, this); // 服务本身相关参数
if (CollectionUtils.isNotEmpty(methods)) { // 服务中某些方法参数:@Service(methods = {@Method(name = "say", timeout = 3000)})
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName()); // 某个方法的配置参数,注意有prefix
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) { // 如果某个方法配置存在xx.retry=false,则改成xx.retry=0
String retryValue = map.remove(retryKey);
if (Boolean.FALSE.toString().equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {// 遍历当前方法配置中的参数配置
for (ArgumentConfig argument : arguments) {
// 若配置了type,则遍历当前接口的所有方法,找到方法名和当前方法名相等的方法,可能存在多个
// 若配置了index,则看index对应位置的参数类型是否等于type,若相等,则向map中存入argument对象中的参数
// 若没有配置index,那么则遍历方法所有的参数类型,等于type则向map中存入argument对象中的参数
// 若没有配置type,但配置了index,则把对应位置的argument放入map
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
if (methods != null && methods.length > 0) { // visit all methods
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
if (methodName.equals(method.getName())) { // target the method, and get its signature
Class<?>[] argtypes = methods[i].getParameterTypes();
if (argument.getIndex() != -1) { // one callback in the method
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}

}
}
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 通过接口对应的Wrapper,拿到接口中所有的方法名字
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) { // Token是为了防止服务被消费者直接调用(伪造http请求)
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// export service通过该host和port访问该服务
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); // 服务url
// 可通过ConfiguratorFactory,对服务url再次进行配置
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY); // scope可能为null,remote, local,none

if (!SCOPE_NONE.equalsIgnoreCase(scope)) {// don't export when none is configured
// 若scope为none则不会进行任何的服务导出,既不会远程,也不会本地
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url); // 如果scope不是remote,则会进行本地导出,会把当前url的protocol改为injvm,然后进行导出
}
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {// 如果scope不是local则会进行远程导出
if (CollectionUtils.isNotEmpty(registryURLs)) {// 如果有注册中心,则将服务注册到注册中心
for (URL registryURL : registryURLs) {
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;// 如果是injvm,则不需要进行注册中心注册
}
// 该服务是否是动态,对应zookeeper上表示是否是临时节点,对应dubbo中的功能就是静态服务
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL); // 拿到监控中心地址
if (monitorUrl != null) { // 当前服务连接哪个监控中心
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
String proxy = url.getParameter(PROXY_KEY); // 服务使用的动态代理机制,如果为空则使用javassit
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 使用代理生成一个当前服务接口即服务提供者的代理对象Invoker,Invoker中包括了服务实现者、服务接口类、服务的注册地址,可以使用Invoker的invoke方法执行服务,同时此invoker也可用来导出
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// invoker.invoke(Invocation) DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用特定的协议来对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个Exporter
// 1. 先使用RegistryProtocol进行服务注册
// 2. 注册完了之后,使用DubboProtocol进行导出
// 到此为止完成了ServiceBean.export()-->刷新ServiceBean的参数-->得到注册中心URL和协议URL-->遍历每个协议URL-->组成服务URL-->生成可执行服务Invoker-->导出服务
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {// 没有配置注册中心时,也会导出服务
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url); // 根据服务url,讲服务的元信息存入元数据中心
}
}
}
this.urls.add(url);
}
}

默认使用ProxyFactoryJavassistProxyFactory子类生成Invoker的代理对象,若被代理对象proxy本身就是一个已经被代理过的对象,则取代理类的Wrapper,否则取type接口的Wrapper,Wrapper是针对某个类或某个接口的包装类,通过wrapper对象可更方便的去执行某个类或某个接口的方法,最终封装为AbstractProxyInvoker对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class JavassistProxyFactory extends AbstractProxyFactory {
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 若现在被代理对象proxy本身就是一个已经被代理过的对象,则取代理类的Wrapper,否则取type接口的Wrapper
// Wrapper是针对某个类或某个接口的包装类,通过wrapper对象可以更方便的去执行某个类或某个接口的方法
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {// proxy是服务实现类 type是服务接口 url是一个注册中心url
@Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
// 执行proxy的method方法,执行的proxy实例的方法,若没有wrapper,则要通过原生的反射技术去获取Method对象,然后执行
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}

protocol是Protocol接口的一个Adaptive对象,故会根据wrapperInvokergenUrl方法得到一个url,根据此url协议找到对应RegistryProtocol扩展点,而Protocol接口有ProtocolFilterWrapperProtocolListenerWrapper两个包装类,实际在调用export方法时会经过这两个包装类的export方法。

RegistryProtocol中首先获取到服务提供者URL注册中心URL,然后给服务提供者生成并绑定监听器OverrideListener,监听动态配置中心此服务的参数数据的变化,一旦监听到变化则重写服务URL;在服务导出时先重写一次服务URL,然后通过doLocalExport调用DubboProtocol进行导出服务,导出成功后将得到一个ExporterChangeableWrapper;然后将其注册到注册中心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public class ProtocolListenerWrapper implements Protocol {
private final Protocol protocol;
public ProtocolListenerWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 导出了一个服务之后,调用ExporterListener
return new ListenerExporterWrapper<T>(protocol.export(invoker),
// 得到ExporterListener接口中能用的扩展点,根据url和EXPORTER_LISTENER_KEY进行筛选
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
}
public class ListenerExporterWrapper<T> implements Exporter<T> {
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
public class ProtocolFilterWrapper implements Protocol {
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 根据url获取filter,根据url中的parameters取key为key的value所对应的filter,但是还会匹配group
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {// 得到一个异步结果
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
}
public class RegistryProtocol implements Protocol {
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 导出服务registry://对应RegistryProtocol,zookeeper://对应ZookeeperRegistry,dubbo://对应DubboProtocol
URL registryUrl = getRegistryUrl(originInvoker); // registry://xxx?xx=xx&registry=zookeeper--->zookeeper://xxx?xx=xx 表示注册中心
URL providerUrl = getProviderUrl(originInvoker); // 得到服务提供者url,表示服务提供者
// overrideSubscribeUrl是老版本的动态配置监听url,表示了需要监听的服务以及监听的类型,configurators是老版本上的动态配置
// 在服务提供者url的基础上,生成一个overrideSubscribeUrl,协议为provider://,增加参数category=configurators&check=false
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 一个overrideSubscribeUrl对应一个OverrideListener,用来监听变化事件,监听到overrideSubscribeUrl的变化后,OverrideListener就会根据变化进行相应处理,具体处理逻辑看OverrideListener的实现
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 在该方法里会利用providerConfigurationListener和serviceConfigurationListener去重写providerUrl
// providerConfigurationListener表示应用级别的动态配置监听器,providerConfigurationListener是RegistyProtocol的一个属性
// serviceConfigurationListener表示服务级别的动态配置监听器,serviceConfigurationListener是在每暴露一个服务时就会生成一个
// 这两个监听器都是新版本中的监听器,新版本监听的zk路径是:
// 服务:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点的内容
// 应用:/dubbo/config/dubbo/dubbo-demo-provider-application.configurators节点的内容
// 注意,要和配置中心的路径区分开来,配置中心的路径是:
// 应用:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties节点的内容
// 全局:/dubbo/config/dubbo/dubbo.properties节点的内容
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export invoker,根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
final Registry registry = getRegistry(originInvoker); // url to registry,得到注册中心-ZookeeperRegistry
// 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 将当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
//to judge if we need to delay publish 是否需要注册到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {// 注册服务,把简化后的服务提供者url注册到registryUrl中去
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// 针对老版本的动态配置,需要把overrideSubscribeListener绑定到overrideSubscribeUrl上去进行监听,兼容老版本的配置修改,利用overrideSubscribeListener去监听旧版本的动态配置变化
// 老版本监听的zk路径是:/dubbo/org.apache.dubbo.demo.DemoService/configurators/override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_config=true&dynamic=false&enabled=true&timeout=6000
// 监听的是路径的内容,不是节点的内容
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
return new DestroyableExporter<>(exporter);//Ensure that a new exporter instance is returned every time export
}
}

通过DubboProtocol导出服务主要是启动NettyServer层层封装处理数据的RequestHandler通过url绑定端口和对应的请求处理器RequestHandler类型为ExchangeHandler,以便在接收到请求时能依次被这些RequestHandler所处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
public class RegistryProtocol implements Protocol {
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// protocol属性的值是哪来的,是在SPI中注入进来的,是一个代理类,这里实际利用的就是DubboProtocol或HttpProtocol去export,使用ExporterChangeableWrapper是为了方便注销已经被导出的服务
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
}
public class DubboProtocol extends AbstractProtocol {
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
String key = serviceKey(url); // export service.
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 构造一个Exporter进行服务导出
exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) { //export an stub service for dispatching event
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
} else {// 服务的stub方法
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url); // 开启NettyServer请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务
optimizeSerialization(url); // 特殊的一些序列化机制,比如kryo提供了注册机制来注册类,提高序列化和反序列化的速度
return exporter;
}
private void openServer(URL url) {
String key = url.getAddress(); // find server.获得ip地址和port:192.168.40.17:20880
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);// 缓存Server对象
if (server == null) {// DCL,Double Check Lock
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url)); // 创建Server,并进行缓存
}
}
} else {// server supports reset, use together with override
server.reset(url); // 服务重新导出时,就会走这里
}
}
}
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))// enable heartbeat by default
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 协议的服务器端实现类型,如:dubbo协议的mina,netty等,http协议的jetty,servlet等,默认为netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {// 通过url绑定端口,和对应的请求处理器,requestHandler是请求处理器,类型为ExchangeHandler,表示从url的端口接收到请求后,requestHandler来进行处理
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY); // 协议的客户端实现类型,比如:dubbo协议的mina,netty等
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message; // 转成Invocation对象,要开始用反射执行方法了
Invoker<?> invoker = getInvoker(channel, inv); // 服务实现者
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());// 这里设置了,service中才能拿到remoteAddress
Result result = invoker.invoke(inv);// 执行服务,得到结果
return result.completionFuture().thenApply(Function.identity()); // 返回一个CompletableFuture
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message); // 这是服务端接收到Invocation时的处理逻辑
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
invoke(channel, ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
}

Netty启动

最终在Exchangers中调用HeaderExchanger的bind方法最终调用NettyTransporter去启动NettyServer,在调用NettyServer构造方法时会调用超类AbstractServer构造方法,从而调用NettyServerdoOpen启动NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class Exchangers {
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// codec表示协议编码方式
return getExchanger(url).bind(url, handler);// 通过url得到HeaderExchanger,利用HeaderExchanger进行bind,将得到一个HeaderExchangeServer
}
}
public class HeaderExchanger implements Exchanger {
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 去启动Netty对handler包装了两层,表示当处理一个请求时,每层Handler负责不同的处理逻辑,在connect和bind时都是DecodeHandler,解码解的是把InputStream解析成RpcInvocation对象
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
public class Transporters {
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
// 如果bind了多个handler,那么当有一个连接过来时,会循环每个handler去处理连接
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);// 调用NettyTransporter去绑定,Transporter表示网络传输层
}
}
public class NettyTransporter implements Transporter {
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
}
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 设置线程名,wrap方法会返回一个MultiMessageHandler,该Handler会被设置到AbstractPeer的handler属性上
// 当netty接收到数据时,会调用AbstractPeer的handler属性的received方法,所以MultiMessageHandler就是负责处理请求
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// boss线程,主要监听端口和分配socketChannel给worker线程
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
// worker线程负责数据读写
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
// iothreads就是读写数据的线程
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
// 连接处理器,建立连接、连接断开、接收到数据、返回数据的逻辑都在这个Handler里面,this表示的是NettyServer,在它的父类AbstractServer
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
}
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
}

服务注册

首先从originInvoker中获取注册中心的实现类ZookeeperRegistry,将重写后的服务URL简化,把不用存到注册中心去的参数去除,将简化后的服务URL调用ZookeeperRegistry.registry()方法注册到注册中心去,首先会先调用ZookeeperRegistry超类FailbackRegistryregister方法,然后调用ZookeeperRegistrydoRegister正则将服务注册到Zookeeper上,最后将ExporterChangeableWrapper封装为DestroyableExporter对象返回完成服务导出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RegistryProtocol implements Protocol {
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl); // 调用ZookeeperRegistry的register方法
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
}
public class ZookeeperRegistry extends FailbackRegistry {
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}

Exporter架构

一个服务导出成功后,会生成对应的Exporter,Exporter嵌套结构由外到内依次为:

  • DestroyableExporter:Exporter的最外层包装类,该类的主要作用是可用来unexporter对应的服务
  • ExporterChangeableWrapper:该类主要负责在unexport对应服务之前把服务URL从注册中心中移除,且把该服务对应的动态配置监听器移除
  • ListenerExporterWrapper:该类主要负责在unexport对应服务之后,把服务导出监听器移除
  • DubboExporter:该类中保存了对应服务的Invoker对象当前服务唯一标志,当NettyServer接收到请求后,会根据请求中的服务信息,找到服务对应的DubboExporter对象,然后从对象中得到Invoker对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
private static class DestroyableExporter<T> implements Exporter<T> {
private Exporter<T> exporter;
public DestroyableExporter(Exporter<T> exporter) {
this.exporter = exporter;
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
@Override
public void unexport() {
exporter.unexport();
}
}
private class ExporterChangeableWrapper<T> implements Exporter<T> {
public void unexport() {
String key = getCacheKey(this.originInvoker);
bounds.remove(key);
Registry registry = RegistryProtocol.INSTANCE.getRegistry(originInvoker); // 从注册中心删除服务URL
try {
registry.unregister(registerUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {// 解绑当前服务的Listener
NotifyListener listener = RegistryProtocol.INSTANCE.overrideListeners.remove(subscribeUrl);
registry.unsubscribe(subscribeUrl, listener);
DynamicConfiguration.getDynamicConfiguration().removeListener(subscribeUrl.getServiceKey() + CONFIGURATORS_SUFFIX, serviceConfigurationListeners.get(subscribeUrl.getServiceKey()));
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
executor.submit(() -> {
try {
int timeout = ConfigurationUtils.getServerShutdownTimeout();
if (timeout > 0) {
Thread.sleep(timeout);
}
exporter.unexport();
} catch (Throwable t) {
}
});
}
}
public class ListenerExporterWrapper<T> implements Exporter<T> {
public void unexport() {
try {
exporter.unexport();
} finally {
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.unexported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
}
public class DubboExporter<T> extends AbstractExporter<T> {
private final String key;
private final Map<String, Exporter<?>> exporterMap;
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
}
@Override
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
}

Invoker架构

Invoker嵌套结构由外到内依次为:

  • ProtocolFilterWrapper$CallbackRegistrationInvoker:会去调用下层Invoker,下层Invoker执行完之后会遍历过滤器,查看是否有过滤器实现了ListenableFilter接口,若有则回调对应onResponse方法如TimeoutFilter,当调用完下层Invoker后会计算服务执行时间
  • ProtocolFilterWrapper$1:ProtocolFilterWrapper中的过滤器组成的Invoker,利用该Invoker可执行服务端的过滤器,执行完过滤器之后,调用下层Invoker
  • RegistryProtocol$InvokerDelegate:服务委托类,包含了DelegateProviderMetaDataInvoker对象和服务对应的providerUrl,执行时直接调用下层Invoker
  • DelegateProviderMetaDataInvoker:服务委托类,包含了AbstractProxyInvoker对象和ServiceConfig对象,执行时直接调用下层Invoker
  • AbstractProxyInvoker:服务接口代理类,绑定了对应的实现类,执行时会利用反射调用服务实现类实例的具体方法并得到结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public class ProtocolFilterWrapper implements Protocol {
static class CallbackRegistrationInvoker<T> implements Invoker<T> {
private final Invoker<T> filterInvoker;
private final List<Filter> filters;
public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
this.filterInvoker = filterInvoker;
this.filters = filters;
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation); // 执行过滤器链
// 过滤器都执行完了之后,回调每个ListenableFilter过滤器的onResponse或onError方法
asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
if (filter instanceof ListenableFilter) { // onResponse callback
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
});
return asyncResult;
}
}
}
public class ProtocolFilterWrapper implements Protocol {
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 根据url获取filter,根据url中的parameters取key为key的value所对应的filter,但是还会匹配group
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {// 得到一个异步结果
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) { // onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
}
public static class InvokerDelegate<T> extends InvokerWrapper<T> {
private final Invoker<T> invoker;
public InvokerDelegate(Invoker<T> invoker, URL url) {
super(invoker, url);
this.invoker = invoker;
}
public Invoker<T> getInvoker() {
if (invoker instanceof InvokerDelegate) {
return ((InvokerDelegate<T>) invoker).getInvoker();
} else {
return invoker;
}
}
}
public class DelegateProviderMetaDataInvoker<T> implements Invoker {
protected final Invoker<T> invoker;
private ServiceConfig metadata;
public DelegateProviderMetaDataInvoker(Invoker<T> invoker, ServiceConfig metadata) {
this.invoker = invoker;
this.metadata = metadata;
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
public ServiceConfig getMetadata() {
return metadata;
}
}
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
private final T proxy;
private final Class<T> type;
private final URL url;
public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
if (proxy == null) {
throw new IllegalArgumentException("proxy == null");
}
if (type == null) {
throw new IllegalArgumentException("interface == null");
}
if (!type.isInstance(proxy)) {
throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
}
this.proxy = proxy;
this.type = type;
this.url = url;
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// 执行服务,得到一个接口,可能是一个CompletableFuture(表示异步调用),可能是一个正常的服务执行结果(同步调用)
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation); // 将同步调用的服务执行结果封装为CompletableFuture类型
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); // 异步RPC结果
future.whenComplete((obj, t) -> { //设置一个回调,若是异步调用,则服务执行完成后将执行这里的回调
// 当服务执行完后,将结果或异常设置到AsyncRpcResult中,若是异步服务,则服务之后的异常会在此处封装到AppResponse中然后返回,若是同步服务出异常了,则会在下面将异常封装到AsyncRpcResult中
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
asyncRpcResult.complete(result); // 将服务执行完之后的结果设置到异步RPC结果对象中
});
return asyncRpcResult;// 返回异步RPC结果
} catch (InvocationTargetException e) {// 假设抛的NullPointException,那么会把这个异常包装为一个Result对象
// 同步服务执行时如何出异常了,会在此处将异常信息封装为一个AsyncRpcResult然后返回
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {// 执行服务后的所有异常都会包装为RpcException进行抛出
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private CompletableFuture<Object> wrapWithFuture(Object value, Invocation invocation) {
if (RpcContext.getContext().isAsyncStarted()) {
return ((AsyncContextImpl) (RpcContext.getContext().getAsyncContext())).getInternalFuture();
} else if (value instanceof CompletableFuture) {
return (CompletableFuture<Object>) value;
}
return CompletableFuture.completedFuture(value);
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

服务监听器

服务在导出的过程中需要向动态配置中心的数据进行订阅,以便当管理人员修改了动态配置中心中对应服务的参数后,服务提供者能及时做出变化。Dubbo2.7之前仅支持对某个服务动态配置,Dubbo2.7之后不仅支持对单个服务动态配置,也支持对某个应用动态配置,是利用ZookeeperWatcher机制

在服务提供者端给当前服务生成一个对应监听器实例OverrideListener,它负责监听对应服务的动态配置变化,且根据动态配置中心的参数重写服务URL

  • ProviderConfigurationListener:监听应用动态配置数据修改,是在RegistryProtocol类中的一个属性,且是随着RegistryProtocol实例化而实例化一个应用中只有一个
  • ServiceConfigurationListener:监听服务动态配置数据修改和OverrideListener类似,在每个服务进行导出时都会生成一个,实际上其内部有一个OverrideListener属性,当其监听数据发生变化时把配置中心最新数据交给OverrideListener去重写服务URL

同时在RegistryProtocol类中保存了所有服务所对应的OverrideListener,当ProviderConfigurationListener监听到数据发生变化时,会把它所得到的最新数据依次调用每个OverrideListener去重写服务对应的服务URL。

ProviderConfigurationListener会监听/dubbo/config/dubbo/应用.configurators节点,ServiceConfigurationListener会监听/dubbo/config/dubbo/服务.configurators节点。

修改服务动态配置,底层会修改Zookeeper中的数据,ServiceConfigurationListener监听到节点内容变化触发ServiceConfigurationListener的父类AbstractConfiguratorListener的process(ConfigChangeEvent event)方法,ConfigChangeEvent表示一个事件,事件中有事件类型ADDEDMODIFIEDDELETED,还有事件内容即节点内容,以及触发该事件的节点名字,事件类型有三个,当接收到一个ConfigChangeEvent事件后,会根据事件类型做对应的处理

  • ADDED、MODIFIED:根据节点内容去生成override://协议的URL,然后根据URL去生成Configurator配置器,根据配置器可去重写URL
  • DELETED:删除ServiceConfigurationListener内所有Configurator配置器

生成了Configurator后,调用notifyOverrides()方法对服务URL进行重写,每次重写并不仅仅只是用到上面所生成的Configurator,而是包括本服务的Configurator,也包括本应用的Configurator,也包括老版本管理台的Configurator,重写URL的逻辑如下:

从exporter中获取目前已经导出了的服务URL即currentUrl,根据老版本管理台的Configurator重写服务URL,根据providerConfigurationListener中的Configurator重写服务URL,根据serviceConfigurationListeners中对应的服务的Configurator重写服务URL。

若重写之后newUrl和currentUrl不相等,则需要进行服务重新导出,根据newUrl调用DubboProtocol的export进行导出,再次启动NettyServer,将newUrl进行简化为registeredProviderUrl,调用RegistryProtocol的unregister()方法,把当前服务之前的服务提供URL从注册中心删掉,调用RegistryProtocol的register()方法,把新的registeredProviderUrl注册到注册中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
private class OverrideListener implements NotifyListener {// 当subscribeUrl对应的数据发生了改变,OverrideListener将收到通知
private final URL subscribeUrl;
private final Invoker originInvoker;
private List<Configurator> configurators;
public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
this.subscribeUrl = subscribeUrl;
this.originInvoker = originalInvoker;
}
@Override
public synchronized void notify(List<URL> urls) {
List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY));
if (matchedUrls.isEmpty()) { // No matching results
return;
}
// 对发生了变化的url进行过滤,只取url是override协议,或者参数category等于configurators的url
this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator)).orElse(configurators);
doOverrideIfNecessary(); // 根据Override协议修改
}
public synchronized void doOverrideIfNecessary() {
final Invoker<?> invoker;
if (originInvoker instanceof InvokerDelegate) {
invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker); //The origin invoker 当前服务的原始服务提供者url
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<?> exporter = bounds.get(key);
if (exporter == null) {
return;
}
//The current, may have been merged many times,当前服务被导出的url
URL currentUrl = exporter.getInvoker().getUrl();
//根据configurators修改url,configurators是全量的,并不是某个新增的或删除的,所以是基于原始的url进行修改,并不是基于currentUrl
//Merged with this configuration
URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey()).getConfigurators(), newUrl);
if (!currentUrl.equals(newUrl)) { // 修改过的url如果和目前的url不相同,则重新按newUrl导出
RegistryProtocol.this.reExport(originInvoker, newUrl);
}
}
private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
List<URL> result = new ArrayList<URL>();
for (URL url : configuratorUrls) {
URL overrideUrl = url;
// Compatible with the old version
if (url.getParameter(CATEGORY_KEY) == null && OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
overrideUrl = url.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY);
}
// Check whether url is to be applied to the current service
if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
result.add(url);
}
}
return result;
}
}
private class ServiceConfigurationListener extends AbstractConfiguratorListener {
private URL providerUrl;
private OverrideListener notifyListener;
public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
this.providerUrl = providerUrl;
this.notifyListener = notifyListener;
this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX); // 订阅 服务接口名+group+version+".configurators"
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
notifyListener.doOverrideIfNecessary();
}
}
private class ProviderConfigurationListener extends AbstractConfiguratorListener {
public ProviderConfigurationListener() {// // 订阅 应用名+".configurators"
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {// 通过configurators去修改/装配providerUrl
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}
}