百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

AQS 都看完了,Condition 原理可不能少

liebian365 2024-11-12 13:10 26 浏览 0 评论


前言


" 在介绍 AQS 时,其中有一个内部类叫做 ConditionObject,当时并没有进行介绍,并且在后续阅读源码时,会发现很多地方用到了 Condition ,这时就会很诧异,这个 Condition 到底有什么作用?那今天就通过阅读 Condition 源码,从而弄清楚 Condition 到底是做什么的?当然阅读这篇文章的时候希望你已经阅读了 AQS、ReentrantLock 以及 LockSupport 的相关文章或者有一定的了解(~~当然小伙伴也可以直接跳到文末看总结~~) "


1

介绍


Object 的监视器方法:wait、notify、notifyAll 应该都不陌生,在多线程使用场景下,必须先使用 synchronized 获取到锁,然后才可以调用 Object 的 wait、notify。

Condition 的使用,相当于用 Lock 替换了 synchronized,然后用 Condition 替换 Object 的监视器方法。

Conditions(也称为条件队列或条件变量)为一种线程提供了一种暂停执行(等待),直到另一线程通知被阻塞的线程,某些状态条件现在可能为真。

因为访问到此共享状态信息发生在不同的线程中,因此必须对其进行保护,所以会使用某种形式的锁。等待条件提供的关键属性是它以原子地释放了关联的锁,并且挂起当前线程,就像 Object.wait 一样。

Condition 实例本质上要绑定到锁。为了获得 Condition 实例,一般使用 Lock 实例的 newCondition() 方法。

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();


基本使用

class BoundedBuffer {

    final Lock lock = new ReentrantLock();
    // condition 实例依赖于 lock 实例
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];

    int putPtr, takePtr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            //  put 时判断是否已经满了
            // 则线程在 notFull 条件上排队阻塞
            while (count == items.length) {
                notFull.await();
            }
            items[putPtr] = x;
            if (++putPtr == items.length) {
                putPtr = 0;
            }
            ++count;
            // put 成功之后,队列中有元素
            // 唤醒在 notEmpty 条件上排队阻塞的线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // take 时,发现为空
            // 则线程在 notEmpty 的条件上排队阻塞
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takePtr];
            if (++takePtr == items.length) {
                takePtr = 0;
            }
            --count;
            // take 成功,队列不可能是满的
            // 唤醒在 notFull 条件上排队阻塞的线程
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}


上面是官方文档的一个例子,实现了一个简单的 BlockingQueue ,看懂这里,会发现在同步队列中很多地方都是用的这个逻辑。必要的代码说明都已经在代码中进行注释。


问题疑问

  1. Condition 和 AQS 有什么关系?
  2. Condition 的实现原理是什么?
  3. Condition 的等待队列和 AQS 的同步队列有什么区别和联系?


2

源码分析


基本结构


通过 UML 可以看出,Condition 只是一个抽象类,它的主要实现逻辑是在 AQS 的内部类 ConditionObject 实现的。下面主要从 await 和 signal 两个方法入手,从源码了解 ConditionObject。


创建 Condition

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();


一般使用 lock.newCondition() 创建条件变量。


public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    public Condition newCondition() {
        return sync.newCondition();
    }
    // Sync 集成 AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }
}


这里使用的是 ReentrantLock 的源码,里面调用的 sync.newCondition(),Sync 继承 AQS,其实就是创建了一个 AQS 内部类的 ConditionObject 的实例。


这里需要注意的是 lock
每调用一次 lock.newCondition() 都会有一个新的 ConditionObject 实例生成,就是说一个 lock 可以创建多个 Condition 实例。


Condition 参数

/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;


await 方法

await 方法,会造成当前线程在等待,直到收到信号或被中断。


与此 Condition 相关联的锁被原子释放,并且出于线程调度目的,当前线程被禁用,并且处于休眠状态,直到发生以下四种情况之一:


  1. 其他一些线程调用此 Condition 的 signal 方法,而当前线程恰好被选择为要唤醒的线程;
  2. 其他一些线程调用此 Condition 的 signalAll 方法;
  3. 其他一些线程中断当前线程,并支持中断线程挂起;
  4. 发生虚假唤醒。


