场景

公司有块业务需要跑定时业务,来处理数据,后台采用 springBoot 开发的,springBoot 集成定时任务还是挺简单的,使用 Scheduled 配置好 cron 就可以使用了,有个业务是每个小时的45分钟执行一次,因为后台是集群部署的,前端用 Nginx 做的负责均衡,就牵扯到一个问题,同一时刻,后台的定时任务只能用一个触发,最开始采用的redis 做分布式锁,客户端是使用的Jedis,实现的分布式锁,网上基于redis 实现分布式锁的文章太多了,这里就不再阐述了

基于Jedis 实现分布式锁的初始代码如下

package com.juyi.camera.cache;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.util.Collections;

/**
 * Created by IntelliJ IDEA.
 * User: xuzhou
 * Date: 2019/2/26
 * Time: 15:34
 */
@Component
@Slf4j
public class RedisLock {
    @Autowired
    private RedisTemplate redisTemplate;
    private static final Long RELEASE_SUCCESS = 1L;
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "EX";
    private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";


    /**
     * 该加锁方法仅针对单实例 Redis 可实现分布式加锁
     * 对于 Redis 集群则无法使用
     * <p>
     * 支持重复,线程安全
     *
     * @param lockKey  加锁键
     * @param clientId 加锁客户端唯一标识
     * @param seconds  锁过期时间
     * @return
     */
    public Boolean tryLock(String lockKey, String clientId, long seconds) {
        return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
            Jedis jedis = (Jedis) redisConnection.getNativeConnection();
            String result = jedis.set(lockKey, clientId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, seconds);
            redisConnection.close();
            if (LOCK_SUCCESS.equals(result)) {
                return Boolean.TRUE;
            }
            return Boolean.FALSE;
        });
    }

    /**
     * 与 tryLock 相对应,用作释放锁
     *
     * @param lockKey
     * @param clientId
     * @return
     */
    public Boolean releaseLock(String lockKey, String clientId) {
        return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
            Jedis jedis = (Jedis) redisConnection.getNativeConnection();
            Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(clientId));
            redisConnection.close();
            if (RELEASE_SUCCESS.equals(result)) {
                return Boolean.TRUE;
            }
            return Boolean.FALSE;
        });
    }

}

代码里面的锁工具类,自己用lua 脚本实现了加锁的原子性,这里就不过多介绍了

具体的业务逻辑代码大体如下

 @Scheduled(cron = "0 45 * ? * *")
    public void deleteCloudStorage() {
        //加上redis锁,避免集群重复执行
        String CLOUD_STORAGE_EXPIRED_DELETE_LOACK_KEY = "storage_expired_delete_loack_";
        String clientId = String.valueOf(IdWorker.nextId());
        if (redisLock.tryLock(CLOUD_STORAGE_EXPIRED_DELETE_LOACK_KEY, clientId, 60)) {
          //处理具体额业务逻辑
	}
}

改造

后来在github闲逛的时候,发现java还有个很厉害的 redis 客户端,叫做 redisson ,整体介绍可以看这里 ,文档写的很全面[https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D] redisson 采用了基于NIO的Netty 框架 自带分布式锁 的实现,使用者不需要关心加锁和解锁的原子性问题,可以写出更优雅代码。

关于分布式锁定具体介绍可以看这里 [https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8] 基于这个,大胆尝试改造了一下 代码大体如下,采用spring配置,托管bean

package com.juyi.camera.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import java.io.IOException;

/**
 * Created by IntelliJ IDEA.
 * User: xuzhou
 * Date: 2020/5/14
 * Time: 18:03
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redisson() throws IOException {
        Config config = Config.fromYAML(new ClassPathResource("sentinelServersConfig.yml").getInputStream());
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }

    @Bean
    public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) {
        return new RedissonConnectionFactory(redisson);
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        setSerializer(template);//设置序列化工具
        template.afterPropertiesSet();
        return template;
    }

    private void setSerializer(StringRedisTemplate template) {
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
    }
}

配置文件如下

sentinelServersConfig:
  # 连接空闲超时,单位:毫秒 默认10000
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  # 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
  connectTimeout: 10000
  # 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
  timeout: 3000
  # 命令失败重试次数
  retryAttempts: 3
  # 命令重试发送时间间隔,单位:毫秒
  retryInterval: 1500
  # 重新连接时间间隔,单位:毫秒
  reconnectionTimeout: 3000
  # 执行失败最大次数
  failedAttempts: 3
  # 单个连接最大订阅数量
  subscriptionsPerConnection: 5
  # mymaster
  masterName: mymaster
  # loadBalancer 负载均衡算法类的选择
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  #从节点发布和订阅连接的最小空闲连接数
  slaveSubscriptionConnectionMinimumIdleSize: 1
  #从节点发布和订阅连接池大小 默认值50
  slaveSubscriptionConnectionPoolSize: 50
  # 从节点最小空闲连接数 默认值32
  slaveConnectionMinimumIdleSize: 32
  # 从节点连接池大小 默认64
  slaveConnectionPoolSize: 64
  # 主节点最小空闲连接数 默认32
  masterConnectionMinimumIdleSize: 32
  # 主节点连接池大小 默认64
  masterConnectionPoolSize: 64
  # 订阅操作的负载均衡模式
  subscriptionMode: SLAVE
  # 只在从服务器读取
  readMode: SLAVE
  # 烧饼地址
  sentinelAddresses:
    - "redis://192.168.188.129:7700"
    - "redis://192.168.188.129:7800"
    - "redis://192.168.188.129:7900"
  # 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
  scanInterval: 1000
  #这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,默认30000
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true

然后基于单元测试来跑下代码,验证下 redisson 的分布式锁,测试代码比较粗糙,基于多线程来模拟的集群定时任务

package com.juyi.camera;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by IntelliJ IDEA.
 * User: xuzhou
 * Date: 2020/5/14
 * Time: 18:06
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = CameraApplication.class)
@Slf4j
public class RedissonTests {
    @Resource
    RedissonClient redissonClient;
    @Resource
    ExecutorService executorService;

    @Test
    public void testLock() {
        try {
            //使用多线程模拟集群定时任务
            for (int i = 1; i <= 5; i++) {
                int id = i;
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        RLock rLock = redissonClient.getLock("testLock");
                        try {
                            log.info(id + " 申请锁");
                            //尝试加锁,最多等待5 秒,上锁以后10秒自动解锁
                            boolean lockRes = rLock.tryLock(5, 10, TimeUnit.SECONDS);
                            if (lockRes) {
                                try {
                                    log.info(id + " 申请成功,处理业务逻辑");
                                    Thread.sleep(10000);
                                } finally {
                                    //是否锁定,并且当前线程是否获取了锁
                                    if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                                        rLock.unlock();
                                        log.info(id + " 释放锁");
                                    }
                                }
                            } else {
                                log.info(id + " 申请锁失败=" + lockRes);
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            Thread.sleep(500000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

最终运行结果如下

springBoot集成redisson实现分布式锁-LMLPHP

可以看到,同一时刻,只有一个线程拿到了锁,避免了竞争,运行效果也能达到预期,

后记

redisson 的功能还是很多的,基于redisson 对于业务的改造也结束了,接下来要整体替换掉Jedis redisson 它本身自带的分布式锁只是其中一项很小的功能,工作中,常思考,多尝试,才能发现不一样的风景。

03-30 22:00