极客时间 ——《Java并发编程实战》 15 | Dubbo 如何使用管程实现异步转同步

2020-10-28   10 次阅读


JDK 并发包中的 Lock 和 内置锁 synchronized 之间的区别:Lock 能够响应中断、支持超时、非阻塞地获取锁。

这一章介绍 Java 并发包中的 Condition —— 实现了管程模型中的条件变量的类。

在 08 中提到了,

  • {% post_link 读书笔记/极客时间/Java并发编程实战/第一部分—基础/08|管程 08 | 管程:并发编程的万能钥匙 %}

Java 内置的管程只有一个条件变量,而 Lock&Condition 实现的管程支持多个条件变量,这是二者的重要区别。

在很多并发场景下,支持多个条件变量能够让并发程序可读性更好实现起来更容易。例如一个阻塞队列,就需要两个条件变量。

如何使用两个条件变量快速实现一个阻塞队列

阻塞队列需要两个条件变量:

  • 队列不空,空队列不允许执行出队操作。
  • 队列不满,满队列不允许执行入队操作。

这个例子之前已经有过,重温一下代码如下:

public class BlockedQueue<T> {
    final Lock lock = new ReentrantLock();
    // 条件变量 队列不满
    final Condition notFull = lock.newCondition();
    // 条件变量 队列不空
    final Condition notEmpty = lock.newCondition();

    // 入队
    void enq(T x) {
        lock.lock();
        try {
            while (队列已满) {
                // 等待队列不满
                notFull.await();
            }
            // 省略入队操作
            // 入队后,通知可执行出队操作
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 出队

    void deq() {
        lock.lock();

        try {
            while (队列已空) {
                notEmpty.await();
            }
            // 省略出队操作
            // 出队后通知可执行入队操作
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }
}

这里需要注意的是:LockCondition 实现的管程,线程等待和通知需要调用 awitsignalsignalAll,它们的语义和 waitnotifynotifyAll 是相同的。

但是不一样的是: Lock & Condition 实现的管程里只能使用它定义的 awitsignalsignalAll,而不能使用内置管程实现的关键字 waitnotifynotifyAll

如果在 Lock&Condition 中使用了后者,则程序会出现大问题。

JDK 并发包中的 LockCondition 只是管程的一种实现,作者在下面的例子中举了 Dubbo 是怎样使用 LockCondition的。

同步与异步

我们一般编写的代码都是同步代码,即顺序流。但是最近异步编程越来越火,那么同步和异步之间的区别是什么呢?

通俗的讲是调用方法是否需要等待结果

  • 同步:需要等待结果。
  • 异步:不需要等待结果。

比如下面的代码中,有一个计算圆周率后 100万位的方法 pai1M() ,这个方法可能执行时间很长,比如需要执行两周,如果调用方法后线程一直等待着计算结果,等方法执行完成之后继续执行下一行 printf("hello world"),这就属于同步

如果调用 pai1M() 之后线程不用等待方法执行完成,可以立刻执行下一行 printf("hello world") ,这就属于异步


// 计算圆周率小说点后100万位 
String pai1M() {
  //省略代码无数
}

pai1M()
printf("hello world")

同步是 Java 中默认的处理方式,如果想让程序支持异步,可以使用下面两种方式实现:

  1. 调用方创建一个子线程,在子线程中执行方法调用,这种一般称为异步调用
  2. 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return,这种方法我们一般称为异步方法

Dubbo 源码分析

编程领域中,异步的应用场景很多, TCP 协议本身就是异步的,在工作中经常用到的 RPC 调用:在 TCP 协议的层面来分析,发送完 RPC 请求后,线程是不会等待 RPC 响应结果的。但是你会奇怪,工作中的 RPC 大部分都是同步的,这是怎么做的呢?

这就是框架层帮我们将异步转为了同步,比如 Dubbo 就实现了这个功能,下面是相关源码的分析:

对于一个简单的 RPC 调用,默认情况下 sayHello 方法是同步方法,也就是当执行到 service.sayHello("Dubbo"); 时线程会停下来等待方法的执行结果:


DemoService service = 初始化部分省略
String message = service.sayHello("dubbo");
System.out.println(message);

此时如果将调用线程 dump 出来,会是下图这样,你会发现调用线程阻塞了,线程状态是 TIMED_WAITING。本来发送的请求是异步的,但是调用线程却阻塞了,说明 Dubbo 帮我们做了异步转同步的事情。

通过调用栈,你能看到线程是阻塞在 DefaultFuture.get 方法上,所以可以推断:Dubbo 转同步的功能是通过 DefaultFuture 这个类实现的。

img

为了理清前后关系,还是需要分析调用 DefaultFuture.get() 之前发生了什么。 DubboInvoker 的 **108**行调用了 DefaultFuture.get() ,这一行很关键

public class DubboInvoker{
	Result doInvoke(Invocation inv) {
		// 下面这行是源码中的 108 行,为了方便展示,做了修改
		return currentClient.request(inv,timeout).get();
	}
}

DefaultFuture 这个类很关键,作者这里精简了相关代码,列到了下面。但是在看代码之前还是需要重复确认一下需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待。当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。这其实就是一个经典的 等待—通知 机制,我们用管程也是能实现这个功能的,下面看看 Dubbo 是怎么实现的:

// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout) {
	long start = System.nanoTime();
	lock.lock();
	try{
		while(!isDone()) {
			done.await(timeout);
			long cur = System.nanoTime();
			if(isDone() || cur - start > timeout) {
				break;
			}
		}
	} finanly {
		lock.unlock();
	}
	if(!isDone()) {
		throw new TimeoutException();
	}
	return returnFromResponse();
}

// RPC 结果是否已经返回
boolean isDone() {
	return response != null;
}

// RPC 结果返回时调用该方法
private void doReceived(Response res) {
	lock.lock();
	try{
		response = res;
		if(done != null) {
			done.signal();
		}
	}finally {
		lock.unlock();
	}
}

调用线程通过 get() 方法等待 RPC 返回结果,这个方法的实现是我们很熟悉的套路:

  • 调用 lock() 获取锁,在 finally 中调用 unlock() 释放锁
  • 获取锁后,通过经典的在循环中调用 await() 的方法来实现等待功能。

RPC 结果返回时,会调用 doReceived() 方法,在这个方法中调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁。

获取锁喉通过调用 signal() 通知调用线程,结果已经返回,不需要继续等待了。

以上就是 Dubbo 中异步转同步的源码分析,最近几年工作中需要异步处理的越来越多了,其中一个主要原因就是有些 API 本身就是 异步API。

例如 websocket 也是一个异步的通信协议,如果基于这个协议实现一个简单的 RPC,也会遇到 异步转同步的问题。

很多公有云的 API 本身也是异步的,例如创建一个云主机就是异步 API ,调用虽然成功了,但是实例并没有立即被创建完成,需要调用另一个 API 不停的去轮询实例的状态。

如果你需要在项目封装创建云主机的API,也会面临异步转同步的问题,因为同步的 API 更易用。

总结

Lock&Condition 是管程的一种实现,所以能否用好 Lock 和 Condition 取决于对于管程模型的理解是否深刻。

例子中对于 Dubbo 的 DefaultFuture 代码进行了很多缩减,完整版在下面:

Dubbo 的源代码在Github 上,DefaultFuture 的路径是:

incubator-dubbo/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。