分布式锁(3)-Redisson实现
文章分布式锁(2)- 基于Redis的实现中,最后给出的redis实现的分布式锁,还有一个严重的问题,那就是这种实现是不可重入的,而要实现可重入的分布式锁,会很麻烦,幸亏已经有现成的轮子可以使用。
系列文章链接:
- 分布式锁(1)-分布式锁简介
- 分布式锁(2)-基于Redis的实现
- 分布式锁(3)-Redisson实现
- 分布式锁(4)-基于Mysql实现
- 分布式锁(5)-MLock使用介绍(自己实现,基于redis,适用于真实项目)
1.Redisson简介
相对于Jedis而言,Redisson强大的一批。当然了,随之而来的就是它的复杂性。它里面也实现了分布式锁,而且包含多种类型的锁,更多请参阅分布式锁和同步器
2.可重入锁
首先引入jar包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.1</version>
</dependency>
然后,通过配置获取`RedissonClient
客户端的实例,然后getLock
获取锁的实例,进行操作即可。
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
config.useSingleServer().setPassword("redis1234");
final RedissonClient client = Redisson.create(config);
RLock lock = client.getLock("lock1");
try{
lock.lock();
}finally{
lock.unlock();
}
}
3.获取锁实例
我们先来看RLock lock = client.getLock("lock1");
这句代码就是为了获取锁的实例,然后我们可以看到它返回的是一个RedissonLock
对象。
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
在RedissonLock
构造方法中,主要初始化一些属性。
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
//命令执行器
this.commandExecutor = commandExecutor;
//UUID字符串
this.id = commandExecutor.getConnectionManager().getId();
//内部锁过期时间
this.internalLockLeaseTime = commandExecutor.
getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
4.加锁
当我们调用lock
方法,定位到lockInterruptibly
。在这里,完成了加锁的逻辑。
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//当前线程ID
long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 如果ttl为空,则证明获取锁成功
if (ttl == null) {
return;
}
//如果获取锁失败,则订阅到对应这个锁的channel
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
//再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
//ttl为空,说明成功获取锁,返回
if (ttl == null) {
break;
}
//ttl大于0 则等待ttl时间后继续尝试获取
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
//取消对channel的订阅
unsubscribe(future, threadId);
}
//get(lockAsync(leaseTime, unit));
}
如上代码,就是加锁的全过程。先调用tryAcquire
来获取锁,如果返回值ttl为空,则证明加锁成功,返回;如果不为空,则证明加锁失败。这时候,它会订阅这个锁的Channel,等待锁释放的消息,然后重新尝试获取锁。流程如下:
获取锁
获取锁的过程是怎样的呢?接下来就要看tryAcquire
方法。在这里,它有两种处理方式,一种是带有过期时间的锁,一种是不带过期时间的锁。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//如果带有过期时间,则按照普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//先按照30秒的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
接着往下看,tryLockInnerAsync
方法是真正执行获取锁的逻辑,它是一段LUA脚本代码。在这里,它使用的是hash数据结构。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
//过期时间
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//如果锁不存在,则通过hset设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,但并非本线程,则返回过期时间ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
这段LUA代码看起来并不复杂,有三个判断:
- 通过exists判断,如果锁不存在,则设置值和过期时间,加锁成功
- 通过hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功
- 如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败
加锁成功后,在redis的内存数据中,就有一条hash结构的数据。Key为锁的名称;field为随机字符串+线程ID;值为1。如果同一线程多次调用lock
方法,值递增1。
127.0.0.1:6379> hgetall lock1
1) "b5ae0be4-5623-45a5-8faa-ab7eb167ce87:1"
2) "1"
5.解锁
我们通过调用unlock
方法来解锁。
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
//解锁方法
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
//获取返回值
Boolean opStatus = future.getNow();
//如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
if (opStatus == null) {
IllegalMonitorStateException cause =
new IllegalMonitorStateException("
attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//解锁成功,取消刷新过期时间的那个定时任务
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
然后我们再看unlockInnerAsync
方法。这里也是一段LUA脚本代码。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
//如果锁已经不存在, 发布锁释放的消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//通过hincrby递减1的方式,释放一次锁
//若剩余次数大于0 ,则刷新过期时间
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//否则证明锁已经释放,删除key并发布锁释放的消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
如上代码,就是释放锁的逻辑。同样的,它也是有三个判断:
- 如果锁已经不存在,通过publish发布锁释放的消息,解锁成功
- 如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常
- 通过hincrby递减1,先释放一次锁。若剩余次数还大于0,则证明当前锁是重入锁,刷新过期时间;若剩余次数小于0,删除key并发布锁释放的消息,解锁成功
至此,Redisson中的可重入锁的逻辑,就分析完了。但值得注意的是,上面的两种实现方式都是针对单机Redis实例而进行的。如果我们有多个Redis实例,请参阅Redlock算法。该算法的具体内容,请参考http://redis.cn/topics/distlo...