
ThreadPoolExecutor - Worker

The earlier post Executor - Thread Pool introduced ThreadPoolExecutor; this one takes a closer look at ThreadPoolExecutor#Worker.

  1. ThreadPoolExecutor
    1. BlockingQueue add/offer/put
  2. What is a worker
    1. Associated thread
    2. How a worker runs
    3. Worker and AQS
    4. Responding to interrupts
  3. Thoughts

ThreadPoolExecutor

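How the pool creates threads:

  1. When a task is submitted and there are fewer workers than corePoolSize, add a worker. A worker created at this stage (because corePoolSize has not been reached) receives the task as its firstTask and runs it directly, instead of taking a task from the queue.
  2. Once the number of threads has reached corePoolSize, new tasks go into the BlockingQueue.
  3. If the queue is full, try to add more workers, up to maximumPoolSize.
  4. If maximumPoolSize has already been reached and no worker can be added, invoke the rejection policy, which decides the task's fate.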

So this is a producer-consumer model: callers of execute produce tasks, and workers consume them.

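How do you remember that tasks go into the queue before the pool grows toward maximumPoolSize? Think of it this way: the pool adds threads beyond the core only as a last resort. So with an unbounded queue, offer never fails, and the pool never creates more than corePoolSize threads. I have made this mistake myself: I used an unbounded queue, and the thread count stayed at corePoolSize and never grew.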
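To see the unbounded-queue pitfall in action, here is a small sketch. The class and method names are made up for illustration. It submits 20 blocking tasks to a pool with corePoolSize 2, maximumPoolSize 10 and an unbounded LinkedBlockingQueue, then reads getLargestPoolSize().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    // Submits `tasks` blocking tasks to a pool with core=2, max=10 and an
    // unbounded queue; returns the largest number of threads the pool ever had.
    static int largestPoolSize(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep every worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return largest;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(largestPoolSize(20)); // 2
    }
}
```

Every offer succeeds, so execute never reaches the branch that adds a non-core worker, and the maximumPoolSize of 10 is never used.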

This logic is easy to see in the implementation of submit (which calls execute):

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

The branches correspond to:

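  1. if (workerCountOf(c) < corePoolSize) addWorker(command, true): the argument true means a core worker. addWorker creates a worker (which creates a thread) and starts that thread, which executes the worker's run method. run first executes firstTask if there is one, then keeps taking tasks from the BlockingQueue and executing them. The full call chain is described in the worker section below;
  2. workQueue.offer(command): note that the task is enqueued with offer, not put, because execute must not block when the queue is full;
  3. else if (!addWorker(command, false)): if the task cannot be queued, try to add another worker; false here means a non-core worker;
  4. reject(command): if no worker can be added either, invoke the rejection policy.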
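The four branches can be observed with a deliberately tiny pool. This sketch assumes corePoolSize 1, maximumPoolSize 2, an ArrayBlockingQueue of capacity 1 and the default AbortPolicy. Tasks block on a latch, so nothing completes during submission. ExecuteStepsDemo and submitFour are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteStepsDemo {

    // core=1, max=2, queue capacity 1: submits four blocking tasks and
    // returns {poolSize, queueSize, rejectedCount} right after submission.
    static int[] submitFour() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++; // the default AbortPolicy throws
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(submitFour())); // [2, 1, 1]
    }
}
```

Task 1 creates the core worker and task 2 is queued. Task 3 finds the queue full and creates a non-core worker. Task 4 is rejected because the pool is at maximumPoolSize.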

BlockingQueue add/offer/put

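If execute enqueues tasks with offer rather than put, how do the suspended workers get woken up? This was a misunderstanding on my part. I used to think only put wakes up waiting consumers and offer does not. In fact both do. They differ only in whether they block when the queue is full; everything else is the same.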

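Comparing offer and put in ArrayBlockingQueue, the only difference is what happens when the queue is full. offer does return false, while put does Condition#await on notFull. Otherwise they are identical, and both insert through the enqueue method: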

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

enqueue wakes up a suspended consumer (notEmpty.signal()):

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }
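Here is a quick check that offer, not just put, wakes a consumer parked in take(). This is a sketch with hypothetical names. The consumer thread stands in for a worker waiting in getTask.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class OfferWakesTakeDemo {

    // Starts a consumer blocked in take(), then hands it an element with offer().
    static String handOff() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // parks on notEmpty until signalled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        // Wait until the consumer is actually parked inside take().
        while (consumer.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        queue.offer("task"); // offer -> enqueue -> notEmpty.signal()
        consumer.join(TimeUnit.SECONDS.toMillis(5));
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff()); // task
    }
}
```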

So add/offer/put barely differ. When the queue is full, add throws an exception, offer returns false, and put blocks.
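The three behaviors on a full queue can be checked side by side. This is a minimal sketch with hypothetical names. put is run on its own thread, since it would otherwise block the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class FullQueueDemo {

    // Fills a capacity-1 queue, then tries add, offer and put in turn.
    // Returns {add outcome, offer result, put state, head after a take()}.
    static String[] insertIntoFullQueue() throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // the queue is now full

        String addOutcome;
        try {
            queue.add(2);
            addOutcome = "inserted";
        } catch (IllegalStateException e) {
            addOutcome = "IllegalStateException"; // add: throws when full
        }

        String offerResult = String.valueOf(queue.offer(2)); // offer: returns false

        // put: waits on notFull, so run it on a separate producer thread.
        Thread producer = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        producer.join(100); // give put() time to block
        String putState = producer.isAlive() ? "blocked" : "returned";

        queue.take();        // dequeue signals notFull, waking the producer
        producer.join(5000);
        return new String[] {addOutcome, offerResult, putState, String.valueOf(queue.peek())};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(String.join(", ", insertIntoFullQueue()));
        // IllegalStateException, false, blocked, 2
    }
}
```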

What is a worker

Worker extends AQS (AbstractQueuedSynchronizer) and also implements the Runnable interface.

Associated thread

A worker must be backed by its own thread in order to run tasks asynchronously. How is it tied to that thread? Worker wraps a Thread:

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

The thread is created by a ThreadFactory. ThreadFactory is just an interface: it creates a new, unstarted thread that will run the given Runnable.

public interface ThreadFactory {

    /**
     * Constructs a new unstarted {@code Thread} to run the given runnable.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     *
     * @see <a href="../../lang/Thread.html#inheritance">Inheritance when
     * creating threads</a>
     */
    Thread newThread(Runnable r);
}

The simplest thread factory could be implemented like this:

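import java.util.concurrent.ThreadFactory;

// Returns a new, unstarted thread that will run r; no naming or daemon handling.
class SimplestThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
}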

The default ThreadFactory in ThreadPoolExecutor is Executors.defaultThreadFactory(), which is not much different from the simplest factory above:

    /**
     * The default thread factory.
     */
    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            @SuppressWarnings("removal")
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

It names threads with the classic JDK pattern pool-x-thread-y, makes them non-daemon, and resets their priority to Thread.NORM_PRIORITY.
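In practice you often pass your own factory to get meaningful thread names. The NamedThreadFactory below is a hypothetical example, not a JDK class. It relies only on the ThreadFactory contract shown above (return an unstarted thread that runs r) and on the ThreadPoolExecutor constructor that accepts a ThreadFactory.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: names threads "<prefix>-<n>" and sets the daemon flag.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Like DefaultThreadFactory, return an unstarted thread that runs r
        // (inside a pool, r is the Worker itself).
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(daemon);
        return t;
    }
}

public class NamedThreadFactoryDemo {

    // Runs one task on a single-thread pool and returns its thread's name.
    static String workerThreadName() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                new NamedThreadFactory("order-worker", true));
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(workerThreadName()); // order-worker-1
    }
}
```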

How a worker runs

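After addWorker, the worker's thread is started, but nothing ever "starts" the worker itself. So how does Worker#run get called? Look at how the thread is created. The ThreadFactory passes the Runnable to the new thread when creating it, so the thread already holds that Runnable, and Thread#start runs it. In the worker's code this.thread = getThreadFactory().newThread(this), the worker passes itself as the Runnable. So running the thread means running the worker. The flow: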

  1. addWorker
  2. start the thread held by the Worker
  3. the thread runs Worker#run (see the implementation below)
  4. Worker#run simply calls runWorker
  5. runWorker runs firstTask, then gets tasks from the BlockingQueue and calls task.run

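Once started, the thread keeps running, repeatedly taking tasks from the BlockingQueue and executing them. Executing a task is just a direct call to its run method. The worker is already running on a separate thread, so calling Runnable#run is all it takes.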
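The trick of a Runnable that hands itself to its own thread can be reproduced in a few lines. MiniWorker is a hypothetical, heavily simplified stand-in for Worker. It has no AQS lock, no pool state and no keep-alive, and it simply exits when interrupted while waiting in take().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

// Hypothetical, heavily simplified Worker: no AQS lock, no pool state, no keep-alive.
class MiniWorker implements Runnable {
    final Thread thread;
    private Runnable firstTask;
    private final BlockingQueue<Runnable> queue;

    MiniWorker(Runnable firstTask, BlockingQueue<Runnable> queue, ThreadFactory factory) {
        this.firstTask = firstTask;
        this.queue = queue;
        this.thread = factory.newThread(this); // the worker hands itself to its own thread
    }

    @Override
    public void run() { // runs on `thread` once thread.start() is called
        Runnable task = firstTask;
        firstTask = null;
        try {
            while (task != null || (task = queue.take()) != null) {
                task.run(); // a plain method call: we are already on the worker thread
                task = null;
            }
        } catch (InterruptedException e) {
            // interrupted while waiting in take(): let the thread finish
        }
    }
}

public class MiniWorkerDemo {

    // Runs three tasks (one as firstTask, two from the queue) and records
    // the name of the thread each one ran on.
    static List<String> runTasks() throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        List<String> threadNames = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        Runnable record = () -> {
            threadNames.add(Thread.currentThread().getName());
            done.countDown();
        };

        MiniWorker worker = new MiniWorker(record, queue, r -> new Thread(r, "mini-worker"));
        worker.thread.start(); // starting the thread is what invokes worker.run()
        queue.offer(record);
        queue.offer(record);

        done.await();
        worker.thread.interrupt(); // wakes the worker out of take() so it exits
        worker.thread.join();
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks()); // [mini-worker, mini-worker, mini-worker]
    }
}
```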

Now the corresponding implementation: Worker is itself a Runnable, and its run method calls runWorker:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
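
  1. getTask: at its core is Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(). A worker that is allowed to time out (workerCount > corePoolSize, or allowCoreThreadTimeOut is set) waits with a timed poll. Otherwise it uses take, so it suspends until a task arrives. When getTask returns null, the loop ends;
  2. task.run(): once a task is obtained, its run method is called directly.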
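The timed branch of getTask is what lets extra workers die off. This sketch uses hypothetical names and assumes keepAliveTime of 50 ms plus a SynchronousQueue, so the second task forces a second worker. It shows the pool shrinking back to corePoolSize once the workers go idle.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {

    // Returns {poolSize while both tasks run, poolSize after the workers idle}.
    static int[] shrinkAfterKeepAlive() throws InterruptedException {
        // core=1, max=2, keepAlive=50ms; a SynchronousQueue has no capacity,
        // so the second task cannot be queued and forces a second worker.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 50, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);
        pool.execute(blocker);
        int busy = pool.getPoolSize();
        release.countDown();

        // Idle workers now sit in getTask(). While workerCount > corePoolSize,
        // getTask() uses poll(keepAliveTime) instead of take(); a worker whose
        // poll times out gets null from getTask() and exits.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        int idle = pool.getPoolSize();
        pool.shutdown();
        return new int[] {busy, idle};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(shrinkAfterKeepAlive())); // [2, 1]
    }
}
```

Nothing marks a worker as core or non-core. Both idle workers poll with a timeout while the count exceeds corePoolSize. Whichever one first succeeds in decrementing the worker count exits, and the survivor falls back to take() on its next pass.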

Worker and AQS

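So how is Worker related to AQS? Worker is a simple, non-reentrant mutual-exclusion lock built on AQS, so it implements tryAcquire/tryRelease. Once the worker is running, state has two values: 0 means unlocked and 1 means locked. The constructor sets it to -1 to suppress interrupts until runWorker starts.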

In runWorker, once a task has been taken from the BlockingQueue, the worker calls Worker#lock before running it and Worker#unlock afterwards:

        public void lock()        { acquire(1); }
        public void unlock()      { release(1); }

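So the AQS-based lock guards each task execution: while a task runs, the worker is held locked. A worker has only one thread, so it could never run two tasks at once anyway. The real point of the lock is to let the pool tell a busy worker from an idle one. For example, shutdown() only interrupts workers whose lock it can obtain with tryLock(), that is, idle workers waiting in getTask(). As a result, a task that is running is not interrupted by shutdown().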
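Worker's locking part can be reproduced as a standalone AQS subclass. NonReentrantLock is a hypothetical name. Its tryAcquire/tryRelease follow the same 0/1 state scheme described above, and probe mimics how shutdown() uses tryLock to decide which workers are idle.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical stand-in for the lock part of ThreadPoolExecutor.Worker.
// state: 0 = unlocked, 1 = locked (the real Worker also starts at -1).
class NonReentrantLock extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int unused) {
        // CAS 0 -> 1; fails if already held, even by the current thread
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }
}

public class WorkerLockDemo {

    // Mimics shutdown() probing workers: tryLock succeeds only on idle ones.
    static boolean[] probe() {
        NonReentrantLock busyWorker = new NonReentrantLock();
        NonReentrantLock idleWorker = new NonReentrantLock();
        busyWorker.lock(); // this worker is "running a task"
        boolean canInterruptBusy = busyWorker.tryLock(); // false: held, and not reentrant
        boolean canInterruptIdle = idleWorker.tryLock(); // true: nobody holds it
        idleWorker.unlock();
        busyWorker.unlock();
        return new boolean[] {canInterruptBusy, canInterruptIdle};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probe())); // [false, true]
    }
}
```

Because the lock is not reentrant, even the worker's own thread fails tryLock while a task is running.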

Responding to interrupts

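ExecutorService#shutdownNow() forcibly shuts down the pool. It drains the queue and returns the tasks that never started, then calls interrupt on the worker threads. Stopping tasks that are already running therefore depends on those tasks responding to interruption; a task that ignores interrupts may never terminate.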

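runWorker is a loop that keeps taking tasks from the BlockingQueue, and the interrupt is what makes it exit. A worker blocked in take/poll inside getTask is woken by an InterruptedException. getTask then rechecks the pool state, sees STOP, and returns null, which ends the loop. A worker in the middle of a task is not stopped on the spot. The check before task.run() only ensures the thread's interrupt flag is set while the pool is stopping, so the task itself has to respond. Once the task returns, the next getTask also returns null.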
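Here is a sketch of shutdownNow from the caller's side, assuming a single-thread pool. One task is running and sleeping (Thread.sleep responds to interrupts), and two more are still queued. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownNowDemo {

    // Returns {tasks handed back by shutdownNow,
    //          running task saw the interrupt (1/0), pool terminated (1/0)}.
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(TimeUnit.MINUTES.toMillis(1)); // interruptible wait
            } catch (InterruptedException e) {
                interrupted.set(true); // the task cooperates by returning
            }
        });
        pool.execute(() -> { }); // these two stay in the queue
        pool.execute(() -> { });

        started.await();                              // first task is running
        List<Runnable> neverRun = pool.shutdownNow(); // drain queue, interrupt workers
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {neverRun.size(), interrupted.get() ? 1 : 0, terminated ? 1 : 0};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // [2, 1, 1]
    }
}
```

The two queued tasks are handed back, the running task sees the InterruptedException, and the pool terminates. If the task ignored the interrupt (say, a busy loop that never checks Thread.interrupted()), awaitTermination would time out instead.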

Thoughts

That's a wrap for ThreadPoolExecutor~

This article is licensed by the author under CC BY 4.0.