1.Redis缓存双写一致性

2.数据库和缓存一致性的几种更新策略

2.1先更新数据库,再更新缓存

2.2先删除缓存,再更新数据库

2.3先更新数据库,再删除缓存

2.4先更新缓存,再更新数据库

3.Redis与MySQL数据双写一致性工程落地案例之canal

4.总结

1.Redis缓存双写一致性
我们都知道,只要我们使用redis,就会遇到缓存与数据库的双存储双写,那么只要是双写,就一定会有数据一致性问题,为了保证双写一致性,我们要先动redis还是mysql?

通常地来说,有为了保证数据的一致性,会有以下两种情况:

1)如果redis中数据,我们需要和数据库中的值相同

2)如果redis中数据,数据库里的值是最新值

2.数据库和缓存一致性的几种更新策略

我们要保证数据库和缓存的一致性,但是这毕竟是两个工具,必然会造成一定的延迟,所以我们要保证的是最终一致性

我们可以给缓存的key设置过期时间,这是保证最终一致性的解决方案。

我们对存入缓存的数据设置过期时间,所有写操作以数据库为准,对缓存操作只用尽最大努力即可。如果数据库写成功,缓存更新失败,只要到达过期时间,后面的读请求自然会从数据库中取新值,然后回写缓存,达到一致性。

上述的案例只是目前主流+成熟的做法,考虑到每个公司的业务性质不同,请选择适合我们自己公司的方法。

2.1先更新数据库,再更新缓存

当我们进行这种操作的时候,如果线程并发量足够大,一般会出现两个问题,我们用列表格的方式来进行描述:

异常情况1:

t1更新数据库的值
t2 查询请求,缓存命中旧数据,导致查询脏数据
t3更新缓存的数据

异常情况2:

t1更新数据库的值
t2 查询请求,缓存命中旧数据,导致查询脏数据
t3缓存更新失败,导致一定时间内查询的都为脏数据

2.2先删除缓存,再更新数据库

异常情况1:

t1删除缓存
t2 大量查询请求,直接导致缓存击穿
t3服务器宕机

异常情况1解决方案:
缓存击穿的解决方案,我们在前面这篇博客已经解释过了。
深入理解redis——缓存雪崩/缓存击穿/缓存穿透

异常情况2:

t1删除缓存
t2 查询请求,缓存无数据,去数据库查询旧数据
t3更新mysql的值,导致和缓存中数据不一致

异常情况2解决方案:
采用延时双删策略

    public void delayDoubleDeleteUser(TUser user) throws InterruptedException {
        //线程成功删除redis缓存
        redisTemplate.delete(CACHE_KEY_USER + user.getId());
        //线程再更新mysql
        userMapper.updateById(user);
        //休眠两秒钟,等待其它查询业务逻辑先执行完毕,缓存中已经完全是旧值的时候
        Thread.sleep(2000);
        //再删除一遍缓存
        redisTemplate.delete(CACHE_KEY_USER + user.getId());
    }

延时双删造成的问题1:那么,这个延时双删,线程要休眠多久呢?

一般在业务项目运行的时候,先统计下线程的读和写操作的时间,由此为基础,再根据写数据的休眠时间在读数据业务逻辑的耗时基础上增加百毫秒即可。

延时双删造成的问题2:这种同步策略造成吞吐量降低怎么办?

再开一个线程就可以了。

    public void delayDoubleDeleteUser(TUser user) throws InterruptedException, ExecutionException {
        //线程成功删除redis缓存
        redisTemplate.delete(CACHE_KEY_USER + user.getId());
        //线程再更新mysql
        userMapper.updateById(user);
        //休眠两秒钟,等待其它业务逻辑先执行完毕
        //开一个线程,再删除一遍缓存
        CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return redisTemplate.delete(CACHE_KEY_USER + user.getId());
        }).get();
    }

2.3先更新数据库,再删除缓存

这种是业内提倡的解决方案:

异常情况1:

t1更新数据库
t2 还未来得及删除,就从缓存里查出了旧数据
t3删除缓存

那么问题就来了,我们是先删除缓存,然后再更新数据库,还是先更新数据库,再删缓存呢?

一般地来说,对于一个不能保证事务性的操作,一定涉及"哪个任务先做,哪个任务后做"的问题,解决这个问题的方向是:如果出现不一致,谁先做对业务的影响较小,谁先执行。

先删缓存,再更新数据库:该方法的异常出现在,删除缓存后,在数据库还未更新完成,立马又把旧值刷新回缓存。

