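AQS in JUC:
Introduction
AQS (AbstractQueuedSynchronizer) is an abstract, queue-based synchronizer. It defines a framework for building synchronizers that control multithreaded access to a shared resource. Many classes in the juc package are built on it, such as ReentrantLock and CountDownLatch.
AQS maintains two kinds of linked lists. The first is the Sync Node list, the horizontal list (the sync queue). The second is a Condition's Wait Node list, which is a vertical list relative to the sync nodes. When a node on the vertical list is woken by signal(), it moves into the Sync Node list and queues there.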
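The volatile keyword
- Atomicity: not guaranteed. For example, i++ is a read-modify-write and is not atomic.
- Visibility: a write to a volatile field is visible to later reads of that field by other threads.
- Ordering: reads and writes are not reordered across volatile accesses (happens-before).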
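The sketch below illustrates the atomicity point. It increments a `volatile int` and an `AtomicInteger` from several threads. `volatileCount++` is a separate read, add and write, so concurrent updates can be lost. `incrementAndGet` uses CAS and cannot lose updates. The class name `VolatileCounterDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: volatile gives visibility, but not an atomic i++.
class VolatileCounterDemo {
    static volatile int volatileCount = 0;                        // count++ is read, add, write
    static final AtomicInteger atomicCount = new AtomicInteger(); // CAS-based increment

    // Starts `threads` workers; each increments both counters `perThread` times.
    static void run(int threads, int perThread) {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    volatileCount++;               // two threads can read the same value: lost update
                    atomicCount.incrementAndGet(); // never loses an update
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop waiting
                return;
            }
        }
    }
}
```

After `VolatileCounterDemo.run(4, 100_000)`, `atomicCount` is always 400000. `volatileCount` is typically lower, and the gap varies from run to run.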
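Hook methods
Clients (subclasses) implement the following five protected methods to define the synchronizer's behavior. Many Lock implementations in the juc package (and, in older JDKs, FutureTask) are built by extending them.
- tryAcquire(int): exclusive mode. Attempts to acquire the resource; returns true on success and false on failure.
- tryRelease(int): exclusive mode. Attempts to release the resource; returns true if the synchronizer is now fully released (so waiting threads may try to acquire), otherwise false.
- tryAcquireShared(int): shared mode. Attempts to acquire the resource. A negative value means failure; 0 means success with no resources left; a positive value means success with resources remaining.
- tryReleaseShared(int): shared mode. Attempts to release the resource; returns true if the release may allow a waiting node to acquire, otherwise false.
- isHeldExclusively(): whether the current thread holds the resource exclusively. Only needed if Conditions are used.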
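Here is a minimal sketch of a non-reentrant mutex that shows how the hooks fit together. It overrides tryAcquire, tryRelease and isHeldExclusively, following the well-known example pattern in the AQS javadoc. State 0 means unlocked and 1 means locked. The class name `Mutex` and its method names are illustrative; only `AbstractQueuedSynchronizer` and the methods called on it are real JDK API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex built on the exclusive-mode hooks.
class Mutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Exclusive acquire: CAS state from 0 (free) to 1 (held).
        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Exclusive release: only the owner may release; state goes back to 0.
        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true;  // fully released, so AQS may wake a successor
        }

        // Required only when Conditions are used.
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }                  // queues the thread if tryAcquire fails
    boolean tryLock() { return sync.tryAcquire(1); }  // one attempt, never queues
    void unlock() { sync.release(1); }                // tryRelease, then unparkSuccessor
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
}
```

Because tryAcquire never checks the owner, a second lock() from the owning thread would block forever. That is the difference from ReentrantLock, which adds to state instead.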
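Exclusive and shared lock examples
- ReentrantLock is an exclusive lock. state is initialized to 0, meaning unlocked. When thread A calls lock(), it calls tryAcquire to take the lock exclusively and CAS-sets state to 1. Any other thread's tryAcquire then fails until thread A calls unlock() and state returns to 0. Before releasing, thread A can acquire the lock again itself, and state is incremented each time. This is reentrancy. The lock must be released as many times as it was acquired for state to return to 0.
- CountDownLatch is a shared lock. The task is split across N child threads, and state is initialized to N. state must equal the number of expected countDown() calls, here one per thread. The N child threads run in parallel, and each call to countDown() decrements state by 1 with a CAS. When all child threads have finished (state = 0), the waiting main thread is woken with unpark() and continues.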
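Both examples can be observed through public JDK methods. `ReentrantLock.getHoldCount()` reports the current thread's hold count, which is the AQS state while the lock is held. `CountDownLatch.getCount()` reports the remaining count. The class and method names `StateExamples`, `reentrantHoldCounts` and `latchRemainingAfterWorkers` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper class illustrating the two state examples above.
class StateExamples {
    // Reentrancy: each lock() by the owner adds 1 to state; each unlock() subtracts 1.
    static int[] reentrantHoldCounts() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                          // same thread re-acquires: state 1 -> 2
        int afterTwoLocks = lock.getHoldCount();
        lock.unlock();                        // state 2 -> 1, still held
        int afterOneUnlock = lock.getHoldCount();
        lock.unlock();                        // state 1 -> 0, released
        return new int[] {afterTwoLocks, afterOneUnlock, lock.getHoldCount()};
    }

    // CountDownLatch: state starts at n; each worker's countDown() decrements it.
    static long latchRemainingAfterWorkers(int n) {
        CountDownLatch latch = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            new Thread(latch::countDown).start();
        }
        try {
            latch.await();                    // main thread parks until state == 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag; count may be non-zero
        }
        return latch.getCount();
    }
}
```

`reentrantHoldCounts()` yields 2, 1 and 0, which matches the rule that each acquisition needs a matching release.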
Node design
static final class Node {
// Shared mode: multiple threads may proceed at once, e.g. Semaphore/CountDownLatch
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
// Exclusive mode: only one thread may proceed, e.g. ReentrantLock
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
// waitStatus values
// The node has been cancelled because of a timeout or an interrupt (when interrupts are honored). A node in this state never changes again
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
// This node must wake (unpark) its successor when it releases. Before parking, a node sets its predecessor's status to SIGNAL, so wake-ups are handed along the chain like a relay baton
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
// The node is waiting on a Condition. After another thread calls signal() on that Condition, the node is transferred from the condition queue to the sync queue, where it waits to acquire the lock
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
// In shared mode, a release may wake not only the successor but also nodes after it (propagation)
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
// Status; 0 for a new node
volatile int waitStatus;
// Predecessor in the list
volatile Node prev;
// Successor in the list
volatile Node next;
// The thread that enqueued this node; set on construction and nulled out after use
volatile Thread thread;
// Either the SHARED marker (shared vs exclusive mode) or the link to the next node waiting on a condition
Node nextWaiter;
}
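Points to note about the Node design:
- Node implements a variant of the CLH lock (Craig, Landin, and Hagersten). A CLH lock is a spin lock that is starvation-free and grants the lock first-come, first-served. AQS's variant blocks (parks) waiting threads instead of letting them spin indefinitely.
- The queue is a FIFO linked list, and its manipulation often relies on double-checking: re-read the shared state and retry the CAS if it has changed.
- A negative waitStatus means the node is waiting (SIGNAL, CONDITION, PROPAGATE). A positive value means the node has been cancelled. 0 is the initial state.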
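For comparison with the AQS variant, here is a sketch of a textbook-style CLH spin lock. It is not AQS's own queue. Each thread atomically swaps itself onto the tail, then spins only on its predecessor's `locked` flag, which yields FIFO order. The class name `ClhLock` is hypothetical. `Thread.yield()` is used here to keep spinning cheap when threads outnumber cores.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical textbook CLH spin lock (not the AQS implementation).
class ClhLock {
    private static final class QNode {
        volatile boolean locked; // true while the owner holds or is waiting for the lock
    }

    // The tail starts as a dummy node that is already released.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node); // atomically join the queue (FIFO order)
        myPred.set(pred);
        while (pred.locked) {              // spin on the predecessor's flag only
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;               // hands the lock to whoever spins on this node
        myNode.set(myPred.get());          // recycle the predecessor's node for next time
    }
}
```

AQS keeps the same idea: status information about a thread lives in its predecessor node. Instead of busy-waiting, AQS parks the thread and relies on the predecessor's SIGNAL status to unpark it.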
Enqueue analysis (double-check)
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// Point the new node's prev at the current tail
node.prev = pred;
if (compareAndSetTail(pred, node)) { // Position 1
// Link the old tail's next to the new node
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize // Position 3
if (compareAndSetHead(new Node())) // Position 4
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // Position 2
t.next = node;
return t;
}
}
}
}
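The four positions marked in the code above:
- By default a node is appended after the tail. Once its prev is set, a CAS tries to make the new node the tail. Under concurrency the tail may already have changed, so the CAS at position 1 can fail. The code then falls back to enq() and the check at position 2.
- At position 2, the current tail is re-read on every iteration and a CAS tries to make the node the new tail. If contention makes it fail, the loop repeats until the CAS succeeds.
- The special case for positions 1 and 2 is an empty tail, meaning the current node may be the first one ever enqueued. Position 4 then initializes the queue: it CAS-sets an empty (dummy) head node and points tail at it, and the next iteration appends the node after this dummy. The queue is initialized lazily, and head and tail are never reset to null afterwards, so this happens only once.
- Again because of concurrency, another thread may create the head node between the check at position 3 and the CAS at position 4. The CAS then fails, the loop re-reads a non-null tail, and processing continues at position 2, which appends the node after the tail.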
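The sketch below models the enqueue steps with `AtomicReference` instead of AQS's internal CAS helpers. It lazily creates a dummy head, sets `prev` before publishing, CASes the tail (retrying on contention), and links `next` only after the CAS succeeds. The class `SketchQueue`, its `Node` type and `sizeFromTail` are hypothetical simplifications, not JDK code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified model of AQS addWaiter/enq (not the JDK implementation).
class SketchQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final String label;
        Node(String label) { this.label = label; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, like enq().
    Node enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {                                       // position 3: not initialized
                if (head.compareAndSet(null, new Node("dummy"))) { // position 4: lazy dummy head
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                                     // prev is set before publishing
                if (tail.compareAndSet(t, node)) {                 // positions 1/2: CAS the tail
                    t.next = node;                                 // next is linked after the CAS
                    return t;
                }
            }
        }
    }

    // Counts real nodes by walking prev links back from the tail.
    int sizeFromTail() {
        int n = 0;
        for (Node p = tail.get(); p != null && p != head.get(); p = p.prev) {
            n++;
        }
        return n;
    }
}
```

`sizeFromTail` walks `prev` rather than `next`. Only `prev` is guaranteed to be set once a node is reachable from the tail. The same reasoning explains why AQS scans backward from the tail when waking a successor.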
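Dequeue
Dequeueing involves two actions:
1. Action 1: a queued node acquires the lock and becomes the new head (acquireQueued / setHead)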
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // Position 1
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) { // Position 2
head = node;
node.thread = null;
node.prev = null;
}
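The two positions marked in the code above:
- At position 1, each iteration fetches the node's predecessor (prev) and checks whether it is head. The head node is somewhat abstract. Think of it as a dummy or placeholder node that simply represents the node that last left the queue (in exclusive mode, the current lock holder). Because the queue is FIFO, once the predecessor has left, it is this node's turn, and it then calls tryAcquire.
- At position 2, it is this node's turn to leave the queue. The node becomes the new head, and to help GC its links to the list are cut by hand: thread and prev are set to null, and the caller clears the old head's next.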
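This enqueue/dequeue cycle can be watched from outside through ReentrantLock's monitoring methods `hasQueuedThread(Thread)` and `getQueueLength()`. A waiter that fails tryAcquire shows up in the queue. After it acquires, it becomes the head with its thread field cleared, so it is no longer counted. The class name `QueueObservation` is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: observe a node entering and leaving the sync queue.
class QueueObservation {
    // Returns {queue length while the waiter is blocked, queue length after it finishes}.
    static int[] observe() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                               // main thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                           // tryAcquire fails -> enqueued -> parks
            lock.unlock();                         // after becoming head, release again
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter)) {    // wait until its node is in the sync queue
            Thread.yield();
        }
        int whileBlocked = lock.getQueueLength();  // an estimate, exact here since nothing races
        lock.unlock();                             // release -> the waiter is unparked
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {whileBlocked, lock.getQueueLength()};
    }
}
```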
2. Action 2: waking the successor (unparkSuccessor)
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 位置1
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
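This method is called when a lock is released. At position 1, if next is null or cancelled, the method scans backward from tail, skipping cancelled nodes. It then calls unpark to wake the thread of the first non-cancelled successor it found. The scan goes backward because prev is set before the tail CAS while next is linked only afterwards, so next may briefly be null even though a successor exists.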
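The search at position 1 can be isolated in a small single-threaded model. The loop keeps overwriting `s` while walking from the tail toward `node`, so the last match is the non-cancelled node closest to `node`. The class `SuccessorSearch`, its `Node` type and `chain` are hypothetical; the loop body mirrors the JDK 8 code above.

```java
// Hypothetical single-threaded model of unparkSuccessor's successor search.
class SuccessorSearch {
    static final int CANCELLED = 1; // same meaning as Node.CANCELLED

    static final class Node {
        final String name;
        int waitStatus;             // > 0 means cancelled
        Node prev, next;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    // Links the given nodes into a doubly linked list, in order.
    static void chain(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
            nodes[i - 1].next = nodes[i];
        }
    }

    // Returns the node whose thread would be unparked, or null if there is none.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;          // keep scanning: a later match is closer to node
                }
            }
        }
        return s;
    }
}
```

With the list head(-1) -> a(cancelled) -> b(cancelled) -> c(-1) -> d(0), the successor of head is c.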
Source code analysis
**Exclusive lock**
- acquire
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // Position 1
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // Position 2
parkAndCheckInterrupt()) // Position 3
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // Position 4
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // Predecessor is already SIGNAL, so it is safe to park
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // Predecessor was cancelled: skip over cancelled nodes
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // CAS the predecessor's waitStatus to SIGNAL
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
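- tryAcquire is the custom extension point. It usually controls the state value, much like the P/V semaphore primitives:
P: request a free resource (decrement the semaphore by 1). On success, proceed; on failure, the process blocks.
V: release an occupied resource (increment the semaphore by 1). If any processes are blocked, pick one and wake it.
- addWaiter: enqueues the Node onto the list, as explained in the enqueue section above.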
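- acquireQueued analysis:
  - Position 1: check whether this node's predecessor is head, i.e. whether it has already left the queue. If so, it is this node's turn: it calls tryAcquire and, on success, becomes head and returns the interrupted flag.
  - Position 2: shouldParkAfterFailedAcquire sets the predecessor's waitStatus to SIGNAL so that the predecessor wakes this node when it releases. Cancelled predecessors are skipped along the way, and the method returns false so the loop retries once more before parking.
  - Position 3: once the predecessor is SIGNAL, the current thread parks (parkAndCheckInterrupt) and blocks until it is woken. There are two wake-up triggers: an unpark from the predecessor, or Thread.interrupt(). park can also return spuriously, which is why the loop re-checks. An interrupt is only recorded (interrupted = true) and the loop continues; acquire then re-asserts it with selfInterrupt().
  - Position 4: if the loop exits abnormally (for example, tryAcquire throws), cancelAcquire cancels the node (waitStatus = CANCELLED) and unlinks it.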
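The interrupt handling at position 3 can be observed through `ReentrantLock.lock()`, which is built on acquire. A thread interrupted while queued does not abort. It keeps waiting, and after it finally acquires the lock its interrupt flag is set again, as selfInterrupt() does. The class name `InterruptDuringAcquire` is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: lock() ignores interrupts while queued but restores the flag.
class InterruptDuringAcquire {
    static boolean interruptedFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagSeen = new boolean[1];
        lock.lock();                                   // main thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                               // queued; an interrupt does not throw here
            try {
                flagSeen[0] = Thread.currentThread().isInterrupted(); // re-asserted after acquiring
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter)) {        // wait until the waiter is enqueued
            Thread.yield();
        }
        waiter.interrupt();                            // recorded only; the waiter keeps waiting
        lock.unlock();                                 // now the waiter can acquire
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagSeen[0];
    }
}
```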
- acquireInterruptibly (interruptible acquire)
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // Position 1
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // Position 2
}
} finally {
if (failed)
cancelAcquire(node);
}
}
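acquireInterruptibly differs from acquire in two places. At position 1, it checks up front whether the thread is already interrupted and, if so, throws InterruptedException for the caller to handle. At position 2, an interrupt received while parked throws InterruptedException immediately instead of merely being recorded, and the finally block then cancels the queued node.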
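The contrasting behavior shows up through `ReentrantLock.lockInterruptibly()`, which is built on acquireInterruptibly. A queued thread that gets interrupted abandons the wait and receives InterruptedException. The class name `InterruptibleAcquire` is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: lockInterruptibly() gives up the wait when interrupted.
class InterruptibleAcquire {
    static boolean waiterGotInterruptedException() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] threw = new boolean[1];
        lock.lock();                                   // held by main for the whole demo
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();              // parks in the acquire loop
                lock.unlock();                         // not reached in this demo
            } catch (InterruptedException e) {
                threw[0] = true;                       // thrown from the parking loop
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter)) {        // wait until the waiter is enqueued
            Thread.yield();
        }
        waiter.interrupt();                            // its node is cancelled, exception thrown
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.unlock();
        return threw[0];
    }
}
```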
- tryAcquireNanos (timeout)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// Compute the deadline
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// Recompute the remaining time
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // Position 1
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // Position 2
LockSupport.parkNanos(this, nanosTimeout); // Position 3
if (Thread.interrupted()) // Position 4
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
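tryAcquireNanos supports a timeout: if the lock is not acquired within the given time, it returns false.
- Position 1: if the time is used up, return false immediately.
- Position 2: if the remaining time is greater than the spin threshold (spinForTimeoutThreshold, 1000 ns in JDK 8), position 3 calls parkNanos to block for the remaining time. Below the threshold the thread just spins, because parking for such a short time is not worth the cost.
- Position 4: if the thread has been interrupted, throw InterruptedException.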
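In JDK 8, `ReentrantLock.tryLock(long, TimeUnit)` delegates to tryAcquireNanos. The sketch below holds the lock in the main thread while another thread tries to acquire it with a 50 ms timeout. The contender should give up and return false no earlier than the deadline. The class name `TimedAcquire` and the result layout are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a timed acquire gives up once its deadline passes.
class TimedAcquire {
    // Returns {1 if acquired, 0 if timed out, -1 if interrupted; elapsed milliseconds}.
    static long[] tryWhileHeldElsewhere(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                                   // main thread holds it throughout
        long[] result = new long[2];
        Thread contender = new Thread(() -> {
            long start = System.nanoTime();
            try {
                boolean acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                if (acquired) {
                    lock.unlock();
                }
                result[0] = acquired ? 1 : 0;          // position 1: false once time runs out
            } catch (InterruptedException e) {
                result[0] = -1;                        // position 4: interrupted while waiting
            }
            result[1] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        });
        contender.start();
        try {
            contender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.unlock();
        return result;
    }
}
```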
**Shared lock**
- acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // Position 1
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // Position 2
}
}
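- Position 1: setHeadAndPropagate makes this node the new head and checks whether its successor may be waiting in shared mode. If so, it propagates the wake-up when either propagate > 0 (resources remain) or a negative waitStatus (PROPAGATE or SIGNAL) was recorded on the old or new head.
- Position 2: doReleaseShared wakes the next node. While a node leaves the queue as the new head, it also wakes its successor, so the wake-up can cascade through consecutive shared nodes.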
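The shared-mode return convention and the P/V analogy from the exclusive-lock section come together in a counting semaphore. In the sketch below, state holds the number of free permits. tryAcquireShared returns the remaining count: negative means fail and queue, 0 means success with none left, positive means success with more left. tryReleaseShared always returns true so that releaseShared wakes queued nodes. The class `PermitPool` and its method names are hypothetical, modeled loosely on the non-fair path of the JDK's Semaphore.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical counting semaphore on AQS shared mode (P = acquire, V = release).
class PermitPool {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        // P: negative -> fail and queue; 0 -> success, none left; positive -> success, more left.
        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        // V: add permits back; true lets releaseShared wake (and propagate to) waiters.
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;
                }
            }
        }

        int permits() { return getState(); }
    }

    private final Sync sync;

    PermitPool(int permits) { sync = new Sync(permits); }

    void acquire() { sync.acquireShared(1); }                       // may block in doAcquireShared
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }  // single attempt, no queuing
    void release() { sync.releaseShared(1); }                       // may wake queued nodes
    int availablePermits() { return sync.permits(); }
}
```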
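ConditionObject
Use case:
producer/consumer
Overall, ConditionObject's implementation maintains two queues:
- Condition queue: the wait queue. Its nodes have waitStatus = Node.CONDITION and are linked through nextWaiter, from firstWaiter to lastWaiter.
- Sync queue: the queue of nodes that may compete for the lock, i.e. the ordinary AQS queue. When a node is transferred here, its waitStatus is reset from CONDITION to 0.
- await(): creates a Node for the current thread, appends it to the Condition queue, and fully releases the lock. It then loops (parking) and checks whether the node has reached the Sync queue yet. Once the node has been moved there, it competes for the lock again and resumes when it wins.
- signal(): takes the first waiter (firstWaiter), sets its nextWaiter to null to unlink it, and transfers it from the Condition queue to the Sync queue.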
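A bounded buffer is the classic producer/consumer use of two Conditions, each an AQS ConditionObject. Each `await()` joins that condition's queue and releases the lock. Each `signal()` moves one waiter into the sync queue, where it competes for the lock again. The class name `BoundedBuffer` is hypothetical. The waits sit in `while` loops because the state must be re-checked after waking.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer using two ConditionObjects from one ReentrantLock.
class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) { this.capacity = capacity; }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();     // join notFull's condition queue, release the lock, park
            }
            items.addLast(item);
            notEmpty.signal();       // move one consumer from notEmpty's queue to the sync queue
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();    // re-checked after waking, since wake-ups can be spurious
            }
            T item = items.removeFirst();
            notFull.signal();        // a slot is free: wake one producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```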