一、前言
更多内容见Zookeeper专栏:https://blog.csdn.net/saintmm/category_11579394.html
至此,Zookeeper系列的内容已出:
紧接着上一篇的内容,从源码层面来看curator是如何实现zookeeper分布式锁的?
二、curator分布式锁种类
curator提供了四种分布式锁,都实现自接口InterProcessLock
;
JAVA-doc:https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/package-summary.html
下面以可重入排他锁InterProcessMutex
为例,展开讨论;
三、Zookeeper分布式锁概述
1、Zookeeper分布式锁实现思路
Zookeeper实现排他锁的设计思路如下:
-
zk用
/lock
节点作为分布式锁,当不同的客户端到zk竞争这把锁的时候,zk会按顺序给不同的客户端创建一个临时子节点,挂在作为分布式锁的节点下面。 -
假设第一个来到的客户端为A,第二个来到的是B,分布式锁节点下挂的第一个节点就是A(
/lock/_c_A
),B(/lock/_c_B
)紧跟着A,且B会监听着A的生命状态;- 这里B会先获取到
/lock
路径下所有的节点,发现自己的锁节点(/lock/_c_B
)不在第一位,进而监听自己前一位的锁节点(/lock/_c_A
)。
- 这里B会先获取到
-
当A释放锁后A节点会被删除;B监听到A被删除,B可以尝试获得分布式锁了。
-
具体体现为:客户端B获取
/lock
下的所有子节点,并进行排序,判断排在最前面的是否为自己,如果自己的锁节点在第一位,代表取锁成功。 -
如果还有C节点、D节点,他们都只会监听他们前一个节点,即:C监听B、D监听C。
-
2、Zookeeper分布式锁解决的问题
3、Zookeeper分布式锁优缺点?
四、InterProcessMute实现分布式锁原理
InterProcessMute首先是一个互斥锁,其次是依赖Zookeeper临时顺序节点实现的分布式锁;对于锁而言,最重要的是保护临界区,让多个线程对临界区的访问互斥;InterProcessMute依赖Zookeeper临时顺序节点的有序性实现分布式环境下互斥,依赖JVM层面的synchronized实现节点监听的互斥(防止羊群效应)。
InterProcessMute的acquire()
方法用于获取锁,release()
方法用于释放锁。
以如下测试类为例,展开源码分析:
public class LockTest {
public static void main(String[] args) {
//重试策略,定义初试时间3s,重试3次
ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 3);
//初始化客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(3000)
.retryPolicy(exponentialBackoffRetry)
.build();
// start()开始连接,没有此会报错
client.start();
//利用zookeeper的类似于文件系统的特性进行加锁 第二个参数指定锁的路径
InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/lock");
try {
//加锁
interProcessMutex.acquire();
System.out.println(Thread.currentThread().getName() + "获取锁成功");
Thread.sleep(60_000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//释放锁
interProcessMutex.release();
System.out.println(Thread.currentThread().getName() + "释放锁成功");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
1、加锁流程(acquire()方法)
InterProcessMutex#acquire()方法:
acquire()方法中直接调用internalLock()方法以不加锁成功就一直等待
的方式加锁;
如果加锁出现异常,则直接抛出IOException。
0)加锁流程图
1)internalLock()
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
// 当前线程
Thread currentThread = Thread.currentThread();
// 当前线程持有的锁信息
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// 可重入,lockCount +1;
// 此处只在本地变量变化了,没发生任何网络请求;对比redisson的分布式锁可重入的实现是需要操作redis的
lockData.lockCount.incrementAndGet();
return true;
}
// 进行加锁,继续往里跟
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// 加锁成功
LockData newLockData = new LockData(currentThread, lockPath);
// 放入map
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
internalLock()方法有两个入参:long类型的time 和 TimeUnit类型的 unit 共同表示加锁的超时时间。
一个InterProcessMutex在同一个JVM中可以由多个线程共同操作,因为其可重入性体现在JVM的线程层面,所以其维护了一个Map类型的变量threadData
:
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
用于记录每个线程持有的锁信息;锁信息采用LockData表示;
LockData
LockData是InterProcessMutex的静态内部类,其仅有三个变量:持有锁的线程、锁路径、锁重入的次数;
private static class LockData {
// 持有锁的线程
final Thread owningThread;
// 锁的路径
final String lockPath;
// 重入锁的次数
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
internalLock()方法逻辑
2)LockInternals#attemptLock() --> 尝试加锁
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
final long startMillis = System.currentTimeMillis();
// 将时间统一格式化ms
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while (!isDone) {
isDone = true;
try {
// 创建临时有序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 判断是否为第一个节点 如果是表明加锁成功。跟进去
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
} catch (KeeperException.NoNodeException e) {
// 重试机制
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
isDone = false;
} else {
throw e;
}
}
}
if (hasTheLock) {
return ourPath;
}
return null;
}
attemptLock()方法有三个入参:long类型的time 和 TimeUnit类型的 unit 共同表示尝试加锁的超时时间,字节数组类型的lockNodeBytes表示锁路径对应的节点值。
通过InterProcessMutex#internalLock()方法进入到attemptLock()方法时,lockNodeBytes为null,即:不给锁路径对应的节点赋值。
尝试加锁逻辑:
1> StandardLockInternalsDriver#createsTheLock() --> 创建临时有序节点
为什么LockInternalsDriver接口的实现是StandardLockInternalsDriver?
- 因为在LockInternals构造器被调用时,传入的LockInternalsDriver是StandardLockInternalsDriver。
createsTheLock()方法:
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
方法中会级联创建锁路径,即:锁路径的父路径不存在时,会一级一级的创建,而不是像原生的zookeeper create命令一样报错–父路径不存在。
2> 判断刚创建的锁路径是否为第一个节点
LockInternals#internalLockLoop()方法:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
// debug进不去,暂时忽略
if (revocable.get() != null) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// 获取锁成功才会退出这个while 或者客户端状态不正常
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
// 获取所有子节点(网络IO访问Zookeeper) 并排好序
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
/**
* 判断是否为第一个节点
* 返回值predicateResults中的getsTheLock()表示加锁是否成功;
* pathToWatch()表示加锁失败后监听的节点
*/
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {
// 当前节点是第一个节点 加锁成功 退出while循环
haveTheLock = true;
} else {
// 监听上一个节点 getPathToWatch()返回的就是自己前面的节点
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
// 这里加互斥锁的对象和Watcher唤醒的对象是一样的
synchronized (this) {
try {
/**
* 监听前一个节点,watcher里面会进行唤醒;
* 这里只会监听前一个节点,防止羊群效应。这块对比redisson是使用pubsub 唤醒全部节点
*/
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true; // timed out - delete our node
break;
}
// 加锁的时间限制
wait(millisToWait);
} else {
// 加锁没有时间限制,则一直等待
wait();
}
} catch (KeeperException.NoNodeException e) {
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
} finally {
// 加锁超时 或 加锁异常 后删除当前节点
if (doDelete) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
internalLockLoop()方法逻辑:
此外,StandardLockInternalsDriver#getsTheLock()方法负责判断当前节点是否为第一个节点、如果当前节点不是第一个节点,当前节点应该监听哪一个节点;
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
// 查看当前节点在所有有序节点中的位置
int ourIndex = children.indexOf(sequenceNodeName);
// 校验节点位置不能 < 0
validateOurIndex(sequenceNodeName, ourIndex);
// maxLeases为1,如果节点是顺序节点中的第一个,表示可以获取到锁,maxLeases为1
boolean getsTheLock = ourIndex < maxLeases;
// 如果获取不到锁,pathToWatch赋值为当前节点的前一个节点,即:Watcher去监听当前节点的前一个节点
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
3)监听器的运作
获取锁失败监听当前节点的前一个节点时,会针对LockInternals类加互斥锁,然后挂起线程;等到相应事件触发监听器时,需要调用notify()唤醒这个线程;
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到监听的事件之后进行唤醒,唤醒的对象和synchronized的对象是同一个
client.postSafeNotify(LockInternals.this);
}
};
// CuratorFramework类的方法
default CompletableFuture<Void> postSafeNotify(Object monitorHolder)
{
return runSafe(() -> {
synchronized(monitorHolder) {
// 唤醒wait的线程 注意此处是全部唤醒 去检查自己是不是第一个节点
monitorHolder.notifyAll();
}
});
}
Watcher的实现并没有判断事件是什么,而是直接调用client.postSafeNotify()方法,进而调用给定对象(LockInternals
)的notifyAll()方法唤醒所有挂起的线程;
之前获取锁失败用于挂起线程的wait(),也是在LockInternals.this对象上调用的,这样JVM层面的互斥锁就对上了。
挂起的线程被Watcher唤醒之后,会回到LockInternals#internalLockLoop()
方法继续while循环,判断当前节点是否为第一个节点,进而决定获取分布式锁是否成功。
2、解锁流程(release()方法)
0)解锁流程图
1)文字描述
InterProcessMutex#release()方法:
@Override
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
// 当前线程未持有锁,则抛出异常
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
// 当前线程持有锁,并且持有多次,持有锁次数 - 1
if (newLockCount > 0) {
// 表示锁重入,不直接删除节点
return;
}
// 线程当前持有锁数量 < 0,抛出异常(理论上不会出现这种情况)
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
// 释放锁:删除临时节点,删除watch
internals.releaseLock(lockData.lockPath);
} finally {
// 从threadData中移除当前线程持有锁的信息
threadData.remove(currentThread);
}
}
释放锁的逻辑很简单:
五、总结
加锁实现:
- 向Zookeeper中添加顺序临时节点
- 根据顺序节点的序号,判断当前线程创建的临时节点是否是第一个,是则获取锁成功,否则Watcher前一个节点,挂起线程。
- 前一个节点获取到锁并释放锁之后,当前节点所在线程被唤醒,获取到锁,加锁成功
解锁实现
- 当前线程未持有锁,则抛出异常IllegalMonitorStateException;
- 当前线程持有锁,并且持有多次,持有锁次数 - 1;
- 线程当前持有锁数量 < 1,抛出异常IllegalMonitorStateException(理论上不会出现这种情况);
- 如果持有的锁次数为1,则释放锁:删除临时节点,删除watch;从threadData中移除当前线程持有锁的信息;
网络IO次数
整个加锁过程:
- 锁重入,不会访问ZK;
- 加锁成功,会访问两次ZK:创建临时有序节点、获取锁路径的所有子节点;
- 加锁失败,会访问3次ZK:创建临时有序节点、获取锁路径的所有子节点、对上一节点节点创建Watcher。
整个解锁过程:
- 锁重入,不会访问ZK;
- 正常释放锁,会访问两次ZK:删除当前节点路径的监听器、删除当前节点路径