百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

阻塞队列—ArrayBlockingQueue源码分析

liebian365 2024-11-12 13:10 9 浏览 0 评论

前言

ArrayBlockingQueue 由数组支持的有界阻塞队列,队列基于数组实现,容量大小在创建 ArrayBlockingQueue 对象时已经定义好。 此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁,默认采用非公平锁。其数据结构如下图:

注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程和请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁

队列创建

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);

应用场景

在线程池中有比较多的应用,生产者消费者场景。

  • 先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素)
  • 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
  • 队列不支持空元素
  • 公平性 (fairness)可以在构造函数中指定。

此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过在构造函数将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。

工作原理

ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

基于 ReentrantLock 保证线程安全,根据 Condition 实现队列满时的阻塞。

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细地对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。

notEmpty表示“锁的非空条件”。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒“之前通过notEmpty.await()进入等待状态的线程”。 同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。

试图向已满队列中放入元素会导致放入操作受阻塞,直到BlockingQueue里有新的唤空间才会被醒继续操作; 试图从空队列中检索元素将导致类似阻塞,直到BlocingkQueue进了新货才会被唤醒。

源码分析

以下源码分析基于JDK1.8

定义

ArrayBlockingQueue的类继承关系如下:

其包含的方法定义如下:

成员属性

 /** 真正存入数据的数组 */
    final Object[] items;

    /** take,poll,peek or remove 的下一个索引 */
    int takeIndex;

    /** put,offer,or add 下一个索引 */
    int putIndex;

    /** 队列中元素个数 */
    int count;

    /** 可重入锁 */
    final ReentrantLock lock;

    /** 如果数组是空的,在该Condition上等待 */
    private final Condition notEmpty;

    /** 如果数组是满的,在该Condition上等待 */
    private final Condition notFull;

    /** 遍历器实现 */
    transient Itrs itrs = null;

