在**{% post_link 读书笔记/极客时间/Java并发编程实战/第三部分—并发设计模式/33|Thread-Per-Message 上篇文章 %}** 中,介绍了一种最简单的分工模式:Thread-Per-Message 模式,现实世界中对应的就是委托代办。

而这种模式如果要在 Java 中使用,如果不适用轻量级的线程 Fiber 实现的话,就会导致频繁创建/销毁线程,非常影响性能,同时大量创建线程还可能导致 OOM,所以该模式在 Java 中的使用场景就受限了。

如果想有效避免频繁创建线程,就必须提到 Java 领域中使用最多的 Worker Thread 模式。

Worker Thread 模式及其实现

Worker Thread 可以类比现实世界中的车间里的工作模式:车间里的工人,有活了大家一起干活,没活大家就聊聊天等着。

可以参考下面这个示意图:

Worker Thread 对应到现实世界中可以类比为车间里干活的工人,但是需要注意的是,车间里工人数量往往是确定的

将上面的思路映射到编程中,可以使用阻塞队列任务池,创建固定数量的线程消费阻塞队列的中的任务,这就实现了 Worker Thread 模式,这个方案就是 Java 语言提供的**线程池**。

线程池的优点很多:

  • 避免重复创建、销毁线程带来的额外开销。
  • 可以限制线程的上限数量,不会无限制的创建。

下面是使用线程池实现的上一章的 echo 程序:下面的示例代码是用线程池实现的 echo 服务端,相比于 Thread-Per-Message 模式的实现,改动非常少,只是增加了创建线程池将任务提交给线程池这两步操作。

// 创建一个上限为500个线程的线程池
ExecutorService es = Executors.newFixedThreadPool(500);

final ServerSocketChannel ssc = ServerSocketChannel.open()
  .bind(new InetSocketAddress(8080));

// 处理请求
try {
  while (true) {
    // 接受请求
    SocketChannel sc = ssc.accept();
    // 将需要处理的任务提交给线程池
    es.execute(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer.allocate(1024);
        sc.read(rb);
        // 模拟处理请求
        LockSupport.parkNanos(2000 * 1000000);
        // 写 Socket
        ByteBuffer wb = (ByteBuffer) rb.flip();
        sc.write(wb);
        sc.close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }

    });
  }
}finally {
  ssc.close();
  es.shutdown();
}

正确地创建线程池

Java 线程池能够避免无限制创建线程导致的 OOM,也能避免无限制接收任务导致的 OOM。但是后者容易被我们忽略,例如上面的实现中就被忽略了,所以强烈建议使用**有界队列**的线程池来接受任务。

当请求数量大于有界队列的容量时,就需要合理地拒绝请求。关于「合理」的定义需要结合具体的业务场景来指定,即使线程池的默认拒绝策略可以满足需求,也建议在创建线程池时,清晰地指明拒绝策略。

同时,为了便于诊断和调试,也强烈建议在工作中给线程赋予业务相关的名字

综合以上三点,echo 程序中创建线程可以使用下面的示例代码的形式:

     ExecutorService es = new ThreadPoolExecutor(
       50,
       500,
       60L,
       TimeUnit.SECONDS,
       // 上限2000的有界队列
       new LinkedBlockingDeque<>(2000),
       // 建议根据业务需求实现 ThreadFactory
       r -> {
         return new Thread(r, "echo-" + r.hashCode());
       },
       // 拒绝策略                                 
       new CallerRunsPolicy() 
     );

避免线程死锁

使用线程池的过程还有一种可能发生死锁的场景:如果提交到相同线程池中的任务不是独立的,而是存在依赖关系,就可能导致线程死锁。

实际工作作者经历过这种线程死锁的场景,具体现象是:应用每隔一段时间偶尔就会处于无响应状态,监控数据看上去一切正常,但是实际上已经不能正常工作了。

这个出问题的应用是一个将大型的计算任务分成两个阶段,第一个阶段的任务会等待第二个阶段的子任务完成,每个阶段都使用了线程池,并且两个阶段使用的是同一个线程池。

示意图如下:

下面是这个应用的示例代码,如果执行下面这段代码,会发现它永远执行不到最后一行。执行过程中没有任何异常,但是应用已经停止响应了。

// L1,L2 阶段共用的线程池
ExecutorService es = Executors.newFixedThreadPool(2);

// L1阶段的闭锁
CountDownLatch l1 = new CountDownLatch(2);

for (int i = 0; i < 2; i++) {
  System.out.println("L1");
  // 执行L1 阶段任务
  es.execute(()-> {
    // L2 阶段的闭锁
    CountDownLatch l2 = new CountDownLatch(2);

    // 执行L2阶段的子任务
    for (int j = 0; j < 2; j++) {
      es.execute(()->{
        System.out.println("L2");
        l2.countDown();
      });
    }
    // 等待L2阶段任务执行完成
    try {
      l2.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    l1.countDown();
  });
}
// 等待L1阶段任务执行完成
l1.await();
System.out.println("end");

【下面是如何查看堆栈信息的扩展:】

1、jps -lvm --> 查看当前机器上运行的 java 进程

找到我们的进程id,然后查看这个进程的堆栈信息

2、jstack -l pid 查看指定进程id的堆栈信息

   jstack -l 75614
2020-08-21 10:39:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.172-b11 mixed mode):

