logo

AQS源码解析(2)-共享锁的获取与释放

大家好,我是呼噜噜,好久没聊关于Java并发,本文接着上一篇文章图解ReentrantLock的基石AQS源码-独占锁的获取与释放,将继续聊聊AQS共享锁获取与释放的一些细节

共享锁与独占锁的区别

首先我们先了解知道共享锁与独占锁的区别,他们主要的区别是:

排他锁,顾名思义,锁是线程独占的,锁在同一时刻只能有一个线程使用,同一时刻不能被多个线程一同占用,一个线程占用后其它线程只能等待。在AQS中常量EXCLUSIVE表示独占模式(独占锁)

共享锁,锁是线程共享的,即锁在同一时刻可以被多个线程共享使用,一个线程对资源加了共享锁后其它线程对资源也只能加共享锁。共享锁有着很好的读性能。在AQS中常量SHARED表示共享模式(共享锁)

共享锁的获取

在AQS中,我们通过acquireShared来获取共享锁,先来看看acquireShared的源码:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

acquireShared如果对应到独占锁的方法,其实就是acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquireShared与Semaphore子类的实现

我们可以发现AQS中没有实现tryAcquireShared,而是抛出了一个异常,那么就是由其子类实现

protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

我们这里以Semaphore这个类来分析tryAcquireShared的具体实现。Semaphore也叫信号量,可以用来控制同时访问共享资源的线程数量,通过协调各个线程,以保证合理的使用资源。

在Semaphore内部维护了一个计数器,其值为可以访问的共享资源的个数。一个线程如果要访问共享资源,需要先获得信号量:

  1. 如果信号量的计数器值大于 1,意味着有共享资源可以访问,则使其计数器值减去1,再访问共享资源。
  2. 如果计数器值为0, 表示信号量(许可证)已分配完了,则线程将进入等待状态。直到某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1,之前进入等待状态的线程,将被唤醒并再次试图获得信号量。

Semaphore简单的使用:

Semaphore semaphore = new Semaphore(10,true);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();

我们继续回到Semaphore这个类实现的tryAcquireShared源码处:

abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);//permits是代表许可证的数量
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {//自旋(死循环)
                
                int available = getState();// 获取state状态变量的值(即:许可证的数量)
                int remaining = available - acquires;//计算 剩余许可证的数量
                //如果remaining<0直接返回
        		//或者如果使用CAS设置许可证数量成功,也返回
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
	}


    /**
         * 非公平锁tryAcquireShared实现
         */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        NonfairSync(int permits) {
            super(permits);//调用父类Sync的构造方法
        }
    
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
         * 公平锁tryAcquireShared实现
         */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
    
        FairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            for (;;) {//自旋(死循环)
                
                // 判断AQS队列中头部结点的后继节点是否为空,或者是判断头部结点的后继节点是否不是当前线程
                // 如果阻塞队列不为空直接返回-1
                if (hasQueuedPredecessors())
                    return -1;
                //得到当前state的值
                int available = getState();
                int remaining = available - acquires;
                // 得到资源后state的值小于0直接返回state的值,否则CAS更新state的值再返回state的值
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

可以很容易发现,Semaphore中tryAcquireShared公平和非公平锁的逻辑,主要区别就是,公平锁里面每次循环都会先调用hasQueuedPredecessors(),用来判断AQS队列中头部结点的后继节点是否为空,或者是判断头部结点的后继节点是否不是当前线程,这样就保证了先进来的线程会先执行,也就是实现了公平锁的逻辑

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

所以tryAcquireShared()顾名思义就是当前线程尝试获取共享锁,它的返回值有3种情况:

  1. 如果返回值小于0,表示当前线程获取共享锁失败
  2. 如果返回值大于0,表示当前线程获取共享锁成功,后继线程尝试获取共享锁,可能会成功
  3. 如果返回值大于0,也表示当前线程获取共享锁成功,但后继线程尝试获取共享锁,不会成功,这点需要格外注意

doAcquireShared

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        //线程封装成Node,并根据给定的模式(独占或者共享)
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            // 中断标志
            boolean interrupted = false;
            for (;;) {
                // 获取node的前继节点
                final Node p = node.predecessor();
                if (p == head) {// 若node的前继节点为head节点,则执行tryAcquireShared方法尝试获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 若返回值>=0,表明获取锁成功
                        // 将当前节点设置为head节点,并唤醒后继节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        
                        if (interrupted)//如果中断标志位true
                            selfInterrupt();//调用selfInterrupt,给当前线程补上一个中断标志,让当前线程自己知道自己被中断过,同时也唤醒当前线程。
                        failed = false;
                        return;
                    }
                }
                //通过Node的状态来判断,线程竞争锁失败以后是否应该被挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //将线程挂起,重新被唤醒后,去检查阻塞期间是否被中断过
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireShared与之对应的是独占锁的acquireQueued,逻辑类似,需要注意的是,线程获取锁失败后入队列并不会立刻阻塞,而是判断是否应该阻塞shouldParkAfterFailedAcquire,如果前继是head,会再给一次机会获取锁,一切都是为了尽快唤醒其他等待线程

与之对应的是独占锁的acquireQueued的主要区别,我们接着一一到来:

addWaiter(Node.SHARED)

不同于独占锁调用addWaiter(Node.EXCLUSIVE)SHARED表示共享模式,EXCLUSIVE表示独占模式,我们先来看一下addWaiter(Node.SHARED)方法相关的源码:

private Node addWaiter(Node mode) {
  	//线程封装成Node,并根据给定的模式(独占或者共享)!!!
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
   //尝试添加尾节点,如果是第一个结点加入肯定为空,跳过
    if (pred != null) {
        node.prev = pred;
        //CAS设置尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
  	//没有一次成功的话,就会去多次尝试
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {//自旋,也就是死循环
        Node t = tail;
        if (t == null) { // Must initialize
          	//CAS 设置队列头,新建一个空的Node节点作为头结点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
          	//CAS 设置队列尾,存储当前线程的节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

需要注意的是,head结点本身不存在任何数据,是一个虚节点,它只是作为一个牵头结点,如果队列不为null,tail则永远指向尾部结点

采用虚节点当头结点,主要是因为每个节点都需要设置前置节点的 ws 状态(这个状态是为了保证数据一致性),如果只有一个线程竞争锁时,只有一个结点,其是没有前置节点的,所以需要创建一个虚拟节点,这样就能兼容临界情况当只有一个线程竞争锁时,无需初始化生成同步队列,直接获取同步锁即可

setHeadAndPropagate

另一个不同的是,独占锁直接调用了setHead(node),而当共享锁获取锁之后,调用的是setHeadAndPropagate(node, r),共享锁的传播性由setHeadAndPropagate完成

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 将当前节点设置为head
        setHead(node);
       /*
         * propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一,表示剩余可用许可数
         * h 表示旧的head节点
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {//若propagate>0或者原head节点为null或原head节点的状态值<0
            // 获取node的后继节点
            Node s = node.next;
            if (s == null || s.isShared())// 若后继节点为null或者为共享节点,则执行doReleaseShared方法继续传递唤醒操作
                doReleaseShared();
        }
    }

propagate这里是tryAcquireShared的返回值,如果propagate > 0,说明tryAcquireShared还有剩余共享锁可以获取,会去唤醒下一个线程;如果propagate=0,说明tryAcquireShared没有剩余共享锁可以获取

setHeadAndPropagate其内部不仅仅是setHead(node)还会在一定条件唤醒head后继,这是为啥呢?

这是由于在共享锁模式下,顾名思义,锁可以被多个线程所共同持有,如果当前线程已获取到锁了,那么后继节点(线程),也可以拿到该锁,所以当符合一定条件,可以唤醒head后继节点,在这里调用doReleaseShared能更灵敏的唤醒阻塞线程

doReleaseShared方法,我们先暂时不分析,等下午讲共享锁释放的时候再一起讲

selfInterrupt

还有一个区别就是,如果中断标志位truedoAcquireShared会在方法内部调用selfInterrupt,直接将中断响应掉,而我们知道acquireQueued只是返回中断标志,会在其外层方法调用selfInterrupt响应中断

共享锁的释放

releaseShared

AQS中,我们通过releaseShared来释放共享锁,先来看看releaseShared的源码:

public final boolean releaseShared(int arg) {
    	//尝试释放资源
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared与Semaphore子类的实现

我们可以发现AQS中也没有实现tryReleaseShared,而是抛出了一个异常,那么就是由其子类实现

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

我们来看看Semaphore子类的实现:

protected final boolean tryReleaseShared(int releases) {
            for (;;) {//自旋
              	// 获取当前state的值
                int current = getState();
              	//释放资源state值要增加
                int next = current + releases;

                if (next < current) // next < current说明传入参数非法
                    throw new Error("Maximum permit count exceeded");

                if (compareAndSetState(current, next))//尝试CAS更新state的值
                    return true;
            }
        }

首先通过tryReleaseShared() 去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则会通过 doReleaseShared() 去释放共享锁

doReleaseShared

doReleaseShared是AQS共享锁核心释放锁的方法,我们先来看下源码:

private void doReleaseShared() {
         */
        for (;;) {//自旋
            Node h = head;
            //若head不为null且不是tail节点
            if (h != null && h != tail) {
                // 获取头节点对应的线程的状态
                int ws = h.waitStatus;
                //若head节点状态为SIGNAL(-1),则通过CAS将head节点的状态设置为0
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒head结点的后继节点
                    unparkSuccessor(h);
                }
                //若head节点状态为0,自旋CAS设置节点状态PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果head节点在自旋期间未change的话,则跳出自旋
            if (h == head)
                break;
        }
    }

doReleaseShared源码中,有许多Node结点的状态,我们先回顾一下图解ReentrantLock的基石AQS源码-独占锁的获取与释放 中AQS同步队列模型:

其中waitStatus是表示当前被封装成Node结点的状态,默认为0,表示初始化状态,还有4种状态:CANCELLED、SIGNAL、CONDITION、PROPAGATE,分别是:

  1. CANCELLED: 1, 表示该节点的线程被取消,当同步队列中的线程超时或中断,会将此节点取消。该节点永远不会再发生变化,需要注意的是当前节点的线程为取消状态时,再也不会被阻塞
  2. SIGNAL:-1, 当其prev结点释放了同步锁 或者 被取消后,立即通知处于SIGNAL状态的next节点的线程执行
  3. CONDITION:-2,表示节点处于条件队列等待,调用了await方法后处于等待状态的线程节点会被标记为此种状态,当调用了Condition的singal()方法后,CONDITION状态会变为SIGNAL状态,并且会在适当的时机从等待队列转移到同步队列中。
  4. PROPAGATE:-3,这种状态与共享模式有关,在共享模式下,表示节点处于可运行状态

我们来梳理一下doReleaseShared的主要流程:

  1. 如果若head节点不为null且不是tail节点(这个其实是保证了同步队列初始话完成,防止未初始化,直接去调用doReleaseShared),首先去获取节点的waitStatus状态,如果head节点的状态等于SIGNAL(-1),则通过CAS将head节点的状态设置为0

  2. 如果状态设置成功则调用 unparkSuccessor唤醒head结点的后继节点,此时当头节点发生变化时(即被唤醒后继节点已经成为了新的头节点),会继续回到循环中,继续唤醒head节点的后继节点,直到符合跳出循环的条件(见第4点)

  3. 还有种特殊情况,当AQS同步队列的最后一个节点成为了头节点,由于后面没有新的节点了,也就不会出现新节点将自己的前驱节点的修改成SIGNAL,最终此时head节点状态为0,会进入else if,通过CAS将head 的状态从0再次修改为 PROPAGATE(-3);如果CAS操作失败的话,就说明有新的节点入队, 就ws的值被修改为SIGNAL,继续循环,等待被唤醒

  4. 若head节点的状态不是SIGNAL或者0,会去判断head节点在自旋期间是否发生改变的,也就是h == head,若如果未发生改变,则跳出自旋(死循环); 这种情况h == head一般发生于某个被唤醒的线程因为获取不到锁(资源被用尽)执行shouldParkAfterFailedAcquire方法被阻塞挂起,head节点没有发生改变

那这里为什么要设置PROPAGATE(-3)状态?

PROPAGETE的作用是,保证唤醒的传播,避免线程无法会唤醒的窘境

因为当多个线程并发执行releaseShared时,有可能出现在AQS同步队列等待的节点,比如前一个线程完成了释放唤醒,同时后一个线程获取锁,但还未执行setHeadAndPropagate进行共享锁传播,也就是未设置好head,也就是说此时读取老的head状态为0(也就是这种else if (ws == 0情况),会导致释放但不唤醒,此时会将头结点的状态置为 PROPAGATE状态,让获取锁的线程任然能够进行共享锁传播,唤醒下一个线程。

doReleaseShared在AQS中,有2处地方被调用,一处就是在这里,当线程释放共享锁的时候调用;另一处就是我们上面讲的setHeadAndPropagate方法中,当线程成功获取到共享锁后,在一定条件下调用该方法

如果设置了PROPAGETE,传播行为抽象出来,不仅仅简单地依赖unparkSuccessor,再配合setHeadAndPropagate处的逻辑

if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {//若propagate>0或者原head节点为null或原head节点的状态值<0
            // 获取node的后继节点
            Node s = node.next;
            if (s == null || s.isShared())// 若后继节点为null或者为共享节点,则执行doReleaseShared方法继续传递唤醒操作
                doReleaseShared();
        }

若后继节点为null或者为共享节点,持锁线程会去调用doReleaseShared来唤醒该线程,这样也在某种意义上加快了唤醒后继节点的速度

小结

共享锁加锁的逻辑和独占锁类似,最主要的区别就是共享锁可以被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。

共享锁的释放,doReleaseShared()中只要head发生改变,会不断地循环唤醒head的后继节点,尝试唤醒尽可能多且可以唤醒的节点,而新的线程一旦获取到锁,会加入到唤醒head后继节点的循环中,尽可能提高唤醒等待线程的速度

我们可以发现AQS共享锁head节点的各个状态,转化的过程,还是非常绕的,需要耐心地阅读源码,一点点地抽丝剥茧。


参考资料:
https://www.cnblogs.com/micrari/p/6937995.html