SpringMvc异步原理及实现

在实际的项目中,可能会用到HTTP异步请求方式来提高系统的吞吐量。

同步请求

客户端发起同步HTTP请求时,线程进入等待状态,直到接受到一个response对象或者请求超时状态 ,往返WEB服务器的过程:

HTTP同步请求过程图

HTTP请求在经过DNS服务器的域名解析,到Nginx反向代理转发到我们的WEB服务器(servlet容器,Tomcat),WEB服务器会启动一个请求处理线程来处理请求,完成资源分配处理之后,线程起调后端的处理线程,同时WEB服务器的线程将会进入阻塞状态,直到后端的线程处理完毕,WEB服务器释放请求处理线程的资源,同时返回response对象,客户端接收到response对象,整个请求完成。

若后端处理服务器中进行了大量的IO操作,数据库操作,或者跨网调用等耗时操作,导致请求处理线程进入长时间的阻塞。因为WEB服务器的请求处理线程条个数是有限的,如果同时大量的请求阻塞在WEB服务器中,新的请求将会处于等待状态,甚至服务不可用,connection refused。

异步请求

Servlet3的异步web机制的引入,改造接口服务,可以让请求线程(IO线程)和业务处理线程分开,进而对业务进行线程池隔离。 解决tomcat线程池资源消耗,频繁gc,高io,堆内存上升 。还可以根据业务重要性进行业务分级,然后再把线程池分级 。还可以根据这些分级做其它操作比如监控和降级处理

请求处理线程对后台处理的调用使用了invoke的方式,调invoke方法后直接返回不等待,请求处理线程就释放了,它可以接着去处理别的请求,当后端处理完成后,会钩起一个回调处理线程来处理调用的结果,这个回调处理线程跟请求处理线程也许都是线程池中的某个线程,相互间可以完全没有关系,由这个回调处理线程向浏览器返回内容。

带来的改进是显而易见的,请求处理线程不需要阻塞了,它的能力得到了更充分的使用,带来了服务器吞吐能力的提升。下图是异步请求过程图:

HTTP异步请求过程图

Servlet3异步流程

Servlet3异步流程

接收到request请求之后,由Tomcat工作线程从HttpServletRequest中获得一个异步上下文AsyncContext对象,然后由Tomcat工作线程把AsyncContext对象传递给业务处理线程,同时Tomcat工作线程归还到工作线程池,这一步就是异步开始。在业务处理线程中完成业务逻辑的处理,生成response返回给客户端。在Servlet3.0中虽然处理请求可以实现异步,但是InputStream和OutputStream的IO操作还是阻塞的,当数据量大的request body 或者 response body的时候,就会导致不必要的等待。从Servlet3.1以后增加了非阻塞IO,需要tomcat8.x支持。

Servlet3的异步使用步骤:

  • 声明Servlet,增加asyncSupported属性,开启异步支持。@WebServlet(urlPatterns = "/simpleAsync", asyncSupported = true)
  • 通过request获取异步上下文AsyncContextAsyncContext asyncCtx = request.startAsync();
  • 开启业务逻辑处理线程,并AsyncContext 传递给业务线程executor.execute(new AsyncRequestProcessor(asyncCtx, secs));
  • 在异步业务逻辑处理线程中,通过asyncContext获取request和response,处理对应的业务。
  • 业务逻辑处理线程处理完成逻辑之后,调用AsyncContextcomplete方法。asyncContext.complete();从而结束该次异步线程处理。

同步异步对比

实际写了一个固定延时10秒的Demo,Tomcat的参数设置如下:

1
2
3
4
tomcat:
max-threads: 5
accept-count: 10
max-connections: 1000

在500的并发下分别对同步和异步请求进行了测试,通过MBeanTomcat参数进行监控

  • 同步情况下currentThreadsBusy参数始终是与最大线程数一致,说明线程一致未释放,会导致请求一致阻塞
  • 异步情况由于后台是异步处理的线程马上就释放了,故currentThreadsBusy基本上都是0。在某些情况下能够极大的提升系统吞吐量。

将Tomcat业务线程池的压力转移到系统自定义线程池中。使得更加可控,即使变更应用服务器系统任然兼容。

同步线程阻塞Tomcat参数使用情况

异步线程非阻塞Tomcat参数使用情况

Spring异步

Spring MVC 3.2开始引入了基于Servlet 3的异步请求处理。相比以前,控制器方法已经不一定需要返回一个值,而是可以返回一个java.util.concurrent.Callable对象,并通过Spring MVC所管理的线程来产生返回值。 同时Servlet容器的主线程则可以退出并释放其资源了,同时也允许容器去处理其他的请求。通过一个TaskExecutorSpring MVC可以在另外的线程中调用Callable。当Callable返回时,请求再携带Callable返回的值,再次被分配到Servlet容器中恢复处理流程。