"Attach Listener" #13 daemon prio=9 os_prio=31 tid=0x00007fd749074000 nid=0x5903 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fd750098000 nid=0xa503 waiting on condition [0x0000700007333000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000716259a38> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at chapter3.Singleton.lambda$main$1(Singleton.java:142)
        at chapter3.Singleton$$Lambda$1/159413332.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x0000000715e273e8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007fd738143800 nid=0x5703 waiting on condition [0x0000700007230000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000715fc7b30> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at chapter3.Singleton.lambda$main$1(Singleton.java:142)
        at chapter3.Singleton$$Lambda$1/159413332.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x0000000715e27078> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Service Thread" #10 daemon prio=9 os_prio=31 tid=0x00007fd750005000 nid=0x5603 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C1 CompilerThread3" #9 daemon prio=9 os_prio=31 tid=0x00007fd738107800 nid=0x5503 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread2" #8 daemon prio=9 os_prio=31 tid=0x00007fd748048800 nid=0x4103 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread1" #7 daemon prio=9 os_prio=31 tid=0x00007fd738106800 nid=0x4303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread0" #6 daemon prio=9 os_prio=31 tid=0x00007fd74803f000 nid=0x3f03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Monitor Ctrl-Break" #5 daemon prio=5 os_prio=31 tid=0x00007fd74803d800 nid=0x3e03 runnable [0x0000700006b1b000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        - locked <0x000000071584d748> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        - locked <0x000000071584d748> (a java.io.InputStreamReader)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:61)

   Locked ownable synchronizers:
        - None

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fd738000000 nid=0x4703 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fd728030000 nid=0x4c03 in Object.wait() [0x0000700006915000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x0000000715588ed0> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
        - locked <0x0000000715588ed0> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)

   Locked ownable synchronizers:
        - None

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fd72802f000 nid=0x3603 in Object.wait() [0x0000700006812000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x0000000715586bf8> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        - locked <0x0000000715586bf8> (a java.lang.ref.Reference$Lock)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

   Locked ownable synchronizers:
        - None

"main" #1 prio=5 os_prio=31 tid=0x00007fd738001800 nid=0x2603 waiting on condition [0x0000700005bee000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000071572ead0> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at chapter3.Singleton.main(Singleton.java:150)

   Locked ownable synchronizers:
        - None

"VM Thread" os_prio=31 tid=0x00007fd728028800 nid=0x4d03 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fd749005000 nid=0x2207 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fd749005800 nid=0x2a03 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fd749006800 nid=0x2b03 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fd749007000 nid=0x5303 runnable 

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fd738800000 nid=0x5203 runnable 

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fd738801000 nid=0x2f03 runnable 

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fd738801800 nid=0x5103 runnable 

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fd738802000 nid=0x4f03 runnable 

"GC task thread#8 (ParallelGC)" os_prio=31 tid=0x00007fd738802800 nid=0x3203 runnable 

"GC task thread#9 (ParallelGC)" os_prio=31 tid=0x00007fd73800a800 nid=0x3403 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007fd749071800 nid=0xa703 waiting on condition 

JNI global references: 322

重点在这里:

可以看到线程池中的两个线程全部阻塞在 l2.await()这行,也就是说线程池里的所有任务都在等待 L2 阶段的任务执行完,但是因为线程池中线程数是有限的,这个例子中只有2个工作线程,而这2个线程已经全部被 L1 占用了,所以L2在等待队列中,永远无法被执行,而 L1 将永远等待 L2 执行完,这就形成了死锁。

找到了问题的原因,要解决就很简单了,最简单粗暴的就是加大线程池中线程的数量,jcip 线程池章节中也提到了这个问题,如果是一个无限大的线程池,就不会出现这种问题,所以如果应用场景并不需要过多的线程的话,增大线程池即可

如果需要的线程数量很多,这个方法就行不通了,更加有效的方法是为不同的任务创建不同的线程池,线程池中不要运行异构任务,只运行相同结构,相同目的的任务,在上面的例子中就是 给 L1、L2 分别各创建一个线程池就不会出现问题了。

线程池中的任务一定要注意是否是相互独立的,否则就要遇见到可能出现死锁的风险并提前做出应对。

总结

解决并发编程里的分工问题,最好的办法是和现实世界中的解决方案做对比,通过对比构建编程领域的模型,能让模型更加容易理解。

上篇介绍 Thread-Per-Message 模式,类似于现实中委托他人办理事务,今天介绍的 Worker Thread 模式则类似车间里工人的工作模式。

如果在设计阶段对业务模型进行建模之后,模型非常类似于车间工作模式,基本就能确定可以在实现阶段采用 Worker Thread 模式进行实现。

Worker Thread 模式 和 Thread-Per-Message 模式之间的区别:

  • 委托别人做事,和代办人直接沟通,对应到编程领域,实现是主线程创建子线程,线程之间可以直接通信
  • 车间工人的工作则是围绕工作展开,具体的任务被谁执行预先是无法知道的,对应到编程中就是主线程将任务提交给线程池,但是主线程并不关心这个任务是被哪个具体的工作线程执行

Worker Thread 模式可以避免线程频繁创建/销毁的问题,并且可以限制线程最大数量,Java 里可以直接使用线程池来实现 Worker Thread 模式,线程池是非常基础和优秀的工具类,有的大厂甚至不允许使用 new Thread() 的形式创建线程,只能使用线程池。

但是使用线程池也会带来问题,除了这篇文章中介绍的死锁问题,还要注意之前提过的 ThreadLocal 内存泄漏问题,同时还需要对提交的任务做好异常理,避免异常任务无法发现。

从业务的角度看,没有发现异常任务的后果都很严重。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。