AQS

Catalogue
  1. 1. Synchronizers是什么?
    1. 1.1. 同步器最重要的两类方法
  2. 2. AQS提供的功能
    1. 2.1. 同步状态
    2. 2.2. 阻塞(Blocking)
    3. 2.3. 同步队列
    4. 2.4. Condition队列
  3. 3. 详解
    1. 3.1. Node
      1. 3.1.1. 共享模式和独占模式是什么意思?
    2. 3.2. acquire
    3. 3.3. release
    4. 3.4. acquireShared
    5. 3.5. releaseShared

AbstractQueuedSynchronizer是一个抽象类,为Java中各种Synchronizers(同步器)的实现提供了一个模版框架。

Synchronizers是什么?

Synchronizers是实现多线程通信、同步的工具和手段。如锁、Semaphore、阻塞队列、CyclicBarrier、CountDownLatch等。同步器通常包含如下几个部分来实现多线程同步

  • 状态(state)
  • 访问控制(access control)
  • 状态转化(state change)
  • 通知策略(Notification Strategy)
  • Test and Set方法
  • Set方法

当然,不是所有同步器都需要这些东西,有些同步器也可能通过其他方式来实现同步。

访问控制利用同步器的同步状态来决定线程是否可以进入临界区(发生竞争的代码段)。线程进入后,改变状态,从而阻塞(可能)其他线程。有时候,当一个线程改变状态后,可能需要通知其他线程状态发生了改变。这几个过程如下两个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Lock{

private boolean isLocked = false; // state

public synchronized void lock()
throws InterruptedException{
// access control
while(isLocked){ //
//wait strategy - related to notification strategy
wait();
}
isLocked = true; // state change
}

public synchronized void unlock(){
isLocked = false;
notify(); //notification strategy
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class BoundedSemaphore {
private int signals = 0; // state
private int bound = 0; // state

public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}

public synchronized void take() throws InterruptedException{
// access control
while(this.signals == bound) wait();
//state change
this.signals++;
this.notify();
}

public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
//state change
this.signals--;
this.notify(); // notification strategy
}
}

同步器最重要的两类方法

同步器最重要的两类方法是:acquire和release方法。acquire方法阻塞调用这个方法的线程直到同步状态满足访问控制要求,release方法改变同步状态,这个同步状态的改变有可能使得一个或多个阻塞线程恢复。

acquire的逻辑如下

1
2
3
4
5
6
7
8
9
while (当前同步状态不允许进行acquire) {	

如果当前线程不在队列则将其加入队列;

有可能阻塞当前线程;

}

如果当前线程在队列中则出队列

release的逻辑如下

1
2
3
更新同步状态
if (同步状态有可能使某些阻塞线程acquire)
去阻塞这些线程

具体参考synchronizerDoug Lea的论文

AQS提供的功能

作为一个框架,AQS主要提供了对同步状态的原子操作阻塞与唤醒线程队列管理线程等功能,在上一节我们知道了我们需要对同步状态进行改变,这个改变需要保证原子性,通过同步状态来实现访问控制,从而实现线程的阻塞与唤醒。

同步状态

AQS中通过一个带volatile语义的int变量来维护同步状态。提供getStatesetStatecompareAndSetState方法来访问和更新状态,AQS的具体实现类需要根据这些方法来实现tryAcquiretryRelease方法。

阻塞(Blocking)

java.util.concurrent.locks提供了LockSupport类来实现线程的阻塞与唤醒。

LockSupport.park阻塞线程,LockSupport.unpark唤醒线程。

同步队列

同步队列用来维护线程队列,它是一个FIFO队列。来完成线程获取锁的排队工作。

Condition队列

等待队列,通过内部ConditionObject构成,当Condition调用await()方法后,线程将会加入等待队列中,而当Condition调用signal()方法后,线程将从等待队列转移动同步队列中进行锁竞争。Condition提供的方法类似于Object对象的监视器方法wait()、notify()和notifyAll()。关于Condition,参考

详解

同步队列是一个FIFO双向队列,在该队列中,一个Node节点表示一个线程。

Node

每个线程用一个Node表示。Node有个重要的字段waitStatus,表示节点线程的状态。其值含义如下:

  • SIGNAL,表示当前节点的后继节点处于或即将处于阻塞状态,当前节点在release或者cancel的时候需要唤醒后继节点
  • CANCELLED,表示当前节点线程由于超时或者中断而被取消
  • CONDITION,表示当前节点在条件队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从条件队列中转移到同步队列中,加入到同步状态的获取中
  • PROPAGATE,在共享模式下,后续节点传播唤醒的操作。
  • 默认值

除了waitStatus,Node还保存了线程的引用、前驱节点、后继节点和条件队列中用到的下一节点(nextWaiter)。其中,还有两个静态变量

1
2
3
4
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

共享模式和独占模式是什么意思?

共享模式指的是允许多个线程获取同一个锁而且可能获取成功,独占模式指的是一个锁如果被一个线程持有,其他线程必须等待。

同步队列结构图如下:

我们先来看看独占模式下的acquirerelease两个重要方法。

acquire

acquire的流程图如下

输入图片说明

acquire方法是AQS提供的模版方法,其中tryAcquire方法由子类来实现。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  • tryAcquire,由子类实现,去尝试获取锁
  • addWaiter,创建一个新节点加入到队尾
  • acquireQueued,如果前驱节点为头节点,循环去获取锁,否则线程等待。

我们来看一下三个线程同时去争取锁的时候发生了什么。

首先,tryAcquire中,一般是通过CAS来实现对锁的竞争,这三个线程其中一个CAS成功,另外两个失败。它们的tryAcquire返回false;

addWaiter方法将这两个失败的线程以EXCLUSIVE类型节点加入同步队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果队尾节点不为null
if (pred != null) {
// 直接CAS入队尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队尾为空或者CAS失败
enq(node);
return node;
}

首先判断是否存在尾节点,如果存在,则CAS将新生成的节点设置为尾节点。如果CAS失败或者尾节点不存在,则用enq将其加入尾节点,总之,addWaiter就是将创建新节点加入到队尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾节点不存在,需要初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾节点有了
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

这里两个线程进入,其中一个线程发现尾节点为空,则CAS生成尾节点,另外一个节点CAS失败或者在if时就判断尾节点生成了,直接将自己设成尾节点即可。

acquireQueued方法大概过程如下:

对于同时进入的线程2和线程3,对于线程2前一个节点为头节点,会再尝试获取锁一次,如果失败,则将头节点的waitStatus改成SIGNAL,下次循环的时候会再次去尝试获取锁,如果还是失败,其waitStatus为SIGNAL,则通过LockSupport将线程挂起;对于线程3,由于它的前驱节点为线程2,根据FIFO,需要等至少线程2获取到锁3才去获取锁,因此3只需将2点waitStatus改成SIGNAL,然后再检查一下自己是否有资格去获取锁,没有则挂起。

这里有个问题,这个FIFO顺序是否就实现了公平锁呢?

让我们看看线程1释放锁后这个锁是怎么传递到。

release

线程1在释放锁的时候的操作如下

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease同样由子类来实现,在tryRelease中,1会释放掉锁。会将state从1改成0,此时,如果线程4来了,那么这个锁就有可能被4给拿去而不是依次从2到3,因此上节中的问题,FIFO同步队列并不能保证锁的公平获取,它只能保证入了队列的线程获取锁的一个先后顺序,这也是为什么ReentrantLock中需要实现公平锁的逻辑。

当1释放锁后,判断队列中是否有节点,如果有节点,则去通知节点来拿锁。unparkSuccessor主要做三件事情:

  1. 将队头的waitStatus设置为0.
  2. 通过从队列尾部向队列头部移动,找到最后一个waitStatus<=0的那个节点,也就是离队头最近的没有被cancelled的那个节点,队头这个时候指向这个节点。
  3. 将这个节点唤醒,这个时候线程1已经出队列了。

acquireShared

共享模式与独占模式acquire逻辑大致相同,区别在于入队列后获取锁成功后的操作,如果获取锁成功,共享模式会判断后继节点是否是共享模式,如果是,则立即对其进行唤醒操作。

releaseShared

共享模式下的release逻辑也是要保证释放锁的时候往后传播,使得所有共享模式的节点都可以被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}