构造函数

 /**
     * 构造函数,设置队列的初始容量
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * 构造函数,
     * capacity and the specified access policy.
     *
     * @param capacity 设置数组大小
     * @param fair  设置是否为公平锁
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // 是否为公平锁,如果是的话,那么先到的线程先获得锁对象
        // 否则,由操作系统调度由哪个线程获得锁,一般为false,性能会比较高
        lock = new ReentrantLock(fair); 
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**
     * 构造函数,带有初始内容的队列
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        //加锁的目的是为了其他CPU能够立即看到修改
        //加锁和解锁底层都是CAS,会强制修改写回主存,对其他CPU可见
        lock.lock(); // 要给数组设置内容,先上锁
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e; // 依次拷贝内容
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i; // 如果 putIndex大于数组大小,那么从0重写开始
        } finally {
            lock.unlock(); // 最后一定要释放锁
        }
    }

入队方法

add / offer / put,这三个方法都是往队列中添加元素,说明如下:

  • add方法依赖于offer方法,如果队列满了则抛出异常,否则添加成功返回true;
  • offer方法有两个重载版本,只有一个参数的版本,如果队列满了就返回false,否则加入到队列中,返回true,add方法就是调用此版本的offer方法;另一个带时间参数的版本,如果队列满了则等待,可指定等待的时间,如果这期间中断了则抛出异常,如果等待超时了则返回false,否则加入到队列中返回true;
  • put方法跟带时间参数的offer方法逻辑一样,不过没有等待的时间限制,会一直等待直到队列有空余位置了,再插入到队列中,返回true
 /**
     * 添加一个元素,其实super.add里面调用了offer方法
     */
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     * 加入成功返回 true,否则返回 false
     */
    public boolean offer(E e) {
     // 创建插入的元素是否为null,是的话抛出NullPointerException异常
        checkNotNull(e);
        // 获取“该阻塞队列的独占锁”
        final ReentrantLock lock = this.lock;
        lock.lock(); // 上锁
        try {
         // 如果队列已满,则返回false。
            if (count == items.length) // 超过数组的容量
                return false;
            else {
             // 如果队列未满,则插入e,并返回true。
                enqueue(e); 
                return true;
            }
        } finally {
         // 释放锁
            lock.unlock();
        }
    }

    /**
     * 如果队列已满的话,就会等待
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //和lock方法的区别是让它在阻塞时可以抛出异常跳出
        try {
            while (count == items.length)
                notFull.await(); // 这里就是阻塞了,要注意:如果运行到这里,那么它会释放上面的锁,一直等到 notify
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * 带有超时事件的插入方法,unit 表示是按秒、分、时哪一种
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos); // 带有超时等待的阻塞方法
            }
            enqueue(e); // 入队
            return true;
        } finally {
            lock.unlock();
        }
    }

出队方法

poll / take / peek,这几个方法都是获取队列顶的元素,具体说明如下:

  • poll方法有两个重载版本,第一个版本,如果队列是空的,返回null,否则移除并返回队列头部元素;另一个带时间参数的版本,如果栈为空则等待,可以指定等待的时间,如果等待超时了则返回null,如果被中断了则抛出异常,否则移除并返回栈顶元素
  • take方法同带时间参数的poll方法,但是不能指定等待时间,会一直等待直到队列中有元素为止,然后移除并返回栈顶元素
  • peek方法只是返回队列头部元素,不移除
 // 实现的方法,如果当前队列为空,返回null
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
 // 实现的方法,如果当前队列为空,一直阻塞
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); // 队列为空,阻塞方法
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
 // 带有超时事件的取元素方法,否则返回null
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos); // 超时等待
            }
            return dequeue(); // 取得元素
        } finally {
            lock.unlock();
        }
    }
 
    // 只是看一个队列最前面的元素,取出是不擅长队列中原来的元素,队列为空时返回null
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // 队列为空时返回null
        } finally {
            lock.unlock();
        }
    }

删除元素方法

remove / clear /drainT,这三个方法用于从队列中移除元素,具体说明如下:

  • remove方法用于移除某个元素,如果栈为空或者没有找到该元素则返回false,否则从栈中移除该元素;移除时,如果该元素位于栈顶则直接移除,如果位于栈中间,则需要将该元素后面的其他元素往前面挪动,移除后需要唤醒因为栈满了而阻塞的线程
  • clear方法用于整个栈,同时将takeIndex置为putIndex,保证栈中的元素先进先出;最后会唤醒最多count个线程,因为正常一个线程插入一个元素,如果唤醒超过count个线程,可能导致部分线程因为栈满了又再次被阻塞
  • drainTo方法有两个重载版本,一个是不带个数,将所有的元素都移除并拷贝到指定的集合中;一个带个数,将指定个数的元素移除并拷贝到指定的集合中,两者的底层实现都是同一个方法。移除后需要重置takeIndex和count,并唤醒最多移除个数的因为栈满而阻塞的线程。
 /**
     * 从队列中删除一个元素的方法。删除成功返回true,否则返回false
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                //从takeIndex开始往后遍历直到等于putIndex
                do {
                    if (o.equals(items[i])) {
                        removeAt(i); // 真正删除的方法
                        return true;
                    }
                    //走到数组末尾了又从头开始,put时也按照这个规则来
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex); // 一直不断的循环取出来做判断
            }
            //如果数组为空,返回false
            return false;
        } finally {
            lock.unlock();
        }
    }

 /**
     * 指定删除索引上的元素.
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            //如果移除的就是栈顶的元素
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            //元素个数减1
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // 如果移除的是栈中间的某个元素,需要将该元素后面的元素往前挪动
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                //到数组末尾了,从头开始
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                 //将后面一个元素复制到前面来
                    items[i] = items[next];
                    i = next;
                } else {
                 //如果下一个元素的索引等于putIndex,说明i就是栈中最后一个元素了,直接将该元素置为null
                    items[i] = null;
                    //重置putIndex为i
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
             //通知itrs节点移除了
                itrs.removedAt(removeIndex);
        }
        //唤醒因为栈满了而等待的线程
        notFull.signal(); // 有一个元素删除成功,那肯定队列不满
    }

 /**
     * 清空队列
     */
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                //从takeIndex开始遍历直到i等于putIndex,将数组元素置为null
                do {
                    items[i] = null;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
                //注意此处没有将这两个index置为0,只是让他们相等,因为只要相等就可以实现栈先进先出了
                takeIndex = putIndex;
                count = 0;
                if (itrs != null)
                    itrs.queueIsEmpty();
                //如果有因为栈满了而等待的线程,则将其唤醒
                //注意这里没有使用signalAll而是通过for循环来signal多次,单纯从唤醒线程来看是可以使用signalAll的,效果跟这里的for循环是一样的
                //如果有等待的线程,说明count就是当前线程的最大容量了,这里清空了,最多只能put count次,一个线程只能put 1次,只唤醒最多count个线程就避免了
                //线程被唤醒后再次因为栈满了而阻塞
                for (; k > 0 && lock.hasWaiters(notFull); k--)
                    notFull.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 取出所有元素到集合
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * 取出所有元素到集合
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        //校验参数合法
        checkNotNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
         //取两者之间的最小值
            int n = Math.min(maxElements, count);
            int take = takeIndex;
            int i = 0;
            try {
             //从takeIndex开始遍历,取出元素然后添加到c中,直到满足个数要求为止
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x);
                    items[take] = null;
                    if (++take == items.length)
                        take = 0;
                    i++;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                 //取完了,修改count减去i
                    count -= i;
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0)
                         //通知itrs 栈空了
                            itrs.queueIsEmpty();
                        else if (i > take)
                         //说明take中间变成0了,通知itrs
                            itrs.takeIndexWrapped();
                    }
                    //唤醒在因为栈满而等待的线程,最多唤醒i个,同上避免线程被唤醒了因为栈又满了而阻塞
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }

iterator / Itr / Itrs

Itr和Itrs都是ArrayBlockingQueue的两个内部类,如下:

iterator方法返回一个迭代器实例,用于实现for循环遍历和部分Collection接口,该方法的实现如下:

public Iterator<E> iterator() {
 return new Itr();
}

Itr() {
 // assert lock.getHoldCount() == 0;
 lastRet = NONE;
 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
     if (count == 0) {
         //NONE和DETACHED都是常量
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        } else {
         //初始化各属性
            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = takeIndex;
            nextItem = itemAt(nextIndex = takeIndex);
            cursor = incCursor(takeIndex);
            if (itrs == null) {
             itrs = new Itrs(this);
            } else {
             //初始化Itrs,将当前线程注册到Itrs
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            }
            prevCycles = itrs.cycles;
            // assert takeIndex >= 0;
            // assert prevTakeIndex == takeIndex;
            // assert nextIndex >= 0;
            // assert nextItem != null;
     }
 } finally {
    lock.unlock();
    }
}

Itrs(Itr initial) {
 register(initial);
}

//根据index计算cursor
private int incCursor(int index) {
 // assert lock.getHoldCount() == 1;
 if (++index == items.length)
  index = 0;
 if (index == putIndex)
  index = NONE;
 return index;
}

/**
* 创建一个新的Itr实例时,会调用此方法将该实例添加到Node链表中
*/
void register(Itr itr) {
 //创建一个新节点将其插入到head节点的前面
 head = new Node(itr, head);
}

小结

ArrayBlockingQueue是一个阻塞队列,内部由ReentrantLock来实现线程安全,由Condition的await和signal来实现等待唤醒的功能。它的数据结构是数组,准确地说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从0继续开始。


PS:以上代码提交在 Github

https://github.com/Niuh-Study/niuh-juc-final.git


文章持续更新,可以公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

相关推荐

快递查询教程,批量查询物流,一键管理快递

作为商家,每天需要查询许许多多的快递单号,面对不同的快递公司,有没有简单一点的物流查询方法呢?小编的回答当然是有的,下面随小编一起来试试这个新技巧。需要哪些工具?安装一个快递批量查询高手快递单号怎么快...

一键自动查询所有快递的物流信息 支持圆通、韵达等多家快递

对于各位商家来说拥有一个好的快递软件,能够有效的提高自己的工作效率,在管理快递单号的时候都需要对单号进行表格整理,那怎么样能够快速的查询所有单号信息,并自动生成表格呢?1、其实方法很简单,我们不需要一...

快递查询单号查询,怎么查物流到哪了

