跳转至

Java CompletableFuture 示例详解

简介

在 Java 编程中,异步编程是提高程序性能和响应性的重要手段。CompletableFuture 是 Java 8 引入的一个强大工具,用于支持异步编程和处理异步操作的结果。它提供了丰富的 API 来处理并发任务,使得编写复杂的异步程序变得更加简单和直观。本文将深入介绍 CompletableFuture 的基础概念、使用方法、常见实践以及最佳实践,并通过代码示例帮助读者更好地理解和应用。

目录

  1. 基础概念
  2. 使用方法
  3. 常见实践
  4. 最佳实践
  5. 小结
  6. 参考资料

基础概念

什么是 CompletableFuture

CompletableFuturejava.util.concurrent 包中的一个类,它实现了 Future 接口和 CompletionStage 接口。Future 接口用于表示一个异步计算的结果,而 CompletionStage 接口则定义了一系列的异步操作步骤,可以将多个异步任务组合起来。

异步编程的好处

  • 提高性能:通过异步执行任务,可以充分利用多核处理器的资源,减少线程阻塞,提高程序的整体性能。
  • 增强响应性:在处理耗时的 I/O 操作时,异步编程可以避免主线程阻塞,使得程序能够及时响应用户的请求。

使用方法

创建 CompletableFuture

1. 使用 CompletableFuture.runAsync() 执行无返回值的任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 创建一个 CompletableFuture 来执行无返回值的任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(2000);
                System.out.println("任务执行完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executor);

        // 等待任务完成
        future.join();

        // 关闭线程池
        executor.shutdown();
    }
}

2. 使用 CompletableFuture.supplyAsync() 执行有返回值的任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureSupplyExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 创建一个 CompletableFuture 来执行有返回值的任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(2000);
                return "任务执行结果";
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }, executor);

        // 获取任务结果
        String result = future.join();
        System.out.println("任务结果: " + result);

        // 关闭线程池
        executor.shutdown();
    }
}

处理 CompletableFuture 的结果

1. 使用 thenApply() 处理任务结果

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 创建一个 CompletableFuture 来执行有返回值的任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(2000);
                return "任务执行结果";
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }, executor);

        // 使用 thenApply() 处理任务结果
        CompletableFuture<String> processedFuture = future.thenApply(result -> {
            return result + " 经过处理";
        });

        // 获取处理后的结果
        String finalResult = processedFuture.join();
        System.out.println("最终结果: " + finalResult);

        // 关闭线程池
        executor.shutdown();
    }
}

2. 使用 thenAccept() 消费任务结果

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenAcceptExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 创建一个 CompletableFuture 来执行有返回值的任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(2000);
                return "任务执行结果";
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }, executor);

        // 使用 thenAccept() 消费任务结果
        future.thenAccept(result -> {
            System.out.println("消费任务结果: " + result);
        });

        // 等待任务完成
        future.join();

        // 关闭线程池
        executor.shutdown();
    }
}

常见实践

组合多个 CompletableFuture

1. 使用 thenCompose() 顺序执行两个异步任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenComposeExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 第一个异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(2000);
                return "任务1执行结果";
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }, executor);

        // 第二个异步任务依赖于第一个任务的结果
        CompletableFuture<String> future2 = future1.thenCompose(result1 -> {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    // 模拟耗时操作
                    Thread.sleep(2000);
                    return result1 + " 任务2执行结果";
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return null;
                }
            }, executor);
        });

        // 获取最终结果
        String finalResult = future2.join();
        System.out.println("最终结果: " + finalResult);

        // 关闭线程池
        executor.shutdown();
    }
}

2. 使用 thenCombine() 合并两个异步任务的结果

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenCombineExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 第一个异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(2000);
                return "任务1执行结果";
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }, executor);

        // 第二个异步任务
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(2000);
                return "任务2执行结果";
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }, executor);

        // 合并两个任务的结果
        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            return result1 + " " + result2;
        });

        // 获取合并后的结果
        String finalResult = combinedFuture.join();
        System.out.println("最终结果: " + finalResult);

        // 关闭线程池
        executor.shutdown();
    }
}

处理异常

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExceptionHandlingExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 创建一个 CompletableFuture 来执行有返回值的任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟抛出异常
            throw new RuntimeException("任务执行出错");
        }, executor);

        // 处理异常
        CompletableFuture<String> handledFuture = future.exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "默认结果";
        });

        // 获取处理后的结果
        String result = handledFuture.join();
        System.out.println("最终结果: " + result);

        // 关闭线程池
        executor.shutdown();
    }
}

最佳实践

合理使用线程池

在使用 CompletableFuture 时,建议使用自定义的线程池,而不是使用默认的 ForkJoinPool.commonPool()。这样可以更好地控制线程的数量和资源的使用,避免对其他任务产生影响。

避免阻塞操作

尽量避免在 CompletableFuture 中执行阻塞操作,如 Thread.sleep() 或同步的 I/O 操作。如果必须执行阻塞操作,建议使用自定义的线程池来执行这些任务,以避免影响其他任务的执行。

异常处理

在处理 CompletableFuture 时,一定要进行异常处理,避免程序因为未捕获的异常而崩溃。可以使用 exceptionally() 方法来处理异常,并提供默认的处理结果。

小结

CompletableFuture 是 Java 中一个强大的异步编程工具,它提供了丰富的 API 来处理异步任务和组合多个异步操作。通过合理使用 CompletableFuture,可以提高程序的性能和响应性。在使用过程中,需要注意合理使用线程池、避免阻塞操作和进行异常处理,以确保程序的稳定性和可靠性。

参考资料