1、怎么确保 Redis 数据不丢失

 title=
Redis 是一种内存 ,它的数据都保存在内存中,因此在断电或重启等异常情况下,数据可能会丢失。为了确保 Redis 数据不丢失,可以采取以下措施:

  • 持久化
    Redis 支持两种持久化方式,分别是 RDB 和 AOF。RDB 是将 Redis 内存中的数据定期保存到磁盘中,而 AOF 则是将 Redis 执行的每个命令记录到日志文件中。这样可以在 Redis 重启时,通过加载持久化文件来恢复数据。
  • 主从复制
    Redis 支持主从复制,可以将主节点的数据复制到从节点中,从而实现数据备份和容灾。如果主节点出现故障,可以通过从节点来提供服务。
  • 集群模式
    Redis 支持集群模式,可以将数据分散存储到多个节点中,从而提高数据的可靠性和可用性。如果某个节点出现故障,可以通过其他节点来提供服务。
  • 内存快照
    Redis 支持内存快照,可以将 Redis 内存中的数据保存到文件中。如果出现异常情况,可以通过加载内存快照文件来恢复数据。
  • 数据备份
    定期备份 Redis 数据可以保证数据的安全和可靠性。可以使用 Redis 自带的备份工具或者第三方备份工具来备份数据,以防止数据丢失。

2、RDB 和 AOF 两种方式优劣

2.1、RDB

RDB 是意图在某一时刻保存一份完整的内存快照数据集到后缀为 .rdb的二进制文件中,文件中的内容是到那一刻为止内存中完整的数据状态,那一刻之后的操作跟它无关。

  • 优点
    因为是数据快照,所以生成的文件内容紧凑占用磁盘空间小,重启恢复到内存速度也较快,持久化的频率一般也会配置得比较低,并且执行过程交给子进程,对服务性能影响小
  • 缺点
    因为是保存整个内存的数据,所以执行的整个过程会相对较长;因为间隔较长,会丢失较多的数据,在间隔期内服务进程终止的话上一次创建快照到终止那一刻对 Redis 数据的改动都会丢失。

2.2、AOF

AOF 则是在 .aof 文件中以追加写指令的方式实现的。

  • 优点
    因为追加写指令执行的频率高、间隔短,所以间隔期内进程停止丢失的数据较少,数据比较完整。
  • 缺点
    也是因为执行频率高,影响服务性能;写指令跟数据本身比占用空间较大,导致落到磁盘上的文件也很
    大,重启恢复的时间长。

3、Redis 实现分布式锁的几种方案

3.1、SETNX + EXPIRE

setnx(SET IF NOT EXISTS)+ expire命令。先用setnx来抢锁,如果抢到锁,再用expire给锁设置一个过期时间,这样持有锁超时时释放锁,防止锁忘记释放。但此时setnx和expire两个命令无法保证原子性,例如:

//scss复制代码
if(jedis.setnx(key_resource_id, lock_value)==1){ //加锁
    expire(key_resource_id,100); //设置过期时间
    try{
    //业务代码块
    }catch(){}
    finally{
      jedis.del(key_resource_id); //释放锁
    }
}

3.2、SETNX + value(系统时间+过期时间)

可以把过期时间放到setnx的value值里面。如果加锁失败,再拿出value值校验一下即可。加锁代码如下:

