在**{% post_link 读书笔记/极客时间/Java并发编程实战/第三部分—并发设计模式/33|Thread-Per-Message 上篇文章 %}** 中,介绍了一种最简单的分工模式:Thread-Per-Message
模式,现实世界中对应的就是委托代办。
而这种模式如果要在 Java 中使用,如果不适用轻量级的线程 Fiber 实现的话,就会导致频繁创建/销毁线程,非常影响性能,同时大量创建线程还可能导致 OOM,所以该模式在 Java 中的使用场景就受限了。
如果想有效避免频繁创建线程,就必须提到 Java 领域中使用最多的 Worker Thread
模式。
Worker Thread 模式及其实现
Worker Thread
可以类比现实世界中的车间
里的工作模式:车间里的工人,有活了大家一起干活,没活大家就聊聊天等着。
可以参考下面这个示意图:
Worker Thread
对应到现实世界中可以类比为车间里干活的工人,但是需要注意的是,车间里工人数量往往是确定的。
将上面的思路映射到编程中,可以使用阻塞队列做任务池
,创建固定数量的线程消费阻塞队列的中的任务,这就实现了 Worker Thread
模式,这个方案就是 Java 语言提供的**线程池
**。
线程池的优点很多:
- 避免重复创建、销毁线程带来的额外开销。
- 可以限制线程的上限数量,不会无限制的创建。
下面是使用线程池实现的上一章的 echo
程序:下面的示例代码是用线程池实现的 echo 服务端,相比于 Thread-Per-Message
模式的实现,改动非常少,只是增加了创建线程池
,将任务提交给线程池
这两步操作。
// 创建一个上限为500个线程的线程池
ExecutorService es = Executors.newFixedThreadPool(500);
final ServerSocketChannel ssc = ServerSocketChannel.open()
.bind(new InetSocketAddress(8080));
// 处理请求
try {
while (true) {
// 接受请求
SocketChannel sc = ssc.accept();
// 将需要处理的任务提交给线程池
es.execute(()->{
try {
// 读 Socket
ByteBuffer rb = ByteBuffer.allocate(1024);
sc.read(rb);
// 模拟处理请求
LockSupport.parkNanos(2000 * 1000000);
// 写 Socket
ByteBuffer wb = (ByteBuffer) rb.flip();
sc.write(wb);
sc.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
}finally {
ssc.close();
es.shutdown();
}
正确地创建线程池
Java 线程池能够避免无限制创建线程导致的 OOM,也能避免无限制接收任务导致的 OOM。但是后者容易被我们忽略,例如上面的实现中就被忽略了,所以强烈建议使用**有界队列
**的线程池来接受任务。
当请求数量大于有界队列的容量时,就需要合理地拒绝请求。关于「合理」的定义需要结合具体的业务场景来指定,即使线程池的默认拒绝策略可以满足需求,也建议在创建线程池时,清晰地指明拒绝策略。
同时,为了便于诊断和调试,也强烈建议在工作中给线程赋予业务相关的名字。
综合以上三点,echo
程序中创建线程可以使用下面的示例代码的形式:
ExecutorService es = new ThreadPoolExecutor(
50,
500,
60L,
TimeUnit.SECONDS,
// 上限2000的有界队列
new LinkedBlockingDeque<>(2000),
// 建议根据业务需求实现 ThreadFactory
r -> {
return new Thread(r, "echo-" + r.hashCode());
},
// 拒绝策略
new CallerRunsPolicy()
);
避免线程死锁
使用线程池的过程还有一种可能发生死锁
的场景:如果提交到相同线程池中的任务不是独立的,而是存在依赖关系,就可能导致线程死锁。
实际工作作者经历过这种线程死锁的场景,具体现象是:应用每隔一段时间偶尔就会处于无响应状态,监控数据看上去一切正常,但是实际上已经不能正常工作了。
这个出问题的应用是一个将大型的计算任务分成两个阶段,第一个阶段的任务会等待第二个阶段的子任务完成,每个阶段都使用了线程池,并且两个阶段使用的是同一个线程池。
示意图如下:
下面是这个应用的示例代码,如果执行下面这段代码,会发现它永远执行不到最后一行。执行过程中没有任何异常,但是应用已经停止响应了。
// L1,L2 阶段共用的线程池
ExecutorService es = Executors.newFixedThreadPool(2);
// L1阶段的闭锁
CountDownLatch l1 = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
System.out.println("L1");
// 执行L1 阶段任务
es.execute(()-> {
// L2 阶段的闭锁
CountDownLatch l2 = new CountDownLatch(2);
// 执行L2阶段的子任务
for (int j = 0; j < 2; j++) {
es.execute(()->{
System.out.println("L2");
l2.countDown();
});
}
// 等待L2阶段任务执行完成
try {
l2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
l1.countDown();
});
}
// 等待L1阶段任务执行完成
l1.await();
System.out.println("end");
【下面是如何查看堆栈信息的扩展:】
1、jps -lvm
--> 查看当前机器上运行的 java 进程
找到我们的进程id,然后查看这个进程的堆栈信息
2、jstack -l pid
查看指定进程id的堆栈信息
jstack -l 75614
2020-08-21 10:39:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.172-b11 mixed mode):
"Attach Listener" #13 daemon prio=9 os_prio=31 tid=0x00007fd749074000 nid=0x5903 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fd750098000 nid=0xa503 waiting on condition [0x0000700007333000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000716259a38> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at chapter3.Singleton.lambda$main$1(Singleton.java:142)
at chapter3.Singleton$$Lambda$1/159413332.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x0000000715e273e8> (a java.util.concurrent.ThreadPoolExecutor$Worker)
"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007fd738143800 nid=0x5703 waiting on condition [0x0000700007230000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000715fc7b30> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at chapter3.Singleton.lambda$main$1(Singleton.java:142)
at chapter3.Singleton$$Lambda$1/159413332.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x0000000715e27078> (a java.util.concurrent.ThreadPoolExecutor$Worker)
"Service Thread" #10 daemon prio=9 os_prio=31 tid=0x00007fd750005000 nid=0x5603 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"C1 CompilerThread3" #9 daemon prio=9 os_prio=31 tid=0x00007fd738107800 nid=0x5503 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"C2 CompilerThread2" #8 daemon prio=9 os_prio=31 tid=0x00007fd748048800 nid=0x4103 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"C2 CompilerThread1" #7 daemon prio=9 os_prio=31 tid=0x00007fd738106800 nid=0x4303 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"C2 CompilerThread0" #6 daemon prio=9 os_prio=31 tid=0x00007fd74803f000 nid=0x3f03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"Monitor Ctrl-Break" #5 daemon prio=5 os_prio=31 tid=0x00007fd74803d800 nid=0x3e03 runnable [0x0000700006b1b000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x000000071584d748> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x000000071584d748> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:61)
Locked ownable synchronizers:
- None
"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fd738000000 nid=0x4703 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fd728030000 nid=0x4c03 in Object.wait() [0x0000700006915000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000715588ed0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
- locked <0x0000000715588ed0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)
Locked ownable synchronizers:
- None
"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fd72802f000 nid=0x3603 in Object.wait() [0x0000700006812000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000715586bf8> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x0000000715586bf8> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
Locked ownable synchronizers:
- None
"main" #1 prio=5 os_prio=31 tid=0x00007fd738001800 nid=0x2603 waiting on condition [0x0000700005bee000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000071572ead0> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at chapter3.Singleton.main(Singleton.java:150)
Locked ownable synchronizers:
- None
"VM Thread" os_prio=31 tid=0x00007fd728028800 nid=0x4d03 runnable
"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fd749005000 nid=0x2207 runnable
"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fd749005800 nid=0x2a03 runnable
"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fd749006800 nid=0x2b03 runnable
"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fd749007000 nid=0x5303 runnable
"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fd738800000 nid=0x5203 runnable
"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fd738801000 nid=0x2f03 runnable
"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fd738801800 nid=0x5103 runnable
"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fd738802000 nid=0x4f03 runnable
"GC task thread#8 (ParallelGC)" os_prio=31 tid=0x00007fd738802800 nid=0x3203 runnable
"GC task thread#9 (ParallelGC)" os_prio=31 tid=0x00007fd73800a800 nid=0x3403 runnable
"VM Periodic Task Thread" os_prio=31 tid=0x00007fd749071800 nid=0xa703 waiting on condition
JNI global references: 322
重点在这里:
可以看到线程池中的两个线程全部阻塞在 l2.await()
这行,也就是说线程池里的所有任务都在等待 L2 阶段的任务执行完,但是因为线程池中线程数是有限的,这个例子中只有2个工作线程,而这2个线程已经全部被 L1 占用了,所以L2在等待队列中,永远无法被执行,而 L1 将永远等待 L2 执行完,这就形成了死锁。
找到了问题的原因,要解决就很简单了,最简单粗暴的就是加大线程池中线程的数量,jcip 线程池章节中也提到了这个问题,如果是一个无限大
的线程池,就不会出现这种问题,所以如果应用场景并不需要过多的线程的话,增大线程池即可。
如果需要的线程数量很多,这个方法就行不通了,更加有效的方法是为不同的任务创建不同的线程池
,线程池中不要运行异构
任务,只运行相同结构,相同目的的任务,在上面的例子中就是 给 L1、L2 分别各创建一个线程池就不会出现问题了。
线程池中的任务一定要注意是否是相互独立的,否则就要遇见到可能出现死锁的风险并提前做出应对。
总结
解决并发编程里的分工问题,最好的办法是和现实世界中的解决方案做对比,通过对比构建编程领域的模型,能让模型更加容易理解。
上篇介绍 Thread-Per-Message 模式,类似于现实中委托他人办理事务,今天介绍的 Worker Thread 模式则类似车间里工人的工作模式。
如果在设计阶段对业务模型进行建模之后,模型非常类似于车间工作模式,基本就能确定可以在实现阶段采用 Worker Thread 模式进行实现。
Worker Thread
模式 和 Thread-Per-Message
模式之间的区别:
- 委托别人做事,和代办人直接沟通,对应到编程领域,实现是主线程创建子线程,
线程之间可以直接通信
。 - 车间工人的工作则是围绕工作展开,具体的任务被谁执行预先是无法知道的,对应到编程中就是主线程将任务提交给线程池,但是主线程并不关心这个任务是被哪个具体的工作线程执行。
Worker Thread
模式可以避免线程频繁创建/销毁
的问题,并且可以限制线程最大数量
,Java 里可以直接使用线程池
来实现 Worker Thread
模式,线程池是非常基础和优秀的工具类,有的大厂甚至不允许使用 new Thread()
的形式创建线程,只能使用线程池。
但是使用线程池也会带来问题,除了这篇文章中介绍的死锁
问题,还要注意之前提过的 ThreadLocal 内存泄漏
问题,同时还需要对提交的任务
做好异常
的处
理,避免异常任务无法发现。
从业务的角度看,没有发现异常任务的后果都很严重。
Q.E.D.
Comments | 0 条评论