Dubbo服务调用

客户端

客户端调用Dubbo方法时,首先通过InvokerInvocationHandler调用MockClusterInvoker,由MockClusterInvoker判断Mock逻辑:若未配置Mock,则直接往下调用;若配置了force强制Mock,则不发起远程调用,直接在本地通过配置构造返回结果;若配置了其他Mock(非force),则只有调用远程服务失败时才通过doMockInvoke走本地Mock逻辑,且在走本地Mock逻辑之前会先判断异常是否为业务异常,若为业务异常则直接抛出,不走Mock。

然后通过AbstractClusterInvoker调用Directory的list方法,最终调用RegistryDirectory的doList方法执行路由链,依次执行MockInvokersSelector、TagRouter、AppRouter、ServiceRouter的route方法,过滤出符合路由条件的invokers。然后根据Invoker的URL信息通过SPI机制获取负载均衡器,默认为RandomLoadBalance。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
public class InvokerInvocationHandler implements InvocationHandler {
    /**
     * Entry point of the consumer-side proxy: turns a reflective method call into an RPC invocation.
     *
     * Methods declared on {@code Object} are invoked reflectively on the invoker itself, and
     * {@code toString}/{@code hashCode}/{@code equals} are answered by the invoker without any remote call.
     * Everything else is wrapped in an {@code RpcInvocation} and sent down the invoker chain.
     *
     * @param proxy  the proxy instance the method was invoked on (unused here)
     * @param method the interface method being called
     * @param args   the call arguments
     * @return the value produced by {@code recreate()} on the invocation result
     * @throws Throwable whatever {@code recreate()} throws, or any failure from the invoker chain
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        final String name = method.getName();
        final int arity = method.getParameterTypes().length;
        if (arity == 0) {
            if ("toString".equals(name)) {
                return invoker.toString();
            }
            if ("hashCode".equals(name)) {
                return invoker.hashCode();
            }
        } else if (arity == 1 && "equals".equals(name)) {
            return invoker.equals(args[0]);
        }
        // recreate() ends up in AppResponse.recreate(), which throws the stored exception if there is one.
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
public class AppResponse extends AbstractResult implements Serializable {
    /**
     * Unwraps this response: returns the stored result, or throws the stored exception.
     *
     * Before throwing, the exception's private {@code stackTrace} field is read reflectively; when it is
     * {@code null} it is replaced with an empty array. Any failure of that reflective step is ignored.
     *
     * @return the stored result when no exception is present
     * @throws Throwable the stored exception, when present
     */
    public Object recreate() throws Throwable {
        if (exception == null) {
            return result;
        }
        try {
            // Walk up the hierarchy until we reach java.lang.Throwable, which declares "stackTrace".
            Class type = exception.getClass();
            while (!Throwable.class.getName().equals(type.getName())) {
                type = type.getSuperclass();
            }
            Field traceField = type.getDeclaredField("stackTrace");
            traceField.setAccessible(true);
            if (traceField.get(exception) == null) {
                exception.setStackTrace(new StackTraceElement[0]);
            }
        } catch (Exception ignored) {
            // best effort only: the exception is thrown as-is below
        }
        throw exception;
    }
}
public class MockClusterInvoker<T> implements Invoker<T> {
    /**
     * Decides, per method, whether a call goes to the real invoker, to the local mock, or to the mock as a
     * fallback after a failed remote call.
     *
     * - mock unset or "false": invoke the wrapped invoker directly;
     * - mock starting with "force": skip the remote call and go straight to {@code doMockInvoke};
     * - any other mock value: invoke remotely first and fall back to {@code doMockInvoke} only on a
     *   non-business {@code RpcException}; business exceptions are rethrown untouched.
     *
     * @param invocation the call being made
     * @return the remote result or the mocked result
     * @throws RpcException business exceptions, or failures raised while mocking
     */
    public Result invoke(Invocation invocation) throws RpcException {
        String mockConfig = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();

        // No mock configured: plain invocation.
        if (mockConfig.isEmpty() || "false".equalsIgnoreCase(mockConfig)) {
            return this.invoker.invoke(invocation);
        }
        // force: direct mock, no remote call.
        if (mockConfig.startsWith("force")) {
            return doMockInvoke(invocation, null);
        }
        // Fail-mock: only a failed remote call is mocked.
        try {
            Result remote = this.invoker.invoke(invocation);
            if (remote.getException() instanceof RpcException) { // the failure came back inside the result
                RpcException failure = (RpcException) remote.getException();
                if (failure.isBiz()) { // business exceptions never go through the mock
                    throw failure;
                }
                return doMockInvoke(invocation, failure);
            }
            return remote;
        } catch (RpcException e) {
            if (e.isBiz()) { // business exceptions never go through the mock
                throw e;
            }
            return doMockInvoke(invocation, e); // return the mocked value
        }
    }
}
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
    /**
     * Template for a cluster invocation: copy context attachments onto the invocation, list the routed
     * invokers, resolve the load balancer and delegate to the concrete {@code doInvoke}.
     *
     * @param invocation the call being made
     * @return the result produced by {@code doInvoke}
     * @throws RpcException if the invoker is destroyed, or listing or invoking fails
     */
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        Map<String, String> attachments = RpcContext.getContext().getAttachments();
        boolean hasAttachments = attachments != null && !attachments.isEmpty();
        if (hasAttachments) {
            ((RpcInvocation) invocation).addAttachments(attachments);
        }

        // Route first, so only invokers matching the routing rules remain.
        List<Invoker<T>> candidates = list(invocation);
        // The load balancer is resolved (via SPI) from the invokers' URL information.
        LoadBalance balancer = initLoadBalance(candidates, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, candidates, balancer);
    }

    /**
     * Asks the directory for the invokers applicable to this invocation.
     *
     * @param invocation the call being made
     * @return the invokers returned by the directory
     * @throws RpcException if the directory cannot list invokers
     */
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return directory.list(invocation);
    }
}
public abstract class AbstractDirectory<T> implements Directory<T> {
    /**
     * Lists the invokers for an invocation, refusing to do so once the directory is destroyed.
     *
     * @param invocation the call being made
     * @return the invokers produced by {@code doList}
     * @throws RpcException if this directory has already been destroyed
     */
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (!destroyed) {
            return doList(invocation);
        }
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    /**
     * Produces the invokers for an invocation by running the router chain.
     *
     * @param invocation the call being made
     * @return the routed invokers; an empty list when routing yields nothing or fails
     * @throws RpcException with {@code FORBIDDEN_EXCEPTION} when the directory is flagged forbidden
     */
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) { // no provider available, or providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
        }
        if (multiGroup) {
            // Multi-group: return the current invokers without routing.
            if (this.invokers == null) {
                return Collections.emptyList();
            }
            return this.invokers;
        }
        List<Invoker<T>> routed = null;
        try {
            // Run the router chain.
            routed = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable ignored) {
            // NOTE(review): routing failures are silently dropped in this excerpt and surface only as an
            // empty invoker list; consider logging here.
        }
        if (routed == null) {
            return Collections.emptyList();
        }
        return routed;
    }
}
public class RouterChain<T> {
    /**
     * Passes the current invoker list through every router in order; each router receives the output of
     * the previous one.
     *
     * @param url        the consumer URL
     * @param invocation the call being made
     * @return the invokers that remain after all routers have been applied
     */
    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> remaining = invokers;
        for (Router each : routers) {
            // Each router narrows down the provider list.
            remaining = each.route(remaining, url, invocation);
        }
        return remaining;
    }
}

