这段代码其实逻辑很简单,就是通过 CAS 乐观锁的方式来做比较并替换。上面这段代码的意思是,如果当前内存中的?state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返回 false。
这个操作是原子的,不会出现线程安全问题。
lock 方法的第一步很好理解,但第二步获取锁失败后,后续的处理策略是怎么样的呢?这块可能会有以下思考:
- 某个线程获取锁失败的后续流程是什么呢?有以下两种可能:
-
将当前线程获锁结果设置为失败,获取锁流程结束。这种设计会极大降低系统的并发度,并不满足我们实际的需求。所以就是 2 这种流程,也就是 AQS 框架的处理流程。
-
存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
-
对于问题 1 的第二种情况,既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
-
处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
-
如果处于排队等候机制中的线程一直无法获取锁,还是需要一直等待吗,还是有别的策略来解决这一问题?
可以看一下 else 分支的逻辑,acquire 方法:
void acquire( arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
Acquire 方法是 AQS 中的核心方法。这里它干了三件事情:
-
tryAcquire:会尝试再次通过 CAS 获取一次锁。
-
addWaiter:将当前线程加入上面锁的双向链表(等待队列)中
-
acquireQueued:通过自旋,判断当前队列节点是否可以获取锁
?
tryAcquire 方法
下面详细看下?NonfairSync 的?tryAcquire 方法,该方法会直接调用?nonfairTryAcquire 方法,代码如下:
boolean nonfairTryAcquire(final Thread current = Thread.currentThread();
int c = getState();
// c=0 说明此时没有获取没有线程占有锁
if (c == 0) {
// CAS 操作去获取锁
if (compareAndSetState(0return ;
}
}
// 已经获取锁了,可以继续重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) overflow
new Error("Maximum lock count exceeded");
setState(nextc);
;
}
false;
}
简单来说上面的方法主要就是看能不能获取到锁,不能获取到就返回 false,然后就会调用 addWaiter 添加到等待队列中,具体代码如下:
private Node addWaiter(Node mode) {
Node node = Node(mode);
死循环
for (;;) {
Node oldTail = tail;
if (oldTail != ) {
通过unsafe 类来对 Node.prev 节点赋值
U.putObject(node,Node.PREV,oldTail);
更新 tail 节点为 node,该操作对其他线程是可见的,确保每次只有一个线程可以更新成功
if (compareAndSetTail(oldTail,node)) {
oldTail.next = node;
node;
}
} {
initializeSyncQueue();
}
}
}
cas 设置 tail 节点
compareAndSetTail(Node expect,Node update) {
初始化 head 和 tail 节点
initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this,HEAD,1)">null,(h = Node())))
tail = h;
}
addWaiter(Node node) 方法通过采用死循环方案,确保将该节点设置尾成尾节点。
-
如果为尾节点不为空,需要将新节点添加到?oldTail?的 next 节点,同时将新节点的 prev 节点指向 oldTail;
-
如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h =? new Node () 实例;此时?oldTail = h 不为空,node 的 prev 为 oldTail,?oldTail 的?next 是 node。
这里代码很简单,但是却通过 CAS 操作保证了多个线程一起添加节点的时候,只有一个线程可以成功。
此外,入队操作还有个 enq 方法,这个方法和?addWaiter 一样的,就是返回值不一样,具体如下:
private Node enq(Node node) {
) {
U.putObject(node,oldTail);
oldTail;
}
} {
initializeSyncQueue();
}
}
}
但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。
acquireQueued
将添加到队列中的 Node 作为参数传入 acquireQueued 方法,这里面会做抢占锁的操作:
boolean acquireQueued(final Node node,1)">boolean interrupted = ;
(;;) {
// 获取前一个节点,为空,抛出 NPE
final Node p = node.predecessor();
// p==head 说明 node 是队列中的第一位,这时候还会再去获取一次锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功后,node 变成 head 节点,凡是 head 节点,其 thread 和 pre 都为空,next 保持不变。
setHead(node);
p.next = null; help GC
// 注意这个中断记录是在获取锁之后才会被返回的,也就是说获取锁之后,才有资格处理中断
interrupted;
}
// 获取锁失败,说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)
// 或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p,node) &&
parkAndCheckInterrupt())
// 说明在这个过程中发生过中断,需要补上
interrupted = ;
}
} (Throwable t) {
cancelAcquire(node);
t;
}
}
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
下面来看获取失败后的处理,具体在看下面的代码:?
shouldParkAfterFailedAcquire(Node pred,Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// This node has already set status asking a release to signal it,so it can safely park.
;
// 前节点处于取消状态,跳过,获取再前一个的节点状态
if (ws > 0) {
do {
// 这里将取消状态的节点删除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 同时设置下一个节点为 node
pred.next = node;
} {
pred.compareAndSetWaitStatus(ws,Node.SIGNAL);
}
;
}
parkAndCheckInterrupt() {
LockSupport.park();
如果之前中断了,为 true,并清除中断标志
Thread.interrupted();
}
如果 shouldParkAfterFailedAcquire 返回了true,则会执行:parkAndCheckInterrupt()
方法,它是通过 LockSupport.park(this) 将当前线程挂起到 WATING 状态,它需要等待一个中断、unpark 方法来唤醒它,通过这样一种 FIFO 的机制的等待,来实现了 Lock 的操作。
LockSupport 类是 Java6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数:
native unpark(Thread jthread);
void park(boolean isAbsolute,1)">long time);
unpark 函数为线程提供“许可( permit )”,线程调用 park 函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。
permit相当于0/1的开关,默认是 0,调用一次 unpark 就加 1 变成了 1。调用一次 park 会消费 permit,又会变成 0,变成 0 不会影响原有线程的运行。 如果再调用一次 park 会阻塞,因为 permit 已经是 0 了。直到 permit 变成 1。这时调用 unpark 会把 permit 设置为 1 。每个线程都有一个相关的 permit,permit 最多只有一个,重复调用 unpark 不会累积。
这里需要说明的一点就是:acquireQueued 方法内部是一个死循环,?shouldParkAfterFailedAcquire 和??parkAndCheckInterrupt 也都在这里面。这里对这个逻辑再整理下:
-
?acquireQueued 本意是通过无限循环让队列中的第一个节点尝试去获取锁;当一个 node 被加入到队列中的时候,就会促发这个无限循环;
-
如果等待队列中的第一个节点获取到锁了,就会退出循环;
-
如果 node 是第一个加入等待队列的,此时 node 的 prev 节点是 head ( new Node() ),node 会先去获取锁,失败后,因为 prev 的?waitStatus = 0,这时候将其?waitStatus 设置为 -1,然后再次循环,再获取锁失败就会调用?parkAndCheckInterrupt 阻塞当前线程;
-
shouldParkAfterFailedAcquire 过程中会将队列中处于?CANCELLED = 1 的节点删除。也就是说每添加一个节点,获取锁失败后,都可能会对队列做一遍整理;
-
被加入队列后的线程是不会响应中断的。当node 获取锁之后,如果线程在等待中被中断过,需要将这个中断补上,这样线程就可以响应中断操作,比如此时被取消了。
cancelAcquire 方法
如果在获取锁的过程中,发生了错误,就会响应??cancelAcquire(node) 方法。下面具体看下方法的源码,看看它做了啥:
cancelAcquire(Node node) {
Ignore if node doesn't exist
if (node == )
;
node.thread = 过滤掉那些被取消的节点
Node pred = node.prev;
)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node,pred)) {
pred.compareAndSetNext(predNext,);
} {
ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws,Node.SIGNAL))) &&
pred.thread != ) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext,next);
// 走到这里,已经把 node 从 next 队列里面删除了,但是保留了 prev 指针
} // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
// 这里修改了 node 的next 指针,但是保证了 prev 指针的不变
node.next = node; help GC
}
}
当前的流程:获取当前节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就一直往前遍历,找到第一个 waitStatus <= 0 的节点,将找到的 Pred 节点和当前 Node 关联,将当前Node 设置为 CANCELLED。
根据当前节点的位置,考虑以下三种情况:
-
当前节点是尾节点。
-
当前节点是Head的后继节点。
-
当前节点不是Head的后继节点,也不是尾节点。
根据上述第二条,我们来分析每一种情况的流程。
当前节点是尾节点。

