文章

线程执行器(Executor)和线程池

介绍一下线程执行器(Executor#execute),和其中一种比较复杂的实现,线程池。

  1. Executor:任务执行器
  2. ThreadPoolExecutor:池化任务执行器
    1. BlockingQueueadd/offer/put
  3. worker是什么
    1. worker绑定线程
    2. 线程绑定任务?
    3. worker启动方式
    4. worker流程总结
    5. worker和aqs
    6. 响应中断
  4. 总结

Executor:任务执行器

executor本身的概念很简单:一个任务的执行器。怎么执行?有各种实现,比如直接由提交任务的主线程执行:

1
2
3
4
5
6
7
public class CallerRunExecutor implements Executor {
    
    @Override
    public void execute(@NotNull Runnable command) {
        command.run();
    }
}

或者更实用一些,由一个异步线程执行:

1
2
3
4
5
6
7
public class AsyncRunExecutor implements Executor {

    @Override
    public void execute(@NotNull Runnable command) {
        new Thread(command).start();
    }
}

所以executor的概念本身很简单

即便是jdk提供的executor,也有很多简单版本,比如Executors.newThreadPerTaskExecutor(ThreadFactory),就类似上面这个AsyncRunExecutor的实现。只不过它里面的线程不是直接由new Thread创建的,而是用ThreadFactory#newThread创建的。

他们的execute方法其实没啥区别:

1
2
3
4
5
6
7
8
9
10
11
12
    @Override
    public void execute(Runnable task) {
        start(task);
    }

    private Thread start(Runnable task) {
        Objects.requireNonNull(task);
        ensureNotShutdown();
        Thread thread = newThread(new TaskRunner(this, task));
        start(thread);
        return thread;
    }

顺带一提,Executors#newVirtualThreadPerTaskExecutor()其实就是一个传入了能创建虚拟线程的factory的ThreadPerTaskExecutor,相当于:Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory())

不过这个factory创建出来的虚拟线程是没有名字的……所以不如自己写一个:Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("puppylpg-", 0).factory())

ThreadPoolExecutor:池化任务执行器

线程“池”的概念来自ThreadPoolExecutor,它功能比较强,所以对Executor接口的实现比较复杂(漂亮),也一定程度上导致大家误以为executor是个很复杂的概念。

创建ThreadPoolExecutor时的几个必要参数:

  • int corePoolSize:线程池线程不足corePoolSize时,如果有任务到来,就通过创建新的线程来处理;
  • BlockingQueue<Runnable> workQueue:存放任务的队列,当线程池线程数达到core pool size时,新的任务会放到queue里,由消费者(worker)取出并执行。实际上就是生产者-消费者模式中的缓冲区;
  • int maximumPoolSize当queue满了之后,再添加新任务会导致继续创建非core线程,最多创建到maximumPoolSize个线程;
  • long keepAliveTime, TimeUnit unit:超出corePoolSize数的线程的最大闲置时间,超过就终止该线程;
  • ThreadFactory threadFactory:创建子线程的factory,设置了factory就可以自定义线程,比如线程名称、daemon;
  • RejectedExecutionHandler handler:如果BlockingQueue放不下,应该怎么办;

线程池创建线程的几个关键点:

  1. 任务提交,如果worker不到core pool size,增加worker。如果是首次创建的Worker(因为没到达corePoolSize),创建时任务已经以firstTask传入Worker**,直接执行firstTask,而不是从queue里取任务
  2. 如果线程数够corePoolSize了,新任务放到队列BlockingQueue
  3. 如果queue放不下,考虑继续增加worker到max pool size
  4. 如果已经到max pool size,没法增加worker了,调用reject策略,决定task的命运

所以它是生产者消费者模型。

为什么是先增加到max pool size的worker,然后再把任务放入队列?后文有详细介绍。所以如果是个无界队列,就永远不会创建超过core pool size的线程。而且之前犯过这个错误,确实用过无界队列,导致线程数一直是core pool size,没增长。

这些逻辑通过execute的实现可以很清楚地看出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

分别对应:

  1. if (workerCountOf(c) < corePoolSize)addWorker(command, true),add worker时候的参数true指的是core worker。addWorker方法会创建一个worker(会创建一个thread),然后启动这个thread,执行worker的run方法,run方法其实就是从BlockingQueue里取一个任务,然后执行。具体的调用链路见下文对worker的介绍;
  2. workQueue.offer(command),注意放到队列用的是offer,而不是put,因为放不进去的时候不需要阻塞(put方法会阻塞)
  3. else if (!addWorker(command, false))如果queue满了,放不进去任务,就看看有没有可以超借的quota,有的话先尝试增加临时worker。这里addWorker方法参数里的false指的是非core worker,可以理解为超借额度;
  4. reject(command),如果worker也没法增加,调用reject策略;

BlockingQueueadd/offer/put

如果放任务的时候,BlockingQueue用的不是put而是offer,那么挂起的worker是怎么被唤醒的?其实是我理解错了。我之前一直以为只有put是会唤醒的,offer不会,实际上都会,他们的区别仅在于放不进去的时候会不会阻塞,其他地方是没区别的

BlockingQueue里offer和put的实现,会发现区别在于是return false还是Condition#await,其他都一样,都是用enqueue方法入队的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

enqueue方法会唤醒挂起的消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }

所以add/offer/put,区别不大,仅在于最终是抛异常还是返回false还是阻塞。

worker是什么

可以理解为一个不停循环处理提交到线程池里的任务的线程。

worker绑定线程

worker肯定是一个单独的thread,用来异步执行任务。它是怎么和thread绑定的?worker里封装了一个Thread:

1
2
3
4
5
6
7
8
9
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

thread是由ThreadFactory创建的,thread factory就一个接口,负责创建一个接口并执行runnable任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ThreadFactory {

    /**
     * Constructs a new unstarted {@code Thread} to run the given runnable.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     *
     * @see <a href="../../lang/Thread.html#inheritance">Inheritance when
     * creating threads</a>
     */
    Thread newThread(Runnable r);
}

最简单的thread factory可以这么实现:

1
2
3
4
5
6
7
class SimplestThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
}

ThreadPoolExecutor里默认的ThreadFactoryExecutors.defaultThreadFactory(),它其实跟上面实现的最简单的factory差不多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    /**
     * The default thread factory.
     */
    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            @SuppressWarnings("removal")
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

它以pool-x-thread-y这个经典的jdk里线程的命名方式给thread命名,并设置thread为非daemon。

线程绑定任务?

线程用于执行任务,一个线程执行完任务后,会再取下一个任务执行,所以从逻辑上来讲,线程和任务是1对n的关系,不是一一绑定的。

但是Thread这个类,thread是和Runnable绑定的,它只能执行这一个Runnable。

那么线程是怎么做到能不停执行新任务的?

也很简单,这个runnable(initial task)实现这么一套机制就行了:thread先执行firstTask(ThreadFactory在创建线程的时候,会把一个Runnable(firstTask)传给thread),等把它执行完了,就去queue里取下一个任务执行,循环往复。这个机制(initial task)一直不间断,就可以不断执行交给线程池的任务。

“线程池一开始创建线程的时候,会先扔一个firstTask给线程”,从这里也可以看出,初始任务是不需要从queue里取的。所以线程池是等task达到core pool size之后才开始把task入队。这也回答了开头提出的问题。

worker启动方式

addWorker之后,会start这个thread,所以thread就启动了。启动后的thread,就会执行initial task。看看代码,这个initial task不是别人,正是worker自己:

1
2
3
4
5
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

所以这里的initial task不是外部传进来的firstTask,而是一套持续执行task的逻辑——runWorker

1
2
3
4
        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

runWorker会先执行firstTask,然后在task执行完毕后,使用getTask()从队列里取一个新任务,继续执行,循环往复:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  1. getTask方法就是Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(),取任务用了take,没任务会挂起;
  2. task.run(),取到任务之后,直接调用任务的run方法执行。因为它已经是一个额外的线程了,所以它执行任务只需要直接调用Runnable#run就行了。

注意看这个方法里的firstTaskfirstTask跟后续从queue里取的task没什么区别,一上来就可以置null了,让jvm gc掉,没必要一直留着它

getTask()用poll或take取任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();

            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  1. 用take,是因为线程没有任务执行的时候,可以挂起,等待被唤醒即可;
  2. poll(keepAliveTime),是为了实现线程池的另一套机制:如果设置了keepAliveTime,在到达时间还没任务的话,就把core线程销毁,而不是一直活下去。这里如果poll超时了,就设置个timeout标记,用于销毁线程;

worker流程总结

总结一下worker的整体流程:

  1. addWorker
  2. start thread in Worker
  3. thread run Worker#run
  4. Worker#run == runWorker
  5. get task from BlockingQueue and task.run, again and again

worker和aqs

worker本身继承了aqs:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

worker和aqs有什么关系呢?worker基于aqs实现互斥锁,所以实现了tryAcquire/tryRelease。state有两种状态,为0代表未锁定,为1代表已占用。

runWorker的时候,从BlockingQueue取到task之后,要执行任务,执行前后要先Worker#lockWorker#unlock

1
2
        public void lock()        { acquire(1); }
        public void unlock()      { release(1); }

所以这里worker基于aqs实现的作用就是:确保任务的执行流程是互斥的,不存在一个worker同时执行两个任务的情况

响应中断

ExecutorService#shutdownNow()会强行关闭线程池,抛弃那些还未执行的任务。其实就是调用worker thread的interrupt方法,给worker thread发送中断信号。这就要求worker在执行任务的时候,要响应中断。

runWorker()是一个不断从BlockingQueue取任务的循环,退出循环的方式就是在任务执行前校验是否收到interrupt中断,如果收到就退出。这一行为就是对interrupt的响应。

shutdownNow()会interrupt所有的thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * interrupts tasks via {@link Thread#interrupt}; any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }


    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        // assert mainLock.isHeldByCurrentThread();
        for (Worker w : workers)
            w.interruptIfStarted();
    }

shutdown()则只会interrupt空闲worker,会让执行任务中的worker继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

还有一个awaitTermination(long, TimeUnit):阻塞方法,如果timeout内任务执行完,返回true;如果没有,返回false。 这个方法可以在关闭线程池的时候作为一个“中庸”逻辑:既不直接关闭,也不等太久。

最终JDK建议关闭线程池的样例是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void shutdownAndAwaitTermination(ExecutorService pool) {
    // 告知关闭,先不接收新任务了
    pool.shutdown();
    try {
        // 等待已有任务结束
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            // 关闭当前执行任务
            pool.shutdownNow();
            // Wait a while for tasks to respond to being cancelled
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("Pool did not terminate");
            }
        }
    } catch (InterruptedException ie) {
        // (Re-)Cancel if current thread also interrupted
        pool.shutdownNow();
        // Preserve interrupt status
        Thread.currentThread().interrupt();
    }
}
  1. 先尝试温柔关闭线程池(结束idle线程);
  2. 等60s,等待已有任务结束;
    1. 如果60s后还没结束,强行终止在执行的任务(这就要求worker在执行任务的时候,要响应中断);
    2. 如果60s内任务执行完了,就结束了,什么也不用做;
    3. 因为“等待60s”这一行为本身也是可中断的,所以如果在这一过程中,本线程也收到了interrupt信号(“别等60s了”),那就不再坚持,直接调用shutdownNow,给worker发送中断信号;

但是用起来还是太麻烦了,推荐Guava的MoreExecutors.getExitingExecutorService(ThreadPoolExecutor, long, TimeUnit)。它的作用是把ExecutorService转换成一个exit executor service(当jvm退出时会自动关闭的线程池)。

总结

ThreadPoolExecutor就此翻篇~ 看看它用到了哪些技术:

  1. 维护任务列表,用到了BlockingQueue;
    1. 底层是生产者/消费者模型,线程的wait/notify机制(或者说更高级的await/signal);
  2. ThreadFactory,创建线程;
    1. AtomicInteger:默认thread factory创建出的线程的名字都是pool-x-thread-y,这里的x和y都是AtomicInteger。pool是每创建一个x+1,thread是每创建一个y+1。pool的计数器是static变量,所以整个jvm的pool number共用一个计数器。thread的计数器是factory实例的成员变量,所以每一个pool有一个自己的计数器;
  3. aqs:只worker本身集成aqs,而不是指BlockingQueue;
  4. 中断与响应;
本文由作者按照 CC BY 4.0 进行授权