Executor - Thread Pool
JDK中任务和任务的执行者是一套设计的比较好的相互解耦的框架。之前在Java并发编程:并发任务执行及结果获取描述过jdk中关于Executor
的基本逻辑,这次从更加宏观的角度重新全面梳理一下,并补充一点Guava对Executor
和Future
的拓展。
任务
Runnable
/Callable
- 简单任务表示
任务由Callable
/Runnble
表示。
Runnable
在Java 1.0就有了,Callable
是Java 1.5才有的。
Runnable
转Callable
Runnable
和Callable
相比,缺少返回值。所以Runnable
是可以转为Callable
的,只要返回null就行了。或者说Runnable
就是返回值为null的Callable
。Runnable
转Callable
主要是因为有些方法只接受Callable
不接受Runnable
,所以把Runnable
转成Callable
:
This can be useful when applying methods requiring a Callable to an otherwise resultless action. 当将需要Callable的方法应用到其他无结果的操作时,这会很有用。
在Executors
工具类方法里,提供了Runnable
转Callable
的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns {@code null}.
* @param task the task to run
* @return a callable object
* @throws NullPointerException if task null
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
RunnableAdapter
接收两个参数:Runnable
和result(泛型T)。这个result必然是null,有一种未卜先知的感觉,还没执行就知道结果了,因为Runnable
本身就不返回值:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
// 你尽管执行
task.run();
// 最终一定return null
return result;
}
}
Future
- 高级任务表示
更高级的方式是用Future<T>
表示结果。这么做主要是引入了对任务生命周期的控制:通过Future
可以判断任务是否完成isDone
/isCancelled
、获取任务结果get
,或者取消任务cancel
。
boolean isCancelled()
boolean isDone()
boolean cancel(boolean mayInterruptIfRunning)
V get() throws InterruptedException, ExecutionException
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
所以一般不直接new Thread(Runnable)#start()
,多是通过线程池提交Callable
/Runnble
,然后获取Future
,再和Future
打交道。
RunnableFuture
- 真正的任务表示
Runnable
+ Future
,两个接口的结合。既有任务执行方法,又有任务结果表示,所以这个类才是实际执行前创建的任务形态。
RunnableFuture
的实现类是FutureTask
,它的模型也比较简单:
- 内置一个变量outcome,一个int值state表示当前任务的状态;
- 线程执行任务的时候,会改变state,如果有结果,会放入outcome;
- 其他线程要获取结果,就检查state,如果是已完成,就获取outcome。否则就挂起(以该
FutureTask
对象作为锁,挂起到它的等待队列)。
任务的执行者
Executor
- 简单执行者
任务的执行者最简单的抽象是Executor
。它只提供了execute
方法,返回值是void。
任务不是execute
了就立刻执行的,而是会在将来某个时刻执行,具体取决于线程池的线程什么时候有空,且抢到了CPU资源。
void execute(Runnable command)
线程为什么能执行任务?
线程池之所以能执行任务,是因为里面的线程直接执行了任务的
run()
方法(由这个线程去运行task的那些干活的代码,而非主线程,所以是异步执行的)。
Executor
接口的缺点是:
- 对任务的执行没有暴露任何控制接口;
- 它只能做到执行
Runnable
任务,无法返回任何结果。
ExecutorService
- 高级执行者
所以真正有用的接口是ExecutorService
。正如Future
对Callable
/Runnable
的拓展给任务加上了状态判断一样,ExecutorService
也对Executor
进行了拓展,给执行者加上了状态控制和判断方法。比如关闭executor的shutdown
,和判断executor是否关闭的isShutdown
等。
boolean isShutdown()
boolean isTerminated()
void shutdown()
List<Runnable> shutdownNow()
Executor#execute
提交Runnable
无返回值,相对应的ExecutorService
新增了提交Callable
/Runnable
的submit
方法,返回任务的执行结果Future
。
<T> Future<T> submit(Callable<T> task)
Future<?> submit(Runnable task)
:submit
一个Runnable
和execute
一个Runnable
没啥区别,即使通过submit
提交,返回值也依然是null,和使用RunnableAdapter
类似<T> Future<T> submit(Runnable task, T result)
:也不一定非得返回null,也可以返回其他指定的result。但无论是什么result,都属于“未卜先知”了
关闭ExecutorService
关于关闭ExecutorService
的方法:
shutdown()
:线程池不再接受新任务了,但不是立即关停,也不保证等待已有任务执行完;shutdownNow()
:强关。正在执行的也别执行了。其实就是调用worker thread的interrupt方法,给worker thread发送中断信号;awaitTermination(long, TimeUnit)
:阻塞方法,要么任务执行完,要么超时,要么被interrupt,否则一直阻塞。
JDK建议关闭一个线程池的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void shutdownAndAwaitTermination(ExecutorService pool) {
// 告知关闭,先不接收新任务了
pool.shutdown();
try {
// 等待已有任务结束
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
// 关闭当前执行任务
pool.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
- 先关闭线程池,结束idle线程;
- 等60s,等待已有任务结束;
- 如果60s后还没结束,强行终止在执行的任务(这就要求worker在执行任务的时候,要响应中断);
- 如果在这一过程中,本线程也收到了interrupt信号(“别等60s了”),那就不再坚持60s的等待,直接调用shutdownNow,给worker发送中断信号;
但是用起来还是太麻烦了,推荐Guava的MoreExecutors.getExitingExecutorService(ThreadPoolExecutor, long, TimeUnit)
。它的作用是把ExecutorService
转换成一个exit executor service(当jvm退出时会自动关闭的线程池)。详情见后面对exiting executor service的介绍。
ThreadPoolExecutor
- ExecutorService
实现者
Executor
/ExecutorService
的实现是ThreadPoolExecutor
,以线程池的形式实现了ExecutorService
。
ThreadPoolExecutor
是一个典型的生产者消费者模型:
- 用户提交任务进入线程池,实际是加入了
BlockingQueue
; - 线程池的线程从
BlockingQueue
取任务,执行;
详情可以参考ThreadPoolExecutor
。
所以生产者消费者模型,也可以用现成的
ThreadPoolExecutor
去实现啊!
创建ThreadPoolExecutor时
的几个必要参数:
int corePoolSize
:线程池线程不足corePoolSize时,如果有任务到来,就通过创建新的线程来处理;BlockingQueue<Runnable> workQueue
:存放任务的队列,当线程池线程数达到core pool size时,新的任务会放到queue里,由消费者(worker)取出并执行。实际上就是生产者-消费者模式中的缓冲区;int maximumPoolSize
:当queue满了之后,再添加新任务会导致继续创建非core线程,最多创建到maximumPoolSize个线程;long keepAliveTime, TimeUnit unit
:超出corePoolSize数的线程的最大闲置时间,超过就终止该线程;ThreadFactory threadFactory
:创建子线程的factory,设置了factory就可以自定义线程,比如线程名称、daemon;RejectedExecutionHandler handler
:如果BlockingQueue
放不下,应该怎么办;
所以ThreadPoolExecutor
通过里面的BlockingQueue
,实现了生产者消费者模式:
- 通过
execute
/submit
方法,提交Runnable
/Callable
到BlockingQueue
; - 任务提交后,使用内部的
Worker
从BlockingQueue
获取并执行任务;
Worker
ThreadPoolExecutor
的内部类。线程池ThreadPoolExecutor
里的工作线程持有者,内部持有Thread
对象(由ThreadPoolExecutor
的ThreadFactory
创建,所以创建线程和使用线程也解耦了)。线程池的大小其实就是Worker
的多少。实际以HashSet<Worker> workers
的形式存在。
所以线程池大小getPoolSize()实际就是返回workers.size(),当然要加锁获取其size,毕竟并发,数量不定。
当执行execute方法时,实际就是调用Worker#run
方法:
- 如果是首次创建的
Worker
(因为没到达corePoolSize),创建时任务已经以firstTask
传入Worker
,直接执行firstTask
; - 如果是已有的
Worker
,从BlockingQueue
里取一个任务,执行;
具体参考ThreadPoolExecutor#Worker。
BlockingQueue
生产者消费者同步资源(这里是待执行任务)的队列。
具体参考生产者 - 消费者。
SynchronousQueue
- 长度为0的阻塞队列
如果BlockingQueue
使用大小为0的队列,会出现什么情况?对于上述线程池,如果设置size=0,会出错,因为ArrayBlockingQueue
不允许大小<=0,最小得是1。
如果大小为1,那么在任一任务执行完之前,最多提交n+1个任务(n1个core线程+queue里放一个+n2个非core线程)。之后提交的任务会根据RejectedExecutionHandler
的行为处理。
SynchronousQueue
则能够保证任务提交者和任务执行者(或者说生产者和消费者)做手递手传递:即只在有人接手任务的情况下,任务的提交才能成功,否则就只能等着(同样只有有人在等着提交任务,任务的获取才能成功,否则也只能等着)。
所以如果使用SynchronousQueue
,相当于在使用size=0的queue。
SynchronousQueue
常用来处理一些 两个(或多个)线程之间通过状态位进行协同阻塞唤醒 的场景。比如一个线程执行到一种状态后,另外一个线程才能开始执行。可以使用CountDownLatch
,也可以使用SynchronousQueue
,会更简单。
- https://www.baeldung.com/java-synchronous-queue
用于生产者消费者模型
生产者消费者模型中,二者通过BlockingQueue
协同。而ThreadPoolExecutor
中本身就带有一个BlockingQueue
。所以如果我们直接定义一个CallerBlocksPolicy
,同时把线程池的RejectedExecutionHandler
设置为CallerBlocksPolicy
,是不是就可以直接用线程池替代生产者消费者模型中的“BlockingQueue+消费者”这两部分了?
1
2
3
4
5
6
7
8
RejectedExecutionHandler callerBlocksPolicy = (r, executor) -> {
try {
// 不建议直接操作这个内部queue:Access to the task queue is intended primarily for debugging and monitoring.
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
理论上是的,但是不建议这么做!ThreadPoolExecutor
的确提供了getQueue()
方法来获取线程池内部的queue,但是它是一个内部queue,不应该被这么使用。而且该接口的doc也说了,仅应该用作debug或监控:
Access to the task queue is intended primarily for debugging and monitoring.
ThreadFactory
ThreadFactory
接口,只有一个newThread(Runnable)
方法用于创建Thread
,并执行任务(别忘了Runnable
参数),可以使得创建线程和使用线程解耦。
DefaultThreadFactory
:Executors
工具类里有内部类DefaultThreadFactory
。可通过Executors.defaultThreadFactory()
创建一个DefaultThreadFactory
。它给创建的线程设置了name和group,以非daemon的形式存在,优先级为Thread.NORM_PRIORITY
。Executors
里还有一个PrivilegedThreadFactory
继承自DefaultThreadFactory
,不过暂时应该还用不上。BasicThreadFactory
:JDK里的DefaultThreadFactory
不能配置线程优先级、daemon等,apache commons提供了BasicThreadFactory
,可以手动配置这些。
一般创建周期性任务的时候都用BasicThreadFactory
,创建的线程就可以全设为daemon了。
1
2
3
4
5
6
7
8
ExecutorService executor = new ThreadPoolExecutor(
2,
4,
20L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(),
new ThreadFactoryBuilder().setNameFormat("sub process" + "-%d").setDaemon(true).build()
);
RejectedExecutionHandler
当core线程满了,放置任务的BlockingQueue
也满了,非core线程也满了,那当前要提交的任务应该何去何从?
RejectedExecutionHandler
就是对这一行为的定义。该接口比较简单,就一个方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
该方法在ThreadPoolExecutor
无法提交任务时调用,r代表要提交的方法,executor是大年线程池。
AbortPolicy
:默认使用AbortPolicy
,即提交不了直接throw new RejectedExecutionException
。这就是CompletionService#submit
会抛RejectedExecutionException
的原因;1 2 3 4 5
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
DiscardPolicy
:提交不了拉倒,啥也不做。啥也不做其实就是扔了。但这个一定要注意,不要获取它的Future
。因为任务已经扔了,不会再被执行了,提交时创建的Future
的任务执行状态永远不会被改变,所以想要获取其值无异于等待戈多——永远也不可能等到;1 2
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
DiscardOldestPolicy
:提交不了,就把队头的拉出来扔了,把新的任务放进去;1 2 3 4 5 6 7 8
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 把队头的任务取出来,直接扔了 e.getQueue().poll(); // 再次尝试提交新任务 e.execute(r); } }
CallerRunsPolicy
:提交任务的线程自己跑。其实就是提交任务的当前线程直接调用Runnable#run
。但这样可能会带来性能问题,因为会妨碍当前线程提交任务。类似于领导自己亲自干活,等到小弟闲下来了,却发现没有领导给他们安排活了,导致工作线程空闲下来。1 2 3 4 5
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
自定义一个。CallerBlocksPolicy
:如果提交任务的线程发现交不了了,就卡着(线程挂起),直到队列有了新的位置,可以提交进去位置1 2 3 4
RejectedExecutionHandler callerBlocksPolicy = (r, executor) -> { // 不建议直接操作这个内部queue:Access to the task queue is intended primarily for debugging and monitoring. executor.getQueue().put(r); };
CallerBlocksPolicy
这种自定义的方式不是很合适,不过可以在这里列出来用作头脑风暴,以加深对线程池额理解。目前可能用到的场景就是大量数据从数据库读取时,如果直接读全部会OOM。所以采用流式读取。读数据的线程发现worker线程满负载运转,且BlockingQueue
队列堆满时,就直接卡住,不再继续从数据库流式加载数据了。
不推荐这种策略的主要原因是ExecutorService
里的BlockingQueue
本质上是由线程池管理的。如果手动操作这个BlockingQueue
,会影响线程池的状态。
而且,ExecutorService
接口本身没有getQueue()
方法,该方法是ThreadPoolExecutor
独有的。说明接口没想暴露queue给使用者。同时,该方法的javadoc也做了如下说明:
Returns the task queue used by this executor. Access to the task queue is intended primarily for debugging and monitoring. This queue may be in active use. Retrieving the task queue does not prevent queued tasks from executing.
ScheduledExecutorService
- 定时、周期执行者
拓展了ExecutorService
接口,加上了:
- 定时、只执行一次方法:
schedule
; - 周期执行方法:
scheduleAtFixedRate和scheduleWithFixedDelay
;
ScheduledThreadPoolExecutor
是其实现,和它的接口拓展了ExecutorService
一样,它拓展了ThreadPoolExecutor
。
Executors
工具类
在Executors
工具类中,提供了多种简单创建线程池(ThreadPoolExecutor
和ScheduledThreadPoolExecutor
)的方法,但是不建议使用Executors
创建thread pool。
此外还提供了:
RunnableAdapter
:Runnable
转Callable
用;DefaultThreadFactory
:创建线程用。
newCachedThreadPool
BlockingQueue
使用的是SynchronousQueue
,这个队列只在有消费者消费时才能put。所以这其实是个0大小的队列,实际上就是让生产者和消费者手递手(hand-off)交付任务。
所以它的名字叫CachedThread
,其实它相当于cache了一堆thread,当有任务出现时,直接把已有的cache好的thread拿来用。
同时它的thread数没设上限,如果线程不够就会一直创建线程。短时间内如果有大量任务,且执行时间不定,不要用这个(否则会创建巨多线程)。
关于SynchronousQueue
和LinkedBlockingQueue(1)
的区别:
- https://stackoverflow.com/questions/8591610/when-should-i-use-synchronousqueue
newFixedThreadPool
固定线程数的线程池。但是它使用的是一个无限大小的LinkedBlockingQueue
,可能会消耗大量内存资源,甚至会导致oom。而且,对于大多数场景无限排队没什么意义,client超时就不等待了,server把任务排下来也没什么意义。还有一个缺点是线程数固定,没什么弹性。
所以建议自己创建一个有线程限制、有排队大小限制、有弹性的ThreadPoolExecutor
:
1
new ThreadPoolExecutor(10, 20, 60, SECONDS, new ArrayBlockingQueue<Runnable>(1000), new AbortPolicy());
newWorkStealingPool
@since 1.8
MoreExecutors
- Guava
Guava也提供了一些方便的Executor
/ExecutorService
,但是都只能通过MoreExecutors
这个工具类创建实例。
DirectExecutor
使用本线程执行任务的Executor
。实现起来很简单——不通过Thread
start任务,直接调用run即可:
1
2
3
4
@Override
public void execute(Runnable command) {
command.run();
}
毕竟这也是一种特殊的Executor
。
值得注意的是,这是我第一次见到一个enum类实现接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* An {@link Executor} that runs each task in the thread that invokes {@link Executor#execute
* execute}.
*/
@GwtCompatible
enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
需要用的时候,直接获取该实例即可:
1
2
3
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}
如果有什么工具类,又不想全static,也不想new一个全局唯一的工具类实例,使用enum这种真的很方便啊!
exiting executor service
非daemon线程是会阻止JVM退出的。所以在创建执行不重要任务的线程池的时候,应该给它设置一个创建daemon线程的ThreadFactory
,所有创建出来的线程都是daemon。
Guava提供的MoreExecutors.getExitingExecutorService()
可以帮助把一个会阻挠JVM退出的刁民ExecutorService
转成良民,把ExecutorService
转换成一个exit executor service(当jvm退出时会自动关闭的线程池),这样我们也不需要操心线程池的关闭问题了。
具体实现就是:
- 修改
ExecutorService
的ThreadFactory
设置,线程属性全都设置为daemon(jdk默认的Executors.defaultThreadFactory()
创建的线程都不是daemon)。这一步很关键,如果没有显式调用exit,在有非daemon线程的情况下,jvm不会退出; - 给jvm添加一个shutdown hook:分别调用
service.shutdown()
和service.awaitTermination(terminationTimeout, timeUnit)
关闭这个线程池。即在jvm退出时,关闭该ExecutorService
;1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
final void addDelayedShutdownHook( final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) { checkNotNull(service); checkNotNull(timeUnit); addShutdownHook( MoreExecutors.newThread( "DelayedShutdownHook-for-" + service, new Runnable() { @Override public void run() { try { // We'd like to log progress and failures that may arise in the // following code, but unfortunately the behavior of logging // is undefined in shutdown hooks. // This is because the logging code installs a shutdown hook of its // own. See Cleaner class inside {@link LogManager}. service.shutdown(); service.awaitTermination(terminationTimeout, timeUnit); } catch (InterruptedException ignored) { // We're shutting down anyway, so just ignore. } } })); } void addShutdownHook(Thread hook) { Runtime.getRuntime().addShutdownHook(hook); }
这样我们只需要操心创建线程池就行了,不需要操心关闭线程池。
shutdown hook
所谓shutdown hook,就是一个thread。当jvm退出时,会执行该thread。shutdown hook的设计就是为了在关闭前释放资源,很符合线程池关闭的场景。
用jshell来演示一下shutdown hook:在退出(Ctrl + D或者System.exit()
)的时候,会调用shutdown hook:
1
2
3
4
5
6
7
8
9
10
11
~> jshell
| Welcome to JShell -- Version 17.0.9
| For an introduction type: /help intro
jshell> Thread printingHook = new Thread(() -> System.out.println("In the middle of a shutdown"));
...> Runtime.getRuntime().addShutdownHook(printingHook);
printingHook ==> Thread[Thread-0,5,main]
jshell> (使用Ctrl + D)
In the middle of a shutdown
~>
不过shutdown hook仅限于正常退出的场景:
- The last non-daemon thread terminates. For example, when the main thread exits, the JVM starts its shutdown process
- Sending an interrupt signal from the OS. For instance, by pressing Ctrl + C or logging off the OS
- Calling System.exit() from Java code
如果进程被突然kill了(kill -9),或者os崩了,jvm是没有机会调用shutdown hook的。
ListenableFuture
- https://www.baeldung.com/thread-pool-java-and-guava
感想
写四年了都……