aqs

基础

AbstractQueuedSynchronizer抽象同步队列简称A Q S,它是实现同步器的基础组件,如常用的ReentrantLock、Semaphore、CountDownLatch等。A Q S定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作

AQS组成结构

三部分组成,state同步状态、Node组成的CLH队列、ConditionObject条件变量(包含Node组成的条件单向队列),下面会分别对这三部分做介绍。

状态

  • getState():返回同步状态
  • setState(int newState):设置同步状态
  • compareAndSetState(int expect, int update):使用C A S设置同步状态
  • isHeldExclusively():当前线程是否持有资源

独占资源(不响应线程中断)

  • tryAcquire(int arg):独占式获取资源,子类实现
  • acquire(int arg):独占式获取资源模板
  • tryRelease(int arg):独占式释放资源,子类实现
  • release(int arg):独占式释放资源模板

共享资源(不响应线程中断)

  • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现
  • acquireShared(int arg):共享式获取资源模板
  • tryReleaseShared(int arg):共享式释放资源,子类实现
  • releaseShared(int arg):共享式释放资源模板

共享状态

A Q S中维护了一个同步状态变量stategetState函数获取同步状态,setState、compareAndSetState函数修改同步状态,对于A Q S来说,线程同步的关键是对state的操作,可以说获取、释放资源是否成功都是由state决定的,比如state>0代表可获取资源,否则无法获取,所以state的具体语义由实现者去定义,现有的ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样。

  • ReentrantLockstate用来表示是否有锁资源
  • ReentrantReadWriteLockstate16位代表读锁状态,低16位代表写锁状态
  • Semaphorestate用来表示可用信号的个数
  • CountDownLatchstate用来表示计数器的值

CLH队列

CLHA Q S内部维护的FIFO先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构,当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过C A S原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,所以说A Q S通过CLH队列管理竞争资源的线程,个人总结CLH队列具有如下几个优点:

  • 先进先出保证了公平性
  • 非阻塞的队列,通过自旋锁和C A S保证节点插入和移除的原子性,实现无锁快速插入
  • 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁

Node内部类

NodeA Q S的内部类,每个等待资源的线程都会封装成Node节点组成C L H队列、等待队列,所以说Node是非常重要的部分,理解它是理解A Q S的第一步。

waitStatus等待状态

nextWaiter特殊标记

  • NodeCLH队列时,nextWaiter表示共享式或独占式标记
  • Node在条件队列时,nextWaiter表示下个Node节点指针

node过程

注意:队头head节点的thread一定为null,用于表示正在执行的线程对象,且用于唤醒后续线程

入队


    private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node var1) {
        //根据当前线程创建节点,等待状态为0
        AbstractQueuedSynchronizer.Node var2 = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), var1);
        AbstractQueuedSynchronizer.Node var3 = this.tail;
        // 如果阻塞队列中尾节点不为空,就cas将当前节点添加到最后
        if (var3 != null) {
            var2.prev = var3;
            if (this.compareAndSetTail(var3, var2)) {
                var3.next = var2;
                return var2;
            }
        }
        // 第一次还没有阻塞队列的时候,会到enq方法里面
        this.enq(var2);
        return var2;
    }

我们在第一次进入这个方法的时候,上面图一所示,tail和head都指向null;

第一次循环,首先会到图二,然后判断t所指向的节点是不是null,如果是的话,就用CAS更新节点,这个CAS我们可以看作:头节点head为null,我们把head节点更新为一个哨兵节点(哨兵节点就是new Node()),再将tail也指向head,就是图三了

第二次for循环:走到上面的else语句,将新节点的前一个节点设置为哨兵节点;

然后就是CAS更新节点,这里CAS的意思:如果最后的节点tail指向的和t是一样的,那么就将tail指向node节点

最后再将t的下一个节点设置为node,下图所示,就ok了

