当前位置:首页 > 技术分析 > 正文内容

Java多线程?用CompletableFuture就够了

ruisui883个月前 (02-05)技术分析18

前言

在项目开发中,经常会遇到一个问题:在一个后端接口里,往往会进行多项耗时任务(相互之间独立,没有依赖)的操作,如:

  • 需要从不同的外部接口获取不同的数据,做融合;
  • 请求外部接口数据的同时,还需要读取数据库;
  • 等等

如果在一个请求的主线程里,串行做这些任务操作,会导致响应时间的线性叠加,极有可能导致不符合要求,如图1:

图1

那么,对这些耗时任务进行并行操作,从而使得:响应时间 约等于 耗时最大的任务处理时间,这样可以大大降低系统的响应时间,如图2:

图2

Future

Future类型,其实就是一个未来任务的返回对象,或者说是子线程的返回对象(通过线程池方式分配子线程)

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable task = new Task();
// 提交任务并获得Future:
Future future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
复制代码

可以看到,通过线程池的方式创建子线程后,executor.submit()返回的是一个Future对象,通过future.get()方法来获得该子任务的运行结果。需要注意的是,这个操作是阻塞的,也就是说,如果这个子任务没有运行结束,主线程会一直block在改行,直到子任务完成。

一个Future接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

CompletableFuture

当需要判断图2中的所有task是否完成时,如果采用Future,则需要:

  • 调用future.get()获取运行结果,
  • 或者轮询future.isDone()方法直到返回true

无论哪种方法,都是在主线程里调用,且会阻塞主线程。

以上痛点,从Java 8开始引入了CompletableFuture方法。主要新增的功能有:

  • thenAccept(): 当task正常完成后,回调调用.thenAccept()方法
  • exceptionally(): 当task出现异常时,回调调用.exceptionally()方法
  • anyOf(): 当所有的task中,只要有一个task完成,则主线程继续往下走,可以使用.anyOf()方法
  • allOf(): 所有的task均完成后,则主线程继续往下走
  • supplyAsync(): 异步执行,有返回值
  • runAsync(): 异步执行,无返回值

针对图2,需要所有task都完成后,再执行后续操作,就可以用allOf()方法:

CompletableFuture.allOf(task1, task2, ..., taskn).join();

注意:CompletableFuture的命名规则:

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行,即可以异步执行。

基于CompletableFuture+线程池的实现

线程池配置类

@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
    @Bean
    public Executor asyncExecutor() {
        log.info("start async executor");
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//        配置核心线程数
        threadPoolTaskExecutor.setCorePoolSize(ThreadPoolConstant.CORE_POOL_SIZE);
//        配置最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(ThreadPoolConstant.MAX_POOL_SIZE);
//        配置队列大小
        threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY);
//        配置线程池中线程的名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX);
//   HelloWorldServiceImpl     rejection-policy: 当pool已经达到max size时,如何处理新任务:
//        CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
//        AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
//        DiscardPolicy:丢弃当前将要加入队列的任务;
//        DiscardOldestPolicy:丢弃任务队列中最旧的任务;
        threadPoolTaskExecutor.setRejectedExecutionHandler(
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

异步服务与服务实现

public interface AsyncService {
    @Async("asyncExecutor")
    CompletableFuture getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType);
}
@Service
public class AsyncServiceImpl implements AsyncService {
    @Autowired
    CustomProps customProps;
    @Autowired
    RestTemplate restTemplate;
    @Override
    public CompletableFuture getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType) {
        return CompletableFuture
                .completedFuture(
                        FactoryUtil
                                .createFactory(customProps, null, restTemplate)
                                .obtainData(queryTrainInfoDetailReqDTOWithType.setQueryType(queryType), String.class)
                );
    }
}

业务代码中调用异步服务接口

...
    @Autowired
    AsyncService asyncService;

    @Override
    public ReturnData qTrainInfoDetail(QueryTrainInfoDetailReqDTO queryTrainInfoDetailReqDTO) {
        QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType = new QueryTrainInfoDetailReqDTOWithType().setQueryTrainInfoDetailReqDTO(queryTrainInfoDetailReqDTO);
        CompletableFuture fromCpFirstReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 1);
        CompletableFuture fromCpSecondReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 2);
        CompletableFuture.allOf(fromCpFirstReq, fromCpSecondReq).join(); //阻塞直到当第一次请求和第二次请求都完成
    }
...

扫描二维码推送至手机访问。

版权声明:本文由ruisui88发布,如需转载请注明出处。

本文链接:http://www.ruisui88.com/post/1702.html

标签: 技术博客
分享给朋友:

“Java多线程?用CompletableFuture就够了” 的相关文章

Linux Lite 6.6发行版正式发布:添加简体中文及AI工具支持

IT之家 9 月 8 日消息,Linux Lite 是一个对新手友好的 Linux 发行版,它基于 Ubuntu LTS,并以 Xfce 桌面为特色,主要针对 Windows 用户而设计,也被认为是从 Windows 过渡到 Linux 的友好方案。目前 Linux Lite 6.6 版本已经正式发...

红帽最新的企业 Linux 发行版具有解决混合云复杂性的新功能

据zdnet网5月1日报道,红帽这家 Linux 和超云领导者今天发布了其最新的旗舰 Linux 发行版 Red Hat Enterprise Linux (RHEL) 9.4,此前上周宣布对已有十年历史的流行 RHEL 7.9 再支持四年。这个领先的企业 Linux 发行版的最新版本引入了许多新功...

15款测试html5响应式的在线工具

手机、平板灯手持设备的增多,网站要顺应变化,就必须要做响应式开发,响应式网站最大的特点在于可以在不同设备下呈现不同的布局,是基于html5+css3技术,目前越来越多的网站开始采用了响应式设计,而下面15款工具可以方便测试你的html5响应式效果。Responsinatorhttp://www.re...

数组、去重、排序、合并、过滤、删除

ES6数字去重 Array.from(new Set([1,2,3,3,4,4])) //[1,2,3,4] [...new Set([1,2,3,3,4,4])] //[1,2,3,4]2、ES6数字排序 [1,2,3,4].sort(); // [1, 2,3,4],默认是升序...

Excel中的FILTER函数详细介绍及使用示例

在Excel中处理大量数据时,经常需要根据特定条件筛选出符合条件的数据行或列。这正是Excel的FILTER函数发挥作用的地方。FILTER函数是Excel中一个非常强大的工具,它可以基于一个或多个条件动态地过滤数据,使数据分析和报告制作变得更加高效和准确。本文将详细介绍FILTER函数的用法,并提...

vue2中路由的使用步骤,你学会了吗?

今天我们来整理下关于vue2中路由的使用步骤:1. 导入 vue 文件和Vue-router文件(注意:vue-router是依赖vue运行的,所以一定在vue后引入vue-router)2. 定义路由组件模板3. 创建路由实例并定义路由规则4. 将路由实例挂载给Vue实例5. 在结构区域定义控制路...