百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

JDK源码万字详解——PriorityBlockingQueue,ThreadPoolExecutor

liebian365 2024-11-12 13:11 5 浏览 0 评论

PriorityBlockingQueue

前文「JDK源码详解-PriorityQueue」分析了优先队列 PriorityQueue,它既不是阻塞队列,而且线程不安全。本文分析线程安全的阻塞优先队列 PriorityBlockingQueue。它的继承结构如下:

PriorityBlockingQueue 与 PriorityQueue 的内部结构类似,也是物理上由数组、逻辑上由堆结构实现的,并且使用 ReentrantLock 实现线程安全。除此之外,二者大部分操作都是类似的。


因此,有了前文的铺垫,这里相对更容易理解一些。下面分析其代码实现。


代码分析

主要成员变量

// 内部数组的默认初始化容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;


// 内部数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;


// 保存元素的内部数组
private transient Object[] queue;


// 队列中元素的数量
private transient int size;


// 队列中元素的比较器
private transient Comparator<? super E> comparator;


// 互斥锁(保证线程安全)
private final ReentrantLock lock;


// 表示队列非空的条件
private final Condition notEmpty;


// 扩容时使用的自旋锁,通过 CAS 获取(后面分析)
private transient volatile int allocationSpinLock;


// 一个普通的优先队列,主要用于序列化和反序列化
private PriorityQueue<E> q;


构造器

// 构造器 1:使用默认的初始化容量创建一个对象
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}


// 构造器 2:使用给定的容量创建一个对象
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}


// 构造器 3:使用给定的容量和比较器创建一个对象
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

上面几个构造器都是比较简单的赋值。除此之外,还有一个用给定集合初始化的构造器,如下:

public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    // 是否需要堆化
    boolean heapify = true; // true if not known to be in heap order
    // 是否需要筛选空值
    boolean screen = true;  // true if must screen for nulls
    // 给定集合为 SortedSet
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false; // 已经有序,不需要再堆化
    }
    // 给定集合为 PriorityBlockingQueue
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false; // 不需要筛选判空
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false; // 不需要堆化
    }
    // 集合转为数组
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    // 集合内所有元素都不能为空
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)
        heapify(); // 堆化
}

堆化操作 heapify 代码如下:

private void heapify() {
    Object[] array = queue;
    int n = size;
    int half = (n >>> 1) - 1;
    Comparator<? super E> cmp = comparator;
    // 根据比较器(Comparator)是否为空,采用不同的策略
    // PS: 二者操作基本一样,只是 Comparator 和 Comparable 的区别
    if (cmp == null) {
        for (int i = half; i >= 0; i--)
            siftDownComparable(i, (E) array[i], array, n);
    }
    else {
        for (int i = half; i >= 0; i--)
            siftDownUsingComparator(i, (E) array[i], array, n, cmp);
    }
}

siftDownUsingComparator 代码如下:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        // 数组的中间位置
        int half = n >>> 1;
        while (k < half) {
            // 获取索引为 k 的节点左子节点索引
            int child = (k << 1) + 1;
            // 获取 child 的值
            Object c = array[child];
            // 获取索引为 k 的节点右子节点索引
            int right = child + 1;
            // 比较左右子节点的值,取较小的一个
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            // 给定的元素 x 与其较小的子节点的值比较,若 x 不大于子节点的值,停止交换
            if (cmp.compare(x, (T) c) <= 0)
                break;
            // 将 x 与其较小的子节点互换位置
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}

该方法与 PriorityQueue 中的 siftDownUsingComparator 方法操作几乎完全一致,可参考前文的分析,这里不再赘述(siftDownComparable 方法亦是如此)。


入队方法:add(E), put(E), offer(E, timeout, TimeUnit), offer(E)

public boolean add(E e) {
    return offer(e);
}


public void put(E e) {
    offer(e); // never need to block
}


public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}

上述三个方法内部都是通过 offer(e) 方法实现的,因此只需分析 offer(e) 方法即可:

