AQS源码解析(2)-共享锁的获取与释放
大家好,我是呼噜噜,好久没聊关于Java并发,本文接着上一篇文章图解ReentrantLock的基石AQS源码-独占锁的获取与释放,将继续聊聊AQS共享锁获取与释放的一些细节
共享锁与独占锁的区别
首先我们先了解知道共享锁与独占锁的区别,他们主要的区别是:
排他锁,顾名思义,锁是线程独占的,锁在同一时刻只能有一个线程使用,同一时刻不能被多个线程一同占用,一个线程占用后其它线程只能等待。在AQS中常量EXCLUSIVE
表示独占模式(独占锁)
共享锁,锁是线程共享的,即锁在同一时刻可以被多个线程共享使用,一个线程对资源加了共享锁后其它线程对资源也只能加共享锁。共享锁有着很好的读性能。在AQS中常量SHARED
表示共享模式(共享锁)
共享锁的获取
在AQS中,我们通过acquireShared
来获取共享锁,先来看看acquireShared
的源码:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
acquireShared
如果对应到独占锁的方法,其实就是acquire
:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquireShared与Semaphore子类的实现
我们可以发现AQS中没有实现tryAcquireShared
,而是抛出了一个异常,那么就是由其子类实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
我们这里以Semaphore
这个类来分析tryAcquireShared
的具体实现。Semaphore
也叫信号量,可以用来控制同时访问共享资源的线程数量,通过协调各个线程,以保证合理的使用资源。
在Semaphore内部维护了一个计数器,其值为可以访问的共享资源的个数。一个线程如果要访问共享资源,需要先获得信号量:
- 如果信号量的计数器值大于 1,意味着有共享资源可以访问,则使其计数器值减去
1
,再访问共享资源。 - 如果计数器值为
0
, 表示信号量(许可证)已分配完了,则线程将进入等待状态。直到某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1
,之前进入等待状态的线程,将被唤醒并再次试图获得信号量。
Semaphore简单的使用:
Semaphore semaphore = new Semaphore(10,true);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();
我们继续回到Semaphore
这个类实现的tryAcquireShared
源码处:
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);//permits是代表许可证的数量
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {//自旋(死循环)
int available = getState();// 获取state状态变量的值(即:许可证的数量)
int remaining = available - acquires;//计算 剩余许可证的数量
//如果remaining<0直接返回
//或者如果使用CAS设置许可证数量成功,也返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
/**
* 非公平锁tryAcquireShared实现
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);//调用父类Sync的构造方法
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平锁tryAcquireShared实现
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {//自旋(死循环)
// 判断AQS队列中头部结点的后继节点是否为空,或者是判断头部结点的后继节点是否不是当前线程
// 如果阻塞队列不为空直接返回-1
if (hasQueuedPredecessors())
return -1;
//得到当前state的值
int available = getState();
int remaining = available - acquires;
// 得到资源后state的值小于0直接返回state的值,否则CAS更新state的值再返回state的值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
可以很容易发现,Semaphore中tryAcquireShared
公平和非公平锁的逻辑,主要区别就是,公平锁里面每次循环都会先调用hasQueuedPredecessors()
,用来判断AQS队列中头部结点的后继节点是否为空,或者是判断头部结点的后继节点是否不是当前线程,这样就保证了先进来的线程会先执行,也就是实现了公平锁的逻辑
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
所以tryAcquireShared()
顾名思义就是当前线程尝试获取共享锁,它的返回值有3种情况:
- 如果返回值小于0,表示当前线程获取共享锁失败
- 如果返回值大于0,表示当前线程获取共享锁成功,后继线程尝试获取共享锁,可能会成功
- 如果返回值大于0,也表示当前线程获取共享锁成功,但后继线程尝试获取共享锁,不会成功,这点需要格外注意
doAcquireShared
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//线程封装成Node,并根据给定的模式(独占或者共享)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 中断标志
boolean interrupted = false;
for (;;) {
// 获取node的前继节点
final Node p = node.predecessor();
if (p == head) {// 若node的前继节点为head节点,则执行tryAcquireShared方法尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) { // 若返回值>=0,表明获取锁成功
// 将当前节点设置为head节点,并唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)//如果中断标志位true
selfInterrupt();//调用selfInterrupt,给当前线程补上一个中断标志,让当前线程自己知道自己被中断过,同时也唤醒当前线程。
failed = false;
return;
}
}
//通过Node的状态来判断,线程竞争锁失败以后是否应该被挂起
if (shouldParkAfterFailedAcquire(p, node) &&
//将线程挂起,重新被唤醒后,去检查阻塞期间是否被中断过
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared
与之对应的是独占锁的acquireQueued
,逻辑类似,需要注意的是,线程获取锁失败后入队列并不会立刻阻塞,而是判断是否应该阻塞shouldParkAfterFailedAcquire
,如果前继是head,会再给一次机会获取锁,一切都是为了尽快唤醒其他等待线程
与之对应的是独占锁的acquireQueued
的主要区别,我们接着一一到来:
addWaiter(Node.SHARED)
不同于独占锁调用addWaiter(Node.EXCLUSIVE)
,SHARED表示共享模式,EXCLUSIVE表示独占模式,我们先来看一下addWaiter(Node.SHARED)
方法相关的源码:
private Node addWaiter(Node mode) {
//线程封装成Node,并根据给定的模式(独占或者共享)!!!
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//尝试添加尾节点,如果是第一个结点加入肯定为空,跳过
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//没有一次成功的话,就会去多次尝试
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {//自旋,也就是死循环
Node t = tail;
if (t == null) { // Must initialize
//CAS 设置队列头,新建一个空的Node节点作为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//CAS 设置队列尾,存储当前线程的节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
需要注意的是,head结点本身不存在任何数据,是一个虚节点,它只是作为一个牵头结点,如果队列不为null,tail则永远指向尾部结点
采用虚节点当头结点,主要是因为每个节点都需要设置前置节点的 ws 状态(这个状态是为了保证数据一致性),如果只有一个线程竞争锁时,只有一个结点,其是没有前置节点的,所以需要创建一个虚拟节点,这样就能兼容临界情况当只有一个线程竞争锁时,无需初始化生成同步队列,直接获取同步锁即可
setHeadAndPropagate
另一个不同的是,独占锁直接调用了setHead(node)
,而当共享锁获取锁之后,调用的是setHeadAndPropagate(node, r)
,共享锁的传播性由setHeadAndPropagate
完成
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将当前节点设置为head
setHead(node);
/*
* propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一,表示剩余可用许可数
* h 表示旧的head节点
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {//若propagate>0或者原head节点为null或原head节点的状态值<0
// 获取node的后继节点
Node s = node.next;
if (s == null || s.isShared())// 若后继节点为null或者为共享节点,则执行doReleaseShared方法继续传递唤醒操作
doReleaseShared();
}
}
propagate
这里是tryAcquireShared
的返回值,如果propagate > 0
,说明tryAcquireShared
后还有剩余共享锁可以获取,会去唤醒下一个线程;如果propagate=0
,说明tryAcquireShared
后没有剩余共享锁可以获取
setHeadAndPropagate
其内部不仅仅是setHead(node)
,还会在一定条件唤醒head后继,这是为啥呢?
这是由于在共享锁模式下,顾名思义,锁可以被多个线程所共同持有,如果当前线程已获取到锁了,那么后继节点(线程),也可以拿到该锁,所以当符合一定条件,可以唤醒head后继节点,在这里调用doReleaseShared
能更灵敏的唤醒阻塞线程
doReleaseShared
方法,我们先暂时不分析,等下午讲共享锁释放的时候再一起讲
selfInterrupt
还有一个区别就是,如果中断标志位true
,doAcquireShared
会在方法内部调用selfInterrupt
,直接将中断响应掉,而我们知道acquireQueued
只是返回中断标志,会在其外层方法调用selfInterrupt
响应中断
共享锁的释放
releaseShared
AQS中,我们通过releaseShared
来释放共享锁,先来看看releaseShared
的源码:
public final boolean releaseShared(int arg) {
//尝试释放资源
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared与Semaphore子类的实现
我们可以发现AQS中也没有实现tryReleaseShared,
而是抛出了一个异常,那么就是由其子类实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
我们来看看Semaphore
子类的实现:
protected final boolean tryReleaseShared(int releases) {
for (;;) {//自旋
// 获取当前state的值
int current = getState();
//释放资源state值要增加
int next = current + releases;
if (next < current) // next < current说明传入参数非法
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//尝试CAS更新state的值
return true;
}
}
首先通过tryReleaseShared()
去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则会通过 doReleaseShared()
去释放共享锁
doReleaseShared
doReleaseShared
是AQS共享锁核心释放锁的方法,我们先来看下源码:
private void doReleaseShared() {
*/
for (;;) {//自旋
Node h = head;
//若head不为null且不是tail节点
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
//若head节点状态为SIGNAL(-1),则通过CAS将head节点的状态设置为0
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒head结点的后继节点
unparkSuccessor(h);
}
//若head节点状态为0,自旋CAS设置节点状态PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果head节点在自旋期间未change的话,则跳出自旋
if (h == head)
break;
}
}
doReleaseShared
源码中,有许多Node结点的状态,我们先回顾一下图解ReentrantLock的基石AQS源码-独占锁的获取与释放 中AQS同步队列模型:
其中waitStatus
是表示当前被封装成Node结点的状态,默认为0,表示初始化状态,还有4种状态:CANCELLED、SIGNAL、CONDITION、PROPAGATE
,分别是:
- CANCELLED: 1, 表示该节点的线程被取消,当同步队列中的线程超时或中断,会将此节点取消。该节点永远不会再发生变化,需要注意的是当前节点的线程为取消状态时,再也不会被阻塞
- SIGNAL:-1, 当其prev结点释放了同步锁 或者 被取消后,立即通知处于SIGNAL状态的next节点的线程执行
- CONDITION:-2,表示节点处于条件队列等待,调用了
await
方法后处于等待状态的线程节点会被标记为此种状态,当调用了Condition的singal()
方法后,CONDITION
状态会变为SIGNAL
状态,并且会在适当的时机从等待队列转移到同步队列中。 - PROPAGATE:-3,这种状态与共享模式有关,在共享模式下,表示节点处于可运行状态
我们来梳理一下doReleaseShared
的主要流程:
-
如果若head节点不为null且不是tail节点(这个其实是保证了同步队列初始话完成,防止未初始化,直接去调用
doReleaseShared
),首先去获取节点的waitStatus
状态,如果head节点的状态等于SIGNAL(-1)
,则通过CAS将head节点的状态设置为0 -
如果状态设置成功则调用
unparkSuccessor
唤醒head结点的后继节点,此时当头节点发生变化时(即被唤醒后继节点已经成为了新的头节点),会继续回到循环中,继续唤醒head节点的后继节点,直到符合跳出循环的条件(见第4点) -
还有种特殊情况,当AQS同步队列的最后一个节点成为了头节点,由于后面没有新的节点了,也就不会出现新节点将自己的前驱节点的修改成
SIGNAL
,最终此时head节点状态为0,会进入else if
,通过CAS将head 的状态从0再次修改为PROPAGATE(-3)
;如果CAS操作失败的话,就说明有新的节点入队, 就ws的值被修改为SIGNAL
,继续循环,等待被唤醒 -
若head节点的状态不是
SIGNAL或者0
,会去判断head节点在自旋期间是否发生改变的,也就是h == head
,若如果未发生改变,则跳出自旋(死循环); 这种情况h == head
一般发生于某个被唤醒的线程因为获取不到锁(资源被用尽)执行shouldParkAfterFailedAcquire方法被阻塞挂起,head节点没有发生改变
那这里为什么要设置PROPAGATE(-3)
状态?
PROPAGETE
的作用是,保证唤醒的传播,避免线程无法会唤醒的窘境
因为当多个线程并发执行releaseShared
时,有可能出现在AQS同步队列等待的节点,比如前一个线程完成了释放唤醒,同时后一个线程获取锁,但还未执行setHeadAndPropagate
进行共享锁传播,也就是未设置好head,也就是说此时读取老的head状态为0(也就是这种else if (ws == 0情况)
,会导致释放但不唤醒,此时会将头结点的状态置为 PROPAGATE
状态,让获取锁的线程任然能够进行共享锁传播,唤醒下一个线程。
doReleaseShared
在AQS中,有2处地方被调用,一处就是在这里,当线程释放共享锁的时候调用;另一处就是我们上面讲的setHeadAndPropagate
方法中,当线程成功获取到共享锁后,在一定条件下调用该方法
如果设置了PROPAGETE
,传播行为抽象出来,不仅仅简单地依赖unparkSuccessor
,再配合setHeadAndPropagate
处的逻辑
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {//若propagate>0或者原head节点为null或原head节点的状态值<0
// 获取node的后继节点
Node s = node.next;
if (s == null || s.isShared())// 若后继节点为null或者为共享节点,则执行doReleaseShared方法继续传递唤醒操作
doReleaseShared();
}
若后继节点为null或者为共享节点,持锁线程会去调用doReleaseShared
来唤醒该线程,这样也在某种意义上加快了唤醒后继节点的速度
小结
共享锁加锁的逻辑和独占锁类似,最主要的区别就是共享锁可以被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
共享锁的释放,doReleaseShared()
中只要head发生改变,会不断地循环唤醒head的后继节点,尝试唤醒尽可能多且可以唤醒的节点,而新的线程一旦获取到锁,会加入到唤醒head后继节点的循环中,尽可能提高唤醒等待线程的速度
我们可以发现AQS共享锁head节点的各个状态,转化的过程,还是非常绕的,需要耐心地阅读源码,一点点地抽丝剥茧。