1 文章概述-LMLPHP

1 文章概述

在互联网场景中缓存系统是一个重要系统,为了防止流量频繁访问数据库,一般会在数据库层前设置一道缓存层作为保护。

缓存是一个广义概念,核心要义是将数据存放在离用户更近的地方,或者是将数据存放在访问更快的介质。

缓存对应到实际应用中可以分为内存缓存、远程缓存。内存缓存常见工具例如Guava、Ecache等,远程缓存常见系统例如Redis,memcache等。本文以远程缓存Redis为例进行讲解。

缓存穿透和击穿是高并发场景下必须面对的问题,这些问题会导致访问请求绕过缓存直接打到数据库,可能会造成数据库挂掉或者系统雪崩,下面本文根据下图提纲来分析这些问题的原理和解决方案。

1 文章概述-LMLPHP

2 缓存穿透与击穿区分

缓存穿透和击穿从最终结果上来说都是流量绕过缓存打到了数据库,可能会导致数据库挂掉或者系统雪崩,但是仔细区分还是有一些不同,我们分析一张业务读取缓存一般流程图。

1 文章概述-LMLPHP

我们用文字简要描述这张图:

假设业务方要查询A数据,缓存穿透是指数据库根本不存在A数据,所以根本没有数据可以写入缓存,导致缓存层失去意义,大量请求会频繁访问数据库。

缓存击穿是指请求在查询数据库前,首先查缓存看看是否存在,这是没有问题的。但是并发量太大,导致第一个请求还没有来得及将数据写入缓存,后续大量请求已经开始访问缓存,这是数据在缓存中还是不存在的,所以瞬时大量请求会打到数据库。

3 CAS实例与源码分析

现在我们把缓存问题放一放,一起来分析CAS这个概念的实例源码,后面我们编写缓存工具需要借鉴这个思想。

3.1 一道面试题

我们来看一道常见面试题,相信这个面试题大家并不会陌生:分析下面这段代码输出的值是多少:

class Data {
    volatile int num = 0;
    public void increase() {
        num++;
    }
}
public class VolatileTest {
    public static void main(String[] args) {
        Data data = new Data();

        // 100个线程操作num累加
        for (int i = 1; i <= 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000L);
                        data.increase();
                    } catch (Exception ex) {
                        System.out.println(ex.getMessage());
                    }
                }
            }).start();
        }

        // 等待上述线程执行完 -数值2表示只有主线程和GC线程在运行
        while (Thread.activeCount() 2) {
            // 主线程让出CPU时间片
            Thread.yield();
        }
        System.out.println(data.num);
    }
}

运行结果num值一般小于100,这是因为num++不是原子性,我们编写一段简单代码进行证明。

public class VolatileTest2 {
    volatile int num = 0;
    public void increase() {
        num++;
    }
}

执行下列命令获取字节码:

javac VolatileTest2.java
javap -c VolatileTest2.class

字节码文件如下所示:

$ javap -c VolatileTest2.class
Compiled from "VolatileTest2.java"
public class com.java.front.test.VolatileTest2 {
  volatile int num;
  public com.java.front.test.VolatileTest2();
    Code:
       0: aload_0
       1: invokespecial 1                  // Method java/lang/Object."<init>":()V
       4: aload_0
       5: iconst_0
       6: putfield      2                  // Field num:I
       9: return
  public void increase();
    Code:
       0: aload_0
       1: dup
       2: getfield      2                  // Field num:I
       5: iconst_1
       6: iadd
       7: putfield      2                  // Field num:I
      10: return
}

我们观察num++代码片段,发现其实分为三个步骤:

(1) getfield
(2) iadd
(3) putfield 

getfield读取num值,iadd运算num+1,最后putfield将新值赋值给num。这就不难理解为什么num最终会小于100:因为线程A在执行到第二步后执行第三步前,还没来得及将新值赋给num,数据就被线程B取走了,这时还是没有加1的旧值。

3.2 CAS实例分析

那么怎么解决上述问题呢?常见方案有两种:加锁方案和无锁方案。

