百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

阻塞队列—ArrayBlockingQueue源码分析

liebian365 2024-11-12 13:10 32 浏览 0 评论

前言

ArrayBlockingQueue 由数组支持的有界阻塞队列,队列基于数组实现,容量大小在创建 ArrayBlockingQueue 对象时已经定义好。 此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁,默认采用非公平锁。其数据结构如下图:

注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程和请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁

队列创建

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);

应用场景

在线程池中有比较多的应用,生产者消费者场景。

  • 先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素)
  • 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
  • 队列不支持空元素
  • 公平性 (fairness)可以在构造函数中指定。

此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过在构造函数将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。

工作原理

ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

基于 ReentrantLock 保证线程安全,根据 Condition 实现队列满时的阻塞。

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细地对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。

notEmpty表示“锁的非空条件”。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒“之前通过notEmpty.await()进入等待状态的线程”。 同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。

试图向已满队列中放入元素会导致放入操作受阻塞,直到BlockingQueue里有新的唤空间才会被醒继续操作; 试图从空队列中检索元素将导致类似阻塞,直到BlocingkQueue进了新货才会被唤醒。

源码分析

以下源码分析基于JDK1.8

定义

ArrayBlockingQueue的类继承关系如下:

其包含的方法定义如下:

成员属性

 /** 真正存入数据的数组 */
    final Object[] items;

    /** take,poll,peek or remove 的下一个索引 */
    int takeIndex;

    /** put,offer,or add 下一个索引 */
    int putIndex;

    /** 队列中元素个数 */
    int count;

    /** 可重入锁 */
    final ReentrantLock lock;

    /** 如果数组是空的,在该Condition上等待 */
    private final Condition notEmpty;

    /** 如果数组是满的,在该Condition上等待 */
    private final Condition notFull;

    /** 遍历器实现 */
    transient Itrs itrs = null;

