上篇文章大致说了下 ReentrantLock 类的使用,对 ReentrantLock 类有了初步的认识之后让我们一起来看下基于 ReentrantLock 的几种细粒度锁实现。

这里我们还是接着用之前 synchronize 关键字加锁实现线程安全 文章中举的账户扣款的例子好了,不过这里为了更贴近系统的功能实现,我们换一下思路,功能实现不变,只是把钱转换成我们系统中的使用的礼券好了,用户每次在系统中购买某项功能需要支付一定的礼券。那既然要实现细粒度锁,那就意味着不同用户账户扣除礼券的操作互不影响,只需要保证相同账户下的礼券扣除操作是线程安全的即可,而且仅仅是扣除礼券的那一小部分的代码块,对于账户的校验我们可以接受并发执行。

分段锁

我们先来想象这样一个场景好了,可能我们的系统只有那么一小部分的忠实用户,他们更愿意在我们的系统中通过礼券购买,从而使用一些特殊的功能,更多的用户只是在使用我们系统的基本功能。接下来我们需要考虑这样两种情况:

  • 同一用户在同一时间通过礼券购买系统的付费功能,也就是在扣款时需要保证线程安全的情况
  • 不同用户在同一时间通过礼券购买系统的付费功能,这是系统的付费用户并发量,这个和我们需要创建的细粒度锁的数量有关。

基于上面我们描述的场景可以看出,对于第一种情况只要有礼券消费就可能存在,虽然不常出现但肯定存在,那么我们就需要通过细粒度锁来控制礼券的扣除操作。而第二种情况也是存在的,但是由于付费用户的基数不是很大,所以这种情况也可以说很少了,这也就意味着在同一时间我们实际要创建的锁的数量并不是很多。这里稍微解释一下,因为在礼券消费的时候我们肯定是要创建锁来保证线程安全,而且锁是和用户绑定的,同一时间对于同一用户只会创建一把锁,而同一时间如果很多用户都在消费礼券,那这一刻就是有多少用户就需要创建多少把锁了。

好了,通过上面的分析我们来看下分段锁是如何实现细粒度锁的。

public class SegmentLock<T> {

    /**
     * 默认预先创建的锁数量.
     */
    private int DEFAULT_LOCK_COUNT = 20;

    private final ConcurrentHashMap<Integer, ReentrantLock> lockMap = new ConcurrentHashMap<>();

    public SegmentLock() {
        init(null, false);
    }

    public SegmentLock(Integer count, boolean isFair) {
        init(count, isFair);
    }

    private void init(Integer count, boolean isFair) {
        if (count != null && count != 0) {
            this.DEFAULT_LOCK_COUNT = count;
        }
        // 预先初始化指定数量的锁
        for (int i = 0; i < this.DEFAULT_LOCK_COUNT; i++) {
            this.lockMap.put(i, new ReentrantLock(isFair));
        }
    }

    public ReentrantLock get(T key) {
        return this.lockMap.get((key.hashCode() >>> 1) % DEFAULT_LOCK_COUNT);
    }

    public void lock(T key) {
        ReentrantLock lock = this.get(key);
        lock.lock();
    }

    public void unlock(T key) {
        ReentrantLock lock = this.get(key);
        lock.unlock();
    }
}

由于上一篇 synchronized 的替代品 ReentrantLock 文章中我们对 ReentrantLock 已经有了了解了,上面的代码看起来就很简单,SegmentLock 类构造方法中调用 init 方法预先初始化指定数量的锁,然后就是提供了锁的获取以及加锁和解锁的方法。这里需要注意的一个地方就是我们获取锁的方式是:使用 key 的 hashCode 值向右无符号位移一位得到的值再对锁的数量取余,然后再用这个值作为索引去 Map 中获取锁。至于这里为什么要无符号位移呢,那是因为 hashCode 值有可能是得到一个负数,取余之后还是一个负数,用一个负数索引去 Map 中取值得到就是 null,会导致后面在使用时产生 NPE。接下来一起来看看如何使用:

private static SegmentLock<String> segmentLock = new SegmentLock<>();

