使用ThreadPoolTaskExecutor与@Async快速实现多线程异步操作
一、前言
ThreadpoolTaskExecutor相对于ThreadpoolExecutor来说,是使用了ThreadPoolExecutor并增强,扩展了更多特性。它是Spring提供的线程池,帮助我们快速创建一个可用的线程池来使用。
@Async是Spring的注解,可以加在类或方法上。通俗的来讲,如果加上了这个注解,那么该类或者该方法在使用时将会进行异步处理,也就是创建一个线程来实现这个类或者方法,实现多线程。
二、使用
1、配置线程池
package com.czf.connect.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author zfChen
* @create 2022/1/13 9:49
*/
@EnableAsync //表示开启多线程
@Configuration
public class ThreadPoolConfig {
/**
* 核心线程数
*/
private static final int CORE_POOL_SIZE = 20;
/**
* 最大线程数
*/
private static final int MAX_POOL_SIZE = 40;
/**
* 队列长度
*/
private static final int QUEUE_CAPACITY = 200;
/**
* 线程池维护线程所允许的空闲时间
*/
private static final int KEEP_ALIVE_SECONDS = 60;
@Bean(name = "threadPoolTaskExecutor") //ThreadPoolTaskExecutor不会自动创建ThreadPoolExecutor,需要手动调initialize才会创建。如果@Bean就不需手动,会自动InitializingBean的afterPropertiesSet来调initialize
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置线程池最大线程数
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 线程池活跃的线程数
executor.setCorePoolSize(CORE_POOL_SIZE);
// 设置线程队列最大线程数
executor.setQueueCapacity(QUEUE_CAPACITY);
// 线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
executor.setThreadNamePrefix("task-async");//线程前缀名称
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
2、需要异步操作的方法或类:
@Slf4j
@Component //将该类注入到容器中
public class AsyncTest {
@Async("threadPoolTaskExecutor") //调用该线程池
public void task() throws InterruptedException {
Thread.sleep(4000);
log.info("task异步处理");
}
}
3、测试类
@Slf4j
@RestController
public class PoolTest {
@Autowired
private AsyncTest asyncTest;
@RequestMapping("/pool")
public void poolTest() throws InterruptedException {
log.info("主线程开始");
asyncTest.task();
log.info("主线程结束");
}
}
4、结果:
可以看出是由不同的线程执行。
三、注意要点
1、@Async需要在Spring环境下才能启动,使用的是AOP动态代理技术
2、@Async使用的是动态代理来实现异步调用,因此不能够在同一个类中进行调用。方法一定要从另一个类中调用,也就是从类的外部调用,类的内部调用是无效的,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。
3、注解的方法必须是public方法
4、需要在@SpringBootApplication启动类或者@configure注解类上 添加注解@EnableAsync启动多线程注解。
5、@Async就会对标注的方法开启异步多线程调用,注意,这个方法的类一定要交给spring容器来管理。
6、异步方法使用注解@Async的返回值只能为void或者Future。
四、有返回值的异步处理
上述方法实现的是无返回值的异步操作,接下来实现的是有返回值的异步操作,即返回值为Future。
Future的方法
isDone()
返回Boolean值,用来判断该异步任务是否执行完成,如果执行完成,返回true,如果未执行完成,则返回false。
cancel(boolean mayInterruptRunning)
返回boolean值,参数也是一个boolean值,用来传入是否可以打断当前正在执行的任务。如果参数是true且当前任务没有执行完成,说明可以打断当前任务,那么就会返回true;如果当前任务还没有执行,那么不管参数是true还是false,返回值都是true;如果当前任务已经完成,那么不管参数是true还是false,那么返回值都是false;如果当前任务没有完成且参数是false,那么返回值也是false。总结下来就是:1.如果任务还没执行,那么如果想取消任务,就一定返回true,与参数无关。2.如果任务已经执行完成,那么任务一定是不能取消的,所以此时返回值都是false,与参数无关。3.如果任务正在执行中,那么此时是否取消任务就看参数是否允许打断(true/false)。
isCancelled()
返回的是boolean类型,如果是上面总结的第三种情况,这才是真正意义上有机会被取消的任务,那么此时如果上面的方法返回的是true,那么说明任务取消成功了,则这个方法返回的也就是true。
get()
返回的是在异步方法中最后return 的那个对象中的value的值。主要是通过里面的get()方法来获取异步任务的执行结果,这个方法是阻塞的,直到异步任务执行完成。
get(long timeout,TimeUnit unit)
这个方法和get()的功能是一样的(在方法执行没有超时的情况下效果是一样的),只不过这里参数中设置了超时时间,因为get()在执行的时候是需要等待回调结果的,是阻塞在那里的,如果不设置超时时间,它就阻塞在那里直到有了任务执行完成。我们设置超时时间,就可以在当前任务执行太久的情况下中断当前任务,释放线程,这样就不会导致一直占用资源。参数一是时间的数值,参数二是参数一的单位,可以在TimeUnit这个枚举中选择单位。如果任务执行超时,则抛出TimeOut异常,返回的message就是null。
实现
异步方法
@Service
@Slf4j
public class ThreadServiceImpl {
@Async("threadPoolTaskExecutor")
public Future<String> thread(){
try {
Thread thread = Thread.currentThread();
String name = thread.getName();
log.info(name); //输出线程名称
Thread.sleep(3000); //线程睡眠3秒
return new AsyncResult<>(name);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new AsyncResult<>(null);
}
}
接口实现
@RestController
@Slf4j
public class ThreadTest {
@Autowired
private ThreadServiceImpl threadService;
@RequestMapping("/thread/test")
public void test() throws ExecutionException, InterruptedException {
ArrayList<Future> list = new ArrayList<>();
log.info("主线程开始");
for (int i = 0; i < 3; i++) {
Future<String> thread = threadService.thread(); //调用3次异步线程方法
list.add(thread);
}
log.info("调用异步线程完毕");
Thread.sleep(2000);
log.info("睡眠两秒");
List<String> stringList = new ArrayList<>();
for (Future future: list) {
stringList.add((String) future.get()); //get()是阻塞方法
}
log.info("主线程结束");
log.info("stringList={}",JSON.toJSONString(stringList));
}
}
结果
17:05:11.764 INFO [nio-8787-exec-1] com.thread.ThreadTest : 主线程开始
17:05:11.766 INFO [nio-8787-exec-1] com.thread.ThreadTest : 调用异步线程完毕
17:05:11.768 INFO [ task-async1] com.thread.ThreadServiceImpl : task-async1
17:05:11.768 INFO [ task-async2] com.thread.ThreadServiceImpl : task-async2
17:05:11.768 INFO [ task-async3] com.thread.ThreadServiceImpl : task-async3
17:05:13.776 INFO [nio-8787-exec-1] com.thread.ThreadTest : 睡眠两秒
17:05:14.772 INFO [nio-8787-exec-1] com.thread.ThreadTest : 主线程结束
17:05:14.812 INFO [nio-8787-exec-1] com.thread.ThreadTest : stringList=["task-async1","task-async2","task-async3"]