这里作者上来抛出了一个例子:对账系统速度越来越慢,怎样进行优化
。业务流程
如下:
- 用户通过在线商城下单,在订单库中生成一条电子订单。
- 物流生成一条派送单数据保存在派送单库。
- 为了防止漏派送或者重复派送,对账系统每天校验是否存在异常订单。
对账系统的处理逻辑很简单,如下图:先查询订单
,然后查询派送单
,对比
这两条数据,将差异写入
差异库。
对账系统的代码抽象之后,核心代码如下:在一个单线程里循环查询订单、派送单,然后执行对账,将差异入库。
while(存在未对账的订单) {
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos,dos);
// 入库
save(diff);
}
利用并行优化对账系统
要优化性能
,首先要找到系统的性能瓶颈
所在。
目前的对账系统,订单量和派送单量巨大,所以查询未对账订单 getPOrders
() 和 查询派送单 getDOrders
() 相对较慢,因为目前是单线程查询
,示意图如下:
对账系统的瓶颈:查询未对账订单 getPOrders
() 和 查询派送订单 getDOrders
() 是否可以并行处理
呢?是可以
的,因为这两个操作没有先后顺序
的依赖
。
将最耗时
的两个操作
并行之后,执行过程图如下,对比单线程执行示意图,在同等时间里,并行
的吞吐量
几乎是单线程的 2 倍
,优化效果比较明显。
以上是优化的思路,下面是如何用代码实现。在下面的代码中,我们创建两个线程
T1
和 T2
,并行执行查询未对账订单 getPOrders
() 和 查询派送单 getDOrders
() 这两个操作。
在主线程
中执行对账操作 check
() 和 差异写入 save
() 这两个操作。
需要注意的是:主线程
需要等待
T1
和 T2
都执行完成
才能执行 check
和 save
操作,所以我们调用 T1.join
() 和 T2.join
() 实现等待
,当 T1
和 T2
线程退出时,调用了 T1.join
() 和 T2.join
() 的主线程
会从阻塞状态被唤醒
,从而执行之后的 check
和 save
。
while (存在未对账的订单) {
// 查询未对账订单
Thread t1 = new Thread(() -> {
pos = getPOrders();
});
t1.start;
// 查询派送单
Thread t2 = new Thread(() -> {
dos = getDOrders();
});
t2.start();
// 等待 T1,T2 执行完成
t1.join();
t2.join();
// 执行对账操作
diff = check(pos, dos);
save(diff);
}
用 CountDownLatch 实现线程等待
经过上面的优化,已经比单线程版本快了不少,但是还是存在问题:在 while
循环里每次都会创建新的线程
,而创建线程是个比较耗时
、耗资源
的操作,最好是能够将创建出来的线程循环
利用,所以我们需要使用「线程池
」来解决这个问题。
下面的代码是用线程池优化之后的:
- 首先创建一个固定大小为 2 的线程池。
- 在
while
循环里重复利用
但是有个问题:主线程如何知道 getPOrders
() 和 getDOrders
() 这两个操作什么时候执行完成呢? 之前方案因为这两个线程是在主线程中调用 join
等待T1
和 T2
执行完退出,但是线程池
方案中,线程不会退出,所以使用 join
方法来等待线程执行完退出已经失效
了。
// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单) {
// 查询未对账订单
executor.execute(() -> {
pos = getPOrders();
});
// 查询派送订单
executor.execute(() -> {
dos = getDOrders();
});
/* ??如何实现等待??*/
// 执行对账操作
diff = check(pos,dos);
// 写入差异库
save(diff);
}
解决方案有很多,最直接的办法是设置一个计数器变量,初始值为2,当执行完 pos = getPOrders();
之后将计数器减1
,执行完 dos = getDOrders();
之后也将计数器减1
,在主线程
中等待
计数器等于0,当计数器等于0之后执行后面的操作,等待计数器等于0是一个条件变量
,用管程
实现起来很简单。
但是并不建议在实际项目上用上述方法,因为 JDK 并发包中已经提供了实现类似功能的工具:CountDownLatch
,直接使用就可以了。
下面的示例代码中, while
循环里,我们首先创建一个 CountDownLatch
,计数器的初始值
等于 2
,之后在 pos = getPOrders();
和 dos = getDOrders();
这两条语句后面对计数器执行减1
操作,这个操作通过调用 latch.countDown();
实现。
在主线程中,调用 latch.await();
实现对条件
计数器等于0 的等待
。
// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
whiel(存在未对账订单) {
// 计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(() -> {
pos = getPOrders();
latch.countDown();
});
// 查询派送订单
executor.execute(() -> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos,dos);
// 写入差异库
save(diff);
}
进一步优化性能
经过上面的优化之后,已经可以交付了,但是还是存在可优化的余地。前面我们将最耗时的查询操作并行之后,但是对账操作 check
和 save
之间还是串行
的,很显然这两个查询操作
和对账操作
也是可以并行
的,也就是在对账的时候,可以执行下一轮的查询,这个操作示意图如下:
思路有了,思考如何用代码实现:两次查询操作要和对账操作并行,且对账操作依赖查询操作的结果,有点类似 生产者——消费者
模型,两次查询
操作是生产者
,对账
操作是消费者
,既然是 生产者——消费者
模型,那就需要一个队列
来保存生产者生产的数据,消费者从这个队列中消费数据。
针对这个项目,作者设计了两个队列,队列元素之间还存在对应关系:具体如下图所示
订单查询操作将结果插入订单队列,派送单查询操作将结果插入派送单队列,两个队列元素之间一一对应。
两个队列的好处是:对账操作可以每次从订单队列出一个元素,派送单队列出一个元素,对这两个元素执行对账操作,这样数据一定不会乱掉。【保证了订单和派送单的对应关系一致】
下面是如何使用双队列实现完全的并行,最直接的想法是:一个线程 T1
执行订单查询
,T2
执行派送单查询
,当 T1
和 T2
各自
生产完一条数据的时候,通知线程 T3
执行对账
操作。
这个想法虽然简单,但是还隐藏
着一个条件
:T1
和 T2
的生产速率一致,不能一个太快一个太慢,只有这样才能做到各自生产一条数据后通知
T3
对数据进行处理。
下面这幅图是上面流程的示意图:
线程T1
和 线程T2
只有都
生产完一条数据的时候,才能一起向下执行,也就是这两条线程需要相互等待,步调一致
。同时还要能通知
到 T3 线程进行对账操作。
用 CyclicBarrier 实现线程同步
下面是用代码实现上面的方案,这个方案的难点有两个:
- T1 、 T2 生产消息步调一致
- 如何通知到 T3 线程
依然可以使用计数器
来解决这两个难点:
计数器初始化为 2,线程 T1 和 T2 生产完一条数据各自减1。
如果计数器大于0 则 线程T1 或者 T2 等待,如果计数器等于0 则通知线程T3,并唤醒等待的 线程T1 或 T2,将计数器重置为2。
这样线程T1 和 T2 生产下条数据的时候就可以重复使用这个计数器了
这个思路没有问题,但是生产中不要这么做,因为 JDK 中已经有了相关工具:CyclicBarrier
。在下面的代码中,我们首先创建一个计数器初始值为 2 的 CyclicBarrier,需要注意的是创建 CyclicBarrier 的时候,还传入了一个 回调函数,当计数器减到 0 时,就会调用这个函数。
线程 T1
负责查询订单
,查出一条时,调用 barrier.await
() 将计数器减1
,同时等待计数器变为0
线程 T2
负责查询派送单
,查出一条时,也调用 barrier.await
() 将计数器减1
,同时等待计数器变为0
当 T1、T2 都调用了
barrier.await()
时,计数器此时值减为0,此时 T1
和 T2
就可以执行下一次查询
了,同时会调用 barrier
的回调函数
来执行对账
操作。
非常值得一提的是:CyclicBarrier
的计数器有**自动重置
**功能,当减到0时自动重置为设置的初始值,非常方便。
// 订单队列
Vector<P> pos;
// 派送单队列
Vectro<D> dos;
// 执行回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, () -> {
executor.execute(()->check());
});
void check() {
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p,d);
// 差异写入差异库
save(diff);
}
void checkAll() {
// 循环查询订单库
Thread T1 = new Thread(() -> {
while(存在未对账订单) {
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询派送单库
Thread T2 = new Thread(() -> {
while(存在未对账订单) {
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
总结
CountDownLatch
和 CyclicBarrier
是 Java 并发包提供的两个非常易用的线程同步工具,这两个工具类的用法需要强调一下:
-
CountDownLatch
主要用来解决一个线程等待多个线程
的场景,类比旅行团团长要等所有游客到齐才能去下个景点 -
CyclicBarrier
是一组线程
之间互相等待
,像是几个驴友不离不弃。 -
CountDownLatch
的计数器是不能循环利用的,当计数器减到0时,有线程调用 await() ,该线程会直接通过。 -
CyclicBarrier
的计数器
是会自动重置
的,一旦计数器减到0,会自动重置为初始值。 -
CyclicBarrier
可以设置回调函数
,功能丰富。
Q.E.D.
Comments | 0 条评论