极客时间 ——《Java并发编程实战》 33 | Thread-Per-Message模式:最简单实用的分工方法

2020-10-28   18 次阅读


并发编程领域问题的三个核心问题:分工同步互斥

  • 同步、互斥 —— 微观层面。

  • 分工 —— 宏观层面。

解决问题,一般从宏观入手,在编程领域,软件的设计也是从概要开始,然后才进行详细设计。解决并发问题也是从宏观到微观。

有一系列设计模式来解决并发中的分工问题,常用的有:

  • Thread-Per-Message
  • Worker Thread
  • 生产者 — 消费者

这篇文章是对 Thread-Per-Message 模式的介绍。

如何理解 Thread-Per-Message 模式

现实世界中,很场景下我们需要委托他人:找房子需要委托中介,看病需要委托医生,等等,委托一方面受限于我们的能力,另一方面受限于我们的时间。这同样也是社会分工的来源,每个人都有自己专业的事情,将专业的事给专业的人做。

编程领域也有很多类似的需求:写一个 HTTP Server主线程只负责接收请求,而不进行具体的处理。如果在主线程中处理具体的请求,同一时间只能处理一个请求,这时就采用代办的思路 —— 创建一个子线程,委托子线程去处理 HTTP 请求。

这种委托他人办理的模式在并发编程领域被总结为一种设计模式 —— Thread-Per-Message 模式:为每个任务分配一个独立的线程,这是一种最简单的分工方法,实现起来也非常简单。

用 Thread 实现 Thread-Per-Message 模式

Thread-Per-Message 最经典的应用场景是网络编程里服务端的实现服务端为每个客户端请求创建一个独立的线程,线程处理完成后自动销毁,这是一种最简单的并发处理网络请求的方法。

网络编程中最简单的是 echo 程序,echo 程序的服务端会原封不动地将客户端的请求发送回客户端。例如:客户端发送 TCP 请求 "Hello World",服务端也会返回 "Hello World"

下面以 echo 程序的服务端为例,介绍如何实现 Thread-Per-Message 模式。

使用 Java 实现一个 echo 程序的服务端很简单:

final ServerSocketChannel ssc = ServerSocketChannel.open()
  .bind(new InetSocketAddress(8080));

// 处理请求
try {
  while (true) {
    // 接受请求
    SocketChannel sc = ssc.accept();
    // 为每个请求创建一个线程
    new Thread(() -> {
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer.allocate(1024);
        sc.read(rb);
        // 模拟处理请求
        Thread.sleep(2000);
        // 写 Socket
        ByteBuffer wb = (ByteBuffer) rb.flip();
        sc.write(wb);
        sc.close();
      } catch (Exception e) {
        throw new UncheckedIOException(e);
      }
    }).start();
  }
}finally {
  ssc.close();
}

【上面的代码用到了 Java nio 包中的知识,包括 Socket,缓冲区的读和写,这部分知识对于我还属于空白,需要写一个 nio 的学习文章与代码集合放在这里做引用】

可以看到上面的代码并不复杂,我们为每个请求都创建了一个 Java 线程,核心是:new Thread(()->{…}).start()。

但是这种处理方法在实际中是行不通的,根本原因在于 Java 创建线程的开销很大线程是一个重量级对象,创建需要耗时,并且占用的内存也比较大,所以为每个请求创建一个线程进行处理并不适合高并发场景

所以看起来 Thread-Per-Message 在Java语言中并不适合实现这种需求。

那么频繁创建线程不可取,是否可以使用线程池呢?引入线程池的思路没有问题,但是会增加复杂度。语言、工具、框架本身应该是帮助我们更敏捷简单地实现方案,而不是用来否定方案,Thread-Per-Message 模式作为一种最简单的分工方案,Java 语言无法支持不是模式有问题,而是 Java 语言本身的设计问题。

Java 中的线程和操作系统的线程是一一对应的,这种做法的本质是将 Java 线程的调度权完全委托给操作系统,操作系统在这方面非常成熟,这种委托给操作系统的好处就是稳定、可靠。但是也继承了操作系统级线程的缺点创建成本高

为了解决这个缺点,JDK 并发提供了线程池等工具类,这个思路在很长一段时间里都是非常稳妥的解决方案,但是这个方案并不是唯一的解决方案。

业界还有一种另一种解决方案,叫做**轻量级线程**。这个方案在 Java 领域的知名度不高,在其他编程语言里名声却很大,例如 Go 语言Lua 语言里的协程本质上就是一种轻量级的线程

