Java多线程第十篇--通过CountDownLatch再探AbstractQueuedSynchronizer的共享模式
highlight: a11y-dark
我正在参与掘金技术社区创作者签约计划招募活动,点击链接报名投稿。
上一篇中,我们一起学习了ReentrantLock的独占式公平上锁的过程,从中我们深入了解了AbstractQueuedSynchronizer(AQS) 同步队列的源码以及Condition等待队列的实现机制。
这一篇中,我们再从另一个实现类CountDownLatch再来看看AQS共享模式的源码分析。
先来看看CountDownLatch。
CountDownLatch是啥?怎么用?如何工作的?
关于CountDownLatch的名字,网上众说纷纭:倒计时器,倒数的门闩,同步计数器等等。它究竟是什么呢?
场景示例
下面直接上demo,以及场景来说明CountDownLatch的用法和工作原理: ```java public class CountDownLatchDemo extends Thread {
public CountDownLatchDemo(String name) {
this.setName(name);
}
/**
* 本例是一个模仿三人欢乐斗地主的游戏,当三个玩家都准备好的时候,游戏就开始发牌了
*/
static CountDownLatch countDownLatch = new CountDownLatch(3);
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "进入游戏房间~");
try {
// 模拟玩家进入房间准备的过程
Random random = new Random();
int time = (random.nextInt(10) + 1);
for (int i = 0; i < time; i++) {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "准备中。。。");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "已进入房间并且准备好了~");
countDownLatch.countDown();
}
public static void main(String[] args) throws InterruptedException {
System.out.println("游戏开始,等待玩家进入游戏房间~");
CountDownLatchDemo player1 = new CountDownLatchDemo("玩家1");
CountDownLatchDemo player2 = new CountDownLatchDemo("玩家2");
CountDownLatchDemo player3 = new CountDownLatchDemo("玩家3");
player1.start();
player2.start();
player3.start();
countDownLatch.await();
System.out.println("所有玩家都已进入游戏房间,开始发牌了~~!!");
}
}
运行效果如下:
java
游戏开始,等待玩家进入游戏房间~
玩家1进入游戏房间~
玩家3进入游戏房间~
玩家2进入游戏房间~
玩家3准备中。。。
玩家3已进入房间并且准备好了~
玩家1准备中。。。
玩家2准备中。。。
玩家1准备中。。。
玩家2准备中。。。
玩家1准备中。。。
玩家2准备中。。。
玩家2准备中。。。
玩家2已进入房间并且准备好了~
玩家1准备中。。。
玩家1准备中。。。
玩家1已进入房间并且准备好了~
所有玩家都已进入游戏房间,开始发牌了~~!!
```
代码场景描述:这段代码很简单,我们平时都玩欢乐斗地主的手机游戏吧,示例就模拟了这个场景:平台开了一个房间,然后等待三个玩家进入房间并准备,当三个玩家都准备好了,就开始发牌进入游戏~我们做如下假设:平台并不知道玩家什么时候进入房间并且准备好,这时候平台准备了一个倒数的计数器,初始值为3,当有一个玩家进入房间并点击准备好了后,计数器则减一,当计数器等于0的时候,则说明房间三人已满,启动发牌指令,游戏开始!
CountDownLatch工作原理
从这个简单的例子来看,我们的主角CountDownLatch好像一个计数器,当计数器的数值大于0时,所有调用await方法的线程都会等待(当然,例子中是main主线程),当其他线程达成某些条件,或者任务做完了,则会将计数器减一,当计数器的值变为0 的时候,所有调用await的线程都将被唤醒,继续执行下面的业务。
其实在之前的文章的开头部分讲线程间协作的方法的时候就做过说明,CountDownLatch是可以使得多个线程协作完成一个或者多个任务的一个工具类,用来协调多个线程间的同步,可以起到线程间通信的作用。
CountDownLatch使用场景
CountDownLatch的使用场景真的很多,总结如下: - 当一个线程A需要其他一些线程的执行结果,A线程才能继续执行的场景 - 比如上面的三人斗地主的场景;再比如我们在写Service层一个大型查询统计的时候,这个业务需要统计好几张大表,每张表统计都需要很长一段时间,如果按照顺序一个个统计,则是时间累计;而当我们使用CountDownLatch,再new几个线程,每个线程各执行一个查询,每当一个查询线程做完,将计数器减一,最后为0的时候,主线程则汇总各个线程的结果即可。 - 可以实现指定几个线程并控制这些线程并行执行。 注意:这里是并行,而不是并发。 - 这里的意思就是你可以指定这个任务几个线程去共同完成,并且可以控制他们同时进行,从而减少时间的浪费和性能的提升。
CountDownLatch三大方法分析
以上通过例子说了CountDownLatch的工作原理以及如何使用,下面将进入我们的主题,源码分析:透过CountDownLatch来看AQS共享方式获取同步资源。
以下便是CountDownLatch的源码实现了,其实很简单: 其中,自定义同步器的方法具体实现见下图: 由此可见,要搞明白CountDownLatch的实现原理,还是要看AQS中是如何通过共享方式同步资源的。
继续,Sync作为AQS的子类且作为CountDownLatch的内部类,这种实现方式几乎和重入锁一样。从Sync的源码可以看出,实现的是共享式获取tryAcquireShared和释放tryReleaseShared,这就证明了CountDownLatch是共享式锁的典型实现。
由上面的Demo场景分析,我们只需要分析三大函数具体做了啥就可以搞清楚啦。
游戏平台定义计数器,构造函数public CountDownLatch(int count)
java
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
这是CountDownLatch的构造函数,我们注意看注释部分对count的解释,中文意思大概如下:在线程通过await方法之前,必须调用countDown()方法。简单理解就是就是想要调用await方法的线程被唤醒,只有调用countdown方法使得count变为0。
我们继续深入此方法,此方法最终其实是初始化自定义同步器Sync,如下:
java
Sync(int count) {
setState(count);
}
到此我们可以发现,CountDownLatch构造器最后其实就是初始化了AQS同步器并且对state值进行了设置,这个值即为CountDownLatch的同步资源的个数。
主线程调用await方法
这时通过打断点的方式,我们得到了下面的调用路径,await()-> sync(AQS).acquireSharedInterruptibly(1)->tryAcquireShared(arg),由此我们先看AQS.acquireSharedInterruptibly方法:
java
//此时传入的arg==1
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//首先判断当前线程是否中断,如果中断则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
/**
调用tryAcquireShared方法,如果返回值小于0,则执行doAcquireSharedInterruptibly方法,
大概意思就是如果尝试获取锁资源成功的话,就会返回,否则将要进行排队,或许会进入阻塞等环节。
即tryAcquireShared(arg)小于0,代表线程获取共享锁失败
大于0,代表当前线程获取锁成功,接下来的线程能够成功获取看情况
等于0,则代表当前线程获取成功,接下来的线程肯定就会失败了
*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
到这里,我们再结合场景和CountDownLatch分析:
场景1
main线程执行await()方法,此时没有玩家准备好,即没有线程执行countDown方法,则方法执行到Sync.tryAcquireShared方法,此时state=3,则tryAcquireShared返回-1,则【if (tryAcquireShared(arg) < 0)】返回true,进入到方法doAcquireSharedInterruptibly(arg),见如下源码实现以及底下的分析:
java
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//1
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//2
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 1、这时主线程main进来,调用addWaiter方法进行node包装
- 此处addWaiter方法和之前的讲独占锁几乎一样,做的事情大概如下:封装线程节点,判断队列是否初始化,没有的话,先初始化同步队列,然后通过尾插入的方式,将当前节点插入到同步队列
- 2、取出当前节点的前置节点,判断前置节点是否为头节点,此时队列里只有主线程节点,很明显当前主线程节点的前置节点就是head,所以又会再次tryAcquireShared,如果这时还是没有玩家线程执行countdown方法,很明显【if (r >= 0)】不会成立,则进入到分支【if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())】,这一段的实现几乎和上一篇的一样,就是将前置节点的ws状态改为SIGNAL,然后正式进入同步队列后执行park等待(当然这里面也有个自旋再次尝试获取同步资源的执行,我们假设还是不行)。
到此main线程,就进入等待状态了。
场景2
main主线程处于park等待状态后,这时玩家1准备好了执行countDown方法,这时我们看看执行countDown方法后会发生什么?调用逻辑如下countDown-> sync.releaseShared(1)->AQS.releaseShared(1)->Sync.tryReleaseShared(1),先来看看AQS.releaseShared(1)
java
//AQS.releaseShared(1)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
这时【if (tryReleaseShared(arg))】返回true,执行doReleaseShared(),如果返回false,则直接返回false
再来看看Sync.tryReleaseShared(1)
java
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
上段源码大概意思如下:进入一个自旋过程,如果取出的state等于0,则说明同步资源此时已经全部被减掉了,直接返回false结束,不做任何处理;如果不等于0,则进行CAS state-1操作,直到成功减1,并且返回减1后的值是否为0,如果为0,则返回true,如果不为0,则返回false。
按照场景2的分析,此时玩家1执行countdown后,最终会在【releaseShared(int arg)】方法返回false,不做任何处理,这样合情合理。
场景3
玩家1执行后,紧接着玩家2也执行了countDown方法,然后这时玩家3也执行了countDown方法。
在按照上面源码的分析我们可以知道,当玩家3执行countDown方法后,这时tryReleaseShared(int releases)返回true,则执行AQS.doReleaseShared()方法,我们来看下:
java
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这段代码其实很复杂,我们先结合场景把流程走下去,玩家3进入该方法,取出头节点,且此时头节点ws==SIGNAL,所以执行【compareAndSetWaitStatus(h, Node.SIGNAL, 0)】将头节点的状态CAS方式修改成0,如果失败的话,将继续来一遍,成功的话,则执行【unparkSuccessor(h)】,这个方法就是唤醒头节点的下一个节点。
这时主线程节点将被唤醒,则会进入进入到doAcquireSharedInterruptibly方法的循环体中,如下:
```java
/*
* Acquires in shared interruptible mode.
* @param arg the acquire argument
/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这时主线程节点再次执行tryAcquireShared(arg)的返回结果为1,即【if (r >= 0)】成立,则执行这个分支下的语句,即【setHeadAndPropagate(node, r);p.next = null;failed = false;return;】首先是setHeadAndPropagate(node, r),如下:
java
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
``` 将主线程节点设为头节点后,如果可能的话,将继续唤醒后继的节点,去尝试加锁,为什么继续唤醒呢,因为是共享锁嘛,当前的释放了,那说明后继等待的又可以继续获取锁了,这边跟独占锁不一样,独占锁只能是当前获取锁的线程释放后,才能唤醒下一个,而共享锁是可以继续后继节点来尝试获取资源。而显然,按照我们当前场景,主线程从park状态出来,则程序就继续往下执行了,到此执行结束。
以上便是CountDownLatch的构造器/await/countDown的流程分析了,我们来总结下: - CountDownLatch三大函数:构造器对计数器初始值的设定;await方法能让线程进入等待状态,继续执行的条件是计数器变为0;countdown方法则是将计数器进行减1操作,当减到为0就会唤醒等待的线程。 - CountDownLatch内部自定义了Sync的同步器,实现了共享式的tryAcquireShared和tryReleaseShared两个方法,分别用于获取共享锁和释放共享锁。 - tryAcquireShared:如果state为0,则返回1,即当计数器变为0的时候,全部线程都可以尝试得到共享锁,而当计数器非0的时候,返回-1,则将线程交给AQS同步队列管理。 - tryReleaseShared:主要是通过自旋的形式进行减1操作,如果减1之后发现计数器变为0了,则会进行释放锁并唤醒等待线程的操作。
AQS的共享锁模式
在上面分析CountDownLatch的时候,关于AQS共享锁的源码分析都是一笔带过,有很多的细节没抠,很复杂的~因为说多了,反而不利于大家理解CountDownLatch的原理,下面我们就着重来看下AQS的共享锁模式。
在前面篇章的渲染下,我们就不写demo来看了,其实如果你搞懂了前篇文章独占锁的上锁和释放锁的流程,再来看共享锁,我相信会容易很多,因为很多地方都是类似的。
共享锁VS独占锁
共享锁和独占锁最大区别就是,独占锁是当前只有一个线程可以持有锁,从源码就可以看出AQS有个属性exclusiveOwnerThread,就是用来存放当前持有锁的线程对象,这时其他线程来尝试加锁,只能进行排队等待,只有等持有锁的线程释放了,才会唤醒队列中排在第一个的线程进行尝试加锁,当然这期间也可以被其他线程尝试抢占,但同一时间只能有一个线程能够成功。
而对于共享锁,顾名思义,它可以同时被多个线程持有锁资源,换句话说,如果有一个线程获取锁成功了,其他来加锁的或者在队列等待的线程都可以尝试加锁,而且极有可能会加锁成功。
(PS:还不懂?再举个粗俗的例子,上厕所都上过吧:独占锁就像家里的卫生间,只有一个马桶,同一时间只能有一个人在里面方便;而共享锁就像外面的公共厕所,有好几个坑,同一时间可以有多个人一起方便。。。)
下面我来搞个表格直接从方法的角度来对比下:(我们只看最简单的哈~那些什么中断的,超时处理的,就留给小伙伴们自己分析了) | 独占锁 | 共享锁 | | --- | --- | | acquire(int arg) | acquireShared(int arg) | | tryAcquire(int arg) | tryAcquireShared(int arg) | | acquireQueued(final Node node, int arg) | doAcquireShared(int arg) | | release(int arg) | releaseShared(int arg) | | tryRelease(int arg) | tryReleaseShared(int arg) | | unparkSuccessor(Node node) | doReleaseShared() |
貌似他俩的方法差不多都能对应上,也就最后一个名字好像不太一样,但其实你进入doReleaseShared方法里面,你就会知道,其实它最终也调用了unparkSuccessor方法,只不过共享锁比独占锁多了不一样的地方,我们下面具体分析会说的。下面正文开始:(PS:本篇文字较多,需要加点想象力)
共享锁的获取分析
上源码:代码1acquireShared(int arg) 代码2tryAcquireShared(int arg) ```java / * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ //代码1 //AQS.acquireShared(arg) public final void acquireShared(int arg) { / 这个方法的实现很简单,如果调用全了,就只调用了两个方法 一个是尝试获取共享锁,见下面代码2 的分析 一个是获取失败了,进行排队(也可能自旋再次尝试获取锁成功,或者进入排队等待的方法) */ if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
//代码2
//AQS.tryAcquireShared
//是一个留待程序员自己实现的方法,是一个空方法,为什么我们还要看他呢,因为它的定义很重要,我们看
//它的英文注释,以及我的大概翻译。。。
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
大概意思如下:
该方法尝试以共享模式获取锁资源,实现该方法的自定义同步器需要自己去定义查阅资源是否可用
如果可以,则获取它,如果不可以,则调用此方法的acquire方法将会让当前线程排队
第二个我们看返回值的定义:
返回值小于0:获取锁资源失败
返回值等于0:获取锁资源成功,且再有线程来获取,将会不成功
返回值大于0:获取锁资源成功,再有线程来获取,有可能会成功。
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
以上分析总结来看,也就是说**只要tryAcquireShared返回值大于等于0,就代表获取共享锁成功,而小于0,就代表没有资源可以获取,就要进入doAcquireShared(arg)方法,想来也合情合理哈~** 下面着重来看doAcquireShared(arg)方法,**代码3**
java
//代码3
//AQS.doAcquireShared
private void doAcquireShared(int arg) {
/
对比来看,这个方法的流程几乎和独占式的一样
首先调用addWaiter包装当前线程为队列节点,并且将节点尾插入到队列,如果队列还未初始化,则先初始化队列
要说不同,也就是节点的类型不一样,一个是独占式的,一个是共享式的
/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
/
再然后也是进入一个自旋的过程,这个过程和独占式也是差不多的
首先也是取出当前节点的前置节点,如果前置节点是head,则再次尝试获取锁资源(防止之前持有锁
的线程释放锁了);如果获取成功了,则调用setHeadAndPropagate(node, r),将当前节点设置为
head,这里有个不同的是,独占是是直接setHead,而这边是setHeadAndPropagate,见代码4
如果不成功的话,就会执行下面的shouldParkAfterFailedAcquire(p, node)
&&parkAndCheckInterrupt(),这一段在之前的篇章分析过了,主要就是设置前节点的状态为
SIGNAL,然后会再次尝试获取,如果再次失败,就会正式入队,阻塞等待
如果前置节点不是head,则会直接进入shouldParkAfterFailedAcquire(p, node)
&&parkAndCheckInterrupt(),紧接着又会去获取一次前置节点再来一次过程,防止做上一段循环的
过程中,已经有人释放锁了,当前的节点变成了head的后继节点。。。如果不是,则会正式入队,阻塞
等待。。。。。是不是很绕。。。(当然这一段的过程和独占锁是一样的)
*/
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//代码4 //AQS.setHeadAndPropagate private void setHeadAndPropagate(Node node, int propagate) { / 我们看到,首先直接调用setHead方法,设置为头节点,不同的是下面一段 / Node h = head; // Record old head for check below setHead(node); / * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. 咋一看,好多条件啊,都是||的关系,即下面的条件只要有一个为true,就会去取后继节点 且如果后继节点不为空或者后继节点是共享节点的话,就会调用doReleaseShared(),这个方法 是用来释放锁资源的,我们下面会分析
从上面的分析来看,这个方法里除了设置head节点外,还加了一个过程,就是当满足一些条件后,
除了当前的线程会获得锁资源并且出队列后,将继续释放锁资源,以唤醒后继节点尝试获取锁。
哪些条件呢,而且这些条件的判断讲究了一个顺序。。。我们知道||操作有个特点,||之前的为
true,他就会看都不看后面一眼,直接true,我们来看:
1、如果 propagate > 0 成立:propagate是上一层传过来的,是tryAcquireShared(arg)方法的
返回值,我们上面分析过,tryAcquireShared(arg)返回值大于0,代表当前线程获取锁成功,再有
线程来获取,有可能会成功,即说明锁资源现在有多余的了,那这时去通知后继节点去加锁,没毛病。
如果不成立,则不就代表没有锁资源可以供尝试获取吗?答案是否定的,你要记住我们分析的是多线
程并发场景,可能就在setHead方法的这段时间里,已经发生了很多的事情。。。(所以这边的||操作
大神讲究了顺序。)
2、下面的判断其实很有意思,把他合起来看h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0),h为旧的head,看旧的head是否为空,然后设置头节点的
head重新赋给h,即为新的头节点,且他们判断的条件是一致的,他其实就是在看,在设置头节点的前后,
头节点是否为空,如果为空则直接调用doReleaseShared,其实意思也就是head都为空了,说明一个问题,
队列不存在了,没有人排队,那是不是意味着可能会获取到锁资源啊;如果head节点不为空,则看它的状态
如果状态小于0,说明头节点可以被唤醒/传播唤醒。。。所以也可以执行doReleaseShared
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
// private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ```
共享锁的获取分析总结
- 在线程获取失败的情况下,会进入【doAcquireShared(int arg)】方法,取出前置节点:
- 1、如果前置节点是head,则进行CAS尝试加锁操作:
- 1.1、如果加锁失败,或者没机会加锁,则就会将前置节点设置为SIGNAL,然后再阻塞等待。
- 1.2、如果加锁成功,则设置为头节点,并且可能会尝试唤醒后继节点去尝试加锁
- 2、如果前置节点不是head,则先将前置节点设置为SIGNAL,然后再继续自旋,可能会再次进入到1的流程,也有可能直接将前置节点设置为SIGNAL,然后再阻塞等待。
- 1、如果前置节点是head,则进行CAS尝试加锁操作:
共享锁的释放分析
直接上源码,代码5,代码6 ```java //代码5 //AQS.releaseShared public final boolean releaseShared(int arg) { /* 线程调用release之后,会先调用tryReleaseShared(arg),boolean类型返回值,见代码6分析 即当前线程尝试释放锁资源,成功的话,执行doReleaseShared,释放锁的源码分析的精髓在这里。。 见代码7的分析 / if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
//代码6
/**
这也是个空方法,我们看方法的定义和作用
参数arg,释放锁资源的个数,但经常是1,即一个线程持有一个锁资源
返回值boolean,如果尝试释放锁资源成功的话,则返回true,返回到releaseShared就是去执行
doReleaseShared,进行锁的释放
失败的话,则没有操作。。。
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
**代码7**
java
//代码7
//AQS.doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
先大体翻译下:
确保释放过程的传播性,即使有其他线程正在进行共享资源的获取/释放。
如果以通常的方式进行,且头节点为SIGNAL状态,尝试唤醒解除head的后继等待节点。
如果不是上面的操作,则把状态设置为PROPAGATE,以确保在释放时进行传播。
另外,我们必须进行循环,以防在执行此操作时添加了新节点。
而且,与unpark后继器的其他使用不同,我们需要知道CAS重置状态是否失败,如果失败则需要重新检查
按照上面的意思翻译下来,乍一看,不懂他在说啥。。。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//1
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//2
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//3
if (h == head) // loop if head changed
break;
}
}
``` 在分析代码7之前,我们先看下,这个函数貌似在之前加锁的过程中也调用了,所以这个方法有两个地方调用,第一个就是releaseShared的时候,第二个就是在尝试加锁成功,设置为head节点后,也会去调用该方法。
几个问题
- 为什么这两边都要调用,为什么在尝试加锁成功后,还去调用释放锁的方法?到底是哪个线程调用了这个方法?
- 我们细看这个方法里的实现,for(;;),又是一个自旋,头都大了,根据之前的经验,既然是自旋,那肯定有退出循环的条件,那条件是什么?
- 按照我们之前分析独占锁释放的过程理解,独占锁释放锁成功,并且成功唤醒后继线程,就直接退出了,为什么共享锁这边需要进行自旋,它的目的是什么?也就是说这个方法到底做了哪些事情? 我们带着这些问题,逐行分析上面的过程。。。其实挺难理解的,如果不对,望海涵。。。
我们在上面的代码中,我标出了 1 2 3 三个关键if分支: - 1、首先进入1的判断,取出头节点h,并判断头节点状态 - 如果状态为SIGNAL,则通过CAS方式将SIGNAL改为0 - 成功的话,就会去执行unparkSuccessor释放head的后继等待节点,然后去进入3的判断 - 如果CAS失败,则继续再来一遍CAS。 - 如果取出的ws不为SIGNAL,则进入2(注意:1和2其实是一个互斥的情况,也就是说当前调用此方法的线程取出来的ws值,要么走1的分支,要么走到2) - 2、如果ws==0,什么情况下ws==0,首先在执行上面的【compareAndSetWaitStatus(h, Node.SIGNAL, 0)】后,ws会等于0,在当前线程下显然不会,那还有什么情况ws==0,这时我们就会想到,我们每次有新节点的加入时,新节点(其实也是最后一个节点)ws会等于0,且恰恰刚变为head,那么这时就会将头节点的状态CAS改为PROPAGATE,失败的话,将继续CAS修改,成功的话,则进入3 - 3、如果之前取出的h和head没有发生变化,则跳出循环,否则继续循环,将继续循环唤醒下一个节点。。。循环往复。。
分析到这,我们来看看上面的问题 - 首先这个方法在两处调用,一处是在线程尝试获取锁成功,然后设置头节点后,调用了;还有一处是线程自己去调用releaseShared的时候。假设如下场景,同步队列中有head(X)->A->B->C->D四个线程排队等待(X代表当前任意一个线程持有了锁),假设现在A线程被唤醒并且拿到了共享锁,那么A就会被设置为头节点,如下head(A)->B->C->D,这时A会调用doReleaseShared方法,唤醒B,假设B也获取到了锁,则就成为了新的头节点,当下队列变为head(B)->C->D,B这时设置头节点后,他也会去调用doReleaseShared方法(其实在上面两个过程中,是极有可能同时进行的,记住,我们是多线程并发,多线程并发!!这时有很多人就说明明是A唤醒B的,怎么可能同时呢?那我这边再假设假如不是A唤醒的,是上面的X呢?。。)也就是说上面的A线程在执行doReleaseShared循环体的过程中,B线程已经把头节点设置为自己了,且B也去触发了doReleaseShared方法,B也进入了doReleaseShared的循环体中了(也就是说A线程还在执行doReleaseShared循环体的过程中,头节点已经不是之前的头节点了),所以我们才有上面代码3的判断(h==head),而且我们也看到了只有当h==head的时候,才会退出循环,这也就是回答了第二问题。 从中我们也可以看到其实这个过程好像是一个不断发生,不断的去触发doReleaseShared的过程。为什么大神代码这么写,我猜测是这样,大神是想:不断地通过这种方式,来加快共享锁获取/释放的过程,只要当前执行循环体中head节点发生了变化,其实也就是说有其他线程在释放锁,说明锁资源又有了,那么我就继续循环,继续释放锁,继续唤醒后继节点来尝试加锁(我认为这是大神极致优化的体现)。 其实不止上面的流程会导致这样不断的循环,我们再假设,这时有个线程F,本来是持有锁的,现在任务结束了,他就调用releaseShared方法,同样也会进入这个循环体。 相信分析到这里已经回答了上面的几个问题了。
这时我们再回过头来,看看这个方法里的注释以及翻译,是不是觉得抓住点什么了,我把注释及翻译再贴一次:
java
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//1
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//2
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//3
if (h == head) // loop if head changed
break;
}
}
翻译解释,也当作是doReleaseShared的总结
- 1、确保释放过程的传播性,即使有其他线程正在进行共享资源的获取/释放。
- 这里其实是对整个方法的一个描述,他是一个不断重复进行释放锁尝试获取锁的过程。
- 2、如果以通常的方式进行,且头节点为SIGNAL状态,尝试唤醒解除head的后继等待节点。
- 这边其实是对第一个if分支的解释
- 3、如果不是上面的操作,则把状态设置为PROPAGATE,以确保在释放时进行传播。另外,我们必须进行循环,以防在执行此操作时添加了新节点。
- 这两句话就是对第二个分支的解释:这边呼应了我上面的分析,每次有新节点的加入时,新节点(其实也是最后一个节点)ws会等于0,且恰恰刚变为head,再然后其实在【(ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))】的前面,还有一个大前提条件【(h != null && h != tail】,说明此时队列里至少两个节点(一个是head,还有一个又是一个新节点),虽然很极限,但却是有可能存在这种情况。 这时我们再看:【(ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))】假如ws==0为true,当【compareAndSetWaitStatus(h, 0, Node.PROPAGATE)】为false的时候,代表这个CAS操作失败了,说明有其他线程将ws的值修改了,什么情况会有其他线程修改ws的值,肯定又有新节点加入且将他改为了SIGNAL,则这时取反后为true,则会continue,继续CAS修改,否则进入3,又是一个有可能继续循环,继续唤醒的过程。。。。从上面的分析来看,这个条件很苛刻,反正我是想不到这样的场景。。。大神就是大神,牛逼plus!!真的是把各种细节和场景都想到了(我也是想了好久,翻阅了无数资料总结的,如有不对,望海涵。。)
- 4、而且,与unpark后继器的其他使用不同,我们需要知道CAS重置状态是否失败,如果失败则需要重新检查
- 这边是对第一个分支下,CAS将SIGNAL改为0失败的解释,失败的话,则continue,继续CAS尝试修改。修改成功的话,则唤醒h的后继节点。
到这里其实releaseShared还未讲完,继续:如果节点被唤醒,则被唤醒的节点,将继续进入doAcquireShared的循环体,继续CAS尝试获取锁,而这时一般情况下都是可以获取到锁资源的,则会设置为head,同时又将继续执行doReleaseShared,再来循环。。。(脑袋都快被循环坏了)
其实这时再回过头看AQS.setHeadAndPropagate的源码,里面说到了在设置好头节点后,满足一定的条件后,将调用doReleaseShared,其实这个条件无非就是大神优化到极致的体现了。
java
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
共享锁的获取/释放总结
共享锁的实现其实大部分和独占锁类似,但也有不同,大致如下: - 第一个区别当然也是最大的区别就是,共享锁的锁资源可以被多个线程同时占有(同时上厕所方便~),而独占锁一个时间只能是一个线程持有锁。 - 其次,由于共享锁资源可以被多个线程同时占有,所以在线程自旋尝试获取到锁的同时,会去唤醒后继节点也来尝试获取锁。 - 再者,共享锁的获取/释放过程,是一个一旦发生了,就有可能会不停执行、循环往复的过程,就如状态PROPAGATE(传播)一样,这个过程具备传播性(而且是病毒性的。。哈哈~),只有当头节点没发生变化才会停止。
到此,AQS的共享模式结束。以上只是我个人的看法,当然对于一些特殊细节和场景可能小伙伴们看法不一致,希望大家提出哈~
我正在参与掘金技术社区创作者签约计划招募活动,点击链接报名投稿。