//kotlin复制代码
long expires = System.currentTimeMillis()+ expireTime; //系统时间+设置的过期时间
String expiresStr = String.valueof(expires);
// 如果当前锁不存在,则加锁成功
if (jedis.setnx(key resource id, expiresStr) == 1) (
    return true
// 如果锁已经存在,获取锁的过期时间
String currentValueStr = jedis.get(key resource id);
// 如果获取到的过期时间,小于系统当前时间,表示已经过期
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis())
    // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
    String oldValueStr = jedis.getSet(key resource id, expiresStr);
    if (oldValueStr != null && oldValuestr.equals(currentValueStr)) {
    // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁
    return true:
//其他情况均返回加锁失败
return false:

3.3、通过开源框架-Redisson

那么此时就要去想了,如果已经超过了加锁的过期时间,可是业务还没执行完成,这个时候怎么做呢?是把过期时间延长吗?显然不合理,可以通过开源框架-Redisson优化这个问题,简单来说,Redisson就是当一个线程获得锁以后,给该线程开启一个定时守护线程,每隔一段时间检查锁是否还
存在,存在则对锁的过期时间延长,防止锁过期提前释放。假设两个线程争夺统一公共资源:线程A获取锁,并通过哈希算法选择节点,执行Lua脚本加锁,同时其看门狗机制会启动一个watch dog(后台线程),每隔10秒检查线程,如果线程A还持有锁,那么就会不断的延长锁key的生存时间。线程B获得锁失败,就会订阅解锁消息,当获取锁到剩余过期时间后,调用信号量方法阻塞住,直到被唤醒或等待超时。一旦线程A释放了锁,就会广播解锁消息。于是,解锁消息的监听器会释放信号量,获取锁被阻塞的线程B就会被唤醒,并重新尝试获取锁。
Redisson 支持单点模式、主从模式、哨兵模式、集群模式,假设现为单点模式:

//scss复制代码 构造Config
Config config = new Config();
config.usesingleServen().setAddress("redis://ip:port").setPassword("Password,~#") ,setDa
tabase(0);
//构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
//获取锁实例
RLock rLock = redissonClient.getLock(lockKey);
try {
    //获取锁,waitTimeout为最大等待时间,超过这个值,则认为获取锁失败。leaseTime为锁的持有时间
    boolean res = rLock.tryLock((long)waitTimeout,(long)leaseTime,TimeUnit.SECONDS);
    if (res) {
    //业务块
    }catch (Exception e) {
    }finally{
    //解锁
    rLock.unlock();
}

4、Redis 分布式锁的缺陷

4.1、客户端长时间阻塞导致锁失效问题

客户端1得到了锁,因为网络问题或者GC等原因导致长时间阻塞,然后业务程序还没执行完锁就过期了,这时候客户端2也能正常拿到锁,可能会导致线程安全的问题。

4.2、Redis 服务器时钟漂移问题

如果redis服务器的机器时钟发生了向前跳跃,就会导致这个key过早超时失效,比如说客户端1拿到锁后,key的过期时间是12:02分,但redis服务器本身的时钟比客户端快了2分钟,导致key在12:00的时候就失效了,这时候,如果客户端1还没有释放锁的话,就可能导致多个客户端同时持有同一把锁的问题。

4.3、单点实例安全问题

如果redis是单master模式的,当这台机宕机的时候,那么所有的客户端都获取不到锁了,为了提高可用性,可能就会给这个master加一个slave,但是因为redis的主从同步是异步进行的,可能会出现客户端1设置完锁后,master挂掉,slave提升为master,因为异步复制的特性,客户端1设置的锁丢失了,这时候客户端2设置锁也能够成功,导致客户端1和客户端2同时拥有锁。

5、缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不生效,这些请求都会打到数据库。常用的解决方案有两种:

5.1、缓存空对象

  • 优点
    实现简单,维护方便
  • 缺点
    额外的内存消耗,可能造成短期的不一致(比如查询某key时,缓存不存在,数据库不存在,设置了空值,后续真正插入了一条该key的值,在空对象缓存未过期时,就造成了短期的不一致)
  • 代码示例
@GetMapping("list")
public Result<List<User>> list() {
    String key = "user";
    String json = (String) redisTemplate.opsForValue().get(key);
    List<User> list = new ArrayList<>();
    if (StringUtils.isNotBlank(json)) {
        list = JSON.parseArray(json, User.class);
        return Result.ok(list);
    }
    //防止缓存穿透
    if ("".equals(json)) {
        return Result.error("用户信息不存在");
    }
    //从数据库查询
    list = userService.list();
    //数据库也没有数据
    if (CollectionUtils.isEmpty(list)) {
        //缓存redis空对象
        redisTemplate.opsForValue().set(key, "", 10, TimeUnit.MINUTES);
        return Result.error("用户信息不存在");
    }
    redisTemplate.opsForValue().set(key, JSON.toJSONString(list), 10,TimeUnit.MINUTES);
    return Result.ok(list);
}

5.2、布隆过滤

  • 优点
    内存占用较少,没有多余key
  • 缺点
    实现复杂
    存在误判可能
    增加id的复杂度,避免被猜测id规律
    做好数据的基础格式校验
    加强用户权限校验
    做好热点参数的限流

6、缓存雪崩

缓存雪崩是指同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

7、缓存击穿

缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大压力。

解决方案优点缺点
互斥锁没有额外的内存消耗,保证一致性,实现简单线程需要等待、性能受影响,可能有死锁风险
逻辑过期线程无需等待、性能较好不保证一致性,有额外的内存消耗,实现复杂

7.1、互斥锁

  • 代码实现

    /**
     * 缓存击穿(互斥锁)
     * @param id 店铺id
     * @return Shop
     */
     public Shop queryWithMutex(Long id){
       String key = RedisConstants.CACHE_SHOP_KEY + id;
       // 从redis查询商铺缓存
       String shopJson = stringRedisTemplate.opsForValue().get(key);
       // 判断是否存在
       if (StrUtil.isNotBlank(shopJson)) {
           // 存在,直接返回
           return JSONUtil.toBean(shopJson, Shop.class);
       }
    
       // 判断命中的是否为空值
       if (StrUtil.isBlank(shopJson)) {
           // 返回一个错误信息
           return null;
       }
       // 锁key
       String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
       Shop shop;
       try {
           // 实现缓存重建
           // 获取互斥锁
           boolean isLock = tryLock(lockKey);
           // 判断是否获取成功
           if (!isLock) {
               // 失败,休眠并重试
               Thread.sleep(50);
               return queryWithMutex(id);
           }
    
           //成功,根据id查询并写入redis
           shop = getById(id);
    
           // 数据库不存在,返回错误
           if (shop == null) {
               // 将空值写入redis,避免缓存穿透
               // 返回错误信息
              stringRedisTemplate.opsForValue().set(key, "",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
               return null;
           }
           // 存在,写入redis并返回
          stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       } finally {
           // 释放互斥锁
           unlock(lockKey);
       }
       return shop;
     }
    
     /**
     * 上锁
     * @param key 互斥锁 key
     * @return boolean 上锁标记
     */
     private boolean tryLock(String key) {
       Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10,TimeUnit.SECONDS);
       return BooleanUtil.isTrue(flag);
     }
    
     /**
     * 删除锁
     * @param key 互斥锁 key
     */
     private void unlock(String key) {
       stringRedisTemplate.delete(key);
     }

7.2、逻辑过期

 title=

/**
  * 初始化店铺缓
  * @param id            店铺id
  * @param expireSeconds 逻辑过期时间
  * @throws InterruptedException
*/
public void saveShopToRedis(Long id, Long expireSeconds) throws InterruptedException {
    // 查询店铺数据
    Shop shop = getById(id);

    // 休眠100毫秒
    Thread.sleep(100);
    // 封装逻辑过期时间
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    // 写入redis    
    stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(redisData));
}

// 单元测试,初始化店铺缓存
@Test
void testSaveShop1() throws InterruptedException {
    shopService1.saveShopToRedis(1L, 10L);
}

/* 线程池 */
private static final ExecutorService CACHE_REBUILD_EXECUTOR =  Executors.newFixedThreadPool(10);

/**
 * 缓存击穿(逻辑过期)
 *
 * @param id 店铺id
 * @return Shop
 */
public Shop queryWithLogicalExpire(Long id) {
    // 从redis查询商铺缓存
    String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);

    // 判断缓存是否命中,未命中,直接返回空if (StrUtil.isBlank(shopJson)) {
    // 存在,直接返回
    return null;
}

