Spring 基于注解的线程池
阅读数:66 评论数:0
跳转到新版页面分类
python/Java
正文
一、@Async
通过配置注解 @EnableAsync
可以开启异步任务,然后在实际执行的方法上配置注解 @Async
上声明是异步任务。通过 @Async
注解表明该方法是异步方法,如果注解在类上,那表明这个类里面的所有方法都是异步的。
@Async的默认线程池为SimpleAsyncTaskExecutor。
二、Spring的线程池
SimpleAsyncTaskExecutor | 不利用线程,默认每次调用都会创建一个新的线程 |
SyncTaskExecutor | 同步操作。只适用于不需要多线程的地方。 |
ConcurrentTaskExecutor | Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。 |
SimpleThreadPoolTaskExecutor | 是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类 |
ThreadPoolTaskExecutor | 最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装。 |
三、使用方法
第一步,异步方法类上或启动类加上注解 @EnableAsync。第二步,需要被异步调用的方法外加上 @Async。第三步,使用的 @Async 注解方法的类对象应该是Spring容器管理的bean对象;
在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。
@Async
public void returnVoid() {
}
用 future.get() 来获取异步任务的返回结果。在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。
@Async
public Future<String> returnFuture() {
try {
Thread.sleep(1000);
return new AsyncResult<String>("hello");
} catch (InterruptedException e) {
}
}
当方法返回值是Future时,异常捕获是没问题的,Future.get()方法会抛出异常。但如果返回类型是void,异常在当前线程就捕获不到,需要添加额外的配置来处理异常。
可实现AsyncUncaughtExceptionHandler接口来自定义异常处理类,重写handleUncaughtException()方法。
@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
}
}
四、自定义线程池
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("myAsyncExecutor")
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int corePoolSize = 10;
executor.setCorePoolSize(corePoolSize);
int maxPoolSize = 50;
executor.setMaxPoolSize(maxPoolSize);
int queueCapacity = 10;
executor.setQueueCapacity(queueCapacity);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
String threadNamePrefix = "myDeeAsyncExecutor-";
executor.setThreadNamePrefix(threadNamePrefix);
executor.setWaitForTasksToCompleteOnShutdown(true);
int awaitTerminationSeconds =5;
executor.setAwaitTerminationSeconds(awaitTerminationSeconds);
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return executor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> ErrorLogger.getInstance().log(String.format("执行异步任务'%s'", method), ex);
}
}
在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(),不然在调用时会报线程池未初始化的异常
@Configuration
@EnableAsync
class SpringAsyncConfigurer extends AsyncConfigurerSupport {
@Bean
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(3);
threadPool.setMaxPoolSize(3);
threadPool.setWaitForTasksToCompleteOnShutdown(true);
threadPool.setAwaitTerminationSeconds(60 * 15);
return threadPool;
}
@Override
public Executor getAsyncExecutor() {
return asyncExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> ErrorLogger.getInstance().log(String.format("执行异步任务'%s'", method), ex);
}
}
3、配置由自定义的TaskExecutor替代内置的任务执行器
由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查询是否存在默认名称为TaskExecutor的线程池。所以可在项目中,定义名称为TaskExecutor的bean生成一个默认线程池。
也可不指定线程池的名称,申明一个线程池,本身底层是基于TaskExecutor.class便可。
@Slf4j
@Configuration
public class ThreadPoolConfiguration {
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10000),
new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
(r, executor) -> log.error("system pool is full! "));
}
}