Spring线程池跨线程数据共享

Spring Cloud中可能会用到sleuth做链路追踪,以及内部链路中需要用到Header中得一些数据,在单线程是没有问题得,但是在某些场景下就会有问题,比如上层业务系统得一个请求需要同时并发去调用基础服务得多个产品,这样请求到其他服务得链路追踪信息就不一样了等。

为了实现多线程并发情况下主线程线程池的线程共享ThreadLocal变量,如MDCRequestAttributes中的数据,需要自定义线程池线程实现

自定义线程池继承ThreadPoolTaskExecutor类重写executesubmit方法即可,SpringExecutorMethodInterceptor会拦截使用LazyTraceThreadPoolTaskExecutor装饰实际执行的task然后调用当前executor执行。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CusThreadPoolExecutor extends ThreadPoolTaskExecutor {
@Override
public void execute(Runnable task) {
ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
Map<String, String> contextOfMDC = MDC.getCopyOfContextMap();
Runnable cusTask = new CusInheritThreadVarRunnable(servletRequestAttributes, contextOfMDC, task);
super.execute(cusTask);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
Map<String, String> contextOfMDC = MDC.getCopyOfContextMap();
Callable<T> cusTask = new CusInheritThreadVarCallable<T>(servletRequestAttributes, contextOfMDC, task);
return super.submit(cusTask);
}
}

该线程池的使用和普通的线程池使用是一样。 只是在在提交到真正的线程池之前,会获取主线程中需要共享的ThreadLocal相关变量。然后将任务包装成一个新的RunnableCallable,在子线程中执行任务之前,将ThreadLocal相关变量设置到子线程的ThreadLocalMap中,执行结束之前清空设置的ThreadLocal相关变量,防止不同线程中串数据。

Callable实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CusInheritThreadVarCallable<V> implements Callable<V> {
private Callable<V> delegate;
private Map<String, String> contextOfMDC;
private ServletRequestAttributes servletRequestAttributes;

public CusInheritThreadVarCallable(ServletRequestAttributes servletRequestAttributes, Map<String, String> contextOfMDC, Callable<V> delegate) {
this.contextOfMDC = contextOfMDC;
this.servletRequestAttributes = servletRequestAttributes;
this.delegate = delegate;
}

@Override
public V call() throws Exception {
RestStrategyContext.clearCurrentContext();
//设置当前线程servletRequestAttributes
RequestContextHolder.setRequestAttributes(servletRequestAttributes);
//设置当前线程MDC
MDC.setContextMap(contextOfMDC);
V result;
try {
result = delegate.call();
} finally {
MDC.clear();
RequestContextHolder.resetRequestAttributes();
}
return result;
}
}

Runnable实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CusInheritThreadVarRunnable implements Runnable {
private Runnable delegate;
private Map<String, String> contextOfMDC;
private ServletRequestAttributes servletRequestAttributes;

public CusInheritThreadVarRunnable(ServletRequestAttributes servletRequestAttributes, Map<String, String> contextOfMDC, Runnable delegate) {
this.contextOfMDC = contextOfMDC;
this.servletRequestAttributes = servletRequestAttributes;
this.delegate = delegate;
}

@Override
public void run() {
RestStrategyContext.clearCurrentContext();
//设置当前线程servletRequestAttributes
RequestContextHolder.setRequestAttributes(servletRequestAttributes);
//设置当前线程MDC
MDC.setContextMap(contextOfMDC);
try {
// 执行
delegate.run();
} finally {
// 清空
MDC.clear();
RequestContextHolder.resetRequestAttributes();
}
}
}

线程池的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean(name = "processorExecutor")
public ThreadPoolTaskExecutor taskProcessorExecutor() {
CusThreadPoolExecutor executor = new CusThreadPoolExecutor();
executor.setCorePoolSize(taskPoolConfig.getCorePoolSize());
executor.setMaxPoolSize(taskPoolConfig.getMaxPoolSize());
executor.setQueueCapacity(taskPoolConfig.getQueueCapacity());
executor.setKeepAliveSeconds(taskPoolConfig.getKeepAliveSeconds());
executor.setThreadNamePrefix("CusTask-");
executor.setRejectedExecutionHandler(cusCallerRunsPolicy);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(taskPoolConfig.getAwaitTerminationSeconds());
executor.initialize();
return executor;
}

对于线程池的配置需要注意的是,拒绝策略不要使用CallerRunsPolicy拒绝策略,该拒绝策略会在任务添加到线程池被拒绝时使用主线程执行该任务。在并发较高的情况下,拒绝策略生效,导致很多任务由主线程执行了,从而导致主线程中的MDC数据被清空了,从而导致一些trace信息丢失。可以自定义拒绝策略将任务丢回队列:

1
2
3
4
5
6
7
8
9
10
11
12
public class CusCallerRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.error("Reject policy interrupted exception ", e);
}
}
}
}