极客时间 ——《Java并发编程实战》 36 | 生产者——消费者模式:用流水线思想提高效率

2020-10-28   16 次阅读


之前在 {% post_link 读书笔记/极客时间/Java并发编程实战/第三部分—并发设计模式/34|Worker-Thread 34|Worker Thread模式:如何避免重复创建线程%} 这一章中提到,Worker Thread 可以类比工厂里车间工人的工作模式。在现实世界中,工厂里还有一种流水线工作模式,对应的编程领域概念就是 生产者—消费者模式

生产者—消费者模式在编程领域的应用也非常广泛,Java 的线程池的本质就是使用生产者—消费者模式实现的,每当我们在使用线程池的时候,实际上都在应用生产者—消费者模式。

除了线程池中个的应用,为了提升性能,并发领域编程很多地方都用到了生产者—消费者模式,例如 Log4j 中 异步 Appender 内部。

这篇文章将深入介绍 生产者—消费者模式,其具有哪些优点,如何提升系统性。

生产者—消费者模式的优点

生产者—消费者模式的核心是一个任务队列生产者线程生产任务,并将任务添加到任务队列中,消费者线程从任务队列中获取任务并执行。

示意图如下:

从架构设计的角度,生产者—消费者有一个重要的优点:解耦。解耦对于大型系统的设计非常重要,解耦的一个关键点就是组件之间的依赖方式和通信方式必须受限。

在生产者—消费者模式中,生产者和消费者没有任何依赖关系,彼此间通信只依靠任务队列,所以生产者—消费者模式是一个不错的解耦方案。

除了架构设计上优点,还有一个重要优点就是:**支持异步,并且能够平衡生产者和消费者的速度差异。**在该模式中,生产者只需要将任务添加到任务队列中,而无需等待任务呗消费者线程执行完,也就是任务的生产和消费是异步的。这是与传统方法之间调用的本质区别,传统的方法之间调用是同步的。

Java 中异步化最简单的方法就是创建一个新的线程去处理相关的业务逻辑,那么任务队列在这里扮演什么角色呢?其主要作用在于平衡生产者和消费者之间的速度差异。假设生产的速率很慢,消费的速率很高,比例是 1:3,那么如果生产者有三个线程,采用创建新线程的方式,也需要创建3个对应的消费者子线程,但是使用 生产者—消费者模式,消费线程值需要1个就可以了。

Java 中的线程是操作系统级的,线程创建得多了会增加上下文切换的成本,也会占用系统资源,所以 Java 的线程不是越多越好,而是需要保持在一个恰当的数量,生产者—消费者模式可以支持你将线程数保持一个合适的量

支持批量执行以提升性能

之前在 {% post_link 读书笔记/极客时间/Java并发编程实战/第三部分—并发设计模式/33|Thread-Per-Message 《33 | Thread-Per-Message模式:最简单使用的分工方法》 %} 提到了轻量级线程,那么如果使用轻量级线程是否还需要平衡消费者和生产者之间的速度差异?因为轻量级线程足够廉价,那么创建足够多的线程就够了。

但是有一类场景,使用生产者—消费者模式有奇效:批量执行任务的需求。

例如:需要在数据库中 INSERT 1000条数据,有两种方案:

  1. 创建1000个线程并发执行,每个线程 INSERT 一条数据
  2. 创建1个线程,执行一个批量 SQL ,一次性 INSERT 1000条数据。

这两种方案第二种方案效率更高,这样的场景就是上面提到的批量执行场景。

在 **{% post_link 读书笔记/极客时间/Java并发编程实战/第三部分—并发设计模式/35|两阶段终止 《35|两阶段终止模式:如何优雅地终止线程?》 %}**中提到了一个监控系统动态采集的按理,最终的回传数据是需要入库的。

但是被监控的系统往往有很多,如果每条回传数据都是直接 INSERT 进入数据库,那么对应的就是上面提到的方案一。更好的方案是批量执行,这个方案的实现需要使用生产者—消费者模式

利用生产者—消费者模式实现批量执行 SQL 非常简单:将直接 INSERT 到数据库的线程作为生产者线程生产者线程只负责将数据添加到队列中消费者线程负责从任务队列中批量取出并批量执行 INSERT

下面的示例代码创建了 5 个消费者线程负责批量执行 SQL,5个消费者线程以 while(true) {} 的循环方式批量获取任务并执行。需要注意的是:从任务队列中获取批量任务的方法 pollTasks() 首先是以阻塞方式获取任务队列中的一条任务,而后才以非阻塞的方式获取任务

阻塞的原因是:如果任务队列中没有任务,这样可以避免空循环

// 任务队列 (类成员变量)
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);

// 启动5个消费者线程执行批量任务
void start() {
  ExecutorService es = Executors.newFixedThreadPool(5);
  for (int i = 0; i < 5; i++) {
    es.execute(() -> {
      try {
        while (true) {
          // 批量获取任务
          List<Task> ts = pollTasks();
          // 批量执行任务
          execTasks(ts);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}

List<Task> pollTasks() throws InterruptedException {
  List<Task> ts = new LinkedList<>();

  // 使用阻塞式获取第一条任务

  Task t = bq.take();
  while (t != null) {
    ts.add(t);
    // 非阻塞的获取一条任务
    t = bq.poll();
  }
  return ts;
}

// 批量执行任务
void execTasks(List<Task> ts) {
  // 省略具体执行逻辑
}

支持分阶段提交以提升性能

生产者—消费者 模式另一个典型应用场景:分阶段提交。

写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,往往采用异步刷盘的方式。作者这里介绍了一个曾经参与的项目,其中的日志组件是自己实现的,采用的就是异步刷盘的方式**(异步将数据保存到硬盘中)**,刷盘的时机是:

  1. ERROR级别的日志立即刷盘
  2. 数据累积到500条时立即刷盘
  3. 存在未刷盘数据,且5秒内没有发生过刷盘,则立即刷盘

这个日志组件的异步刷盘本质上就是一种分阶段提交

下面是具体的示例代码:


public class Logger {
    // 任务队列
    final BlockingQueue<LogMsg> bq = new LinkedBlockingQueue<>();

    // flush 批量
    static final int batchSize = 500;

    // 只需要一个线程写日志
    ExecutorService es = Executors.newFixedThreadPool(1);

    // 启动写日志线程
    void start() throws IOException {
        File file = File.createTempFile("foo", ".log");

        final FileWriter writer = new FileWriter(file);

        this.es.execute(() -> {
            try {
                // 未刷盘日志数量
                int curIdx = 0;
                long preFT = System.currentTimeMillis();
                while (true) {
                    // 获取5秒内的所有日志消息
                    LogMsg log = bq.poll(5, TimeUnit.SECONDS);
                    // 写日志
                    if (log != null) {
                        writer.write(log.toString());
                        ++curIdx;
                    }
                    // 如果不存在未刷盘数据,则无需执行保存操作
                    if (curIdx <= 0) {
                        continue;
                    }

                    // 根据规则刷盘
                    if (log != null && log.level == LEVEL.ERROR || curIdx == batchSize
                            || System.currentTimeMillis() - preFT > 5000) {
                        writer.flush();
                        curIdx = 0;
                        preFT = System.currentTimeMillis();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    writer.flush();
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

    }

    // 写入 INFO 级别的日志到队列中
    void info(String msg) throws InterruptedException {
        bq.put(new LogMsg(LEVEL.INFO, msg));
    }

    // 写入 ERROR 级别的日志到队列中
    void error(String msg) throws InterruptedException {
        bq.put(new LogMsg(LEVEL.ERROR, msg));
    }

    //日志级别
    enum LEVEL {INFO, ERROR}

    class LogMsg {
        LEVEL level;
        String msg;

        public LogMsg(LEVEL level, String msg) {
            this.level = level;
            this.msg = msg;
        }
    }
}

总结

Java 中的线程池本身就是 生产者—消费者模式的一种实现,但是线程池中的线程每次只能从任务队列中消费一个任务,对于大部分并发场景已经够用,但是例如批量执行/分阶段提交这种场景还是需要自己实现 生产者—消费者模式。

生产者—消费者模式在分布式计算中也应用广泛,可以使用**消息队列MQ** 来实现该模式。MQ 一般支持两种消息模型:

  • 点对点模型
  • 发布订阅模型

点对点一个消息只会被一个消费者消费,类似线程池。

发布订阅模型则是一个消息会被多个消费者消费,本质是一种消息的广播,在并发编程领域可以结合观察者模式实现广播功能。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。