接着调用FailoverClusterInvoker的doInvoke方法:首先获取总调用次数(配置的重试次数retries加1),若该值小于等于0,则置为1;然后按调用次数循环,每次通过负载均衡策略选择具体的Invoker。若选出的invoker已在selected中,或invoker不可用且availablecheck为真,则重新选择。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    /**
     * Failover strategy: invoke a provider and, on a non-business failure, retry on another one.
     *
     * The number of attempts is the configured {@code retries} for the method plus one, with a minimum of
     * one. Before every retry the invoker list is refreshed, because providers may have changed in the
     * meantime. Business exceptions are rethrown immediately and never retried.
     *
     * @param invocation  the call being made
     * @param invokers    the routed candidate invokers
     * @param loadbalance the load balancer used to pick an invoker for each attempt
     * @return the result of the first successful attempt
     * @throws RpcException a business exception from a provider, or, once all attempts have failed, an
     *                      exception carrying the last failure's code, message and cause
     */
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) { // always attempt at least once
            len = 1;
        }
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len); // addresses tried so far, reported on failure
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                // Re-list before retrying: the set of providers may have changed since the last attempt.
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                checkInvokers(copyInvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); // pick via load balancing
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                return invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // Keep the diagnostic trail: report what was tried and chain the last failure as the cause, so the
        // caller's stack trace shows why the attempts failed.
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + ". Last error is: " + le.getMessage(),
                le.getCause() != null ? le.getCause() : le);
    }
}
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
/**
 * Selects one invoker for the invocation, honouring the "sticky" setting.
 *
 * When sticky is enabled and a previously chosen invoker is still listed, has not already been tried
 * and passes the availability check, it is reused. Otherwise the choice is delegated to doSelect, and
 * the outcome is remembered as the new sticky invoker when sticky is enabled.
 *
 * @param loadbalance the load balancer to use
 * @param invocation  the call being made (may be null)
 * @param invokers    candidate invokers
 * @param selected    invokers already tried for this call (may be null)
 * @return the chosen invoker, or null when there are no candidates
 */
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
// Drop the remembered sticky invoker once it is no longer among the candidates.
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null; // ignore overloaded method
}
// Reuse the sticky invoker only if it has not already been tried for this call.
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
/**
 * Picks an invoker through the load balancer and re-picks when the first choice has already been tried,
 * or is unavailable while the availability check is enabled.
 *
 * @return the chosen invoker, or null when there are no candidates
 */
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) { // a single candidate: nothing to balance
return invokers.get(0);
}
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); // delegate to the concrete load balancer
if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try { // the pick is already in selected, or is unavailable with availablecheck on: reselect
Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rInvoker != null) {
invoker = rInvoker;
} else { // reselect found nothing: take the invoker after the current one, wrapping around to the first
int index = invokers.indexOf(invoker);
try {//Avoid collision
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
// NOTE(review): failure is ignored; the original pick is kept.
}
}
} catch (Throwable t) {
// NOTE(review): reselect failures are swallowed in this excerpt; the original pick is returned.
}
}
return invoker;
}
/**
 * Builds a reduced candidate list and asks the load balancer to pick from it.
 *
 * First pass: invokers not yet in selected (skipping unavailable ones when availablecheck is true).
 * Second pass, only when the first yields nothing: available invokers taken from selected.
 *
 * @return the re-picked invoker, or null when neither pass yields a candidate
 */
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
List<Invoker<T>> reselectInvokers = new ArrayList<>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
for (Invoker<T> invoker : invokers) { // first, try invokers that are not in selected
if (availablecheck && !invoker.isAvailable()) {
continue;
}
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
if (selected != null) { // otherwise fall back to already-selected invokers that are still available
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) && !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
return null;
}
}

