在前面的篇章中,我们掌握了 CompletableFuture 的核心 API、异常处理和线程池配置,能够应对大部分常规异步场景。但在生产环境中,还会遇到任务超时 、 失败重试 、 与并行流结合等更复杂的需求。本篇将深入讲解 CompletableFuture 的高级特性,包括超时控制 、 失败重试机制 、 与 Stream 并行流的协同使用,以及 异步任务的编排模式 ,帮你打造更健壮、更灵活的异步编程方案。在异步编程中, 超时控制是必不可少的 —— 如果一个异步任务因为网络故障或服务宕机一直阻塞,会导致资源泄漏和请求堆积。CompletableFuture 提供了 orTimeout() 和 completeOnTimeout() 两个方法,实现优雅的超时处理。orTimeout() 方法的核心作用是:当任务在指定时间内未完成时,抛出 TimeoutException 异常,可以结合异常处理 API 实现降级逻辑。public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
- 超时后任务会被标记为异常状态,抛出 TimeoutException
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class OrTimeoutDemo { private static final ExecutorService executor = Executors.newFixedThreadPool(2); // 模拟远程接口调用 private static CompletableFuture<String> callRemoteApi(String url) { return CompletableFuture.supplyAsync(() -> { try { // 模拟接口响应延迟 Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } return url + " 响应结果"; }, executor); } public static void main(String[] args) { CompletableFuture<String> future = callRemoteApi("https://api.example.com") // 设置超时时间为1秒 .orTimeout(1, TimeUnit.SECONDS) // 超时异常降级 .exceptionally(ex -> { System.out.println("接口调用超时:" + ex.getMessage()); return "默认降级结果"; }); System.out.println("最终结果:" + future.join()); executor.shutdown(); }}
接口调用超时:java.util.concurrent.TimeoutException最终结果:默认降级结果
completeOnTimeout ():超时返回默认值completeOnTimeout() 与 orTimeout() 类似,但它不会抛出异常,而是在超时后直接返回一个默认值,任务状态标记为正常完成。public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)
- 超时后任务正常完成,不会触发 exceptionally()
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CompleteOnTimeoutDemo { private static final ExecutorService executor = Executors.newFixedThreadPool(2); // 模拟缓存查询 private static CompletableFuture<String> queryCache(String key) { return CompletableFuture.supplyAsync(() -> { try { // 模拟缓存服务延迟 Thread.sleep(1500); } catch (InterruptedException e) { throw new RuntimeException(e); } return "缓存值:" + key; }, executor); } public static void main(String[] args) { CompletableFuture<String> future = queryCache("user_1001") // 超时1秒返回默认值 .completeOnTimeout("缓存降级值", 1, TimeUnit.SECONDS); System.out.println("最终结果:" + future.join()); executor.shutdown(); }}
| | |
|---|
| | |
|---|
| 会触发 exceptionally()/handle() | |
|---|
| | |
|---|
- 接口调用优先用 orTimeout():可以捕获超时异常,记录详细日志,方便排查问题。
- 缓存 / 本地查询优先用 completeOnTimeout():简化代码,无需额外处理异常。
- 超时时间合理设置:根据业务场景设置,例如普通接口设为 1 秒,复杂接口设为 3 秒,避免过短或过长。
在分布式系统中,异步任务可能会因为网络抖动 、 服务临时不可用等原因失败,此时需要重试机制来提升任务成功率。CompletableFuture 本身没有提供重试 API,但我们可以通过递归调用结合异常处理实现灵活的重试。实现思路:在 exceptionally() 中判断异常类型和重试次数,未达到最大次数则递归调用原任务。import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class RetryDemo1 { private static final ExecutorService executor = Executors.newFixedThreadPool(2); // 最大重试次数 private static final int MAX_RETRY = 3; // 模拟远程接口调用,随机失败 private static CompletableFuture<String> callApi(String url) { return CompletableFuture.supplyAsync(() -> { System.out.println("调用接口:" + url); // 模拟50%概率失败 if (Math.random() > 0.5) { throw new RuntimeException("接口调用失败"); } return url + " 响应成功"; }, executor); } // 重试方法:递归实现 private static CompletableFuture<String> callApiWithRetry(String url, int retryCount) { return callApi(url) .exceptionally(ex -> { // 判断是否达到最大重试次数 if (retryCount >= MAX_RETRY) { System.out.println("达到最大重试次数,降级处理"); return "接口降级结果"; } // 重试间隔1秒 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("重试第" + (retryCount + 1) + "次"); // 递归重试 return callApiWithRetry(url, retryCount + 1).join(); }); } public static void main(String[] args) { CompletableFuture<String> future = callApiWithRetry("https://api.example.com", 0); System.out.println("最终结果:" + future.join()); executor.shutdown(); }}
调用接口:https://api.example.com重试第1次调用接口:https://api.example.com重试第2次调用接口:https://api.example.com重试第3次调用接口:https://api.example.com达到最大重试次数,降级处理最终结果:接口降级结果
固定间隔重试在高并发场景下可能会加剧服务压力(例如所有请求同时重试),更优雅的方式是使用指数退避策略 —— 重试间隔随重试次数指数增长(如 1s→2s→4s→8s)。import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class RetryDemo2 { private static final ExecutorService executor = Executors.newFixedThreadPool(2); private static final int MAX_RETRY = 3; // 初始重试间隔 private static final long INITIAL_DELAY = 1000; private static CompletableFuture<String> callApi(String url) { return CompletableFuture.supplyAsync(() -> { System.out.println("调用接口:" + url); if (Math.random() > 0.5) { throw new RuntimeException("接口调用失败"); } return url + " 响应成功"; }, executor); } // 指数退避重试 private static CompletableFuture<String> callApiWithExponentialRetry(String url, int retryCount) { return callApi(url) .exceptionally(ex -> { if (retryCount >= MAX_RETRY) { System.out.println("达到最大重试次数,降级"); return "降级结果"; } // 指数退避:间隔 = 初始间隔 * (2^重试次数) long delay = INITIAL_DELAY * (1L << retryCount); try { System.out.println("等待" + delay + "ms后重试"); TimeUnit.MILLISECONDS.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("指数退避重试第" + (retryCount + 1) + "次"); return callApiWithExponentialRetry(url, retryCount + 1).join(); }); } public static void main(String[] args) { CompletableFuture<String> future = callApiWithExponentialRetry("https://api.example.com", 0); System.out.println("最终结果:" + future.join()); executor.shutdown(); }}
调用接口:https://api.example.com等待1000ms后重试指数退避重试第1次调用接口:https://api.example.com等待2000ms后重试指数退避重试第2次调用接口:https://api.example.com等待4000ms后重试指数退避重试第3次调用接口:https://api.example.com达到最大重试次数,降级最终结果:降级结果
- 限制最大重试次数:避免无限重试导致的资源耗尽,建议设置 3~5 次。
- 使用指数退避策略:减少对下游服务的压力,避免雪崩效应。
- 区分异常类型:只对 临时性异常 (如网络超时、服务忙)重试,对 永久性异常 (如参数错误、权限不足)直接降级。
- 结合线程池隔离:重试任务使用独立线程池,避免影响主业务线程池。
CompletableFuture与Stream并行流的协同Java 8 的 Stream 提供了并行流( parallelStream() )功能,底层同样基于 ForkJoinPool.commonPool() 。我们可以结合 CompletableFuture 实现更灵活的并行计算,解决并行流的一些局限性。- 线程池不可控:并行流默认使用 ForkJoinPool.commonPool() ,与 CompletableFuture 共享线程池,容易引发资源竞争。
- 超时与异常处理弱:并行流没有内置的超时机制,异常处理不够灵活。
- 无法设置优先级:所有任务优先级相同,无法区分核心任务和非核心任务。
CompletableFuture 替代并行流:可控的并行计算通过 CompletableFuture.supplyAsync() 结合 Stream ,可以实现自定义线程池的并行计算,同时支持超时和异常处理。需求:批量查询 10 个用户的信息,每个查询耗时 500ms,要求总耗时不超过 1 秒。import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.stream.Collectors;public class CompletableFutureWithStream { // 自定义并行线程池 private static final ExecutorService PARALLEL_POOL = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new BasicThreadFactory.Builder().namingPattern("parallel-thread-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy() ); // 模拟查询单个用户信息 private static CompletableFuture<String> queryUser(String userId) { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return "用户" + userId + "信息"; }, PARALLEL_POOL); } public static void main(String[] args) { long start = System.currentTimeMillis(); // 构建用户ID列表 List<String> userIds = new ArrayList<>(); for (int i = 1; i <= 10; i++) { userIds.add("" + i); } // 并行查询所有用户信息 List<CompletableFuture<String>> futureList = userIds.stream() .map(userId -> queryUser(userId).orTimeout(1, TimeUnit.SECONDS)) .collect(Collectors.toList()); // 等待所有任务完成并聚合结果 List<String> userList = futureList.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); // 输出结果 System.out.println("用户信息列表:"); userList.forEach(System.out::println); System.out.println("总耗时:" + (System.currentTimeMillis() - start) + "ms"); PARALLEL_POOL.shutdown(); }}
用户信息列表:用户1信息用户2信息...用户10信息总耗时:1010ms
- 线程池核心线程数为 5,10 个任务分两批执行,每批 5 个,总耗时约 500*2=1000ms,符合预期。
- 相比并行流,使用自定义线程池可以精确控制并发度,避免资源竞争。
- 并行流适用于简单场景:如果任务简单、无依赖、不需要超时控制,并行流更简洁。
- CompletableFuture 适用于复杂场景:需要自定义线程池、超时控制、重试机制时,优先使用 CompletableFuture。
- 避免并行流与 CompletableFuture 混用:两者默认共享 commonPool ,容易导致资源竞争,建议统一使用自定义线程池。
在复杂业务场景中,我们需要对多个异步任务进行编排,常见的编排模式有串行编排 、 并行编排 、 聚合编排 、 分支编排等。- 适用场景:任务之间有依赖关系,如订单→商品→品牌的多级查询。
- 实现方式:使用 thenCompose() 或 thenApplyAsync() 实现链式调用。
- 适用场景:任务之间无依赖,需要并行执行以缩短总耗时,如电商首页多源数据查询。
- 实现方式:使用 CompletableFuture.allOf() 结合自定义线程池。
- 核心优势:充分利用多核 CPU,总耗时等于最慢任务的耗时。
- 适用场景:需要聚合多个任务的结果,支持部分任务失败降级,如批量查询用户信息。
- 实现方式:使用 handle() 对每个任务的结果进行处理,再聚合。
- 适用场景:根据不同条件执行不同的异步任务,如根据用户等级查询不同的会员权益。
- 实现方式:结合 if-else 或 switch ,动态创建不同的 CompletableFuture 任务。
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class BranchOrchestrationDemo { private static final ExecutorService executor = Executors.newFixedThreadPool(2); static class User { private String userId; private int level; public User(String userId, int level) { this.userId = userId; this.level = level; } public int getLevel() { return level; } } // 普通会员权益查询 private static CompletableFuture<String> queryNormalRights() { return CompletableFuture.supplyAsync(() -> "普通会员:无折扣", executor); } // 高级会员权益查询 private static CompletableFuture<String> queryVipRights() { return CompletableFuture.supplyAsync(() -> "高级会员:9折优惠", executor); } public static void main(String[] args) { User user = new User("1001", 2); // 2级为高级会员 // 分支编排:根据用户等级选择不同任务 CompletableFuture<String> rightsFuture = user.getLevel() >= 2 ? queryVipRights() : queryNormalRights(); System.out.println("会员权益:" + rightsFuture.join()); executor.shutdown(); }}
- 超时控制: orTimeout() 抛异常, completeOnTimeout() 返默认值,根据场景选择。
- 失败重试:通过递归 + exceptionally() 实现,推荐指数退避策略。
- 与并行流结合:复杂场景用 CompletableFuture + 自定义线程池,简单场景用并行流。
- 任务编排:串行用 thenCompose() ,并行用 allOf() ,聚合用 handle() ,分支用条件判断。
优先使用自定义线程池:避免默认线程池的资源竞争问题。超时 + 重试 + 降级 三板斧:提升异步任务的稳定性和容错性。合理选择编排模式:根据任务依赖关系选择对应的 API,避免过度设计。加强监控与日志:记录任务的执行时间、重试次数、异常信息,方便问题排查。