极客时间 ——《Java并发编程实战》 19 | CountDownLatch 和 CyclicBarrier:如何让多线程步调一致

2020-10-28   8 次阅读


这里作者上来抛出了一个例子:对账系统速度越来越慢,怎样进行优化业务流程如下:

  1. 用户通过在线商城下单,在订单库中生成一条电子订单。
  2. 物流生成一条派送单数据保存在派送单库。
  3. 为了防止漏派送或者重复派送,对账系统每天校验是否存在异常订单。

对账系统的处理逻辑很简单,如下图:先查询订单,然后查询派送单对比这两条数据,将差异写入差异库。

对账系统的代码抽象之后,核心代码如下:在一个单线程里循环查询订单、派送单,然后执行对账,将差异入库。

while(存在未对账的订单) {
	// 查询未对账订单
	pos = getPOrders();
	// 查询派送单
	dos = getDOrders();
	// 执行对账操作
	diff = check(pos,dos);
	// 入库
	save(diff);
}

利用并行优化对账系统

优化性能,首先要找到系统的性能瓶颈所在。

目前的对账系统,订单量和派送单量巨大,所以查询未对账订单 getPOrders() 和 查询派送单 getDOrders() 相对较慢,因为目前是单线程查询,示意图如下:

对账系统的瓶颈:查询未对账订单 getPOrders() 和 查询派送订单 getDOrders() 是否可以并行处理呢?是可以的,因为这两个操作没有先后顺序依赖

最耗时的两个操作并行之后,执行过程图如下,对比单线程执行示意图,在同等时间里,并行吞吐量几乎是单线程的 2 倍,优化效果比较明显。

以上是优化的思路,下面是如何用代码实现。在下面的代码中,我们创建两个线程 T1T2,并行执行查询未对账订单 getPOrders() 和 查询派送单 getDOrders() 这两个操作。

主线程中执行对账操作 check() 和 差异写入 save() 这两个操作。

需要注意的是:主线程需要等待 T1T2 都执行完成才能执行 checksave 操作,所以我们调用 T1.join() 和 T2.join() 实现等待,当 T1T2 线程退出时,调用了 T1.join() 和 T2.join() 的主线程会从阻塞状态被唤醒,从而执行之后的 checksave

        while (存在未对账的订单) {
            // 查询未对账订单
            Thread t1 = new Thread(() -> {
                pos = getPOrders();
            });
            t1.start;
            // 查询派送单
            Thread t2 = new Thread(() -> {
                dos = getDOrders();
            });
            t2.start();

            // 等待 T1,T2 执行完成
            t1.join();
            t2.join();
            // 执行对账操作
            diff = check(pos, dos);
            save(diff);
        }

用 CountDownLatch 实现线程等待

经过上面的优化,已经比单线程版本快了不少,但是还是存在问题:在 while 循环里每次都会创建新的线程,而创建线程是个比较耗时耗资源的操作,最好是能够将创建出来的线程循环利用,所以我们需要使用「线程池」来解决这个问题。

下面的代码是用线程池优化之后的:

  • 首先创建一个固定大小为 2 的线程池。
  • while 循环里重复利用

但是有个问题:主线程如何知道 getPOrders() 和 getDOrders() 这两个操作什么时候执行完成呢? 之前方案因为这两个线程是在主线程中调用 join 等待T1T2 执行完退出,但是线程池方案中,线程不会退出,所以使用 join 方法来等待线程执行完退出已经失效了。

// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);

while(存在未对账订单) {
  // 查询未对账订单
  executor.execute(() -> {
    pos = getPOrders();
  });
  // 查询派送订单
  executor.execute(() -> {
    dos = getDOrders();
  });
  
  /* ??如何实现等待??*/
  
  // 执行对账操作
  diff = check(pos,dos);
  // 写入差异库
  save(diff);
}

解决方案有很多,最直接的办法是设置一个计数器变量,初始值为2,当执行完 pos = getPOrders(); 之后将计数器减1,执行完 dos = getDOrders(); 之后也将计数器减1,在主线程等待计数器等于0,当计数器等于0之后执行后面的操作,等待计数器等于0是一个条件变量,用管程实现起来很简单。

但是并不建议在实际项目上用上述方法,因为 JDK 并发包中已经提供了实现类似功能的工具:CountDownLatch,直接使用就可以了。

下面的示例代码中, while 循环里,我们首先创建一个 CountDownLatch,计数器的初始值等于 2,之后在 pos = getPOrders();dos = getDOrders(); 这两条语句后面对计数器执行减1 操作,这个操作通过调用 latch.countDown(); 实现。

在主线程中,调用 latch.await(); 实现对条件 计数器等于0 的等待

// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);