private static void consume(String userId, int amount) throws InterruptedException {
  System.out.println("verify that the account is normal...");
  TimeUnit.MILLISECONDS.sleep(500);
  ReentrantLock lock = segmentLock.get(userId);
  lock.lock();
  try {
    System.out.println("enter the deduction code block");
    Integer userAccountBalance = accountMap.get(userId);
    if (userAccountBalance >= amount) {
      // deduction
      TimeUnit.MILLISECONDS.sleep(2000);
      userAccountBalance -= amount;
      accountMap.put(userId, userAccountBalance);
      System.out.println(Thread.currentThread().getName() + " deduction success");
    } else {
      TimeUnit.MILLISECONDS.sleep(1000);
      System.out.println(Thread.currentThread().getName() + " deduction failed, insufficient account balance.");
    }
  } finally {
    lock.unlock();
  }
}

从使用代码中可以看出,基本上和 ReentrantLock 的使用一样,只是由于我们预先创建好了一定数量的锁,直接根据用户 id 取锁,然后再进行加锁解锁的操作,这可以减少锁创建的性能开销,这对于并发付费的用户量不大的情况下性能会有很好的提升,这也是为什么采用分段锁的原因。

这里有个问题就是如果刚好两个用户 id 的 hash 值一样或者说 hash 值取余的结果一样,那么这两个用户获取的就是同一把锁,那在同时进行礼券消费时,一个用户先获取了锁并执行了加锁操作,另一个用户也获取了这把锁,在执行加锁的时候就需要等了,因为上一个用户已经用这把锁进行加锁操作了,等到上一个用户执行成功锁释放了之后就能进行加锁了。

但你想想,本身付费用户基数就不大,这种情况出现的概率其实很小了,所以在这里其实问题不大,只是提醒一下可能会有这种情况的出现,那么就需要注意加锁的代码块如果本来执行时间就很长的情况,这里可能会让其中一个用户等待的时间加倍,当然一般也不建议锁住一个资源很长时间,也就是要加锁的代码块执行时间很长。

优点:对于付费用户基数不大时,由于预创建了一部分锁,所以在付费加锁时性能表现很好。

缺点:可能会出现不同用户获取到相同锁的情况,导致用户需要等待上一用户释放锁后才能加锁往下执行。

哈希锁

对于分段锁,其实我们是设定了一个并发付费用户量不是很大的场景,那如果说我们的系统随着慢慢的运营迭代已经俘获了更多的忠实用户,越来越多的用户认可我们的系统,这时候可能并发付费用户量已经上来了,而且有少部分用户已经开始抱怨我们的系统在付费的时候有卡顿,需要等一段时间。

这很可能就是我们所实现的分段锁数量已经不够了,很多用户 id 的 hash 值经过取余之后结果是一样的,那么获取的锁也是一样的,这时就需要等了,严重的时候可能多个用户同一时间获取的都是同一把锁,这时等待的时间就更长了。

对此我们就需要进行优化改进了,既然是锁不够那就需要创建更多的锁,那是不是可以直接预创建更多数量的锁呢,但这时付费用户基数已经上来了,我们可能需要为每一个用户分配一把锁,这种方式恐怕不行,一是我们不知道哪些用户需要用,还有就是一上来就创建这么多的锁也不现实啊。

那么我们可以为每一个来付费的用户创建一把锁,付费结束之后再移除,注意不是预创建,而是付费时再创建,创建好之后同时会把锁放到 Map 中,以防同一个用户并发付费,这时就可以直接 Map 中获取同时加锁的次数加一,等到锁释放的时候再看下加锁的次数,如果等于一则从 Map 中移除,否则暂先不移除,只是释放锁就好了。来看看代码实现:

public class HashLock<T> {
    private boolean fair = false;
    private final SegmentLock<T> segmentLock = new SegmentLock<>();
    private final ConcurrentHashMap<T, ReentrantLockCount> lockMap = new ConcurrentHashMap<>();

    public HashLock() {

    }

    public HashLock(boolean fair) {
        this.fair = fair;
    }