构造函数

 /**
     * 构造函数,设置队列的初始容量
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * 构造函数,
     * capacity and the specified access policy.
     *
     * @param capacity 设置数组大小
     * @param fair  设置是否为公平锁
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // 是否为公平锁,如果是的话,那么先到的线程先获得锁对象
        // 否则,由操作系统调度由哪个线程获得锁,一般为false,性能会比较高
        lock = new ReentrantLock(fair); 
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**
     * 构造函数,带有初始内容的队列
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        //加锁的目的是为了其他CPU能够立即看到修改
        //加锁和解锁底层都是CAS,会强制修改写回主存,对其他CPU可见
        lock.lock(); // 要给数组设置内容,先上锁
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e; // 依次拷贝内容
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i; // 如果 putIndex大于数组大小,那么从0重写开始
        } finally {
            lock.unlock(); // 最后一定要释放锁
        }
    }

入队方法

add / offer / put,这三个方法都是往队列中添加元素,说明如下:

  • add方法依赖于offer方法,如果队列满了则抛出异常,否则添加成功返回true;
  • offer方法有两个重载版本,只有一个参数的版本,如果队列满了就返回false,否则加入到队列中,返回true,add方法就是调用此版本的offer方法;另一个带时间参数的版本,如果队列满了则等待,可指定等待的时间,如果这期间中断了则抛出异常,如果等待超时了则返回false,否则加入到队列中返回true;
  • put方法跟带时间参数的offer方法逻辑一样,不过没有等待的时间限制,会一直等待直到队列有空余位置了,再插入到队列中,返回true
 /**
     * 添加一个元素,其实super.add里面调用了offer方法
     */
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     * 加入成功返回 true,否则返回 false
     */
    public boolean offer(E e) {
     // 创建插入的元素是否为null,是的话抛出NullPointerException异常
        checkNotNull(e);
        // 获取“该阻塞队列的独占锁”
        final ReentrantLock lock = this.lock;
        lock.lock(); // 上锁
        try {
         // 如果队列已满,则返回false。
            if (count == items.length) // 超过数组的容量
                return false;
            else {
             // 如果队列未满,则插入e,并返回true。
                enqueue(e); 
                return true;
            }
        } finally {
         // 释放锁
            lock.unlock();
        }
    }

    /**
     * 如果队列已满的话,就会等待
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //和lock方法的区别是让它在阻塞时可以抛出异常跳出
        try {
            while (count == items.length)
                notFull.await(); // 这里就是阻塞了,要注意:如果运行到这里,那么它会释放上面的锁,一直等到 notify
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * 带有超时事件的插入方法,unit 表示是按秒、分、时哪一种
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos); // 带有超时等待的阻塞方法
            }
            enqueue(e); // 入队
            return true;
        } finally {
            lock.unlock();
        }
    }

出队方法

poll / take / peek,这几个方法都是获取队列顶的元素,具体说明如下:

  • poll方法有两个重载版本,第一个版本,如果队列是空的,返回null,否则移除并返回队列头部元素;另一个带时间参数的版本,如果栈为空则等待,可以指定等待的时间,如果等待超时了则返回null,如果被中断了则抛出异常,否则移除并返回栈顶元素
  • take方法同带时间参数的poll方法,但是不能指定等待时间,会一直等待直到队列中有元素为止,然后移除并返回栈顶元素
  • peek方法只是返回队列头部元素,不移除
 // 实现的方法,如果当前队列为空,返回null
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
 // 实现的方法,如果当前队列为空,一直阻塞
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); // 队列为空,阻塞方法
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
 // 带有超时事件的取元素方法,否则返回null
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos); // 超时等待
            }
            return dequeue(); // 取得元素
        } finally {
            lock.unlock();
        }
    }
 
    // 只是看一个队列最前面的元素,取出是不擅长队列中原来的元素,队列为空时返回null
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // 队列为空时返回null
        } finally {
            lock.unlock();
        }
    }

删除元素方法

remove / clear /drainT,这三个方法用于从队列中移除元素,具体说明如下:

  • remove方法用于移除某个元素,如果栈为空或者没有找到该元素则返回false,否则从栈中移除该元素;移除时,如果该元素位于栈顶则直接移除,如果位于栈中间,则需要将该元素后面的其他元素往前面挪动,移除后需要唤醒因为栈满了而阻塞的线程
  • clear方法用于整个栈,同时将takeIndex置为putIndex,保证栈中的元素先进先出;最后会唤醒最多count个线程,因为正常一个线程插入一个元素,如果唤醒超过count个线程,可能导致部分线程因为栈满了又再次被阻塞
  • drainTo方法有两个重载版本,一个是不带个数,将所有的元素都移除并拷贝到指定的集合中;一个带个数,将指定个数的元素移除并拷贝到指定的集合中,两者的底层实现都是同一个方法。移除后需要重置takeIndex和count,并唤醒最多移除个数的因为栈满而阻塞的线程。
 /**
     * 从队列中删除一个元素的方法。删除成功返回true,否则返回false
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                //从takeIndex开始往后遍历直到等于putIndex
                do {
                    if (o.equals(items[i])) {
                        removeAt(i); // 真正删除的方法
                        return true;
                    }
                    //走到数组末尾了又从头开始,put时也按照这个规则来
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex); // 一直不断的循环取出来做判断
            }
            //如果数组为空,返回false
            return false;
        } finally {
            lock.unlock();
        }
    }

 /**
     * 指定删除索引上的元素.
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            //如果移除的就是栈顶的元素
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            //元素个数减1
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // 如果移除的是栈中间的某个元素,需要将该元素后面的元素往前挪动
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                //到数组末尾了,从头开始
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                 //将后面一个元素复制到前面来
                    items[i] = items[next];
                    i = next;
                } else {
                 //如果下一个元素的索引等于putIndex,说明i就是栈中最后一个元素了,直接将该元素置为null
                    items[i] = null;
                    //重置putIndex为i
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
             //通知itrs节点移除了
                itrs.removedAt(removeIndex);
        }
        //唤醒因为栈满了而等待的线程
        notFull.signal(); // 有一个元素删除成功,那肯定队列不满
    }

 /**
     * 清空队列
     */
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                //从takeIndex开始遍历直到i等于putIndex,将数组元素置为null
                do {
                    items[i] = null;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
                //注意此处没有将这两个index置为0,只是让他们相等,因为只要相等就可以实现栈先进先出了
                takeIndex = putIndex;
                count = 0;
                if (itrs != null)
                    itrs.queueIsEmpty();
                //如果有因为栈满了而等待的线程,则将其唤醒
                //注意这里没有使用signalAll而是通过for循环来signal多次,单纯从唤醒线程来看是可以使用signalAll的,效果跟这里的for循环是一样的
                //如果有等待的线程,说明count就是当前线程的最大容量了,这里清空了,最多只能put count次,一个线程只能put 1次,只唤醒最多count个线程就避免了
                //线程被唤醒后再次因为栈满了而阻塞
                for (; k > 0 && lock.hasWaiters(notFull); k--)
                    notFull.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 取出所有元素到集合
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * 取出所有元素到集合
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        //校验参数合法
        checkNotNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
         //取两者之间的最小值
            int n = Math.min(maxElements, count);
            int take = takeIndex;
            int i = 0;
            try {
             //从takeIndex开始遍历,取出元素然后添加到c中,直到满足个数要求为止
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x);
                    items[take] = null;
                    if (++take == items.length)
                        take = 0;
                    i++;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                 //取完了,修改count减去i
                    count -= i;
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0)
                         //通知itrs 栈空了
                            itrs.queueIsEmpty();
                        else if (i > take)
                         //说明take中间变成0了,通知itrs
                            itrs.takeIndexWrapped();
                    }
                    //唤醒在因为栈满而等待的线程,最多唤醒i个,同上避免线程被唤醒了因为栈又满了而阻塞
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }

iterator / Itr / Itrs

Itr和Itrs都是ArrayBlockingQueue的两个内部类,如下:

iterator方法返回一个迭代器实例,用于实现for循环遍历和部分Collection接口,该方法的实现如下:

public Iterator<E> iterator() {
 return new Itr();
}

Itr() {
 // assert lock.getHoldCount() == 0;
 lastRet = NONE;
 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
     if (count == 0) {
         //NONE和DETACHED都是常量
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        } else {
         //初始化各属性
            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = takeIndex;
            nextItem = itemAt(nextIndex = takeIndex);
            cursor = incCursor(takeIndex);
            if (itrs == null) {
             itrs = new Itrs(this);
            } else {
             //初始化Itrs,将当前线程注册到Itrs
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            }
            prevCycles = itrs.cycles;
            // assert takeIndex >= 0;
            // assert prevTakeIndex == takeIndex;
            // assert nextIndex >= 0;
            // assert nextItem != null;
     }
 } finally {
    lock.unlock();
    }
}

Itrs(Itr initial) {
 register(initial);
}

//根据index计算cursor
private int incCursor(int index) {
 // assert lock.getHoldCount() == 1;
 if (++index == items.length)
  index = 0;
 if (index == putIndex)
  index = NONE;
 return index;
}

/**
* 创建一个新的Itr实例时,会调用此方法将该实例添加到Node链表中
*/
void register(Itr itr) {
 //创建一个新节点将其插入到head节点的前面
 head = new Node(itr, head);
}

小结

ArrayBlockingQueue是一个阻塞队列,内部由ReentrantLock来实现线程安全,由Condition的await和signal来实现等待唤醒的功能。它的数据结构是数组,准确地说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从0继续开始。


PS:以上代码提交在 Github

https://github.com/Niuh-Study/niuh-juc-final.git


文章持续更新,可以公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

相关推荐

“版本末期”了?下周平衡补丁!国服最强5套牌!上分首选

明天,酒馆战棋就将迎来大更新,也聊了很多天战棋相关的内容了,趁此机会,给兄弟们穿插一篇构筑模式的卡组推荐!老规矩,我们先来看10职业胜率。目前10职业胜率排名与一周前基本类似,没有太多的变化。平衡补丁...

VS2017 C++ 程序报错“error C2065:“M_PI”: 未声明的标识符&quot;

首先,程序中头文件的选择,要选择头文件,在文件中是没有对M_PI的定义的。选择:项目——>”XXX属性"——>配置属性——>C/C++——>预处理器——>预处理器定义,...

