• 一、是什么?

    1、锁的应用场景:

    在单体应用中,我们会使用ReentrantLock或Synchronized来应对并发场景。比如最常见的卖票场景,假如总共有100张票,线程A和线程B同时操作,如下图:

    这时有一个共享变量100,线程A和B将100拷贝到自己的工作内存中,当线程A抢到执行权的时候,此时A工作内存中的值是100,然后售票,进行自减操作,将自己工作内存中的值变成了99。当A还没来得及将99刷回到主内存的时候,线程B进来了,此时B拿到的主内存的值还是100,然后售票,进行自减,也是99。这就出现了同一张票出售了两次的情况。所以我们会加锁加volatile保证原子性保证可见性。

    2、分布式锁是什么?

    上面的场景中,我们可以通过ReentrantLock或者Synchronized搞定,因为你的项目只运行在一台服务器上,只有一个JVM,所有的共享变量都加载到同一个主内存中。而分布式应用中,一个项目部署在多台服务器上,最基本的架构如下图:

    比如现在server1、server2和server3读取到数据库的票数都是100,在每一个server中,我们可以用JDK的锁来保证多个用户同时访问我这台server时不会出问题。但问题是,如果client1访问到的是server1,票数是100,然后购票,还没来得及将数据库票数改为99,client2也开始访问系统购票了,client2如果访问的是server1,自然不会出问题,如果访问的是server2,这时server2读取到数据库的票数还是100,那么就出问题了,又出现了同一张票卖了两次的情况。在分布式应用中,JDK的锁机制就无法满足需求了,所以就出现了分布式锁。

    3、分布式锁应该满足的条件:

    4、分布式锁的实现方式:

    二、基于数据库实现

    1、建表:

    CREATE TABLE `tb_distributed_lock` (
     `dl_id` INT NOT NULL auto_increment COMMENT '主键,自增',
     `dl_method_name` VARCHAR (64) NOT NULL DEFAULT '' COMMENT '方法名',
     `dl_device_info` VARCHAR (100) NOT NULL DEFAULT '' COMMENT 'ip+线程id',
     `dl_operate_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据被操作的时间',
     PRIMARY KEY (`dl_id`),
     UNIQUE KEY `uq_method_name` (`dl_method_name`) USING BTREE
    ) ENGINE = INNODB DEFAULT charset = utf8 COMMENT = '分布式锁表';

    2、思路:

    当执行一个方法的时候,我们首先尝试往表中插入一条数据。如果插入成功,则占锁成功,继续往下执行,执行完删除该记录。如果插入失败,我们再以当前方法名、当前机器ip+线程id、数据被操作时间为5分钟内(5分钟表示锁失效的时间)为条件去查询,如果有记录,表示该机器的该线程在5分钟内占有过锁了,直接往下执行最后删除记录;如果没有记录,占有锁失败。一个用户就是一个线程,所以我们可以把机器ip和用户id组合一起当成dl_device_info

    3、占有锁和释放锁:

    INSERT INTO tb_distributed_lock (
     dl_method_name,
     dl_device_info
    )
    VALUES
     ('方法名''ip&用户id');

    如果insert失败,则:

    SELECT
     count(*)
    FROM
     tb_distributed_lock
    WHERE
     dl_method_name = '方法名'
    AND dl_device_info = 'ip&用户id'
    AND dl_operate_time < SYSDATE() - 5;
    DELETE
    FROM
     tb_distributed_lock
    WHERE
     dl_method_name = '方法名'
    AND dl_device_info = 'ip&用户id';

    4、小总结:

    以上表结构可能并不是很好,只是提供了这么一个思路。下面说它的优缺点:

    三、基于redis实现

    1、原理:

    基于redis的set key value nx ex 30,这条语句的意思就是如果key不存在就设置,并且过期时间为30s,如果key已经存在就会返回false。如果要以毫秒为单位,把ex换成px就好了。我们执行方法前,先将方法名当成key,执行这条语句,如果执行成功就是获取锁成功,执行失败就是获取锁失败。

    2、代码实现:

    /**
    * key不存在时就设置,返回true,key已存在就返回false
    * @param key
    * @param value
    * @param timeout
    * @return
    */
    public static boolean setIfAbsent(String key, String value, Long timeout) {
     return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
    }
    /**
    * 获取key-value
    * @param key
    * @return
    */
    public static String getString(String key) {
     return (String) redisTemplate.opsForValue().get(key);
    }
    /**
    * 删除key
    * @param key
    * @return
    */
    public static boolean delKey(String key) {
     return redisTemplate.delete(key);
    }
    public String hello() {
     // 方法名当作key
     String key = "hello";
     String value = "hellolock";
     if (RedisUtil.setIfAbsent(key, value, 60 * 2L)) {
      System.out.println("成功获取到锁,开始执行业务逻辑……");
      // 假如执行业务逻辑需要1分钟
      try {TimeUnit.MINUTES.sleep(1L); } catch (Exception e) { e.printStackTrace();};
      // 释放锁先校验value,避免释放错
      if (value.equals(RedisUtil.getString(key))) {
       RedisUtil.delKey(key);
       System.out.println("执行完业务逻辑,释放锁成功");
      }
      return "success";
     } else {
      System.out.println("锁被别的线程占有,获取锁失败");
      return "acquire lock failed";
     }
    }

    3、小总结:

    四、基于Redisson实现

    1、是什么?

    官网地址:https://github.com/redisson/redisson/wiki/Table-of-Content Redisson是一个功能十分强大的redis客户端,封装了很多分布式操作,比如分布式对象、分布式集合、分布式锁等。它的分布式锁也很多,什么公平锁、可重入锁、redlock等一应俱全,下面来看看如何在springboot项目中使用它。

    2、使用redisson做分布式锁:

    <!-- redisson-springboot-starter -->
    <dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.12.3</version>
    </dependency>
    <!-- io.netty/netty-all  -->
    <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
    </dependency>
    spring:
      application:
        name: distributed-lock
      redis:
        # redis单机版的写法
        host: 192.168.2.43
        port: 6379
        # 集群的写法
        #cluster:
          #nodes:
          #- 192.168.0.106,192.168.0.107
        #哨兵的写法
        #sentinel:
          #master: 192.168.0.106
          #nodes:
          #- 192.168.0.107,192.168.0.108
    @Autowired
    private RedissonClient redisson;

    /**
     * 未设置过期时间,没获取到就会一直阻塞着
     * @return
     */
    @GetMapping("/testLock")
    public String testLock() {
     log.info("进入testLock方法,开始获取锁");
     String key = "testLock";
     RLock lock = redisson.getLock(key);
     lock.lock();
     log.info("获取锁成功,开始执行业务逻辑……");
     try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
     log.info("执行完业务逻辑,释放锁");
     lock.unlock();
     return "success";
    }
     
    /**
     * 尝试获取锁,没获取到就直接失败,不会阻塞
     * @return
     */
    @GetMapping("/testTryLock")
    public String testTryLock() {
     log.info("进入testTryLock方法,开始获取锁");
     String key = "testTryLock";
     RLock lock = redisson.getLock(key);
     boolean res = lock.tryLock();
     if (!res) {
      log.error("尝试获取锁失败");
      return "fail";
     } else {
      log.info("获取锁成功,开始执行业务逻辑……");
      try {TimeUnit.SECONDS.sleep(30L); } catch (Exception e) { e.printStackTrace();};
      log.info("执行完业务逻辑,释放锁");
      lock.unlock();
      return "success";
     }
    }
     
    /**
     * 锁设置了过期时间,即使最后面的unlock失败,20秒后也会自动释放锁
     * @return
     */
    @GetMapping("/testLockTimeout")
    public String testLockTimeout() {
     log.info("进入testLockTimeout方法,开始获取锁");
     String key = "testLockTimeout";
     RLock lock = redisson.getLock(key);
     // 20秒后自动释放锁
     lock.lock(20, TimeUnit.SECONDS);
     log.info("获取锁成功,开始执行业务逻辑……");
     try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
     lock.unlock();
     return "success";
    }
     
    /**
     * 尝试获取锁,15秒还没获取到就获取锁失败;获取到了会持有20秒,20秒后自动释放锁
     * @return
     */
    @GetMapping("/testTryLockTimeout")
    public String testTryLockTimeout() {
     log.info("进入testTryLockTimeout方法,开始获取锁");
     String key = "testTryLockTimeout";
     RLock lock = redisson.getLock(key);
     boolean res = false;
     try {
      res = lock.tryLock(15, 20, TimeUnit.SECONDS);
     } catch (InterruptedException e1) {
      e1.printStackTrace();
     }
     if (!res) {
      log.error("尝试获取锁失败");
      return "fail";
     } else {
      log.info("获取锁成功,开始执行业务逻辑……");
      try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
      log.info("执行完业务逻辑,释放锁");
      lock.unlock();
      return "success";
     }
    }

    3、小总结:

    以上就是使用redisson做分布式锁的简单demo,用起来十分的方便。上面是与springboot项目集成,直接用它提供的springboot的starter就好了。用它来做分布式锁的更多用法请移步至官网:redisson分布式锁。

    五、基于zookeeper实现

    1、zookeeper知识点回顾:

    zookeeper有四种类型的节点:

    2、基于zookeeper实现分布式锁的原理:

    我们正是利用了zookeeper的临时顺序节点来实现分布式锁。首先我们创建一个名为lock(节点名称随意)的持久节点。线程1获取锁时,就在lock下面创建一个名为lock1的临时顺序节点,然后查找lock下所有的节点,判断自己的lock1是不是第一个,如果是,获取锁成功,继续执行业务逻辑,执行完后删除lock1节点;如果不是第一个,获取锁失败,就watch排在自己前面一位的节点,当排在自己前一位的节点被干掉时,再检查自己是不是排第一了,如果是,获取锁成功。图解过程如下:

    线程1创建了一个lock1,发现lock1的第一个节点,占锁成功;在线程1还没释放锁的时候,线程2来了,创建了一个lock2,发现lock2不是第一个,便监控lock1,线程3此时进行就监控lock2。直到自己是第一个节点时才占锁成功。假如某个线程释放锁的时候zookeeper崩了也没关系,因为是临时节点,断开连接节点就没了,其他线程还是可以正常获取锁,这就是要用临时节点的原因。

    说清楚了原理,用代码实现也就不难了,可以引入zookeeper的客户端zkClient,自己写代码实现(偷个懒,自己就不写了,有兴趣的可以参考我zookeeper的文章,肯定可以自己写出来的)。不过有非常优秀的开源解决方案比如curator,下面就看看curator怎么用。

    六、基于curator实现

    1、springboot整合curator:

    <!-- curator start-->
    <dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.14</version>
    </dependency>
    <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>4.2.0</version>
    </dependency>
    <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>4.2.0</version>
    </dependency>
    <dependency>
     <groupId>org.seleniumhq.selenium</groupId>
     <artifactId>selenium-java</artifactId>
    </dependency>
    <!-- curator end-->
    curator:
      retryCount: 5 # 连接失败的重试次数
      retryTimeInterval: 5000 # 每隔5秒重试一次
      url: 192.168.2.43:2181 # zookeeper连接地址
      sessionTimeout: 60000 # session超时时间1分钟
      connectionTimeout: 5000 # 连接超时时间5秒钟
    @Configuration
    public class CutatorConfig {

     @Value("${curator.retryCount}")
     private Integer retryCount;

     @Value("${curator.retryTimeInterval}")
     private Integer retryTimeInterval;

     @Value("${curator.url}")
     private String url;

     @Value("${curator.sessionTimeout}")
     private Integer sessionTimeout;

     @Value("${curator.connectionTimeout}")
     private Integer connectionTimeout;

     @Bean
     public CuratorFramework curatorFramework() {
      return CuratorFrameworkFactory.newClient(url, sessionTimeout, connectionTimeout,
        new RetryNTimes(retryCount, retryTimeInterval));
     }
    }
    @SpringBootTest(classes = {DistributedLockApplication.class})
    @RunWith(SpringRunner.class)
    public class DistributedLockApplicationTests {
     
     @Autowired
     private CuratorFramework curatorFramework;

     @Test
     public void contextLoads() {
      curatorFramework.start();
      try {
       curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/zhusl""test".getBytes());
      } catch (Exception e) {
       e.printStackTrace();
      }
     }
    }

    在确保zookeeper成功启动了的情况下,执行这个单元测试,最后回到linux中,用zkCli.sh连接,查看是否成功创建节点。

    2、使用Curator做分布式锁:

    Curator封装了很多锁,比如可重入共享锁、不可重入共享锁、可重入读写锁、联锁等。具体可以参考官网:curator分布式锁的用法。

    @Component
    @Slf4j
    public class ZookeeperUtil {

     private static CuratorFramework curatorFramework;
     
     private static InterProcessLock lock;

     /** 持久节点 */
     private final static String ROOT_PATH = "/lock/";
     
     /** 可重入共享锁 */
     private static InterProcessMutex interProcessMutex;
     /** 不可重入共享锁 */
     private static InterProcessSemaphoreMutex interProcessSemaphoreMutex;
     /** 可重入读写锁 */
     private static InterProcessReadWriteLock interProcessReadWriteLock;
     /** 多共享锁(将多把锁当成一把来用) */
     private static InterProcessMultiLock interProcessMultiLock;

     @Autowired
     private void setCuratorFramework(CuratorFramework curatorFramework) {
      ZookeeperUtil.curatorFramework = curatorFramework;
      ZookeeperUtil.curatorFramework.start();
     }

     /**
      * 获取可重入排他锁
      * 
      * @param lockName
      * @return
      */
     public static boolean interProcessMutex(String lockName) {
      interProcessMutex = new InterProcessMutex(curatorFramework, ROOT_PATH + lockName);
      lock = interProcessMutex;
      return acquireLock(lockName, lock);
     }

     /**
      * 获取不可重入排他锁
      * 
      * @param lockName
      * @return
      */
     public static boolean interProcessSemaphoreMutex(String lockName) {
      interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(curatorFramework, ROOT_PATH + lockName);
      lock = interProcessSemaphoreMutex;
      return acquireLock(lockName, lock);
     }

     /**
      * 获取可重入读锁
      * 
      * @param lockName
      * @return
      */
     public static boolean interProcessReadLock(String lockName) {
      interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_PATH + lockName);
      lock = interProcessReadWriteLock.readLock();
      return acquireLock(lockName, lock);
     }

     /**
      * 获取可重入写锁
      * 
      * @param lockName
      * @return
      */
     public static boolean interProcessWriteLock(String lockName) {
      interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_PATH + lockName);
      lock = interProcessReadWriteLock.writeLock();
      return acquireLock(lockName, lock);
     }

     /**
      * 获取联锁(多把锁当成一把来用)
      * @param lockNames
      * @return
      */
     public static boolean interProcessMultiLock(List<String> lockNames) {
      if (lockNames == null || lockNames.isEmpty()) {
       log.error("no lockNames found");
       return false;
      }
      interProcessMultiLock = new InterProcessMultiLock(curatorFramework, lockNames);
      try {
       if (!interProcessMultiLock.acquire(10, TimeUnit.SECONDS)) {
        log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail");
        return false;
       } else {
        log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success");
        return true;
       }
      } catch (Exception e) {
       log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
       return false;
      }
     }

     /**
      * 释放锁
      * 
      * @param lockName
      */
     public static void releaseLock(String lockName) {
      try {
       if (lock != null && lock.isAcquiredInThisProcess()) {
        lock.release();
        curatorFramework.delete().inBackground().forPath(ROOT_PATH + lockName);
        log.info("Thread:" + Thread.currentThread().getId() + " release lock success");
       }
      } catch (Exception e) {
       log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
      }
     }
     
     /**
      * 释放联锁
      */
     public static void releaseMultiLock(List<String> lockNames) {
      try {
       if (lockNames == null || lockNames.isEmpty()) {
        log.error("no no lockNames found to release");
        return;
       }
       if (interProcessMultiLock != null && interProcessMultiLock.isAcquiredInThisProcess()) {
        interProcessMultiLock.release();
        for (String lockName : lockNames) {
         curatorFramework.delete().inBackground().forPath(ROOT_PATH + lockName);
        }
        log.info("Thread:" + Thread.currentThread().getId() + " release lock success");
       }
      } catch (Exception e) {
       log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
      }
     }
     

     /**
      * 获取锁
      * 
      * @param lockName
      * @param interProcessLock
      * @return
      */
     private static boolean acquireLock(String lockName, InterProcessLock interProcessLock) {
      int flag = 0;
      try {
       while (!interProcessLock.acquire(2, TimeUnit.SECONDS)) {
        flag++;
        if (flag > 1) {
         break;
        }
       }
      } catch (Exception e) {
       log.error("acquire lock occured an exception = " + e);
       return false;
      }
      if (flag > 1) {
       log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail");
       return false;
      } else {
       log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success");
       return true;
      }
     }
    }
    @RestController
    @RequestMapping("/zookeeper-lock")
    public class ZookeeperLockController {
     
     @GetMapping("/testLock")
     public String testLock() {
      // 获取锁
      boolean lockResult = ZookeeperUtil.interProcessMutex("testLock");
      if (lockResult) {
       try {
        // 模拟执行业务逻辑
        TimeUnit.MINUTES.sleep(1L);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
       // 释放锁
       ZookeeperUtil.releaseLock("testLock");
       return "success";
      } else {
       return "fail";
      }
     }
    }

    打开一个浏览器窗口访问,后台打印出获取锁成功的日志,在1分钟之内,开启另一个窗口再次访问,打印出获取锁失败的日志,说明分布式锁生效了。

    七、实现分布式锁的各方案比较

    本文项目地址:分布式锁

    本文分享自微信公众号 - java开发那些事(javawebkf)。
    如有侵权,请联系 [email protected] 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    09-01 18:39