极客时间 ——《Java并发编程实战》 22 | Executor与线程池:如何创建正确的线程池

2020-10-28   19 次阅读


Java 中创建线程虽然从语言上看很简单,但是具体到底层:需要调用操作系统内核API操作系统需要为线程分配一系列资源,所以创建一个线程的成本远没有看上去那么简单,线程是重量级对象,应该避免频繁创建和销毁。而在 Java 中我们为了重复使用线程,使用了线程池。

线程池的需求很普遍,所以 JDK 并发包中存在着不少线程池相关的类。但是线程池和一般的池化资源还有些不同:

  • 一般的池化资源:需要资源时调用 acquire 申请,用完调用 release 释放。
  • 线程池没有提供申请线程和释放线程的方法。

一般的池化资源类:

class XXXPool {
  // 获取池化资源
  XXX acquire() {
  
  }
  // 释放池化资源
  void release(XXX x) {
  
  }
}

线程池是一种 生产者—消费者 模式

为什么线程池没有像普通池化资源一样设计?如果线程池采用一般意义上的池化资源设计方法,应该类似下面的示例代码:

// 采用一般意义上池化资源设计思路下的线程池
class ThreadPool{
  // 获取空闲线程
  Thread acquire() {
    
  }
  // 释放线程
  void release(Thread t) {
    
  }
  // 期望的使用方法
  ThreadPool pool;
  Thread T1 = pool.acqire();
  // 传入 Runnable 对象
  T1.execute(() -> {
    // 具体业务逻辑
    ....
  });
}

当我们获取到空闲线程 T1 时,该如何使用呢? 你期望的使用方法可能是这样:调用 T1execute() 方法,传入一个 Runnable 对象执行具体业务逻辑,就像通过参数是 RunnableThread 构造函数创建一个 Thread 对象一样。

但是 Thread 中并没有提供类似 execute(Runnbale target) 这样的公共方法。

所以线程池的设计没有办法直接采用一般意义的池化资源设计方式。目前业界的线程池普遍采用了 生产者—消费者 模式来设计。

线程池的使用方是生产者,线程池本身是消费者。

下面的代码中,创建了一个非常简单的线程池,通过这个例子来了解线程池的工作原理:

【我将作者伪代码写了个能运行的小例子,所以可以直接复制运行跑起来看看】

// 简化的线程池实现,仅用来说明工作原理
public class MyThreadPool {
    // 利用阻塞队列实现生产者—消费者模式
    BlockingQueue<Runnable> workQueue;

    // 保存内部工作线程
    List<WorkerThread> threads = new ArrayList<>();

    // 构造函数
    public MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        // 创建工作线程
        for (int index = 0; index < poolSize; index++) {
            WorkerThread work = new WorkerThread();
            work.start();
            threads.add(work);
        }
    }

    // 提交任务
    void execute(Runnable command) throws InterruptedException {
        workQueue.put(command);
    }

    // 工作线程负责消费任务,并执行任务
    class WorkerThread extends Thread {
        @Override
        public void run() {
            // 循环获取任务并执行
            while (true) { // ①
                Runnable task = null;
                try {
                    task = workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                task.run();
            }
        }
    }

    /** 下面是使用示例 **/
    public static void main(String[] args) throws InterruptedException {
        // 创建有界队列
        BlockingQueue workQueue = new LinkedBlockingQueue<>(2);

        // 创建线程池
        MyThreadPool pool = new MyThreadPool(10, workQueue);

        // 提交任务
        pool.execute(() -> System.out.println("hello"));
    }
}


这个我们自己编写的简化的示例代码 MyThreadPool 的内部,维护了一个阻塞队列 workQueue 和一组工作线程,线程数量使用者创建的时候传入

用户通过调用 execute() 方法提交要执行的 Runnable 任务,execute() 方法内部实现仅仅是将任务加入到 workQueue 队列中。

MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行,相关代码就是 处的 while 循环。 线程池的主要工作原理就是上述这些。

如何使用 Java 中的线程池

Java 并发包中提供的线程池类,远比上面的示例代码强大。Java 提供的线程池相关工具类中,核心类是:ThreadPoolExecutor,通过名字可以看出,强调的是 Executor,而不是一般意义上的池化资源。

ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示,最完备的构造函数有7个参数:

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

下面每个参数的意义:你可以将线程池理解为一个项目组,线程是项目组成员

  • corePoolSize线程池最小保留线程数。有的项目组很闲,但是也不能把人都撤了,至少保留 corePoolSize 个人。

  • maximumPoolSize线程池最大创建线程数。项目很忙的时候需要增加人手,但是最多增加到 maximumPoolSize 个人。

  • keepAliveTime & unit : 如果一个线程在一段时间内没有执行任务,说明系统空闲,这段时间由 keepAliveTimeunit 来决定。如果一个线程空闲了 keepAliveTime & unit 这么久,并且线程数大于 corePoolSize,则空闲线程将被回收

  • workQueue:工作队列,和示例中的工作队列一样,用来保存执行任务的线程的队列。

  • threadFactory:通过这个参数可以自定义如何创建线程,可以给线程指定线程的名字。

  • handler:通过这个参数定义任务的拒绝策略。如果线程中所有线程都在忙碌,并且工作队列(前提是有界)也满了,此时提交任务,线程池会拒绝接受,拒绝策略通过 hanlder 参数来指定。ThreadPoolExecutor 提供了以下 4种策略:

    • CaalerRunsPolicy:提交任务的线程自己去执行该任务。
    • AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
    • DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    • DiscardOldestPolicy:丢弃最老的任务 —— 将最早进入工作队列的任务丢弃,添加新的任务进入工作队列。

    JDK 6 增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程支持超时,这意味着如果线程很闲,会撤掉所有的线程,并不保留 corePoolSize 个线程。

线程池的使用注意事项

ThreadPoolExecutor 的构造函数很复杂,JDK 并发包中提供了一个线程池的静态工厂类 Executors,使用这个类可以快速创建线程池。

但是现在大厂都不建议使用 Executors 了,原因是因为 Executors 提供的很多方法默认使用的是无界队列,在高负载情况下,无界队列很容易导致 OOMOOM 会导致所有请求都无法处理,这是致命问题,所以创建线程池强烈建议使用有界队列。

使用有界队列的情况下,当任务过多,线程池会触发拒绝策略,默认的拒绝策略会 throw RejectedExecutionException ,这是个运行时异常,编译器并不强制要求 catch,所以容易被开发人员忽略。因此,默认拒绝策略慎重使用。 如果线程池处理的任务非常重要,建议自定义拒绝策略。在实际工作中,自定义决拒绝略通常配合降级策略使用

使用线程池,还要注意异常处理问题,通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行过程中出现运行时异常,会导致执行任务的线程终止。

最致命的是,任务终止了但是你却无法获得任何通知,这会让程序员误以为任务执行的很正常。

虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,可以参考如下方法:

try {
 // 业务逻辑
} cathch (RuntimeException x) {
 // 按需处理
} catch (Throable x) {
 // 按需处理
}

注意事项总结:

  1. 不要用无界队列,在高负载下有 OOM 风险。
  2. 最好自定义拒绝策略,不要忽视默认拒绝策略带来的异常,拒绝策略一般配合降级使用。
  3. 如果被执行的任务可能抛出运行时异常,需要处理这些异常,否则当异常发生时任务会终止且无法获得提示。

总结

线程池在 Java 并发编程领域非常重要,理解线程池需要理解 生产者—消费者模式。《Java 并发编程实战》中第七、第八章用了大量的偏移来深入的介绍线程池。还是建议去看这本书。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。