场景
公司有块业务需要跑定时业务,来处理数据,后台采用 springBoot
开发的,springBoot
集成定时任务还是挺简单的,使用 Scheduled
配置好 cron
就可以使用了,有个业务是每个小时的45分钟执行一次,因为后台是集群部署的,前端用 Nginx
做的负责均衡,就牵扯到一个问题,同一时刻,后台的定时任务只能用一个触发,最开始采用的redis
做分布式锁,客户端是使用的Jedis
,实现的分布式锁,网上基于redis
实现分布式锁的文章太多了,这里就不再阐述了
基于Jedis
实现分布式锁的初始代码如下
package com.juyi.camera.cache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.util.Collections;
/**
* Created by IntelliJ IDEA.
* User: xuzhou
* Date: 2019/2/26
* Time: 15:34
*/
@Component
@Slf4j
public class RedisLock {
@Autowired
private RedisTemplate redisTemplate;
private static final Long RELEASE_SUCCESS = 1L;
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "EX";
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
/**
* 该加锁方法仅针对单实例 Redis 可实现分布式加锁
* 对于 Redis 集群则无法使用
* <p>
* 支持重复,线程安全
*
* @param lockKey 加锁键
* @param clientId 加锁客户端唯一标识
* @param seconds 锁过期时间
* @return
*/
public Boolean tryLock(String lockKey, String clientId, long seconds) {
return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
String result = jedis.set(lockKey, clientId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, seconds);
redisConnection.close();
if (LOCK_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
/**
* 与 tryLock 相对应,用作释放锁
*
* @param lockKey
* @param clientId
* @return
*/
public Boolean releaseLock(String lockKey, String clientId) {
return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(clientId));
redisConnection.close();
if (RELEASE_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
}
代码里面的锁工具类,自己用lua
脚本实现了加锁的原子性,这里就不过多介绍了
具体的业务逻辑代码大体如下
@Scheduled(cron = "0 45 * ? * *")
public void deleteCloudStorage() {
//加上redis锁,避免集群重复执行
String CLOUD_STORAGE_EXPIRED_DELETE_LOACK_KEY = "storage_expired_delete_loack_";
String clientId = String.valueOf(IdWorker.nextId());
if (redisLock.tryLock(CLOUD_STORAGE_EXPIRED_DELETE_LOACK_KEY, clientId, 60)) {
//处理具体额业务逻辑
}
}
改造
后来在github闲逛的时候,发现java还有个很厉害的 redis
客户端,叫做 redisson
,整体介绍可以看这里 ,文档写的很全面[https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D] redisson
采用了基于NIO的Netty
框架 自带分布式锁
的实现,使用者不需要关心加锁和解锁的原子性问题
,可以写出更优雅代码。
关于分布式锁定具体介绍可以看这里 [https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8] 基于这个,大胆尝试改造了一下 代码大体如下,采用spring配置,托管bean
package com.juyi.camera.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import java.io.IOException;
/**
* Created by IntelliJ IDEA.
* User: xuzhou
* Date: 2020/5/14
* Time: 18:03
*/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redisson() throws IOException {
Config config = Config.fromYAML(new ClassPathResource("sentinelServersConfig.yml").getInputStream());
RedissonClient redisson = Redisson.create(config);
return redisson;
}
@Bean
public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) {
return new RedissonConnectionFactory(redisson);
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
setSerializer(template);//设置序列化工具
template.afterPropertiesSet();
return template;
}
private void setSerializer(StringRedisTemplate template) {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
}
}
配置文件如下
sentinelServersConfig:
# 连接空闲超时,单位:毫秒 默认10000
idleConnectionTimeout: 10000
pingTimeout: 1000
# 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
connectTimeout: 10000
# 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
timeout: 3000
# 命令失败重试次数
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# 重新连接时间间隔,单位:毫秒
reconnectionTimeout: 3000
# 执行失败最大次数
failedAttempts: 3
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
# mymaster
masterName: mymaster
# loadBalancer 负载均衡算法类的选择
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
#从节点发布和订阅连接的最小空闲连接数
slaveSubscriptionConnectionMinimumIdleSize: 1
#从节点发布和订阅连接池大小 默认值50
slaveSubscriptionConnectionPoolSize: 50
# 从节点最小空闲连接数 默认值32
slaveConnectionMinimumIdleSize: 32
# 从节点连接池大小 默认64
slaveConnectionPoolSize: 64
# 主节点最小空闲连接数 默认32
masterConnectionMinimumIdleSize: 32
# 主节点连接池大小 默认64
masterConnectionPoolSize: 64
# 订阅操作的负载均衡模式
subscriptionMode: SLAVE
# 只在从服务器读取
readMode: SLAVE
# 烧饼地址
sentinelAddresses:
- "redis://192.168.188.129:7700"
- "redis://192.168.188.129:7800"
- "redis://192.168.188.129:7900"
# 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
scanInterval: 1000
#这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,默认30000
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true
然后基于单元测试来跑下代码,验证下 redisson
的分布式锁,测试代码比较粗糙,基于多线程来模拟的集群定时任务
package com.juyi.camera;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Created by IntelliJ IDEA.
* User: xuzhou
* Date: 2020/5/14
* Time: 18:06
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = CameraApplication.class)
@Slf4j
public class RedissonTests {
@Resource
RedissonClient redissonClient;
@Resource
ExecutorService executorService;
@Test
public void testLock() {
try {
//使用多线程模拟集群定时任务
for (int i = 1; i <= 5; i++) {
int id = i;
executorService.execute(new Runnable() {
@Override
public void run() {
RLock rLock = redissonClient.getLock("testLock");
try {
log.info(id + " 申请锁");
//尝试加锁,最多等待5 秒,上锁以后10秒自动解锁
boolean lockRes = rLock.tryLock(5, 10, TimeUnit.SECONDS);
if (lockRes) {
try {
log.info(id + " 申请成功,处理业务逻辑");
Thread.sleep(10000);
} finally {
//是否锁定,并且当前线程是否获取了锁
if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
rLock.unlock();
log.info(id + " 释放锁");
}
}
} else {
log.info(id + " 申请锁失败=" + lockRes);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(500000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最终运行结果如下
可以看到,同一时刻,只有一个线程拿到了锁,避免了竞争,运行效果也能达到预期,
后记
redisson
的功能还是很多的,基于redisson
对于业务的改造也结束了,接下来要整体替换掉Jedis
redisson
它本身自带的分布式锁只是其中一项很小的功能,工作中,常思考,多尝试,才能发现不一样的风景。