加锁方案是对increase加上同步关键字,这样就可以保证同一时刻只有一个线程操作,这不是我们这篇文章重点,不详细展开了。

无锁方案可以采用JUC提供的AtomicInteger进行运算,我们看一下改进后的代码。

import java.util.concurrent.atomic.AtomicInteger;
class Data {
    volatile AtomicInteger num = new AtomicInteger(0);
    public void increase() {
        num.incrementAndGet();
    }
}
public class VolatileTest {
    public static void main(String[] args) {
        Data data = new Data();
        for (int i = 1; i <= 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000L);
                        data.increase();
                    } catch (Exception ex) {
                        System.out.println(ex.getMessage());
                    }
                }
            }).start();
        }
        while (Thread.activeCount() 2) {
            Thread.yield();
        }
        System.out.println(data.num);
    }
}

这样改写之后结果正如我们预期等于100,我们并没有加锁,那么为什么改用AtomicInteger就可以达到预期效果呢?

3.3 CAS源码分析

本章节我们以incrementAndGet方法作为入口,进行CAS源码分析。

class Data {
    volatile AtomicInteger num = new AtomicInteger(0);
    public void increase() {
        num.incrementAndGet();
    }
}

进入incrementAndGet方法:

import sun.misc.Unsafe;
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

我们看到一个名为Unsafe的类。这个类并不常见,到底有什么用呢?Unsafe是位于sun.misc包下的一个类,具有操作底层资源的能力。例如可以直接访问操作系统,操作特定内存数据,提供许多CPU原语级别的API。

我们继续分析源码跟进getAndAddInt方法:

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

我们对参数进行说明:o表示待修改的对象,offset表示待修改字段在内存中的偏移量,delta表示本次修改的增量。

整个方法核心是一段do-while循环代码,其中方法getIntVolatile比较好理解,就是获取对象o偏移量为offset的某个字段值。

我们重点分析while中compareAndSwapInt方法:

public final native boolean compareAndSwapInt(
    Object o,
    long offset,
    int expected,
    int x);

其中o和offset含义不变,expected表示期望值,x表更新值,这就引出了CAS核心操作三个值:内存位置值、预期原值及新值。

执行CAS操作时,内存位置值会与预期原值比较。如果相匹配处理器会自动将该位置值更新为新值,否则处理器不做任何操作。

Unsafe提供的CAS方法是一条CPU的原子指令,底层实现即为CPU指令cmpxchg,不会造成数据不一致。

我们再回过头分析这段代码:

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

代码执行流程如下:

(1) 线程A执行累加,执行到getAndAddInt方法,首先根据内存地址获取o对象offset偏移量的字段值v1
(2) while循环中compareAndSwapInt执行,这个方法将再次获取o对象offset偏移量的字段值v2,此时判断v1和v2是否相等,如果相等则自动将该位置值更新为v1加上增量后的值,跳出循环
(3) 如果执行compareAndSwapInt时字段值已经被线程B改掉,则该方法会返回false,所以无法跳出循环,继续执行直至成功,这就是自旋设计思想

通过上述分析我们知道,Unsafe类和自旋设计思想是CAS实现核心,其中自旋设计思想会在我们缓存工具中体现。

4 分布式锁实例分析

在相同JVM进程中为了保证同一段代码块在同一时刻只能被一个线程访问,JAVA提供了锁机制,例如我们可以使用synchroinzed、ReentrantLock进行并发控制。

如果在多个服务器的集群环境,每个服务器运行着一个JVM进程。如果希望对多个JVM进行并发控制,此时JVM锁就不适用了。这时就需要引入分布式锁。顾名思义分布式锁是对分布式场景下,多个JVM进程进行并发控制。

分布式锁在实现时小心踩坑:例如没有设置超时时间,如果获取到锁的节点由于某种原因挂掉没有释放锁,导致其它节点永远拿不到锁。

分布式锁有多种实现方式,可以自己通过Redis或者Zookeeper进行实现,也可以直接使用Redisson框架。本章节给出Redis分布式锁Lua脚本实现。

