Dubbo基础

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Spring boot application
spring.application.name=dubbo-provider-demo
server.port=8081

# Base packages to scan Dubbo Component: @org.apache.dubbo.config.annotation.Service
dubbo.scan.base-packages=com.eleven.icode.service
dubbo.application.name=${spring.application.name}

## Dubbo Registry
dubbo.registry.address=zookeeper://127.0.0.1:2181

dubbo.protocols.p1.id=dubbo1
dubbo.protocols.p1.name=dubbo
dubbo.protocols.p1.port=20881
dubbo.protocols.p1.host=0.0.0.0

dubbo.protocols.p2.id=dubbo2
dubbo.protocols.p2.name=dubbo
dubbo.protocols.p2.port=20882
dubbo.protocols.p2.host=0.0.0.0

dubbo.protocols.p3.id=dubbo3
dubbo.protocols.p3.name=dubbo
dubbo.protocols.p3.port=20883
dubbo.protocols.p3.host=0.0.0.0

负载均衡

在集群负载均衡时Dubbo提供了RandomRoundRobinLeastActiveConsistentHash四种均衡策略缺省为random随机调用。若消费端服务端都配置了负载均衡策略以消费端为准

  • Random随机,按权重设置随机概率,在一个截面上碰撞的概率高,但调用量越大分布越均匀,且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
  • RoundRobin轮询,按公约后的权重设置轮询比率,存在慢提供者累积请求问题
  • LeastActive基于消费端最少活跃调用数相同活跃数的随机,活跃数指调用前后计数差,使慢的提供者收到更少请求,越慢的提供者的调用前后计数差会越大
  • ConsistentHash:相同参数的请求总是发到同一提供者
1
2
3
4
5
6
7
8
9
10
11
12
13
// 服务端
@Service(version = "default")
public class DefaultDemoService implements DemoService {
@Override
public String sayHello(String name) {
System.out.println("执行了服务" + name);
URL url = RpcContext.getContext().getUrl();
return String.format("%s:%s, Hello, %s", url.getProtocol(), url.getPort(), name);
}
}
// 消费端:random、roundrobin、leastactive、consistenthash
@Reference(version = "default", loadbalance = "leastactive")
private DemoService demoService;

服务超时

在服务提供者和服务消费者上都可配置服务超时时间,若在服务端和消费端只在其中一方配置了timeout,则表示消费端调用服务的超时时间,消费端若超过时间没有收到响应结果,则消费端会抛超时异常,但服务端不会抛异常,服务端在执行服务后,会检查执行该服务的时间,若超过timeout,则会打印一个超时日志,服务会正常执行完。且若请求失败默认重试两次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 服务端
@Service(version = "timeout", timeout = 6000)
public class TimeoutDemoService implements DemoService {
@Override
public String sayHello(String name) {
System.out.println("执行了timeout服务" + name);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行结束" + name);
URL url = RpcContext.getContext().getUrl();
return String.format("%s:%s, Hello, %s", url.getProtocol(), url.getPort(), name);
}
}
// 消费端:请求失败默认重试2次
@Reference(version = "timeout", timeout = 6000)
private DemoService demoService;

集群容错

  • InvokerProvider的一个可调用Service的抽象,封装了Provider地址及Service接口信息
  • Directory:代表多个Invoker,可把它看成List<Invoker> ,但与List不同的是,它的值可能是动态变化的,如注册中心推送变更
  • Cluster:将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router:负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等
  • LoadBalance:负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后需要重选

集群容错表示服务消费者在调用某个服务时,该服务有多个服务提供者,在经过负载均衡后选出其中一个服务提供者之后进行调用,调用报错后Dubbo所采取的后续处理策略。在集群调用失败时,Dubbo提供多种容错方案,缺省为failover重试

  • failover失败自动切换,当出现失败重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过retries=2来设置重试次数,不含第一次
  • failfast快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作,比如新增记录
  • failsafe失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作
  • failback失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作,默认三次
  • forking:并行调用多个服务器,只要一个成功即返回,通常用于实时性要求较高的读操作,但需浪费更多服务资源,可通过forks=2来设置最大并行数
  • broadcast:广播逐个调用所有提供者,任意一台报错则报错,通常用于通知所有提供者更新缓存或日志等本地资源信息
1
2
@Reference(version = "timeout", timeout = 1000, cluster = "failback")
private DemoService demoService;

服务降级

通过服务降级功能临时屏蔽某个出错的非关键服务,并定义降级后的返回策略,与集群容错的区别在于,集群容错是整个集群范围内的容错服务降级是单个服务提供者的自身容错

  • mock=force:return+null:表示消费方对该服务的方法调用都直接返回null不发起远程调用,用来屏蔽不重要服务不可用时对调用方的影响
  • mock=fail:return+null:表示消费方对该服务方法调用失败后再返回null值不抛异常,依然会失败重试,用来容忍不重要服务不稳定时对调用方的影响
1
2
@Reference(version = "timeout", timeout = 1000, mock = "force: return 123")
private DemoService demoService;

本地存根

本地存根就是一段逻辑,这段逻辑是在服务消费端执行的,这段逻辑一般都是由服务提供者提供,服务提供者可利用这种机制在服务消费者远程调用服务提供者之前或之后再做一些其他事情,比如结果缓存,请求参数验证等。

Stub必须有可传入Proxy的构造函数,在interface旁边放一个Stub实现,它实现BarService接口,并有一个传入远程BarService实例的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class DemoServiceStub implements DemoService {
private final DemoService demoService;
public DemoServiceStub(DemoService demoService){
this.demoService = demoService; // 构造函数传入真正的远程代理对象
}
@Override
public String sayHello(String name) {
// 此代码在客户端执行, 可在客户端做ThreadLocal本地缓存,或预先验证参数是否合法等等
try {
return demoService.sayHello(name); // safe null
} catch (Exception e) { // 可以容错,可以做任何AOP拦截事项
return "容错数据";
}
}
}
@Reference(version = "timeout", timeout = 1000, stub = "true")
private DemoService demoService;

@Reference(version = "timeout", timeout = 1000, stub = "com.eleven.icode.DemoServiceStub")
private DemoService demoService;

本地伪装

本地伪装就是Mock,相对于本地存根更简单一点,Mock其实就是Dubbo中的服务容错的解决方案,通常用于服务降级。使用throw当调用出错时默认抛出RPCException,使用return来返回一个字符串表示的对象,作为Mock返回值。合法的字符串可以是:

  • empty:代表空,基本类型的默认值,或者集合类的空值
  • nullnull
  • truetrue
  • falsefalse
  • JSON格式:反序列化JSON所得到的对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Reference(version = "timeout", timeout = 1000, mock = "throw")
private DemoService demoService;

public class DemoServiceMock implements DemoService {
@Override
public String sayHello(String name) {
return "出现Rpc异常,进行了mock";
}
}
@Reference(version = "timeout", mock = "true")
private DemoService demoService;

@Reference(version = "timeout", timeout = 1000, mock = "force: return 123")
private DemoService demoService;

参数回调

对于某个服务接口中的某个方法,若想支持消费者在调用该方法时能设置回调逻辑,则该方法就需要提供一个入参用来表示回调逻辑。因为Dubbo协议是基于长连接的,故消费端在两次调用同一个方法时想指定不同的回调逻辑,则需要在调用时在指定一定key进行区分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public interface DemoServiceListener {
void changed(String msg);
}
// 服务端
// DemoService的sayHello方法的index=1的参数是回调对象,服务消费者可以调用addListener方法来添加回调对象,服务提供者一旦执行回调对象的方法就会通知给服务消费者
@Service(version = "callback", methods = {@Method(name = "sayHello", arguments = {@Argument(index = 2, callback = true)})}, callbacks = 3)
public class CallBackDemoService implements DemoService {
private final Map<String, DemoServiceListener> listeners = new ConcurrentHashMap<String, DemoServiceListener>();
public CallBackDemoService() {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
for (Map.Entry<String, DemoServiceListener> entry : listeners.entrySet()) {
entry.getValue().changed(getChanged(entry.getKey()));
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
t.start();
}
private String getChanged(String key) {
return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
}
@Override
public String sayHello(String name) {
return null;
}
@Override
public String sayHello(String name, String key, DemoServiceListener callback) {
System.out.println("执行了回调服务" + name);
listeners.put(key, callback);
URL url = RpcContext.getContext().getUrl();
return String.format("%s:%s, Hello, %s", url.getProtocol(), url.getPort(), name);
}
}
// 客户端
public class DemoServiceListenerImpl implements DemoServiceListener {
@Override
public void changed(String msg) {
System.out.println("被回调了:" + msg);
}
}
@Reference(version = "callback")
private DemoService demoService;
System.out.println(demoService.sayHello("eleven", "d1", new DemoServiceListenerImpl()));
System.out.println(demoService.sayHello("eleven", "d2", new DemoServiceListenerImpl()));
System.out.println(demoService.sayHello("eleven", "d3", new DemoServiceListenerImpl()));

异步调用

Dubbo所有异步编程接口开始以CompletableFuture为基础,基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service(version = "async")
public class AsyncDemoService implements DemoService {
@Override
public String sayHello(String name) {
System.out.println("执行了同步服务" + name);
URL url = RpcContext.getContext().getUrl();
return String.format("%s:%s, Hello, %s", url.getProtocol(), url.getPort(), name); // 正常访问
}
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
System.out.println("执行了异步服务" + name);
return CompletableFuture.supplyAsync(() -> {
URL url = RpcContext.getContext().getUrl();
return String.format("%s:%s, Hello, %s", url.getProtocol(), url.getPort(), name); // 正常访问
});
}
}

@Reference(version = "async")
private DemoService demoService;
CompletableFuture<String> future = demoService.sayHelloAsync("异步调用"); // 5
future.whenComplete((v, t) -> {
if (t != null) {
t.printStackTrace();
} else {
System.out.println("Response: " + v);
}
});

泛化调用

泛化调用可以用来做服务测试,某个服务想要支持泛化调用,可将该服务的generic属性设置为true,对于服务消费者来说,就可以不用依赖该服务的接口,直接利用GenericService接口来进行服务调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 方式一,通过服务消费者
@Reference(id = "demoService", version = "default", interfaceName = "com.eleven.icode.DemoService", generic = true)
private GenericService genericService;

// 方式二,通过服务提供者
@Service(interfaceName = "com.eleven.icode.DemoService", version = "generic")
public class GenericDemoService implements GenericService {
@Override
public Object $invoke(String s, String[] strings, Object[] objects) throws GenericException {
System.out.println("执行了generic服务");
return "执行的方法是" + s;
}
}
@Reference(version = "generic")
private DemoService demoService;

REST服务

1
2
dubbo.protocol.name=rest
dubbo.protocol.port=20880
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service(version = "rest")
@Path(value = "demo")
public class RestDemoService implements DemoService {
@GET
@Path(value = "say")
@Produces({ContentType.APPLICATION_JSON_UTF_8, ContentType.TEXT_XML_UTF_8})
@Override
public String sayHello(@QueryParam("name") String name) {
System.out.println("执行了rest服务" + name);
URL url = RpcContext.getContext().getUrl();
return String.format("%s: %s, Hello, %s", url.getProtocol(), url.getPort(), name); // 正常访问
}
}