极客时间 ——《Java并发编程实战》 23 | Future:如何用多线程实现最优"烧水泡茶" 程序?

2020-10-28   10 次阅读


上一章中介绍了如何创建正确的线程池,当我们使用线程池的时候调用 execute(Runnable command) 方法,但是这个方法只能提交任务,无法获取任务的执行结果(没有返回值),获取任务结果在很多场景下又是刚需,这篇文章介绍的是如何在使用 ThreadPoolExecutor 的时候获取任务执行结果。

如何获取任务执行结果

ThreadPoolExecutor 提供了 3submit() 方法和 1FutureTask 工具类来支持获得任务执行结果的请求。

下面是 3个 submit 方法:

// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);

// 提交一个 Runnable任务及结果引用
<T> Future<T> submit(Runnable task, T result);

// 提交一个 Runnable 任务
Future<?> submit(Runnable task);

这三个方法的返回值都是 Future 接口, Futrue 接口有 5 个方法:

public interface Future<V> {

    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 判断任务是否已经取消
    boolean isCancelled();

    // 判断任务是否已经结束
    boolean isDone();

    // 获得任务执行结果
    V get() throws InterruptedException, ExecutionException;

    // 获得任务执行结果,支持超时
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

通过 Future 的这5个方法可以发现:提交的任务不但可以获取任务执行结果,还可以取消任务。但是需要注意的是:这两个 get() 方法都是阻塞式的,如果调用的时候任务还没有执行完,调用 get() 方法的线程会被阻塞,直到任务执行完才会被唤醒。

ExecutorService 中的 3个 submit 方法主要区别在于参数不同,下面是简要介绍:

  1. 提交一个 Runnable 任务的 submit(Rnnable task) : 这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束,类似 Thread.join()
  2. 提交 Callable 任务 submit(Callable<T> task):这个方法的入参是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用 get() 方法来获取任务执行的结果
  3. 提交 Runnable 任务以及结果引用submit(Runnable task, T result) : 这个方法很有意思,假设这个方法返回的 Future 对象是 f, f.get()返回值就是 传给 submit 的方法入参 result。 下面有个例子展示了这个方法的经典用法。需要注意的是 Runnable 接口实现类 Task 声明了一个有参构造函数 Task(Result r),创建 Task 对象的时候传入result 对象,这样就能在类 Taskrun() 方法中对 result 进行各种操作了。 result 相当于 主线程子线程之间的桥梁,通过它可以实现线程之间数据的共享

示例代码:

      ExecutorService executor = Executors.newFixedThreadPool(1);

        // 创建Result对象r
        Result r = new Result();
        r.setAAA(a);

        //提交任务
        Future<Result> future = executor.submit(new Task(r), r);
        Result fr = future.get();
        // 下面的等式成立:
        // fr === r;
        // fr.getAAA() === a;
        // fr.getXXX() === x;

    }

    class Result<T> {
        T t;

        T getAAA() {
            return t;
        }

        void setXXX(T x) {
            this.t = x;
        }
    }

【这个示例代码不太行,最起码没让我搞明白,而且是伪代码,还是要去找一个能真正跑起来的代码加深一下理解。】

下面介绍 FutureTask 工具类。 Future 是一个接口,而 FutureTask 是一个实实在在的工具与类,这个工具类有两个构造函数,参数与 submit() 方法类似:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask 的使用很简单,FutureTask 实现了 Runnable 和 Future 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行。 又因为实现了 Future 接口,所以也能获取任务执行的结果。

下面代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行的示例代码:


// 创建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = 
  Executors.newCachedThreadPool();
// 提交FutureTask 
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();

FutureTask 直接被 Thread 执行的示例代码如下:


// 创建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();

可以看到,使用 FutureTask 对象可以很容易的获取子线程的执行结果。

实现最优"烧水泡茶"程序

《烧水泡茶》是华罗庚先生文章《统筹方法》中的一个例子,这是一个典型的异步任务,文中提到的最优工序是下面这样:

上面是图解,下面是用程序模拟这个最优工序。之前提到过,并发编程可以总结为三个核心问题:分工、同步、互斥。

编写并发程序,首先要做的就是分工 —— 如何高效地拆解任务并分配给线程。

对于烧水泡茶这个程序,可以用两个线程 T1T2 来完成。 T1 负责洗水壶,烧开水,泡茶这三道工序。 T2 负责洗茶壶,洗茶杯、放茶叶三道工序。 T1 在执行泡茶这道工序时需要等待T2 完成放茶叶这道工序。 对于 T1 的这个等待动作,你能想到很多方法实现:Thread.join()CountDownLatch阻塞队列。 但是在这篇文章中使用 Future 特性来实现。

下面的示例代码使用了这一章提到的 Future 特性来实现:

public class FutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建T2线程的FutureTask
        FutureTask<String> ft2 = new FutureTask<>(new T2Task());

        // 创建T1线程的FutureTask
        FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));

        // 线程T1 执行任务 ft1
        Thread T1 = new Thread(ft1);
        T1.start();

        // 线程T2执行任务ft2
        Thread T2 = new Thread(ft2);
        T2.start();

        // 等待线程T1的执行结果
        System.out.println(ft1.get());


    }

    // T1Task 需要执行的任务:洗水壶、烧开水、泡茶
    private static class T1Task implements Callable<String> {
        FutureTask<String> ft2;

        //T1 任务需要 T2任务的 FutureTask
        T1Task(FutureTask<String> ft2) {
            this.ft2 = ft2;
        }



        @Override
        public String call() throws Exception {
            System.out.println("T1 洗水壶");
            TimeUnit.SECONDS.sleep(1);

            System.out.println("T1 烧开水");
            TimeUnit.SECONDS.sleep(15);

            //获取 T2 线程的插页
            String tf = ft2.get();
            System.out.println("T1 拿到茶叶" + tf);

            System.out.println("T1 开始泡茶");
            return "上茶" + tf;
        }
    }

    // T2Task 需要执行的任务:洗茶壶、洗茶杯、放茶叶
    private static class T2Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("T2 洗茶壶");

            TimeUnit.SECONDS.sleep(1);

            System.out.println("T2 洗茶杯");
            TimeUnit.SECONDS.sleep(2);

            System.out.println("T2 拿茶叶");
            TimeUnit.SECONDS.sleep(1);

            return "龙井";
        }
    }
}
/**
运行后输出:
T2 洗茶壶
T1 洗水壶
T1 烧开水
T2 洗茶杯
T2 拿茶叶
T1 拿到茶叶龙井
T1 开始泡茶
上茶龙井
*/

总结

Future 的用处:很容易的获得异步任务的执行结果,无论这个任务是通过线程池 ThreadPoolExecutor 执行还是通过手工创建子线程执行。Future 可以类比为现实中的提货单,去订货之后店家给你个单子,能货准备好后再凭借单子去提货。

利用多线程可以快速将一些串行的任务并行化,从而提高性能。如果任务之间有依赖关系,比如一个任务依赖另一个任务的执行结果,这种问题基本都可以用 Future 解决。

在分析这种问题的过程中,建议画图描述任务之间的依赖关系,会更直观。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。