出队

    final boolean acquireQueued(AbstractQueuedSynchronizer.Node var1, int var2) {
        boolean var3 = true;
        try {
            boolean var4 = false;
            while(true) {
                //1.获取前驱节点
                AbstractQueuedSynchronizer.Node var5 = var1.predecessor();
                //如果前驱节点是首节点,获取资源(子类实现)
                if (var5 == this.head && this.tryAcquire(var2)) {
                    //2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
                    this.setHead(var1);
                    //3.原来首节点下个节点指向为null
                    var5.next = null;
                    //4.非异常状态,防止指向finally逻辑
                    var3 = false;
                    boolean var6 = var4;
                    //5.返回线程中断状态
                    return var6;
                }

                if (shouldParkAfterFailedAcquire(var5, var1) && this.parkAndCheckInterrupt()) {
                    var4 = true;
                }
            }
        } finally {
            if (var3) {
                this.cancelAcquire(var1);
            }
        }
    }
private void setHead(Node node) {
    //节点设置为头部
    head = node;
    //清空线程
    node.thread = null;
    //清空前驱节点
    node.prev = null;
}

假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点,注意这个过程是不需要使用C A S来保证,因为只有一个线程能够成功获取到资源。

条件变量

Objectwait、notify函数是配合Synchronized锁实现线程间同步协作的功能,A Q SConditionObject条件变量也提供这样的功能,通过ConditionObjectawaitsignal两类函数完成。

不同于Synchronized锁,一个A Q S可以对应多个条件变量,而Synchronized只有一个。

如上图所示,ConditionObject内部维护着一个单向条件队列,不同于C H L队列,条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在C H L队列, 条件队列出队的节点,会入队到C H L队列。

当某个线程执行了ConditionObjectawait函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObjectsignal函数,会将条件队列头部线程节点转移到C H L队列参与竞争资源,具体流程如下图

最后补充下,条件队列Node类是使用nextWaiter变量指向下个节点,并且因为是单向队列,所以prevnext变量都是null

模板方法

A Q S采用了模板方法设计模式,提供了两类模板,一类是独占式模板,另一类是共享形模式,对应的模板函数如下

  1. 独占式

  2. acquire获取资源

  3. release释放资源

  4. 共享式

  5. acquireShared获取资源

  6. releaseShared释放资源

独占式获取资源

acquire是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响,acquire函数代码如下

    public final void acquire(int var1) {
        if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {
            selfInterrupt();
        }
    }

//可以转换成这样
public final void acquire(int arg){
    if(!tryAcquire(arg)){//获取资源失败,tryAcquire子类实现
        //创建独占式标记节点,节点入队CLH队列
        Node node = addWaiter(Node.EXCLUSIVE);
        if(acquireQueued(node,arg)){//自旋阻塞等待获取资源,并返回线程是否被中断过标记
            //如果线程被中断过,指向线程中断操作
            selfInterrupt();
        }
    }
}
  • 执行tryAcquire函数,tryAcquire是由子类实现,代表获取资源是否成功,如果资源获取失败,执行下面的逻辑
  • 执行addWaiter函数(前面已经介绍过),根据当前线程创建出独占式节点,并入队CLH队列
  • 执行acquireQueued函数,自旋阻塞等待获取资源
  • 如果acquireQueued函数中获取资源成功,根据线程是否被中断状态,来决定执行线程中断逻辑

acquire方法