轻量级的线程,创建的成本很低,和创建一个普通对象的成本相似,创建的速度和占用的内存相比操作系统级别的线程至少有一个数量级的提升,所以基于轻量级线程实现 Thread-Per-Message 模式就完全没有问题了。

Java 也已经意识到了轻量级线程的重要性,OpenJDK 有一个 Loom 项目,就是为了解决 Java 语言的轻量级线程问题,在这个项目中,轻量级线程被叫做 Fiber,下面介绍如何基于 Fiber 实现 Thread-Per-Message 模式。

用 Fiber 实现 Thread-Per-Message 模式

Loom 项目在设计轻量级线程时,充分考虑了当前 Java 线程的使用方式,采取了尽量兼容的态度,所以在使用 Loom 上没有什么难度。

Fiber 实现 echo 服务的示例代码如下:

final ServerSocketChannel ssc = ServerSocketChannel.open()
  .bind(new InetSocketAddress(8080));

// 处理请求
try {
  while (true) {
    // 接受请求
    SocketChannel sc = ssc.accept();
    // 这里使用 Fiber.schedule 创建一个协程
    Fiber.schedule(() -> {
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer.allocate(1024);
        sc.read(rb);
        // 模拟处理请求
        LockSupport.parkNanos(2000 * 1000000);
        // 写 Socket
        ByteBuffer wb = (ByteBuffer) rb.flip();
        sc.write(wb);
        sc.close();
      } catch (Exception e) {
        throw new UncheckedIOException(e);
      }
    }).start();
  }
}finally {
  ssc.close();
}

可以看到这里和上面使用 Thread 创建线程的区别仅仅是这里是使用 Fiber.schedule() 创建一个协程,其他逻辑都是一样的。

作者在 Linux 环境下 使用 压测工具 ab 进行压测,具体步骤:

  1. 设置 ulmit -u 512 将用户能创建的最大进程数(包括线程)设置为512
  2. 启动通过 Fiber 实现的 echo 程序
  3. 使用压测工具 ab :ab -r -c 20000 -n 200000 http:// 测试机 IP 地址:8080/

压测执行结果如下:


Concurrency Level:      20000
Time taken for tests:   67.718 seconds
Complete requests:      200000
Failed requests:        0
Write errors:           0
Non-2xx responses:      200000
Total transferred:      16400000 bytes
HTML transferred:       0 bytes
Requests per second:    2953.41 [#/sec] (mean)
Time per request:       6771.844 [ms] (mean)
Time per request:       0.339 [ms] (mean, across all concurrent requests)
Transfer rate:          236.50 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0  557 3541.6      1   63127
Processing:  2000 2010  31.8   2003    2615
Waiting:     1986 2008  30.9   2002    2615
Total:       2000 2567 3543.9   2004   65293

可以看到在 20000 并发下程序运行良好,同等条件下使用 Thread 实现的 echo 程序连512个并发都抗不过去就OOM了。

通过 Linux 命令 top -Hp pid 查看 Fiber 实现的 echo 程序的进程信息,可以看到该进程创建了16(不同核数的CPU 结果会不同)个操作系统级线程

总结

发编程领域分工问题指的是如何高效拆解任务并分配给线程。之前已经介绍了不少解决分工问题的工具类:FutureCompletableFutureCompletionService``、Fork/Join 计算框架等。

这些工具类都能很好地解决特定应用场景的问题,这些工具类很优秀,但是也继承了 Java 语言的老毛病 : 太复杂

如果一直从事 Java 开发可能已经习惯了这个复杂度,但是现在看来增加复杂度是没有必要的,例如使用线程池实现 Thread-Per-Message 模式就会增加复杂度。

Thread-Per-Message 在Java 领域知名度不是很高的根本原因就在于 Java 中线程和系统级线程一一对应,创建成本太高,尤其在高并发领域,这样做基本不具备可行性。

但是这个背景条件正在发生巨变,Java 语言未来一定会提供轻量级线程,这样基于语言级别的轻量级线程实现 Thread-Per-Message 就是一种非常靠谱的选择。

对于一些并发度没有那么高的异步场景,例如定时任务,采用 Thread-Per-Message 模式完全没有问题。实际工作中就有完全基于 Thread-Per-Mesage 模式实现的分布式调度框架,这个框架为每个定时任务分配了一个独立的线程。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。