跳转至

Java 中的 CompletableFuture 详解

简介

在 Java 编程中,异步编程是提高程序性能和响应能力的重要手段。CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它实现了 Future 接口,并且提供了更丰富的功能,使得开发者可以更方便地处理异步任务和链式操作。本文将详细介绍 CompletableFuture 的基础概念、使用方法、常见实践以及最佳实践。

目录

  1. 基础概念
  2. 使用方法
    • 创建 CompletableFuture
    • 处理 CompletableFuture 的结果
    • 链式操作
  3. 常见实践
    • 异步任务的组合
    • 异常处理
    • 超时处理
  4. 最佳实践
  5. 小结
  6. 参考资料

基础概念

CompletableFuture 是 Java 中用于异步编程的一个类,它代表一个可能还未完成的异步操作的结果。与传统的 Future 接口相比,CompletableFuture 提供了更强大的功能,例如可以通过回调函数处理异步任务的结果,支持链式操作,方便进行多个异步任务的组合等。

CompletableFuture 可以通过多种方式创建,并且可以通过一系列的方法对其进行操作,如 thenApplythenAcceptthenCompose 等,这些方法可以用于处理异步任务的结果,或者将多个异步任务组合在一起。

使用方法

创建 CompletableFuture

1. 使用 CompletableFuture.runAsync 创建无返回值的异步任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务执行完成");
        }, executor);

        // 等待任务完成
        future.join();
        executor.shutdown();
    }
}

2. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureSupplyExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步任务返回的结果";
        }, executor);

        // 获取任务结果
        String result = future.join();
        System.out.println(result);
        executor.shutdown();
    }
}

处理 CompletableFuture 的结果

1. 使用 thenApply 处理有返回值的结果

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "原始结果";
        }, executor).thenApply(result -> result + " 经过处理");

        String finalResult = future.join();
        System.out.println(finalResult);
        executor.shutdown();
    }
}

2. 使用 thenAccept 处理无返回值的结果

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenAcceptExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步任务结果";
        }, executor).thenAccept(result -> System.out.println("处理结果: " + result));

        future.join();
        executor.shutdown();
    }
}

链式操作

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainingExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一步结果";
        }, executor)
          .thenApply(result -> result + " 第二步处理")
          .thenApply(result -> result + " 第三步处理");

        String finalResult = future.join();
        System.out.println(finalResult);
        executor.shutdown();
    }
}

常见实践

异步任务的组合

1. 使用 thenCompose 组合两个异步任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenComposeExample {
    public static CompletableFuture<String> getFirstResult() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一个任务结果";
        });
    }

    public static CompletableFuture<String> getSecondResult(String input) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return input + " 加上第二个任务结果";
        });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = getFirstResult().thenCompose(ThenComposeExample::getSecondResult);
        String finalResult = future.join();
        System.out.println(finalResult);
        executor.shutdown();
    }
}

2. 使用 thenCombine 组合两个独立的异步任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenCombineExample {
    public static CompletableFuture<String> task1() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 1 结果";
        });
    }

    public static CompletableFuture<String> task2() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 2 结果";
        });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = task1().thenCombine(task2(), (result1, result2) -> result1 + " 和 " + result2);
        String finalResult = future.join();
        System.out.println(finalResult);
        executor.shutdown();
    }
}

异常处理

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExceptionHandlingExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "正常结果";
        }, executor).exceptionally(ex -> {
            System.out.println("处理异常: " + ex.getMessage());
            return "异常处理后的结果";
        });

        String result = future.join();
        System.out.println(result);
        executor.shutdown();
    }
}

超时处理

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TimeoutExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "正常结果";
        }, executor);

        try {
            String result = future.orTimeout(1, TimeUnit.SECONDS).join();
            System.out.println(result);
        } catch (Exception e) {
            System.out.println("任务超时: " + e.getMessage());
        }
        executor.shutdown();
    }
}

最佳实践

  1. 合理使用线程池:在创建 CompletableFuture 时,尽量使用自定义的线程池,避免使用默认的线程池,以更好地控制线程资源。
  2. 异常处理:在异步任务中,要对可能出现的异常进行处理,避免程序崩溃。可以使用 exceptionally 方法来处理异常。
  3. 避免阻塞操作:在异步任务中,尽量避免使用阻塞操作,如 Thread.sleep,如果需要等待一段时间,可以使用 CompletableFuture 提供的超时处理方法。
  4. 链式操作的可读性:在进行链式操作时,要注意代码的可读性,可以将复杂的操作拆分成多个步骤,提高代码的可维护性。

小结

CompletableFuture 是 Java 中一个强大的异步编程工具,它提供了丰富的功能,使得开发者可以更方便地处理异步任务和链式操作。通过本文的介绍,我们了解了 CompletableFuture 的基础概念、使用方法、常见实践以及最佳实践。在实际开发中,合理使用 CompletableFuture 可以提高程序的性能和响应能力。

参考资料

  1. 《Effective Java》