选出具体的Invoker后,先经过InvokerWrapper,再调用ListenerInvokerWrapper,进而调用ProtocolFilterWrapper内部类CallbackRegistrationInvoker的invoke方法,由它执行buildInvokerChain中构造的过滤器链:首先执行ConsumerContextFilter设置RpcContext参数;然后执行FutureFilter,若当前方法配置了oninvoke回调则先执行该回调;接着调用MonitorFilter;最终通过AsyncToSyncInvoker调用到DubboInvoker。若请求是同步请求,则会在AsyncToSyncInvoker中将异步转为同步,阻塞等待请求结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
public class InvokerWrapper<T> implements Invoker<T> {
    /**
     * Pure pass-through: forwards the invocation to the wrapped invoker.
     *
     * @param invocation the call being made
     * @return whatever the wrapped invoker returns
     * @throws RpcException if the wrapped invoker fails
     */
    public Result invoke(Invocation invocation) throws RpcException {
        Result delegated = invoker.invoke(invocation);
        return delegated;
    }
}
public class ListenerInvokerWrapper<T> implements Invoker<T> {
    /**
     * Forwards the invocation to the wrapped invoker (per the original note, an AsyncToSyncInvoker).
     *
     * @param invocation the call being made
     * @return whatever the wrapped invoker returns
     * @throws RpcException if the wrapped invoker fails
     */
    public Result invoke(Invocation invocation) throws RpcException {
        Result delegated = invoker.invoke(invocation);
        return delegated;
    }
}
public class ProtocolFilterWrapper implements Protocol {
/**
 * Head of the filter chain: runs the chained filter invoker, then registers a completion callback that
 * notifies every filter (in reverse order) of the outcome.
 */
static class CallbackRegistrationInvoker<T> implements Invoker<T> {
// First link of the filter chain built by buildInvokerChain.
private final Invoker<T> filterInvoker;
// All filters in the chain, used only for the completion callbacks.
private final List<Filter> filters;
public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
this.filterInvoker = filterInvoker;
this.filters = filters;
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation); // run the filter chain
// Once the result completes, call back each filter: ListenableFilter listeners get onResponse or onError
// depending on whether a throwable is present; other filters always get onResponse.
asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
if (filter instanceof ListenableFilter) { // onResponse callback
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
});
return asyncResult;
}
}
/**
 * Wraps the given invoker in a chain of activated filters.
 *
 * The filters are iterated from last to first, each one wrapping the invoker built so far, so that at
 * call time the first filter in the list runs first and the original invoker runs last.
 *
 * @param invoker the invoker to protect with filters
 * @param key     URL parameter key used to look up the activated filters
 * @param group   the group the filters must match
 * @return a CallbackRegistrationInvoker heading the chain
 */
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// Resolve the filters from the URL: those configured under `key` in the URL parameters that also match `group`.
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {// obtain an asynchronous result from this filter
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// A filter that throws synchronously still gets its own onError callback before the exception propagates.
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
}
public class ConsumerContextFilter extends ListenableFilter {
/**
 * Populates the consumer-side RpcContext (invoker, invocation, local and remote address, remote
 * application name) before passing the call on, and removes the context once the call returns.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the call being made
 * @return the result of the next invoker
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // set the RpcContext parameters
RpcContext.getContext().setInvoker(invoker).setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
.setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
// Clear any server context before invoking.
RpcContext.removeServerContext();
return invoker.invoke(invocation);
} finally {
// Always remove the context populated above, whether the call succeeded or failed.
RpcContext.removeContext();
}
}
}
public class FutureFilter extends ListenableFilter {
    /**
     * Fires the configured {@code oninvoke} callback (if any) and then continues the invocation.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the call being made
     * @return the result of the next invoker
     * @throws RpcException if the next invoker fails
     */
    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        fireInvokeCallback(invoker, invocation);
        return invoker.invoke(invocation);
    }

    /**
     * Invokes the {@code oninvoke} callback configured for the current method, passing the call arguments.
     * Does nothing when no async method info, or neither callback method nor instance, is configured.
     * Failures raised by the callback are forwarded to {@code fireThrowCallback}.
     *
     * @throws IllegalStateException when only one of callback method and callback instance is configured
     */
    private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
        final ConsumerMethodModel.AsyncMethodInfo info = getAsyncMethodInfo(invoker, invocation);
        if (info == null) {
            return;
        }
        final Method callback = info.getOninvokeMethod();
        final Object target = info.getOninvokeInstance();
        final boolean methodMissing = callback == null;
        final boolean instanceMissing = target == null;
        if (methodMissing && instanceMissing) {
            return; // no callback configured at all
        }
        if (methodMissing || instanceMissing) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (methodMissing ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!callback.isAccessible()) {
            callback.setAccessible(true);
        }
        final Object[] callArgs = invocation.getArguments();
        try {
            callback.invoke(target, callArgs);
        } catch (InvocationTargetException e) {
            // Report the exception thrown by the callback itself, not the reflection wrapper.
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }
}
public class MonitorFilter extends ListenableFilter {
    /**
     * When the invoker's URL carries the monitor parameter, stamps the invocation with the start time and
     * bumps the concurrent-call counter; then continues the invocation chain.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the call being made
     * @return the result of the next invoker
     * @throws RpcException if the next invoker fails
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        boolean monitored = invoker.getUrl().hasParameter(MONITOR_KEY);
        if (monitored) {
            String startedAt = String.valueOf(System.currentTimeMillis());
            invocation.setAttachment(MONITOR_FILTER_START_TIME, startedAt);
            // One more in-flight call for this method.
            getConcurrent(invoker, invocation).incrementAndGet();
        }
        // proceed invocation chain
        return invoker.invoke(invocation);
    }
}
public class AsyncToSyncInvoker<T> implements Invoker<T> {
/**
 * Turns an asynchronous invocation into a synchronous one when the invocation mode is SYNC.
 *
 * The wrapped invoker always returns an asynchronous result; for SYNC calls this method blocks on it
 * before returning. Timeout and remoting failures surfaced through ExecutionException are translated
 * into RpcException; any other ExecutionException cause is not rethrown here and the result is returned
 * as-is.
 *
 * @param invocation the call being made (cast to RpcInvocation to read the invoke mode)
 * @return the asynchronous result, already completed for SYNC calls
 */
public Result invoke(Invocation invocation) throws RpcException {// async to sync
Result asyncResult = invoker.invoke(invocation); // AsyncRpcResult ---> CompletableFuture ---> DefaultFuture
try {// if the invocation is synchronous, block until the result is available
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
// NOTE(review): the wait bound is Integer.MAX_VALUE ms, so the effective timeout presumably comes from
// the future itself being completed with a timeout elsewhere; confirm.
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
// Any other cause falls through: asyncResult is returned unchanged below.
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}
}
public abstract class AbstractInvoker<T> implements Invoker<T> {
    /**
     * Prepares the invocation (invoker reference, attachments, invoke mode, invocation id) and delegates to
     * {@code doInvoke}. Failures are converted into a default async result carrying the exception, except
     * for non-business {@code RpcException}s, which are rethrown.
     *
     * @param inv the call being made; must be an {@code RpcInvocation}
     * @return the result of {@code doInvoke}, or a default async result wrapping the failure
     * @throws RpcException when {@code doInvoke} raises a non-business RpcException
     */
    public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);