先更新数据库,再删缓存:该方法的异常出现在,更新数据库后,缓存短暂的时间还没有淘汰,而出现的旧数据被读取的问题。

我们可以很明显地得出,第一种方案,出现异常的概率更大,删除缓存后到写操作完成,中间的间隔时间,远比更新数据库后到删除缓存中的间隔时间要长,所以我们更应该选择第二种。

不过如果你一定要保证一致性怎么办?

没有办法做到绝对的一致性,这是由CAP理论决定的,缓存系统适用的场景就是非强一致性的场景,所以它属于CAP中的AP。

所以,我们得委曲求全,可以去做到BASE理论中说的最终一致性。

此时就引出了我们的canal工具!(下文会有使用讲解)

2.4先更新缓存,再更新数据库

一般没人会这么做,不建议缓存的数据比数据库超前

3.Redis与MySQL数据双写一致性工程落地案例之canal

canal是什么:
canal主要用于mysql数据库增量日志数据的订阅,消费和解析,是阿里巴巴开发并开源的,采用java语言开发。

历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;

总结:Canal是基于Mysql变更日志增量订阅和消费的组件订阅mysql的增量日志,再同步到其它组件(mysql/redis/mq等等),实现数据的最终一致性。

canal的工作原理:

在看canal的工作原理前,我们先了解一下mysql主从同步的工作原理:

Mysql的主从同步流程:
1.当master主服务器上的数据发生改变的时候,将其改变写入二进制事件日志文件中
2.slave从服务器会在一定时间间隔内对master主服务器上的二进制日志进行探测, 探测其是否发生改变。
如果探测到其发生了改变,则开启一个I/O Thread 请求read master的二进制日志。
3.同时master主服务器为每个I/O Thread启动一个dump(转储) Thread,用于向该I/O Thread发送二进制事件日志。
4.slave从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中。
5.slave从服务器将启动SQL Thread从中级日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致。
6.最后I/O Thread和SQL Thread执行完毕后将进行休眠状态,等待下一次被唤醒。

而canal也是模拟mysql slave的同步协议,伪装自己成为mysql Slave,向mysql Master发送dump协议,Mysql Master 收到dump请求,开始推送 binary log 给 slave (即 canal )canal 解析 binary log 对象(原始为 byte 流)

但是这种解决方案,也只能是做到最终一致性,无法到达强一致性。

canal的使用:

连接MySQL 键入命令:show variables like 'log_%'; 查看binlog是否开启

若log_bin的值为OFF,则需要对配置文件my.ini进行修改,加入如下代码:

log-bin=log-bin
binlog-format=ROW

ROW:模式除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
STATEMENT: 模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
MIX: 模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

修改conf /canal.properties配置文件,注册地址为本机数据库地址

修改conf/example/instance.properties配置文件,配置数据库信息

Windows下使用startup.bat

pom.xml增加依赖

<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
package com.example.demo.redisDemo.config.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @author sulingfeng
 * @title: RedisUtils
 * @projectName demo
 * @description: TODO
 * @date 2022/2/16 16:01
 */
public class RedisUtils {

    private static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool was not init");
    }


}

package com.example.demo.canalDemo;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.example.demo.redisDemo.config.utils.RedisUtils;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @auther
 * @create 2020-11-11 17:13
 */
public class RedisCanalClientExample {

    public static final Integer _60SECONDS = 60;

    public static void main(String args[]) {

        // 创建链接canal服务端 example为canal实例名字
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("199.16.1.135",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        System.out.println("----------------程序启动,开始监听mysql的变化:");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            //connector.subscribe("db2020.t_order");
            //订阅mysql这张表
            connector.subscribe("test.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;

            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                //如果没数据修改
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据修改
                    emptyCount = 0;
                    printEntry(message.getEntries());
                    System.out.println();
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    //主要干活的方法
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                //如果是新增
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                    //如果是删除
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }
    //数据库增加的时候,redis增加
    private static void redisInsert(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    insert=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    //数据库删除的时候,redis删除
    private static void redisDelete(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    delete=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.del(columns.get(0).getValue());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    //数据库修改的时候,redis也修改
    private static void redisUpdate(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
                System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        long startTime = System.currentTimeMillis();

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");

    }


}

原本数据库里的数据:

原本redis里的数据:

mysql增加了一条数据:

redis也增加了一条数据:

控制台:

严谨一点的做法的流程图:

4.总结

关于缓存的双写一致性,因为无法做到强一致性,我们还是以最终一致性为解决方案。

03-05 22:13