    public void lock(T key) {
        ReentrantLockCount lock;
        // 通过分段锁来保证获取锁时的线程安全
        this.segmentLock.lock(key);
        try {
            lock = this.lockMap.get(key);
            if (lock == null) {
                lock = new ReentrantLockCount(this.fair);
                this.lockMap.put(key, lock);
            } else {
                // map 中已经存在说明锁已经创建,直接数量加一
                lock.incrementAndGet();
            }
        } finally {
            this.segmentLock.unlock(key);
        }
        lock.lock();
    }

    public void unlock(T key) {
        ReentrantLockCount reentrantLockCount = this.lockMap.get(key);
        // 判断加锁的次数等于一的话可以将 map 中的锁移除
        if (reentrantLockCount.getCount() == 1) {
            this.segmentLock.lock(key);
            try {
                if (reentrantLockCount.getCount() == 1) {
                    this.lockMap.remove(key);
                }
            } finally {
                this.segmentLock.unlock(key);
            }
        }
        reentrantLockCount.unlock();
    }

    static class ReentrantLockCount {
        private ReentrantLock reentrantLock;
        // 记录加锁的次数
        private AtomicInteger count = new AtomicInteger(1);

        public ReentrantLockCount(boolean fair) {
            this.reentrantLock = new ReentrantLock(fair);
        }

        public void lock() {
            this.reentrantLock.lock();
        }

        public void unlock() {
            this.count.decrementAndGet();
            this.reentrantLock.unlock();
        }

        public int incrementAndGet() {
            return this.count.incrementAndGet();
        }

        public int getCount() {
            return this.count.get();
        }
    }
}

上面代码实现主要看 lock 和 unlock 方法就好了,当然还有一个包装了 ReentrantLock 的内部类 ReentrantLockCount,其中有一个字段来统计加锁的次数,这是为了避免同一个用户进行并发付费的时候重复创建锁,直接 Map 中获取,释放的时候如果加锁次数只有一次就可以直接移除。上面的 lock 方法和 unlock 方法采用了分段锁来保证锁的获取过程和移除过程是线程安全,不然可能导致锁的重复创建和重复移除问题。至于使用其实和分段锁差不多的:

private static HashLock<String> hashLock = new HashLock<>();
private static void consume1(String userId, int amount) {
  System.out.println("verify that the account is normal...");
  TimeUnit.MILLISECONDS.sleep(500);
  hashLock.lock(userId);
  try {
    System.out.println("enter the deduction code block");
    Integer userAccountBalance = accountMap.get(userId);
    if (userAccountBalance >= amount) {
      // deduction
      TimeUnit.MILLISECONDS.sleep(2000);
      userAccountBalance -= amount;
      accountMap.put(userId, userAccountBalance);
      System.out.println(Thread.currentThread().getName() + " deduction success");
    } else {
      TimeUnit.MILLISECONDS.sleep(1000);
      System.out.println(Thread.currentThread().getName() + " deduction failed, insufficient account balance.");
    }
  } finally {
    hashLock.unlock(userId);
  }
}

使用代码其实和分段锁的使用差不多的,至于里面加的那些睡眠可以不用管,只是为了测试的时候更能看出效果而已。

优点:很好地解决了不同用户共用锁的问题

缺点:需要通过分段锁来维护锁的获取和移除,同时还要维护加锁的次数,分段锁这里锁的数量会成为性能的瓶颈,而且稍有不慎锁没释放成功可能会产生内存泄漏的问题。

弱引用锁

上面对 HashLock 缺点中也提到由于需要通过分段锁来维护锁的获取和移除,同时分段锁锁的数量可能会成为性能的瓶颈。那么有没有更好地解决办法呢。既然说到了这里那肯定是有的,这里就涉及到了之前 谈谈 Java 中的各种引用类型 这篇文章中的知识点了,利用弱引用的特性,这样就能够拿掉分段锁,把锁对象的资源回收交给 Java 虚拟机,然后对于已经被回收的锁进行移除,能有效避免不小心发生内存泄漏的问题。代码实现:

public class WeakHashLock<T> {

    /**
     * map 中锁数量阈值.
     */
    private static final int LOCK_SIZE_THRESHOLD = 1000;
    private ReferenceQueue<ReentrantLock> queue = new ReferenceQueue<>();
    private ConcurrentHashMap<T, WeakRefLock<T, ReentrantLock>> lockMap = new ConcurrentHashMap<>();