        // Invoker-level attachments never override what the caller already set.
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        // Context attachments are added unconditionally.
        Map<String, String> fromContext = RpcContext.getContext().getAttachments();
        if (CollectionUtils.isNotEmptyMap(fromContext)) {
            invocation.addAttachments(fromContext);
        }

        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable target = e.getTargetException();
            if (target == null) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            }
            if (target instanceof RpcException) {
                ((RpcException) target).setCode(RpcException.BIZ_EXCEPTION);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, target, invocation);
        } catch (RpcException e) {
            if (!e.isBiz()) {
                throw e;
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } catch (Throwable e) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
    }
}

最终请求是通过DubboInvoker的doInvoke方法发送到服务端的。一个DubboInvoker对象可能被并发调用,故每次调用都需要选择一个client去发送请求(存在多个client时轮询使用)。然后依次调用ReferenceCountExchangeClient、HeaderExchangeClient、HeaderExchangeChannel的request方法;在HeaderExchangeChannel中会构造一个Request对象,并构造一个DefaultFuture对象用于在timeout时间内等待结果。构造DefaultFuture对象时会以请求id为key将其存入FUTURES中;当HeaderExchangeHandler接收到响应时,会根据响应id从FUTURES中取出对应的DefaultFuture对象,并用Response完成该future。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
public class DubboInvoker<T> extends AbstractInvoker<T> {
/**
 * Sends the invocation to the provider through one of this invoker's exchange clients.
 *
 * One-way calls are sent and answered immediately with a default result. Two-way calls issue an
 * asynchronous request and return an AsyncRpcResult subscribed to the response future; this method does
 * not block waiting for the response.
 *
 * @param invocation the call being made (cast to RpcInvocation to set attachments)
 * @return a default async result for one-way calls, otherwise an AsyncRpcResult tied to the response future
 * @throws RpcException with TIMEOUT_EXCEPTION or NETWORK_EXCEPTION when sending fails accordingly
 */
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// A DubboInvoker may be invoked concurrently, so each call picks the client it will use to send the request.
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {// round-robin over the clients
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // true means the caller does not need a result
// Timeout configured for this method, falling back to DEFAULT_TIMEOUT (per the original note 1000 ms; the value is not visible here).
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) { // no result needed
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent); // send the request through the client
return AsyncRpcResult.newDefaultAsyncResult(invocation); // default result built from the invocation only (per the original note, value=null)
} else { // a result is needed
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); // asynchronous request returning a CompletableFuture
// When responseFuture completes it feeds asyncRpcResult; nothing blocks here. Callers that need to
// block do so on asyncRpcResult further up the chain.
asyncRpcResult.subscribeTo(responseFuture);
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
final class ReferenceCountExchangeClient implements ExchangeClient {
    /**
     * Forwards the request to the wrapped client.
     *
     * @param request the payload to send
     * @param timeout the request timeout passed through to the wrapped client
     * @return the future returned by the wrapped client
     * @throws RemotingException if the wrapped client fails to send
     */
    public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        CompletableFuture<Object> pending = client.request(request, timeout);
        return pending;
    }
}
public class HeaderExchangeClient implements ExchangeClient {
    /**
     * Forwards the request to the underlying exchange channel.
     *
     * @param request the payload to send
     * @param timeout the request timeout passed through to the channel
     * @return the future returned by the channel
     * @throws RemotingException if the channel fails to send
     */
    public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        CompletableFuture<Object> pending = channel.request(request, timeout);
        return pending;
    }
}
final class HeaderExchangeChannel implements ExchangeChannel {
    /**
     * Wraps the payload in a two-way {@code Request}, creates the {@code DefaultFuture} that will receive
     * the response, and sends the request over the channel.
     *
     * @param request the payload to send
     * @param timeout the timeout handed to the future
     * @return the future that completes when the response (or a timeout) arrives
     * @throws RemotingException if this channel is closed or the send fails (the future is cancelled first)
     */
    public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }

        // Build the two-way request envelope.
        Request envelope = new Request();
        envelope.setVersion(Version.getProtocolVersion());
        envelope.setTwoWay(true);
        envelope.setData(request);

        // Create the future before sending, so it exists by the time a response can arrive.
        DefaultFuture pending = DefaultFuture.newFuture(channel, envelope, timeout);
        try {
            channel.send(envelope);
        } catch (RemotingException sendFailure) {
            pending.cancel();
            throw sendFailure;
        }
        return pending;
    }
}
public class DefaultFuture extends CompletableFuture<Object> {
/**
 * Creates the future for a request and records it, and its channel, in the static FUTURES / CHANNELS
 * maps under the request id, so that the response can later be matched to it.
 *
 * @param timeout timeout in ms; when not positive, the channel URL's timeout parameter (or DEFAULT_TIMEOUT) is used
 */
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
FUTURES.put(id, this); // put into waiting map.
CHANNELS.put(id, channel);
}
/**
 * Factory: creates the future and schedules its timeout check.
 */
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
timeoutCheck(future); // timeout check
return future;
}
/**
 * Completes the pending future matching the response id, if there is one.
 *
 * @param timeout true when the response was synthesized by the timeout task; in that case the timeout
 *                task is not cancelled
 */
