队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架。
它主要的设计思想是使用一个名为state
的int类型成员变量来表示同步状态,AQS里面大部分方法都是再对这个边进行操作;再内置一个FIFO队列来完成资源获取线程的排队工作。
AQS的使用方式主要使用继承方式,并且推荐使用静态内部类,这样做的好处是隔离了使用者和实现者所关注的领域。
AQS的主要接口
访问或修改同步状态
可重写的方法
这些可以覆盖的方法其实都是有默认实现的,默认实现直接抛出一个异常,如:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
没有把这些方法直接定义成抽象方法的好处是,根据需要我们只覆写其中一部分方法就行了。假如我们要实现一个排它锁,我们只需要去覆写tryAcquire、treRelease和isHeldExclusively就行了。
AQS的模板方法
AQS的使用模板方法的设计模式,大致分为三类接口:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。
AQS的队列同步器
同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取 同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其 加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再 次尝试获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和 后继节点,他是一个静态内部类。
static final class Node {
/**
* 标记表示节点正在共享模式下等待
*/
static final Node SHARED = new Node();
/**
* 标记表示节点正在独占模式下等待
*/
static final Node EXCLUSIVE = null;
/**
* 取消状态
*/
static final int CANCELLED = 1;
/**
* 后继节点处于等待状态
*/
static final int SIGNAL = -1;
/**
* 表示线线程在等待队列中
*/
static final int CONDITION = -2;
/**
* 共享,表示状态要往后面的节点传播
*/
static final int PROPAGATE = -3;
/**
* CANCELLED 值为1: 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后将不会发生变化
* SIGNAL 值为-1: 后继节点的线程处于等待状态,当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
* CONDITION 值为-2: 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到对同步状态的获取中
* PROPAGATE 值为-3: 表示下一次共享同步状态将会无条件地被传播下去
* INITIAL 值为0: 节点的初始状态
*/
volatile int waitStatus;
/**
* 前驱结点,当节点加入同步队列时被设置(尾部添加)
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 获取同步状态的线程
*/
volatile Thread thread;
/**
* 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHAEED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
节点是构成同步队列(等待队列)的基础,同步器拥有首节点(head) 和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的 基本结构如图:
- 为了保证线程安全所以设置尾部节点需要使用compareAndSetTail(Node expect,Node update)方法。
- 由于只有获取同步状态的线程才能设置头结点,所以没有竞争可以直接使用setHead(Node update)方法。
独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代码主要完成了***同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作***,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node) 方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该 节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的 唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
下面分析一下相关工作。首先是节点的构造以及加入同步队列:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先假设没有竞争,快速的尝试一下将当前节点设置成尾节点,如果失败再调用enq方法,自旋
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线 程安全添加。
在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循 环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线 程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变 得“串行化”了。
节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自 省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这 个自旋过程中(并会阻塞节点的线程),代码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 以自旋的方式来获取同步状态
for (;;) {
final Node p = node.predecessor();
// 获取同步状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查并设置同步状态,然后挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,原因有两个,如下:
- 头节点是成功获取到同状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
- 维护同步队列的FIFO原则。
节点自旋获取同步状态结构图:
独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如图:
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能 够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释 放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。该方法代码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒头结点的后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
独占锁示例
package com.xiaolyuh;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 互斥锁
*
* @author yuhao.wang3
* @since 2019/7/10 17:21
*/
public class MutexLock implements Lock {
// 使用静态内部类的方式来自定义同步器,隔离使用者和实现者
static class Sync extends AbstractQueuedSynchronizer {
// 我们定义状态标志位是1时表示获取到了锁,为0时表示没有获取到锁
@Override
protected boolean tryAcquire(int arg) {
// 获取锁有竞争所以需要使用CAS原子操作
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
// 只有获取到锁的线程才会解锁,所以这里没有竞争,直接使用setState方法在来改变同步状态
setState(0);
setExclusiveOwnerThread(null);
return true;
}
@Override
protected boolean isHeldExclusively() {
// 如果货物到锁,当前线程独占
return getState() == 1;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryRelease(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(0);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public static void main(String[] args) {
MutexLock lock = new MutexLock();
final User user = new User();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
lock.lock();
try {
user.setAge(user.getAge() + 1);
System.out.println(user.getAge());
} finally {
lock.unlock();
}
}).start();
}
}
}
独占式超时获取同步态
独占式超时获取同步态起始就是在独占式获取同步状态的自旋方法体加上了时间判断,每次循环都会判断是否超少,代码如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 在非常短的超时等待无法做到十分精确,所以同步器会直接进入无条件的快速自旋
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 检查是否超时
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在非常短的超时等待无法做到十分精确,所以同步器会直接进入无条件的快速自旋。
共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。
左半部分,共享式访问资源时,其他共享式的访问均被允许,而独占式访问被 阻塞,右半部分是独占式访问资源时,同一时刻其他访问均被阻塞。
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态,该方法代码如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。
与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以 释放同步状态,代码如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线 程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg) 方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。
共享锁示例
package com.xiaolyuh;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 共享锁,可以允许多个线程同时获得锁
*
* @author yuhao.wang3
* @since 2019/7/12 10:00
*/
public class SharedLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
int count = getState();
if (count > 0 && compareAndSetState(count, count - arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return count;
}
return -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
// 通过循环和CAS来保证安全的释放锁
int count = getState();
if (compareAndSetState(count, count + arg)) {
setExclusiveOwnerThread(null);
return true;
}
}
}
@Override
protected boolean isHeldExclusively() {
return getState() <= 0;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private Sync sync;
/**
* @param count 能同时获取到锁的线程数
*/
public SharedLock(int count) {
this.sync = new Sync(count);
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
try {
return sync.tryAcquireSharedNanos(1, 100L);
} catch (InterruptedException e) {
return false;
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public static void main(String[] args) {
final Lock lock = new SharedLock(5);
// 启动10个线程
for (int i = 0; i < 100; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (Exception e) {
} finally {
lock.unlock();
}
}).start();
}
// 每隔1秒换行
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println();
}
}
}
源码:https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
参考
《java并发编程的艺术》