ReentrantLock

ReentrantReadWriteLock

Lock和synchronized的简单对比

AQS(AbstractQueuedSynchronizer)

总结:独占锁是一种悲观保守的加锁策略,它限制了读/读冲突,如果某个只读线程获取锁,则其他读线程都只
能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。共享锁则是一种乐观锁,它
放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。

内部实现

compareAndSet

结论:

  • 首先, cpu会把内存中将要被更改的数据与期望值做比较
  • 然后,当两个值相等时,cpu才会将内存中的对象替换为新的值。否则,不做变更操作
  • 最后,返回操作执行结果

    很显然,这是一种乐观锁的实现思路。

ReentrantLock的实现原理分析

公平锁和非公平锁的区别

ReentrantLock.lock

sync有两个实现:

  • NonfairSync(非公平锁)

  • FailSync(公平锁)

    下面是非公平锁的解析:

  • AbstractQueuedSynchronizer.acquire
> 主要作用是:
>
> 1. 尝试获取独占锁,获取成功则返回
> 2. 自旋获取锁,并且判断中断标识,如果中断标识为true,则设置线程中断
> 3. addWaiter方法把当前线程封装成Node,并添加到队列的尾部
  • NonfairSync.tryAcquire

    protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
    }
final boolean nonfairTryAcquire(int acquires) {
  final Thread current = Thread.currentThread();
  int c = getState(); //获取当前的状态,前面讲过,默认情况下是0表示无锁状态
  if (c == 0) {
    if (compareAndSetState(0, acquires)) { //通过cas来改变state状态的值,如果更新成功,表示获取锁成功, 这个操作外部方法lock()就做过一次,这里再做只是为了再尝试一次,尽量以最简单的方式获取锁。
      setExclusiveOwnerThread(current);
      return true;
   }
 }
  else if (current == getExclusiveOwnerThread()) {//如果当前线程等于获取锁的线程,表示重入,直接累加重入次数
    int nextc = c + acquires;
    if (nextc < 0) // overflow 如果这个状态值越界,抛出异常;如果没有越界,则设置后返回true
      throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
 }
  //如果状态不为0,且当前线程不是owner,则返回false。
  return false; //获取锁失败,返回false
}
  • addWaiter
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode); //创建一个独占的Node节点,mode为排他模式
 // 尝试快速入队,如果失败则降级至full enq
  Node pred = tail; // tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方
法
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) { // 防止有其他线程修改tail,使用CAS进行修改,如果失败则降级至full enq
      pred.next = node; // 如果成功之后旧的tail的next指针再指向新的tail,成为双向链表
      return node;
   }
 }
  enq(node); // 如果队列为null或者CAS设置新的tail失败
  return node;
}
  • enq
private Node enq(final Node node) {
  for (;;) {  //无效的循环,为什么采用for(;;),是因为它执行的指令少,不占用寄存器
    Node t = tail;// 此时head, tail都为null
    if (t == null) { // Must initialize// 如果tail为null则说明队列首次使用,需要进行初始化
      if (compareAndSetHead(new Node()))// 设置头节点,如果失败则存在竞争,留至下一轮循环
        tail = head; // 用CAS的方式创建一个空的Node作为头结点,因为此时队列中只一个头结点,所以tail也指向head,第一次循环执行结束
   } else {
//进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
//这部分代码和addWaiter代码一样,将当前节点添加到队列
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node; //t此时指向tail,所以可以CAS成功,将tail重新指向CNode。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
        return t;
     }
   }
 }
}

aqs队列的结构:

  • acquireQueued
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出
NullPointException
      if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺
        setHead(node); // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点
      //凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null
        p.next = null; // help GC
        failed = false; //获取锁成功
        return interrupted;
     }
     //如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志
        interrupted = true;
   }
 } finally {
    if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作
      cancelAcquire(node);
 }
}
  • shouldParkAfterFailedAcquire

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus; //前继节点的状态
      if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被unpark唤醒
           return true;
    //如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。
      if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节
    点'的前继节点”。
      
        do {
          node.prev = pred = pred.prev;
       } while (pred.waitStatus > 0);
        pred.next = node;
     } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
        /*
        * waitStatus must be 0 or PROPAGATE. Indicate that we
        * need a signal, but don't park yet. Caller will need to
        * retry to make sure it cannot acquire before parking.
        */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     }
      return false;
    }
  • node节点状态

    SIGNAL-1当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节
    CONDITION-2当前节点在等待condition,即在condition队列中
    PROPAGATE-3releaseShared需要被传播给后续节点(仅在共享模式下使用)
    CANCELLED1标示线程已取消
  • parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this);// LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞
      return Thread.interrupted();
    }

    ReentrantLock.unlock

 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
protected final boolean tryRelease(int releases) {
  int c = getState() - releases; // 这里是将锁的数量减1
  if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
      throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
      // 由于重入的关系,不是每次释放锁c都等于0,
      // 直到最后一次释放锁时,才会把当前线程释放
        free = true;
        setExclusiveOwnerThread(null);
     }
      setState(c);
      return free;
}

LockSupport

public native void unpark(Thread jthread);
public native void park(boolean isAbsolute, long time); 

总结:

公平锁和非公平锁的区别

差异点有两个:

  • FairSync.tryAcquire

    final void** lock() {
      acquire(1);
    }

    非公平锁在获取锁的时候,会先通过CAS进行抢占,而公平锁则不会

  • FairSync.tryAcquire

    protected final boolean* tryAcquire(int acquires) {
      final Thread current = Thread.currentThread*();
      int c = getState();
      if (c == 0) {
        if (!hasQueuedPredecessors() &&
          compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }

    ​ 这个方法与nonfairTryAcquire(int acquires)比较,不同的地方在于判断条件多了hasQueuedPredecessors()方法,也就是加入了[同步队列中当前节点是否有前驱节点]的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

01-08 15:53