public static void received(Channel channel, Response response, boolean timeout) {
try {// look the future up by the response id and take it out of the waiting map
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {// decrease Time
t.cancel();
}
future.doReceived(response);
}
} finally {
CHANNELS.remove(response.getId());
}
}
/**
 * Maps the response status onto the future: OK completes normally with the result, a client/server
 * timeout completes exceptionally with TimeoutException, anything else with RemotingException.
 */
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
/**
 * Schedules a TimeoutCheckTask on TIME_OUT_TIMER to fire after the future's timeout (in ms).
 */
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}
/**
 * Timer task that, if the future is still pending when it fires, fabricates a timeout Response and
 * feeds it through received(...) so that the future completes exceptionally.
 */
private static class TimeoutCheckTask implements TimerTask {
public void run(Timeout timeout) {
DefaultFuture future = DefaultFuture.getFuture(requestID);
if (future == null || future.isDone()) {
return;
}
Response timeoutResponse = new Response(future.getId()); // create exception response.
// SERVER_TIMEOUT if the request had already been sent, CLIENT_TIMEOUT otherwise.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); // set timeout status.
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
DefaultFuture.received(future.getChannel(), timeoutResponse, true); // handle response.
}
}
}
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
    /**
     * Sends a message over this client's channel, reconnecting first when reconnection is enabled and the
     * client is currently disconnected.
     *
     * @param message the message to send
     * @param sent    whether to wait for the message to be written out
     * @throws RemotingException if no connected channel is available or the send fails
     */
    public void send(Object message, boolean sent) throws RemotingException {
        if (needReconnect && !isConnected()) {
            connect();
        }
        Channel ch = getChannel();
        boolean usable = ch != null && ch.isConnected();
        if (!usable) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        ch.send(message, sent);
    }
}
final class NettyChannel extends AbstractChannel {
/**
 * Writes the message to the underlying Netty channel.
 *
 * @param message the message to write
 * @param sent    true to wait (up to the URL's timeout) for the write to complete; false to return once
 *                the write has been handed over
 * @throws RemotingException if the write reports a cause, any error occurs, or the wait times out
 */
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent); // whether the channel is closed
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
// sent=true: wait for the message to go out; a failed send raises an exception.
// sent=false: do not wait; the message is queued for IO and the call returns immediately.
if (sent) { // wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout); // await blocks for the outcome; per the original note this is the consumer-side timeout
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
// await returned false: the write did not complete within the timeout.
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
}
}
}

处理响应结果

客户端处理服务端响应数据的入口为NettyClientHandler,后续处理逻辑与服务端处理客户端请求的流程一样:同样经过MultiMessageHandler、HeartbeatHandler,最终通过ChannelEventRunnable异步处理任务,然后同样调用DecodeHandler的received方法,只不过这里的message是Response;接着与服务端一样调用HeaderExchangeHandler,但走的是处理Response的逻辑handleResponse,从而在AsyncToSyncInvoker中就能获取到结果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class NettyClientHandler extends ChannelDuplexHandler {
    /**
     * Client-side inbound entry point: hands the received message to the Dubbo handler chain, then drops
     * the channel mapping if the Netty channel has disconnected.
     *
     * @param ctx the Netty handler context
     * @param msg the received message
     * @throws Exception if the handler chain fails
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel dubboChannel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(dubboChannel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    /**
     * Hands a non-heartbeat response to {@code DefaultFuture} so that the waiting future gets completed.
     * Null and heartbeat responses are ignored.
     *
     * @param channel  the channel the response arrived on
     * @param response the response, possibly null
     * @throws RemotingException declared for callers; not thrown directly here
     */
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response == null || response.isHeartbeat()) {
            return;
        }
        DefaultFuture.received(channel, response);
    }
}
public class DefaultFuture extends CompletableFuture<Object> {
/**
 * Handles a genuine (non-timeout) response: equivalent to received(channel, response, false).
 */
public static void received(Channel channel, Response response) {
received(channel, response, false);
}
/**
 * Completes the pending future matching the response id, if there is one.
 *
 * @param timeout true when the response was synthesized by the timeout task; in that case the timeout
 *                task is not cancelled
 */
public static void received(Channel channel, Response response, boolean timeout) {
try {// look the future up by the response id and take it out of the waiting map
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {// decrease Time
t.cancel();
}
future.doReceived(response);
}
} finally {
// Always drop the channel entry, even if no future matched or completion failed.
CHANNELS.remove(response.getId());
}
}
}

