AQS

学习AQS的思路

学习AQS的目的主要是理解原理、提高技术以及应对面试

先从应用层面理解为什么需要它、如何使用它,然后再看一看Java代码的设计者是如何使用它的,了解它的应用场景

为什么学习AQS

锁和协作类有共同的特点就是闸门

事实上,不仅是ReentrantLock和Semaphore,包括CountDownLatch、ReentrantReadWriteLock都有这样类似“协作”(或者叫“同步”)的功能,其实他们底层都用了一个共同的基类,这就是AQS

为什么需要AQS

因为上面的那些协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于ReentrantLock和Semaphore而言就可以屏蔽很多的细节,只关注它们自己的“业务逻辑”就可以

Semaphore和AQS的关系

Semaphore内部有一个Sync类,Sync类继承了AQS

image-20200225201434705

AQS的比喻

比喻:群面、单面

安排就坐、叫号、先来后到等HR的工作就是AQS做的工作

面试官不会去关心两个面试者是不是号码相同冲突了,也不想去管面试者是不是需要一个地方坐着休息,这些都交给HR处理

Semaphore:一个人面试完以后,后一个人才能进来继续面试

CountDownLatch:群面,等待10人到齐

Semaphore、CountDownLatch这些同步工具要做的就只是写下自己的招聘规则,比如是出一个进一个,或者说凑齐10人,一起面试

如果没有AQS

就需要每个协作工具自己实现

  • 同步状态的原子性管理

  • 线程的阻塞与解除阻塞

  • 队列的管理

在并发场景下,自己正确且高效地实现这些内容,是相当有难度的,所以使用AQS来帮我们把这些脏活累活都搞定,我们只关心业务逻辑就够了

AQS的作用

AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类就可以很方便的被写出来

一句话总结:有了AQS,构建线程协作类就容易多了

AQS的重要性

AbstractQueueSynchronizer是Doug Lea写的,从JDK5

加入的一个基于FIFO等待队列实现的一个用于实现同步器的基础框架,用IDE看AQS的实现类,可以发现实现类有以下这些

image-20200226092323742

AQS内部原理解析

AQS最核心的就是三大部分

  • state
  • 控制线程抢锁和配合的FIFO队列
  • 期望协作工具类去实现的获取、释放等重要方法

state

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

这里的state的具体含义,会根据具体实现类的不同而不同,比如在Semaphore里,它表示剩余的许可证的数量,而在CountDownLatch里,它表示还需要倒数的数量

state是volatile修饰的,会并发的修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态,这些方法都依赖于juc.atomic包的支持

1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

在ReentrantLock中,state用来表示“锁”的占有情况,包括可重入计数

当state的值为0的时候,表示该Lock不被任何线程所占有,加一表示有一个线程占用,减一表示有一个线程释放锁

控制线程抢锁和配合的FIFO队列

这个队列用来存放等待的线程,AQS就是排队管理器,当多个线程争用同一把锁时,必须有排队机制将那些没有拿到锁的线程串起来,当锁释放的时候,锁管理器就会挑选一个合适的线程来占用刚被释放的锁

AQS会维护一个等待的线程队列,把线程都放到这个队列里,这是一个双向形式的队列

image-20200226094956287

期望协作工具类去实现的获取/释放等重要方法

这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同

获取方法

获取操作会依赖state变量,经常会阻塞,比如获取不到锁的时候

在Semphore中,获取就是acquire方法,作用是获取一个许可证,而在CountDownLatch中,获取就是await方法,作用是等待,直到倒数结束

释放方法

释放操作不会阻塞

在Semaphore中,释放就是release方法,作用是释放一个许可证

在CountDownLatch中,释放就是countDown方法,作用是倒数1个数

需要重新tryAcquire和tryRelease等方法

应用实例与源码解析

AQS的用法

第一步:写一个类,想好协作的逻辑,实现获取/释放方法

第二步:内部写一个Sync类继承

第三步:根据是否独占来重写tryAcquire/tryRelease或tryAcquireShared(int acquires)和tryReleaseShared(int releases)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法

AQS在CountDownLatch的应用

内部类Sync继承AQS

构造函数

CountDownLatch中

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer中有如下定义

1
2
3
Sync(int count) {
setState(count);
}

在setState()方法中有如下定义

1
2
3
protected final void setState(int newState) {
state = newState;
}

getCount()

CountDownLatch中

1
2
3
public long getCount() {
return sync.getCount();
}
1
2
3
int getCount() {
return getState();
}

public abstract class AbstractQueuedSynchronizer类中

1
2
3
protected final int getState() {
return state;
}

countDown()

CountDownLatch中

1
2
3
public void countDown() {
sync.releaseShared(1);
}

public abstract class AbstractQueuedSynchronizer类中

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

CountDownLatch中

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

await()

CountDownLatch中

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public abstract class AbstractQueuedSynchronizer类中

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

AQS在CountDownLatch的总结

调用CountDownLatch的await方法,便会尝试获取共享锁,不过一开始是获取不到该锁的,于是线程被阻塞

而共享锁可以获得到的条件,就是锁计数器的值为0,而锁计数器的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将锁计数器减1

count个线程调用countDown()之后,锁计数器才为0,而前面提到的等待获取共享锁的线程才能继续运行

AQS在Semaphore的应用

在Semaphore中,state表示许可证的剩余数量

1
2
3
4
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

看tryAcquire方法,判断nonfairTryAcquireShared,若大于等于0的则代表成功

这里会先检查剩余许可证数量够不够这次需要,用减法来计算,如果直接不够,则返回负数,表示失败。如果够了,就用自旋加compareAndSetState来改变state状态,直到改变成功就返回正数

如果在这期间被其他人修改了导致剩余数量不够了,那也返回负数代表获取失败

AQS在ReentrantLock的应用

分析释放锁的方法tryRelease

由于是可重入的,所以state代表重入的次数,每次释放锁先判断当前持有锁的线程是不是都是十方的,如果不是就抛出异常,如果是的话,重入次数就减一,当减到0时说明完全释放了,于是free就为true,并把state设置为0

加锁的方法

利用AQS实现一个自己的Latch门闩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 描述: 自己用AQS实现一个简单的线程协作器
*/
public class OneShotLatch {

private final Sync sync = new Sync();

public void signal() {
sync.releaseShared(0);
}
public void await() {
sync.acquireShared(0);
}

private class Sync extends AbstractQueuedSynchronizer {

@Override
protected int tryAcquireShared(int arg) {
return (getState() == 1) ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int arg) {
setState(1);

return true;
}
}


public static void main(String[] args) throws InterruptedException {
OneShotLatch oneShotLatch = new OneShotLatch();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
oneShotLatch.await();
System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
}
}).start();
}
Thread.sleep(5000);
oneShotLatch.signal();

new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
oneShotLatch.await();
System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
}
}).start();
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Thread-0尝试获取latch,获取失败那就等待
Thread-2尝试获取latch,获取失败那就等待
Thread-1尝试获取latch,获取失败那就等待
Thread-3尝试获取latch,获取失败那就等待
Thread-4尝试获取latch,获取失败那就等待
Thread-5尝试获取latch,获取失败那就等待
Thread-6尝试获取latch,获取失败那就等待
Thread-7尝试获取latch,获取失败那就等待
Thread-8尝试获取latch,获取失败那就等待
Thread-9尝试获取latch,获取失败那就等待
开闸放行Thread-0继续运行
开闸放行Thread-2继续运行
开闸放行Thread-1继续运行
开闸放行Thread-3继续运行
开闸放行Thread-4继续运行
开闸放行Thread-5继续运行
开闸放行Thread-6继续运行
开闸放行Thread-7继续运行
开闸放行Thread-8继续运行
开闸放行Thread-9继续运行
Thread-10尝试获取latch,获取失败那就等待
开闸放行Thread-10继续运行
文章目录
  1. 1. AQS
    1. 1.1. 学习AQS的思路
    2. 1.2. 为什么学习AQS
      1. 1.2.1. 为什么需要AQS
      2. 1.2.2. Semaphore和AQS的关系
      3. 1.2.3. AQS的比喻
      4. 1.2.4. 如果没有AQS
    3. 1.3. AQS的作用
    4. 1.4. AQS的重要性
    5. 1.5. AQS内部原理解析
      1. 1.5.1. state
      2. 1.5.2. 控制线程抢锁和配合的FIFO队列
      3. 1.5.3. 期望协作工具类去实现的获取/释放等重要方法
        1. 1.5.3.1. 获取方法
        2. 1.5.3.2. 释放方法
    6. 1.6. 应用实例与源码解析
      1. 1.6.1. AQS的用法
      2. 1.6.2. AQS在CountDownLatch的应用
      3. 1.6.3. AQS在CountDownLatch的总结
      4. 1.6.4. AQS在Semaphore的应用
      5. 1.6.5. AQS在ReentrantLock的应用
    7. 1.7. 利用AQS实现一个自己的Latch门闩
|