JDK并发包 —— AQS 类架构与源码解析

2020-09-13   14 次阅读


学前问题:

1、什么是 AQS?

AQS 是一个用于构建同步器框架,许多同步器都可以通过 AQS 很容易并且高效地构造出来。 —— 《Java并发编程实战》

AQS 作为基础组件,封装了核心并发操作,主要实现了独占以及共享模式下的资源获取以及释放。 —— 博客文章

AQS 提供了一种原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。 —— 美团技术团队《从 ReentrantLock 的实现看AQS的原理及应用》

看了书,也看了不少博文,实际也看了源码画了继承关系,可以看到 AQS 是大厦的基石,不少并发包中的类都在 AQS 的基础上进行构建,同时一些文章认为学习 AQS 是学习 JUC 并发包的切入点,那我也就从 AQS 开始进行学习吧。

AQS 学习 (以下转自美团技术)图是我重新绘制的

AQS 整体框架图:

image-20200912171328257

image-20200912171337452

image-20200912173227608

image-20200912173235603

image-20200912173246177

image-20200912173216907

image-20200912173307090

确实挺复杂,方法很多,属性也不少。

  • 文章将 AQS 类划分为了5层,从对外暴露的 API 到 底层的数据结构。
  • 当有 自定义同步器 接入时,只需要重写第一层需要的部分方法,而不需要关注底层的具体实现流程。【设计上的精妙之处】,当自定义同步器进行加锁或者解锁操作时,经过第一层API 进入 AQS 方法内部【入口】,经过第二层进行锁的获取【真正去获取锁】,对于获取锁失败的流程,进入第三层第四层等待队列处理,这些依赖于第五层基础数据提供

image-20200912193740376

AQS 原理概览

AQS 的核心思想:

  • 如果请求的共享资源空闲,就将请求资源的线程设置为有效的工作线程,将共享资源状态变更为锁定。【资源申请成功】
  • 如果请求的资源被占用,需要 阻塞-等待-唤醒 机制来保证锁分配。这个机制用 CLH 队列的变体实现,将暂时无法获取锁的线程加入到队列中。

AQS 使用了一个 Volatileint 类型 state 表示同步状态,通过内置的 FIFO 队列完成资源的排队,通过 CAS 完成对 State值的修改:

// 同步状态
private volatile int state;

// 内置 FIFO 队列 Node
static final class Node {

  static final Node SHARED = new Node();

  static final Node EXCLUSIVE = null;


  static final int CANCELLED =  1;

  static final int SIGNAL    = -1;

  static final int CONDITION = -2;

  static final int PROPAGATE = -3;


  volatile int waitStatus;


  volatile Node prev;


  volatile Node next;


  volatile Thread thread;

  Node nextWaiter;


  final boolean isShared() {
    return nextWaiter == SHARED;
  }


  final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
      throw new NullPointerException();
    else
      return p;
  }

  Node() {    // Used to establish initial head or SHARED marker
  }

  Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
  }

  Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
  }
}

AQS 数据结构

AQS 中最基本的数据结构 NodeNode 是上面 CLH变体队列中的节点

Node 中的方法和属性值的含义:

方法和属性值含义
waitStatus当前节点在队列中的状态
thread处于该节点的线程
prev前驱指针
predecessor返回前驱节点,没有的话抛出空指针
nextWaiter指向下一个处于 CONDITION 状态的节点(这篇文章部讲述 Condition Queue 队列,所以对这个指针没有过多介绍)
next后继指针

线程两种锁的模式:

  • SHARED —— 线程以共享模式等待锁
  • EXCLUSIVE —— 线程以独占方式等待锁

waitStatus的值的含义:

  • 0 —— Node 初始化时的默认值(The values are arranged numerically to simplify use.)
  • 1CANCELLED——线程获取锁的请求已经取消(value to indicate thread has cancelled)
  • -2CONDITIOn —— 节点在等待队列中,节点线程等待唤醒(value to indicate thread is waiting on condition )
  • -3PROPAGATE —— 当前线程处于 SHARED 情况下,该字段才会使用(waitStatus value to indicate the next acquireShared should unconditionally propagate)
  • -1SIGNAL —— 线程已经做好准备,等待资源释放(value to indicate successor's thread needs unparking *)

同步状态 State

AQS 中维护了一个 int 字段作为 synchronization 的状态,【第一感觉是管程 synchronized 中的 条件变量。先写在这,不一定对,等学完再返回来看这个第一印象是否正确。】

private volatile int state;

protected final int getState() {
  return state;
}


protected final void setState(int newState) {
  state = newState;
}


protected final boolean compareAndSetState(int expect, int update) {
  // See below for intrinsics setup to support this
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

同时提供了修改状态的方法,可以看到这三个方法都是 final 的,不允许子类重写,保证了安全性。通过修改 state 字段就可以实现多线程的独占加锁模式和共享加锁模式,下面是流程图:

独占模式:

共享模式:

对于自定义的同步工具,需要自定义获取同步状态和释放状态的具体方式,也就是 AQS 架构图中的第一层: API 层。【也就是如果要自己实现一个类似锁的功能,需要重写某些方法。】

AQS 重要方法与 ReentrantLock 的关联

从架构图中可以看到,AQS 提供了大量用于自定义同步器实现的 Protected 方法。【这里需要返回去再看下,第一次学到这里已经忘了架构图的具体内容了】 自定义同步器实现的相关方法也只是为了修改 State 字段的值,来实现多线程的独占模式或共享模式。自定义同步器需要实现以下方法**【 ReentrantLock 也可以视为一种自定义同步器,所以它也实现了下面的方法,一会可以重点关注一下。】**

方法名描述
protected boolean isHeldExclusively()线程是否正在独占资源,只有用到 Condition 时才需要去实现它。【什么时候需要用到 Condition?】
protected boolean tryAcquire(int arg)独占方式。arg尝试获取锁的次数,成功返回 true,失败返回 false
protected boolean tryRelease(int arg)独占方式,arg 为释放锁的次数,尝试释放资源,成功返回 true,失败返回 false
protected int tryAcquireShared(int arg)共享方式,arg尝试获取锁的次数负数表示失败0表示成功,但没有剩余可用资源正数表示成功,且存在剩余资源
protected boolean tryReleaseShared(int arg)共享方式,arg为释放锁的次数,尝试释放资源。如果释放后允许唤醒后续等待节点返回 true,否则返回 false。

一般来说只会使用一种获取锁的方式:要么共享,要么独占。但是也有同时实现共享和独占两种获取锁的方式的 —— ReentrantReadWriteLock 读写锁就实现了这两种方式,而 ReentrantLock 是独占锁,所以只实现了 tryAcquire —— tryRelease

下面以非公平锁为例子,阐述了非公平锁与AQS之间方法的关联之处:

image-20200913005721337

【问题,这两种的方式的区别?以及有没有例子?】

源码学习:类注释阅读与总结

【源码的注释中存在着最核心的一手信息,看别人写的文章之前,建议先读注释,毕竟这是代码作者写的东西。】

**核心定义:**这个类的注释非常长,分为好几个部分,可以看的出来确实是很重要

  • 提供了实现阻塞锁和相关同步器依靠先进先出(FIFO)的等待队列(Semaphore、Events etc)的框架。
  • 子类应被定义为用于实现其封闭类的同步性能的非公共内部辅助类。(Subclass should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class)

所以可以看到,这些使用 AQS 的类都是定义了一个内部子类去实现 AQS。

D13E7A2E-0290-478B-A707-C41A20330BFF

  • 用AQS 作为同步的基础上需要子类重新定义下面方法,用来检查和修改同步状态:getStatesetStatecompareAndSetState
  • 下面的方法会抛出 UnsupportedOperationException 异常
    • tryAcquire
    • tryRelease
    • tryeAcquireShared
    • tryReleaseShared
    • isHeldExclusively

所以需要子类自己去重新实现,这些方法的内部必须是线程安全的,并且应该短小且非阻塞(Implementations of these methods must be internally thread-safe, and should in general be short and not block.

AQS 的具体作用?

AQS 的类结构

AQS 的继承关系:

D13E7A2E-0290-478B-A707-C41A20330BFF

可以看到:CountDownLatchSemaphoreReentrantLockReentrantReadWriteLock 都是基于 AQS 构建的。

image-20200912154056503

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。