另一个选择,是让控制器方法返回一个DeferredResult实例。该场景下,返回值可由任何一个线程产生,也包括那些不是由Spring MVC管理的线程。 返回值可能是为了响应某些外部事件所产生的,比如一条JMS的消息,一个计划任务 。

Callable异步请求
  • 控制器先返回一个Callable对象
  • Spring MVC开始进行异步处理,并把该Callable对象提交给另一个独立线程的执行器TaskExecutor处理
  • DispatcherServlet和所有过滤器都退出Servlet容器线程,但此时方法的响应对象仍未返回
  • Callable对象最终产生一个返回结果,此时Spring MVC会重新把请求分派回Servlet容器,恢复处理
  • DispatcherServlet再次被调用,恢复对Callable异步处理所返回结果的处理
DeferredResult异步请求
  • 控制器先返回一个DeferredResult对象,并把它存取在内存(队列或列表等)中以便存取
  • Spring MVC开始进行异步处理
  • DispatcherServlet和所有过滤器都退出Servlet容器线程,但此时方法的响应对象仍未返回
  • 由处理该请求的线程对 DeferredResult进行设值,然后Spring MVC会重新把请求分派回Servlet容器,恢复处理
  • DispatcherServlet再次被调用,恢复对该异步返回结果的处理

SpringMvc异步实现方式一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Callable<String> process(HttpServletResponse response) {
return () -> {
response.setContentType("text/plain;charset=utf-8");
response.getWriter().write("响应内容");
response.getWriter().close();
return null;
};
}

// taskService是一个@Service注解类
public Callable<Map<String, Object>> process() {
Callable<Map<String, Object>> callable = taskService::execute;
return callable;
}

// taskService是一个@Service注解类
public Callable<Map<String, Object>> process() {
Callable<Map<String, Object>> callable = () -> {
return taskService.execute();
};
return callable;
}

SpringMvc异步实现方式二:

1
2
3
4
5
6
7
8
9
10
11
// taskService是一个@Service注解类
public WebAsyncTask process() {
Callable<Map<String, Object>> callable = taskService::execute;
return new WebAsyncTask<>(20000, callable);
}

// taskService是一个@Service注解类
public WebAsyncTask process() {
Callable<Map<String, Object>> callable = taskService::execute;
return new WebAsyncTask<>(callable);
}

SpringMvc异步实现方式三:

1
2
3
4
5
6
public DeferredResult<Map<String, Object>> process() {
DeferredResult<Map<String, Object>> deferredResult = new DeferredResult<>();
CompletableFuture.supplyAsync(taskService::execute)
.whenCompleteAsync((result, throwable) -> deferredResult.setResult(result));
return deferredResult;
}

方式一和方式二Spring返回的CallableRequestMappingHandlerAdapter拦截,使用SimpleAsyncTaskExecutor线程池处理,每当任务被提交到此线程池时,线程池产生一个新的线程去执行Callable中的代码, 每次都产生新的线程而且没有上上限(默认没有上限的,可以设置concurrencyLimit属性来设置线程数的大小) 但:SimpleAsyncTaskExecutor 线程池性能不好,可使用自定义的线程池来代替

方式三使用的是CompletableFuture.supplyAsync,在completablefuturesupplyasync方法将在ForkJoinPool池运行任务。也可以使用任何其他的线程池来执行。

若不自定线程池,MvcAsync线程数会飙涨:

未自定义线程池线程数飙涨

自定义MVC Callable线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Bean
@ConfigurationProperties(prefix = "spring.task.mvcPool")
public TaskPoolConfig mvcPoolConfig() {
return new TaskPoolConfig();
}

@Bean
public AsyncTaskExecutor mvcTaskExecutor(TaskPoolConfig mvcPoolConfig) {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(mvcPoolConfig.getCorePoolSize());
threadPool.setMaxPoolSize(mvcPoolConfig.getMaxPoolSize());
threadPool.setQueueCapacity(mvcPoolConfig.getQueueCapacity());
threadPool.setAllowCoreThreadTimeOut(mvcPoolConfig.isAllowCoreThreadTimeOut());
threadPool.setWaitForTasksToCompleteOnShutdown(
mvcPoolConfig.isWaitForTasksToCompleteOnShutdown());
threadPool.setKeepAliveSeconds(mvcPoolConfig.getKeepAliveSeconds());
threadPool.setThreadNamePrefix("Mvc-Thread-");

threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
threadPool.initialize();
return threadPool;
}