服务端

服务端接收数据的入口为NettyServerHandler,然后调用MultiMessageHandler判断接收到的数据是否为MultiMessage,若是则遍历MultiMessage中的每个Message,逐个传递给HeartbeatHandler进行处理。HeartbeatHandler判断是否为心跳消息,若不是则把Message传递给AllChannelHandler。AllChannelHandler将接收到的Message封装为一个ChannelEventRunnable对象,交给线程池异步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class NettyServerHandler extends ChannelDuplexHandler {
    /**
     * Server-side inbound entry point: hands the received data to the Dubbo handler chain, then drops the
     * channel mapping if the Netty channel has disconnected.
     *
     * @param ctx the Netty handler context
     * @param msg the received message
     * @throws Exception if the handler chain fails
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Data received: resolve (or create) the Dubbo channel wrapping this Netty channel.
        NettyChannel dubboChannel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(dubboChannel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    /**
     * Forwards a received message to the wrapped handler unless this peer is closed, in which case the
     * message is dropped.
     *
     * @param ch  the channel the message arrived on
     * @param msg the received message
     * @throws RemotingException if the wrapped handler fails
     */
    public void received(Channel ch, Object msg) throws RemotingException {
        if (!closed) {
            handler.received(ch, msg);
        }
    }
}
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
    /**
     * Unpacks a {@code MultiMessage} and forwards each contained message individually; any other message
     * is forwarded unchanged.
     *
     * @param channel the channel the message arrived on
     * @param message the received message
     * @throws RemotingException if the next handler fails
     */
    public void received(Channel channel, Object message) throws RemotingException {
        if (!(message instanceof MultiMessage)) {
            handler.received(channel, message);
            return;
        }
        for (Object part : (MultiMessage) message) {
            handler.received(channel, part);
        }
    }
}
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
    /**
     * Absorbs heartbeat traffic and forwards everything else.
     *
     * A heartbeat request is answered with a heartbeat response when it is two-way; heartbeat responses
     * are dropped. In both cases the message stops here.
     *
     * @param channel the channel the message arrived on
     * @param message the received message
     * @throws RemotingException if replying or forwarding fails
     */
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);

        if (isHeartbeatRequest(message)) {
            Request ping = (Request) message;
            if (!ping.isTwoWay()) {
                return;
            }
            Response pong = new Response(ping.getId(), ping.getVersion());
            pong.setEvent(Response.HEARTBEAT_EVENT);
            channel.send(pong);
            return;
        }

        // Heartbeat responses need no further processing; anything else moves on down the chain.
        if (!isHeartbeatResponse(message)) {
            handler.received(channel, message);
        }
    }
}
public class AllChannelHandler extends WrappedChannelHandler {
    /**
     * Hands the received message to the executor so that it is processed off the calling thread.
     *
     * @param channel the channel the message arrived on
     * @param message the received message
     * @throws RemotingException an {@code ExecutionException} wrapping any failure to create or submit the task
     */
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService pool = getExecutorService();
        try {
            // Let the thread pool process the message.
            Runnable task = new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message);
            pool.execute(task);
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

ChannelEventRunnable的run方法中会调用DecodeHandler,按照Dubbo协议的数据格式解析当前请求Message的path、version、方法、方法参数等,然后把解析好的请求交给HeaderExchangeHandler处理Request数据:首先构造一个Response对象,然后调用ExchangeHandlerAdapter的reply方法得到一个CompletionStage(future),再通过whenComplete给future绑定一个回调函数;当future执行完后,可在回调函数中得到ExchangeHandlerAdapter的执行结果,并把执行结果设置给Response对象,通过channel发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
public class ChannelEventRunnable implements Runnable {
    /**
     * Dispatches the captured channel event to the matching handler callback.
     *
     * Exceptions thrown by the handler are caught and discarded in this excerpt, so a failing handler
     * never propagates out of the executor thread.
     */
    public void run() {
        switch (state) {
            case RECEIVED:
                try {
                    handler.received(channel, message);
                } catch (Exception ignored) {
                    // intentionally swallowed (see method comment)
                }
                break;
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception ignored) {
                    // intentionally swallowed
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception ignored) {
                    // intentionally swallowed
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception ignored) {
                    // intentionally swallowed
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception ignored) {
                    // intentionally swallowed
                }
                break;
            default:
                // unknown state: nothing to do
        }
    }
}
public class DecodeHandler extends AbstractChannelHandlerDelegate {
    /**
     * Decodes the message (and, for requests and responses, its payload) when it is {@code Decodeable},
     * then forwards it to the next handler.
     *
     * @param channel the channel the message arrived on
     * @param message the received message
     * @throws RemotingException if the next handler fails
     */
    public void received(Channel channel, Object message) throws RemotingException {
        // decode() is a no-op for anything that is not Decodeable, so no pre-check is needed here.
        decode(message);
        if (message instanceof Request) {
            decode(((Request) message).getData());
        }
        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }
        handler.received(channel, message);
    }

    /**
     * Decodes the object if it is {@code Decodeable}; decoding failures are ignored in this excerpt.
     */
    private void decode(Object message) {
        if (!(message instanceof Decodeable)) {
            return;
        }
        try {
            ((Decodeable) message).decode();
        } catch (Throwable ignored) {
            // intentionally swallowed: the message is forwarded regardless
        }
    }
}
/**
 * Exchange-layer handler. Classifies each inbound message (Request, Response,
 * telnet String, or anything else) and routes it to the wrapped handler.
 */
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    /**
     * Entry point for every received message.
     * Records the read timestamp on the channel, dispatches by message type and,
     * in all cases, removes the exchange channel afterwards if it is disconnected.
     *
     * @param channel channel the message arrived on
     * @param message inbound message
     * @throws RemotingException propagated from the wrapped handler or from sending
     */
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // Handle a request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        // Two-way communication: the invocation result has to be sent back.
                        handleRequest(exchangeChannel, request);
                    } else {
                        // One-way communication: just invoke the service, nothing is returned.
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                // The client received the response of a service call.
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    /**
     * Processes a two-way request and sends a Response carrying the request id and version.
     * A broken request is answered immediately with BAD_REQUEST. Otherwise the request data
     * is passed to {@code handler.reply} and the response is sent from the completion callback.
     *
     * @param channel exchange channel used to send the response
     * @param req     request to process
     * @throws RemotingException if sending the response outside the callback fails
     */
    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion()); // request id, request version
        if (req.isBroken()) {
            // The request could not be processed; report it to the peer as a decode failure.
            Object data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            channel.send(res);
            return;
        }
        // Find handler by message class. The data field holds the request content
        // (the article describes it as the RpcInvocation object).
        Object msg = req.getData();
        try {
            // If reply returns null, the dereference below throws and the catch block
            // reports it to the peer as SERVICE_ERROR.
            CompletionStage<Object> future = handler.reply(channel, msg);
            // The callback runs once the stage completes and sends the outcome through the channel.
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(appResult);
                    } else {
                        // The service failed: send the stringified Throwable back to the peer.
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    // The callback cannot rethrow a checked exception; log instead of
                    // silently dropping the fact that the response never went out.
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e, e);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
}

