哈喽,刷题小伙伴们!Day21咱们吃透了ThreadLocal的线程隔离机制,解决了线程内上下文传递和非线程安全工具类的复用问题。今天咱们进入并发进阶的又一核心考点——CompletableFuture!在实际开发中,异步任务的编排是高频需求(比如先调用用户接口,再调用订单接口,最后汇总结果),传统的Future接口功能薄弱,无法优雅实现任务的串行、并行与聚合,而CompletableFuture正是为解决这些痛点而生!今天咱们就从异步编程的痛点入手,拆解CompletableFuture的核心原理、常用API、任务编排技巧与实战场景,彻底掌握这个异步编程的“瑞士军刀”!
今日核心目标
理解传统Future的局限性,明确CompletableFuture的核心优势;掌握CompletableFuture的创建方式与核心API;掌握异常处理的正确姿势(避免异步任务“吞异常”);明确CompletableFuture的使用场景与性能优化技巧。一、前置认知:异步编程的痛点与CompletableFuture的诞生
在Java中,异步任务的执行最早依赖Future+线程池实现,但Future存在三大致命痛点,无法满足复杂的任务编排需求:无法手动完成任务:Future的任务状态由线程池控制,开发者无法手动标记任务完成或失败;无回调机制:任务执行完成后,无法自动触发回调函数,只能通过get()方法阻塞获取结果,或轮询isDone()方法,效率极低;不支持任务编排:无法优雅实现多个Future的串行(A完成后执行B)、并行(A和B同时执行)、聚合(合并A和B的结果)操作。CompletableFuture实现了Future和CompletionStage接口,CompletionStage接口定义了任务编排的核心方法,让CompletableFuture具备以下能力:手动完成任务:支持complete()、completeExceptionally()手动标记任务成功或失败;异步回调机制:通过thenAccept()、whenComplete()等方法实现非阻塞回调;强大的任务编排:支持串行、并行、聚合等多种任务组合方式,满足复杂业务场景;完善的异常处理:提供exceptionally()、handle()等方法处理异步任务的异常,避免“吞异常”问题。二、基础用法:CompletableFuture的创建与核心API
CompletableFuture的使用分为两步:创建异步任务和处理任务结果。下面先介绍核心创建方式和常用API。CompletableFuture提供了多种静态方法创建异步任务,可分为“有返回值”和“无返回值”两类,推荐结合自定义线程池使用(避免使用默认的ForkJoinPool.commonPool(),防止资源竞争)。方法名 | 是否有返回值 | 核心说明 |
runAsync(Runnable runnable) | 无 | 执行Runnable任务,无返回值,使用默认线程池 |
runAsync(Runnable runnable, Executor executor) | 无 | 执行Runnable任务,无返回值,使用自定义线程池(推荐) |
supplyAsync(Supplier<U> supplier) | 有 | 执行Supplier任务,有返回值,使用默认线程池 |
supplyAsync(Supplier<U> supplier, Executor executor) | 有 | 执行Supplier任务,有返回值,使用自定义线程池(推荐) |
import java.util.concurrent.*;public class CompletableFutureBasicDemo { // 自定义线程池(推荐使用,避免默认线程池资源竞争) private static final ExecutorService CUSTOM_POOL = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); publicstaticvoidmain(String[] args) throws ExecutionException, InterruptedException { // 1. 无返回值异步任务 CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> { System.out.println("无返回值任务执行中,线程:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }, CUSTOM_POOL); voidFuture.get(); // 阻塞获取结果(无返回值,仅等待任务完成) // 2. 有返回值异步任务 CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.println("有返回值任务执行中,线程:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "异步任务执行结果"; }, CUSTOM_POOL); String result = supplyFuture.get(); // 阻塞获取返回值 System.out.println("有返回值任务结果:" + result); // 关闭线程池 CUSTOM_POOL.shutdown(); }}
CompletableFuture的API众多,按功能可分为四类,掌握分类后更容易记忆和使用:结果处理类:thenAccept()(消费结果)、thenApply()(转换结果)、whenComplete()(处理结果和异常);任务编排类:thenCompose()(串行任务)、thenCombine()(合并两个任务结果)、allOf()(等待所有任务完成)、anyOf()(等待任一任务完成);异常处理类:exceptionally()(异常兜底)、handle()(处理结果+异常);手动完成类:complete()(手动标记成功)、completeExceptionally()(手动标记失败)。三、核心实战:CompletableFuture任务编排三大模式
任务编排是CompletableFuture的核心能力,也是面试和实战的高频考点。下面讲解三种最常用的编排模式:串行、并行、聚合。场景:任务B依赖任务A的执行结果,需在A完成后执行。核心使用thenCompose()方法(避免嵌套Future,返回扁平化的CompletableFuture)。import java.util.concurrent.*;public class CompletableFutureSerialDemo { private static final ExecutorService CUSTOM_POOL = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10) ); // 模拟查询用户信息 private static CompletableFuture<String> queryUser(Long userId) { return CompletableFuture.supplyAsync(() -> { System.out.println("查询用户" + userId + "信息,线程:" + Thread.currentThread().getName()); return "用户" + userId + "信息"; }, CUSTOM_POOL); } // 模拟查询用户订单(依赖用户信息) private static CompletableFuture<String> queryOrder(String userInfo) { return CompletableFuture.supplyAsync(() -> { System.out.println("根据" + userInfo + "查询订单,线程:" + Thread.currentThread().getName()); return userInfo + "的订单列表"; }, CUSTOM_POOL); } public static void main(String[] args) throws ExecutionException, InterruptedException { // 串行执行:queryUser → queryOrder CompletableFuture<String> finalResult = queryUser(1L) .thenCompose(userInfo -> queryOrder(userInfo)); System.out.println("最终结果:" + finalResult.get()); CUSTOM_POOL.shutdown(); }}
场景:任务A和任务B无依赖关系,可同时执行,最后合并结果。核心使用thenCombine()方法。import java.util.concurrent.*;public class CompletableFutureParallelDemo { private static final ExecutorService CUSTOM_POOL = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10) ); // 模拟查询商品信息 privatestatic CompletableFuture<String> queryProduct(Long productId) { return CompletableFuture.supplyAsync(() -> { System.out.println("查询商品" + productId + "信息,线程:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "商品" + productId + "详情"; }, CUSTOM_POOL); } // 模拟查询商品库存 privatestatic CompletableFuture<Integer> queryStock(Long productId) { return CompletableFuture.supplyAsync(() -> { System.out.println("查询商品" + productId + "库存,线程:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 100; // 库存数量 }, CUSTOM_POOL); } publicstaticvoidmain(String[] args) throws ExecutionException, InterruptedException { Long productId = 1001L; // 并行执行两个任务,合并结果 CompletableFuture<String> combineResult = queryProduct(productId) .thenCombine(queryStock(productId), (productInfo, stock) -> { return productInfo + ",库存:" + stock + "件"; }); System.out.println("合并结果:" + combineResult.get()); CUSTOM_POOL.shutdown(); }}
场景:需要等待多个异步任务全部完成,或任一任务完成。核心使用allOf()(等待所有)和anyOf()(等待任一)。适合批量任务处理(如批量查询多个用户信息),无返回值,需手动获取每个任务的结果。import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;public class CompletableFutureAllOfDemo { private static final ExecutorService CUSTOM_POOL = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10) ); privatestatic CompletableFuture<String> queryUser(Long userId) { return CompletableFuture.supplyAsync(() -> { System.out.println("查询用户" + userId + ",线程:" + Thread.currentThread().getName()); return "用户" + userId + "信息"; }, CUSTOM_POOL); } publicstaticvoidmain(String[] args) throws ExecutionException, InterruptedException { // 批量查询用户1、2、3的信息 List<CompletableFuture<String>> futureList = new ArrayList<>(); for (long i = 1; i <= 3; i++) { futureList.add(queryUser(i)); } // 等待所有任务完成 CompletableFuture<Void> allFuture = CompletableFuture.allOf( futureList.toArray(new CompletableFuture[0]) ); // 所有任务完成后,获取每个任务的结果 allFuture.get(); for (CompletableFuture<String> future : futureList) { System.out.println("任务结果:" + future.get()); } CUSTOM_POOL.shutdown(); }}
适合多源数据查询(如从多个缓存源查询数据,取最快的结果),返回第一个完成的任务结果。import java.util.concurrent.*;public class CompletableFutureAnyOfDemo { private static final ExecutorService CUSTOM_POOL = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10) ); // 模拟从缓存A查询 private static CompletableFuture<String> queryFromCacheA(String key) { return CompletableFuture.supplyAsync(() -> { System.out.println("从缓存A查询" + key + ",线程:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); // 模拟慢查询 } catch (InterruptedException e) { e.printStackTrace(); } return "缓存A-" + key; }, CUSTOM_POOL); } // 模拟从缓存B查询 private static CompletableFuture<String> queryFromCacheB(String key) { return CompletableFuture.supplyAsync(() -> { System.out.println("从缓存B查询" + key + ",线程:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); // 模拟快查询 } catch (InterruptedException e) { e.printStackTrace(); } return "缓存B-" + key; }, CUSTOM_POOL); } public static void main(String[] args) throws ExecutionException, InterruptedException { String key = "userId:100"; // 从两个缓存查询,取最快的结果 CompletableFuture<Object> anyFuture = CompletableFuture.anyOf( queryFromCacheA(key), queryFromCacheB(key) ); System.out.println("最快结果:" + anyFuture.get()); CUSTOM_POOL.shutdown(); }}
四、避坑指南:CompletableFuture异常处理的正确姿势
异步任务的异常若未正确处理,会导致“吞异常”问题(任务失败但无任何日志),排查难度极大。CompletableFuture提供了两种核心异常处理方式:当任务执行异常时,执行指定的函数,返回默认值,不影响后续任务执行。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("任务执行中..."); int i = 1 / 0; // 模拟异常 return "正常结果";}, CUSTOM_POOL).exceptionally(e -> { System.out.println("任务执行异常:" + e.getMessage()); return "默认兜底结果"; // 异常时返回默认值});System.out.println("最终结果:" + future.get());
whenComplete():任务完成后执行(无论成功或失败),无返回值;handle():任务完成后执行(无论成功或失败),有返回值,可处理结果或异常。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("任务执行中..."); return "正常结果";}, CUSTOM_POOL).whenComplete((result, e) -> { if (e != null) { System.out.println("任务异常:" + e.getMessage()); } else { System.out.println("任务成功,结果:" + result); }}).handle((result, e) -> { if (e != null) { return "异常处理后的结果"; } else { return result + "(处理后)"; }});System.out.println("最终结果:" + future.get());
五、性能优化与使用注意事项
务必使用自定义线程池:避免使用默认的ForkJoinPool.commonPool(),尤其是在IO密集型任务中,自定义线程池可更好地控制资源;避免过度异步:异步任务的创建和切换有一定开销,简单任务无需异步化;合理设置线程池参数:IO密集型任务(如网络请求、数据库操作)可设置较大的线程数(核心线程数=CPU核心数*2);CPU密集型任务设置较小的线程数(核心线程数=CPU核心数);及时关闭线程池:异步任务执行完毕后,调用shutdown()关闭线程池,避免资源泄漏;避免嵌套调用:串行任务优先使用thenCompose(),而非嵌套supplyAsync(),防止出现“Future嵌套”导致的代码可读性差问题。六、今日打卡
评论区留下你的答案:以下场景该如何使用CompletableFuture实现?✅场景:电商下单流程,需要依次执行:① 扣减库存 → ② 生成订单 → ③ 发送短信通知;同时,扣减库存和生成订单需要合并结果后再发送短信。请写出核心代码。文末预告
Day23预告:Day22咱们掌握了CompletableFuture的异步任务编排,明天咱们进入并发进阶的另一核心——Fork/Join框架!它是Java专为分治任务设计的并行计算框架,适合大数据量的分治处理(如海量数据排序、统计)。明天咱们拆解其核心原理、工作流程与实战案例,掌握并行计算的高效方案!