    public ReentrantLock get(T key) {
        // 可以设置一个阈值,当锁的数量超过这个阈值时移除一部分被回收的锁
        if (this.lockMap.size() > LOCK_SIZE_THRESHOLD) {
            clearEmptyRef();
        }

        WeakRefLock<T, ReentrantLock> weakRefLock = this.lockMap.get(key);
        ReentrantLock lock = weakRefLock == null ? null : weakRefLock.get();
        while (lock == null) {
            lockMap.putIfAbsent(key, new WeakRefLock<>(new ReentrantLock(), this.queue, key));
          	// 再次从 Map 中获取,保证同一用户获取的锁是一致的
            weakRefLock = lockMap.get(key);
            lock = weakRefLock == null ? null : weakRefLock.get();
            if (lock != null) {
                return lock;
            }
            // 这里注意如果堆资源过于紧张可能会返回空的情况,需要移除一部分被回收的锁
            clearEmptyRef();
        }
        return lock;
    }

    @SuppressWarnings("unchecked")
    public void clearEmptyRef() {
        Reference<? extends ReentrantLock> ref;
        while ((ref = this.queue.poll()) != null) {
            WeakRefLock<T, ? extends ReentrantLock> weakRefLock = (WeakRefLock<T, ? extends ReentrantLock>) ref;
            this.lockMap.remove(weakRefLock.key);
        }
    }


    static final class WeakRefLock<T, K> extends WeakReference<K> {

        private final T key;

        public WeakRefLock(K referent, ReferenceQueue<? super K> queue, T key) {
            super(referent, queue);
            this.key = key;
        }
    }
}

上面代码中主要是利用了弱引用的特性,拿掉了锁的获取和创建时维护加锁次数的判断过程,在获取锁时直接从 Map 中获取,如果拿到为空则创建,同时这里要解释一下代码里面清理被回收的锁的过程。第一处在 Map 中的锁数量超过设定的阈值后将已经被回收的锁进行移除,主要是为了不让 Map 中存放过多的已经被回收的锁占用资源,第二处移除主要是以防资源过于紧张的情况,刚刚创建的弱引用锁立即就被回收了,这时急需移除一部分已经被回收的锁。当然如果资源真的都已经紧张到这个程度了的话,也应该考虑考虑提高一下机器的配置了。使用代码:

private static WeakHashLock<String> weakHashLock = new WeakHashLock<>();

private static void consume2(String userId, int amount) {
  System.out.println("verify that the account is normal...");
  TimeUnit.MILLISECONDS.sleep(500);
  ReentrantLock lock = weakHashLock.get(userId);
  lock.lock();
  try {
    System.out.println("enter the deduction code block");
    Integer userAccountBalance = accountMap.get(userId);
    if (userAccountBalance >= amount) {
      // deduction
      TimeUnit.MILLISECONDS.sleep(2000);
      userAccountBalance -= amount;
      accountMap.put(userId, userAccountBalance);
      System.out.println(Thread.currentThread().getName() + " deduction success");
    } else {
      TimeUnit.MILLISECONDS.sleep(1000);
      System.out.println(Thread.currentThread().getName() + " deduction failed, insufficient account balance.");
    }
  } finally {
    lock.unlock();
  }
}

使用上这里就不多说了,都基本上差不多,主要的区别还是锁的实现上不同而已。

优点:利用了弱引用的特性,解除了分段锁那部分代码带来的性能瓶颈问题,将回收操作交给 Java 虚拟机。

缺点:获取锁的代码实现看起来有点繁琐,应该还有更优雅的方式实现。

好了,到这里就基本上将细粒度锁的实现方式都说完了,每种实现方式优缺点也大概总结了一下,根据优缺点其实也就能够知道每种实现方式所适用的场景,在选择的时候根据业务需求来进行选择。对于最后一种实现方式其实还有更优雅的实现,我们接下来再用一篇文章来说下最后一种实现方式。

Java 中常见的细粒度锁实现-LMLPHP

11-24 02:23