从本机已导出的Exporter中根据当前Request所对应的服务key获取Exporter对象,从Exporter中得到Invoker然后执行invoke方法,此Invoker为ProtocolFilterWrapper$CallbackRegistrationInvoker,它负责执行过滤器链,且在执行完后回调每个过滤器的onResponse或onError方法:

  • EchoFilter:判断当前请求是否为回声测试,若是则不继续执行过滤器链
  • ClassLoaderFilter:设置当前线程classloader为当前要执行服务接口所对应的classloader
  • GenericFilter:把泛化调用发送过来的信息包装为RpcInvocation对象
  • ContextFilter:设置RpcContext.getContext参数
  • TraceFilter:先执行下一个invoker的invoke方法,调用成功后记录调用信息
  • TimeoutFilter:调用时没有特别处理只记录一下当前时间,当整个filter链执行完后回调TimeoutFilter的onResponse方法判断本次调用是否超过timeout
  • MonitorFilter:记录当前服务的执行次数
  • ExceptionFilter:调用时没有特别处理,回调onResponse方法时对不同异常进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message; // 转成Invocation对象,要开始用反射执行方法了
Invoker<?> invoker = getInvoker(channel, inv); // 服务实现者
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());// 这里设置了,service中才能拿到remoteAddress
Result result = invoker.invoke(inv);// 执行服务,得到结果
return result.completionFuture().thenApply(Function.identity()); // 返回一个CompletableFuture
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message); // 这是服务端接收到Invocation时的处理逻辑
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
invoke(channel, ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(PATH_KEY);
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; //callback
if (isCallBackServiceInvoke) {
path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
// 从请求中拿到serviceKey,从exporterMap中拿到已经导出了的服务
String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}
return exporter.getInvoker(); // 拿到服务对应的Invoker
}
}
public class ProtocolFilterWrapper implements Protocol {
static class CallbackRegistrationInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation); // 执行过滤器链
// 过滤器都执行完了之后,回调每个ListenableFilter过滤器的onResponse或onError方法
asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
if (filter instanceof ListenableFilter) { // onResponse callback
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
});
return asyncResult;
}
}
}

