概述:
AbstractQueuedSynchronizer
,可以称为抽象队列同步器。
AQS有独占模式和共享模式两种:
- 独占模式:
公平锁:
非公平锁:
- 共享模式:
数据结构:
- 基本属性:
/** * 同步等待队列的头结点 */ private transient volatile Node head; /** * 同步等待队列的尾结点 */ private transient volatile Node tail; /** * 同步资源状态 */ private volatile int state;
- 内部类:
static final class Node { /** * 标记节点为共享模式 */ static final Node SHARED = new Node(); /** * 标记节点为独占模式 */ static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** * CANCELLED: 值为1,表示当前的线程被取消 * SIGNAL: 值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark; * CONDITION: 值为-2,表示当前节点在等待condition,也就是在condition队列中; * PROPAGATE: 值为-3,表示当前场景下后续的acquireShared能够得以执行; * 0: 表示当前节点在sync队列中,等待着获取锁。 * 表示当前节点的状态值 */ volatile int waitStatus; /** * 前置节点 */ volatile Node prev; /** * 后继节点 */ volatile Node next; /** * 节点同步状态的线程 */ volatile Thread thread; /** * 存储condition队列中的后继节点 */ Node nextWaiter; /** * 是否为共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 获取前驱结点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
主要方法解析:
tryAcquire/tryAcquireShared(int arg)
独占/共享模式获取锁;由子类实现,仅仅获取锁,获取锁失败时不进行阻塞排队。
tryRelease/tryReleaseShared(int arg)
独占/共享模式释放锁;由子类实现,仅仅释放锁,释放锁成功不对后继节点进行唤醒操作。
acquire/acquireShared(int arg)
独占/共享模式获取锁,如果线程被中断唤醒,会返回线程中断状态,不会抛异常中止执行操作(忽略中断)。
acquireInterruptibly/acquireSharedInterruptibly(int arg)
独占/共享模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止)。
tryAcquireNanos/tryAcquireSharedNanos(int arg, long nanosTimeout)
独占/共享时间中断模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止);如果超出等待时间则返回加锁失败。
release/releaseShared(int arg)
独占/共享模式释放锁。
addWaiter(Node mode)
将给定模式节点进行入队操作。
1 private Node addWaiter(Node mode) { 2 // 根据指定模式,新建一个当前节点的对象 3 Node node = new Node(Thread.currentThread(), mode); 4 // Try the fast path of enq; backup to full enq on failure 5 Node pred = tail; 6 if (pred != null) { 7 // 将当前节点的前置节点指向之前的尾结点 8 node.prev = pred; 9 // 将当前等待的节点设置为尾结点(原子操作) 10 if (compareAndSetTail(pred, node)) { 11 // 之前尾结点的后继节点设置为当前等待的节点 12 pred.next = node; 13 return node; 14 } 15 } 16 enq(node); 17 return node; 18 }
enq(final Node node)
将节点设置为尾结点。注意这里会进行自旋操作,确保节点设置成功。因为等待的线程需要被唤醒操作;如果操作失败,当前节点没有与其他节点没有引用指向关系,一直就不会被唤醒(除非程序代码中断线程)。
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 // 判断尾结点是否为空,尾结点初始值是为空 5 if (t == null) { // Must initialize 6 // 尾结点为空,需要初始化 7 if (compareAndSetHead(new Node())) 8 tail = head; 9 } else { 10 // 设置当前节点设置为尾结点 11 node.prev = t; 12 if (compareAndSetTail(t, node)) { 13 t.next = node; 14 return t; 15 } 16 } 17 } 18 }
acquireQueued(final Node node, int arg)
已经在队列当中的节点,准备阻塞获取锁。在阻塞前会判断前置节点是否为头结点,如果为头结点;这时会尝试获取下锁(因为这时头结点有可能会释放锁)。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 // 当前节点的前置节点 7 final Node p = node.predecessor(); 8 // 入队前会先判断下该节点的前置节点是否是头节点(此时头结点有可能会释放锁);然后尝试去抢锁 9 // 在非公平锁场景下有可能会抢锁失败,这时候会继续往下执行 阻塞线程 10 if (p == head && tryAcquire(arg)) { 11 //如果抢到锁,将头节点后移(也就是将该节点设置为头结点) 12 setHead(node); 13 p.next = null; // help GC 14 failed = false; 15 return interrupted; 16 } 17 // 如果前置节点不是头结点,或者当前节点抢锁失败;通过shouldParkAfterFailedAcquire判断是否应该阻塞 18 // 当前置节点的状态为SIGNAL=-1,才可以安全被parkAndCheckInterrupt阻塞线程 19 if (shouldParkAfterFailedAcquire(p, node) && 20 parkAndCheckInterrupt()) 21 // 该线程已被中断 22 interrupted = true; 23 } 24 } finally { 25 if (failed) 26 cancelAcquire(node); 27 } 28 }
shouldParkAfterFailedAcquire(Node pred, Node node)
检查和更新未能获取锁节点的状态,返回是否可以被安全阻塞。
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus; // 获取前置节点的状态 3 if (ws == Node.SIGNAL) 4 /* 5 * 前置节点的状态waitStatus为SIGNAL=-1,当前线程可以安全的阻塞 6 */ 7 return true; 8 if (ws > 0) { 9 /* 10 * 如果前置节点的状态waitStatus>0,即waitStatus为CANCELLED=1(无效节点),需要从同步状态队列中取消等待(移除队列) 11 */ 12 do { 13 node.prev = pred = pred.prev; 14 } while (pred.waitStatus > 0); 15 pred.next = node; 16 } else { 17 /* 18 * 将前置状态的waitStatus修改为SIGNAL=-1,然后当前节点才可以被安全的阻塞 19 */ 20 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 21 } 22 return false; 23 }
parkAndCheckInterrupt()
阻塞当前节点,返回当前线程的中断状态。
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this); //阻塞 3 return Thread.interrupted(); 4 }
cancelAcquire(Node node)
取消进行的获取锁操作,在非忽略中断模式下,线程被中断唤醒抛异常时会调用该方法。
1 // 将当前节点的状态设置为CANCELLED,无效的节点,同时移除队列 2 private void cancelAcquire(Node node) { 3 if (node == null) 4 return; 5 6 node.thread = null; 7 Node pred = node.prev; 8 while (pred.waitStatus > 0) 9 node.prev = pred = pred.prev; 10 11 Node predNext = pred.next; 12 node.waitStatus = Node.CANCELLED; 13 if (node == tail && compareAndSetTail(node, pred)) { 14 compareAndSetNext(pred, predNext, null); 15 } else { 16 int ws; 17 if (pred != head && 18 ((ws = pred.waitStatus) == Node.SIGNAL || 19 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 20 pred.thread != null) { 21 Node next = node.next; 22 if (next != null && next.waitStatus <= 0) 23 compareAndSetNext(pred, predNext, next); 24 } else { 25 unparkSuccessor(node); 26 } 27 28 node.next = node; // help GC 29 } 30 }
hasQueuedPredecessors()
判断当前线程是否应该排队。
1.第一种结果——返回true:(1.1和1.2同时存在,1.2.1和1.2.2有一个存在)
1.1 h != t为true,说明头结点和尾结点不相等,表示队列中至少有两个不同节点存在,至少有一点不为null。
1.2 ((s = h.next) == null || s.thread != Thread.currentThread())为true
1.2.1 (s = h.next) == null为true,表示头结点之后没有后续节点。
1.2.2 (s = h.next) == null为false,s.thread != Thread.currentThread()为true
表示头结点之后有后续节点,但是头节点的下一个节点不是当前线程
2.第二种结果——返回false,无需排队。(2.1和2.2有一个存在)
2.1 h != t为false,即h == t;表示h和t同时为null或者h和t是同一个节点,无后续节点。
2.2 h != t为true,((s = h.next) == null || s.thread != Thread.currentThread())为false
表示队列中至少有两个不同节点存在,同时持有锁的线程为当前线程。
1 public final boolean hasQueuedPredecessors() { 2 Node t = tail; // Read fields in reverse initialization order 3 Node h = head; 4 Node s; 5 return h != t && 6 ((s = h.next) == null || s.thread != Thread.currentThread()); 7 }