输入单号怎么查快递到哪里去了呢?今天小编给大家分享一个新的技巧,它支持多家快递,一次能查询多个单号物流,还可对查询到的物流进行分析、筛选以及导出,下面一起来试试。需要哪些工具?安装一个快递批量查询高手...

3分钟查询物流,教你一键批量查询全部物流信息

很多朋友在问,如何在短时间内把单号的物流信息查询出来,查询完成后筛选已签收件、筛选未签收件,今天小编就分享一款物流查询神器,感兴趣的朋友接着往下看。第一步,运行【快递批量查询高手】在主界面中点击【添...

快递单号查询,一次性查询全部物流信息

现在各种快递的查询方式,各有各的好,各有各的劣,总的来说,还是有比较方便的。今天小编就给大家分享一个新的技巧,支持多家快递,一次能查询多个单号的物流,还能对查询到的物流进行分析、筛选以及导出,下面一起...

快递查询工具,批量查询多个快递快递单号的物流状态、签收时间

最近有朋友在问,怎么快速查询单号的物流信息呢?除了官网,还有没有更简单的方法呢?小编的回答当然是有的,下面一起来看看。需要哪些工具?安装一个快递批量查询高手多个京东的快递单号怎么快速查询?进入快递批量...

快递查询软件,自动识别查询快递单号查询方法

当你拥有多个快递单号的时候,该如何快速查询物流信息?比如单号没有快递公司时,又该如何自动识别再去查询呢?不知道如何操作的宝贝们,下面随小编一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号若干...

教你怎样查询快递查询单号并保存物流信息

商家发货,快递揽收后,一般会直接手动复制到官网上一个个查询物流,那么久而久之,就会觉得查询变得特别繁琐,今天小编给大家分享一个新的技巧,下面一起来试试。教程之前,我们来预览一下用快递批量查询高手...

简单几步骤查询所有快递物流信息

在高峰期订单量大的时候,可能需要一双手当十双手去查询快递物流,但是由于逐一去查询,效率极低,追踪困难。那么今天小编给大家分享一个新的技巧,一次能查询多个快递单号的物流,下面一起来学习一下,希望能给大家...

物流单号查询,如何查询快递信息,按最后更新时间搜索需要的单号

最近有很多朋友在问,如何通过快递单号查询物流信息,并按最后更新时间搜索出需要的单号呢?下面随小编一起来试试吧。需要哪些工具?安装一个快递批量查询高手快递单号若干怎么快速查询?运行【快递批量查询高手】...

连续保存新单号功能解析,导入单号查询并自动识别批量查快递信息

快递查询已经成为我们日常生活中不可或缺的一部分。然而,面对海量的快递单号,如何高效、准确地查询每一个快递的物流信息,成为了许多人头疼的问题。幸运的是,随着科技的进步,一款名为“快递批量查询高手”的软件...

快递查询教程,快递单号查询,筛选更新量为1的单号

最近有很多朋友在问,怎么快速查询快递单号的物流,并筛选出更新量为1的单号呢?今天小编给大家分享一个新方法,一起来试试吧。需要哪些工具?安装一个快递批量查询高手多个快递单号怎么快速查询?运行【快递批量查...

掌握批量查询快递动态的技巧,一键查找无信息记录的两种方法解析

在快节奏的商业环境中,高效的物流查询是确保业务顺畅运行的关键。作为快递查询达人,我深知时间的宝贵,因此,今天我将向大家介绍一款强大的工具——快递批量查询高手软件。这款软件能够帮助你批量查询快递动态,一...

从复杂到简单的单号查询,一键清除单号中的符号并批量查快递信息

在繁忙的商务与日常生活中,快递查询已成为不可或缺的一环。然而,面对海量的单号,逐一查询不仅耗时费力,还容易出错。现在,有了快递批量查询高手软件,一切变得简单明了。只需一键,即可搞定单号查询,一键处理单...

物流单号查询,在哪里查询快递

如果在快递单号多的情况,你还在一个个复制粘贴到官网上手动查询,是一件非常麻烦的事情。于是乎今天小编给大家分享一个新的技巧,下面一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号怎么快速查询?...

取消回复欢迎 发表评论: