JDK源码学习——锁之基石 AQS 类

2020-09-15   22 次阅读


学前问题:

1、什么是 AQS?

AQS 是一个用于构建同步器框架,许多同步器都可以通过 AQS 很容易并且高效地构造出来。 —— 《Java并发编程实战》

AQS 作为基础组件,封装了核心并发操作,主要实现了独占以及共享模式下的资源获取以及释放。 —— 博客文章

AQS 提供了一种原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。 —— 美团技术团队《从 ReentrantLock 的实现看AQS的原理及应用》

看了书,也看了不少博文,实际也看了源码画了继承关系,可以看到 AQS 是大厦的基石,不少并发包中的类都在 AQS 的基础上进行构建,同时一些文章认为学习 AQS 是学习 JUC 并发包的切入点,那我也就从 AQS 开始进行学习吧。

通过 ReentrantLock 实现 学习 AQS 原理以及应用

文章来自美团技术,我负责重制图片,添加一些源码,和增加一些自己的理解以及疑问,并找出答案。

1.ReentrantLock

1.1ReentrantLock 特性概览

ReentrantLock可重入锁一个线程可以反复获取同一个临界区的锁,下面是 ReentrantLock 和 Java自带的隐式锁 synchronized 关键字的区别,加粗部分是本篇文章重点剖析的点:

ReentrantLocksynchronized
锁实现机制AQS监视器模式(编程语言同步原语支持)
灵活性支持响应中断,锁超时,尝试获取锁不灵活,无法在阻塞状态释放锁
释放形式显示调用 unlock()自动释放【加、解锁成对由编译器自动生成】
锁类型公平锁 & 非公平锁非公平锁
条件队列可关联多个条件队列(这里的条件队列也就是条件变量)只有一个条件队列(Java 的管程模型只有一个条件变量)
可重入性可重入可重入

下面是伪代码,用于更直观的进行比较:

// ****************** synchronized 的使用方式 ******************

// 1.代码块
synchronized(this) {
  
}
// 2.用于对象 【这个我不太理解,这里的区别只是锁对象不同,但是本质还是对一个代码块进行加锁,构建出一个临界区,所以改为使用自定义锁对象较好】
synchronized(object) {
}

// 3.给方法加锁
public synchronized void test() {}

//4. 可重入性
for (int i = 0; i < 100; i++) {
  synchronized(this) {} // 单个线程可以多次获取同一个锁
}


// ****************** ReentrantLock 的使用方式 ******************
// 这里作者代码有个问题,就是 throws 写成了 throw,我写的也有问题,ReentrantLock 拼错了,还是重新用 IDE 校正了一遍才改过来
public class ReentrantLockTest {
  public void test() throws Exception {
    // 1.初始化锁时可以指明锁类型,默认初始化类型是 非公平锁 
    ReentrantLock lock = new ReentrantLock(true);

    // 2.给代码块加锁
    lock.lock();
    try {
      try {
        // 3.支持多种加锁方式,具有可重入性
        if(lock.tryLock(100, TimeUnit.MILLISECONDS)) {
          // 业务逻辑
        }
      } finally {
        // 4.在 finally 中手动释放锁,try-finally 是 Lock 的编程范式,需要保证锁的释放
        lock.unlock();
      } 
    }
    finally {
      lock.unlock();
    }
  }
}

1.2 ReentrantLock 与 AQS 之间的关联

ReentrantLock 支持公平锁和非公平锁(这里作者给了个链接,是对这两种锁的更详细的描述,《不可不说的Java“锁”事》),ReentrantLock 的底层是 AQS 实现的,ReentrantLock 是如何将公平锁与非公平锁与 AQS 进行关联?通过看源码,梳理这两者的加锁过程来理解它们与 AQS 之间的关系(加锁过程中的关联比较明显)。

非公平锁NonFiare的加锁流程如下:

// ReentrantLock&NonfairSync
static final class NonfairSync extends Sync {
  private static final long serialVersionUID = 7316153563782823691L;
  final void lock() {
    if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
    else
      acquire(1);
  }

  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
  }
}

代码详细说明:

  • 如果通过 CAS 设置 State变量成功,则代表成功获取锁,将当前线程设置为独占线程 setExclusiveOwnerThread
  • 如果通过 CAS 设置 State变量失败,则进入 acquire 方法,进行后续处理。

第一部很好理解,第二步后续的处理策略是什么呢,可能会有以下思考:

  • 当一个线程尝试获取锁的失败后续流程,有两种可能:
    • 1、将当前线程获取锁的结果设置为失败,获取锁流程结束。这种方法会极降低系统的并发度,无法满足实际需求,所以需要的是下面这种流程,也是 AQS 框架的处理流程
    • 2、存在一个等待队列,线程继续等待,保留获取锁的可能,获取锁的流程仍然继续

针对第2种可能,也就是真实的 AQS 实现,也会有许多问题

  • 等待队列的数据结构是什么样的?
  • 处于等待队列中的线程什么时候有机会获取锁?【这里根据我现有的知识先超前回答一下,当其他线程执行完成后会唤醒队列中的线程,然后队列中的线程继续尝试获取锁】
  • 处于等待队列中的线程如果一直无法获取锁,有没有别的处理策略,是一直等待吗?还是会超时?

带着非公平锁的疑问,再看下公平锁的锁获取方式

// java.util.concurrent.locks.ReentrantLock#FairSync
static final class FairSync extends Sync {
  final void lock() {
    acquire(1);
  }
}

可以看到,这里直接使用 acquire 函数进行加锁,具体是怎样实现的呢?

这里公平锁和非公平锁的加锁流程中都调用了 acquire 方法,这个方法是 FairSyncNonfairSync 父类 AQS 中的核心方法。而上面提到的问题在 ReentrantLock 中是找不到答案的,需要去 AbstractQueuedSynchronizer 中去寻找答案。

下面对 AQSReentrantLock 之间的关联关系做详细介绍。

2.AQS 学习

AQS 整体框架图:

image-20200912171328257

image-20200912171337452

image-20200912173227608

image-20200912173235603

image-20200912173246177

image-20200912173216907

image-20200912173307090

确实挺复杂,方法很多,属性也不少。

  • 文章将 AQS 类划分为了5层,从对外暴露的 API 到 底层的数据结构。
  • 当有 自定义同步器 接入时,只需要重写第一层需要的部分方法,而不需要关注底层的具体实现流程。【设计上的精妙之处】,当自定义同步器进行加锁或者解锁操作时,经过第一层API 进入 AQS 方法内部【入口】,经过第二层进行锁的获取【真正去获取锁】,对于获取锁失败的流程,进入第三层第四层等待队列处理,这些依赖于第五层基础数据提供

image-20200912193740376

AQS 原理概览

AQS 的核心思想:

  • 如果请求的共享资源空闲,就将请求资源的线程设置为有效的工作线程,将共享资源状态变更为锁定。【资源申请成功】
  • 如果请求的资源被占用,需要 阻塞-等待-唤醒 机制来保证锁分配。这个机制用 CLH 队列的变体实现,将暂时无法获取锁的线程加入到队列中

概念什么是CLH Queue? ===> 自旋锁队列

CLH: CraigLandin and Hagersten 队列,单向链表AQS 中的队列是 CLH 队列的变体,虚拟双向队列(FIFO)AQS 通过将每个请求共享资源的线程封装成一个节点来实现所的分配

自旋锁的优点:浪费了一部分计算资源,但是在**锁很快被释放的前提下**,自旋锁的开销低于线程阻塞唤醒的开销。【可以看到,应用场景和利弊说的都很明白了】

主要原理图如下:

2.1.1AQS 数据结构

AQS 中最基本的数据结构 NodeNode 是上面 CLH变体队列中的节点。这里作者画了一个挺复杂的图,我仔细一看,就是 Node类的结构,所以我就直接截图了,也更清晰一些。

image-20200915014733174

Node 中的方法和属性值的含义:

方法和属性值含义
waitStatus当前节点在队列中的状态
thread处于该节点的线程
prev前驱指针
predecessor返回前驱节点,没有的话抛出空指针
nextWaiter指向下一个处于 CONDITION 状态的节点(这篇文章部讲述 Condition Queue 队列,所以对这个指针没有过多介绍)
next后继指针

线程两种锁的模式:

  • SHARED —— 线程以共享模式等待锁
  • EXCLUSIVE —— 线程以独占方式等待锁

waitStatus的值的含义:

  • 0 —— Node 初始化时的默认值The values are arranged numerically to simplify use.)
  • 1CANCELLED——线程获取锁的请求已经取消(value to indicate thread has cancelled)
  • -2CONDITIOn —— 节点在等待队列中,节点线程等待唤醒(value to indicate thread is waiting on condition
  • -3PROPAGATE —— 当前线程处于 SHARED 情况下,该字段才会使用(waitStatus value to indicate the next acquireShared should unconditionally propagate
  • -1SIGNAL —— 线程已经做好准备,等待资源释放(value to indicate successor's thread needs unparking

2.1.2同步状态 State

AQS 中维护了一个 int 字段作为 synchronization 的状态,【第一感觉是管程 synchronized 中的 条件变量。先写在这,不一定对,等学完再返回来看这个第一印象是否正确。】

private volatile int state;

protected final int getState() {
  return state;
}


protected final void setState(int newState) {
  state = newState;
}


protected final boolean compareAndSetState(int expect, int update) {
  // See below for intrinsics setup to support this
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

同时提供了修改状态的方法,可以看到这三个方法都是 final 的,不允许子类重写,保证了安全性。通过修改 state 字段就可以实现多线程的独占加锁模式和共享加锁模式,下面是流程图:

独占模式:

共享模式:

对于自定义的同步工具,需要自定义获取同步状态和释放状态的具体方式,也就是 AQS 架构图中的第一层: API 层。【也就是如果要自己实现一个类似锁的功能,需要重写某些方法。】

2.2 AQS 重要方法与 ReentrantLock 的关联

从架构图中可以看到,AQS 提供了大量用于自定义同步器实现的 Protected 方法。【这里需要返回去再看下,第一次学到这里已经忘了架构图的具体内容了】 自定义同步器实现的相关方法也只是为了修改 State 字段的值,来实现多线程的独占模式或共享模式。自定义同步器需要实现以下方法**【 ReentrantLock 也可以视为一种自定义同步器,所以它也实现了下面的方法,一会可以重点关注一下。】**

方法名描述
protected boolean isHeldExclusively()线程是否正在独占资源,只有用到 Condition 时才需要去实现它。【什么时候需要用到 Condition?】
protected boolean tryAcquire(int arg)独占方式。arg尝试获取锁的次数,成功返回 true,失败返回 false
protected boolean tryRelease(int arg)独占方式,arg 为释放锁的次数,尝试释放资源,成功返回 true,失败返回 false
protected int tryAcquireShared(int arg)共享方式,arg尝试获取锁的次数负数表示失败0表示成功,但没有剩余可用资源正数表示成功,且存在剩余资源
protected boolean tryReleaseShared(int arg)共享方式,arg为释放锁的次数,尝试释放资源。如果释放后允许唤醒后续等待节点返回 true,否则返回 false

一般来说只会使用一种获取锁的方式:要么共享,要么独占。但是也有同时实现共享和独占两种获取锁的方式的 —— ReentrantReadWriteLock 读写锁就实现了这两种方式,而 ReentrantLock 是独占锁,所以只实现了 tryAcquire —— tryRelease

下面以非公平锁为例子,阐述了非公平锁与AQS之间方法的关联之处:

image-20200913005721337

为了理解 ReentrantLockAQS 之间方法交互的过程,作者以**非公平锁为例,把加锁解锁交互流程**单独列了出来:

加锁:

  • 通过 ReentrantLocklock() 方法进行加锁
public void lock() {
    sync.lock();
}
  • sync内部类,并且 Sync#lock抽象方法,具体执行什么根据 ReentrantLock 初始化时选择的是公平锁还是非公平锁,执行对应的内部类的Lock方法,本质上都会执行 AQSAcquire 方法。
// 非公平锁
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 这里会调用 AbstractQueuedSynchronizer 的 acquire
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
 // 公平锁
 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            // 这里会调用 AbstractQueuedSynchronizer 的 acquire
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
  • AQSacquire 内部逻辑会调用 tryAcquire 方法,但是 tryAcquire 需要自定义同步器进行实现**【这里的实现就是 ReentrantLock】,所以真正执行的是ReentrantLock 中的 tryAcquire 方法,ReentrantLock 内部又分为公平锁非公平锁的实现,所以最终落地的是初始化时选择的锁类型对应的 tryAcqire 方法。**
// AQS 类中的 acquire 方法
public final void acquire(int arg) {
  // 如果获取失败 并且 等待时没有被打断 则自我中断
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
// AQS 中的 tryAcquire ,可以看到如果直接使用AQS中的 tryAcquire 是会报错的,这里的实现交给了具体的自定义同步器去自己实现
protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}

image-20200915002629224

  • tryAcquire获取锁的逻辑,如果获取锁失败,则执行 AQS后续逻辑,此时跟 ReentrantLock 也就是我们的自定义同步器就无关了。

image-20200915003359654

image-20200915000821411

解锁:

  • ReentrantLock#unlock 解锁
// ReentrantLock
public void unlock() {
  sync.release(1);
}
  • sync#release 继承于 AQS
//AQS
public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

  • tryRelease自定义同步器实现,这里是ReentrantLockSync内部类中实现,可以看到解锁不区分公平锁非公平锁
// ReentrantLock.Sync#tryRelease 
protected final boolean tryRelease(int releases) {
  int c = getState() - releases;
  if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
  boolean free = false;
  if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
  }
  setState(c);
  return free;
}
  • 释放成功后,后续处理AQS 框架完成,与 ReentrantLock 无关。

image-20200915010211038

通过上面的描述,大概可以总结出 ReentrantLock加锁解锁API核心方法的映射关系

2.3 通过ReentrantLock理解AQS

ReentrantLock公平锁和非公平锁的底层实现是相同的,所以这里以非公平锁为例进行分析

// java.util.concurrent.locks.ReentrantLock

static final class NonfairSync extends Sync {
  private static final long serialVersionUID = 7316153563782823691L
  final void lock() {
    if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
    else
      // 这里会调用 AbstractQueuedSynchronizer 的 acquire
      acquire(1);
  }

  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
  }
}

// AQS 中的 acquire
public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

// AQS 中的 tryAcquire
protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}

这里的 AQS 并没有实现 tryAcquire ,而是将获取锁的方法交给自定义同步器自己去实现,这里是由公平锁和非公平锁单独进行实现

ReentrantLock 为例:

  • 如果该方法返回了 true,则说明当前线程获取锁成功,代码不必往后继续执行
  • 如果获取失败返回 false,则需要将等待获取锁的线程加入等待队列,之后详细解释线程何时以及如何被加入队列

2.3.1 线程加入等待队列

2.3.1.1 何时加入队列

执行 Acquire(1) 时,会通过 tryAcquire 获取锁,这种情况下,如果锁获取失败,就会调用 addWaiter 将线程加入到等待队列中。

【下面是我将代码找出来梳理了一下,感觉会更加清晰一些】

// 1.ReentrantLock.Nonfaire#lock()
final void lock() {
  if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
  else
    acquire(1); // 这里会调用超类 AQS 中的方法
}

// 2. AQS#acquire
public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

// 3. AQS 调用的 tryAcquire 是子类自己实现的,这里又回到了 ReentrantLock.Nonfaire#tryAcquire()
public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

// 4. 如果锁获取失败,则执行 AQS 中的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这一步

2.3.1.2 如何加入队列

这里就要看 addWaiter(Node.EXCLUSIVE), arg) 代码的实现了:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
  // 1.新建一个Node,其内容是当前线程和使用的锁的模式
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  // 将 Pred 指针指向尾节点 tail ,这里的意义是上面的注释:尝试快速入队操作 后半句没太理解是啥意思
  Node pred = tail;
  if (pred != null) {
    // 将 prev 指针 指向 pred,目的是什么?
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  // 如果队列中没有等待元素,或者 Pred 指针和 Tail 指向的位置不同(队列被别的线程修改了)此时进入 enq 方法
  enq(node);
  return node;
}


private final boolean compareAndSetTail(Node expect, Node update) {
  return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

主要流程如下:

  • 通过当前线程和锁模式新建一个节点(Node
  • Pred 指针指向尾节点 Tail
  • 将新创建的 NodePrev 指针指向 Pred
  • 通过 compareAndSetTail 方法,完成尾节点的设置。 这个方法针对tailOffsetExpect 进行比较,如果 tailOffsetNodeExpectNode 地址相同,则设置 Tail 的值为 Update 的值。【这步有点复杂】
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// AQS 的静态代码块

static {
  try {
    stateOffset = unsafe.objectFieldOffset
      (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
    headOffset = unsafe.objectFieldOffset
      (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
    tailOffset = unsafe.objectFieldOffset
      (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
    waitStatusOffset = unsafe.objectFieldOffset
      (Node.class.getDeclaredField("waitStatus"));
    nextOffset = unsafe.objectFieldOffset
      (Node.class.getDeclaredField("next"));

  } catch (Exception ex) { throw new Error(ex); }
}

从上面的 AQS 静态代码块中可以看出,都是获取一个对象的属性相对于该对象在内存中的偏移量,这样我们就可以根据这个偏移量在对象内存中找到这个属性tailOffset —— tail 对应的偏移量。所以这时候将 new 出来的 Node 置为当前队列的尾节点。同时,由于等待队列是双向链表,也需要将前一个节点指向尾节点。【这个的内部逻辑还是没明白,需要掰碎了,尝试自己再去找吧】

  • 如果 Pred 指针是 Null(说明等待队列中没有元素)或者 Pred 指针和 Tail 指向的位置不同(说明被别的线程修改),此时需要看一下 enq 方法
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
     * Inserts node into queue, initializing if necessary. See picture above.
       将 Node 插入到 队列中,必要时进行初始化
     * @param node the node to insert
     * @return node's predecessor
     */
private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize 这时进行初始化,但是初始化的头节点是无参构造函数的
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      // 这里的处理和 enq 之前的处理逻辑一样 如果 tailOffset 的 Node 和 Expect 的 Node 地址相同,则设置 Tail 的值为 Update 的值
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

如果没有被初始化,就需要初始化一个头节点。但是需要注意的是,初始化的头节点并不是当前线程节点,而是调用了无参构造函数的节点。

如果经历了初始化或者在并发的情况下被其他线程修改了等待队列,导致队列中存在元素,则与之前的方法相同

其实 addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头节点是一个无参构造函数的头节点。

总结一下线程获取锁的大概过程:

  • 1、当没有线程获取到锁时,线程1获取锁成功。
  • 2、线程2申请锁,但线程1占有之后的处理流程。

等待队列示意图:

回到上边的代码,hasQueuedPredcessor公平锁 加锁判断等待队列中是否存在有效节点的方法。如果返回 false,说明当前线程可以尝试获取锁,如果返回 true,说明队列中存在有效节点,当前线程必须加入等待队列等待获取锁。

// java.util.concurrent.locks.ReentrantLock

static final class FairSync extends Sync {
  
  protected int tryAcquireShared(int acquires) {
    for (;;) {
      if (hasQueuedPredecessors())
        return -1;
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
          compareAndSetState(available, remaining))
        return remaining;
    }
  }
}
  
// ReentrantLock#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
  // The correctness of this depends on head being initialized
  // before tail and on head.next being accurate if the current
  // thread is first in queue.
  Node t = tail; // Read fields in reverse initialization order
  Node h = head;
  Node s;
  return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

这里需要理解 h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); 的含义:【下面是我的分析,原来作者也分析了,在下面的引用中是作者的分析】

  • 第一个判断条件: h != t,也就是 头节点和尾节点不相同。
  • 第二个判断条件: (s = h.next) == null 【也就是判断 头节点的下一个节点是否为空】 || 当前线程和 头节点的下一个节点并不相等
  • 返回 第一个判断条件 && 第二个判断条件的值 也就是 头节点不等于尾节点 && 尾节点下一个节点为空 当前线程和下一个节点的线程不相符 的结果

这里的问题是:为什么要判断 头节点的下一个节点是否为空? 第一个节点存储的数据是什么?

双向链表中,第一个节点为虚节点,不存储任何信息,只做占位功能。真正第一个有数据的节点从第二个开始:

h != t 时,如果 (s = h.next) == null , 说明等待队列中有线程正在进行初始化,但是只进行到了 Tail 指向 Head,而没有将 Head 指向 Tail,此时队列中有元素,需要返回 true(这块代码下面具体分析)

如果 (s = h.next != null) ,说明此时队列中至少有一个有效节点

如果此时 s.thread == Thread.currentThread() 说明等待队列的第一个有效节点与当前线程相同,说明当前线程可以获取锁

如果 s.thread != Thread.currentThread() 说明等待队列中已经存在其他节点,则当前线程也需要进入等待队列等待获取锁。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
// AQS 中的 入队操作:
private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t; 
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

节点入队不是原子操作,所以会出现短暂的 head != tail ,此时 Tail 指向最后一个节点,并且 Tail 指向 Head。 如果 Head 没有指向 Tail也需要将发生这种情况的相关线程加入队列

这块代码是为了解决极端情况下的并发问题

【这里我有部分没看懂,下面是 compareAndSetHead 的源码分析,我试试看从源码的角度能不能理解作者给出的结论】

// java.util.concurrent.locks.AbstractQueuedSynchronizer#compareAndSetHead 
// 这里调用的是底层的 compareAndSwapObject ,传入的参数是当前AQS对象和刚才 new 出来的 Node,偏移量是在静态代码块中随着类加载被初始化的,是 head 字段定义的值

private final boolean compareAndSetHead(Node update) {
  return unsafe.compareAndSwapObject(this, headOffset, null, update);
}



// sun.misc.Unsafe#compareAndSwapObject 
// 这是一个 native 方法 CAS 系列的方法作用是原子操作比较并交换两个值,底层是 CPU 提供的 CAS 支持
/***
   * @param obj the object containing the field to modify. --> obj 包含要修改的字段的对象,这里对应的是 this 代表的是一个 AQS 对象
   * @param offset ----> headOffSet 偏移量,但是这个偏移量的具体计算我不知道
   * @param expect  ---> 期望值,传入的是 null 
   * @param update the new value of the field if it equals <code>expect</code>. ---> 如果该字段的值和期望值相同,更新字段的值,这里传入的是一个 new Node() ,那 new Node() 怎么会和 null 相同呢?
   * @return true if the field was changed.
   */
public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);

【看源码还是没看懂,算了 先略过。】

2.3.1.3 等待队列中线程出队列的时机

2.3.2 CANCELLED 状态节点生成

2.3.3 如何解锁

2.3.4 中断恢复后的执行流程

2.3.5 小结

3. AQS 应用

3.1 ReentrantLock 的可重入应用

3.2 JUC 并发包中的应用场景

3.3 自定义同步工具

总结

参考资料

  • Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.
  • 《Java并发编程实战》
  • 不可不说的Java“锁”事

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

最是人间留不住,曾是惊鸿照影来。