跳转至

Java CompletableFuture 全面解析

简介

在 Java 编程中,异步编程是提高程序性能和响应能力的重要手段。CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它实现了 FutureCompletionStage 接口,为开发者提供了丰富的 API 来处理异步任务。本文将详细介绍 CompletableFuture 的基础概念、使用方法、常见实践以及最佳实践,帮助读者深入理解并高效使用这一工具。

目录

  1. 基础概念
  2. 使用方法
    • 创建 CompletableFuture
    • 链式调用
  3. 常见实践
    • 异步任务的组合
    • 异常处理
    • 批量任务处理
  4. 最佳实践
    • 线程池的使用
    • 避免阻塞操作
  5. 小结
  6. 参考资料

基础概念

Future 接口

在 Java 中,Future 接口代表一个异步计算的结果。它提供了检查计算是否完成、等待计算完成并获取结果等方法。然而,Future 的功能有限,例如它无法进行链式调用,也不能很好地处理异常。

CompletableFuture

CompletableFutureFuture 的扩展,它不仅具备 Future 的基本功能,还实现了 CompletionStage 接口,这使得它可以进行链式调用,方便地处理异步任务的结果和异常。CompletableFuture 支持多种异步任务的组合方式,如顺序执行、并行执行等。

使用方法

创建 CompletableFuture

1. 使用 CompletableFuture.runAsync

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 创建一个 CompletableFuture 并异步执行一个任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task completed");
        }, executor);

        // 关闭线程池
        executor.shutdown();
    }
}

runAsync 方法用于执行一个无返回值的异步任务,它接受一个 Runnable 接口的实现和一个可选的 Executor

2. 使用 CompletableFuture.supplyAsync

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureSupplyExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 创建一个 CompletableFuture 并异步执行一个有返回值的任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task result";
        }, executor);

        // 关闭线程池
        executor.shutdown();
    }
}

supplyAsync 方法用于执行一个有返回值的异步任务,它接受一个 Supplier 接口的实现和一个可选的 Executor

链式调用

CompletableFuture 支持链式调用,通过 thenApplythenAcceptthenRun 等方法可以对异步任务的结果进行处理。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChainingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }).thenApply(result -> {
            return result + " World";
        }).thenApply(result -> {
            return result.toUpperCase();
        });

        // 获取最终结果
        String finalResult = future.join();
        System.out.println(finalResult);
    }
}

thenApply 方法接受一个 Function 接口的实现,用于对前一个任务的结果进行转换。

常见实践

异步任务的组合

1. 顺序执行

import java.util.concurrent.CompletableFuture;

public class CompletableFutureSequentialExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            return "Task 1 result";
        });

        CompletableFuture<String> future2 = future1.thenApply(result -> {
            return result + " + Task 2 result";
        });

        String finalResult = future2.join();
        System.out.println(finalResult);
    }
}

2. 并行执行

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureParallelExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 1 result";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 2 result";
        });

        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
        combinedFuture.join();

        String result1 = future1.get();
        String result2 = future2.get();
        System.out.println(result1 + " + " + result2);
    }
}

allOf 方法用于并行执行多个 CompletableFuture,并等待所有任务完成。

异常处理

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionHandlingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Something went wrong");
        }).exceptionally(ex -> {
            System.out.println("Exception caught: " + ex.getMessage());
            return "Default result";
        });

        String result = future.join();
        System.out.println(result);
    }
}

exceptionally 方法用于处理异步任务中抛出的异常。

批量任务处理

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class CompletableFutureBatchProcessingExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<CompletableFuture<String>> futures = new ArrayList<>();

        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                return "Task " + taskId + " result";
            });
            futures.add(future);
        }

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> {
            return futures.stream()
                   .map(CompletableFuture::join)
                   .collect(Collectors.toList());
        });

        List<String> results = allResults.get();
        results.forEach(System.out::println);
    }
}

最佳实践

线程池的使用

在使用 CompletableFuture 时,建议使用自定义的线程池,避免使用默认的线程池。默认线程池是 ForkJoinPool.commonPool(),它是一个共享的线程池,可能会影响其他任务的执行。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Task result";
        }, executor);

        executor.shutdown();
    }
}

避免阻塞操作

尽量避免在 CompletableFuture 的任务中进行阻塞操作,如 Thread.sleep()IO 操作。如果必须进行阻塞操作,建议使用自定义的线程池来执行这些任务,以免影响其他任务的执行。

小结

CompletableFuture 是 Java 中一个强大的异步编程工具,它提供了丰富的 API 来处理异步任务。通过本文的介绍,我们了解了 CompletableFuture 的基础概念、使用方法、常见实践以及最佳实践。在实际开发中,合理使用 CompletableFuture 可以提高程序的性能和响应能力。

参考资料

  1. 《Effective Java》
  2. 《Java 8 in Action》