对于AQS的一点理解

为什么标题叫做对于AQS的一点理解呢,因为感觉自己还没有达到能够对AQS学习透彻,哈哈。
这段时间在总结以前学过的东西,发现大牛写的东西真的是厉害,越学越谦虚,越学越感觉自己还有好多好多的东西要学。大牛写的东西不仅体现在结构、架构上,而且更令人惊叹的是在细微之处。
比如,HashMap的数组长度为什么是2的整数次幂;CopyOnWriteArrayList的set方法什么都没做的时候还要重新进行赋值(详情请看CopyOnWriteArrayList类set方法疑惑?);JDK1.8中HashMap和ConcurrentHashMap的扩容操作,等等。这些细节才是体现水平的地方,我只有对大牛仰慕的份。

好吧,回到正题。
正如标题,这只是部分总结,我尝试用自己语言将其表达出来,目的是理清AQS的一些关键脉络,比如:工作原理、内部结构、线程的处理等等;
而没有对AbstractQueuedSynchronizer就行全面介绍。

如果您之前还没有接触到AQS,那么本篇博客可能对您帮助不大;
如果您也是跟我一样,已经了解过AQS想总结一下,那么我们可能会产生共鸣;
如果您之前已经对AQS非常熟悉了,如果有不对的地方希望您不吝赐教,谢谢~

一句话概括

如果要我对AQS用一句话进行概括,我会这样概括:

AQS利用CAS原子操作维护自身的状态,结合LockSupport对线程进行阻塞和唤醒从而实现更为灵活的同步操作。

两个关键点

上面的一句话提炼一下,我其实想表达两个点:

  1. 通过CAS操作维护自身的状态
  2. 一个就是如何对线程的进行处理

CAS维护自身状态

CAS

CAS,CompareAndSwap,比较并交换。是一种乐观锁的思想,不对资源进行加锁而是与原来的值进行比较并交换:如果相等,意味着这个值没有被更新过,可以进行操作;否则说明该值已经被其它线程更改过了,那么需要进行重试。
因为不需要阻塞,也就没有了线程切换的开销,所以这种方法能够取得比较好的效率;

CAS需要三个参数:

  • offset(内存地址)
  • expect(期望值)
  • update(需要更新的值)

操作成功返回true,否则返回false。

维护什么状态

AQS内部的属性并不是很多,我们可以找到有这么一个属性:

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

这个变量,我觉得是AQS非常核心的一个变量。并发包中的工具类都是通过继承AQS暴露的方法(protected)来操作这个变量,从来实现各种各样有的同步功能;

怎么维护?

这里涉及到了一个设计模式-模版方法,子类通过实现protected方法,在protected方法里面实现自己的逻辑从而影响整个流程以及对status变量的操作。

也就是说子类所有的同步功能都是建立在这个status变量上的。

这样说起来有点抽象,考虑到可阅读性以及博客的重心是在AQS上,所以我后面把concurrent包下的同步器单独写一篇博客介绍,希望大家继续关注 ^_^

LockSupport对线程的处理

如标题所示,AQS对线程的处理主要是依赖LockSupport的功能

阻塞与唤醒

最开始接触AQS的时候,我最好奇的也是这个地方:AQS不是JVM直接支持的一个关键字,不像synchronized关键字可以得到JVM的支持;而AQS只是一个普通的Java类,那么它如何对线程进行阻塞和唤醒的呢?

其实很简单(只是思路很简单,实现起来可就不简单了,哈哈)。
我们来看一个源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//调用unparkSuccessor()唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}
//来看upparkSuccessor()方法
private void unparkSuccessor(Node node) {
//省略无关代码
LockSupport.unpark(s.thread);
}
public static void unpark(Thread thread) {
//省略无关代码
if (thread != null)
//委托给UNSAFE#unpark方法,这是一个本地方法
UNSAFE.unpark(thread);
}

从调用链中我们可以了解到线程的唤醒操作是委托UNSAFE#unpark方法的,而这是一个本地方法,也就是说这个唤醒操作实际上是通过操作系统完成的。
同理,我们可以推导出阻塞的实现是通过UNSAFE#park方法方法实现的。

Node与CLH队列

从doc中,我们可以找到这么一幅图,它描述了内部的队列是如何构成的(我很好奇为什么只画了prev指针,为什么没有next指针,是不是因为地方太小画不下了,哈哈);

1
2
3
4
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+

Node

Node,队列节点,每一个Node都持有了一个线程,对线程进行包装,方便操作。

我们需要特别关注一下其内部的五个属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static final class Node{
/*当前node对象的等待状态,注意该状态并不是描述当前对象而是描述下一个节点的状态,
* 从而来决定是否唤醒下一个节点,该节点总共有四个取值:
* a. CANCELLED = 1:因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,
* 只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;
* b. SIGNAL = -1:表示这个结点的继任结点被阻塞了,到时需要通知它;
* c. CONDITION = -2:表示这个结点在条件队列中,因为等待某个条件而被阻塞;
* d. PROPAGATE = -3:使用在共享模式头结点有可能牌处于这种状态,表示锁的下一次获取可以无条件传播;
* e. 0: None of the above,新结点会处于这种状态。
*
* 非负值标识节点不需要被通知(唤醒)。
*/
volatile int waitStatus;
//当前节点的上一个节点,如果是头节点那么值为null
volatile Node prev;
//当前节点的下一个节点
volatile Node next;
//与Node绑定的线程对象
volatile Thread thread;
//下一个等待条件(Condition)的节点,由于Condition是独占模式,因此这里有一个简单的队列来描述Condition上的线程节点。
Node nextWaiter;
}

当线程中的对象调用AQS子类的方法尝试更改AQS维护的状态失败时,就会将Thread对象抽象成这样的Node对象,这样更加利于管理。

Node中状态的作用:
当头节点也就是当前线程运行完毕以后,会检查自身的waitStatus,从而决定是否将后继节点唤醒。

为什么是通过检查头节点的waitStatus来决定唤醒操作,而不是检查其它节点的waitStatus?请参看请参看这篇文章-JAVA并发编程学习笔记之CLH队列锁

CLH队列

CLH队列是什么鬼?请参看这篇文章-JAVA并发编程学习笔记之CLH队列锁

上面说到当线程尝试更改AQS状态操作获得失败时,会将Thread对象抽象成Node对象,但是这些Node对象在AQS内部是如何内部管理的呢?
AQS会将Node对象加入到内部维护的一个的队列中,从上面Node的属性中我们可以得出这个队列是一个双向的链表结构。既然是链表,那么就会遵循先进先出(FIFO)规则,所以也就解决了线程排队的问题。

既然解决了排队问题,那么这个队列又是如何运作的呢?
因为遵循先进先出规则,所以我们不难想到队列的头节点应该是表示当前正在运行的线程;当头节点运行完毕,会检查自身的状态waitStatus来决定是否唤醒下一个节点。如果需要对下一个节点进行唤醒,那么会通过CAS操作将下一个节点设置成头节点;否则从队尾开始往前找,直到找到最靠前的需要唤醒的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*
* 如果waitStatus小于0,那么将下一个节点设置成头节点
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*
* 如果waitStatus大于0,或者下一个节点为null,那么从后往前找
* 之前与同学讨论的时候不是很明白为什么要从后往前找,现在看了下doc瞬间明白了:
* 下一个节点有可能因为任务被取消了,节点有可能变为null
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

总结

这篇文章没有详细说明关于AQS的所有信息,而是尝试从这两个角度去理清AQS的脉络:

  1. CAS操作自身状态;
  2. 如何处理线程;

然后展开的说明了CAS、自身状态的作用、如何操作、以及对线程的处理、阻塞时怎么办、如何唤醒等等;

抓住了运行原理,那么理解AQS就比较简单了;

限于篇幅以及文章的侧重点,所以没有叙述concurrent包中的同步器是如何使用AQS的,后面我会把这部分内容补上~

参考资料


我不能保证写的东西完全正确
但是我能保证我写的东西都是经过思考之后所得出,而不是复制粘贴得来
如果有不对的地方,希望您能够指正,谢谢~