引言
前文我们有介绍《看了CopyOnWriteArrayList后自己实现了一个CopyOnWriteHashMap》 关于CopyOnWrite
容器的,但是它也有一些缺点:
内存占用问题:因为
CopyOnWrite
的写时复制机制每次进行写操作的时候都会有两个数组对象的内存,如果这个数组对象占用的内存较大的话,如果频繁的进行写入就会造成频繁的Yong GC
和Full GC
数据一致性问题:
CopyOnWrite
容器只能保证数据的最终一致性,不能保证数据的实时一致性。读操作的线程可能不会立即读取到新修改的数据,因为修改操作发生在副本上。但最终修改操作会完成并更新容器所以这是最终一致性。当时有说到解决这两个缺点我们可以使用Collections.synchronizedList()
来替代,找个无非就是对list的增删改查方法都加了synchronized实现。我们知道synchronized
其实是一个独占锁 (排他锁),如果不知道什么是独占锁的可以看看这个文章《史上最全 Java 中各种锁的介绍》 里面基本上把java里面的锁都介绍完了。但是这样的话就会存在一个性能问题,如果对于读多写少的场景,每次读也要去获取锁,读完了之后再释放锁,这样就造成了每个读的请求都要进行获取锁,但是读的话并不会引起数据不安全,这样就会造成一个性能瓶颈。为了解决这个问题,就又出现了一种新的锁,读写锁(ReadWriteLock
)。
什么是读写锁
根据名字我们也可以猜个大概,就是有两把锁,分别是读锁和写锁。读锁在同一时刻可以允许多个读线程获取,但是在写线程访问的时候,所有的读线程和其他写线程都会被阻塞。写锁同一时刻只能有一个写线程获取成功,其他都会被阻塞。读写锁实际维护了两把锁,一个读锁和一个写锁,通过读锁和写锁进行区分,在读多写少的情况下并发性比独占锁有了很大的提升。在java里面对读写锁的实现就是ReentrantReadWriteLock
,它有以下特性:
ReentrantReadWriteLock 的使用
我们先从官网来个事例https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html,看看它是如何使用的
`class RWDictionary {`
`private final Map<String, Data> m = new TreeMap<String, Data>();`
`private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();`
`private final Lock r = rwl.readLock();`
`private final Lock w = rwl.writeLock();`
`public Data get(String key) {`
`r.lock();`
`try { return m.get(key); }`
`finally { r.unlock(); }`
`}`
`public String[] allKeys() {`
`r.lock();`
`try { return m.keySet().toArray(); }`
`finally { r.unlock(); }`
`}`
`public Data put(String key, Data value) {`
`w.lock();`
`try { return m.put(key, value); }`
`finally { w.unlock(); }`
`}`
`public void clear() {`
`w.lock();`
`try { m.clear(); }`
`finally { w.unlock(); }`
`}`
`}`
这个使用起来还是非常简单明了的,跟ReentrantLock
的用法基本一致,写的时候获取写锁,写完了释放写锁,读的时候获取读锁,读完了就释放读写。
读写锁的实现分析
我们知道ReentrantLock
是通过state
来控制锁的状态,以及前面所介绍的《Java高并发编程基础三大利器之Semaphore》《Java高并发编程基础三大利器之CountDownLatch》《Java高并发编程基础三大利器之CyclicBarrier》 都是通过state
来进行实现的那ReentrantReadWriteLock毋庸置疑肯定也是通过AQS
的state
来实现的,不过state
是一个int
值它是如何来读锁和写锁的。
读写锁状态的实现分析
如果我们有看过线程池的源码,我们知道线程池的状态和线程数是通过一个int
类型原子变量(高3
位保存运行状态,低29
位保存线程数)来控制的。同样的ReentrantReadWriteLock
也是通过一个state
的高16
位和低16
位来分别控制读的状态和写状态。
下面我们就来看看它是如何通过一个字段来实现读写分离的,
`static final int SHARED_SHIFT = 16;`
`static final int SHARED_UNIT = (1 << SHARED_SHIFT);`
`static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;`
`static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;`
`/** Returns the number of shared holds represented in count */`
`static int sharedCount(int c) { return c >>> SHARED_SHIFT; }`
`/** Returns the number of exclusive holds represented in count */`
`static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }`
sharedCount
: 读锁数量 是将同步状态(int c
)无符号右移16
位,即取同步状态的高16
位。exclusiveCount
:写锁数量 我们要看下EXCLUSIVE_MASK
这个静态变量:它是1进行左移16位然后减1也就是0X0000FFFF
即(1 << SHARED_SHIFT) - 1= 0X0000FFFF
所以exclusiveCount
就是相当于c&0X0000FFFF
所以也就是低16位用来表示写锁的获取次数。
源码分析
基于jdk1.8 既然ReentrantReadWriteLock
也是基于AQS
来实现的,那么它肯定是重写了AQS
的获取锁的方法,那我们就直接去ReentrantReadWriteLock
这个类里面看看lock
的地方我们先看看获取读锁的地方
`protected final boolean tryAcquire(int acquires) {`
`/*`
`* Walkthrough:`
`* 1. If read count nonzero or write count nonzero`
`* and owner is a different thread, fail.`
`* 2. If count would saturate, fail. (This can only`
`* happen if count is already nonzero.)`
`* 3. Otherwise, this thread is eligible for lock if`
`* it is either a reentrant acquire or`
`* queue policy allows it. If so, update state`
`* and set owner.`
`*/`
`Thread current = Thread.currentThread();`
`// 获取写锁当前的同步状态`
`int c = getState();`
`// 写锁次数`
`int w = exclusiveCount(c);`
`if (c != 0) {`
`// (Note: if c != 0 and w == 0 then shared count != 0)`
`// 当前状态不为0,但是写锁为0 就说明读锁不为0`
`// 当读锁已被读线程获取或者当前线程不是已经获取写锁的线程的话获取写锁失败`
`if (w == 0 || current != getExclusiveOwnerThread())`
`return false;`
`if (w + exclusiveCount(acquires) > MAX_COUNT)`
`throw new Error("Maximum lock count exceeded");`
`// Reentrant acquire 获取到写锁`
`setState(c + acquires);`
`return true;`
`}`
`//writerShouldBlock 公平锁和非公平锁的判断`
`if (writerShouldBlock() ||`
`!compareAndSetState(c, c + acquires))`
`return false;`
`setExclusiveOwnerThread(current);`
`return true;`
`}`
写锁完了,接下来肯定就是读锁了由于读锁是共享锁,所以也应该重写了tryAcquireShared
这个就不贴代码了,和读锁差不多这个就不做分析了。其实把AQS
弄明白了再来看这些基于AQS
来实现的玩意还是比较容易的。
读写锁的升级与降级
前面我们有提到读写锁是可以降级的,但是没有说是否可以升级。我们先看看什么是锁降级和锁升级
锁降级:从写锁变成读锁;它的过程是先持有写锁,在获取读锁,再释放写锁。如果是持有写锁,释放写锁,再获取读锁这种情况不是锁降级。
为什么要锁降级?
- 锁升级:从读锁变成写锁。先持有读锁,再去获取写锁(这是不会成功的)因为获取写锁是独占锁,如果有读锁被占用了,写锁就会放入队列中等待,直至读锁全部被释放之后才有可能获取到写锁。
思考题
本篇文章主要介绍了单机情况的读写锁,如果要实现一个分布式的读写锁该如何实现?
ReentrantReadWriteLock
的饥饿问题如何解决?(ReentrantReadWriteLock实现了读写分离,想要获取读锁就必须确保当前没有其他任何读写锁了,但是一旦读操作比较多的时候,想要获取写锁就变得比较困难了,因为当前有可能会一直存在读锁。而无法获得写锁。)
结束
由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
感谢您的阅读,十分欢迎并感谢您的关注。
站在巨人的肩膀上摘苹果:
《并发编程的艺术》
往期精选
推荐👍 :Java高并发编程基础三大利器之CyclicBarrier
推荐👍 :Java高并发编程基础三大利器之CountDownLatch
推荐👍 :Java高并发编程基础三大利器之Semaphore
推荐👍 :Java高并发编程基础之AQS
推荐👍 :可恶的爬虫直接把生产6台机器爬挂了!