Redis-分布式锁
如何使用分布式锁
正常在一个java服务中使用sync锁或lock锁完全可以满足线程安全问题的,但是在部署集群的情况下,不同的jvm不能锁同一个方法,因此需要分布式锁用来保护线程安全问题。
分布式锁实现
常见的分布式锁解决方案:
- Mysql:自带悲观锁,但是不太好维护
- redis:利用setnx实现互斥,操作方便,推荐使用
- zookeeper:利用节点实现互斥
本章主要采用redis的方式进行实现
public interface ILock {
/**
* 分布式-互斥锁
*/
boolean tryLock(String name, Long time, TimeUnit unit);
/**
* 分布式-释放互斥锁
*/
void unLock(String name);
}
/**
* 分布式锁实现
*/
@Component
public class DistributedLock implements ILock{
@Resource
private StringRedisTemplate stringRedisTemplate;
/** 分布式锁key */
private final String DISTRIBUTED_LOCK = "distributed_lock:";
/**
* 分布式互斥锁
*/
@Override
public boolean tryLock(String name, Long time, TimeUnit unit) {
// value
String value = Thread.currentThread().getId() + "";
// key
String key = DISTRIBUTED_LOCK + name;
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
// 防止自动拆箱空指针
return BooleanUtil.isTrue(aBoolean);
}
/**
* 分布式释放锁
*/
@Override
public void unLock(String name) {
String key = DISTRIBUTED_LOCK + name;
stringRedisTemplate.delete(key);
}
}
分布式锁误删问题
在设置互斥锁的时候为了解决redis宕机导致互斥锁永久失效的情况下,加了一个过期时间。此时如果缓存重建的时间比过期时间更长,会导致多个线程释放不同的锁资源导致分布式锁误删问题。
解决误删问题:
- 需要在获取锁时存入线程表示(uuid + 线程id)的方式
- 在释放锁时需要先获取锁中的线程标识,判断是否与当前线程标识一致
- 如果一致则释放锁
- 如果不一致则不释放锁
更新代码:
@Component
public class DistributedLock implements ILock{
@Resource
private StringRedisTemplate stringRedisTemplate;
/** 分布式锁key */
private final String DISTRIBUTED_LOCK = "distributed_lock:";
/** UUID */
private String uuid = UUID.randomUUID(true).toString();
/**
* 分布式互斥锁
*/
@Override
public boolean tryLock(String name, Long time, TimeUnit unit) {
// value
String value = uuid + Thread.currentThread().getId();
// key
String key = DISTRIBUTED_LOCK + name;
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
// 防止自动拆箱空指针
return BooleanUtil.isTrue(aBoolean);
}
/**
* 分布式释放锁
*/
@Override
public void unLock(String name) {
String key = DISTRIBUTED_LOCK + name;
String value = uuid + Thread.currentThread().getId();
// 获取互斥锁中的值
String redisValue = stringRedisTemplate.opsForValue().get(key);
if (StringUtils.equals(value,redisValue)){
stringRedisTemplate.delete(key);
}
}
}
Redisson入门
正常使用setnx实现分布式锁存在以下几种问题
- 不可重入锁:同一现成无法多次获取同一把锁
- 不可重试:获取锁只尝试一次就返回,无法重试
- 超时释放:业务执行耗时较长,会导致锁释放
- 主从一致性:集群的情况下主节点宕机后同步数据过程种,导致锁失效
Redisson 是一个 Java 高级 Redis 客户端,提供了基于 Redis 的分布式和可扩展的 Java 数据结构,如并发集合(Concurrent Collections)、同步器(Synchronizers)、分布式服务(Distributed Services)等。Redisson 构建于 Jedis 之上,旨在简化 Redis 的使用,尤其对于分布式环境中的应用程序而言,它提供了一种易于使用的 API 来处理 Redis 中的数据,并实现了多种分布式锁和其他高级功能。Redisson底层采用的是Netty 框架
案例:每个用户对一件商品只能下一单。
配置文件
redisson:
# redis key前缀
keyPrefix:
# 线程池数量
threads: 4
# Netty线程池数量
nettyThreads: 8
# 单节点配置
singleServerConfig:
# 客户端名称
clientName: ${ruoyi.name}
# 最小空闲连接数
connectionMinimumIdleSize: 8
# 连接池大小
connectionPoolSize: 32
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
/**
* Redisson 配置属性
*
*/
@Data
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {
/**
* redis缓存key前缀
*/
private String keyPrefix;
/**
* 线程池数量,默认值 = 当前处理核数量 * 2
*/
private int threads;
/**
* Netty线程池数量,默认值 = 当前处理核数量 * 2
*/
private int nettyThreads;
/**
* 单机服务配置
*/
private SingleServerConfig singleServerConfig;
/**
* 集群服务配置
*/
private ClusterServersConfig clusterServersConfig;
@Data
@NoArgsConstructor
public static class SingleServerConfig {
/**
* 客户端名称
*/
private String clientName;
/**
* 最小空闲连接数
*/
private int connectionMinimumIdleSize;
/**
* 连接池大小
*/
private int connectionPoolSize;
/**
* 连接空闲超时,单位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超时,单位:毫秒
*/
private int timeout;
/**
* 发布和订阅连接池大小
*/
private int subscriptionConnectionPoolSize;
}
@Data
@NoArgsConstructor
public static class ClusterServersConfig {
/**
* 客户端名称
*/
private String clientName;
/**
* master最小空闲连接数
*/
private int masterConnectionMinimumIdleSize;
/**
* master连接池大小
*/
private int masterConnectionPoolSize;
/**
* slave最小空闲连接数
*/
private int slaveConnectionMinimumIdleSize;
/**
* slave连接池大小
*/
private int slaveConnectionPoolSize;
/**
* 连接空闲超时,单位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超时,单位:毫秒
*/
private int timeout;
/**
* 发布和订阅连接池大小
*/
private int subscriptionConnectionPoolSize;
/**
* 读取模式
*/
private ReadMode readMode;
/**
* 订阅模式
*/
private SubscriptionMode subscriptionMode;
}
}
/**
* redis配置
*
*/
@Slf4j
@AutoConfiguration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {
@Autowired
private RedissonProperties redissonProperties;
@Autowired
private ObjectMapper objectMapper;
@Bean
public RedissonAutoConfigurationCustomizer redissonCustomizer() {
return config -> {
ObjectMapper om = objectMapper.copy();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的。序列化时将对象全类名一起保存下来
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
TypedJsonJacksonCodec jsonCodec = new TypedJsonJacksonCodec(Object.class, om);
// 组合序列化 key 使用 String 内容使用通用 json 格式
CompositeCodec codec = new CompositeCodec(StringCodec.INSTANCE, jsonCodec, jsonCodec);
config.setThreads(redissonProperties.getThreads())
.setNettyThreads(redissonProperties.getNettyThreads())
// 缓存 Lua 脚本 减少网络传输(redisson 大部分的功能都是基于 Lua 脚本实现)
.setUseScriptCache(true)
.setCodec(codec);
RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();
if (ObjectUtil.isNotNull(singleServerConfig)) {
// 使用单机模式
config.useSingleServer()
//设置redis key前缀
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(singleServerConfig.getTimeout())
.setClientName(singleServerConfig.getClientName())
.setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize())
.setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize())
.setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());
}
// 集群配置方式 参考下方注释
RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();
if (ObjectUtil.isNotNull(clusterServersConfig)) {
config.useClusterServers()
//设置redis key前缀
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(clusterServersConfig.getTimeout())
.setClientName(clusterServersConfig.getClientName())
.setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize())
.setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize())
.setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize())
.setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize())
.setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize())
.setReadMode(clusterServersConfig.getReadMode())
.setSubscriptionMode(clusterServersConfig.getSubscriptionMode());
}
log.info("初始化 redis 配置");
};
}
@RequiredArgsConstructor
@Service
public class BookOrderServiceImpl implements IBookOrderService {
private final BookOrderMapper baseMapper;
private final SysUserMapper sysUserMapper;
private final BooksMapper booksMapper;
private final BookOrderDetailMapper bookOrderDetailMapper;
/** 自定义分布式锁 */
private final DistributedLock distributedLock;
/** redission */
private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 模拟库存扣减并发问题
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void inventory(String bookId,Long userId) {
// 一人一单校验
Long aLong = bookOrderDetailMapper.selectCount(
Wrappers.lambdaQuery(BookOrderDetail.class).eq(BookOrderDetail::getNumber, userId)
.eq(BookOrderDetail::getBookId,bookId)
);
if (aLong > 0){
throw new ServiceException("下单失败");
}
// 自定义获取锁
// boolean piker = distributedLock.tryLock("PIKER", 10L, TimeUnit.SECONDS);
// redisClient分布式锁
RLock lock = CLIENT.getLock("lock:order:");
// 默认失败不等待锁时间,锁过期时间30秒
boolean piker = lock.tryLock();
if (piker){
try {
// 订单业务
placingOrder(bookId, userId);
}finally {
// 自定义释放锁
// distributedLock.unLock("PIKER");
// redisson 释放锁
lock.unlock();
}
}
}
/**
* 业务操作
*/
private void placingOrder(String bookId, Long userId) {
// 1.减少库存
Books books = booksMapper.selectById(bookId);
books.setStockQuantity(books.getStockQuantity() - 1);
booksMapper.updateById(books);
// 2.增加订单
BookOrderDetail bookOrder = new BookOrderDetail();
bookOrder.setBookId(Long.parseLong(bookId));
bookOrder.setNumber(userId.intValue());
bookOrderDetailMapper.insert(bookOrder);
}
}
Redisson-分布式锁实现原理
1.可重入锁
方法1{
获取锁
调用方法2
}
方法2{
获取锁
}
以上这种情况下使用自定义的setnx方式就会造成死锁的情况,比较经典的重入锁。
Rdisson使用Lua脚本来实现可重入锁的。
2.重试机制,超时释放
重试机制:在设置互斥锁时有两个线程A,B。A线程先获取锁资源,之后B在获取锁就会一直失败,因为锁的互斥性,没有重试的机制。
超时释放:给锁设置一个过期时间,防止redis宕机情况下锁一直没有办法被释放导致死锁情况,或者因为业务原因导致缓存重建时间大于锁过期时间导致数据丢失
注意:redisson不同版本的代码不同,但是整体流程是大差不大的,下面是结合黑马程序猿老师结合总结的流程图。
如果自己设置失效时间的话,锁过期时间就不是-1因此就不会触发看门狗机制了。
获取锁:
释放锁:
有了以上的机制可以实现:我有三个线程 A,B 设置等待时间3秒,线程A先获取到锁,由于业务原因进行阻塞,此时线程2开始获取锁。线程A业务执行了4秒,那么首先线程2获取锁失败。如果线程A执行业务在3秒内完成,那么线程2可以成功获取锁。