线程执行器(Executor)和线程池
介绍一下线程执行器(Executor#execute
),和其中一种比较复杂的实现,线程池。
Executor
:任务执行器
executor本身的概念很简单:一个任务的执行器。怎么执行?有各种实现,比如直接由提交任务的主线程执行:
1
2
3
4
5
6
7
public class CallerRunExecutor implements Executor {
@Override
public void execute(@NotNull Runnable command) {
command.run();
}
}
或者更实用一些,由一个异步线程执行:
1
2
3
4
5
6
7
public class AsyncRunExecutor implements Executor {
@Override
public void execute(@NotNull Runnable command) {
new Thread(command).start();
}
}
所以executor的概念本身很简单。
即便是jdk提供的executor,也有很多简单版本,比如Executors.newThreadPerTaskExecutor(ThreadFactory)
,就类似上面这个AsyncRunExecutor的实现。只不过它里面的线程不是直接由new Thread
创建的,而是用ThreadFactory#newThread
创建的。
他们的execute
方法其实没啥区别:
1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void execute(Runnable task) {
start(task);
}
private Thread start(Runnable task) {
Objects.requireNonNull(task);
ensureNotShutdown();
Thread thread = newThread(new TaskRunner(this, task));
start(thread);
return thread;
}
顺带一提,Executors#newVirtualThreadPerTaskExecutor()
其实就是一个传入了能创建虚拟线程的factory的ThreadPerTaskExecutor
,相当于:Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory())
。
不过这个factory创建出来的虚拟线程是没有名字的……所以不如自己写一个:
Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("puppylpg-", 0).factory())
。
ThreadPoolExecutor
:池化任务执行器
线程“池”的概念来自ThreadPoolExecutor
,它功能比较强,所以对Executor
接口的实现比较复杂(漂亮),也一定程度上导致大家误以为executor是个很复杂的概念。
创建ThreadPoolExecutor时
的几个必要参数:
int corePoolSize
:线程池线程不足corePoolSize时,如果有任务到来,就通过创建新的线程来处理;BlockingQueue<Runnable> workQueue
:存放任务的队列,当线程池线程数达到core pool size时,新的任务会放到queue里,由消费者(worker)取出并执行。实际上就是生产者-消费者模式中的缓冲区;int maximumPoolSize
:当queue满了之后,再添加新任务会导致继续创建非core线程,最多创建到maximumPoolSize个线程;long keepAliveTime, TimeUnit unit
:超出corePoolSize数的线程的最大闲置时间,超过就终止该线程;ThreadFactory threadFactory
:创建子线程的factory,设置了factory就可以自定义线程,比如线程名称、daemon;RejectedExecutionHandler handler
:如果BlockingQueue
放不下,应该怎么办;
线程池创建线程的几个关键点:
- 任务提交,如果worker不到core pool size,增加worker。如果是首次创建的
Worker
(因为没到达corePoolSize),创建时任务已经以firstTask
传入Worker**
,直接执行firstTask
,而不是从queue里取任务 - 如果线程数够corePoolSize了,新任务放到队列
BlockingQueue
里 - 如果queue放不下,考虑继续增加worker到max pool size
- 如果已经到max pool size,没法增加worker了,调用reject策略,决定task的命运
所以它是生产者消费者模型。
为什么是先增加到max pool size的worker,然后再把任务放入队列?后文有详细介绍。所以如果是个无界队列,就永远不会创建超过core pool size的线程。而且之前犯过这个错误,确实用过无界队列,导致线程数一直是core pool size,没增长。
这些逻辑通过execute
的实现可以很清楚地看出来:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
分别对应:
if (workerCountOf(c) < corePoolSize)
,addWorker(command, true)
,add worker时候的参数true指的是core worker。addWorker
方法会创建一个worker(会创建一个thread),然后启动这个thread,执行worker的run方法,run方法其实就是从BlockingQueue
里取一个任务,然后执行。具体的调用链路见下文对worker的介绍;workQueue.offer(command)
,注意放到队列用的是offer,而不是put,因为放不进去的时候不需要阻塞(put方法会阻塞);else if (!addWorker(command, false))
,如果queue满了,放不进去任务,就看看有没有可以超借的quota,有的话先尝试增加临时worker。这里addWorker
方法参数里的false
指的是非core worker,可以理解为超借额度;reject(command)
,如果worker也没法增加,调用reject策略;
BlockingQueue
的add
/offer
/put
如果放任务的时候,BlockingQueue
用的不是put而是offer,那么挂起的worker是怎么被唤醒的?其实是我理解错了。我之前一直以为只有put是会唤醒的,offer不会,实际上都会,他们的区别仅在于放不进去的时候会不会阻塞,其他地方是没区别的。
看BlockingQueue
里offer和put的实现,会发现区别在于是return false
还是Condition#await
,其他都一样,都是用enqueue
方法入队的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
enqueue
方法会唤醒挂起的消费者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E e) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
所以add
/offer
/put
,区别不大,仅在于最终是抛异常还是返回false还是阻塞。
worker是什么
可以理解为一个不停循环处理提交到线程池里的任务的线程。
worker绑定线程
worker肯定是一个单独的thread,用来异步执行任务。它是怎么和thread绑定的?worker里封装了一个Thread:
1
2
3
4
5
6
7
8
9
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
thread是由ThreadFactory
创建的,thread factory就一个接口,负责创建一个接口并执行runnable任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ThreadFactory {
/**
* Constructs a new unstarted {@code Thread} to run the given runnable.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*
* @see <a href="../../lang/Thread.html#inheritance">Inheritance when
* creating threads</a>
*/
Thread newThread(Runnable r);
}
最简单的thread factory可以这么实现:
1
2
3
4
5
6
7
class SimplestThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
}
ThreadPoolExecutor
里默认的ThreadFactory
是Executors.defaultThreadFactory()
,它其实跟上面实现的最简单的factory差不多:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* The default thread factory.
*/
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
@SuppressWarnings("removal")
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
它以pool-x-thread-y
这个经典的jdk里线程的命名方式给thread命名,并设置thread为非daemon。
线程绑定任务?
线程用于执行任务,一个线程执行完任务后,会再取下一个任务执行,所以从逻辑上来讲,线程和任务是1对n的关系,不是一一绑定的。
但是Thread
这个类,thread是和Runnable
绑定的,它只能执行这一个Runnable。
那么线程是怎么做到能不停执行新任务的?
也很简单,这个runnable(initial task)实现这么一套机制就行了:thread先执行firstTask(ThreadFactory
在创建线程的时候,会把一个Runnable
(firstTask)传给thread),等把它执行完了,就去queue里取下一个任务执行,循环往复。这个机制(initial task)一直不间断,就可以不断执行交给线程池的任务。
“线程池一开始创建线程的时候,会先扔一个firstTask给线程”,从这里也可以看出,初始任务是不需要从queue里取的。所以线程池是等task达到core pool size之后才开始把task入队。这也回答了开头提出的问题。
worker启动方式
在addWorker
之后,会start这个thread,所以thread就启动了。启动后的thread,就会执行initial task。看看代码,这个initial task不是别人,正是worker自己:
1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
所以这里的initial task不是外部传进来的firstTask,而是一套持续执行task的逻辑——runWorker
:
1
2
3
4
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
runWorker
会先执行firstTask,然后在task执行完毕后,使用getTask()
从队列里取一个新任务,继续执行,循环往复:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask
方法就是Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take()
,取任务用了take,没任务会挂起;task.run()
,取到任务之后,直接调用任务的run方法执行。因为它已经是一个额外的线程了,所以它执行任务只需要直接调用Runnable#run
就行了。
注意看这个方法里的firstTask
:firstTask跟后续从queue里取的task没什么区别,一上来就可以置null了,让jvm gc掉,没必要一直留着它。
getTask()
用poll或take取任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 用take,是因为线程没有任务执行的时候,可以挂起,等待被唤醒即可;
- 用
poll(keepAliveTime)
,是为了实现线程池的另一套机制:如果设置了keepAliveTime,在到达时间还没任务的话,就把core线程销毁,而不是一直活下去。这里如果poll超时了,就设置个timeout标记,用于销毁线程;
worker流程总结
总结一下worker的整体流程:
addWorker
- start
thread
inWorker
thread
runWorker#run
Worker#run
==runWorker
- get task from
BlockingQueue
andtask.run
, again and again
worker和aqs
worker本身继承了aqs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
worker和aqs有什么关系呢?worker基于aqs实现互斥锁,所以实现了tryAcquire
/tryRelease
。state有两种状态,为0代表未锁定,为1代表已占用。
runWorker
的时候,从BlockingQueue
取到task之后,要执行任务,执行前后要先Worker#lock
和Worker#unlock
:
1
2
public void lock() { acquire(1); }
public void unlock() { release(1); }
所以这里worker基于aqs实现的作用就是:确保任务的执行流程是互斥的,不存在一个worker同时执行两个任务的情况。
响应中断
ExecutorService#shutdownNow()
会强行关闭线程池,抛弃那些还未执行的任务。其实就是调用worker thread的interrupt方法,给worker thread发送中断信号。这就要求worker在执行任务的时候,要响应中断。
runWorker()
是一个不断从BlockingQueue
取任务的循环,退出循环的方式就是在任务执行前校验是否收到interrupt中断,如果收到就退出。这一行为就是对interrupt的响应。
shutdownNow()
会interrupt所有的thread:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* interrupts tasks via {@link Thread#interrupt}; any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}
shutdown()
则只会interrupt空闲worker,会让执行任务中的worker继续执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
还有一个awaitTermination(long, TimeUnit)
:阻塞方法,如果timeout内任务执行完,返回true;如果没有,返回false。 这个方法可以在关闭线程池的时候作为一个“中庸”逻辑:既不直接关闭,也不等太久。
最终JDK建议关闭线程池的样例是这样的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void shutdownAndAwaitTermination(ExecutorService pool) {
// 告知关闭,先不接收新任务了
pool.shutdown();
try {
// 等待已有任务结束
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
// 关闭当前执行任务
pool.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
- 先尝试温柔关闭线程池(结束idle线程);
- 等60s,等待已有任务结束;
- 如果60s后还没结束,强行终止在执行的任务(这就要求worker在执行任务的时候,要响应中断);
- 如果60s内任务执行完了,就结束了,什么也不用做;
- 因为“等待60s”这一行为本身也是可中断的,所以如果在这一过程中,本线程也收到了interrupt信号(“别等60s了”),那就不再坚持,直接调用shutdownNow,给worker发送中断信号;
但是用起来还是太麻烦了,推荐Guava的MoreExecutors.getExitingExecutorService(ThreadPoolExecutor, long, TimeUnit)
。它的作用是把ExecutorService
转换成一个exit executor service(当jvm退出时会自动关闭的线程池)。
总结
ThreadPoolExecutor
就此翻篇~ 看看它用到了哪些技术:
- 维护任务列表,用到了BlockingQueue;
- 底层是生产者/消费者模型,线程的wait/notify机制(或者说更高级的await/signal);
- ThreadFactory,创建线程;
- AtomicInteger:默认thread factory创建出的线程的名字都是
pool-x-thread-y
,这里的x和y都是AtomicInteger。pool是每创建一个x+1,thread是每创建一个y+1。pool的计数器是static变量,所以整个jvm的pool number共用一个计数器。thread的计数器是factory实例的成员变量,所以每一个pool有一个自己的计数器;
- AtomicInteger:默认thread factory创建出的线程的名字都是
- aqs:只worker本身集成aqs,而不是指BlockingQueue;
- 中断与响应;