之前提到过,使用多线程优化性能,就是将串行操作变成并行操作。而在这个过程中,一定会涉及异步化。如下面2个方法都比较耗时:

// 串行方法
doBizA();
doBizB();

并行化:

new Thread(()->doBizA());.start()
new Thread(()->doBizB());.start()

分别启动2个线程去执行这2个方法就行了,主线程无需等待 doBizAdoBizB 的执行结果,这两个操作被异步化了。

异步化是并行方案的基础,是利用多线程优化性能得以实现的基础。所以最近几年异步编程大火,因为优化性能是互联网大厂的核心需求。

JDK8 中提供了 CompletableFuture 支持异步编程,这个类很复杂,但是功能也让人感到震撼。

CompletableFuture 的核心优势

为了领略 CompletableFuture 异步编程的优势,这里使用它重新实现上篇中提到的烧水泡茶程序。

首先还是需要制定分工方案:在下面的程序中,划分了 3 个任务:

  1. 任务1:负责洗水壶,烧开水
  2. 任务2:负责洗茶壶,洗茶杯,放茶叶
  3. 任务3:负责泡茶

其中任务3 要等待 任务1 和 任务2 完成后才能开始,示意图如下

下面是代码实现,先忽略 runAsync()、supplyAsync()、thenCombine() 这些不熟悉的方法,从大局上看,可以发现:

  1. 无需手工维护线程,给任务分配线程的工作也不需要我们关注
  2. 语义更清晰,例如 f3 = f1.thenCombine(f2,() -> {}) 可以清晰地表述"任务3等待任务1 和 任务2 完成后才能开始"
  3. 代码更简练并专注于业务逻辑,几乎所有代码都是业务相关的。
public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 任务1 :洗水壶->烧开水
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
            System.out.println("T1 洗水壶");
            sleep(1, TimeUnit.SECONDS);
            System.out.println("T1 烧开水");
            sleep(15, TimeUnit.SECONDS);
        });

        // 任务2:洗茶壶——洗茶杯——放茶叶
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("T2 洗茶壶");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T2 洗茶杯");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T2 拿茶叶");
            sleep(1, TimeUnit.SECONDS);

            return "龙井";
        });

        // 任务3:等待任务2和任务1执行完成后才执行:泡茶
        CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1 拿到茶叶" + tf);
            System.out.println("T1 泡茶");
            return "上茶" + tf;
        });
        // 主线程等待 任务3执行完成
        System.out.println(f3.join());
    }


    static void sleep(int t, TimeUnit unit) {
        try {
            unit.sleep(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/** 
一次执行结果:
T1 洗水壶
T2 洗茶壶
T2 洗茶杯
T1 烧开水
T2 拿茶叶
T1 拿到茶叶龙井
T1 泡茶
上茶龙井
*/

下面开始详细介绍 CompletableFuture 的使用,首先是创建对象。

创建 CompletableFuture 对象

创建 CompletableFuture 主要使用下面这 4 个静态方法: runAsync(Runnable runnable )supplyAsync(Supplier<U> supplier) 之间的区别是: Runnable 接口的 run() 没有返回值Supplier 接口的 get() 有返回值.

前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下 CompletableFuture 使用公共ForkJoinPool 线程池,这个线程池默认创建的 线程数是 CPU 核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 多的线程数)

如果所有 CompletableFuture 共享一个线程池,一旦有任务执行一些很慢I/O操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,强烈建议根据不同业务类型创建不同的线程池,避免互相干扰。


//使用默认线程池
static CompletableFuture<Void> 
  runAsync(Runnable runnable)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier)
//可以指定线程池  
static CompletableFuture<Void> 
  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier, Executor executor)  

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.ger() 方法,对于异步操作,需要关注两个问题:

  • 异步操作什么时候结束
  • 如何获取异步操作的执行结果

因为 CompletableFuture 实现了 Future 接口,所以这两个问题都可以通过 Future 来解决。 另外 CompletableFuture 还实现了 CompletableStage 接口,这个接口方法很多,下面介绍这个接口中的方法。

如何理解 CompletableStage 接口

任务是有时序关旭的,比如 串行关系、并行关系、汇聚关系等。还是用之前烧水泡茶的例子来说明:洗水壶和烧开水就是串行关系,烧水之前必须把壶洗干净

洗水壶、烧开水、洗茶壶、洗茶杯这两组任务之间就是并行关系:烧开水的同时可以去洗茶壶和茶杯。

而烧开水、放茶叶和泡茶就是汇聚关系:

CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 f3 = f1.thenCombine(f2,() -> {}) 描述的就是一种**汇聚**关系。

烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND指的是「所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)」。

既然有 AND 聚合关系,就一定有 OR 聚合关系,所谓的 OR 指的是 「依赖的任务只要有一个完成就可以执行当前任务。」

在编程领域还有一个绕不开的地方 —— 异常处理。 CompletionStage 接口可以方便地描述异常处理。

下面逐个介绍 CompletionStage 如何描述串行关系AND 聚合关系OR 聚合关系 以及 异常处理

1. 描述串行关系

CompletionStage 接口里描述串行关系,主要是 thenApplythenAcceptthenRunthenCompose 这四个相关系列接口。

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

public CompletionStage<Void> thenAccept(Consumer<? super T> action);

public CompletionStage<Void> thenRun(Runnable action);

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

thenApply 系列函数里 参数 fn 的类型是 接口 Funciton<T,R> 。这个接口里与 CompletionStage 相关的方法是 R apply(T t),这是一个函数接口既可以接收参数也支持返回值,所以 thenApply 系列方法返回的是 CompletionStage<R>

thenAccept 系列方法里参数 consumer 类型是接口 Consumer<T>,这个接口里与 CompletionStage 相关的方法是 void accept(T t) ,这个方法支持参数,但是没有返回值,所以 thenAccept 系列方法返回的是 CompletionStage<Void>

thenRun 系列方法里action 的参数是 Runnable ,所以 action 既不接收参数,也不支持返回值thenRun 系列方法返回的也是 CompletionStage<Void>

这些方法中,Async 代表异步执行 functionconsumer 或者 action。 其中需要注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列相同。

下面的示例代码演示了 thenApply() 方法如何使用:首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,所以虽然这是一个异步流程,但是任务①②③串行执行的**,②依赖①的结果,③依赖②的结果**。

public class CompletableFutureDemo {
    public static void main(String[] args) {
        CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> "Hello Word")    // ① 
                .thenApply(s -> s + " QQ") // ②
                .thenApply(String::toUpperCase); // ③

        System.out.println(f0.join());
    }
}
/**
输出
HELLO WORD QQ
*/

2.描述 AND 汇聚关系

CompletionStage 中描述 AND 汇聚关系的接口:thenCombinethenAcceptBothrunAfterBoth 这些接口的区别也是源自 fnconsumeraction 这三个核心参数的不同,参考上面烧水泡茶的例子即可。


CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3. 描述 OR 汇聚关系

applyToEitheracceptEitherrunAfterEither 描述了 OR 汇聚关系,也是根据传入的 fnconsumeraction 三个核心参数不同

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面的代码展示了如何使用 applyToEither() 方法描述一个 OR 汇聚关系:

public class CompletableFutureDemo {
    public static void main(String[] args) {

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            int t = RandomUtils.nextInt(5, 10);
            sleep(t, TimeUnit.SECONDS);
            return String.valueOf(t);
        });

        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            int t = RandomUtils.nextInt(5, 10);
            sleep(t, TimeUnit.SECONDS);
            return String.valueOf(t);
        });

        CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s);

        System.out.println(f3.join());
    }


    static void sleep(int t, TimeUnit unit) {
        try {
            unit.sleep(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4. 异常处理

fnconsumeraction 核心方法都不允许抛出可检查异常,但是无法限制它们抛出运行时异常。例如下面代码:执行 7/0 这一步时就会出现除零错误 运行时异常,非异步编程中可以使用 try-catch 来捕获并处理异常,异步编程中该怎也处理呢?

      CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(() -> (7 / 0))
                .thenApply(r -> r * 10);

        System.out.println(f0.join());

CompletionStage 提供的解决方案非常简单,比使用 try-catch 还要简单,下面是相关方法,使用这些方法进行异常处理和串行处理操作是一样的,都支持链式编程

    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = internalComplete(new AltResult(ex));
        postComplete();
        return triggered;
    }
    
     public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }
    
      public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }
    
    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }
    
      public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

下面代码展示了如何使用 exceptionally() 方法处理异常,该方法的使用非常类似 try-catch 中的 catch 块的用法,但是由于支持链式编程,所以相对更简单。

whenComplete() 和 handle() 则类似 try-finally 中的 finally{} ,无论是否发生异常都会执行 whenComplete() 中的回调函数 cosumerhandle() 中的回调函数 fn

whenComplete() 不支持返回结果handle() 支持返回结果

       CompletableFuture<Integer> f0 = CompletableFuture
                .supplyAsync(() -> 7 / 0)
                .thenApply(r -> r * 10)
                .exceptionally(e -> 0); // 当发生异常时,返回0
        System.out.println(f0.join());

总结

曾经的异步与大量的回调绑定,例如JS中的异步基本靠回调函数来解决,回调函数在处理异常以及复杂任务关系时往往力不从心。

但是最近几年,伴随着 ReactiveX 的发展(Java 中对应的实现是 RxJava) 回调地狱问题已经被完美解决了,异步编程慢慢开始成熟,Java 语言官方也开始支持异步,JDK8 提供了 CompletableFutureJDK9 提供了更加完善的 Flow API。

如果对异步编程感兴趣,可以重点关注学习 RxJava

个人总结

本章对于异步类 CompletableFuture 做了简单的说明,其中用到了大量的 函数式编程和新增的 函数式接口,这方面的知识如果之前一直没有补上的话还是要去学一下的,很有用。

同时这张介绍的还少太浅了,还需要多找些文章和例子动手去实际多做一下,慢慢就有感觉了。

另外 RxJava 这个库很不错,建议学习一下。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。