AQS:显式锁的深层原理
AbstractQueuedSynchronizer(AQS)是 包括显式锁在内的一系列并发控制组件 的基础实现类。了解了AQS,对显式锁(和其他并发控制组件)的功能会有更深刻的理解。
所以不要一提到AQS就说“锁”,ReentrantLock只是它的一个具体实现。CountDownLatch、Phaser、Semaphore等也都是。
- 显式锁
- AQS
- 基于aqs的实现
- 其他非aqs的并发工具
显式锁
显式锁阐述了显式锁和内置锁synchronized的区别,并介绍了显式锁的功能。现在结合AQS来逐一认识这些功能的深入实现。
先说结论:AQS是基于底层锁实现的,它是底层锁的一个上层的封装,把它理解为一个底层锁的“代理”也不是不可以——它封装出了更方便更强大的功能,但本质的wait/notify还是用的底层锁。
- 基于
内置锁monitor实现:线程本来在object的waitset挂起等待,现在当然也还是在object的waitset上挂起等待。等待和唤醒这一套用的还是底层的东西; - 队列:虽然依然在object的waitset上挂起等待,但是同时在AQS里搞了一个队列。通过这个队列,给线程的先来后到排个序,唤醒的时候就能按自己维护的顺序去唤醒了。这个队列是AQS创造出来的,是一个更上层的东西,依托它实现了很多方便的功能;
- 公平锁:如果发现队列里有人了,就不再先尝试获取锁,而是直接排队去。但是非公平锁则是“先尝试抢一下”,不行再排队(而不是我们普通意义上的的“插队”);
- condition:一个单独的封装出来的能显式调用await/signal(基于wait/notify)的对象罢了;
所以AQS并没有太多神秘的地方。它就像一个底层锁的“代办”:本质给你办理贷款的还是公积金和银行,代办只是帮你跑腿。而且代办搞了一套自己的队列,所以比直接使用裸的公积金和银行能多提供一些功能(比如代办的帮排队帮等通知功能)。
AQS
aqs名为AbstractQueuedSynchronizer
,抽象的排队同步器。这名字听起来也很抽象,但是细品还挺贴切:
- 抽象:说明它只实现了一些通用功能,还有一些抽象方法交给子类去实现了。这些方法的具体实现决定了不同子类的差异;
- 排队同步器:指出了这些基于aqs的子类的通用逻辑,“排队同步”。他们都是在一个队列中通过排队完成同步工作。
所以学习aqs和基于它实现的共享/互斥组件,抓住两方面就行了:
- 变的:异
- 不变的:同,也就是aqs的部分
设计理念
在synchronized和Monitor中,介绍了内置锁synchronized的c++底层实现。主要就是entry set和wait set,获取锁失败的线程都在entry set里等着。
一个锁要么是互斥的,要么是共享的。AQS使用state表示当前锁的状态,初始值为0。锁的类型不同,AQS对state的解释也不相同,共享锁是值大于0,用的时候就减小值;互斥锁是值为0,用的时候就+1:
- 共享锁:可以初始化为大于0的值,代表可以同时访问共享资源的线程数(决定了线程的并发度)。每次线程占用,值减一,直到为0,说并共享锁的并发使用到达了上限;
- 互斥锁:互斥锁只能由一个线程使用,初始值为0,代表没有被占用(state值和共享锁的含义相反)。需要占用时,
acquire()
实际调用的是acquire(1)
,所以如果非零,就说明被别人占用了;
AQS维护了一个双端链表存储想要获取锁的线程。AQS作为abstract类,主要实现了双端链表的维护工作(也就是排队),具体加锁、释放锁的操作,由具体的子类去完成。不同的锁在实现时,有着不同的加锁解锁行为。
一般来讲,锁要么是互斥的,要么是共享的,所以aqs的两组方法,我们只需要关注其中的一组:
acquire(int)
/release(int)
:独占方式。arg为获取/释放锁的次数,尝试获取/释放资源,成功则返回True,失败则返回False。acquireShared(int)
/releaseShared(int)
:共享方式。arg为获取锁的次数,尝试获取资源。acquire方法返回负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。release方法如果释放后允许唤醒后续等待结点返回True,否则返回False。
aqs实现了上述方法里的大部分,把其中的一小部分封装为抽象方法交给子类去实现。比如查看acquire(int)
方法,可以看到:已实现的部分是acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)
,未实现的抽象方法是tryAcquire(int)
:
1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
这个是jdk21的实现,和8略有不同。
同样,release(int)
方法里调用了抽象方法tryRelease(int)
。
所以子类(CountDownLatch
、Semaphore
、Phaser
、ReentrantLock
等)的区别就在于他们是如何实现这些抽象的方法的!更具体的说,某两种互斥锁的区别就在于他们是怎么实现tryAcquire(int)
/tryRelease(int)
的;某两种同步锁的区别就在于他们是怎么实现tryAcquireShared(int)
/tryReleaseShared(int)
的。
因此,看到一个没见过的互斥/共享锁,我们只需要看看它以下两组方法中的一组是怎么实现的即可:
tryAcquire(int)
/tryRelease(int)
tryAcquireShared(int)
/tryReleaseShared(int)
Ref:
- https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
继承体系
由于基于aqs实现的东西实在是比较多,所以jdk有一套非常精致却庞杂的继承体系。如果直接自顶向下介绍,等看到感兴趣的地方的时候,已经不知道在说啥了,所以这次不打算直接自顶向下介绍aqs的架构。这次从用户的视角,以ReentrantLock#lock
为切入点,先自底向上简单介绍一下继承体系。然后点到为止,再自顶向下介绍一下aqs的架构。
ReentrantLock有两种实现:公平锁FairSync、非公平锁NonFairSync,二者都继承自Sync,先挑其中一个比如FairSync看即可。Sync继承自aqs,本身也是abstract,留下了tryAcquire和initialTryLock,交给FairSync去实现。
有了这两种Sync的存在,ReentrantLock#lock
的实现就很简单了,就是在调用Sync#lock
:
1
2
3
public void lock() {
sync.lock();
}
互斥/共享组件有一个统一的实现模式:他们都没有直接继承自aqs,而是使用了类似静态代理的方式,在内部持有一个或多个Sync,相关方法委托给Sync去实现,Sync继承自aqs。如果该组件有多种模式,就会在内部持有多种Sync。比如ReentrantLock,有公平/非公平两种模式,就持有FairSync/NonFairSync两种Sync。CountDownLatch没有这种区别,就只持有一种Sync。
至于ReentrantLock用的是哪一种Sync?默认是unfair,也可以指定使用fair:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
Sync#lock
回到Sync#lock
的实现:
1
2
3
4
5
@ReservedStackAccess
final void lock() {
if (!initialTryLock())
acquire(1);
}
先忽略initialTryLock方法
显然,ReentrantLock是互斥锁,所以用到的方法对儿应该是aqs的acquire(1)
/release(1)
。仔细看Sync#lock
,它的实现里就是直接调用了aqs的acquire(1)
,符合我们的预期。
ReentrantLock作为互斥锁和别的锁的区别在哪儿呢?由前面的内容可知,就在tryAcquire(int)
的实现里。
Sync#tryAcquire(int)
但是Sync并没有实现tryAcquire,它是一个抽象类,有两个子类FairSync和NonFairSync。二者的区别,正在于他们对tryAcquire的实现不同。
前面刚说过:某两种互斥锁的区别就在于他们是怎么实现
tryAcquire(int)
/tryRelease(int)
的。
那么可以盲猜:
- FairSync是用公平的方式tryAcquire的
- NonFairSync是用非公平的方式tryAcquire的
什么是“公平的方式”,什么又是“非公平的方式”?看一眼实际代码就知道了:所谓是否公平,就是在排队之前,看看是否已经有人在排队了:
- 公平锁:如果有人排队,那我也排队,不插队
- 非公平锁:管它有没有人排队,先插队试试,可以抢到就撤,失败了再排队
FairSync#tryAcquire
:
1
2
3
4
5
6
7
8
9
10
11
/**
* Acquires only if thread is first waiter or empty
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
只要有人在排队(hasQueuedPredecessors()
),就tryAcquire失败(return false)。
NonFairSync#tryAcquire
:
1
2
3
4
5
6
7
8
9
10
/**
* Acquire for non-reentrant cases after initialTryLock prescreen
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
根本不判断队列里是否有人,直接就试试能不能拿到锁!
Sync#tryRelease(int)
Sync#tryRelease(int)
是在Sync里实现的,而不是在FairSync和NonFairSync里分别实现一份,说明他们的区别(公平/非公平)在于锁获取的逻辑,锁释放的逻辑是没区别的。
1
2
3
4
5
6
7
8
9
10
11
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
这些就是不同的aqs子类的主要区别。共享锁也可以用类似的方式去探索。
通用逻辑
上面说了半天互斥锁的不同,那么相同的地方是什么呢?就是排队逻辑。而这些逻辑已经由aqs统一实现了,下面自顶向下来看看这些在aqs中实现的各组件相同的框架。
以1.8里的aqs为例。
互斥锁
先看看aqs怎么实现互斥锁的。也就是怎么实现的acquire(int)
/release(int)
。
acquire(int):获取互斥锁
AQS获取互斥锁的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
:先尝试获取锁,如果成功就获取到了;acquireQueued
:如果失败,将线程加入链表。
加入链表后,要使用park方法把线程挂起在object的waitset上。
最终还是要使用底层的monitor。
tryAcquire - 交由子类实现
由前可知,尝试获取锁的方法tryAcquire是交由子类实现的重要方法。
需要注意的是:tryAcquire只返回true/false,不入队。入队挂起线程是在acquire方法里tryAcquire失败后做的事情。所以tryAcquire不会阻塞。
acquireQueued:然后挂起线程(park)
入队等待,怎么等待?AQS在tryAcquire失败之后,会先把这个线程封装成一个Node
,然后调用acquireQueued
。
acquireQueued
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
可以领悟“入队挂起”的本质:
- “挂起线程”用的是
LockSupport.park(this)
,把线程挂起到this,就是把线程挂起到这个对象上(AQS object)。所以挂起用的还是底层的挂起; - 只不过AQS维护了一个队列,在这个队列里,给这些线程排排座次,下次unpark的时候,按照AQS维护的队列挑一个线程唤醒。这样就避免了内置锁synchronized的缺点——不能指定唤醒,只能随机唤醒;
release(int):释放互斥锁
1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease,如果成功release互斥锁,要用unpark唤醒一个线程。
acquire/release本质上,用的是park/unpark方法,挂起/唤醒线程。
tryRelease - 交由子类实现
由前可知,tryRelease也是交由子类实现的重要方法。
唤醒线程 - unpark
如果tryRelease成功,代表锁释放了,AQS就会unpark一个等待的线程。
很重要的一点:这个方法是unparkSuccessor,看名字就知道AQS是依次唤醒队列里的下一个节点!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
所以只要进入了AQS的等待队列,所有的线程都是依次唤醒的。公平锁和非公平锁在这里表现一致。公平不公平体现在线程入队前,非公平锁入队前是可以抢占的。
这点和Redis - 分布式锁 vs. zookeeper介绍的zk的等待队列是一样的。
唤醒用的是unpark,unpark不需要指定blocker,只需要提供线程就行了:
1
2
3
4
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
应该是因为线程知道挂起它的对象吧?它应该持有blocker的引用。
共享锁
再看看aqs怎么实现共享锁的。也就是怎么实现的acquireShared(int)
/releaseShared(int)
。
acquireShared(int)
:获取共享锁
1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
获取共享锁失败,线程加入链表。同样,也会park。
由前可知:tryAcquireShared
也是交由子类实现的重要方法。
releaseShared(int)
:释放共享锁
1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
释放共享锁成功,unpark唤醒一个线程。同样,也会unpark。
由前可知:tryReleaseShared
也是交由子类实现的重要方法。
基于aqs的实现
介绍完了aqs的体系,下面再具体看一下各个基于aqs的实现者有什么不同。
ReentrantLock.Sync
ReentrantReadWriteLock.Sync
Semaphore.Sync
CountDownLatch.Sync
ThreadPoolExecutor.Worker
ReentrantLock
- 互斥锁
最常用的AQS的一个实现应该是显式互斥锁ReentrantLock,它是互斥锁,所以主要实现的是tryAcquire和tryRelease。它是Lock接口的主要实现。Lock接口的主要API:
lock()
: 阻塞式获取锁;lockInterruptibly()
:可中断lock()
;boolean tryLock()
:非阻塞lock()
,返回boolean,代表是否成功获取锁。即使失败,也不阻塞线程;tryLock(timeout)
:有限阻塞可中断lock()
,在timeout时间内阻塞,或者成功获取锁返回,或者被中断抛异常。或者时间到返回false;unlock()
:显式释放锁;newCondition()
:获取Condition对象,进阶版wait/notify。
ReentrantLock的静态内部类Sync是AQS的主要实现类。ReentrantLock的方法实际都是委托给Sync去实现的。
ReentrantLock因为支持公平锁和非公平锁,所以Sync进一步分成了:
- FairSync;
- NonfairSync;
这两个实现,都是ReentrantLock的静态内部类。创建ReentrantLock时,可以传入boolean决定是公平锁实现方式还是非公平锁实现方式:
1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock()
- 阻塞获取锁:成就撤,不成就入队等
ReentrantLock直接交由内部的Sync去实现:
1
2
3
public void lock() {
sync.lock();
}
由于Sync有FairSync和NonFairSync两个实现类,所以lock的实现,在公平锁和非公平锁两种场景下是不同的。
公平锁的lock实现,是使用父类的acquire:
1
2
3
final void lock() {
acquire(1);
}
再想想AQS的acquire实现,分了两步:
- tryAcquire;
- 成了线程就获取到锁了,失败了线程就乖乖加入队列;
tryAcquire实现
公平锁的tryAcquire是一个乖孩子:只要有人在我前面排队,我就不和他们抢,获取锁失败。这个try就没成功。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前没人占用
if (c == 0) {
// 但是有人排队
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 我自己占用,重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置重入后的state
setState(nextc);
return true;
}
return false;
}
tryAcquire只抢,不排队。如果tryAcquire失败,只是直接返回false。排队是acquire方法里的行为,是在tryAcquire失败后才进行的。因为tryAcquire失败了,按照父类AQS的acquire实现,线程乖乖入队。
线程如果重入锁了,会多扣减一份锁的共享资源值。
非公平锁的tryAcquire就不一样了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前没人占用
if (c == 0) {
// 管他有没有人排队,我先尝试CAS抢占一波
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 我自己占用,重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
显然,当state==0,即当前锁没有被占用的情况下,公平锁会先确认队列里有没有人已经在等待锁了,非公平锁根本不讲先来后到,先抢再说!这就是公平锁和非公平锁的本质区别。
当然,如果tryAcquire失败,二者都会按照AQS的acquire实现,乖乖入队。但这不属于tryAcquire做的事情。
非公平并不是传统意义上的“插队”,而是“入队前先尝试抢占”。和“插队”的意思还不完全一样。
tryLock()
- 非阻塞获取锁:就抢一下,不排队
tryLock的语义是此时此刻试着去抢一下,抢不到返回false就行了,根本不入队,所以不阻塞。tryLock也不分公平非公平,用了同一个实现(调用的都是nonfairTryAcquire
)。实际上tryLock的语义决定了它就不可能是公平的,因为公平锁只排队,不会在忽略队伍的情况下直接去抢,这和tryLock的语义本身就是矛盾的。
1
2
3
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
tryLock(long, TimeUnit)
- 有限阻塞可中断获取锁:光试着抢,不排队,时间到了就撤
tryLock(long, TimeUnit)
是要等待的,所以和tryLock()
不一样,而是和lock
一样,也需要排队。所以虽然名字都叫tryLock,两个方法差别还挺大的。
只不过,同样是等待唤醒,它和普通的排队唤醒不同:
- 线程唤醒:普通的排队线程需要等待自己的前置节点线程把自己唤醒;
- os唤醒:tryLock(long, TimeUnit)的排队是时间到后由os唤醒(看起来像是自己醒来了)。
因为有一定时间的阻塞,所以允许中断。
1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
调用了父类AQS的tryAcquireNanos
:
1
2
3
4
5
6
7
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
看最后一句,如果没有被中断,先tryAcquire一下,再进行timeout相关的阻塞获取。
由上面的内容可以知道,tryAcquire是子类实现的,公平锁和非公平锁有着不同的tryAcquire实现。
所以tryLock(timeout)和锁是否公平有关系:
- 对非公平锁来讲:试着抢一下,抢到就撤,抢不到就retry;
- 对公平锁来讲:看看是不是没人排队了,如果有就retry;
所以tryLock(timeout)是区分是否是公平锁的,和tryLock()直接抢不同。这是他们的一大区别。
retry - 唤醒 vs. 自醒
再看retry,也就是AQS实现阻塞的doAcquireNanos:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 排队
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 如果我就是队头,我会尝试获得锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果我不是队头,那就没法获取锁。时间到就结束,时间没到就继续挂起
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 接着挂起
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
先给线程入队等待:
- 如果醒来的时候它是队头,就能获得锁;
- 如果不是队头,时间也还没到,就使用
LockSupport.parkNanos()
继续挂起; - 否则返回false,tryLock失败;
当然,这里还做了优化,如果剩余时间不足
spinForTimeoutThreshold
(1000ns,即1微秒),不如做个自旋更高效。否则可能线程挂起操作所引起的耗时都比1微秒多。
这种写法是while true + LockSupport.park(nanos)
,而非我们经常会写的while true + Thread.sleep(millis)
。关于这种写法的问题,可以参考:https://blog.csdn.net/u013332124/article/details/84647915
unlock()
unlock调用的是AQS的release(int)
方法:
1
2
3
public void unlock() {
sync.release(1);
}
它和acquire方法一样,会调用子类的tryRelease
方法,最后再唤醒自己后面那个等待的线程:
1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒下一个排队的线程
unparkSuccessor(h);
return true;
}
return false;
}
公平锁和非公平锁在释放锁的时候并没有什么区别,都是唤醒aqs队列里的下一个线程,所以FairSync和NonfairSync两个子类共用了父类Sync的release方法。
newCondition()
获取一个Condition
对象。实际获取的是ConditionObject
,一个实现在AQS里的Condition
实现类。
Condition
- “独立waitset”
Condition接口想了半天,还是“独立waitset”的称呼更易懂一些。
synchronized里,试图获取锁,如果阻塞,放在entry set里;获取锁后,如果因为条件不满足而阻塞,放在waitset里。且这个waitset是混杂的,所有等待monitor资源的线程(both生产者and消费者)都在这里wait。Condition则分离了waitset。一个Condition相当于一个waitset,这样,不同类型的线程可以去不同的waitset里wait,同样唤醒线程也可以去不同的waitset里唤醒,就不会出现生产者唤醒生产者的尴尬局面,唤醒的效率提升了。
在生产者 - 消费者一文介绍Condition时,使用了生产者消费者的例子。两类线程,因同一个monitor(队列)的不同情形,进入不同的waitset。唤醒线程时,生产者可以只去消费者的waitset唤醒消费者,而不会不小心唤醒生产者。显然这样做更高效。
Condition的API:
await
/await(timeout)
:类似wait;signal
/signalAll
:类似notify/notifyAll;
其他基于aqs的组件都是内部一个Sync,Sync extends AQS,然后利用aqs的acquire/release实现自身逻辑。Condition则不然,它的实现类是ConditionObject,是AQS的内部类。所以await/signal这俩api并不是直接使用aqs的acquire/release实现的。
一个Condition就是一个AQS,就是一个独立的等待队列。所以可以new两个Condition对象,一个专门wait生产者,一个专门wait消费者。所以在使用的时候,我们获取两个condition就行了。队列节点和AQS用的是同一种Node,在condition的await/signal里,维护这个队列的入队出队。
await()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
主要是addConditionWaiter()的实现。Node用的依然是AQS里定义的Node。Node是一个双向链表的节点,但是Condition用Node的时候,并没有当做双向链表来用,仅仅是作为一个单链表去构建。
- https://www.cnblogs.com/nullllun/p/9000807.html
await就是把线程入队。
signal()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal是选队列里的第一个thread去唤醒。
使用方式
Condition仅仅是一个waitset,是条件不满足时,阻塞线程的地方,所以需要配合锁来使用。只要是aqs的实现类都能提供ConditionObject,也就是各个同步组件所持有的Sync都能提供ConditionObject。Lock
接口提供了newCondition()
方法,ReentrantLock就是Lock的一个实现,所以在使用ReentrantLock的时候可以直接从它那里获取Condition:
- 从锁获取一个Condition给生产者躺;
- 从锁获取一个Condition给消费者躺;
- 以生产者线程为例,尝试获取锁,执行锁内代码;
- 如果条件不满足(木有空间了),进到生产者的Condition躺好;
- 如果生产完成,有可以消费的东西了,唤醒一个消费者Condition里的线程。
所以Condition起什么名字不重要,关键是知道他们用来放哪种线程。
一开始获取锁失败,进入的是锁的等待队列(底层自然还是锁的entry set)。但是该wait的时候,不进入锁的wait set,而是进入Condition的等待队列(底层自然是Condition的entry set)。所以锁+Condition完成了之前锁的功能,好处在之前的一个wait set相当于细分了,变成了数个wait set。
ReentrantReadWriteLock
- 既同步又互斥的锁
read write lock的语义是:
- 读写互斥;
- 写写互斥;
- 读读不互斥;
它适用于读多写少的场景(当然1.8引入的StampedLock更优秀一些),它实现了ReadWriteLock接口,主要API就两个:
readLock()
:获取读锁;writeLock()
:获取写锁;
ReentrantReadWriteLock
是ReadWriteLock
,ReentrantLock是Lock
,他们并没有什么关系。但是ReentrantReadWriteLock
获取的读锁和写锁都是Lock
,和ReentrantLock
是同样的接口。所以ReentrantReadWriteLock
的用法就是先readLock()
/writeLock()
获取读/写锁,再像使用ReentrantLock
一样使用这两个锁的lock
/unlock
。
虽然看起来像是在一个ReentrantReadWriteLock里创建了两个AQS,但实际上ReentrantReadWriteLock只用了一个AQS,因为要做到读写互斥,两个aqs不能是完全独立的,要相互能够影响。只有一个AQS,怎么同时记录读锁和写锁的状态呢?jdk做了个trick,state是int,32bit,所以使用高16bit记录读锁state,使用低16bit记录写锁state。操作state的整体理念还是不变的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
shardCount和exclusiveCount是方便从state取read state和write state的封装方法。
如果用两个AQS,一个用于共享,一个用于互斥,那么他们的tryAcquire/tryAcquireShared如何根据对方的state以判断自己应不应该获取锁?确实不太方便。倒不如两个state合在一起,相当于记录在一个类里,读起来就很方便。
因为只用了一个AQS,这个AQS既可以共享,又可以互斥,所以tryAcquire/tryRelease和tryAcquireShared/tryReleaseShard这两套方法,它都要实现。
ReentrantReadWriteLock也支持公平锁和非公平锁两种,构造方法和ReentrantLock一样:
ReentrantReadWriteLock()
ReentrantReadWriteLock(boolean)
readLock()
& writeLock()
1
2
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
别忘了ReadLock和WriteLock是和ReentrantLock类似的锁,基本可以当做ReentrantLock来理解。所以获取完ReadLock和WriteLock,接下来就可以像操作ReentrantLock的方法一样来操作他们了,比如:
- lock/unlock
- lockInterruptibly
- tryLock/tryLock(timeout)
但事情并没有这么简单!读锁和写锁是相关联的,读锁被占用的时候(悲观读),写锁不能被获取!反之亦然。所以ReadLock和WriteLock并不是两个孤立的Lock,并不能像new的两个不相干的ReentrantLock一样去理解他们。
ReentrantReadWriteLock在创建时,让ReadLock和WriteLock持有了同一个Sync:
1
2
3
4
5
6
7
8
9
10
11
12
13
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
所以ReadLock和WriteLock关联了同一个锁实现。
tryAcquire - 写锁的lock
WriteLock获取锁的lock()方法:
1
2
3
public void lock() {
sync.acquire(1);
}
调用了AQS的acquire,根据前面的经验,它是需要先调用子类的tryAcquire,失败了再排队的。
WriteLock的tryAcquire在它的父类Sync里:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
// 写锁state
int w = exclusiveCount(c);
// 如果有读锁或写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果只有读锁,或者锁的占用者不是自己,失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
别的不说,这一段注释说明了一切:读锁被占用时,WriteLock的tryAcquire也是会失败的!
“如果有读锁或者写锁,且写锁的占用者是自己,获取锁成功”:所以一个读写锁的读和写是可以重入的——毕竟自己一个线程再怎么又读又写,也不会造成并发问题!
tryAcquireShared - 读锁的lock
同理,ReadLock获取读锁的方法lock():
1
2
3
public void lock() {
sync.acquireShared(1);
}
是一个共享锁实现,同样根据AQS的该方法实现,要先调用子类的tryAcquireShared。ReadLock的该方法也是在其父类Sync中的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 如果写锁不为0,失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁state
int r = sharedCount(c);
// 如果没有写锁,读锁是肯定可以获取成功的。但是要标记一下header是谁,重入了几次
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
同样注释说明了一切:写锁被占有时,ReadLock的tryAcquireShared也会失败!
正是这种互相限制,才能在互斥写、共享读的同时,做到读写互斥。
Semaphore
- 共享锁,控制并发度
信号量其实就是os里讲的那个mutex:
- mutex有个初始值,代表线程并发度;
- 每次acquire消耗一个值,如果没有值可消耗,线程阻塞;
- 每次release增加一个值,值增加会唤醒AQS等待队列里的下一个等待节点线程;
Semaphore一般用来控制线程并发度,比如hystrix会使用semaphore来控制下游服务调用的并发度。
Semaphore是个共享锁,它提供的api是:
- acquire(int)
- release(int)
虽然api的名字听起来像互斥锁,但api的名字不重要,重要的是实现:
1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
这个acquire api用的方法并不是aqs的acquire,而是acruireShared。所以和之前的总结一致,只要是共享锁,我们需要看的方法就是tryAcquiredShared
/tryReleaseShared
。共享锁和互斥锁的实现大同小异,互斥锁是state+1,只要state不是0就代表被占了;共享锁是state - k,只要够扣减就可以获取成功。所以tryAcquiredShared
/tryReleaseShared
和tryAcquire
/tryRelease
主要的区别是state - k。
和ReentrantLock一样,Semaphore内部也持有两种Sync,也是有公平非公平之分。公平看有无排队,有排队说明state已经不足了,直接获取失败。没有排队就试试余下的值还够不够扣除自己需要的值:
1
2
3
4
5
6
7
8
9
10
11
12
13
protected int tryAcquireShared(int acquires) {
for (;;) {
// 当前有没有人在排队
if (hasQueuedPredecessors())
return -1;
int available = getState();
// 还够不够扣的
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平同样不考虑排队,只考虑值够不够用:
1
2
3
4
5
6
7
8
9
10
11
12
13
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
总体来看,共享锁的tryAcquireShard比互斥锁(ReentrantLock)的tryAcquire好实现:
- 共享锁只需要考虑值够不够;
- 互斥锁还要判断有线程获取锁时,那个线程是不是自己;
用途
hystrix调用下游服务的时候,用Semaphore控制并发度。
jdk里Semaphore的doc也举了个类似的例子,用Semaphore控制对某些资源访问的并发线程数。
CountDownLatch
- 共享锁,“一波人给另一波人放行”
倒数n次之后,所有阻塞的线程都可以执行了。
先看看countdown latch的api:
- await:等着,其实就是线程挂起;
- countdown:倒数一次,数到0,所有await的线程都可以执行了。
怎么用互斥/共享锁实现这个语义呢?
- 首先肯定用共享锁,互斥锁是只有一个人能成功,这里countdown得好多人都能countdown。
- aqs的acquire会阻塞,release不会。这里await会阻塞,countdown不会。所以countdown方法用release实现,await方法用acquire实现。
- 普通共享锁的release是state + k。这里countdown要用到release,所以这里的release得是state - 1;
- 普通共享锁的acquire是在不断state - k,余额>0为成功。这里await要用到acquire,所以这里的acquire就判断state是否>0就行了;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void countDown() {
sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
用途
CountDownLatch
适合一堆异步线程任务都结束了,再在主线程里继续执行代码。但是必须在构造函数里设置好countdown数,也就是说异步任务数量是提前确定的。
ThreadPoolExecutor#Worker
ThreadPoolExecutor
里的Worker
是AQS的互斥锁实现,主要为了保证任务的执行是互斥的。
具体参考ThreadPoolExecutor#Worker。
其他非aqs的并发工具
还有一些好用的并发工具,用途和上述并发工具不尽相同,可以放在一起讨论,但他们并不基于aqs实现。
StampedLock
- 乐观读,互斥写
比ReadWriteLock更适合读多写少的场景!
关于它能取代ReentrantReadWriteLock
的原因,参考显式锁。它的主要优势就是把读分成了乐观读、悲观读,写还是悲观写。这样平时乐观读不加锁,偶尔有了写线程也不会饥饿,所以更适合读多写少的场景。
StampedLock
不是谁的子类,所以跟ReadWriteLock接口也没有什么关系。它的主要方法有:
- readLock()
- writeLock()
- tryOptimisticRead()/validate(long)
他们返回的基本都是long,作为凭证。其实就是“版本”,用来校验两次读之间,数据有没有被改过。
tryOptimisticRead()
& validate(long)
看之前介绍的例子,能够很直观理解他们的用法。
他们的实现则比较简单,就是返回一个状态:
1
2
3
4
5
6
7
8
9
10
/**
* Returns a stamp that can later be validated, or zero
* if exclusively locked.
*
* @return a stamp, or zero if exclusively locked
*/
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
校验当前状态还是不是之前读的那个状态:
1
2
3
4
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
CyclicBarrier
- “自己给自己放行”
CyclicBarrier类似于“xx式过马路”,是只要凑够一拨人就一起冲破barrier继续做后面的事情。是“自己给自己放行”。CountDownLatch是一拨人给另一拨人放行。
api:
CyclicBarrier(int parties)
:构造函数里必须指明要凑够多少参与者;await()
:到达一个参与者;reset()
:重置。如果还有人在await,抛异常BrokenBarrierException
;
人齐了,await就之星成功,不再阻塞,线程就可以继续往下执行了。
也可以提前设置好回调barrierAction,当await成功时,回调会被执行(貌似所有await的线程都会执行一遍?):
CyclicBarrier(int parties, Runnable barrierAction)
为什么用party这个词?
在Java中,CyclicBarrier是一种同步辅助工具,它允许一组线程相互等待,直到所有线程都到达某个公共屏障点。在CyclicBarrier中,“party”指的是要等待的线程数量,也就是到达屏障点的线程数。当达到指定的数量时,所有线程将被释放并可以继续执行。“Party”这个词在这里的意思是一群人或者一组参与者。在CyclicBarrier中,这个术语用于表示一组线程或参与者,因为CyclicBarrier是用于协调多个线程之间的同步,所以使用”party”这个词来表示参与者或者线程组是比较贴切的。
participate?
用途
适合一堆线程互相等待,够数之后再一起执行。有一种“自动倒计时”的感觉。
CyclicBarrier只有一个方法:
- await:等够人了就可以继续执行,等不够就挂起。
Phaser
- count up and down
CountDownLatch
和CyclicBarrier
可以手动或自动控制任务都完成了再一起行动,但是必须提前设置好countdown数,也就是说异步任务数量是提前确定的。
Phaser可以随时动态调整任务数,或者说既可以count down,又可以count up(灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变)。
- 初始化
Phaser(1)
,最后主线程也要完成任务,所以初始值(待完成任务)为1; - 有一个异步任务,
Phaser#register()
让要完成的任务数+1,相当于count up; - 完成一个异步任务,
Phaser#arriveAndDeregister()
让已完成任务数+1,相当于countdown; - 主线程可以使用
arriveAndAwaitAdvance
阻塞,直到数值归零再继续运行;
或者也可以用Phaser#arrive(),不countdown,而是给完成的任务数+1,相当于逻辑上的countdown。所以不deregister,这样就可以记录完整的数据统计,通过getRegisteredParties/getArrivedParties/getUnarrivedParties获取。
用途
适合不断产生新异步任务的场景。比如相等所有的异步任务都结束了再退出,就可以:
- 产生新异步任务,+1;
- 完成一个异步任务,-1;
当所有异步任务发出去之后,数值归零的时候,就是所有异步任务都完成的时候。
RateLimiter - 访问速率
guava,用来控制事情发生的速率,比如http请求速率:
This is in contrast to
Semaphore
which restricts the number of concurrent accesses instead of the rate (note though that concurrency and rate are closely related, e.g. see Little’s Law ).