@Bean
public WebMvcConfigurerAdapter webMvcConfigurerAdapter(AsyncTaskExecutor mvcTaskExecutor) {
return new WebMvcConfigurerAdapter() {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(mvcTaskExecutor);
super.configureAsyncSupport(configurer);
}
};
}

请求由Tomcat业务线程池转移到系统自定义线程池中,从下面的示例中可以明显得看出Tomcat的处理线程非常快的就结束了,而由自定义线程池中的线程去处理任务,等任务结束后再由Tomcat线程响应给用户:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[nio-8011-exec-4] c.i.ent.controller.DashboardController   : async start
[nio-8011-exec-4] c.i.ent.controller.DashboardController : async end
[nio-8011-exec-3] c.i.ent.controller.DashboardController : async start
[nio-8011-exec-3] c.i.ent.controller.DashboardController : async end
[nio-8011-exec-5] c.i.ent.controller.DashboardController : async start
[nio-8011-exec-5] c.i.ent.controller.DashboardController : async end
[nio-8011-exec-2] c.i.ent.controller.DashboardController : async start
[nio-8011-exec-2] c.i.ent.controller.DashboardController : async end
[ Mvc-Thread-4] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-4执行进度:task:0/10
[ Mvc-Thread-2] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-2执行进度:task:0/10
[ Mvc-Thread-7] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-7执行进度:task:0/10
[ Mvc-Thread-5] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-5执行进度:task:0/10
[ Mvc-Thread-3] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-3执行进度:task:0/10
[ Mvc-Thread-1] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-1执行进度:task:0/10
[ Mvc-Thread-6] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-6执行进度:task:0/10
[ Mvc-Thread-8] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-8执行进度:task:0/10
[ Mvc-Thread-9] c.i.ent.service.impl.TaskServiceImpl : Mvc-Thread-9执行进度:task:0/10

异步多线程池

在实际中可能会用到不同的异步接口使用不同的线程池,以下代码是自定义多个线程池给不同的接口使用的示例代码:

多线程池的配置如下,这里做了快、中、慢三个线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
spring:
task:
slowMvcPool:
corePoolSize: 10
maxPoolSize: 20
queueCapacity: 125
keepAliveSeconds: 60
allowCoreThreadTimeOut: true
middleMvcPool:
corePoolSize: 20
maxPoolSize: 40
queueCapacity: 250
keepAliveSeconds: 60
allowCoreThreadTimeOut: true
fastMvcPool:
corePoolSize: 40
maxPoolSize: 80
queueCapacity: 500
keepAliveSeconds: 60
allowCoreThreadTimeOut: true

通过@Bean方式将各个线程池的参数注入到Spring中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Bean
@ConfigurationProperties(prefix = "spring.task.fastMvcPool")
public TaskPoolConfig fastMvcPoolConfig() {
return new TaskPoolConfig();
}

@Bean
@ConfigurationProperties(prefix = "spring.task.middleMvcPool")
public TaskPoolConfig middleMvcPoolConfig() {
return new TaskPoolConfig();
}

@Bean
@ConfigurationProperties(prefix = "spring.task.slowMvcPool")
public TaskPoolConfig slowMvcPoolConfig() {
return new TaskPoolConfig();
}

@Bean
public AsyncTaskExecutor slowMvcTaskExecutor(TaskPoolConfig slowMvcPoolConfig) {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(slowMvcPoolConfig.getCorePoolSize());
threadPool.setMaxPoolSize(slowMvcPoolConfig.getMaxPoolSize());
threadPool.setQueueCapacity(slowMvcPoolConfig.getQueueCapacity());
threadPool.setAllowCoreThreadTimeOut(slowMvcPoolConfig.isAllowCoreThreadTimeOut());
threadPool.setWaitForTasksToCompleteOnShutdown(
slowMvcPoolConfig.isWaitForTasksToCompleteOnShutdown());
threadPool.setKeepAliveSeconds(slowMvcPoolConfig.getKeepAliveSeconds());
threadPool.setThreadNamePrefix("Slow-Mvc-");

threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
threadPool.initialize();
return threadPool;
}