在所有情况下,在此方法可以返回之前,当前线程必须重新获取与此条件关联的锁。当线程返回时,可以保证保持此锁。


现在来看 AQS 内部的实现逻辑:


public final void await() throws InterruptedException {
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到条件队列尾部(等待队列)
    // 内部会创建 Node.CONDITION 类型的 Node
    Node node = addConditionWaiter();
    // 释放当前线程获取的锁(通过操作 state 的值)
    // 释放了锁就会被阻塞挂起
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 节点已经不在同步队列中,则调用 park 让其在等待队列中挂着
    while (!isOnSyncQueue(node)) {
        // 调用 park 阻塞挂起当前线程
        LockSupport.park(this);
        // 说明 signal 被调用了或者线程被中断,校验下唤醒原因
        // 如果因为终端被唤醒,则跳出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // while 循环结束, 线程开始抢锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 统一处理中断的
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}


await 方法步骤如下:


  1. 创建 Node.CONDITION 类型的 Node 并添加到条件队列(ConditionQueue)的尾部;
  2. 释放当前线程获取的锁(通过操作 state 的值)
  3. 判断当前线程是否在同步队列(SyncQueue)中,不在的话会使用 park 挂起。
  4. 循环结束之后,说明已经已经在同步队列(SyncQueue)中了,后面等待获取到锁,继续执行即可。


在这里一定要把条件队列和同步队列进行区分清楚!!


条件队列/等待队列:即 Condition 的队列 同步队列:AQS 的队列。


下面对 await 里面重要方法进行阅读:


- addConditionWaiter() 方法


private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 判断尾节点状态,如果被取消,则清除所有被取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建新节点,类型为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 将新节点放到等待队列尾部
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}


addConditionWaiter 方法可以看出,只是创建一个类型为 Node.CONDITION 的节点并放到条件队列尾部。同时通过这段代码还可以得出其他结论:


  1. 条件队列内部的 Node,只用到了 thread、waitStatus、nextWaiter 属性;
  2. 条件队列是单向队列。


作为对比,这里把条件队列和同步队列做出对比:



AQS 同步队列如下:



再来看下 Condition 的条件队列



waitStatus 在 AQS 中已经进行了介绍:


默认状态为 0;

waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;

waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);

waitStatus = -2 CONDITION -2 :该节点目前在条件队列;

waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。


- fullyRelease 方法 (AQS)


final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取当前节点的 state
        int savedState = getState();
        // 释放锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}


fullyRelease 方法是由 AQS 提供的,首先获取当前的 state,然后调用 release 方法进行释放锁。


public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}


release 方法在 AQS 中做了详细的介绍。它的主要作用就是释放锁,并且需要注意的是:


  1. fullyRelease 会一次性释放所有的锁,所以说不管重入多少次,在这里都会全部释放的。
  2. 这里会抛出异常,主要是在释放锁失败时,这时就会在 finally 里面将节点状态置为 Node.CANCELLED。

-isOnSyncQueue(node)


通过上面的流程,节点已经放到了
条件队列并且释放了持有的,而后就会挂起阻塞,直到 signal 唤醒。但是在挂起时要保证节点已经不在同步队列(SyncQueue)中了才可以挂起。


final boolean isOnSyncQueue(Node node) {
    // 当前节点是条件队列节点,或者上一个节点是空
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;

    return findNodeFromTail(node);
}
// 从尾部开始遍历
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}


如果一个节点(总是一个最初放置在条件队列中的节点)现在正等待在同步队列上重新获取,则返回true。

这段代码的主要作用判断节点是不是在同步队列中,如果不在同步队列中,后面才会调用 park 进行阻塞当前线程。这里就会有一个疑问:AQS 的同步队列和 Condition 的条件队列应该是无关的,这里为什么会要保证节点不在同步队列之后才可以进行阻塞?因为 signal 或者 signalAll 唤醒节点之后,节点就会被放到同步队列中。


线程到这里已经被阻塞了,当有其他线程调用 signal 或者 signalAll 时,会唤醒当前线程。


