上一章中介绍了如何创建正确的线程池
,当我们使用线程池
的时候调用 execute(Runnable command)
方法,但是这个方法只能提交任务,无法获取任务的执行结果
(没有返回值),获取任务结果
在很多场景下又是刚需,这篇文章介绍的是如何在使用 ThreadPoolExecutor
的时候获取任务执行结果。
如何获取任务执行结果
ThreadPoolExecutor
提供了 3
个 submit
() 方法和 1
个 FutureTask
工具类来支持获得任务执行结果的请求。
下面是 3个 submit
方法:
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable任务及结果引用
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Runnable 任务
Future<?> submit(Runnable task);
这三个方法的返回值都是 Future
接口, Futrue
接口有 5 个方法:
public interface Future<V> {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已经取消
boolean isCancelled();
// 判断任务是否已经结束
boolean isDone();
// 获得任务执行结果
V get() throws InterruptedException, ExecutionException;
// 获得任务执行结果,支持超时
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
通过 Future
的这5
个方法可以发现:提交的任务不但可以获取任务执行结果,还可以取消任务。但是需要注意的是:这两个 get
() 方法都是阻塞式
的,如果调用的时候任务还没有执行完,调用 get
() 方法的线程会被阻塞
,直到任务执行完才会被唤醒。
ExecutorService
中的 3个 submit
方法主要区别在于参数不同,下面是简要介绍:
- 提交一个
Runnable
任务的submit(Rnnable task)
: 这个方法的参数是一个Runnable
接口,Runnable
接口的run()
方法是没有返回值
的,所以submit(Runnable task)
这个方法返回的Future
仅可以用来断言
任务已经结束,类似Thread.join()
。 - 提交
Callable
任务submit(Callable<T> task)
:这个方法的入参是一个Callable
接口,它只有一个call()
方法,并且这个方法是有返回值的,所以这个方法返回的Future
对象可以通过调用get()
方法来获取任务执行的结果 - 提交
Runnable
任务以及结果引用的submit(Runnable task, T result)
: 这个方法很有意思,假设这个方法返回的Future
对象是f
,f.get()
的返回值
就是 传给submit
的方法入参 result
。 下面有个例子展示了这个方法的经典用法。需要注意的是Runnable
接口实现类Task
声明了一个有参构造函数
Task(Result r)
,创建Task
对象的时候传入了result
对象,这样就能在类Task
的run()
方法中对result
进行各种操作
了。result
相当于 主线程 和 子线程之间的桥梁
,通过它可以实现线程之间数据的共享
。
示例代码:
ExecutorService executor = Executors.newFixedThreadPool(1);
// 创建Result对象r
Result r = new Result();
r.setAAA(a);
//提交任务
Future<Result> future = executor.submit(new Task(r), r);
Result fr = future.get();
// 下面的等式成立:
// fr === r;
// fr.getAAA() === a;
// fr.getXXX() === x;
}
class Result<T> {
T t;
T getAAA() {
return t;
}
void setXXX(T x) {
this.t = x;
}
}
【这个示例代码不太行,最起码没让我搞明白,而且是伪代码,还是要去找一个能真正跑起来的代码加深一下理解。】
下面介绍 FutureTask 工具类。 Future 是一个接口,而 FutureTask 是一个实实在在的工具与类,这个工具类有两个构造函数,参数与 submit() 方法类似:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask 的使用很简单,FutureTask 实现了 Runnable 和 Future 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行。 又因为实现了 Future 接口,所以也能获取任务执行的结果。
下面代码是将 FutureTask
对象提交给 ThreadPoolExecutor
去执行的示例代码:
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es =
Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
FutureTask
直接被 Thread
执行的示例代码如下:
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();
可以看到,使用 FutureTask
对象可以很容易的获取子线程的执行结果。
实现最优"烧水泡茶"程序
《烧水泡茶》是华罗庚先生文章《统筹方法
》中的一个例子,这是一个典型的异步
任务,文中提到的最优工序是下面这样:
上面是图解,下面是用程序模拟这个最优工序。之前提到过,并发编程可以总结为三个核心问题:分工、同步、互斥。
编写并发程序,首先要做的就是分工 —— 如何高效地拆解任务并分配给线程。
对于烧水泡茶这个程序,可以用两个线程 T1
和 T2
来完成。 T1
负责洗水壶,烧开水,泡茶这三道工序。 T2
负责洗茶壶,洗茶杯、放茶叶三道工序。 T1 在执行泡茶
这道工序时需要等待
T2 完成放茶叶
这道工序。 对于 T1
的这个等待动作,你能想到很多方法实现:Thread.join()
、CountDownLatch
、阻塞队列
。 但是在这篇文章中使用 Future
特性来实现。
下面的示例代码使用了这一章提到的 Future
特性来实现:
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建T2线程的FutureTask
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
// 创建T1线程的FutureTask
FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
// 线程T1 执行任务 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1的执行结果
System.out.println(ft1.get());
}
// T1Task 需要执行的任务:洗水壶、烧开水、泡茶
private static class T1Task implements Callable<String> {
FutureTask<String> ft2;
//T1 任务需要 T2任务的 FutureTask
T1Task(FutureTask<String> ft2) {
this.ft2 = ft2;
}
@Override
public String call() throws Exception {
System.out.println("T1 洗水壶");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1 烧开水");
TimeUnit.SECONDS.sleep(15);
//获取 T2 线程的插页
String tf = ft2.get();
System.out.println("T1 拿到茶叶" + tf);
System.out.println("T1 开始泡茶");
return "上茶" + tf;
}
}
// T2Task 需要执行的任务:洗茶壶、洗茶杯、放茶叶
private static class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2 洗茶壶");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2 洗茶杯");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2 拿茶叶");
TimeUnit.SECONDS.sleep(1);
return "龙井";
}
}
}
/**
运行后输出:
T2 洗茶壶
T1 洗水壶
T1 烧开水
T2 洗茶杯
T2 拿茶叶
T1 拿到茶叶龙井
T1 开始泡茶
上茶龙井
*/
总结
Future
的用处:很容易的获得异步任务的执行结果,无论这个任务是通过线程池 ThreadPoolExecutor
执行还是通过手工创建子线程执行。Future
可以类比为现实中的提货单,去订货之后店家给你个单子,能货准备好后再凭借单子去提货。
利用多线程可以快速将一些串行的任务并行化,从而提高性能。如果任务之间有依赖关系,比如一个任务依赖另一个任务的执行结果,这种问题基本都可以用 Future
解决。
在分析这种问题的过程中,建议画图描述任务之间的依赖关系,会更直观。
Q.E.D.
Comments | 0 条评论