执行完过滤器链后,通过InvokerWrapper、DelegateProviderMetaDataInvoker调用AbstractProxyInvoker。该AbstractProxyInvoker是在服务导出时根据服务接口和服务实现类对象生成的,其invoke方法调用JavassistProxyFactory中实现的doInvoke方法,真正执行服务实现类对象的方法得到结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/** Invoker that forwards every invocation to the wrapped {@code invoker}. */
public class InvokerWrapper<T> implements Invoker<T> {
    /**
     * Delegates the call unchanged.
     *
     * @param invocation invocation to forward
     * @return whatever the wrapped invoker returns
     * @throws RpcException propagated from the wrapped invoker
     */
    public Result invoke(Invocation invocation) throws RpcException {
        final Result delegated = invoker.invoke(invocation);
        return delegated;
    }
}
/** Invoker that passes each invocation straight through to the wrapped {@code invoker}. */
public class DelegateProviderMetaDataInvoker<T> implements Invoker {
    /**
     * Forwards the invocation without modification.
     *
     * @param invocation invocation to forward
     * @return the result produced by the wrapped invoker
     * @throws RpcException propagated from the wrapped invoker
     */
    public Result invoke(Invocation invocation) throws RpcException {
        final Result forwarded = invoker.invoke(invocation);
        return forwarded;
    }
}
/**
 * Base invoker that calls the service object through {@code doInvoke} and
 * packages the outcome as an AsyncRpcResult.
 */
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
    /**
     * Executes the service method described by {@code invocation}.
     *
     * @return an AsyncRpcResult completed from the service future, or a default async result
     *         carrying the target exception when the call raised InvocationTargetException
     * @throws RpcException wrapping any other Throwable raised while invoking
     */
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // The raw return value may be a CompletableFuture (asynchronous service)
            // or a plain result (synchronous service).
            Object returned = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            // Wrap the value so both cases are handled as a CompletableFuture.
            CompletableFuture<Object> pending = wrapWithFuture(returned, invocation);
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
            // Once the future completes, move its value or failure into an AppResponse
            // and complete the async RPC result with it.
            pending.whenComplete((value, failure) -> {
                AppResponse response = new AppResponse();
                if (failure == null) {
                    response.setValue(value);
                } else {
                    // Unwrap CompletionException so the response carries its cause.
                    response.setException(failure instanceof CompletionException ? failure.getCause() : failure);
                }
                asyncRpcResult.complete(response);
            });
            return asyncRpcResult;
        } catch (InvocationTargetException e) {
            // A synchronous service threw (e.g. a NullPointerException):
            // return the target exception inside a Result instead of throwing.
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            // Everything else is rethrown wrapped in an RpcException.
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
/** Proxy factory whose invokers call the target object through a Wrapper. */
public class JavassistProxyFactory extends AbstractProxyFactory {
    /**
     * Creates an invoker that executes methods on {@code proxy} via a Wrapper.
     *
     * @param proxy service implementation object
     * @param type  service interface
     * @param url   URL handed to the invoker (the article describes it as a registry URL)
     * @return an AbstractProxyInvoker whose doInvoke delegates to the Wrapper
     */
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // A class name containing '$' is taken to mean the object is itself a proxy;
        // in that case the Wrapper is built for the interface type, otherwise for the object's class.
        final Class<?> proxyClass = proxy.getClass();
        final boolean plainClass = proxyClass.getName().indexOf('$') < 0;
        // A Wrapper is a helper for a class or interface that invokes its methods by name.
        final Wrapper wrapper = Wrapper.getWrapper(plainClass ? proxyClass : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                // Invoke the named method on the proxy instance through the Wrapper.
                Object returned = wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                return returned;
            }
        };
    }
}

异常处理

当服务消费者在调用一个服务时,服务提供者在执行服务逻辑时可能会出现异常,服务消费者需要在消费端抛出该异常。

服务提供者在执行服务时若出现了异常,框架会把异常捕获,捕获异常的逻辑在AbstractProxyInvoker中,捕获到异常后,把异常信息包装为正常的AppResponse对象,其value属性为null,exception属性有值。

然后服务提供者会把该AppResponse对象发送给服务消费端,服务消费端是在InvokerInvocationHandler中调用AppResponse的recreate方法重新得到一个结果,在recreate方法中会去判断AppResponse对象是否正常,即是否存在exception信息,若存在则直接throw该exception,从而做到服务执行时出现的异常在服务消费端抛出。

若服务提供者抛出的异常类消费者端不存在,消费者也就抛不出该异常,则需要服务提供者端ExceptionFilter过滤器,其主要是在服务提供者执行完服务后会去识别异常:

  • 若为需要开发人员捕获的异常,则忽略直接把该异常返回给消费者
  • 若当前所执行的方法签名上有声明,则忽略直接把该异常返回给消费者
  • 抛出异常不需要开发人员捕获,或方法上没有申明,则服务端记录一个error日志
  • 异常类和接口类在同一Jar包里,则忽略直接把该异常返回给消费者
  • 异常类是JDK自带异常,则忽略直接把该异常返回给消费者
  • 异常类是Dubbo自带异常,则忽略直接把该异常返回给消费者
  • 若不是以上情况,则把异常信息包装成RuntimeException,并覆盖AppResponse对象中的exception属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * Provider-side filter that inspects the exception carried by a result and decides
 * whether it is returned to the consumer unchanged or replaced by a RuntimeException.
 */
public class ExceptionFilter extends ListenableFilter {
    /** Does nothing special on the way in; the work happens in the listener callback. */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    static class ExceptionListener implements Listener {
        private Logger logger = LoggerFactory.getLogger(ExceptionListener.class);

        /**
         * Examines the exception of {@code appResponse}, unless the invoker interface is
         * GenericService. The exception is left untouched when it is checked, declared on the
         * method signature, located in the same code base as the interface, a java./javax.
         * class, or an RpcException. Otherwise it is replaced by a RuntimeException whose
         * message is the stringified original.
         */
        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
                try {
                    Throwable exception = appResponse.getException();
                    if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                        return; // checked exception: pass it on unchanged
                    }
                    try {
                        // Declared on the method signature: pass it on unchanged.
                        Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                        Class<?>[] exceptionClassses = method.getExceptionTypes();
                        for (Class<?> exceptionClass : exceptionClassses) {
                            if (exception.getClass().equals(exceptionClass)) {
                                return;
                            }
                        }
                    } catch (NoSuchMethodException e) {
                        return;
                    }
                    // Not declared on the method signature: print an ERROR log on the server side.
                    logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
                    String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                    String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                    if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                        return; // exception class and interface class share a jar: pass it on unchanged
                    }
                    String className = exception.getClass().getName();
                    if (className.startsWith("java.") || className.startsWith("javax.")) {
                        return; // JDK exception: pass it on unchanged
                    }
                    if (exception instanceof RpcException) {
                        return; // Dubbo's own exception: pass it on unchanged
                    }
                    // Otherwise wrap it in a RuntimeException for the client.
                    appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
                } catch (Throwable e) {
                    // The filter must not break the response, but a failure of the filter
                    // itself should be visible rather than silently dropped.
                    logger.warn("Fail to ExceptionFilter, exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                }
            }
        }
    }
}