@Bean
public AsyncTaskExecutor middleMvcTaskExecutor(TaskPoolConfig middleMvcPoolConfig) {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(middleMvcPoolConfig.getCorePoolSize());
threadPool.setMaxPoolSize(middleMvcPoolConfig.getMaxPoolSize());
threadPool.setQueueCapacity(middleMvcPoolConfig.getQueueCapacity());
threadPool.setAllowCoreThreadTimeOut(middleMvcPoolConfig.isAllowCoreThreadTimeOut());
threadPool.setWaitForTasksToCompleteOnShutdown(
middleMvcPoolConfig.isWaitForTasksToCompleteOnShutdown());
threadPool.setKeepAliveSeconds(middleMvcPoolConfig.getKeepAliveSeconds());
threadPool.setThreadNamePrefix("Middle-Mvc-");

threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
threadPool.initialize();
return threadPool;
}

@Bean
public AsyncTaskExecutor fastMvcTaskExecutor(TaskPoolConfig fastMvcPoolConfig) {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(fastMvcPoolConfig.getCorePoolSize());
threadPool.setMaxPoolSize(fastMvcPoolConfig.getMaxPoolSize());
threadPool.setQueueCapacity(fastMvcPoolConfig.getQueueCapacity());
threadPool.setAllowCoreThreadTimeOut(fastMvcPoolConfig.isAllowCoreThreadTimeOut());
threadPool.setWaitForTasksToCompleteOnShutdown(
fastMvcPoolConfig.isWaitForTasksToCompleteOnShutdown());
threadPool.setKeepAliveSeconds(fastMvcPoolConfig.getKeepAliveSeconds());
threadPool.setThreadNamePrefix("Fast-Mvc-");

threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
threadPool.initialize();
return threadPool;
}

在Controller层中使用自定义的线程池,WebAsyncTask支持多种方式的自定义线程池的使用,可以通过下线程池在Spring中的Bean的名称,也可以直接注入线程池Bean,WebAsyncTask可以设置Timeout以及通过onTimeout方法在超时时响应内容,在使用时最好设置,如不设置如果接口超时会抛出AsyncRequestTimeoutException异常该异常比较难处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@GetMapping("/slowAsyncTask")
public WebAsyncTask slowAsyncTask(HttpServletResponse response, AsyncTaskExecutor slowMvcTaskExecutor) {
logger.info(Thread.currentThread().getName() + " 进入helloController方法");
Callable<Map<String, Object>> callable = taskService::execute;
WebAsyncTask asyncTask = new WebAsyncTask(
ASYNC_REQUEST_TIME_OUT, slowMvcTaskExecutor, callable);
return asyncTask;
}

@GetMapping("/middleAsyncTask")
public WebAsyncTask middleAsyncTask(HttpServletResponse response, AsyncTaskExecutor middleMvcTaskExecutor) {
logger.info(Thread.currentThread().getName() + " 进入helloController方法");
Callable<Map<String, Object>> callable = taskService::execute;
WebAsyncTask asyncTask = new WebAsyncTask(
ASYNC_REQUEST_TIME_OUT, middleMvcTaskExecutor, callable);
return asyncTask;
}

@GetMapping("/fastAsyncTask")
public WebAsyncTask fastAsyncTask(HttpServletResponse response, AsyncTaskExecutor fastMvcTaskExecutor) {
logger.info(Thread.currentThread().getName() + " 进入helloController方法");
Callable<Map<String, Object>> callable = taskService::execute;
WebAsyncTask asyncTask = new WebAsyncTask(
ASYNC_REQUEST_TIME_OUT, fastMvcTaskExecutor, callable);
return asyncTask;
}

@GetMapping("/fastAsyncTask")
public WebAsyncTask fastAsyncTask(HttpServletResponse response) {
logger.info(Thread.currentThread().getName() + " 进入helloController方法");
Callable<Map<String, Object>> callable = taskService::execute;
WebAsyncTask asyncTask = new WebAsyncTask(
ASYNC_REQUEST_TIME_OUT, "fastMvcTaskExecutor", callable);
return asyncTask;
}

Servlet3非阻塞IO

Servlet3.1以后增加了非阻塞IO实现,需要Tomcat8.x以上支持非阻塞 IO 仅对在 Servlet 中的异步处理请求有效,否则当调用 ServletInputStream.setReadListener或ServletOutputStream.setWriteListener方法时将抛出IllegalStateException。Servlet3的非阻塞IO是对Servlet3异步的增强。Servlet3的非阻塞是利用java.util.EventListener的事件驱动机制来实现的。

Servlet3.1的非阻塞IO从下面图中可以看出是面对InputStream 和 OutPutStream流的,这里的非阻塞IO跟我们常说的JDK NIO不是一个概念,Servlet3.1的非阻塞是同jdk的事件驱动机制来实现。

Servlet3.1非阻塞IO