// 命中,需要先把json数据反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
// 转为json对象,再转为shop对象
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();

8、Redis 的四种模式

8.1、单机模式

单机模式是Redis最基本的部署方式。在单机模式中,Redis仅运行在一台服务器上,所有的读写操作都在这台服务器上完成。这种部署方式非常简单,因为不需要配置其他的服务器,但是它存在单点故障的风险,如果这台服务器出现问题,整个系统将会停止工作。
 title=

8.2、主从模式

主从模式是为了解决单机模式的单点故障问题而设计的。在主从模式中,一个Redis服务器被指定为主服务器,其他的Redis服务器则被指定为从服务器。主服务器可以执行读写操作,并将更新的数据同步到从服务器上。从服务器只能执行读操作,它们的数据是由主服务器同步过来的。这样,即使主服务器发生故障,从服务器也可以接管主服务器的工作。
 title=

8.3、哨兵模式

哨兵模式是为了解决主从模式中主服务器故障切换问题而设计的。在哨兵模式中,一个或多个Redis服务器被指定为哨兵服务器。哨兵服务器的主要工作是监控主服务器的状态,并在主服务器发生故障时,自动将从服务器切换为主服务器。哨兵服务器也可以监控多个主从集群,确保整个系统的可用性。
 title=

8.4、集群模式

集群模式是为了解决单个Redis服务器内存有限的问题而设计的。在集群模式中,多个Redis服务器被组成一个集群。集群中的每个节点都保存部分数据,当一个节点无法存储更多的数据时,系统会自动将一部分数据迁移到其他节点上。集群模式可以提高系统的吞吐量和可用性,但是它也需要更多的服务器资源和更复杂的部署和维护。
 title=

标签:Redis
本文到此就结束啦
Last modification:November 18, 2023
如果觉得我的文章对你有用,请随意赞赏