分析下acquireQueued函数,线程封装成节点后,是如何自旋阻塞等待获取资源的,代码如下

    final boolean acquireQueued(final Node node, int arg) {
        //异常状态,默认是
        boolean failed = true;
        try {
            //该线程是否中断过,默认否
            boolean interrupted = false;
            for (;;) {//自旋
                //获取前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是首节点,获取资源(子类实现)
                if (p == head && tryAcquire(arg)) {
                    //获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
                    setHead(node);
                    //原来首节点下个节点指向为null
                    p.next = null; // help GC
                    //非异常状态,防止指向finally逻辑
                    failed = false;
                    //返回线程中断状态
                    return interrupted;
                }
                /**
                 * 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
                 * 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
                 * 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
                 * 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
                    parkAndCheckInterrupt())
                    //该线程被中断过
                    interrupted = true;
                }
            } finally {
                // 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
                if (failed)
                    cancelAcquire(node);
            }
    }

独占式释放资源

A Q S中提供了release模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CLH队列的第二个线程节点(首节点的下个节点),代码如下

    public final boolean release(int arg) {

        if (tryRelease(arg)) {//释放资源成功,tryRelease子类实现
            //获取头部线程节点
            Node h = head;
            if (h != null && h.waitStatus != 0) //头部线程节点不为null,并且等待状态不为0
                //唤醒CHL队列第二个线程节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


    private void unparkSuccessor(Node node) {
        //获取节点等待状态
        int ws = node.waitStatus;
        if (ws < 0)
            //cas更新节点状态为0
            compareAndSetWaitStatus(node, ws, 0);

        //获取下个线程节点        
        Node s = node.next;
        if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒线程节点
            LockSupport.unpark(s.thread);
        }
    }

共享式获取资源

acquireShared是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响,acquireShared函数代码如下

    public final void acquireShared(int arg) {
        /**
         * 1.负数表示失败
         * 2.0表示成功,但没有剩余可用资源
         * 3.正数表示成功且有剩余资源
         */
        if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现
            //自旋阻塞等待获取资源
            doAcquireShared(arg);
    }

doAcquireShared函数与独占式的acquireQueued函数逻辑基本一致,唯一的区别就是创建出来的节点时共享式的和setHeadAndPropagate(node,r)

    final boolean doacquireShared(int arg) {
        //根据当前线程创建出共享式节点,并入队
        //异常状态,默认是
        boolean failed = true;
        try {
            //该线程是否中断过,默认否
            boolean interrupted = false;
            for (;;) {//自旋
                //获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if(r >= 0){
                        //设置自己为头节点,并尝试唤醒后继节点
                        setHeadAndPropagate(node,r);
                        //原来首节点下个节点指向为null
                        p.next = null; // help GC
                        //如果线程被中断过
                        if(interrupted)
                            selfInterrupt();
                        //非异常状态,防止指向finally逻辑
                        failed = false;
                        return;
                    }
                }
                /**
                 * 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
                 * 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
                 * 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
                 * 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
                    parkAndCheckInterrupt())
                    //该线程被中断过
                    interrupted = true;
                }
            } finally {
                // 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
                if (failed)
                    cancelAcquire(node);
            }
    }
  • 节点的标记是共享式
  • 获取资源成功,还会唤醒后续资源,因为资源数可能>0,代表还有资源可获取,所以需要做后续线程节点的唤醒

共享式释放资源

A Q S中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(首节点的下个节点),代码如下

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
            //唤醒后继节点
            doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
        for (;;) {
            //获取头节点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;

                if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
                        continue;            // loop to recheck cases
                    //唤醒头节点下个线程节点
                    unparkSuccessor(h);
                }
                //如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)              
                break;
        }
    }

与独占式释放资源区别不大,都是唤醒头节点的下个节点

转载:https://mp.weixin.qq.com/s?__biz=MzAwMDg2OTAxNg==&mid=2652048892&idx=1&sn=1b12dc819ec677a2af67875d7fbbe4a0&scene=21#wechat_redirect


   转载规则


《aqs》 锦泉 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
枚举enum 枚举enum
enum的values方法此方法可以将枚举类转变为一个枚举类型的数组,因为枚举中没有下标,我们没有办法通过下标来快速找到需要的枚举类,这时候,转变为数组之后,我们就可以通过数组的下标,来找到我们需要的枚举类。 testpublic enum
2021-05-24
下一篇 
Java关键字——instanceof Java关键字——instanceof
测试一个对象是否为一个类的实例
2021-05-18
  目录