作者介绍了同事小灰的一个 Web 项目:Web 版的文件浏览器,通过这个文件浏览器可以直接在网页中查看服务器上的目录和文件

这个项目依赖运维部门提供的文件浏览服务,而这个文件浏览服务只支持以消息队列的方式接入。

消息队列在互联网大厂中的应用非常多,主要的作用是流量削峰系统解耦。在这种接入方式中,发送消息消费结果这两个操作是异步的,示意图如下:

在这个 Web 项目中,用户通过浏览器发送一个请求,会被转换成一个异步消息发送给 MQ ,等 MQ 返回结果后,再将这个结果返回给浏览器。

但是存在的问题是:给 MQ 发送消息的线程是处理 Web 请求的线程 T1 , 但消费 MQ 结果的线程并不是 T1如何让发送消息和消费消息的线程是同一个

下面是示例代码:

public class Message {
    String id;
    String content;

    // 发送消息的方法
    void send(Message msg) {
        // 具体逻辑
    }

    // MQ 消息返回后调用该方法,但是该方法的执行线程与发送消息的线程不一致
    void onMeesage(Message msg) {
        // 省略相关业务代码
    }

    // 处理浏览器发送的请求
    Respond handleWebReq() {
        // 创建一个消息
        Message msg = new Message("1", "{...}");

        // 发送消息
        send(msg);

        // 如何等待 MQ 返回的消息
        String result = ...;
    }
}

这里的场景和

  • {% post_link 读书笔记/极客时间/Java并发编程实战/第二部分—并发工具类/15|Dubbo如何使用管程 15|Dubbo如何使用管程 %}

描述的很相似,Dubbo 使用 Lock&Condition异步转为了同步,在那篇文章中作者给出了最终的解决方案,但是却没有介绍这个方案是怎么设计出来的,这篇文章将会介绍这个方案背后的设计原理。

Guarded Suspension 模式

上面例子中的问题现实世界中比比皆是,比如去饭店吃饭,到了之后发现预定的包间上一批客人刚走还没有收拾好,这时候大堂经理就会告诉你包间正在收拾,稍等一会,等收拾好了再进行通知,过了一会收拾好了,大堂经理将我们带进包间就餐。

等待包间收拾完毕和等待 MQ 消息返回本质上是一样的:都是等待一个条件满足

现实世界中大堂经理这个角色很重要,他负责协调是否需要等待。那么程序世界中该如何设计并实现对应的功能呢?

这个问题的设计方案前任早就搞定了,并且总结成了一个设计模式:Guarded Suspension —— 保护性地暂停

**下图是Guarded Suspension 设计模式对应的结构图:**很简单,一个对象 GuardedObject:

  • 内部有一个成员变量 —— 受保护的对象
  • 两个成员方法:get(Predicate<T> p)onChanged(T obj)

GuardedObject 定位就是之前提到的大堂经理的角色,受保护对象就是餐厅里的包间,get() 指的是我们就餐,就餐的前提是包间已经收拾好了, 参数 p 用来描述这个前提条件。 onChanged() 方法对应服务员将包间收拾完毕,通过 onChanged() 可以触发一个事件,这个事件改变``前提条件 p计算结果。 下图中左侧的绿色线程就是需要就餐的顾客,右侧的蓝色线程就是收拾包间的服务员

GuardedObject 内部实现非常简单,是管程的经典用法,可以参考下面的示例代码:

public class GuardedObject<T> {
    // 受保护的对象
    T obj;
    final Lock lock = new ReentrantLock();

    // 条件
    final Condition done = lock.newCondition();

    final int timeout = 1;
    // 获取受保护的队形啊
    T get(Predicate<T> p) {
        lock.lock();
        try {
            // MESA 模型的管程推荐写法
            while (!p.test(obj)) {
                done.await(timeout, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            lock.unlock();
        }
        return obj;
    }

    // 事件通知方法
    void onChanged(T obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        }finally {
            lock.unlock();
        }
    }
}

上面示例代码的核心是get() 方法通过条件变量 await() 实现等待onChanged() 方法通过条件变量 signalAll() 方法实现唤醒功能。

扩展 Guarded Suspension 模式

介绍了 Guarded Suspension 模式以及具体实现,那么看看能否使用这个模式解决上面的 Web 项目中的问题。

Guarded Suspension 模式里的两个核心方法:get() 和 onChanged() ,对应到上面处理 Web 请求的方法 handleWebReq() 中,可以调用 GuardedObjectget() 方法实现等待;在 MQ 消息的消费方法 onMessage() 中可以调用 GuardedObjectonChanged() 方法实现唤醒

// 处理浏览器发来的请求
Respond handleWebReq() {
  // 创建消息
  Message msg1 = new Message("1","{...}");
  
  // 发送消息
  send(msg1);
  
  // 利用 GuardedObject 实现等待
  GuardedObject<Message> go = new GuardedObject<>();
  
  Message r = go.get(t -> t != null);
}

void onMessage(Message msg) {
  // 实际中遇到的问题:如何找到匹配的 go?
  GuardedObject<Message> go = ???
  go.onChanged(msg);
}

在实现中遇到了一个问题:handleWebReq() 里创建了 GuardedObject 对象的实例 go,并调用其 get() 方法等待结果。

onMessage() 方法中,如何找到匹配的 GuardedObject 对象呢?

现实世界中,在大堂经理的头脑中,包间和就餐人之间建立了一个关系图,所以当服务员通知大堂经理包间准备完成后大堂经理可以通知到对应的就餐人。

我们可以参考大堂经理识别就餐人的办法,对 Guarded Suspension 模式进行扩展,从而使其可以解决上面的 GuardedObject 匹配问题。

在上面的 Web 程序中,每个发送到 MQ 的消息都有一个唯一的属性 id,所以可以维护一个 MQ 消息 idGuardedObject 对象实例之间的关系,这个关系可以类比为 大堂经理大脑中的包间和就餐人的关系。

有了这个思路,下面是具体实现:示例代码扩展了 Guarded Suspension 模式的实现,扩展后的 GuardedObject 内部维护了一个 Map,其 KeyMQ 消息的 idValueGuardedObject 对象实例。同时增加了静态方法 create() 和 fireEvent();

create():创建一个 GuardedObject 实例,并根据 key 值将其放入 Map

fireEvent(): 模拟大堂经理根据包间找就餐人的逻辑。

public class GuardedObject<T> {
    // 受保护的对象
    T obj;
    final Lock lock = new ReentrantLock();

    // 条件
    final Condition done = lock.newCondition();

    final int timeout = 2;

    // 保存所有 GuardedObject
    final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();

    // 静态方法创建 GuardedObject
    static <K> GuardedObject create(K key) {
        GuardedObject<Object> go = new GuardedObject<>();
        gos.put(key, go);
        return go;
    }

    static<K,T> void fireEnvent(K key, T obj) {
        GuardedObject go = gos.remove(key);
        if (go != null) {
            go.onChanged(obj);
        }
    }

    // 获取受保护的对象
    T get(Predicate<T> p) {
        lock.lock();
        try {
            // MESA 模型的管程推荐写法
            while (!p.test(obj)) {
                done.await(timeout, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            lock.unlock();
        }
        return obj;
    }

    // 事件通知方法
    void onChanged(T obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        }finally {
            lock.unlock();
        }
    }
}

下面是将扩展后的 GuardedObject 用到上面的 Web 文件浏览器项目中的具体代码:

// 处理浏览器发送来的请求
Respond handleWebReq() {
  int id = 序号生成器.get();
  // 创建
  Message msg1 = new Message(id,"{...}");
  // 创建 GuardedObject 实例
  GuardedObject<Message> go = GuardedObject.create(id);
  // 发送消息
  send(msg1);
  Message r = go.get(t -> t != null);
}

void onMessage(Message msg) {
  // 唤醒等待线程
  GuardedObject.fireEvent(msg.id,msg);
}

总结

Guarded Suspension 本质上是一种等待唤醒机制的实现,只是将其总结规范化了。规范化的好处就是无需重头思考如何实现,也无需担心实现的程序的理解性问题,同时也避免了 bug 的出现。

但是 Guarded Suspension 在解决实际问题的时候往往还是需要进行扩展,扩展的方式很多,本章中就直接对 GuardedObject 的功能进行了增强。 DubboDefaultFeture 这个类也是采用了同样的方式,可以进行对比学习,会让你对这种模式的理解更深刻。

同时也可以创建新的类来对 Guarded Suspension 模式进行扩展。

Guarded Suspension 模式也经常被称为 Guarded Wait 模式、 Spin Lock 模式(因为使用了 while 循环等待),这些名字都很形象,但是还有一个更形象的非官方名字:多线程版本的 if

在单线程场景中,if 语句是不需要等待的,因为在只有一个线程的条件下,如果这个线程被阻塞,是没有其他活动线程的,这意味着 if 判断中的条件不会发生变化。而多线程场景下 if 判断条件的结果随时可能发生变化,所以用 "多线程版本的 if" 来理解这个模式会更简单。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。