东营交警实名曝光一批酒驾人员名单 88人受处罚

齐鲁网·闪电新闻5月24日讯酒后驾驶是对自己和他人生命安全极不负责的行为,为守护大家的平安出行路,东营交警一直将酒驾作为重点打击对象。5月23日,东营交警公布最新一批饮酒、醉酒名单。对以下驾驶人醉酒...

Qt界面——搭配QCustomPlot(qt platform)

这是我第一个使用QCustomPlot控件的上位机,通过串口精确的5ms发送一次数据,再将读取的数据绘制到图表中。界面方面,尝试卡片式设计,外加QSS简单的配了个色。QCustomPlot官网:Qt...

大话西游2分享赢取种族坐骑手办!PK趣闻录由你书写

老友相聚,仗剑江湖!《大话西游2》2021全民PK季4月激燃打响,各PK玩法鏖战齐开,零门槛参与热情高涨。PK季期间,不仅各种玩法奖励丰厚,参与PK趣闻录活动,投稿自己在PK季遇到的趣事,还有机会带走...

测试谷歌VS Code AI 编程插件 Gemini Code Assist

用ClaudeSonnet3.7的天气测试编码,让谷歌VSCodeAI编程插件GeminiCodeAssist自动编程。生成的文件在浏览器中的效果如下:(附源代码)VSCode...

顾爷想知道第4.5期 国服便利性到底需优化啥?

前段时间DNF国服推出了名为“阿拉德B计划”的系列改版计划,截至目前我们已经看到了两项实装。不过关于便利性上,国服似乎还有很多路要走。自从顾爷回归DNF以来,几乎每天都在跟我抱怨关于DNF里面各种各样...

掌握Visual Studio项目配置【基础篇】

1.前言VisualStudio是Windows上最常用的C++集成开发环境之一,简称VS。VS功能十分强大,对应的,其配置系统较为复杂。不管是对于初学者还是有一定开发经验的开发者来说,捋清楚VS...

还嫌LED驱动设计套路深?那就来看看这篇文章吧

随着LED在各个领域的不同应用需求,LED驱动电路也在不断进步和发展。本文从LED的特性入手,推导出适合LED的电源驱动类型,再进一步介绍各类LED驱动设计。设计必读:LED四个关键特性特性一:非线...

Visual Studio Community 2022(VS2022)安装图文方法

直接上步骤:1,首先可以下载安装一个VisualStudio安装器,叫做VisualStudioinstaller。这个安装文件很小,很快就安装完成了。2,打开VisualStudioins...

Qt添加MSVC构建套件的方法(qt添加c++11)

前言有些时候,在Windows下因为某些需求需要使用MSVC编译器对程序进行编译,假设我们安装Qt的时候又只是安装了MingW构建套件,那么此时我们该如何给现有的Qt添加一个MSVC构建套件呢?本文以...

Qt为什么站稳c++GUI的top1(qt c)

为什么现在QT越来越成为c++界面编程的第一选择,从事QT编程多年,在这之前做C++界面都是基于MFC。当时为什么会从MFC转到QT?主要原因是MFC开发界面想做得好看一些十分困难,引用第三方基于MF...

qt开发IDE应该选择VS还是qt creator

如果一个公司选择了qt来开发自己的产品,在面临IDE的选择时会出现vs或者qtcreator,选择qt的IDE需要结合产品需求、部署平台、项目定位、程序猿本身和公司战略,因为大的软件产品需要明确IDE...

Qt 5.14.2超详细安装教程,不会来打我

Qt简介Qt(官方发音[kju:t],音同cute)是一个跨平台的C++开库,主要用来开发图形用户界面(GraphicalUserInterface,GUI)程序。Qt是纯C++开...

Cygwin配置与使用(四)——VI字体和颜色的配置

简介:VI的操作模式,基本上VI可以分为三种状态,分别是命令模式(commandmode)、插入模式(Insertmode)和底行模式(lastlinemode),各模式的功能区分如下:1)...

取消回复欢迎 发表评论: