百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

AQS 都看完了,Condition 原理可不能少

liebian365 2024-11-12 13:10 5 浏览 0 评论


前言


" 在介绍 AQS 时,其中有一个内部类叫做 ConditionObject,当时并没有进行介绍,并且在后续阅读源码时,会发现很多地方用到了 Condition ,这时就会很诧异,这个 Condition 到底有什么作用?那今天就通过阅读 Condition 源码,从而弄清楚 Condition 到底是做什么的?当然阅读这篇文章的时候希望你已经阅读了 AQS、ReentrantLock 以及 LockSupport 的相关文章或者有一定的了解(~~当然小伙伴也可以直接跳到文末看总结~~) "


1

介绍


Object 的监视器方法:wait、notify、notifyAll 应该都不陌生,在多线程使用场景下,必须先使用 synchronized 获取到锁,然后才可以调用 Object 的 wait、notify。

Condition 的使用,相当于用 Lock 替换了 synchronized,然后用 Condition 替换 Object 的监视器方法。

Conditions(也称为条件队列或条件变量)为一种线程提供了一种暂停执行(等待),直到另一线程通知被阻塞的线程,某些状态条件现在可能为真。

因为访问到此共享状态信息发生在不同的线程中,因此必须对其进行保护,所以会使用某种形式的锁。等待条件提供的关键属性是它以原子地释放了关联的锁,并且挂起当前线程,就像 Object.wait 一样。

Condition 实例本质上要绑定到锁。为了获得 Condition 实例,一般使用 Lock 实例的 newCondition() 方法。

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();


基本使用

class BoundedBuffer {

    final Lock lock = new ReentrantLock();
    // condition 实例依赖于 lock 实例
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];

    int putPtr, takePtr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            //  put 时判断是否已经满了
            // 则线程在 notFull 条件上排队阻塞
            while (count == items.length) {
                notFull.await();
            }
            items[putPtr] = x;
            if (++putPtr == items.length) {
                putPtr = 0;
            }
            ++count;
            // put 成功之后,队列中有元素
            // 唤醒在 notEmpty 条件上排队阻塞的线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // take 时,发现为空
            // 则线程在 notEmpty 的条件上排队阻塞
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takePtr];
            if (++takePtr == items.length) {
                takePtr = 0;
            }
            --count;
            // take 成功,队列不可能是满的
            // 唤醒在 notFull 条件上排队阻塞的线程
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}


上面是官方文档的一个例子,实现了一个简单的 BlockingQueue ,看懂这里,会发现在同步队列中很多地方都是用的这个逻辑。必要的代码说明都已经在代码中进行注释。


问题疑问

  1. Condition 和 AQS 有什么关系?
  2. Condition 的实现原理是什么?
  3. Condition 的等待队列和 AQS 的同步队列有什么区别和联系?


2

源码分析


基本结构


通过 UML 可以看出,Condition 只是一个抽象类,它的主要实现逻辑是在 AQS 的内部类 ConditionObject 实现的。下面主要从 await 和 signal 两个方法入手,从源码了解 ConditionObject。


创建 Condition

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();


一般使用 lock.newCondition() 创建条件变量。


public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    public Condition newCondition() {
        return sync.newCondition();
    }
    // Sync 集成 AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }
}


这里使用的是 ReentrantLock 的源码,里面调用的 sync.newCondition(),Sync 继承 AQS,其实就是创建了一个 AQS 内部类的 ConditionObject 的实例。


这里需要注意的是 lock
每调用一次 lock.newCondition() 都会有一个新的 ConditionObject 实例生成,就是说一个 lock 可以创建多个 Condition 实例。


Condition 参数

/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;


await 方法

await 方法,会造成当前线程在等待,直到收到信号或被中断。


与此 Condition 相关联的锁被原子释放,并且出于线程调度目的,当前线程被禁用,并且处于休眠状态,直到发生以下四种情况之一:


  1. 其他一些线程调用此 Condition 的 signal 方法,而当前线程恰好被选择为要唤醒的线程;
  2. 其他一些线程调用此 Condition 的 signalAll 方法;
  3. 其他一些线程中断当前线程,并支持中断线程挂起;
  4. 发生虚假唤醒。


在所有情况下,在此方法可以返回之前,当前线程必须重新获取与此条件关联的锁。当线程返回时,可以保证保持此锁。


现在来看 AQS 内部的实现逻辑:


public final void await() throws InterruptedException {
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到条件队列尾部(等待队列)
    // 内部会创建 Node.CONDITION 类型的 Node
    Node node = addConditionWaiter();
    // 释放当前线程获取的锁(通过操作 state 的值)
    // 释放了锁就会被阻塞挂起
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 节点已经不在同步队列中,则调用 park 让其在等待队列中挂着
    while (!isOnSyncQueue(node)) {
        // 调用 park 阻塞挂起当前线程
        LockSupport.park(this);
        // 说明 signal 被调用了或者线程被中断,校验下唤醒原因
        // 如果因为终端被唤醒,则跳出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // while 循环结束, 线程开始抢锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 统一处理中断的
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}


await 方法步骤如下:


  1. 创建 Node.CONDITION 类型的 Node 并添加到条件队列(ConditionQueue)的尾部;
  2. 释放当前线程获取的锁(通过操作 state 的值)
  3. 判断当前线程是否在同步队列(SyncQueue)中,不在的话会使用 park 挂起。
  4. 循环结束之后,说明已经已经在同步队列(SyncQueue)中了,后面等待获取到锁,继续执行即可。


在这里一定要把条件队列和同步队列进行区分清楚!!


条件队列/等待队列:即 Condition 的队列 同步队列:AQS 的队列。


下面对 await 里面重要方法进行阅读:


- addConditionWaiter() 方法


private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 判断尾节点状态,如果被取消,则清除所有被取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建新节点,类型为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 将新节点放到等待队列尾部
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}


addConditionWaiter 方法可以看出,只是创建一个类型为 Node.CONDITION 的节点并放到条件队列尾部。同时通过这段代码还可以得出其他结论:


  1. 条件队列内部的 Node,只用到了 thread、waitStatus、nextWaiter 属性;
  2. 条件队列是单向队列。


作为对比,这里把条件队列和同步队列做出对比:



AQS 同步队列如下:



再来看下 Condition 的条件队列



waitStatus 在 AQS 中已经进行了介绍:


默认状态为 0;

waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;

waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);

waitStatus = -2 CONDITION -2 :该节点目前在条件队列;

waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。


- fullyRelease 方法 (AQS)


final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取当前节点的 state
        int savedState = getState();
        // 释放锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}


fullyRelease 方法是由 AQS 提供的,首先获取当前的 state,然后调用 release 方法进行释放锁。


public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}


release 方法在 AQS 中做了详细的介绍。它的主要作用就是释放锁,并且需要注意的是:


  1. fullyRelease 会一次性释放所有的锁,所以说不管重入多少次,在这里都会全部释放的。
  2. 这里会抛出异常,主要是在释放锁失败时,这时就会在 finally 里面将节点状态置为 Node.CANCELLED。

-isOnSyncQueue(node)


通过上面的流程,节点已经放到了
条件队列并且释放了持有的,而后就会挂起阻塞,直到 signal 唤醒。但是在挂起时要保证节点已经不在同步队列(SyncQueue)中了才可以挂起。


final boolean isOnSyncQueue(Node node) {
    // 当前节点是条件队列节点,或者上一个节点是空
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;

    return findNodeFromTail(node);
}
// 从尾部开始遍历
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}


如果一个节点(总是一个最初放置在条件队列中的节点)现在正等待在同步队列上重新获取,则返回true。

这段代码的主要作用判断节点是不是在同步队列中,如果不在同步队列中,后面才会调用 park 进行阻塞当前线程。这里就会有一个疑问:AQS 的同步队列和 Condition 的条件队列应该是无关的,这里为什么会要保证节点不在同步队列之后才可以进行阻塞?因为 signal 或者 signalAll 唤醒节点之后,节点就会被放到同步队列中。


线程到这里已经被阻塞了,当有其他线程调用 signal 或者 signalAll 时,会唤醒当前线程。


而后会验证是否因中断唤醒当前线程,这里假设没有发生中断。那 while 循环的 isOnSyncQueue(Node node) 必然会返回 true ,表示当前节点已经在同步队列中了。


后续会调用
acquireQueued(node, savedState) 进行获取锁。


final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
    try {
        // 中断状态
        boolean interrupted = false;
        // 无限循环
        for (;;) {
            // 当前节点之前的节点
            final Node p = node.predecessor();
            // 前一个节点是头节点, 说明当前节点是 头节点的 next 即真实的第一个数据节点 (因为 head 是虚拟节点)
            // 然后再尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 获取成功之后 将头指针指向当前节点
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p 不是头节点, 或者 头节点未能获取到资源 (非公平情况下被别的节点抢占) 
            // 判断 node 是否要被阻塞,获取不到锁就会一直阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


这里就是 AQS 的逻辑了,同样可以阅读 AQS 的相关介绍。


不断获取本节点的上一个节点是否为 head,因为 head 是虚拟节点,如果当前节点的上一个节点是 head 节点,则当前节点为 第一个数据节点>

第一个数据节点不断的去获取资源,获取成功,则将 head 指向当前节点;

当前节点不是头节点,或者 tryAcquire(arg) 失败(失败可能是非公平锁)。这时候需要判断前一个节点状态决定当前节点是否要被阻塞(前一个节点状态是否为 SIGNAL)。


值得注意的是,当节点放到 AQS 的同步队列时,也是进行争抢资源,同时设置 savedState 的值,这个值则是代表当初释放锁的时候释放了多少重入次数。


总体流程画图如下:


signal

public final void signal() {
    // 是否为当前持有线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // firstWaiter 头节点指向条件队列头的下一个节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 将原来的头节点和同步队列断开
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
 
    // 判断节点是否已经在之前被取消了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 调用 enq 添加到 同步队列的尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    // node 的上一个节点 修改为 SIGNAL 这样后续就可以唤醒自己了
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}


enq 同样可以阅读 AQS 的代码


private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾节点为空 需要初始化头节点,此时头尾节点是一个
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不为空 循环赋值
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}


通过 enq 方法将节点放到 AQS 的同步队列之后,要将 node 的前一个节点的 waitStatus 设置为 Node.SIGNAL。signalAll 的代码也是类似。


3

总结


Q&A

Q: Condition 和 AQS 有什么关系?

A: Condition 是基于 AQS 实现的,Condition 的实现类 ConditionObject 是 AQS 的一个内部类,在里面共用了一部分 AQS 的逻辑。

Q: Condition 的实现原理是什么?

A: Condition 内部维护一个条件队列,在获取锁的情况下,线程调用 await,线程会被放置在条件队列中并被阻塞。直到调用 signal、signalAll 唤醒线程,此后线程唤醒,会放入到 AQS 的同步队列,参与争抢锁资源。

Q: Condition 的等待队列和 AQS 的同步队列有什么区别和联系?

A: Condition 的等待队列是单向链表,AQS 的是双向链表。二者之间并没有什么明确的联系。仅仅在节点从阻塞状态被唤醒后,会从等待队列挪到同步队列中。


结束语

本文主要是阅读 Condition 的相关代码,不过省略了线程中断等逻辑。有兴趣的小伙伴。可以更深入的研究相关的源码。


- <End /> -


作者:刘志航,一个宅宅的北漂程序员。


公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生活中的见闻感悟。欢迎大佬来指导!

相关推荐

快递查询教程,批量查询物流,一键管理快递

作为商家,每天需要查询许许多多的快递单号,面对不同的快递公司,有没有简单一点的物流查询方法呢?小编的回答当然是有的,下面随小编一起来试试这个新技巧。需要哪些工具?安装一个快递批量查询高手快递单号怎么快...

一键自动查询所有快递的物流信息 支持圆通、韵达等多家快递

对于各位商家来说拥有一个好的快递软件,能够有效的提高自己的工作效率,在管理快递单号的时候都需要对单号进行表格整理,那怎么样能够快速的查询所有单号信息,并自动生成表格呢?1、其实方法很简单,我们不需要一...

快递查询单号查询,怎么查物流到哪了

输入单号怎么查快递到哪里去了呢?今天小编给大家分享一个新的技巧,它支持多家快递,一次能查询多个单号物流,还可对查询到的物流进行分析、筛选以及导出,下面一起来试试。需要哪些工具?安装一个快递批量查询高手...

3分钟查询物流,教你一键批量查询全部物流信息

很多朋友在问,如何在短时间内把单号的物流信息查询出来,查询完成后筛选已签收件、筛选未签收件,今天小编就分享一款物流查询神器,感兴趣的朋友接着往下看。第一步,运行【快递批量查询高手】在主界面中点击【添...

快递单号查询,一次性查询全部物流信息

现在各种快递的查询方式,各有各的好,各有各的劣,总的来说,还是有比较方便的。今天小编就给大家分享一个新的技巧,支持多家快递,一次能查询多个单号的物流,还能对查询到的物流进行分析、筛选以及导出,下面一起...

快递查询工具,批量查询多个快递快递单号的物流状态、签收时间

最近有朋友在问,怎么快速查询单号的物流信息呢?除了官网,还有没有更简单的方法呢?小编的回答当然是有的,下面一起来看看。需要哪些工具?安装一个快递批量查询高手多个京东的快递单号怎么快速查询?进入快递批量...

快递查询软件,自动识别查询快递单号查询方法

当你拥有多个快递单号的时候,该如何快速查询物流信息?比如单号没有快递公司时,又该如何自动识别再去查询呢?不知道如何操作的宝贝们,下面随小编一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号若干...

教你怎样查询快递查询单号并保存物流信息

商家发货,快递揽收后,一般会直接手动复制到官网上一个个查询物流,那么久而久之,就会觉得查询变得特别繁琐,今天小编给大家分享一个新的技巧,下面一起来试试。教程之前,我们来预览一下用快递批量查询高手...

简单几步骤查询所有快递物流信息

在高峰期订单量大的时候,可能需要一双手当十双手去查询快递物流,但是由于逐一去查询,效率极低,追踪困难。那么今天小编给大家分享一个新的技巧,一次能查询多个快递单号的物流,下面一起来学习一下,希望能给大家...

物流单号查询,如何查询快递信息,按最后更新时间搜索需要的单号

最近有很多朋友在问,如何通过快递单号查询物流信息,并按最后更新时间搜索出需要的单号呢?下面随小编一起来试试吧。需要哪些工具?安装一个快递批量查询高手快递单号若干怎么快速查询?运行【快递批量查询高手】...

连续保存新单号功能解析,导入单号查询并自动识别批量查快递信息

快递查询已经成为我们日常生活中不可或缺的一部分。然而,面对海量的快递单号,如何高效、准确地查询每一个快递的物流信息,成为了许多人头疼的问题。幸运的是,随着科技的进步,一款名为“快递批量查询高手”的软件...

快递查询教程,快递单号查询,筛选更新量为1的单号

最近有很多朋友在问,怎么快速查询快递单号的物流,并筛选出更新量为1的单号呢?今天小编给大家分享一个新方法,一起来试试吧。需要哪些工具?安装一个快递批量查询高手多个快递单号怎么快速查询?运行【快递批量查...

掌握批量查询快递动态的技巧,一键查找无信息记录的两种方法解析

在快节奏的商业环境中,高效的物流查询是确保业务顺畅运行的关键。作为快递查询达人,我深知时间的宝贵,因此,今天我将向大家介绍一款强大的工具——快递批量查询高手软件。这款软件能够帮助你批量查询快递动态,一...

从复杂到简单的单号查询,一键清除单号中的符号并批量查快递信息

在繁忙的商务与日常生活中,快递查询已成为不可或缺的一环。然而,面对海量的单号,逐一查询不仅耗时费力,还容易出错。现在,有了快递批量查询高手软件,一切变得简单明了。只需一键,即可搞定单号查询,一键处理单...

物流单号查询,在哪里查询快递

如果在快递单号多的情况,你还在一个个复制粘贴到官网上手动查询,是一件非常麻烦的事情。于是乎今天小编给大家分享一个新的技巧,下面一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号怎么快速查询?...

取消回复欢迎 发表评论: