线程池源码理解

线程池参数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }//核心线程数,最大线程数,超时等待时间,超时等待单位,阻塞队列,线程工厂,拒绝策略

参数配置

线程池流程

提交优先级:core thread > queue > other thread

执行优先级:core thread > other thread > queue

execute方法

ThreadPoolExecutor有一个AtomicInteger变量,叫ctl(control的简写),一共32位,

高3位为线程池的状态runstatus(Running,Shutdown,Stop,Tidying,Terminate),

低29位存当前有效线程数workerCount

提交流程

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static final int COUNT_BITS = Integer.SIZE - 3;//COUNT_BITS=29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//线程容量

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;//111
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//000
    private static final int STOP       =  1 << COUNT_BITS;//001
    private static final int TIDYING    =  2 << COUNT_BITS;//010
    private static final int TERMINATED =  3 << COUNT_BITS;//011

    //返回状态,上边那5种之一
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //返回工作线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //判断工作线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //执行addworker,创建一个核心线程,创建失败重新获取ctl
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果工作线程数大于核心线程数,判断线程池的状态是否为running,并且可以添加进队列
    //如果线程池不是running状态,则执行拒绝策略,(还是会调用一次addworker)
    if (isRunning(c) && workQueue.offer(command)) {
        //再次获取ctl,进行双重检索
        int recheck = ctl.get();
        //如果线程池是不是处于RUNNING的状态,那么就会将任务从队列中移除, 
        //如果移除失败,则会判断工作线程是否为0 ,如果过为0 就创建一个非核心线程 
        //如果移除成功,就执行拒绝策略,因为线程池已经不可用了;
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //线程池挂了或者大于最大线程数
    else if (!addWorker(command, false))
        reject(command);
}

addWorker方法

其实做了两件事:

  1. 才用循环CAS操作来将线程数加1;
  2. 新建一个线程并启用。
private boolean addWorker(Runnable firstTask, boolean core) {
        //(1)循环CAS操作,将线程池中的线程数+1.
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //core true代表是往核心线程池中增加线程 false代表往最大线程池中增加线程
                //线程数超标,不能再添加了,直接返回
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS修改ctl的值+1,在线程池中为将要添加的线程流出空间,成功退出cas循环,失败继续
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果线程池的状态发生了变化回到retry外层循环
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        //(2)新建线程,并加入到线程池workers中。
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //对workers操作要通过加锁来实现
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
               //细化锁的力度,防止临界区过大,浪费时间
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    //判断线程池的状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //判断添加的任务状态,如果已经开始丢出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                       //将新建的线程加入到线程池中
                        workers.add(w);
                        int s = workers.size();
                        //修正largestPoolSize的值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //线程添加线程池成功,则开启新创建的线程
                if (workerAdded) {
                    t.start();//(3)
                    workerStarted = true;
                }
            }
        } finally {
            //线程添加线程池失败或者线程start失败,则需要调用addWorkerFailed函数,
            //如果添加成功则需要移除,并回复clt的值
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker类继承自AQS,实现了Runnable接口。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        //线程池中正真运行的线程。通过我们指定的线程工厂创建而来
        final Thread thread;
       //线程包装的任务。thread 在run时主要调用了该任务的run方法
        Runnable firstTask;
        //记录当前线程完成的任务数
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //利用我们指定的线程工厂创建一个线程
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

参考:https://blog.csdn.net/varyall/article/details/82392048

runWorker方法

从中可以看出workQueue的优先级是最低的。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //task = getTask()中workQueue.poll,所有队列执行的优先级是最低的
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //线程池处于stop状态或者当前线程被中断时,线程池状态是stop状态。
            //但是当前线程没有中断,则发出中断请求
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted()) {
                wt.interrupt();
            }

            try {
                //开始执行任务前的Hook,类似回调函数
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //任务执行后的Hook,类似回调函数
                    afterExecute(task, thrown);
                }
            } finally {
                //执行完毕后task重置,completedTasks计数器++,解锁
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
       //线程空闲达到我们设定的值时,Worker退出销毁。
        processWorkerExit(w, completedAbruptly);
    }
}

回收线程方法

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            //如果是意外退出的话,那么就需要把WorkerCount--
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //Worker移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        //判断当前的线程池是否处于SHUTDOWN状态,判断是否要终止线程
        tryTerminate();

        int c = ctl.get();
        //如果当前线程池状态比STOP大的话,就不处理
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                //allowCoreThreadTimeOut=true的话,核心线程超时也会被销毁
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //Queue不为空的话至少留一个
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //当前线程大于最小的话直接返回
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //如果当前运行的Worker数比当前所需要的Worker数少的话,调用addWorker
            addWorker(null, false);
        }
    }

   转载规则


《线程池源码理解》 锦泉 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录