Hystrix总结

在项目中需要对某些接口进行限流和熔断处理,防止由于某些接口资源消耗过大影响到整个的所有接口,防止单独的依赖耗尽资源;对一些依赖服务进行隔离,防止当依赖服务不可用或者响应非常缓慢导致整个应用不可用,阻止故障的连锁反应。过载立即切断并快速失败防止排队。

Hystrix工作流程

Hystrix有4种参数配置,优先级由低到高分别为:内置全局默认值动态全局默认属性内置实例默认值动态配置实例属性

基于编程式

基于编程式使用Hystrix,只需继承HystrixCommandHystrixObservableCommand,区别在于HystrixCommand命令逻辑卸载run()方法中,且由新创建线程执行,一个实例只能向调用程序发送单条数据。HystrixObservableCommand命令逻辑写在construct()方法中,由调用程序线程执行,一个实例可以顺序发送多条数据。

HystrixCommand命令有execute()queue()observe()、toObservable()4个方法来触发执行run()方法。HystrixObservableCommand命令只有observe()、toObservable()2个方法来触发执行construct()方法。

  • execute()同步堵塞方式执行
  • queue()异步非堵塞方式执行,通过Future.get()获取run()返回结果
  • observe() 事件注册前执行run()construct()方法
  • toObservable() 事件注册后执行run()construct()方法

继承HystrixCommand实现自己的Command,在构造方法中配置需要的参数,后续章节对具体配置进行详细描述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class HelloWorldCommand extends HystrixCommand<JSONObject> {

private DataRequest request;

protected HelloWorldCommand(DataRequest request) {
HystrixCommandProperties.Setter propertiesSetter = HystrixCommandProperties.Setter()
.withCircuitBreakerEnabled(true)
.withRequestCacheEnabled(false)
.withRequestLogEnabled(false)
.withExecutionIsolationStrategy()
.withExecutionIsolationSemaphoreMaxConcurrentRequests(80)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(80)
.withCircuitBreakerRequestVolumeThreshold(30)
.withCircuitBreakerSleepWindowInMilliseconds(5000)
.withExecutionTimeoutInMilliseconds(timeOut);

HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("requestData");
HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey(groupKey)
.andCommandKey(HystrixCommandKey.Factory.asKey("data-"+ Id))
.andCommandPropertiesDefaults(propertiesSetter)
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("requestData"));
super(setter);
this.request = request;
}

@Override
protected JSONObject run() {
return request.executeRequest();
}

@Override
protected JSONObject getFallback() {}
}

调用HystrixCommand的执行方法发起实际请求,execute()方法同步调用:

1
2
HelloWorldCommand command = new HelloWorldCommand(request);
JSONObject result = command.execute();

queue()方法异步调用:

1
2
3
HelloWorldCommand command = new HelloWorldCommand(request);
Future<JSONObject> future = command.queue();
JSONObject result = future.get(10000, TimeUnit.MILLISECONDS);

observe()方法,注册观察者事件订阅,事件注册前执行:

1
2
Observable<JSONObject> observable = new HelloWorldCommand(request).observe();
observable.subscribe(result1 -> System.out.println("Observable call--> " + result1));

observe()方法,注册完整执行生命周期事件,事件注册前执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable<JSONObject> observable = new HelloWorldCommand(request).observe();
observable.subscribe(new Observer<JSONObject>() {
//onNext/onError完成之后最后回调
@Override
public void onCompleted() {}

// 当产生异常时回调
@Override
public void onError(Throwable throwable) {}

// 获取结果后回调
@Override
public void onNext(JSONObject s) {}
});

toObservable()方法,注册观察者事件订阅,事件注册后执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable<JSONObject> toObservable = new HelloWorldCommand(request).toObservable();
toObservable.subscribe(new Observer<JSONObject>() {
//onNext/onError完成之后最后回调
@Override
public void onCompleted() {}

// 当产生异常时回调
@Override
public void onError(Throwable throwable) {}

// 获取结果后回调
@Override
public void onNext(JSONObject s) {}
});

基于注解式

注解使用方式和编程式大致相同,只是属性参数配置都注解化了。三个核心注解分别为@HystrixCommand@HystrixProperty@HystrixCollapser

注解同步执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class HelloWorldHystrixAnnotation {

@Autowired
private DataClient dataClient;

@HystrixCommand(groupKey = "helloWorldHystrixAnnotation",
commandKey = "helloWorldHystrixAnnotationSync", fallbackMethod = "fallback")
public JSONObject executeRequest(String param) {
return dataClient.retrieveData(param);
}

public JSONObject fallback() {
return new JSONObject();
}
}

注解异步执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class HelloWorldHystrixAnnotationAsync {

@Autowired
private DataClient dataClient;

@HystrixCommand(groupKey = "helloWorldHystrixAnnotation",
commandKey = "helloWorldHystrixAnnotationAsync", fallbackMethod = "fallback")
public Future<JSONObject> run(String param) {
return new AsyncResult<JSONObject>() {
@Override
public JSONObject invoke() {
return dataClient.retrieveData(param);
}
};
}

public JSONObject fallback() {
return new JSONObject();
}
}

注解订阅执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HelloWorldHystrixAnnotationObervable {

@Autowired
private DataClient dataClient;

@HystrixCommand(groupKey = "helloWorldHystrixAnnotation",
commandKey = "helloWorldHystrixAnnotationObervable", fallbackMethod = "fallback")
public Observable<JSONObject> run(String param) {
return Observable.create(subscriber -> {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(dataClient.retrieveData(param));
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
});
}

public JSONObject fallback() {
return new JSONObject();
}
}

触发fallback方法的情况

  • 执行抛出异常
  • 执行超时
  • 断路器打开,不尝试执行
  • 线程池拒绝,不尝试执行
  • 信号量拒绝,不尝试执行

Hystrix监控界面参数

基础属性配置

CommandGroup:每个命令最少配置的必选参数,不指定ThreadPoolKey的情况下,用于指定线程池的隔离。

  • 实例配置:HystrixCommand.Setter().withGroupKey(HystrixCommandGroupKey.Factory.asKey("groupKey"));
  • 注解配置:@HystrixCommand(groupKey = "groupKey")

CommandKey:依赖命名,一般每个CommandKey代表一个依赖抽象,相同依赖使用相同CommandKey名称,依赖隔离的根本就是对相同CommandKey的依赖做隔离,不同的依赖隔离最好使用不同的线程池。

  • 实例配置:HystrixCommand.Setter().andCommandKey(HystrixCommandKey.Factory.asKey("commandKey"));
  • 注解配置:@HystrixCommand(commandKey = "commandKey")

ThreadPoolKey:依赖隔离使用的线程池的键值,对同一业务依赖隔离用CommandGroup做区分,对同一依赖的不同远程调用,使用ThreadPoolKey做隔离区分,业务相同的组,需要在资源上做隔离时,使用ThreadPoolKey区分。不同的ThreadPoolKey建议使用不同的CommandKey

  • 实例配置:HystrixCommand.Setter().andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("threadPoolKey"))
  • 注解配置:@HystrixCommand(threadPoolKey = "threadPoolKey")

命令属性配置

execution.isolation.strategy:用于设置HystrixCommand执行的隔离策略,支持THREAD线程池隔离单独线程执行并发数受限于线程池大小和SEMAPHORE信号量隔离调用线程中执行通过信号量来限制并发数。

  • 实例配置:HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "execution.isolation.strategy",value = "SEMAPHORE")})
  • 默认值:THREAD

execution.timeout.enabled:是否启用超时限制。

  • 实例配置:HystrixCommandProperties.Setter().withExecutionTimeoutEnabled(true)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "execution.timeout.enabled", value = "true")})
  • 默认值:true

execution.isolation.thread.timeoutInMilliseconds:执行超时时间,超时会作用在HystrixCommand.queue(),即使没有调用get()获得Future对象。

  • 实例配置:HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(2000)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "2000")})
  • 默认值:1000ms

execution.isolation.thread.interruptOnTimeout:使用线程隔离时,对执行超时的线程是否被中断。

  • 实例配置:HystrixCommandProperties.Setter().withExecutionIsolationThreadInterruptOnTimeout(true)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "execution.isolation.thread.interruptOnTimeout", value = "true")})
  • 默认值:trueTHREAD模式有效)

execution.isolation.semaphore.maxConcurrentRequests:使用信号量策略时,允许的最大并发请求数。

  • 实例配置:HystrixCommandProperties.Setter().withExecutionIsolationSemaphoreMaxConcurrentRequests(50)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "50")})
  • 默认值:10SEMAPHORE模式有效)

Fallback

fallback.enabled:当接口异常或者拒绝时,是否调用Fallback方法处理,线程池和信号量策略都支持。

  • 实例配置:HystrixCommandProperties.Setter().withFallbackEnabled(true)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "fallback.enabled", value = "true")})
  • 默认值:true

fallback.isolation.semaphore.maxConcurrentRequestsFallback方法最大并发数。超过该配置的请求将被拒绝,若没有实现回退,则抛出异常。线程池和信号量策略都支持。

  • 实例配置:HystrixCommandProperties.Setter().withFallbackIsolationSemaphoreMaxConcurrentRequests(20)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "20")})
  • 默认值:10SEMAPHORE模式有效)

circuitBreaker.enabled:断路器是否生效。

  • 实例配置:HystrixCommandProperties.Setter().withCircuitBreakerEnabled(true)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "circuitBreaker.enabled", value = "true")})
  • 默认值:true

断路器

circuitBreaker.requestVolumeThreshold:滚动窗口中,打开断路器的最少请求数。

  • 实例配置:HystrixCommandProperties.Setter().withCircuitBreakerRequestVolumeThreshold(20)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "20")})
  • 默认值:20

circuitBreaker.sleepWindowInMilliseconds:拒绝请求到再次不被拒绝的请求时间间隔。

  • 实例配置:HystrixCommandProperties.Setter().withCircuitBreakerSleepWindowInMilliseconds(10)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "5000")})
  • 默认值:5000ms

circuitBreaker.errorThresholdPercentage:断路器启动回退逻辑的错误比率。

  • 实例配置:HystrixCommandProperties.Setter().withCircuitBreakerErrorThresholdPercentage(50)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50")})
  • 默认值:50

circuitBreaker.forceClosed:强制断路器进入关闭状态,将允许所有的请求,无视错误率。

  • 实例配置:HystrixCommandProperties.Setter().withCircuitBreakerForceClosed(false)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "circuitBreaker.forceClosed", value = "false")})
  • 默认值:false

circuitBreaker.forceOpen:强制断路器进入打开状态,将会拒绝所有的请求。优先级比circuitBreaker.forceClosed高。

  • 实例配置:HystrixCommandProperties.Setter().withCircuitBreakerForceOpen(false)
  • 注解配置:@HystrixCommand(commandProperties = {@HystrixProperty(name = "circuitBreaker.forceOpen", value = "false")})
  • 默认值:false

线程池

hystrix.threadpool.default.coreSize:设置核心线程池大小,与ThreadPoolExecutorcoreSize的含义不一样

  • 实例配置:HystrixThreadPoolProperties.Setter().withCoreSize(10)
  • 注解配置:@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "coreSize", value = "10")})
  • 默认值:10

hystrix.threadpool.default.maximumSize:设置线程池最大值,不开始拒绝HystrixCommand的情况下支持的最大并发数,设置allowMaximumSizeToDrivergeFromCoreSize后生效。

  • 实例配置:HystrixThreadPoolProperties.Setter().withMaximumSize(10)
  • 注解配置:@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "maximumSize", value = "10")})
  • 默认值:10

hystrix.threadpool.default.maxQueueSize:最大的队列值,若设置为-1使用SynchronousQueue,否则使用LinkedBlockingQueue

  • 实例配置:HystrixThreadPoolProperties.Setter().withMaxQueueSize(10)
  • 注解配置:@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "maxQueueSize", value = "10")})
  • 默认值:-1

hystrix.threadpool.default.queueSizeRejectionThreshold:设置队列拒绝的阈值,maxQueueSize值为-1时,该属性不生效。

  • 实例配置:HystrixThreadPoolProperties.Setter().withQueueSizeRejectionThreshold(5)
  • 注解配置:@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "queueSizeRejectionThreshold", value = "5")})
  • 默认值:5

hystrix.threadpool.default.keepAliveTimeMinutes:设置存活时间,单位分钟,如果coreSize小于maximumSize,则该属性控制一个线程从使用完成到被释放的时间。

  • 实例配置:HystrixThreadPoolProperties.Setter().withKeepAliveTimeMinutes(1)
  • 注解配置:@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "keepAliveTimeMinutes", value = "1")})
  • 默认值:1

hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize :允许maximumSize起作用。

  • 实例配置:HystrixThreadPoolProperties.Setter().withAllowMaximumSizeToDivergeFromCoreSize(false)
  • 注解配置:@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "allowMaximumSizeToDivergeFromCoreSize", value = "false")})
  • 默认值:false