whiel(存在未对账订单) {
  // 计数器初始化为2
  CountDownLatch latch = new CountDownLatch(2);
 // 查询未对账订单
  executor.execute(() -> {
    pos = getPOrders();
    latch.countDown();
  });
  // 查询派送订单
  executor.execute(() -> {
    dos = getDOrders();
     latch.countDown();
  });
  // 等待两个查询操作结束
  latch.await();
  
  // 执行对账操作
  diff = check(pos,dos);
  // 写入差异库
  save(diff);
}

进一步优化性能

经过上面的优化之后,已经可以交付了,但是还是存在可优化的余地。前面我们将最耗时的查询操作并行之后,但是对账操作 checksave 之间还是串行的,很显然这两个查询操作对账操作也是可以并行的,也就是在对账的时候,可以执行下一轮的查询,这个操作示意图如下:

思路有了,思考如何用代码实现:两次查询操作要和对账操作并行,且对账操作依赖查询操作的结果,有点类似 生产者——消费者 模型,两次查询操作是生产者对账操作是消费者,既然是 生产者——消费者 模型,那就需要一个队列来保存生产者生产的数据,消费者从这个队列中消费数据。

针对这个项目,作者设计了两个队列,队列元素之间还存在对应关系:具体如下图所示

订单查询操作将结果插入订单队列,派送单查询操作将结果插入派送单队列,两个队列元素之间一一对应。

两个队列的好处是:对账操作可以每次从订单队列出一个元素,派送单队列出一个元素,对这两个元素执行对账操作,这样数据一定不会乱掉。【保证了订单和派送单的对应关系一致】

下面是如何使用双队列实现完全的并行,最直接的想法是:一个线程 T1 执行订单查询T2 执行派送单查询,当 T1T2 各自生产完一条数据的时候,通知线程 T3 执行对账操作。

这个想法虽然简单,但是还隐藏一个条件T1T2生产速率一致,不能一个太快一个太慢,只有这样才能做到各自生产一条数据后通知 T3 对数据进行处理。

下面这幅图是上面流程的示意图

线程T1 和 线程T2 只有生产完一条数据的时候,才能一起向下执行,也就是这两条线程需要相互等待,步调一致。同时还要能通知到 T3 线程进行对账操作。

用 CyclicBarrier 实现线程同步

下面是用代码实现上面的方案,这个方案的难点有两个:

  1. T1 、 T2 生产消息步调一致
  2. 如何通知到 T3 线程

依然可以使用计数器来解决这两个难点:

计数器初始化为 2,线程 T1 和 T2 生产完一条数据各自减1。

如果计数器大于0 则 线程T1 或者 T2 等待,如果计数器等于0 则通知线程T3,并唤醒等待的 线程T1 或 T2,将计数器重置为2。

这样线程T1 和 T2 生产下条数据的时候就可以重复使用这个计数器了

这个思路没有问题,但是生产中不要这么做,因为 JDK 中已经有了相关工具:CyclicBarrier。在下面的代码中,我们首先创建一个计数器初始值为 2 的 CyclicBarrier,需要注意的是创建 CyclicBarrier 的时候,还传入了一个 回调函数,当计数器减到 0 时,就会调用这个函数。

线程 T1 负责查询订单,查出一条时,调用 barrier.await() 将计数器减1,同时等待计数器变为0

线程 T2 负责查询派送单,查出一条时,也调用 barrier.await() 将计数器减1,同时等待计数器变为0

当 T1、T2 都调用了 barrier.await() 时,计数器此时值减为0,此时 T1T2 就可以执行下一次查询了,同时会调用 barrier回调函数来执行对账操作。

非常值得一提的是:CyclicBarrier 的计数器有**自动重置**功能,当减到0时自动重置为设置的初始值,非常方便。

// 订单队列
Vector<P> pos;
// 派送单队列
Vectro<D> dos;
// 执行回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, () -> {
  executor.execute(()->check());
});

void check() {
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p,d);
  // 差异写入差异库
  save(diff);
}

void checkAll() {
  // 循环查询订单库
  Thread T1 = new Thread(() -> {
    while(存在未对账订单) {
      // 查询订单库
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();
  
  // 循环查询派送单库
  Thread T2 = new Thread(() -> {
    while(存在未对账订单) {
      // 查询运单库
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}

总结

CountDownLatchCyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具,这两个工具类的用法需要强调一下:

  • CountDownLatch 主要用来解决一个线程等待多个线程的场景,类比旅行团团长要等所有游客到齐才能去下个景点

  • CyclicBarrier一组线程之间互相等待,像是几个驴友不离不弃。

  • CountDownLatch计数器是不能循环利用的,当计数器减到0时,有线程调用 await() ,该线程会直接通过。

  • CyclicBarrier计数器是会自动重置的,一旦计数器减到0,会自动重置为初始值。

  • CyclicBarrier 可以设置回调函数,功能丰富。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。