AQS 原理及应用
前言
Java 中许多同步类都是基于 AbstractQueuedSynchronized 来进行实现的。它是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。
本文是阅读 从ReentrantLock的实现看AQS的原理及应用 笔记。
从 ReentrantLock 出发
简介
ReentrantLock 就是基于 AQS 进行实现的,它的意思是可重入锁,指的是同一个线程能够对一个临界资源。
简单来说可重入锁,即 一个记录同步资源状态的字段,可以被同一个线程进行重复修改,并且也要进行同样次数的 Release 进行锁释放。
Synchronized 也是一种可重入锁,但是它是基于监视器模式,Monitor对象 来记录锁状态,并且可以自动释放监视器。
方法与表层的实现
我们可以可以简单看一下方法
// 加锁
public void lock() {
sync.lock();
}
// 释放锁
public void unlock() {
sync.release(1);
}
// 等等等..........
我们可以注意到,它的加锁等方法全部调用了 sync 对象,它的内部类信息如下,一个继承了 AQS 的抽象类,并且默认提供了 lock 抽象方法,并对 AQS 中为实现的方法进行了基本实现,除了 tryAcquire() 方法。它重写 AQS 实现了:
- tryRealease
- isHeldExclusively
abstract static class Sync extends AbstractQueuedSynchronizer {
// .......................
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
// .............
}
ReentrantLock 构造函数如下,它默认以 NonfairSync 作为 Sync 实现,也可以通过传递 fair 属性指定不同的锁实现。
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
我们继续从看向它实现的 Lock 方法
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
- 首先它尝试了 CAS 进行加锁,如果成功,则设定当前线程为独占线程。
- 如果失败,调用 Acquire(1) 方法。
如果获取锁,线程就以失败结束,这样会大大影响系统并发:
- 当前线程手动重试获取锁,而不是进入阻塞队列,不停进入盲等状态,大大消耗 CPU 资源
- 高并发下,手动竞争资源的消耗大多是无所谓的,无法成功获取
- 当前线程也许缘分已尽,每次都无法公平地抢占到锁,线程岂不是一直处于盲等阻塞?
- 说来说去,没必要一直陷入盲等,一直盲等那就一直没有意义
我们得想办法,让当前线程进入一个排队等待机制,让线程一直等待,保留锁获取得可能,以阻塞或其他方式停留在在锁获取的流程中,这段逻辑就隐藏在了 AQS 中,我们调用 acquire 方法,也即是 AQS 实现的方法。
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
而公平锁更是一个重量级,它直接调用了 acquire 方法,直接结束了,这些背后是什么呢,这时候就要将目光聚焦在 AQS 之上。
AQS
AQS,AbstractQueuedSynchronized,抽象队列同步器。
原理
概述
如果被请求的共享资源空闲,就将当前请求资源的线程设置为有效的工作线程,并将资源设定为锁定状态。
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。AQS 通过 CLH 队列的变体实现,将暂时获取不到锁的线程加入至队列中。
CLH 队列,名字取自发明者 Craig, Landin, and Hagersten,它是一种用于实现锁的等待队列的经典算法。
CLH 队列是一种隐式链表,每个等待线程都对应于队列中的一个节点,与 显示链表不同,它的节点并不存储指向下一个节点的引用,而是旨在节点内部保存表示锁状态的标志位。
原理:
每个线程都会持有自己的节点,通过 locked:booelan 字段表示当前线程是否持有锁。
节点的连接关系并不是从前向后记录下一个节点,而是每个节点记录了前一个节点的引用,通过 自旋 方式判断前一个节点是否释放锁, 如果前一个节点释放了锁,当前锁才会结束自旋,加锁成功。
由此看,CLH 是通过隐式队列,顺序获取锁的等待队列。
好奇问了问 CHATGPT,多线程之间,CLH 如何传递前一个节点呢?它告诉我可以通过 AtomicReference # geteAndSet,可以获取当前值并设置新的值。
AtomicReference 通过 Unsafe 类来进行操作,这里就到此为止了。
AQS 实现的队列是虚拟双向队列(FIFO),将每条请求共享资源的线程封装称为了一个节点来实现锁的分配。
/**
* The synchronization state.
*/
private volatile int state;
通过 FIFO 队列对线程资源获取进行排队,通过 volatile int 类型 state 来表示同步状态,线程通过 CAS 来对 state 进行修改。
线程Node
线程以 Node 的形式,记录在了 CLH 变体队列之中。
它的属性:
- waitStatus:当前节点在队列中的状态
- thread:表示处于该节点的线程
- prev:前驱节点
- next:后继节点
- nextWaiter:指向处于 CONDITION 状态的节点(忽略 Condition Queue队列,不多介绍)
它的方法:
- predecessor:返回前驱节点,没有的话会抛出 NPE
- isShared
节点线程两种锁模式:
- SHARED:表示线程以共享的模式等待锁
- EXCLUSIVE:表示线程正在以独占的方式等待锁
而 waitSatus 有如下枚举值:
- 0:Node初始化时默认值
- CANCLELLED:为1,表示线程获取锁的请求已经取消了
- CONDITION:为-2,表示节点在等待队列中,节点线程等待唤醒
- PROPAGATE:为-3,当前线程处在 SHARED 情况下,该字段才会使用
- SIGNAL:为-1,表示后继线程不需要挂起,等待锁释放
同步状态 State
state 通过 volatile 修饰,保证了每次读取时不同线程之间的可见性,表示了当前临界资源的获取情况。
访问方法:
- protected final int getState:获取 State
- protected final void setState:设置 State
- protected final boolean compareAndSetState:CAS 设置 State
通过调用 final 方法,我们即可实现实现简单的多线程 独占模式和共享模式中加锁的过程。
对于我们自定义的同步工具,需要自定义获取同步状态与释放状态的过程,即图上方的内容,AQS 架构将这一部分交由了我们自行实现。
自定义同步器与AQS定义API
自定义同步器重写API
AbstractQueuedSynchronized 定义了一些 protected 方法,它会在 AQS 流程中被调用到,但是它并没有默认实现,而是抛出 UNsupportedOperationException。我们需要对他们进行实现,以对 同步状态 State 进行修改。
/**
* 独占方式:arg 为获取锁的次数,尝试获取锁资源
* 成功则返回 True,失败返回 False
**/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 独占方式:arg 为释放锁的次数,尝试释放锁资源
* 成功返回 True,失败返回 False
**/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享方式:arg 为获取锁的次数,尝试获取锁资源
* 负数表示失败,0表示成功且没有剩余可用资源,正数表示成功且有剩余资源
**/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享方式:arg 为释放锁的次数,尝试释放资源
* 如果释放后允许唤醒后续等待节点返回 True,否则返回 False
**/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 该线程是否正在独占资源。只有用到 Condition 才需要进行实现。
* 本篇文章没有说明,我以后有机会了解到再进行补充吧。
**/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
一般来讲的自定义同步器只会有两种情况:独占方式,共享方式。
针对上面这组API,我们只需要实现 tryAcquire-tryRelease 或者 tryAcquiredShared-tryReleaseShared。ReentrantLock 就实现了独占锁API。
API与AQS流程
这组API会在哪里被调用呢,展示独占锁状态下AQS调用API位置:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}
可以看到,在 AQS 中 final 方法 acquire,release 都调用了 未实现API,自定义即是要实现这些API来修改 State 状态,而 CLH 队列的阻塞排队等功能,交由 AQS 进行自行实现。
例如 acquire 方法:
- 首先进行 tryAcquire
- 如果 tryAcquire 返回了 false,与运算的前表示则会成功,此时进入 acquireQueued 方法,可以从字面猜测,线程节点会加入至队列中
- 如果 acquireQueued 返回 true
- selfInterrupt,线程中断
ReentrantLock与AQS的关系
下面我们将从 ReentrantLock 角度来看这些 API 实现与 AQS 流程之间的关系
首先展示下基础抽象类 Sync,它进行了基础实现,并定义了 lock 方法,作为对对外统一接口,将非公平锁获取锁 nonfairTryAcquire 作为通用方法进行默认实现进行复用。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
非公平锁
接下来是 NonfairSync 实现,可以看到 AQS#tryAcquire 被重写,但它实际调用了 Sync#nonfairTtryAcquire
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
我们可以发现,Lock 方法首先尝试 CAS 抢锁,失败则调用 AQS#acquire 方法,根据前文剖析,我们可以知道,acquire 方法实际上首先会调用自定义同步器 tryAcquire 方法,失败才会进入其他流程。
在这里的流程如下:
公平锁
公平锁重写了加锁的实现,每次加锁时,只有它位于等待队列首位,才会进行判断,调用的方法为 hasQueuedPredecessors(),关于此方法的相关问题,在下文了解了 AQS 线程节点加入等待队列后,再会重新说明。
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
// 获取CurrenThread
final Thread current = Thread.currentThread();
// 获取State
int c = getState();
// 如果State==0,即无人加锁
if (c == 0) {
// FairSync 与 NonfairSync 区别在于此处,多了一个判断
// hasQueuedPredecessors return true,才会进行加锁
// hasQueuedPredecessors 返回当前线程节点前是否还有其他线程等待同步资源
// 只有返回 false,即当前节点前不存在节点排队或者当前线程为队首节点
// 基于此实现了 排队 ---- 即公平锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 有人在占锁,尝试判断是否是当前线程重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 重入 setState && return true
setState(nextc);
return true;
}
// 获取锁失败
return false;
}
}
}
ReentrantLock#unLock
调用 sync#release,即调用至了 AQS#release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒同步队列中的下一个等待线程
unparkSuccessor(h);
return true;
}
return false;
}
AQS#release 调用了 tryRelease,这个API交由了 sync 进行了实现,并且公平锁与非公平锁逻辑相同,因为公平与非公平并不影响释放锁逻辑。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前线程与持有锁的线程不一致 throw Exception
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果释放 state 至 0,则设定当前独占线程为NULL
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设定当前State数,由于可重入所以 state 最终不一定为 0
setState(c);
// 返回是否完全释放了锁
return free;
}
这里我们就能看到 AQS 与 自定义API实现之间的联系,自定义只需要维护API具体实现,维护State并按照API规则返回内容, 根据API实现,AQS实现了更多的流程,相当于一种模板方法实现,在这里,如果tryRelease返回了TRUE,即锁完全释放,AQS有可能会唤醒同步线程队列中的下一个等待线程。好了,图我就懒得画了,有点麻烦
以上部分主要聚焦在通过对 State 状态的修订,来处理获取锁/释放锁流程,并且二者在之后的流程我们只是简单提了一下,如:
tryAcquire失败当前线程节点进入队列,完全释放锁后会唤醒队列等待线程。。。
这些内容将继续进行说明。
走入AQS隐藏流程
在之前的内容,我们主要聚焦在了AQS交由子类实现的API,它们只需要用来管理 State,而其他流程都交由了 AQS,而通过 State值修改,ReentrantLock 便实现了 可重入锁、公平锁、非公平锁特性,实现内容并不多,至于加锁失败线程加入队列,队列唤醒,皆交由 AQS 实现,接下来我们就要聚焦在 AQS CLH 队列变体实现的功能。
线程加入等待队列
发生位置
先来重新看一下 这个过程发生的时刻:
public final void acquire(int arg) {
if (!tryAcquire(arg) && // tryAcquire 失败
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
添加新节点
先来看第一步:addWaiter(Node.EXCLUSIVE),指定传入了 EXCLUSIVE 模式类型
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 根据当前线程构建排他锁节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 等待队列中存在节点
node.prev = pred;
// Compare tail 地址与 pred,符合则设定新 tail 为 node
if (compareAndSetTail(pred, node)) { /uj
pred.next = node; // 成功更新返回,失败则说明已有其他节点入队
return node;
}
}
enq(node); // enq 更新 node
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// 循环重试
for (;;) {
Node t = tail;
// t == null 需要手动初始化一个头节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 保证在等待队列初始化后 通过CAS添加当前Node
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
从这里我么能看出来,当 等待队列没有初始化时,CLH 变体中节点为NULL;当初始化时,头节点后建立一个虚拟节点 New Node(),并且 head 与 tail 都指向了它,当添加节点时,newNode.prev = tail,再将 tail CAS 更改为新节点,并将 tail 原节点.next -> newNode。
当 tail != null,即完成了初始化,否则将进行初始化头节点,tail 赋值其实并不担心并发问题,head 通过 CAS 将 NULL 更新为了头节点,只有更新成功的线程才可以更新 tail,并发初始化情况下,失败线程将重复获取非 NULL Tail 指针,并在此基础上继续 CAS 添加节点。
线程节点如此加入等待队列后,便要继续接下来的流程:线程已经抢锁失败了,那么线程合适再次抢锁,又如何休眠、唤醒又进行抢锁呢?这部分就是等待队列中线程出队列时机的相关问题,交由后文叙述。
再看hasQueuedPredecessors
翻译即为 存在已排队的前置任务,此被 ReentrantLock 公平锁类进行了调用,用于判断是否位于 等待队列 首元素 / 等待队列中元素为NULL,否则并不会尝试 CAS 加锁,而是再次排队。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
依次进行了如下判断:
h != t,如果 ==,则说明等待队列内部只存在头节点,公平锁可以直接尝试进行加锁
此时节点已经进行了初始化,参考上文添加新节点中 enq 方法,tail 指向了 head
((s = h.next)) == null,当 h.next 指向 NULL,说明当前队列虽然有线程请求了初始化,但是还没有节点添加到 等待队列,只有存在一个节点加入时,h.next 才会开始指向 tail,其实这个过程也是通过 tail 插入新元素实现的,只有在第一个节点添加时,head 才赋值指向了一个节点。
虽然此刻等待队列中没有元素,但是有个线程正在 addWaiter 添加节点时,相当于那个线程排队在当前线程之前,所以也要返回 true
按理说,h != t 就代表线程中有元素,但其实当第一个线程节点在尝试初始化时,它在初始化后未设置节点时,h.next == null,在这之后就没有这种情况了。
s.thread != Thread.currentThread()),s 其实是被判别式前一项初始化了,它指的是队首元素,当队首元素非当前线程,返回 true
线程什么时候阻塞
继续回到 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter,即线程节点添加至等待队列后,返回了创建的线程节点,并传递至了 acquireQueued 函数,接下来我们着眼于这个方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 标记是否在等待过程出现中断
boolean interrupted = false;
for (;;) {
// 获取当前节点 Prev 节点
final Node p = node.predecessor();
// 如果当前节点为队列头节点 并且 尝试获取锁成功
if (p == head && tryAcquire(arg)) {
// 头指针设定为当前节点
setHead(node); // 将Node设定为虚拟头节点,并清空Node属性,但是并未修改waitStatus
p.next = null; // help GC, For GC Prev Node
failed = false; // 记录成功
return interrupted; // 返回是否中断
}
// 如果上述步骤失败,有两种可能
// 1. prevNode 非头节点
// 2. prevNode 为头节点,但是当前节点抢锁失败
// 首先调用方法判断是否需要阻塞(条件:PrevNode.waitSatus = -1,即 Singal)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 线程阻塞
interrupted = true; // 记录进行过中断,这个动作发生时,线程已经结束了阻塞
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/*
* 将Node设定为虚拟头节点,并清空Node属性,但是并未修改waitStatus
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/*
* 根据前驱节点判断当前节点是否需要阻塞
* Only on prevNode.waitStatus = Node.Signal
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取PrevNode waitStatus
int ws = pred.waitStatus;
// 如果前节点为唤醒状态
if (ws == Node.SIGNAL)
return true; // 返回需要阻塞
// 如果前节点为取消状态
if (ws > 0) {
// 向前遍历寻找一个非取消状态的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 消除取消状态的节点
pred.next = node;
} else {
/*
* waitStatus 可能为 0/PROPAGATE
* 将前任节点设定为Singal
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回不需要阻塞
return false;
}
/*
* gua线程阻塞
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
我们来通过流程图梳理一下这个函数调用的过程:
总结一下,只有当前节点为等待队列第一个节点线程才会被调用抢锁,成功后返回,否则将获取前一个节点,根据前一个节点状态,有如下情况:
- SIGNAL:需要阻塞
- 初始态 / PROPAGATE,当前线程会设定前节点为 SIGNAL,如果当前节点非第一个排队线程节点,下一循环依旧会将当前线程阻塞
- 前一个节点为 CANCELED,遍历清空取消节点,再次循环,判断状态
之后再次循环判断(可能是被阻塞唤醒),再次进入上述流程。
在这个流程退出后,线程有可能并没有进入阻塞就直接 tryAcquire 成功,也有可能是阻塞唤醒后 tryAcquire 成功,此时节点已经抢锁成功并进行了返回。
那么我还有问题:
- SIGNAL指定了当前节点下一个节点需要unpark,它本身值就由下一个节点的线程自行设定,可是队列中哪里来的 CANCELED 节点
- 线程进入阻塞后,什么时候会被唤醒呢?
线程退出等待队列
CANCELED 节点退出
在上面的代码讲解中,我们没有涉及到 CANCELED 节点,但是我们漏掉了一个方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Finnally 代码块执行了 cancelAcquire,如果 tryAcquire 代码执行出现异常,则会进入 finally 代码块。
异常情况:
当前加锁线程抛出了中断/超时异常(这块写在了 在阻塞唤醒之后 -- 线程中断异常)
其他:如 ReentrantLock 中是在重入次数加锁 State < 0 时抛出了 Error 异常
下面是详细代码:
/**
* Cancels an ongoing attempt to acquire.
*/
private void cancelAcquire(Node node) {
// 忽略NULL节点
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
// 通过前驱节点,消除取消状态Node
while (pred.waitStatus > 0 )
node.prev = pred = pred.prev; // 链式赋值 pred = pred.prev; node.prev = pred;
// 现在 pred.waitStatus <= 0 && node.prev = node;
// prev.waitStatus != Node.CANCELED && (pred.next == node || prev.node.waitStatus == CANCELED )
Node predNext = pred.next;
// 当前节点!!!设定为CANCELED
node.waitStatus = Node.CANCELLED;
// 如果当前节点为 TAIL CAS设定pred为尾部
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); // 并将尾部节点next设定为null
} else {
// 此时node!=tail, 当前为队列中的中间节点
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head && // 当前node不为等待队列首元素
// 当前节点是需要被唤醒的,当前线程可能会自行设定SIGNAL
// 当前节点为0/PROPAGATE/,,, 并设定prev.waitStatus==SIGNAL成功
((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
// prev非队首元素(thread!=null),还是保证了当前node非队首元素,只有head.thread == null
&& pred.thread != null) {
Node next = node.next; // 获取下一个线程节点
if (next != null && next.waitStatus <= 0) // 如果非空并且waitStatus非取消(INIT,SIGNAL,PROPAGATE)
compareAndSetNext(pred, predNext, next); // 讲前节点连接至node下一个节点(即从等待队列中去除了当前线程节点)
} else {
// node为队列首个等待节点 || 非首元素时设定prev.waitStatus==SIGNAL失败
// 唤醒node后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
/*
* 唤醒Node后首个waitStatus <= 0 的节点线程
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果s 为 NULL,则通过 tail 指针向前找
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
我们进行了以下操作:
Node.waitStatus = CANCELED
清除里Node前CANCELED节点
根据当前Node位置(如果为TAIL,只需要移除自己;第一个节点或队中节点,需要将前后进行关联):
TAIL,tail = prev,prevNode.next = null
HEAD.next,唤醒了下一个节点,node.next = node,help GC
等待队列中元素,如果 prev.Node.waitStatus 为/主动设定成功为 SIGNAL,成功则 prev.next = node.next
如果设定失败,唤醒下一个节点
最后 node.next = node,help GC
有些细节:
如果当前取消节点为首节点,只进行唤醒下一个节点:
因为下一个节点唤醒后,将会主动将 prev 指向前一个非 CANCELED 节点,在这里他唤醒后会主动将 prev 指向了 head(shouldParkAfterFailedAcquire),便位于了队首,再次进行 tryAcquire 成功后,head 便更新为了 下一个节点
队中元素,只是将prev.next = next.next,next线程节点会在唤醒时在方法shouldParkAfterFailedAcquire由主动更新prev指针
队中元素如果更新前线程节点失败,也会唤醒下一个节点,唤醒下一个线程节点来更新前节点 waitStatus。
当前节点取消时(cancelAcquire),会主动移除当前节点, 它将维护前指针的next指针,指向正确位置,但是在 shouldParkAfterFailedAcquire方法 和 cancelAcquire 中,它都会存在一步,主动更新了自己NODE的prev指针正确性:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// .......
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// ......
}
// ...
}
private void cancelAcquire(Node node) {
// .....
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// ......
}
两者都操作了node.prev指针指向,呃,似乎有些跑题,这两个方法都针对了当前Node,假如回到 cancelAcquire 我们移除当前节点,顺便更新了node.next.prev,当前节点线程与后续节点线程则会可能发生冲突,两者更新prev冲突,可能会导致赋值prev了也是一个无意义的节点,所以。。我认为prev指针,只会由当前线程自身去更新自身prev指针。
仔细想了想,next 指针在目前流程中只会用于在 cancel 节点时,去尝一次试唤醒下一个线程,如果 next 指向无效NODE,再会 TAIL 遍历获取所有节点以获取只需要唤醒的正确 Node。
prev 指针交由了 持有该引用的节点自身去修改,我认为它只用于了去除节点中无效NODE,帮助GC,减少无用判断等,而交由其节点在 tryAcquire 前自行更改 prev 更能保证正确性以及最新有效性。
Release 释放锁
这段话是在看完这段代码后添加的,释放锁并不是节点退出等待队列,节点等待队列实际在线程节点抢到锁的时候就发生了(也或许当前线程节点根本就没有进入到队列之中)。
ReentrantLock 不同锁实现都使用了 AQS 的模板方法 与 Sync 对其模板抽象方法的实现。
首先来看 Sync 对 tryRelease 的实现,它实现对 可重入锁 State 状态的更新:
// Sync # tryRelease
protected final boolean tryRelease(int releases) {
// 获取释放后state
int c = getState() - releases;
// 如果thread与独占线程不一致 异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // c == 0,即重入锁全部释放
free = true;
setExclusiveOwnerThread(null); // 更改独占线程标记
}
setState(c); // 更新 state
return free; // 返回锁是否全部释放
}
这段代码就较为简单,因为这个方法调用并不会考虑并发问题,Thread.currentThread() == getExclusiveOwnerThread(),说明是加锁线程自己在操作,并不会考虑临界资源竞争,所以只是普通值设定。
接下来看 AQS 的模板方法流程:
// AQS # release
public final boolean release(int arg) {
// 调用tryAcquire实现
if (tryRelease(arg)) { // 成功则说明重入锁已被释放
Node h = head;
// 如果 head != null 并且 head.waitStatus 不为 Init
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 前文提到过该方法: 唤醒Node后首个waitStatus <= 0 的节点线程
return true; // Success
}
return false; // failed
}
当重入锁被释放,这时候就要考虑是否要唤醒后方等待的线程。
唤醒条件: h != null && h.waitStatus != 0,我们挨个解读
若 h == null,说明线程未初始化,可能当前锁占直接进行了抢锁并且成功,所以没有入队(NofairLock),此时不需要唤醒
h != null,即节点中存在了线程,可能只有等待节点(此时 head 为当前线程节点),也有可能节点为空(没有等待节点)。。。
h.waitStatus 只会因为后续线程需要阻塞才会被设定,可以回到 线程什么时候阻塞 中,讲到了 shouldParkAfterFailedAcquire 方法的讲解来看
- waitStauts == 0,后继节点线程还未被阻塞 / 还妹有后继线程节点
- waitStauts < 0,可能已经由于后继节点再次循环发现 prev != head,进行了线程休眠
此时我有个疑惑,为什么唤醒 head 后的线程节点而不是当前线程对应节点的后继节点,通过全局搜索 SetHeader 函数,我终于想起来了!只要是线程进入了队列,再次唤醒抢锁后,此时 node.prev == head,并且在抢锁后,head = node(详见 线程什么时候阻塞 acquireQueued 方法分析),也就是说:
如果当前 h != null,并且当前 Node 为入队后位于 head.next 后才抢锁成功,则当前 head 会指向了 node,这就能解释为什么唤醒了 head 后继节点。在抢到锁后,FIFO,移除了抢到锁的线程节点,将其作为了 Head。
唤醒线程 unparkSuccessor
/*
* 唤醒Node后首个waitStatus <= 0 的节点线程
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果s 为 NULL,则通过 tail 指针向前找
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这里面有个小细节,如果当前 s 为 NULL || s.watiStatus > 0 时,会通过 tail 向前找到有意义且位于队列最前方的线程节点进行唤醒,Why?
先说结论:
1. 通过 next 指针向后遍历,在高并发情况下,也可能会跳过新增尾节点。
2. 当 s.watiStatus > 0,即 nextNode 已经 Cancdeled,nextNode 将会自连接自己(help GC),所以 next 指针遍历将进入循环。
再回看 addWaiter 代码(# 添加新位置):
private Node addWaiter(Node mode) {
// 根据当前线程构建排他锁节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 等待队列中存在节点
node.prev = pred;
// Compare tail 地址与 pred,符合则设定新 tail 为 node
if (compareAndSetTail(pred, node)) { /uj
pred.next = node; // 成功更新返回,失败则说明已有其他节点入队
return node;
}
}
enq(node); // enq 更新 node
return node;
}
当节点进入线程,tail 首先通过 CAS 进行抢占设定,并提前将其prev关联至了前一个节点,如果 CAS 失败,会进入 enq 方法通过 循环CAS 设定。
for(...) {
// ...
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
// ...
}
乍一看没问题,但是如果极端情况,在 CAS 成功时,此时 prev.next 并未指向新节点,如果此时 unpark 方法 通过 next 指针进行遍历,便会跳过新增节点的状态(虽然是看美团技术博客提到了这种情况,但是在这种状态下,倘若真这么极端,此时这个新增线程也并未休眠,我认为也不需要休眠哈哈哈哈。。。)
继续讲 当 s.watiStatus > 0,即 nextNode 已经 Canceled,nextNode 将会自连接自己(help GC),所以 next 指针遍历将进入循环。
在 CANCELED 状态下,next 指针会被节点自身进行断开连接并连上自身,通过 prev 才是正确的遍历方式!你如果通过 next 遍历岂不是最终进入了死循环。
综上:既然增加新节点是按照 TAIL 来CAS更新连接上新节点,那么遍历也应该从它来遍历嘛。
在阻塞唤醒之后
线程中断
测试代码来看一看线程中断:
/**
* @author yancy0109
* @date: 2023/11/27
*/
public class InterruptTest {
private static class Method implements Runnable {
private boolean interrupt = false;
@Override
public void run() {
try {
LockSupport.park(this);
System.out.println("Thread "+ Thread.currentThread().getId() +" has been unpark...");
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
interrupt = true;
System.out.println("ThreadId: " + Thread.currentThread().getId()
+ " has been Interrupted");
} finally {
if (Thread.currentThread().isInterrupted()) {
System.out.println("线程中断处理: " + Thread.currentThread().getId());
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Method());
t1.start();
Thread.sleep(1000);
t1.interrupt();
Thread.sleep(3000);
System.out.println("--------------");
Thread t2 = new Thread(new Method());
t2.interrupt();
t2.start();
Thread.sleep(1000);
LockSupport.unpark(t2);
}
}
// Thread 12 has been unpark...
// ThreadId: 12 has been Interrupted
// 线程中断处理: 12
// --------------
// Thread 13 has been unpark...
LockSupport.part 能够阻塞调用线程,有也可以通过 unpark 唤醒阻塞线程,而 interrupt 是对 线程 的中断标志位进行设定,它不会引起线程异常,但是假如线程处于阻塞状态被调用中断,线程是将被唤醒!如果线程在阻塞态被唤醒,我们可以通过检查方法来处理中断请求,以保证对外部中断请求的响应(也可以添加当前线程中断后的善后逻辑)。
阻塞唤醒-忽略中断请求
前面讲述了线程节点是如何加入等待队列,并且当前线程如何进入休眠,而后有又涉及到了线程退出等待队列后,会对阻塞线程进行了唤醒。
并发情况下,节点都会被依次从阻塞队列中唤醒继续执行,那么在唤醒之后发生了什么呢?
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我们的目光应该放在 parkAndCheckInterrupt 方法内
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park(this) 将当前线程进行了阻塞,最终返回了 Thread.interrupted(返回线程是否被中断,并恢复当前线程中断记录)。
如果线程被中断了,则会将 acquireQueued interrupted 标记位设定为了 true 用于返回,但是在流程中我们并不会对中断进行任何的操作,而只对 tryAcquire 异常进行响应。
继续向外看,在获取是否中断后,线程会重新设定中断标志位:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt(); // 设定中断
}
其实在这里我有了个问题:为啥在方法内部每次获取中断请求都要清空,而在外部最终返回又重新设定了中断呢?
当前节点从阻塞状态唤醒有两种情况:
- 前节点释放锁唤醒节点
- 当前获取节点线程被其他线程调用中断方法
就当前方法而言,线程唤醒后都会尝试再次获取锁,假如是由中断唤醒,节点则又会重新进入休眠,所以每次从休眠结束,返回 中断状态 会使用 interrupted 进行状态清空,下次唤醒中断得到了仍是最近一次阻塞是否有中断请求。
从方法外部我倒是没有看到有什么影响,所以问题不大~只是返回了是否存在中断,由于内部清空了中断状态,外部总归要再次重新中断。
阻塞唤醒-响应中断请求
只看 AQS 实现了:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
能发现,每次 parkAndCheckInterrupt ,假如检测到了中断,会抛出 InterruptedException,这样就会导致当前方法进入 finally 代码块,进行 cancelAcquire 流程
总结
至此,AQS 几个部分我们就大概进行了梳理:
- AQS 内部数据结构
- AQS 如何被自定义实现
- 对外API
- 实现抢锁方法
- AQS 请求如何进行加入排队(此时已是抢锁失败)
- 如何添加新节点
- 线程何时进行阻塞
- AQS 请求如何从排队中退出
- CANCELED 节点退出
- RELEASE 退出
- 唤醒后续节点线程
- AQS 从阻塞中唤醒后发生了什么,以及 对 CANCELED 节点中中断取消进行了补充。
AQS 应用
CountDownLatch
总览
之前看过对这个类使用,可以方便主线程与执行任务线程之间的同步(让主线程等待其他线程结束任务后再被唤醒)。
有了之前经验我们就可以直接看 API 实现辣
先看看 CountDownLatch 对外部提供的方法吧~
我们主要用的有两个:
public void countDown() {
sync.releaseShared(1); // 释放共享锁
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 获取共享锁 响应中断
}
// 其他超时等方法就不暂不关注
很明显,它的结构也类似 ReentrantLock,内部提供了一个 Sync,外部实现一定方法来调用内部实现方法。
继续看 Sync 实现:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 初始化一个State
// 代表了需要等待结束的线程数
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 当当前 State == 0,即认为获取锁成功,返回
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // 如果当前锁
}
// 修改共享锁 State 的实现逻辑
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
由此我们可以猜测,主线程等待其他线程释放前,会在队列中阻塞等待,直至 State == 0,即可抢锁成功,此时代表了锁已经全部释放。
在外部其他线程只需要调动 countDown 即可让主线程唤醒,可以发现它只是release锁,并没有加入队列,那主线程是如何被唤醒的呢?
那么我们就去看一下,CountDownLatch # Sync 中,共享锁加锁与释放锁的逻辑吧~
acquire
先看 await 方法,我们就是通过调用它,让主线程进入休眠
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
内部是 AQS 方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 尝试获取锁
doAcquireSharedInterruptibly(arg);
}
其中就调用到了上层 CountDownLatch 提供的 API了
// 当当前 State == 0,即认为获取锁成功,返回
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // 如果当前锁
}
即 当前 state != 0,就要继续接下来的操作。这里也可以再做理解,return > 0 即为抢锁成功,return < 0,即为抢锁失败
那么根据我们之前的经验,线程该做什么了呢?Yes,线程节点入队,线程判断是否休眠。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 添加SHARE节点
boolean failed = true; // 标记是否成功
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 当前节点为 head
int r = tryAcquireShared(arg); // 尝试 修改 State
if (r >= 0) { // 成功,则清除节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return; // 返回
}
}
// 是否休眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
休眠逻辑与之前类似了,先将前节点 waitStatus 设定为 SIGNAL,然后再次循环重新判断,最终进入休眠。
好了其他已经不重要了,我们的主线程如今通过 await 等待 state == 0,才能抢锁成功,接下问题来了,什么时候 state == 0,主线程又如何被唤醒的呢?
release
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
先调用 tryReleaseShared(CountDownLatch#Sync 实现的API)
// 修改共享锁 State 的实现逻辑
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState(); // 获取当前 state
if (c == 0) // 如果 state == 0,无法释放
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // CAS 修改 State
return nextc == 0; // 返回是否 nextc == 0
}
}
方法实现也很简单,CAS 修改 STATE。
最终返回 nextc == 0 时,则说明应该去唤醒下一线程,与 ReentrantLock # Sync 实现的功能方式一致(ReentrantLock # Sync,只有当当前State == 0,才会触发唤醒流程,因为它是可重入,只有 state 为 0,才说明当前线程占锁结束)
那么是如何进行唤醒的呢?
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 节点不为空
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 前文加锁可以得知,目标主线程阻塞时,head.waitStatus 应该等于0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 如果 CAS 未成功
continue; // loop to recheck cases // 重新检查
unparkSuccessor(h); // 唤醒 head 后续节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果 WS == 0,设定为 Node.PROPAGATE
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
刚看完这点我有个疑问,会不会主线程在准备进入休眠过程时,其他线程刚好处理完发起唤醒请求,又正好错过它休眠的调用,导致主线程一直陷入休眠呢?
毕竟线程检测是否休眠和休眠是两步,前面我也一直没有相关这个问题,于是进行了测试与查找。
下面进一步说明。
补充休眠与唤醒
测试代码如下:
public class ParkTest {
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println("线程Sleep结束");
LockSupport.park();
System.out.println("线程Park结束");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
LockSupport.unpark(t1);
System.out.println("主线程unpark成功");
}
}
// out
// 主线程unpark成功
// 线程Sleep结束
// 线程Park结束
原以为 t1 线程会持续阻塞,没想到 park 方法却没有生效。
对相关问题进行了搜索,答案如下:
Unpark 会给线程发放许可,而 park 会进行许可消耗,所以我们提前唤醒,也不会导致该线程休眠无法唤醒(它会消耗许可不进入休眠),所以也不会存在我之前的疑问。
总结
CountDownLatch 通过固定 State 属性,将等待线程放入等待队列阻塞等待,其他线程通过方法释放修改 state,当 state 修改为 0,AQS 会触发唤醒后续节点流程,便对使用 await 的线程进行了唤醒。