Java CompletableFuture 全面解析
简介
在 Java 编程中,异步编程是提高程序性能和响应能力的重要手段。CompletableFuture
是 Java 8 引入的一个强大的异步编程工具,它实现了 Future
和 CompletionStage
接口,为开发者提供了丰富的 API 来处理异步任务。本文将详细介绍 CompletableFuture
的基础概念、使用方法、常见实践以及最佳实践,帮助读者深入理解并高效使用这一工具。
目录
- 基础概念
- 使用方法
- 创建 CompletableFuture
- 链式调用
- 常见实践
- 异步任务的组合
- 异常处理
- 批量任务处理
- 最佳实践
- 线程池的使用
- 避免阻塞操作
- 小结
- 参考资料
基础概念
Future 接口
在 Java 中,Future
接口代表一个异步计算的结果。它提供了检查计算是否完成、等待计算完成并获取结果等方法。然而,Future
的功能有限,例如它无法进行链式调用,也不能很好地处理异常。
CompletableFuture
CompletableFuture
是 Future
的扩展,它不仅具备 Future
的基本功能,还实现了 CompletionStage
接口,这使得它可以进行链式调用,方便地处理异步任务的结果和异常。CompletableFuture
支持多种异步任务的组合方式,如顺序执行、并行执行等。
使用方法
创建 CompletableFuture
1. 使用 CompletableFuture.runAsync
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureExample {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 创建一个 CompletableFuture 并异步执行一个任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task completed");
}, executor);
// 关闭线程池
executor.shutdown();
}
}
runAsync
方法用于执行一个无返回值的异步任务,它接受一个 Runnable
接口的实现和一个可选的 Executor
。
2. 使用 CompletableFuture.supplyAsync
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureSupplyExample {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 创建一个 CompletableFuture 并异步执行一个有返回值的任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task result";
}, executor);
// 关闭线程池
executor.shutdown();
}
}
supplyAsync
方法用于执行一个有返回值的异步任务,它接受一个 Supplier
接口的实现和一个可选的 Executor
。
链式调用
CompletableFuture
支持链式调用,通过 thenApply
、thenAccept
、thenRun
等方法可以对异步任务的结果进行处理。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureChainingExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
}).thenApply(result -> {
return result + " World";
}).thenApply(result -> {
return result.toUpperCase();
});
// 获取最终结果
String finalResult = future.join();
System.out.println(finalResult);
}
}
thenApply
方法接受一个 Function
接口的实现,用于对前一个任务的结果进行转换。
常见实践
异步任务的组合
1. 顺序执行
import java.util.concurrent.CompletableFuture;
public class CompletableFutureSequentialExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task 1 result";
});
CompletableFuture<String> future2 = future1.thenApply(result -> {
return result + " + Task 2 result";
});
String finalResult = future2.join();
System.out.println(finalResult);
}
}
2. 并行执行
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureParallelExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 1 result";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 2 result";
});
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.join();
String result1 = future1.get();
String result2 = future2.get();
System.out.println(result1 + " + " + result2);
}
}
allOf
方法用于并行执行多个 CompletableFuture
,并等待所有任务完成。
异常处理
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExceptionHandlingExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Something went wrong");
}).exceptionally(ex -> {
System.out.println("Exception caught: " + ex.getMessage());
return "Default result";
});
String result = future.join();
System.out.println(result);
}
}
exceptionally
方法用于处理异步任务中抛出的异常。
批量任务处理
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class CompletableFutureBatchProcessingExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final int taskId = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Task " + taskId + " result";
});
futures.add(future);
}
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
List<String> results = allResults.get();
results.forEach(System.out::println);
}
}
最佳实践
线程池的使用
在使用 CompletableFuture
时,建议使用自定义的线程池,避免使用默认的线程池。默认线程池是 ForkJoinPool.commonPool()
,它是一个共享的线程池,可能会影响其他任务的执行。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Task result";
}, executor);
executor.shutdown();
}
}
避免阻塞操作
尽量避免在 CompletableFuture
的任务中进行阻塞操作,如 Thread.sleep()
或 IO
操作。如果必须进行阻塞操作,建议使用自定义的线程池来执行这些任务,以免影响其他任务的执行。
小结
CompletableFuture
是 Java 中一个强大的异步编程工具,它提供了丰富的 API 来处理异步任务。通过本文的介绍,我们了解了 CompletableFuture
的基础概念、使用方法、常见实践以及最佳实践。在实际开发中,合理使用 CompletableFuture
可以提高程序的性能和响应能力。
参考资料
- 《Effective Java》
- 《Java 8 in Action》