public class RedisLockManager {
    private static final String DEFAULT_VALUE = "lock";
    private static final String LOCK_SCRIPT =
        "\nlocal r = tonumber(redis.call('SETNX', KEYS[1], ARGV[1]));"
        + "\nif r == 1 then"
        + "\nredis.call('PEXPIRE',KEYS[1],ARGV[2]);"
        + "\nend"
        + "\nreturn r";
    private static final String UNLOCK_SCRIPT =
        "\nlocal v = redis.call('GET', KEYS[1]);"
        + "\nlocal r = 0;"
        + "\nif v == ARGV[1] then"
        + "\nr = redis.call('DEL',KEYS[1]);"
        + "\nend"
        + "\nreturn r";
    @Resource
    private RedisClient redisClient;
    public boolean tryLock(String key, int seconds) {
        try {
            String lockValue = executeLuaScript(key, lockSeconds);
            if (lockValue != null) {
                return true;
            }
            return false;
        } catch (Exception ex) {
            LOGGER.error("key={},lockSeconds={}", key, lockSeconds, ex);
            return false;
        }
    }
    public boolean unLock(String key) {
        try {
            Long r = (Long) redisClient.eval(UNLOCK_SCRIPT, 1, key, DEFAULT_VALUE);
            if (new Long(1).equals(r)) {
                return true;
            }
        } catch (Exception ex) {
            LOGGER.info("key={}", key, ex);
        }
        return false;
    }
    private String executeLuaScript(String key, int lockSeconds) {
        try {
            Long returnValue = (Long) redisClient.eval(LOCK_SCRIPT, 1, key, DEFAULT_VALUE, String.valueOf(lockSeconds));
            if (new Long(1).equals(returnValue)) {
                return DEFAULT_VALUE;
            }
        } catch (Exception ex) {
            LOGGER.error("key={},lockSeconds={}", key, lockSeconds, ex);
        }
        return null;
    }
}

5 缓存工具实例分析

上述章节分析了CAS原理和分布式锁实现,现在我们要将上述知识结合起来,实现一个可以解决缓存击穿问题的缓存工具。

缓存工具核心思想是如果发现缓存中无数据,利用分布式锁使得同一时刻只有一个JVM进程可以访问数据库,并将数据写入缓存。

那么没有抢到分布式锁的进程怎么办呢?我们提供以下三种选择:

缓存工具代码如下:

/**
 * 业务回调
 *
 * @author 今日头条号「JAVA前线」
 *
 */
public interface RedisBizCall {
    /**
     * 业务回调方法
     *
     * @return 序列化后数据值
     */
    String call();
}

/**
 * 安全缓存管理器
 *
 * @author 今日头条号「JAVA前线」
 *
 */
@Service
public class SafeRedisManager {
    @Resource
    private RedisClient RedisClient;
    @Resource
    private RedisLockManager redisLockManager;
    public String getDataSafe(String key, int lockExpireSeconds, int dataExpireSeconds, RedisBizCall bizCall, boolean alwaysRetry) {
        try {
            boolean getLockSuccess = false;
            while(true) {
                String value = redisClient.get(key);
                if (StringUtils.isNotEmpty(value)) {
                    return value;
                }
                /** 竞争分布式锁 **/
                if (getLockSuccess = redisLockManager.tryLock(key, lockExpireSeconds)) {
                    value = redisClient.get(key);
                    if (StringUtils.isNotEmpty(value)) {
                        return value;
                    }
                    /** 查询数据库 **/
                    value = bizCall.call();
                    /** 数据库无数据则返回**/
                    if (StringUtils.isEmpty(value)) {
                        return null;
                    }
                    /** 数据存入缓存 **/
                    redisClient.setex(key, dataExpireSeconds, value);
                    return value;
                } else {
                    if (!alwaysRetry) {
                        logger.warn("竞争分布式锁失败,key={}", key);
                        return null;
                    }
                    Thread.sleep(100L);
                    logger.warn("尝试重新获取数据,key={}", key);
                }
            }
        } catch (Exception ex) {
            logger.error("getDistributeSafeError", ex);
            return null;
        } finally {
            if (getLockSuccess) {
                redisLockManager.unLock(key);
            }
        }
    }
    public String getDataSafeRetry(String key, int lockExpireSeconds, int dataExpireSeconds, RedisBizCall bizCall, int retryMaxTimes) {
        try {
            int currentTimes = 0;
            boolean getLockSuccess = false;
            while(currentTimes < retryMaxTimes) {
                String value = redisClient.get(key);
                if (StringUtils.isNotEmpty(value)) {
                    return value;
                }
                /** 竞争分布式锁 **/
                if (getLockSuccess = redisLockManager.tryLock(key, lockExpireSeconds)) {
                    value = redisClient.get(key);
                    if (StringUtils.isNotEmpty(value)) {
                        return value;
                    }
                    /** 查询数据库 **/
                    value = bizCall.call();
                    /** 数据库无数据则返回**/
                    if (StringUtils.isEmpty(value)) {
                        return null;
                    }
                    /** 数据存入缓存 **/
                    redisClient.setex(key, seconds, value);
                    return value;
                } else {
                    Thread.sleep(100L);
                    logger.warn("尝试重新获取数据,key={}", key);
                    currentTimes++;
                }
            }
        } catch (Exception ex) {
            logger.error("getDistributeSafeRetryError", ex);
            return null;
        } finally {
            if (getLockSuccess) {
                redisLockManager.unLock(key);
            }
        }
    }
}

在上面代码中我们采用分布式锁,对访问数据库资源的行为进行了限制,同一时刻只有一个进程可以访问数据库资源。如果有数据则放入缓存,解决了缓存击穿问题。如果没有数据则结束循环,解决了缓存穿透问题。使用方法如下:

/**
 * 缓存工具使用
 *
 * @author 今日头条号「JAVA前线」
 *
 */
@Service
public class StudentService implements StudentService {
    private static final String KEY_PREFIX = "stuKey_";
    @Resource
    private StudentDao studentDao;
    @Resource
    private SafeRedisManager safeRedisManager;
    public Student getStudentInfo(String studentId) {
        String studentJSON = safeRedisManager.getDataRetry(KEY_PREFIX + studentId, 30, 600, new RedisBizCall() {
            public String call() {
                StudentDO student = studentDao.getStudentById(studentId);
                if (null == student) {
                    return StringUtils.EMPTY;
                }
                return JSON.toJSONString(student);
            }, 5);
            if(StringUtils.isEmpty(studentJSON) {
                return null;
            }
            return JSON.toJSONString(studentJSON, Student.class);
        }
    }
}

6 数据库与缓存一致性问题

本文到第五章节缓存击穿问题从原理到解决方案已经讲清楚了,这个章节我想引申一个问题:到底是先写缓存还是先写数据库,或者说数据库与缓存一致性怎么保证?

我的结论非常清晰明确:先写数据库再写缓存。核心思想是数据库和缓存之间追求最终一致性,如无必要则无需保证强一致性。

(1) 在缓存作为提升系统性能手段的背景下,不需要保证数据库和缓存的强一致性。如果非要保证二者的强一致性,会增大系统的复杂度没有必要

(2) 如果更新数据库成功,再更新缓存。此时存在两种情况:更新缓存成功则万事大吉。更新缓存失败没有关系,等待缓存失效,此处一定要合理设置失效时间

(3) 如果更新数据库失败,则操作失败,重试或者等待用户重新发起

(4) 数据库是持久化数据,是操作成功还是失败的判断依据。缓存是提升性能的手段,允许短时间和数据库的不一致

(5) 在互联网架构中,一般不追求强一致性,而追求最终一致性。如果非要保证缓存和数据库的一致性,本质上是在解决分布式一致性问题

(6) 分布式一致性问题解决方案有很多,可以选择比如两阶段提交、TCC、本地消息表、MQ事务性消息

7 文章总结

本文介绍了缓存击穿问题原因和解决方案,其中参考率CAS源码的自旋设计思想,结合分布式锁实现了缓存工具,希望文章对大家有所帮助。

05-24 10:41