public boolean offer(E e) {
    // 插入元素不能为空
    if (e == null)
        throw new NullPointerException();
    // 获取锁,保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 如果容量不够,则进行扩容(注意这里是一个循环)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap); // 尝试扩容
    try {
        Comparator<? super E> cmp = comparator;
        // 根据 Comparator 是否为空采用不同的堆化策略
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // 有新元素插入了,唤醒 notEmpty 条件下等待的线程(消费者)
        notEmpty.signal();
    } finally {
        // 释放锁
        lock.unlock();
    }
    return true;
}

下面分析一下扩容操作 tryGrow:

private void tryGrow(Object[] array, int oldCap) {
    // 释放锁
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 尝试以 CAS 方式修改 allocationSpinLock 的值(将 0 改为 1)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 若旧容量 n 较小(小于 64),则扩容为 2 * n + 2,否则扩容为 1.5 * n
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 创建一个新数组
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 将 allocationSpinLock 重置为 0
            allocationSpinLock = 0;
        }
    }
    // newArray 为空表示未进行上述扩容操作,则当前线程让出 CPU 时间
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 尝试获取锁
    lock.lock();
    // 到这里表示扩容成功
    // queue == array 保证老数据复制一次
    if (newArray != null && queue == array) {
        // 扩容后的新数组
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

这个扩容方法比较有意思:它刚开始会释放锁,而后再重新获取锁。


1. 为什么刚开始要释放锁?

由于该锁是全局的,其他大部分公有(public)方法也会用到;而扩容操作又相对比较耗时,若这里不释放,则某个线程扩容时其他方法调用可能会阻塞。


2. 释放锁之后如何保证线程安全?

这就用到了成员变量 allocationSpinLock,使用了 Unsafe 类的 CAS 操作。它尝试将 allocationSpinLock 的值设置为 1,而一旦操作成功,其他线程就无法进入,直到该线程将它重置为 0. 这就保证了同一时间内只能有一个线程在扩容。


3. 在释放锁后的扩容操作中,先后可能会有多个线程扩容,也即会产生多个新容量的空数组(此时它们都未指向原先的数组 queue),如何避免老数据多次复制到新数组呢?

代码里用到了 queue == array 这个判断。

比如线程 T1 和 T2 都对原数组进行了扩容,得到了两个 newArray,在后面复制老数据时,若其中一个线程已经对 queue 重新赋值并复制后,由于 queue 已经改变,后面的线程就不会再复制一次了。


出队方法:poll(), take(), peek()

// 出队
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
// 出队(队列为空时阻塞)
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}


// 有超时等待的出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

可以看到这几个出队的操作都加了锁,内部都调用了 dequeue 方法:

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 取数据中的第一个元素
        E result = (E) array[0];
        // 获取最后一个元素
        E x = (E) array[n];
        // 将最后一个元素置空,并恢复堆结构
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

该方法与 PriorityQueue 的出队操作 poll() 类似,也不再赘述。


小结

1. PriorityBlockingQueue 是优先队列的阻塞方式实现,它与 PriorityQueue 内部结构类似,即物理结构是可变数组、逻辑结构是堆;


2. PriorityBlockingQueue 内部元素不能为空,且可比较,使用 ReentrantLock 保证线程安全。


ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中线程池的实现类,它的继承结构如下:

本文主要分析 ThreadPoolExecutor 类的主要方法和实现原理(部分代码暂未涉及,后面有机会再行分析),以后再分析 Executor 和 ExecutorService 接口的相关内容。


代码分析

成员变量

该类中的成员变量较多,下面分析一些主要的。

// 该变量是一个原子整型变量,保存了线程池的状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3=29
// 线程的最大容量(即池内允许的最大线程数)
// 00011111 11111111 11111111 11111111,即 29 个 1,超过 5 亿
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 2^29-1


// runState is stored in the high-order bits
// 线程池的运行状态,保存在 ctl 的高位
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

这里用了一个原子整型(AtomicInteger,可以理解为线程安全的 Integer 类,占用 4 个字节,32 位)变量 ctl 来表示线程池的运行状态和线程池内部的线程数量。其中高 3 位表示线程池的运行状态,低 29 位表示线程池中线程的数量。


线程池的状态有以下 5 种:

1. RUNNING: 接受新的任务,并且处理任务队列中的任务;

2. SHUTDOWN: 不接受新的任务,但处理任务队列中的任务;

3. STOP: 不接受新的任务,不处理任务队列中的任务,并且中断正在进行的任务;

4. TIDYING: 所有的任务都已终结,工作线程的数量为 0;

5. TERMINATED: 执行 terminated() 方法后进入该状态,terminated() 方法默认实现为空。

这些状态之间的转换流程及触发条件如图所示:

接下来看其他成员变量:

// 任务队列(阻塞队列)
private final BlockingQueue<Runnable> workQueue;
// 互斥锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 锁对应的条件
private final Condition termination = mainLock.newCondition();
// 线程池创建过的最大线程数量
private int largestPoolSize;
// 已完成任务的数量
private long completedTaskCount;
// 线程工厂类,用于创建线程
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲线程的存活时间
private volatile long keepAliveTime;
/*
 * 核心线程是否允许超时
 * 默认为 false,表示核心线程即使处于空闲状态也继续存活;
 *   若为 true,核心线程同样受到 keepAliveTime 的超时约束
 */
private volatile boolean allowCoreThreadTimeOut;
// 核心池大小
private volatile int corePoolSize;
// 最大池大小
private volatile int maximumPoolSize;
// 默认拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

这里有几个重要的成员变量:

corePoolSize: 核心池大小;

maximumPoolSize: 最大池大小,线程池中能同时存在的最大线程数,大于等于 corePoolSize;

workQueue: 工作/任务队列,是一个阻塞队列,可参考前文「JDK源码分析-BlockingQueue」的分析。


为便于理解,这里先大概描述下向线程池提交任务的流程,后面再分析其代码实现:

① 初始化一个容量为 corePoolSize 的池子;

② 刚开始,每来一个任务就在池中创建一个线程去执行该任务,直到池中的容量到达 corePoolSize;

③ 此时若再来任务,则把这些任务放到 workQueue 中;

④ 若 workQueue 也满了,则继续创建线程执行任务,直到线程数量达到 maximumPoolSize;

⑤ 若 workQueue 已满,且线程数量达到 maximumPoolSize,此时若还有任务到来,则执行拒绝策略(handler)。


keepAliveTime & allowCoreThreadTimeOut

其中 keepAliveTime 表示空闲线程的存活时间,这两个值有一定关联:

若 allowCoreThreadTimeOut 为 false (默认),且线程数量超出 corePoolSize,则空闲时间超过 keepAliveTime 的线程会被关闭(最多保留 corePoolSize 个线程存活);

若将 allowCoreThreadTimeOut 设为 true,核心池的线程也会受该超时的影响而关闭。


构造器

ThreadPoolExecutor 内部有多个构造器,但最终都是调用下面这个:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

构造器参数虽然比较多,但基本都是简单的赋值,前面已经分析过这些成员变量的含义,这里不再赘述。下面分析它的核心方法 execute。


在此之前,先看几个常用方法:

// Packing and unpacking ctl
// 根据 ctl 和 CAPACITY 得到线程池的运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 根据 ctl 和 CAPACITY 得到线程池中的线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 将线程池运行状态和线程数量合并为 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

execute 方法代码如下:

// command 是一个 Runnable 对象,也就是用户提交执行的任务
public void execute(Runnable command) {
    // 提交的任务为空时抛出异常
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // 获取当前 ctl (存有线程池状态和线程数量)
    int c = ctl.get();
    // 若当前工作线程数量小于核心池大小(coolPoolSize)
    // 则在核心池中新增一个工作线程,并将该任务交给这个线程执行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 重新获取(存在并发可能)
        c = ctl.get();
    }
    // 若执行到这里,表示池中线程数量 >= corePoolSize,或者上面 addWorker 失败
    // 若线程池处于 RUNNING 状态,并且该任务(command)成功添加到任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次获取 ctl 值
        int recheck = ctl.get();
        // 若线程池不是运行状态,则要把上面添加的任务从队列中移除并执行拒绝策略
        //(可理解为“回滚”操作)
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略           
            reject(command);
        // 若此时池中没有线程,则新建一个
        // PS: 这里是防止任务提交后,池中没有存活的线程了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 根据上述代码分析,若执行到这里,可分为以下两种情况:
    // ① 线程池不是 RUNNING 状态;
    // ② 线程池处于 RUNNING 状态,且实际线程数量 workCount >= corePoolSize,
    //   并且,添加到 workQueue 失败(已满)
    // 此时,则需要和 maximumPoolSize 进行比较,
    //   若 workCount <= maximumPoolSize, 则新建一个线程去执行该任务;
    //   否则,即 workCount > maximumPoolSize (饱和),则执行拒绝策略
    else if (!addWorker(command, false))
        // 执行拒绝策略
        reject(command);
}

该方法描述的就是一个任务提交到线程池的流程,主要执行逻辑如下:

1. 若正在运行的线程数少于 corePoolSize,则创建一个新的线程,并将传入的任务(command)作为它的第一个任务执行。


2. 若运行的线程数不小于 corePoolSize,则将新来的任务添加到任务队列(workQueue)。若入队成功,仍需再次检查是否需要增加一个线程(上次检查之后现有的线程可能死了,或者进入该方法时线程池 SHUTDOWN 了,此时需要执行回滚);若池中没有线程则新建一个(确保 SHUTDOWN 状态也能执行队列中的任务)。


3. 若任务不能入队(队列已满),则创建新的线程并执行任务,若失败(超过 maximumPoolSize),则表示线程池关闭或者已经饱和,因此拒绝该任务。


为了便于理解,可参考下面的流程图:

下面分析 Worker 类及 addWorker 方法。


内部嵌套类 Worker

// 继承自 AQS,且实现了 Runnable 接口
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    // 运行的第一个任务,可能为空
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 初始化 thread
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    // 其他一些 AQS 相关的方法不再一一列举
}

可以看到 Worker 类继承自 AQS,它的实现与 ReentrantLock 有一些类似,可对比前文「JDK源码分析-ReentrantLock」分析。而且,Worker 类实现了 Runnable 接口,它的 run 方法是将自身作为参数传递给了外部类的 runWorker 方法,下面分析这两个方法。


addWorker 方法:

