• {% post_link 读书笔记/极客时间/Java并发编程实战/第二部分—并发工具类/23|Future 23|Future %}

中有一个思考题:如何优化一个询价应用的核心代码。

如果采用 ThreadPoolExecutor + Future 的方案,优化的结果应该类似下面的示例代码:用三个线程异步执行询价操作,通过三次调用 Futureget() 方法获取结果,将结果入库。

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 异步向电商S1询价
Future<Integer> f1 = executor.submit(() -> getPriceByS1());

// 异步向电商S2询价
Future<Integer> f2 = executor.submit(() -> getPriceByS2());

// 异步向电商S3询价
Future<Integer> f3 = executor.submit(() -> getPriceByS3());

// 获取对应电商报价并保存
r = f1.get();
executor.execute(() -> save(r));

r = f2.get();
executor.execute(() -> save(r));

r = f3.get();
executor.execute(() -> save(r));

上面的方案本身没有太大问题,但是有个地方的处理需要注意:如果获取电商 S1 报价的耗时很长,那么即使获取 S2 的报价很短,也无法让保存 S2 报价的操作先执行,因为主线程阻塞f1.get() 上。这个问题该怎样解决呢?

可以用之前提到的阻塞队列:获取 S1S2S3 的报价都进入阻塞队列,在主线程消费队列,这样就能保证先获取到的报价先保存到数据库。

下面的代码展示了如何使用阻塞队列实现先获取到先保存:

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 异步向电商S1询价
Future<Integer> f1 = executor.submit(() -> getPriceByS1());

// 异步向电商S2询价
Future<Integer> f2 = executor.submit(() -> getPriceByS2());

// 异步向电商S3询价
Future<Integer> f3 = executor.submit(() -> getPriceByS3());

// 创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();

// 获取电商报价的操作结果放入阻塞队列中
executor.execute(() -> bq.put(f1.get()));
executor.execute(() -> bq.put(f2.get()));
executor.execute(() -> bq.put(f3.get()));

// 异步保存所有报价
for (int i = 0; i < 3; i++) {
Integer r = bq.take();
executor.execute(() -> save());

利用 CompletionService 实现询价系统

在实际项目中并不建议向上面例子那样自己实现,因为 JDK 并发包已经提供了设计精良的 CompletionService。 利用这个类,不仅可以实现将先获取的报价保存到数据库中这个操作,还可以让代码更简练。

CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中。上面的示例代码是把任务最重的执行结果放入了阻塞队列中。

如何创建 CompletionService

CompletionService 的对应实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        // 默认使用无界队列
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    
   public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

这两个构造方法都需要传入一个线程池,如果不指定 completionQueue ,则默认使用无界的 LinkedBlockingQueue,任务执行结果的 Future 存放到 completionQueue 中。

下面的示例代码完整地展示了使用 CompeltionService 实现上面的询价系统:

我们没有指定队列,所以默认使用无界队列 LinkedBlockingQueue。 之后通过 CompletionService 提供的 submit() 方法提交了三个询价操作,这三个询价操作会被 CompletionService 异步执行。

最后,通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(之前说过,阻塞队列中存放的是任务的执行结果 Future 对象),调用 Future 对象的 get() 方法就能返回询价的执行结果了。

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);

// 异步向3个供应商询价
cs.submit(() -> getPriceByS1());
cs.submit(() -> getPriceByS2());
cs.submit(() -> getPriceByS3());

// 将查询结果异步保存到数据库
for (int i = 0; i < 3; i++) {
  Integer r = cs.take().get();
  executor.execute(() -> save(r));
}

CompletionService 接口说明

下面详细介绍 CompletionService 接口的方法,CompletionService 接口提供的方法有5个,这5个方法的签名如下:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() 
  throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) 
  throws InterruptedException;

submit 提交任务相关的方法有两个,一个是 Callable 一个 RunnableV result,这两个区别在于 Callable 有返回值,而 Runnable 没有,所以要将结果传进去。

其余的三个方法都和阻塞队列的操作有关:take()、poll() 都是从阻塞队列中获取并移除一个元素,它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会直接返回一个 null 值。

image-20200814152909469

利用 CompletionService 实现 Dubbo 中的 Forking Cluster

Dubbo 有一个 Forking 集群模式。 在这种模式下,支持并行调用多个查询服务,只要有一个成功返回,整个服务就可以返回了。

例如需要提供一个地址转坐标的服务,为了保证该服务的高可用性,并行地调用 3 个地图服务商的 API ,然后只要有一个返回了正确结果,则这个服务就可以直接返回了。

这种模式的优点是可以容忍 2 个地图 服务商的异常,但是缺点是消耗的资源更多。

geocoder(addr) {
  // 并行执行以下3个查询服务
  r1=geocoderByS1(addr);
  r2=geocoderByS1(addr);
  r3=geocoderByS1(addr);
  
  // 只要 r1/r2/r3 有一个返回 该方法就直接返回
  return r1|r2|r3;
}

利用 CompletionService 可以快速实现这种 Forking 集群模式,下面的代码展示了如何具体实现。

首先创建一个线程池 Executor,一个 CompletionService 和 一个 Future<Integer> 类型的 List。每次调用 CompletionServicesubmit() 都提交一个异步任务,这个任务会返回一个 Future 对象,我们将这些对象保存在 List futures 中。

通过调用 cs.take().get() ,能够获得最快返回的任务执行结果,只要一个任务正确返回了,就可以取消所有任务并返回最终结果

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);

// 创建保存 Future 对象的 List
List<Future<Integer>> futures = new ArrayList<>(3);

// 提交异步任务,将future 保存到 futures 中
futures.add(cs.submit(() -> geocoderByS1()));
futures.add(cs.submit(() -> geocoderByS2()));
futures.add(cs.submit(() -> geocoderByS3()));

// 获取最快返回的任务执行结果
Integer r = 0;
try {
  // 只要有一个成功返回 则 break
  for (int i = 0; i < 3; i++) {
    r = cs.take().get();
    if (r != null) {
      break;
    }
  }
}
return r;

总结

当需要批量提交异步任务的时候建议使用 COmpletionService。这个类将线程池 Executor 和 阻塞队列 BlockingQueue 的功能融合在一起,能够让批量执行异步任务的管理更简单。并且 CompletionService 可以让异步任务的执行结果有序化,先执行完的任务结果先进入阻塞队列。利用这个特性,可以轻松实现后续处理的有序性,避免无谓的等待。

CompletionService 的实现类 ExecutorCompletionService 需要自己创建线程池,好处是可以实现线程池隔离,这种隔离可以避免几个特别耗时的任务拖垮整个应用

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。