AQS类图明细
AQS的使用,也就是子类继承AQS之后,根据需要重写AQS中定义的protected方法,从上述类图可以看出:
tryAcquire: 获取独占锁
tryRelease: 释放独占锁
isHeldExclusively: 判断当前线程是否持有独占锁
tryAcquireShared: 获取共享锁
tryReleaseShared: 释放共享锁
操作和获取属性state的getState()以及CAS修改state的方法
操作线程独占方法get&set方法
如果锁需要Condition,则可以直接使用ConditionObject来实现对应的分类锁逻辑实现线程通信(可选)
// Node 部分代码,在AQS内部中定义
static final class Node{
// 锁的mode
static final Node SHARED = new Node(); // 表示持有共享锁
static final Node EXCLUSIVE = null; // 持有独占锁
// waiter status,即节点所处的阻塞状态列表如下
static final int CANCELLED = 1; // 被取消,意味着放弃竞争锁资源,移出阻塞队列
static final int SIGNAL = -1; // 持有锁状态
static final int CONDITION = -2; // 仅用于条件变量,线程处于条件等待队列中,也就是condition.await让线程挂起
static final int PROPAGATE = -3; // 仅用于共享锁,表示已经释放锁并已唤醒下一个阻塞节点的线程状态
volatile int waitStatus; // 当前节点的状态,初始化为0,不属于上述任何一种状态,属于可竞争获取锁的状态
// 实现双端链表
volatile Node prev;
volatile Node next;
// 当前节点的线程
volatile Thread thread;
// 标志当前节点是共享锁还是独占锁,用节点指针引用指向对应的mode
Node nextWaiter;
// 独占锁:nextWaiter = null & thread = Thread.currentThread;
// 共享锁:nextWaiter = SHARED & thread = Thread.currentThread;
// 不具备上述条件属于正常对象,不持有锁状态
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
ConditionObject: 自定义接口Condition的实现,业务线程可以创建对应的锁条件来完成线程之间的通信.即线程唤醒与等待,其核心要素如下:
自定义条件队列,即使拥有属性firstWaiter与lastWaiter
实现接口await/signal/signalAll等方法
ConditionObject内部实现的核心方法
// AQS.java
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 自定义实现双向FIFO的队列, 定义队列的head & tail
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state; // 当前AQS同步状态信息,具体实现类含义可能不一样
// CAS操作上述的state,head&tail, waitstuts
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 除了获取/释放锁之外,内部定义核心方法
private Node enq(final Node node) {} // 将Node插入队列
private Node addWaiter(Node mode) {} // 创建想要持有mode的锁方式的Node并将其添加到阻塞队列中
// 内部实现的独占锁方式
final boolean acquireQueued(final Node node, int arg) {} // 获取独占锁,失败则挂起
private void unparkSuccessor(Node node) {} // 唤醒当前Node的下一个节点的线程
// 内部实现共享锁
private void doReleaseShared() {} // 唤醒下一个节点的线程并通知其他节点的线程已经释放锁
private void doAcquireShared(int arg) {} // 如果获取锁失败则挂起线程,成功则持有锁
}
模板方法: acquire/release
具体实现方法: tryAcquire/tryRelease
模板方法: acquireShared/releaseShared
具体实现方法: tryAcquireShared/tryReleaseShared
acquire/acquireShared定义资源争抢锁逻辑,没有拿到则加入队列等待池中等待
tryAcquire/tryAcquireShared实际执行占用资源的操作,交由具体实现的AQS完成
release/releaseShared定义释放资源的逻辑,释放后唤醒等待池下一个的节点线程
tryRelease/tryReleaseShared实际释放资源的操作,由具体实现的AQS完成
// DefineAQS.java
public abstract class DefineAQS {
final static class AQSNode{
final static int SHARED = 9999;
final static int EXCLUSIVE = -9999;
private int mode;
private volatile Thread thread;
public AQSNode(int mode){
this.thread = Thread.currentThread();
this.mode = mode;
}
public Thread getThread() {
return thread;
}
public int getMode() {
return mode;
}
}
private AtomicInteger state = null;
private AtomicReference<Thread> exclusiveOwnerThread = new AtomicReference<>();
private LinkedBlockingQueue<AQSNode> waiters = new LinkedBlockingQueue<>();
public AtomicInteger getState() {
return state;
}
public void setState(int state) {
this.state = new AtomicInteger(state);
}
public void compareAndSetState(int expect, int update){
this.state.compareAndSet(expect, update);
}
public void acquire(int arg){
// 加入阻塞队列中
AQSNode node = new AQSNode(AQSNode.EXCLUSIVE);
waiters.offer(node);
while (!tryAcquire(arg)){
LockSupport.park(node.getThread());
}
// 当前线程已经获取到锁,移出阻塞队列,通知后续节点
waiters.remove(node);
}
public void release(int arg){
if (tryRelease(arg)) {
while (true){
AQSNode node = waiters.peek();
if (node.getMode() == AQSNode.EXCLUSIVE){
LockSupport.unpark(node.getThread());
break;
}
}
}
}
public void acquireShared(int arg){
AQSNode node = new AQSNode(AQSNode.SHARED);
waiters.offer(node);
while (tryAcquireShared(arg) < 0){
LockSupport.park(node.getThread());
}
waiters.remove(node);
}
public void releaseShared(int arg){
if (tryReleaseShared(arg) > 0){
while (true){
AQSNode node = waiters.peek();
if (node.getMode() == AQSNode.SHARED){
LockSupport.unpark(node.getThread());
break;
}
}
}
}
// abstract method :
// for tryReleaseShared & tryAcquireShared
// fortryAcquire & tryRelease
}
老铁们关注走一走,不迷路
往期锁原理技术文章
本文分享自微信公众号 - 疾风先生(Gale2Writing)。
如有侵权,请联系 [email protected] 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。