- {% post_link 读书笔记/极客时间/Java并发编程实战/第二部分—并发工具类/23|Future 23|Future %}
中有一个思考题:如何优化一个询价应用的核心代码。
如果采用 ThreadPoolExecutor
+ Future
的方案,优化的结果应该类似下面的示例代码:用三个线程异步
执行询价操作,通过三次调用 Future
的 get
() 方法获取结果,将结果入库。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商S1询价
Future<Integer> f1 = executor.submit(() -> getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2 = executor.submit(() -> getPriceByS2());
// 异步向电商S3询价
Future<Integer> f3 = executor.submit(() -> getPriceByS3());
// 获取对应电商报价并保存
r = f1.get();
executor.execute(() -> save(r));
r = f2.get();
executor.execute(() -> save(r));
r = f3.get();
executor.execute(() -> save(r));
上面的方案本身没有太大问题,但是有个地方的处理需要注意:如果获取电商 S1 报价的耗时很长,那么即使获取 S2 的报价很短,也无法让保存 S2 报价的操作先执行,因为主线程
会阻塞
在 f1.get
() 上。这个问题该怎样解决呢?
可以用之前提到的阻塞队列
:获取 S1
、S2
、S3
的报价都进入阻塞队列
,在主线程
中消费
队列,这样就能保证先获取到的报价先保存到数据库。
下面的代码展示了如何使用阻塞队列
实现先获取到先保存:
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商S1询价
Future<Integer> f1 = executor.submit(() -> getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2 = executor.submit(() -> getPriceByS2());
// 异步向电商S3询价
Future<Integer> f3 = executor.submit(() -> getPriceByS3());
// 创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
// 获取电商报价的操作结果放入阻塞队列中
executor.execute(() -> bq.put(f1.get()));
executor.execute(() -> bq.put(f2.get()));
executor.execute(() -> bq.put(f3.get()));
// 异步保存所有报价
for (int i = 0; i < 3; i++) {
Integer r = bq.take();
executor.execute(() -> save());
利用 CompletionService 实现询价系统
在实际项目中并不建议向上面例子那样自己实现,因为 JDK 并发包
已经提供了设计精良的 CompletionService
。 利用这个类,不仅可以实现将先获取的报价保存到数据库中这个操作,还可以让代码更简练。
CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中。上面的示例代码是把任务最重的执行结果放入了阻塞队列中。
如何创建 CompletionService
CompletionService
的对应实现类是 ExecutorCompletionService
,这个实现类的构造方法有两个,分别是:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
// 默认使用无界队列
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
这两个构造方法都需要传入
一个线程池,如果不指定 completionQueue
,则默认使用无界的 LinkedBlockingQueue
,任务执行结果的 Future
存放到 completionQueue
中。
下面的示例代码完整地展示了使用 CompeltionService
实现上面的询价系统:
我们没有指定队列,所以默认使用无界队列 LinkedBlockingQueue
。 之后通过 CompletionService
提供的 submit
() 方法提交了三个询价操作,这三个询价操作会被 CompletionService
异步执行。
最后,通过 CompletionService
接口提供的 take
() 方法获取一个 Future
对象(之前说过,阻塞队列中存放的是任务的执行结果 Future
对象),调用 Future
对象的 get
() 方法就能返回询价的执行结果了。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向3个供应商询价
cs.submit(() -> getPriceByS1());
cs.submit(() -> getPriceByS2());
cs.submit(() -> getPriceByS3());
// 将查询结果异步保存到数据库
for (int i = 0; i < 3; i++) {
Integer r = cs.take().get();
executor.execute(() -> save(r));
}
CompletionService 接口说明
下面详细介绍 CompletionService 接口的方法,CompletionService 接口提供的方法有5个,这5个方法的签名如下:
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take()
throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException;
submit
提交任务相关的方法有两个,一个是 Callable
一个 Runnable
、V result
,这两个区别在于 Callable
有返回值,而 Runnable
没有,所以要将结果传进去。
其余的三个方法都和阻塞队列的操作有关:take
()、poll
() 都是从阻塞队列中获取并移除一个元素,它们的区别在于如果阻塞队列是空的,那么调用 take
() 方法的线程会被阻塞
,而 poll
() 方法会直接返回一个 null
值。
利用 CompletionService 实现 Dubbo 中的 Forking Cluster
Dubbo
有一个 Forking
集群
模式。 在这种模式下,支持并行调用多个查询服务,只要有一个成功返回,整个服务就可以返回了。
例如需要提供一个地址转坐标的服务,为了保证该服务的高可用性,并行地调用 3 个地图服务商的 API ,然后只要有一个返回了正确结果,则这个服务就可以直接返回了。
这种模式的优点是可以容忍 2 个地图 服务商的异常,但是缺点是消耗的资源更多。
geocoder(addr) {
// 并行执行以下3个查询服务
r1=geocoderByS1(addr);
r2=geocoderByS1(addr);
r3=geocoderByS1(addr);
// 只要 r1/r2/r3 有一个返回 该方法就直接返回
return r1|r2|r3;
}
利用 CompletionService
可以快速实现这种 Forking
集群模式,下面的代码展示了如何具体实现。
首先创建一个线程池 Executor
,一个 CompletionService
和 一个 Future<Integer>
类型的 List
。每次调用 CompletionService
的 submit
() 都提交一个异步任务,这个任务会返回一个 Future
对象,我们将这些对象保存在 List futures
中。
通过调用 cs.take().get()
,能够获得最快返回的任务执行结果,只要一个任务正确返回了,就可以取消所有任务并返回最终结果。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 创建保存 Future 对象的 List
List<Future<Integer>> futures = new ArrayList<>(3);
// 提交异步任务,将future 保存到 futures 中
futures.add(cs.submit(() -> geocoderByS1()));
futures.add(cs.submit(() -> geocoderByS2()));
futures.add(cs.submit(() -> geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回 则 break
for (int i = 0; i < 3; i++) {
r = cs.take().get();
if (r != null) {
break;
}
}
}
return r;
总结
当需要批量提交异步任务的时候建议使用 COmpletionService
。这个类将线程池 Executor
和 阻塞队列 BlockingQueue
的功能融合在一起,能够让批量执行异步任务的管理更简单。并且 CompletionService
可以让异步任务的执行结果有序化
,先执行完的任务结果先进入阻塞队列
。利用这个特性,可以轻松实现后续处理的有序性,避免无谓的等待。
CompletionService
的实现类 ExecutorCompletionService
需要自己创建线程池,好处是可以实现线程池隔离
,这种隔离可以避免几个特别耗时的任务拖垮整个应用。
Q.E.D.
Comments | 0 条评论