而后会验证是否因中断唤醒当前线程,这里假设没有发生中断。那 while 循环的 isOnSyncQueue(Node node) 必然会返回 true ,表示当前节点已经在同步队列中了。


后续会调用
acquireQueued(node, savedState) 进行获取锁。


final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
    try {
        // 中断状态
        boolean interrupted = false;
        // 无限循环
        for (;;) {
            // 当前节点之前的节点
            final Node p = node.predecessor();
            // 前一个节点是头节点, 说明当前节点是 头节点的 next 即真实的第一个数据节点 (因为 head 是虚拟节点)
            // 然后再尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 获取成功之后 将头指针指向当前节点
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p 不是头节点, 或者 头节点未能获取到资源 (非公平情况下被别的节点抢占) 
            // 判断 node 是否要被阻塞,获取不到锁就会一直阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


这里就是 AQS 的逻辑了,同样可以阅读 AQS 的相关介绍。


不断获取本节点的上一个节点是否为 head,因为 head 是虚拟节点,如果当前节点的上一个节点是 head 节点,则当前节点为 第一个数据节点>

第一个数据节点不断的去获取资源,获取成功,则将 head 指向当前节点;

当前节点不是头节点,或者 tryAcquire(arg) 失败(失败可能是非公平锁)。这时候需要判断前一个节点状态决定当前节点是否要被阻塞(前一个节点状态是否为 SIGNAL)。


值得注意的是,当节点放到 AQS 的同步队列时,也是进行争抢资源,同时设置 savedState 的值,这个值则是代表当初释放锁的时候释放了多少重入次数。


总体流程画图如下:


signal

public final void signal() {
    // 是否为当前持有线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // firstWaiter 头节点指向条件队列头的下一个节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 将原来的头节点和同步队列断开
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
 
    // 判断节点是否已经在之前被取消了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 调用 enq 添加到 同步队列的尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    // node 的上一个节点 修改为 SIGNAL 这样后续就可以唤醒自己了
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}


enq 同样可以阅读 AQS 的代码


private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾节点为空 需要初始化头节点,此时头尾节点是一个
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不为空 循环赋值
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}


通过 enq 方法将节点放到 AQS 的同步队列之后,要将 node 的前一个节点的 waitStatus 设置为 Node.SIGNAL。signalAll 的代码也是类似。


3

总结


Q&A

Q: Condition 和 AQS 有什么关系?

A: Condition 是基于 AQS 实现的,Condition 的实现类 ConditionObject 是 AQS 的一个内部类,在里面共用了一部分 AQS 的逻辑。

Q: Condition 的实现原理是什么?

A: Condition 内部维护一个条件队列,在获取锁的情况下,线程调用 await,线程会被放置在条件队列中并被阻塞。直到调用 signal、signalAll 唤醒线程,此后线程唤醒,会放入到 AQS 的同步队列,参与争抢锁资源。

Q: Condition 的等待队列和 AQS 的同步队列有什么区别和联系?

A: Condition 的等待队列是单向链表,AQS 的是双向链表。二者之间并没有什么明确的联系。仅仅在节点从阻塞状态被唤醒后,会从等待队列挪到同步队列中。


结束语

本文主要是阅读 Condition 的相关代码,不过省略了线程中断等逻辑。有兴趣的小伙伴。可以更深入的研究相关的源码。


- <End /> -


作者:刘志航,一个宅宅的北漂程序员。


公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生活中的见闻感悟。欢迎大佬来指导!

相关推荐

“版本末期”了?下周平衡补丁!国服最强5套牌!上分首选

明天,酒馆战棋就将迎来大更新,也聊了很多天战棋相关的内容了,趁此机会,给兄弟们穿插一篇构筑模式的卡组推荐!老规矩,我们先来看10职业胜率。目前10职业胜率排名与一周前基本类似,没有太多的变化。平衡补丁...

VS2017 C++ 程序报错“error C2065:“M_PI”: 未声明的标识符&quot;

首先,程序中头文件的选择,要选择头文件,在文件中是没有对M_PI的定义的。选择:项目——>”XXX属性"——>配置属性——>C/C++——>预处理器——>预处理器定义,...

东营交警实名曝光一批酒驾人员名单 88人受处罚

齐鲁网·闪电新闻5月24日讯酒后驾驶是对自己和他人生命安全极不负责的行为,为守护大家的平安出行路,东营交警一直将酒驾作为重点打击对象。5月23日,东营交警公布最新一批饮酒、醉酒名单。对以下驾驶人醉酒...

Qt界面——搭配QCustomPlot(qt platform)

这是我第一个使用QCustomPlot控件的上位机,通过串口精确的5ms发送一次数据,再将读取的数据绘制到图表中。界面方面,尝试卡片式设计,外加QSS简单的配了个色。QCustomPlot官网:Qt...

大话西游2分享赢取种族坐骑手办!PK趣闻录由你书写

老友相聚,仗剑江湖!《大话西游2》2021全民PK季4月激燃打响,各PK玩法鏖战齐开,零门槛参与热情高涨。PK季期间,不仅各种玩法奖励丰厚,参与PK趣闻录活动,投稿自己在PK季遇到的趣事,还有机会带走...

测试谷歌VS Code AI 编程插件 Gemini Code Assist

用ClaudeSonnet3.7的天气测试编码,让谷歌VSCodeAI编程插件GeminiCodeAssist自动编程。生成的文件在浏览器中的效果如下:(附源代码)VSCode...

顾爷想知道第4.5期 国服便利性到底需优化啥?

前段时间DNF国服推出了名为“阿拉德B计划”的系列改版计划,截至目前我们已经看到了两项实装。不过关于便利性上,国服似乎还有很多路要走。自从顾爷回归DNF以来,几乎每天都在跟我抱怨关于DNF里面各种各样...

掌握Visual Studio项目配置【基础篇】

1.前言VisualStudio是Windows上最常用的C++集成开发环境之一,简称VS。VS功能十分强大,对应的,其配置系统较为复杂。不管是对于初学者还是有一定开发经验的开发者来说,捋清楚VS...

还嫌LED驱动设计套路深?那就来看看这篇文章吧

随着LED在各个领域的不同应用需求,LED驱动电路也在不断进步和发展。本文从LED的特性入手,推导出适合LED的电源驱动类型,再进一步介绍各类LED驱动设计。设计必读:LED四个关键特性特性一:非线...

Visual Studio Community 2022(VS2022)安装图文方法

直接上步骤:1,首先可以下载安装一个VisualStudio安装器,叫做VisualStudioinstaller。这个安装文件很小,很快就安装完成了。2,打开VisualStudioins...

Qt添加MSVC构建套件的方法(qt添加c++11)

前言有些时候,在Windows下因为某些需求需要使用MSVC编译器对程序进行编译,假设我们安装Qt的时候又只是安装了MingW构建套件,那么此时我们该如何给现有的Qt添加一个MSVC构建套件呢?本文以...

Qt为什么站稳c++GUI的top1(qt c)

为什么现在QT越来越成为c++界面编程的第一选择,从事QT编程多年,在这之前做C++界面都是基于MFC。当时为什么会从MFC转到QT?主要原因是MFC开发界面想做得好看一些十分困难,引用第三方基于MF...

qt开发IDE应该选择VS还是qt creator

如果一个公司选择了qt来开发自己的产品,在面临IDE的选择时会出现vs或者qtcreator,选择qt的IDE需要结合产品需求、部署平台、项目定位、程序猿本身和公司战略,因为大的软件产品需要明确IDE...

Qt 5.14.2超详细安装教程,不会来打我

Qt简介Qt(官方发音[kju:t],音同cute)是一个跨平台的C++开库,主要用来开发图形用户界面(GraphicalUserInterface,GUI)程序。Qt是纯C++开...

Cygwin配置与使用(四)——VI字体和颜色的配置

简介:VI的操作模式,基本上VI可以分为三种状态,分别是命令模式(commandmode)、插入模式(Insertmode)和底行模式(lastlinemode),各模式的功能区分如下:1)...

取消回复欢迎 发表评论: