文章

AQS:显式锁的深层原理

AbstractQueuedSynchronizer(AQS)是 包括显式锁在内的一系列并发控制组件 的基础实现类。了解了AQS,对显式锁(和其他并发控制组件)的功能会有更深刻的理解。

所以不要一提到AQS就说“锁”,ReentrantLock只是它的一个具体实现。CountDownLatch、Phaser等也都是。

  1. 显式锁
  2. AQS
  3. 互斥锁
    1. acquire(int):获取互斥锁
      1. tryAcquire - 交由子类实现
      2. acquireQueued:然后挂起线程(park)
    2. release(int):释放互斥锁
      1. tryRelease - 交由子类实现
      2. 唤醒线程 - unpark
  4. 共享锁
    1. acquireShared(int):获取共享锁
    2. releaseShared(int):释放共享锁
  5. ReentrantLock - 互斥锁
    1. ReentrantLock() & ReentrantLock(boolean)
    2. lock() - 阻塞获取锁:成就撤,不成就入队等
      1. tryAcquire实现
    3. tryLock() - 非阻塞获取锁:就抢一下,不排队
    4. tryLock(long, TimeUnit) - 有限阻塞可中断获取锁:光试着抢,不排队,时间到了就撤
      1. retry - 唤醒 vs. 自醒
    5. unlock()
    6. newCondition()
  6. Condition - “独立waitset”
    1. await()
    2. signal()
    3. 使用方式
  7. ReentrantReadWriteLock - 既同步又互斥的锁
    1. ReentrantReadWriteLock() & ReentrantReadWriteLock(boolean)
    2. readLock() & writeLock()
    3. tryAcquire - 写锁的lock
    4. tryAcquireShared - 读锁的lock
  8. StampedLock - 乐观读,互斥写
    1. tryOptimisticRead() & validate(long)
  9. Semaphore - 共享锁,控制并发度
    1. 用途
  10. CountDownLatch - 共享锁,“一波人给另一波人放行”
    1. 用途
  11. CyclicBarrier - “自己给自己放行”
    1. 用途
  12. Phaser - count up and down
    1. 用途
  13. ThreadPoolExecutor
  14. RateLimiter - 访问速率

显式锁

显式锁阐述了显式锁和内置锁synchronized的区别,并介绍了显式锁的功能。现在结合AQS来逐一认识这些功能的深入实现。

先说结论:AQS是基于底层锁实现的,它是底层锁的一个上层的封装,把它理解为一个底层锁的“代理”也不是不可以——它封装出了更方便更强大的功能,但本质的wait/notify还是用的底层锁。

  • 基于内置锁monitor实现:线程本来在object的waitset挂起等待,现在当然也还是在object的waitset上挂起等待。等待和唤醒这一套用的还是底层的东西;
  • 队列:虽然依然在object的waitset上挂起等待,但是同时在AQS里搞了一个队列。通过这个队列,给线程的先来后到排个序,唤醒的时候就能按自己维护的顺序去唤醒了。这个队列是AQS创造出来的,是一个更上层的东西,依托它实现了很多方便的功能
  • 公平锁:如果发现队列里有人了,就不再先尝试获取锁,而是直接排队去。但是非公平锁则是“先尝试抢一下”,不行再排队(而不是我们普通意义上的的“插队”);
  • condition:一个单独的封装出来的能显式调用await/signal(基于wait/notify)的对象罢了;

所以AQS并没有太多神秘的地方。它就像一个底层锁的“代办”:本质给你办理贷款的还是公积金和银行,代办只是帮你跑腿。而且代办搞了一套自己的队列,所以比直接使用裸的公积金和银行能多提供一些功能(比如代办的帮排队帮等通知功能)。

AQS

synchronized和Monitor中,介绍了内置锁synchronized的c++底层实现。主要就是就是entry set和wait set,获取锁失败的线程都在entry set里等着。

一个锁要么是互斥的,要么是共享的。AQS使用state表示当前锁的状态,初始值为0。锁的类型不同,AQS对state的解释也不相同:

  • 共享锁:可以初始化为大于0的值,代表可以同时访问共享资源的线程数(决定了线程的并发度)。每次线程占用,值减一,直到为0,说并共享锁的并发使用到达了上限。
  • 互斥锁互斥锁可以理解为state初始值为1的共享锁,只能由一个线程使用。实际实现时,初始值为0,代表没有被占用(state值和共享锁的含义相反)。如果非零,就说明被别人占用了;

AQS维护了一个双端链表存储想要获取锁的线程。AQS作为abstract类,主要实现了双端链表的维护工作,具体加锁、释放锁的操作,由具体的子类去完成。不同的锁在实现时,有着不同的加锁解锁行为。

一般来讲,锁要么是互斥的,要么是共享的,所以一个子类可能只需要考虑实现以下两组方法中的一组

  • tryAcquire(int)/tryRelease(int):独占方式。arg为获取/释放锁的次数,尝试获取/释放资源,成功则返回True,失败则返回False。
  • tryAcquireShared(int)/tryReleaseShared(int):共享方式。arg为获取锁的次数,尝试获取资源。acquire方法返回负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。release方法如果释放后允许唤醒后续等待结点返回True,否则返回False。

Ref:

  • https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

互斥锁

acquire(int):获取互斥锁

AQS获取互斥锁的方式:

1
2
3
4
5
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. tryAcquire:先尝试获取锁,如果成功就获取到了;
  2. acquireQueued:如果失败,将线程加入链表。

加入链表后,要使用park方法把线程挂起在object的waitset上

最终还是要使用底层的monitor。

tryAcquire - 交由子类实现

尝试获取锁的方法tryAcquire是交由子类实现的重要方法

有一点需要搞明白:tryAcquire只返回true/false,不入队。入队挂起线程是在acquire方法里tryAcquire失败后做的事情。所以tryAcquire不会阻塞

acquireQueued:然后挂起线程(park)

入队等待,怎么等待?AQS在tryAcquire失败之后,会先把这个线程封装成一个Node,然后调用acquireQueued

acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

可以领悟“入队挂起”的本质:

  1. “挂起线程”用的是LockSupport.park(this),把线程挂起到this,就是把线程挂起到这个对象上(AQS object)。所以挂起用的还是底层的挂起
  2. 只不过AQS维护了一个队列,在这个队列里,给这些线程排排座次,下次unpark的时候,按照AQS维护的队列挑一个线程唤醒。这样就避免了内置锁synchronized的缺点——不能指定唤醒,只能随机唤醒;

release(int):释放互斥锁

1
2
3
4
5
6
7
8
9
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease,如果成功release互斥锁,要用unpark唤醒一个线程

acquire/release本质上,用的是park/unpark方法,挂起/唤醒线程。

tryRelease - 交由子类实现

tryRelease也是交由子类实现的重要方法

唤醒线程 - unpark

tryRelease之后,因为锁释放了,AQS会unpark一个等待的线程。

很重要的一点:这个方法是unparkSuccessor,看名字就知道AQS是依次唤醒队列里的下一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

所以只要进入了AQS的等待队列,所有的线程都是依次唤醒的。公平锁和非公平锁在这里表现一致。公平不公平体现在线程入队前,非公平锁入队前是可以抢占的。

这点和Redis - 分布式锁 vs. zookeeper介绍的zk的等待队列是一样的。

唤醒用的是unpark,unpark不需要指定blocker,只需要提供线程就行了:

1
2
3
4
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

应该是因为线程知道挂起它的对象吧?它应该持有blocker的引用。

共享锁

acquireShared(int):获取共享锁

1
2
3
4
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

获取共享锁失败,线程加入链表。同样,也会park。

tryAcquireShared也是交由子类实现的重要方法。

releaseShared(int):释放共享锁

1
2
3
4
5
6
7
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

释放共享锁成功,unpark唤醒一个线程。同样,也会unpark。

tryReleaseShared也是交由子类实现的重要方法。

ReentrantLock - 互斥锁

最常用的AQS的一个实现应该是显式互斥锁ReentrantLock,它是互斥锁,所以主要实现的是tryAcquire和tryRelease。它是Lock接口的主要实现。Lock接口的主要API:

  • lock(): 阻塞式获取锁;
  • lockInterruptibly()可中断lock()
  • boolean tryLock()非阻塞lock(),返回boolean,代表是否成功获取锁。即使失败,也不阻塞线程;
  • tryLock(timeout)有限阻塞可中断lock(),在timeout时间内阻塞,或者成功获取锁返回,或者被中断抛异常。或者时间到返回false;
  • unlock():显式释放锁;
  • newCondition():获取Condition对象,进阶版wait/notify。

ReentrantLock的静态内部类Sync是AQS的主要实现类。ReentrantLock的方法实际都是委托给Sync去实现的。

ReentrantLock因为支持公平锁和非公平锁,所以Sync进一步分成了:

  • FairSync;
  • NonfairSync;

这两个实现,都是ReentrantLock的静态内部类。

ReentrantLock() & ReentrantLock(boolean)

先看构造方法。创建ReentrantLock时,可以传入boolean决定是公平锁实现方式还是非公平锁实现方式:

1
2
3
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

lock() - 阻塞获取锁:成就撤,不成就入队等

ReentrantLock直接交由内部的Sync去实现:

1
2
3
    public void lock() {
        sync.lock();
    }

由于Sync有FailSync和NonFairSync两个实现类,所以lock的实现,在公平锁和非公平锁两种场景下是不同的。

公平锁的lock实现,是使用父类的acquire:

1
2
3
        final void lock() {
            acquire(1);
        }

再想想AQS的acquire实现,分了两步:

  1. tryAcquire;
  2. 成了线程就获取到锁了,失败了线程就乖乖加入队列;

tryAcquire实现

公平锁的tryAcquire是一个乖孩子:只要有人在我前面排队,我就不和他们抢,获取锁失败。这个try就没成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            
            // 当前没人占用
            if (c == 0) {
                // 但是有人排队
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 我自己占用,重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 设置重入后的state
                setState(nextc);
                return true;
            }
            return false;
        }

tryAcquire只抢,不排队。如果tryAcquire失败,只是直接返回false。排队是acquire方法里的行为,是在tryAcquire失败后才进行的。因为tryAcquire失败了,按照父类AQS的acquire实现,线程乖乖入队。

线程如果重入锁了,会多扣减一份锁的共享资源值

非公平锁的tryAcquire就不一样了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            
            // 当前没人占用
            if (c == 0) {
                // 管他有没有人排队,我先尝试CAS抢占一波
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 我自己占用,重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

显然,当state==0,即当前锁没有被占用的情况下,公平锁会先确认队列里有没有人已经在等待锁了,非公平锁根本不讲先来后到,先抢再说!这就是公平锁和非公平锁的本质区别。

当然,如果tryAcquire失败,二者都会按照AQS的acquire实现,乖乖入队。但这不属于tryAcquire做的事情。

非公平并不是传统意义上的“插队”,而是“入队前先尝试抢占”。和“插队”的意思还不完全一样

tryLock() - 非阻塞获取锁:就抢一下,不排队

tryLock是不分公平非公平的,用了同一个实现(调用的都是nonfairTryAcquire),因为tryLock的语义就是此时此刻试着去抢一下。

看实现也很明显,不管什么锁,就调用nonfairTryAcquire:

1
2
3
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

抢不到返回false就行了,根本不入队,所以不阻塞

tryLock(long, TimeUnit) - 有限阻塞可中断获取锁:光试着抢,不排队,时间到了就撤

tryLock(long, TimeUnit)和lock一样,也需要排队。这一点和tryLock()不同。

只不过,同样是等待唤醒,它和普通的排队唤醒不同:

  • 线程唤醒:普通的排队线程需要等待自己的前置节点线程把自己唤醒;
  • os唤醒:tryLock(long, TimeUnit)的排队是时间到后由os唤醒(看起来像是自己醒来了)

因为有一定时间的阻塞,所以允许中断。

1
2
3
4
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

调用了父类AQS的tryAcquireNanos:

1
2
3
4
5
6
7
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

看最后一句,如果没有被中断,先tryAcquire一下,再进行timeout相关的阻塞获取。

由上面的内容可以知道,tryAcquire是子类实现的,公平锁和非公平锁有着不同的tryAcquire实现。

所以tryLock(timeout)不等于 tryLock() + retry,而是“公平/非公平tryLock() + retry”

  • 对非公平锁来讲:试着抢一下,抢到就撤,抢不到就retry;
  • 对公平锁来讲:看看是不是没人排队了,如果有就retry;

所以tryLock(timeout)是区分是否是公平锁的,和tryLock()直接抢不同。这是他们的一大区别。

retry - 唤醒 vs. 自醒

再看retry,也就是AQS实现阻塞的doAcquireNanos:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        
        // 排队
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // 如果我就是队头,我会尝试获得锁
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                
                // 如果我不是队头,那就没法获取锁。时间到就结束,时间没到就继续park
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

先给线程入队等待:

  1. 如果醒来的时候它是队头,就能获得锁;
  2. 如果不是队头,时间也还没到,就使用LockSupport.parkNanos()继续挂起;
  3. 否则返回false,tryLock失败;

当然,这里还做了优化,如果剩余时间不足spinForTimeoutThreshold(1000ns,即1微秒),不如做个自旋更高效。否则可能线程挂起操作所引起的耗时都比1微秒多

这种写法是while true + LockSupport.park(nanos),而非我们经常会写的while true + Thread.sleep(millis)。关于这种写法的问题,可以参考:https://blog.csdn.net/u013332124/article/details/84647915

unlock()

unlock调用的是AQS的release(int)方法:

1
2
3
    public void unlock() {
        sync.release(1);
    }

它和acquire方法一样,会调用子类的tryRelease方法,最后再唤醒自己后面那个等待的线程:

1
2
3
4
5
6
7
8
9
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

公平锁和非公平锁在释放锁的时候并没有什么区别,都是唤醒aqs队列里的下一个线程,所以FairSync和NonfairSync两个子类共用了父类Sync的release方法

newCondition()

获取一个Condition对象。实际获取的是ConditionObject,一个实现在AQS里的Condition实现类。

Condition - “独立waitset”

Condition接口想了半天,还是“独立waitset”的称呼更易懂一些。

无论内置锁还是显式锁,都有一个内置的waitset,用来存放wait的线程。且这个waitset是混杂的,所有等待monitor资源的线程都在这里wait。

Condition则分离了waitset。一个Condition相当于一个waitset,这样,不同类型的线程可以去不同的waitset里wait,同样唤醒线程也可以去不同的waitset里唤醒。

生产者 - 消费者一文介绍Condition时,使用了生产者消费者的例子。两类线程,因同一个monitor(队列)的不同情形,进入不同的waitset。唤醒线程时,生产者可以只去消费者的waitset唤醒消费者,而不会不小心唤醒生产者。显然这样做更高效。

Condition的API:

  • await/await(timeout):类似wait;
  • signal/signalAll:类似notify/notifyAll;

Condition的实现类是ConditionObject,是AQS的内部类。这样,所有的AQS子类,如果有获取Condition的需要,都可以获取这个ConditionObject。

一个Condition就是一个AQS,一个专门wait生产者,一个专门wait消费者。所以在使用的时候,我们获取两个condition就行了。

一个condition object就是一个单独的等待队列,队列节点和AQS用的是同一种Node,在condition的await/signal里,维护这个队列的入队出队。

await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        
        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

主要是addConditionWaiter()的实现。Node用的依然是AQS里定义的Node。Node是一个双向链表的节点,但是Condition用Node的时候,并没有当做双向链表来用,仅仅是作为一个单链表去构建。

  • https://www.cnblogs.com/nullllun/p/9000807.html

await就是把线程入队。

signal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        
        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
        
    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

signal是选队列里的第一个thread去唤醒。

使用方式

Condition仅仅是一个waitset,是条件不满足时,阻塞线程的地方,但是并没有锁的功能。所以需要配合锁来使用:

  1. 从锁获取一个Condition给生产者躺;
  2. 从锁获取一个Condition给消费者躺;
  3. 以生产者线程为例,尝试获取锁,执行锁内代码;
  4. 如果条件不满足(木有空间了),进到生产者的Condition躺好;
  5. 如果生产完成,有可以消费的东西了,唤醒一个消费者Condition里的线程。

所以Condition起什么名字不重要,关键是知道他们用来放哪种线程。

一开始获取锁失败,进入的自然还是锁的entry set。但是该wait的时候,不进入锁的wait set,而是进入Condition的waitset。所以锁+Condition完成了之前锁的功能,好处在之前的一个wait set相当于细分了,变成了数个wait set

ReentrantReadWriteLock - 既同步又互斥的锁

read write lock的语义是:

  1. 读写互斥;
  2. 写写互斥;
  3. 读读不互斥;

它适用于读多写少的场景(当然1.8引入的StampedLock更优秀一些),它实现了ReadWriteLock接口,主要API就两个:

  • readLock():获取读锁;
  • writeLock():获取写锁;

ReentrantReadWriteLockReadWriteLock,ReentrantLock是Lock,他们并没有什么关系……

所以它的用法就是先readLock()/writeLock()获取读/写锁,再使用这两个锁的lock/unlock使用锁。

ReentrantReadWriteLock的读锁和写锁用的是内部类实现ReadLock和WriteLock,他们都实现了Lock接口。所以ReadLock和WriteLock才更像ReentrantLock,他们都是Lock接口的子类。

虽然看起来像是在一个ReentrantReadWriteLock里创建了两个AQS,但实际上ReentrantReadWriteLock只用了一个AQS。但是只有一个AQS,怎么同时记录读锁和写锁的状态呢?jdk做了个trick,state是int,32bit,所以使用高16bit记录读锁state,使用低16bit记录写锁state……操作state的整体理念还是不变的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

shardCount和exclusiveCount是方便从state取read state和write state的封装方法。

如果用两个AQS,一个用于共享,一个用于互斥,那么他们的tryAcquire/tryAcquireShared如何根据对方的state以判断自己应不应该获取锁?确实不太方便。倒不如两个state合在一起,相当于记录在一个类里,读起来就很方便。

因为只用了一个AQS,这个AQS既可以共享,又可以互斥,所以tryAcquire/tryRelease和tryAcquireShared/tryReleaseShard这两套方法,它都要实现

ReentrantReadWriteLock也支持公平锁和非公平锁两种。

ReentrantReadWriteLock() & ReentrantReadWriteLock(boolean)

构造方法和ReentrantLock一样。

readLock() & writeLock()

1
2
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

别忘了ReadLock和WriteLock是和ReentrantLock类似的锁,基本可以当做ReentrantLock来理解。所以获取完ReadLock和WriteLock,接下来就可以像操作ReentrantLock的方法一样来操作他们了,比如:

  • lock/unlock
  • lockInterruptibly
  • tryLock/tryLock(timeout)

但事情并没有这么简单!ReadLock和WriteLock并不是两个孤立的Lock,所以并不能像new的两个不相干的ReentrantLock一样去理解他们。

ReentrantReadWriteLock在创建时,让ReadLock和WriteLock持有了同一个Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

所以ReadLock和WriteLock关联了同一个锁实现。为什么?因为读锁和写锁是相关联的,读锁被占用的时候(悲观读),写锁不能被获取!反之亦然。

tryAcquire - 写锁的lock

WriteLock获取锁的lock()方法:

1
2
3
        public void lock() {
            sync.acquire(1);
        }

调用了AQS的acquire,根据前面的经验,它是需要先调用子类的tryAcquire,失败了再排队的。

WriteLock的tryAcquire在它的父类Sync里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // 写锁state
            int w = exclusiveCount(c);
            // 如果有读锁或写锁
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // 如果只有读锁,或者锁的占用者不是自己,失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

别的不说,这一段注释说明了一切:读锁被占用时,WriteLock的tryAcquire也是会失败的

“如果有读锁或者写锁,且写锁的占用者是自己,获取锁成功”:所以一个读写锁的读和写是可以重入的——毕竟自己一个线程再怎么又读又写,也不会造成并发问题!

tryAcquireShared - 读锁的lock

同理,ReadLock获取读锁的方法lock():

1
2
3
        public void lock() {
            sync.acquireShared(1);
        }

是一个共享锁实现,同样根据AQS的该方法实现,要先调用子类的tryAcquireShared。ReadLock的该方法也是在其父类Sync中的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // 如果写锁不为0,失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 获取读锁state
            int r = sharedCount(c);
            // 如果没有写锁,读锁是肯定可以获取成功的。但是要标记一下header是谁,重入了几次
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

同样注释说明了一切:写锁被占有时,ReadLock的tryAcquireShared也会失败

正是这种互相限制,才能在互斥写、共享读的同时,做到读写互斥

StampedLock - 乐观读,互斥写

比ReadWriteLock更适合读多写少的场景!

关于它能取代ReentrantReadWriteLock的原因,参考显式锁。它的主要优势就是把读分成了乐观读、悲观读,写还是悲观写。这样平时乐观读不加锁,偶尔有了写线程也不会饥饿,所以更适合读多写少的场景。

StampedLock不是谁的子类,所以跟ReadWriteLock接口也没有什么关系。它的主要方法有:

  • readLock()
  • writeLock()
  • tryOptimisticRead()/validate(long)

他们返回的基本都是long,作为凭证。其实就是“版本”,用来校验两次读之间,数据有没有被改过。

tryOptimisticRead() & validate(long)

看之前介绍的例子,能够很直观理解他们的用法。

他们的实现则比较简单,就是返回一个状态:

1
2
3
4
5
6
7
8
9
10
    /**
     * Returns a stamp that can later be validated, or zero
     * if exclusively locked.
     *
     * @return a stamp, or zero if exclusively locked
     */
    public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }

校验当前状态还是不是之前读的那个状态:

1
2
3
4
    public boolean validate(long stamp) {
        U.loadFence();
        return (stamp & SBITS) == (state & SBITS);
    }

Semaphore - 共享锁,控制并发度

信号量其实就是os里讲的那个mutex:

  1. mutex有个初始值,代表线程并发度
  2. 每次acquire消耗一个值,如果没有值可消耗,线程阻塞;
  3. 每次release增加一个值,值增加会唤醒AQS等待队列里的下一个等待节点线程;

所以Semaphore实现的方法是tryAcquiredShared/tryReleaseShared。它也有公平非公平之分:

公平看有无排队,有排队说明state已经不足了,直接获取失败。没有排队就试试余下的值还够不够扣除自己需要的值:

1
2
3
4
5
6
7
8
9
10
11
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

非公平同样不考虑排队,只考虑值够不够用:

1
2
3
4
5
6
7
8
9
10
11
12
13
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
        
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

总体来看,共享锁的tryAcquireShard比互斥锁(ReentrantLock)的tryAcquire好实现:

  1. 共享锁只需要考虑值够不够;
  2. 互斥锁还要判断有线程获取锁时,那个线程是不是自己;

用途

可以使用两个Semaphore控制生产者和消费者,允许一定程度的并发生产,生产完成后消费者消费,消费掉才能继续生产。

CountDownLatch - 共享锁,“一波人给另一波人放行”

倒数n次之后,所有阻塞的线程都可以执行了。

用途

CountDownLatch适合一堆异步线程任务都结束了,再在主线程里继续执行代码。但是必须在构造函数里设置好countdown数,也就是说异步任务数量是提前确定的。

CountDownLatch的语义:

  • await:等着,其实就是线程挂起;
  • countdown:倒数一次,数到0,所有await的线程都可以执行了。

CountDownLatch是用AQS实现的共享锁。别人的release都是state+1,它的tryReleaseShared把计数器-1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    public void countDown() {
        sync.releaseShared(1);
    }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

tryAcquire也改变了语义,变成当state为0是,线程执行,否则无法执行(会被挂起到AQS队列):

1
2
3
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

对于计数器为1的latch,减一次,state == 0的条件就都满足了,所有挂起“等待锁”的线程都可以执行了。

CyclicBarrier - “自己给自己放行”

CyclicBarrier类似于“xx式过马路”,是只要凑够一拨人就一起冲破barrier继续做后面的事情。是“自己给自己放行”。CountDownLatch是一个人给另一拨人放行。

用途

适合一堆线程互相等待,够数之后再一起执行。有一种“自动倒计时”的感觉。

CyclicBarrier只有一个方法:

  • await:等够人了就可以继续执行,等不够就挂起。

Phaser - count up and down

CountDownLatch和CyclicBarrier可以手动或自动控制任务都完成了再一起行动,但是必须提前设置好countdown数,也就是说异步任务数量是提前确定的

Phaser可以随时动态调整任务数,或者说既可以countdown,又可以count up(灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变)。

  1. 初始化Phaser(1),最后主线程也要完成任务,所以初始值(待完成任务)为1;
  2. 有一个异步任务,Phaser#register()让要完成的任务数+1,相当于count up;
  3. 完成一个异步任务,Phaser#arriveAndDeregister()让已完成任务数+1,相当于countdown;
  4. 主线程可以使用arriveAndAwaitAdvance阻塞,直到数值归零再继续运行;

或者也可以用Phaser#arrive(),不countdown,而是给完成的任务数+1,相当于逻辑上的countdown。所以不deregister,这样就可以记录完整的数据统计,通过getRegisteredParties/getArrivedParties/getUnarrivedParties获取。

用途

适合不断产生新异步任务的场景。比如相等所有的异步任务都结束了再退出,就可以:

  1. 产生新异步任务,+1;
  2. 完成一个异步任务,-1;

当所有异步任务发出去之后,数值归零的时候,就是所有异步任务都完成的时候。

ThreadPoolExecutor

ThreadPoolExecutor的Worker是AQS的互斥锁实现……

RateLimiter - 访问速率

guava,用来控制事情发生的速率,比如http请求速率:

This is in contrast to Semaphore which restricts the number of concurrent accesses instead of the rate (note though that concurrency and rate are closely related, e.g. see Little’s Law ).

本文由作者按照 CC BY 4.0 进行授权