当前节点是 Head 的后继节点。

当前节点不是 Head 的后继节点,也不是尾节点。

通过上面的流程,我们对于 CANCELLED 节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么情况下会对 Prev 指针进行操作?
执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。 shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。
{
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
?
unparkSuccessor
下面看下?unparkSuccessor 的逻辑:
unparkSuccessor(Node node) {
/*
* If status is negative (i.e.,possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
node.waitStatus;
if (ws < 0)
将传入的参数node的等待状态变为 0
node.compareAndSetWaitStatus(ws,0);
* Thread to unpark is held in successor,which is normally
* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual
* non-cancelled successor.
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = ;
从后往前寻找那些没有被取消的线程
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != )
LockSupport.unpark(s.thread);
}
?unparkSuccessor 的作用如下:
这个方法的找到一个需要唤醒的节点,看下后面怎么处理:
// LockSupport
unpark(Thread thread) {
if (thread != )
U.unpark(thread);
}
发现也是通过 unSafe 类来处理的。这里调用了 unpark 方法,那肯定有地方调用了 park 方法,这个是在??parkAndCheckInterrupt 里调用的。
?FairSync lock 公平锁
到这里,NonfairSync lock 的逻辑就讲完了 。那 FairSync lock 是如何保证公平的呢?且看代码:
long serialVersionUID = -3000897897090466540L;
// 加锁
lock() {
acquire(1);
}
//
// 当前没有线程获取锁
// 当前线程处于 head 之后,或者队列为空,就会去调用 CAS 获取锁,否则是没有机会获取锁的
if (!hasQueuedPredecessors() &&
compareAndSetState(0 // 当前线程就是独占线程,可重入
if (nextc < 0)
;
}
}
可见对于公平锁,新加入的节点有以下几种操作:
-
node 能获取锁的情况有两种:1 是当前没有线程持有锁,并且队列为空,或者 node 是 head 的下一个节点;2 是 node 本身持有锁,可重入。
-
在情况 1 后的 node,都将会被加入到队列中去;
这里就可以看出来,公平锁完全是按照先来后到的顺序进行排列等候的,不会给你机会去通过 CAS 操作获取锁的。对于非公平锁,每个线程去获取锁的时候都有机会去尝试获取锁的,成功锁就是你的,不成功就加入到队列中去。
?
unLock 方法?
讲完了 lock 方法以后,接下去讲 unLock 方法了。来看下 unlock 的逻辑:
unlock() {
sync.release(1);
}
释放锁
boolean release( arg) {
// true 表示成功释放,就会唤醒下一个线程
(tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
;
}
;
}
boolean tryRelease( releases) {
int c = getState() - releases;
// 确保是当前线程,非当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
IllegalMonitorStateException();
boolean free = ;
) {
free = ;
setExclusiveOwnerThread();
}
// 更新同步状态
setState(c);
free;
}
unlock 的逻辑比较好理解,就是释放锁,更新同步状态,然后唤醒下一个等待线程。
其中 tryRelease 动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是 1 ),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它的线程有机会进行执行。
在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock() 的次数与 lock() 的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true。
hasQueuedPredecessors 是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回 False,说明当前线程可以争取共享资源;如果返回 True,说明队列中存在有效节点,当前线程必须加入到等待队列中。
java.util.concurrent.locks.ReentrantLock
hasQueuedPredecessors() {
The correctness of this depends on head being initialized
before tail and on head.next being accurate if the current
thread is first in queue.
Node t = tail; Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
看到这里,我们理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位,这个可以从列表的第一次初始化也可以看出来。
真正的第一个有数据的节点,是在第二个节点开始的。当h != t时: 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回 True。
-
如果(s = h.next) != null,说明此时队列中至少有一个有效节点。
-
如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;
-
如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。
对于 unparkSuccessor 逻辑前面讲过了,就是唤醒下一个节点去获取锁。当然在唤醒过程中,对于非公平锁,其他线程是有机会去抢占的
到这里,就把加锁和解锁的逻辑都讲完了。
?
Lock 和 unLock 总结
以非公平锁为例,这里主要阐述一下非公平锁与 AQS 之间方法的关联之处,具体每一处核心方法的作用都已经在上文阐述清楚了。

为了帮助大家理解 ReentrantLock 和 AQS 之间方法的交互过程,以非公平锁为例,将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。

加锁:
-
通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
-
会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。
-
AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
-
tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。
解锁:
-
通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
-
Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
-
Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
-
释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。
通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。

?
到这里,基本就讲完了。
?
关于 Lock 及 AQS 的一些补充:
1、 Lock 的操作不仅仅局限于 lock()/unlock(),因为这样线程可能进入 WAITING 状态,这个时候如果没有 unpark() 就没法唤醒它,可能会一直“睡”下去,可以尝试用 tryLock()、tryLock(long,TimeUnit) 来做一些尝试加锁或超时来满足某些特定场景的需要。例如有些时候发现尝试加锁无法加上,先释放已经成功对其它对象添加的锁,过一小会再来尝试,这样在某些场合下可以避免“死锁”哦。
看下相关代码:
ReentrantLock
tryLock() {
// 调用的是非公平锁来抢占锁
return sync.nonfairTryAcquire(1);
}
InterruptedException {
// 超过一定时间后再去获取锁
return sync.tryAcquireNanos(1 AQS
// 拿不到锁时,等一段时间再拿不到就退出
boolean doAcquireNanos(int arg,1)"> nanosTimeout)
// 时间 <=0 直接返回
if (nanosTimeout <= 0Llong deadline = System.nanoTime() + nanosTimeout;
// 将当前线程加入到队列中
final Node node = addWaiter(Node.EXCLUSIVE);
(;;) {
// 这里如果当前线程是第一个有效节点,直接尝试去获取锁
tryAcquire(arg)) {
setHead(node);
p.next = help GC
;
}
nanosTimeout = deadline - System.nanoTime();
// 时间到了之后,就退出等待队列
cancelAcquire(node);
;
}
// 需要等待,并且时长大于 1000L
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
// 阻塞一定时间,再去获取锁
LockSupport.parkNanos( (Thread.interrupted())
InterruptedException();
}
} t;
}
}
2、 lockInterruptibly() 它允许抛出 InterruptException 异常,也就是当外部发起了中断操作,程序内部有可能会抛出这种异常,但是并不是绝对会抛出异常的。
ReentrantLock
void lockInterruptibly() InterruptedException {
sync.acquireInterruptibly(1 AQS
void acquireInterruptibly( arg)
// 如果发生了中断,就抛出中断异常
(Thread.interrupted())
InterruptedException();
//
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
可中断的
void doAcquireInterruptibly( arg)
InterruptedException {
// 再次看能不能获取锁
// park 前发现中断了,抛出中断错误
parkAndCheckInterrupt())
t;
}
}
可以发现,基本上可以中断的点,都会去判断线程是否有中断标志,有的话,直接抛出中断异常,但是在加入队列过程,和获取锁的过程是不响应中断的,只有之前之后会做中断判断。
3、 newCondition() 操作,是返回一个 Condition 的对象,Condition 只是一个接口,它要求实现 await()、awaitUninterruptibly()、awaitNanos(long)、await(long,TimeUnit)、awaitUntil(Date)、signal()、signalAll() 方法,AbstractQueuedSynchronizer 中有一个内部类叫做 ConditionObject 实现了这个接口,它也是一个类似于队列的实现,具体可以参考源码。大多数情况下可以直接使用,当然觉得自己比较牛逼的话也可以参考源码自己来实现。
4、 在 AQS 的 Node 中有每个 Node 自己的状态(waitStatus),我们这里归纳一下,分别包含:
-
SIGNAL 从前面的代码状态转换可以看得出是前面有线程在运行,需要前面线程结束后,调用 unpark() 方法才能激活自己,值为:-1
-
CANCELLED 当 AQS 发起取消或 fullyRelease() 时,会是这个状态。值为 1,也是几个状态中唯一一个大于 0 的状态,所以前面判定状态大于 0 就基本等价于是 CANCELLED 的意思。
-
CONDITION 线程基于 Condition 对象发生了等待,进入了相应的队列,自然也需要 Condition 对象来激活,值为 -2。
-
PROPAGATE 读写锁中,当读锁最开始没有获取到操作权限,得到后会发起一个 doReleaseShared() 动作,内部也是一个循环,当判定后续的节点状态为 0 时,尝试通过CAS自旋方式将状态修改为这个状态,表示节点可以运行。
-
状态 0 初始化状态,也代表正在尝试去获取临界资源的线程所对应的 Node 的状态。?
?
总结
本文基于 ReentrantLock 非公平锁的独占锁源码来分析了 AQS 的内部实现原理。在获得同步锁时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg) 方法释放同步状态,然后唤醒头节点的后继节点。
?
参考文章
从ReentrantLock的实现看AQS的原理及应用
AQS的原理浅析
J.U.C|同步队列(CLH)
深入分析AQS实现原理
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!