使用ThreadPoolTaskExecutor与@Async快速实现多线程异步操作

一、前言

ThreadpoolTaskExecutor相对于ThreadpoolExecutor来说,是使用了ThreadPoolExecutor并增强,扩展了更多特性。它是Spring提供的线程池,帮助我们快速创建一个可用的线程池来使用。

@Async是Spring的注解,可以加在类或方法上。通俗的来讲,如果加上了这个注解,那么该类或者该方法在使用时将会进行异步处理,也就是创建一个线程来实现这个类或者方法,实现多线程。

二、使用

1、配置线程池

package com.czf.connect.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author zfChen
 * @create 2022/1/13 9:49
 */
@EnableAsync	//表示开启多线程
@Configuration
public class ThreadPoolConfig {
    /**
     * 核心线程数
     */
    private static final int CORE_POOL_SIZE = 20;
    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 40;
    /**
     * 队列长度
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * 线程池维护线程所允许的空闲时间
     */
    private static final int KEEP_ALIVE_SECONDS = 60;

    @Bean(name = "threadPoolTaskExecutor")	//ThreadPoolTaskExecutor不会自动创建ThreadPoolExecutor,需要手动调initialize才会创建。如果@Bean就不需手动,会自动InitializingBean的afterPropertiesSet来调initialize
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置线程池最大线程数
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        // 线程池活跃的线程数
        executor.setCorePoolSize(CORE_POOL_SIZE);
        // 设置线程队列最大线程数
        executor.setQueueCapacity(QUEUE_CAPACITY);
        // 线程池维护线程所允许的空闲时间
        executor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        executor.setThreadNamePrefix("task-async");//线程前缀名称
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

2、需要异步操作的方法或类:

@Slf4j
@Component		//将该类注入到容器中
public class AsyncTest {
    @Async("threadPoolTaskExecutor")		//调用该线程池
    public void task() throws InterruptedException {
        Thread.sleep(4000);
        log.info("task异步处理");
    }
}

3、测试类

@Slf4j
@RestController
public class PoolTest {

    @Autowired
    private AsyncTest asyncTest;

    @RequestMapping("/pool")
    public void poolTest() throws InterruptedException {
        log.info("主线程开始");
        asyncTest.task();
        log.info("主线程结束");
    }
}

4、结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aY9QhFUk-1642401928200)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1642401298345.png)]
可以看出是由不同的线程执行。

三、注意要点

1、@Async需要在Spring环境下才能启动,使用的是AOP动态代理技术

2、@Async使用的是动态代理来实现异步调用,因此不能够在同一个类中进行调用。方法一定要从另一个类中调用,也就是从类的外部调用,类的内部调用是无效的,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。

3、注解的方法必须是public方法

4、需要在@SpringBootApplication启动类或者@configure注解类上 添加注解@EnableAsync启动多线程注解。

5、@Async就会对标注的方法开启异步多线程调用,注意,这个方法的类一定要交给spring容器来管理。

6、异步方法使用注解@Async的返回值只能为void或者Future

四、有返回值的异步处理

上述方法实现的是无返回值的异步操作,接下来实现的是有返回值的异步操作,即返回值为Future。

Future的方法

isDone()

返回Boolean值,用来判断该异步任务是否执行完成,如果执行完成,返回true,如果未执行完成,则返回false。

cancel(boolean mayInterruptRunning)

返回boolean值,参数也是一个boolean值,用来传入是否可以打断当前正在执行的任务。如果参数是true且当前任务没有执行完成,说明可以打断当前任务,那么就会返回true;如果当前任务还没有执行,那么不管参数是true还是false,返回值都是true;如果当前任务已经完成,那么不管参数是true还是false,那么返回值都是false;如果当前任务没有完成且参数是false,那么返回值也是false。总结下来就是:1.如果任务还没执行,那么如果想取消任务,就一定返回true,与参数无关。2.如果任务已经执行完成,那么任务一定是不能取消的,所以此时返回值都是false,与参数无关。3.如果任务正在执行中,那么此时是否取消任务就看参数是否允许打断(true/false)。

isCancelled()

返回的是boolean类型,如果是上面总结的第三种情况,这才是真正意义上有机会被取消的任务,那么此时如果上面的方法返回的是true,那么说明任务取消成功了,则这个方法返回的也就是true。

get()

返回的是在异步方法中最后return 的那个对象中的value的值。主要是通过里面的get()方法来获取异步任务的执行结果,这个方法是阻塞的直到异步任务执行完成

get(long timeout,TimeUnit unit)

这个方法和get()的功能是一样的(在方法执行没有超时的情况下效果是一样的),只不过这里参数中设置了超时时间,因为get()在执行的时候是需要等待回调结果的,是阻塞在那里的,如果不设置超时时间,它就阻塞在那里直到有了任务执行完成。我们设置超时时间,就可以在当前任务执行太久的情况下中断当前任务,释放线程,这样就不会导致一直占用资源。参数一是时间的数值,参数二是参数一的单位,可以在TimeUnit这个枚举中选择单位。如果任务执行超时,则抛出TimeOut异常,返回的message就是null。

实现

异步方法

@Service
@Slf4j
public class ThreadServiceImpl {

    @Async("threadPoolTaskExecutor")
    public Future<String> thread(){
        try {
            Thread thread = Thread.currentThread();
            String name = thread.getName();
            log.info(name);		//输出线程名称
            Thread.sleep(3000);		//线程睡眠3秒
            return new AsyncResult<>(name);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new AsyncResult<>(null);
    }

}

接口实现

@RestController
@Slf4j
public class ThreadTest {

    @Autowired
    private ThreadServiceImpl threadService;

    @RequestMapping("/thread/test")
    public void test() throws ExecutionException, InterruptedException {
        ArrayList<Future> list = new ArrayList<>();
        log.info("主线程开始");
        for (int i = 0; i < 3; i++) {
            Future<String> thread = threadService.thread();	//调用3次异步线程方法
            list.add(thread);
        }
        log.info("调用异步线程完毕");
        Thread.sleep(2000);
        log.info("睡眠两秒");
        List<String> stringList = new ArrayList<>();
        for (Future future: list) {
            stringList.add((String) future.get());	//get()是阻塞方法
        }
        log.info("主线程结束");
        log.info("stringList={}",JSON.toJSONString(stringList));
    }
}

结果

17:05:11.764  INFO  [nio-8787-exec-1] com.thread.ThreadTest          : 主线程开始
17:05:11.766  INFO  [nio-8787-exec-1] com.thread.ThreadTest          : 调用异步线程完毕
17:05:11.768  INFO  [    task-async1] com.thread.ThreadServiceImpl   : task-async1
17:05:11.768  INFO  [    task-async2] com.thread.ThreadServiceImpl   : task-async2
17:05:11.768  INFO  [    task-async3] com.thread.ThreadServiceImpl   : task-async3
17:05:13.776  INFO  [nio-8787-exec-1] com.thread.ThreadTest          : 睡眠两秒
17:05:14.772  INFO  [nio-8787-exec-1] com.thread.ThreadTest          : 主线程结束
17:05:14.812  INFO  [nio-8787-exec-1] com.thread.ThreadTest          : stringList=["task-async1","task-async2","task-async3"]


版权声明:本文为weixin_45866737原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>