// firstTask: 第一个任务,可为空
// core: 是否为核心池,true 是,false 为最大池
private boolean addWorker(Runnable firstTask, boolean core) {
    // 该循环的主要作用就是增加 workCount 计数,增加成功后再新增 Worker 对象
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /*
         * rs >= SHUTDOWN 表示线程池不再接受新的任务
         * 该判断条件分为以下三种:
         *   ① 线程池处于 STOP, TYDING 或 TERMINATED 状态;
         *   ② 线程池处于 SHUTDOWN 状态,且 firstTask 不为空;
         *   ③ 线程池处于 SHUTDOWN 状态,且 workQueue 为空
         * 满足任一条件即返回 false.
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        for (;;) {
            int wc = workerCountOf(c);
            // 超出最大容量 CAPACITY,或者超出初始设置的核心池/最大池数量,则返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS 方式增加 ctl 的 workerCount 数量(该循环的主要目的)
            if (compareAndIncrementWorkerCount(c))
                break retry; // 若增加失败则退出循环
            c = ctl.get();  // Re-read ctl
            // 运行状态改变
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 标记 Worker 是否启动、是否添加成功
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 将 firstTask 封装成 Worker 对象
        w = new Worker(firstTask);
        // 获取 thread 对象 t
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 若线程池状态小于 SHUTDOWN,即为 RUNNING 状态;
                // 或者为 SHUTDOWN 状态,且 firstTask 为空,
                //   表示不再接受新的任务,但会继续执行队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 添加到工作线程集合(HashSet)
                    workers.add(w);
                    // 更新最大计数
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 标记 Worker 添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 若成功添加到工作线程集合,则启动线程执行任务
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Worker 启动失败,执行回滚操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker 方法:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task 不为空时才执行,循环执行
        // getTask 是从 workQueue 中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 若线程池状态 >= STOP,则需要中断该线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 中断线程
                wt.interrupt();
            try {
                // 任务执行前调用该方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后调用该方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // getTask 返回空,说明任务队列没有任务了
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到这里有 beforeExecute 和 afterExecute 方法,分别表示提交的任务执行前后做的事情,在 ThreadPoolExecutor 类中这两个都是空方法。我们可以通过继承 ThreadPoolExecutor 类并重写这两个方法来定制自己的需求。


getTask 方法:

// 从任务队列(阻塞队列)中取任务
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?


    for (;;) {
        // 获取线程池运行状态
        int c = ctl.get();
        int rs = runStateOf(c);


        // Check if queue empty only if necessary.
        /*
         * 线程池运行状态 rs >= SHUTDOWN,表示非 RUNNING 状态
         * 该判断条件有两个:
         * 1. rs >= STOP;
         * 2. rs == SHUTDOWN,且工作队列为空
         * 若满足上述条件中的一个,则将线程数量(workerCount)减少 1,返回 null
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // 减少工作现场数量
            return null; // 返回 null 表示会从池中移除一个 Worker
        }
        
        int wc = workerCountOf(c);
        
        // Are workers subject to culling?
        // 是否要移除 Worker
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 线程数大于 maximumPoolSize,或者需要移除 Worker
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null; // 返回空意味着会减少移除一个 Worker
            continue;
        }
        
        try {
            // 从 workQueue 中获取任务(Runnable 对象)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

该方法主要是从任务队列 workQueue 中获取任务,并且控制池内线的程数量。


拒绝策略

拒绝策略 RejectedExecutionHandler 是一个接口,它只有一个 rejectedExecution 方法,代码如下:

public interface RejectedExecutionHandler {
    // 执行拒绝策略
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

它在 ThreadPoolExecutor 中的几个实现类如下:

ThreadPoolExecutor 默认的拒绝策略为 AbortPolicy,代码如下:

public static class AbortPolicy implements RejectedExecutionHandler {


    public AbortPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 抛出异常
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

可以看到,该策略就是直接抛出 RejectedExecutionException 异常。其他拒绝策略代码也都相对简单,不再一一列举。值得一提的是,如果我们对这几种策略都不满意,可以自定义拒绝策略(实现 RejectedExecutionHandler 接口)。


小结

本文主要分析了线程池 ThreadPoolExecutor 类的主要成员变量和核心方法实现,主要包括一个任务(Runnable)的提交流程。

该类稍微有些复杂,分析时首先要搞清楚任务提交的流程以及主要成员变量(workQueue、corePoolSize、maximumPoolSize、keepAliveTime、allowCoreThreadTimeOut 等)的含义,接下来再分析会更清晰。


PS: 本文是本人参考网上的一些文章及个人理解的结果,如有不正之处,敬请指正。此外,也建议大家多读几篇相关文章进行比较分析,以便更容易理解。

相关推荐

快递查询教程,批量查询物流,一键管理快递

作为商家,每天需要查询许许多多的快递单号,面对不同的快递公司,有没有简单一点的物流查询方法呢?小编的回答当然是有的,下面随小编一起来试试这个新技巧。需要哪些工具?安装一个快递批量查询高手快递单号怎么快...

一键自动查询所有快递的物流信息 支持圆通、韵达等多家快递

对于各位商家来说拥有一个好的快递软件,能够有效的提高自己的工作效率,在管理快递单号的时候都需要对单号进行表格整理,那怎么样能够快速的查询所有单号信息,并自动生成表格呢?1、其实方法很简单,我们不需要一...

快递查询单号查询,怎么查物流到哪了

输入单号怎么查快递到哪里去了呢?今天小编给大家分享一个新的技巧,它支持多家快递,一次能查询多个单号物流,还可对查询到的物流进行分析、筛选以及导出,下面一起来试试。需要哪些工具?安装一个快递批量查询高手...

3分钟查询物流,教你一键批量查询全部物流信息

很多朋友在问,如何在短时间内把单号的物流信息查询出来,查询完成后筛选已签收件、筛选未签收件,今天小编就分享一款物流查询神器,感兴趣的朋友接着往下看。第一步,运行【快递批量查询高手】在主界面中点击【添...

快递单号查询,一次性查询全部物流信息

现在各种快递的查询方式,各有各的好,各有各的劣,总的来说,还是有比较方便的。今天小编就给大家分享一个新的技巧,支持多家快递,一次能查询多个单号的物流,还能对查询到的物流进行分析、筛选以及导出,下面一起...

快递查询工具,批量查询多个快递快递单号的物流状态、签收时间

最近有朋友在问,怎么快速查询单号的物流信息呢?除了官网,还有没有更简单的方法呢?小编的回答当然是有的,下面一起来看看。需要哪些工具?安装一个快递批量查询高手多个京东的快递单号怎么快速查询?进入快递批量...

快递查询软件,自动识别查询快递单号查询方法

当你拥有多个快递单号的时候,该如何快速查询物流信息?比如单号没有快递公司时,又该如何自动识别再去查询呢?不知道如何操作的宝贝们,下面随小编一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号若干...

教你怎样查询快递查询单号并保存物流信息

商家发货,快递揽收后,一般会直接手动复制到官网上一个个查询物流,那么久而久之,就会觉得查询变得特别繁琐,今天小编给大家分享一个新的技巧,下面一起来试试。教程之前,我们来预览一下用快递批量查询高手...

简单几步骤查询所有快递物流信息

在高峰期订单量大的时候,可能需要一双手当十双手去查询快递物流,但是由于逐一去查询,效率极低,追踪困难。那么今天小编给大家分享一个新的技巧,一次能查询多个快递单号的物流,下面一起来学习一下,希望能给大家...

物流单号查询,如何查询快递信息,按最后更新时间搜索需要的单号

最近有很多朋友在问,如何通过快递单号查询物流信息,并按最后更新时间搜索出需要的单号呢?下面随小编一起来试试吧。需要哪些工具?安装一个快递批量查询高手快递单号若干怎么快速查询?运行【快递批量查询高手】...

连续保存新单号功能解析,导入单号查询并自动识别批量查快递信息

快递查询已经成为我们日常生活中不可或缺的一部分。然而,面对海量的快递单号,如何高效、准确地查询每一个快递的物流信息,成为了许多人头疼的问题。幸运的是,随着科技的进步,一款名为“快递批量查询高手”的软件...

快递查询教程,快递单号查询,筛选更新量为1的单号

最近有很多朋友在问,怎么快速查询快递单号的物流,并筛选出更新量为1的单号呢?今天小编给大家分享一个新方法,一起来试试吧。需要哪些工具?安装一个快递批量查询高手多个快递单号怎么快速查询?运行【快递批量查...

掌握批量查询快递动态的技巧,一键查找无信息记录的两种方法解析

在快节奏的商业环境中,高效的物流查询是确保业务顺畅运行的关键。作为快递查询达人,我深知时间的宝贵,因此,今天我将向大家介绍一款强大的工具——快递批量查询高手软件。这款软件能够帮助你批量查询快递动态,一...

从复杂到简单的单号查询,一键清除单号中的符号并批量查快递信息

在繁忙的商务与日常生活中,快递查询已成为不可或缺的一环。然而,面对海量的单号,逐一查询不仅耗时费力,还容易出错。现在,有了快递批量查询高手软件,一切变得简单明了。只需一键,即可搞定单号查询,一键处理单...

物流单号查询,在哪里查询快递

如果在快递单号多的情况,你还在一个个复制粘贴到官网上手动查询,是一件非常麻烦的事情。于是乎今天小编给大家分享一个新的技巧,下面一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号怎么快速查询?...

取消回复欢迎 发表评论: