Redis缓存框架基本介绍

Redisredis 是完全开源免费的,是一个高性能的key-value数据库,目前市面上主流的数据库
Redis、Memcache、Tair(淘宝自研发)

Redis的官网:https://redis.io/

内存数据库(nosql数据库)、mysql、sqlserver
关系数据库存放在硬盘中 查询实现io操作
非关系数据库 Redis 持久化机制 淘汰策略
Jvm内置缓存框架 ECACH os cache

Redis应用场景

  1. 分布式锁
  2. token令牌生成
  3. 缓存使用
  4. 短信验证码code
  5. 分布式消息中间件(发布订阅)

redis线程模型

redis采用nio的io多路复用原则,一个线程去维护多个客户端链接。从而处理并发安全效率

redis穿透雪崩以及击穿(redis安全)

缓存穿透

  • 使用不存在的key,频繁的进行查询。缓存无法命中,导致对数据库压力过大。
    对接口实现限流
    从数据库和redis都查询不到,将数据库null数值,写入到缓存中。

使用布隆过滤器解决缓存穿透

布隆过滤器基本介绍

隆过滤器适用于判断一个元素在集合中是否存在,但是可能会存在误判的问题。
实现的原理采用二进制向量数组和随机映射hash函数。

缓存击穿

  • 热点key在某个时间点过期的时候,而恰好在这个时间点对这个Key有大量的并发请求过来,从而大量的请求打到db,并同时写入到redis
    使用分布式锁

缓存雪崩

  • 大量缓存集中的实效
    过期时间随机,设置不一样的过期时间。

redis存储对象json和二进制

存放redis的对象不跨语言二进制就可以,如果跨语言,就需要使用json对象。

redis六大淘汰策略

redis数据存放在内存里面,如果高并发情况下 ,内存不够。有可能会爆。
防止内存内存爆,都会有淘汰策略。

  • noeviction:当内存使用达到阈值的时候,所有引起申请内存的命令会报错。
  • allkeys-lru:在主键空间中,优先移除最近未使用的key。(推荐)
  • volatile-lru:在设置了过期时间的键空间中,优先移除最近未使用的key。
  • allkeys-random:在主键空间中,随机移除某个key。
  • volatile-random:在设置了过期时间的键空间中,随机移除某个key。
  • volatile-ttl:在设置了过期时间的键空间中,具有更早过期时间的key优先移除。

如何配置Redis淘汰策略

在redis.conf文件中, 设置Redis 内存大小的限制,
我们可以设置maxmemory
image-1655360464799
当数据达到限定大小后,会选择配置的策略淘汰数据
比如:maxmemory 300mb。
image-1655360473717
通过配置 设置Redis的淘汰策略。比如:maxmemory-policy volatile-lru

Linux环境下安装Redis

推荐使用腾讯云的redis

环境安装Redis

  1. 上传Redis的安装包
    https://logaaaaa.oss-cn-beijing.aliyuncs.com/redis-5.0.6.tar.gz
  2. 解压我们的Redis安装包
2.  tar -zxvf redis-5.0.6.tar.gz
3.	mkdir /usr/redis
4.	make install PREFIX=/usr/redis
5.	启动Redis cd /usr/redis/bin     ./redis-server 

环境核心配置

将Redis设置为后台启动

cp /usr/redis-5.0.6/redis.conf    /usr/redis/bin
vi redis.conf  daemonize yes
./redis-server ./redis.conf 重启启动Redis
ps aux | grep 'redis'

设置Redis账号密码

搜索:# requirepass foobared 
requirepass 123456

客户端连接:auth 123456

设置Reids允许ip访问
注释掉bind 127.0.0.1
protected-mode no ###允许外界访问

mysql与redis数据不同步的问题

方案一:直接清理redis缓存,重新查询数据库
方案二:直接使用mq订阅mysql日志文件,增量同步redis中
方案三:使用阿里canal

redis持久化机制

数据都会缓存在-》redis内存
分为rdb与aof

rdb(默认)

定时持久化(全量同步),保存快照,是个dump.rdp,会有一个临时的rdb,以及最终的edb文件(一直在覆盖)。
在redis.conf中有三个数值
image-1655346455185

aof

增量操作日志(增量同步)
两种策略
在Redis的配置文件中存在三种同步方式,它们分别是:
appendfsync always #每次有数据修改发生时都会写入AOF文件,能够保证数据不丢失,但是效率非常低。
appendfsync everysec #每秒钟同步一次,可能会丢失1s内的数据,但是效率非常高。
appendfsync no #从不同步。高效但是数据不会被持久化。
直接修改redis.conf中 appendonly yes
建议最好还是使用everysec 既能够保证数据的同步、效率也还可以。
image-1655347054339
Aof是以执行命令的形式实现同步

redis过期回调

将订单id或者token,当为key,value为订单id,有效期为30分钟,对该key绑定一个事件回调,执行回调方法。主动查询数据库表。

使用Redis Key自动过期机制

当我们的key失效时,可以执行我们的客户端回调监听的方法。
需要在Redis中配置:
notify-keyspace-events “Ex”

springboot整合redis

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
@Component
public class RedisUtils {

  @Autowired
  private StringRedisTemplate stringRedisTemplate;

  public void setString(String key, String value) {
      setString(key, value, null);
  }

  public void setString(String key, String value, Long timeOut) {
      stringRedisTemplate.opsForValue().set(key, value);
      if (timeOut != null) {
          stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
      }
  }

  public String getString(String key) {
      return stringRedisTemplate.opsForValue().get(key);
  }
}
@Component
public class RedisTemplateUtils {

  @Resource
  private RedisTemplate<String, Object> redisTemplate;

  public void setObject(String key, Object object) {
      redisTemplate.opsForValue().set(key, object);
  }

  public Object getObjet(String key) {
      return redisTemplate.opsForValue().get(key);
  }

  public void setObject(String key, Object value, Long timeOut) {
      redisTemplate.opsForValue().set(key, value);
      if (timeOut != null) {
          redisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
      }
  }

}


spring:
redis:
  host: 192.168.212.155
  password: 123456
  port: 6379

SpringBoot整合key失效监听

@Configuration
public class RedisListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}
  
  
  @Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }


    /**
     * Redis失效事件 key
     *
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        //失效打印
        String expiraKey = message.toString();
        
    }
}

redis事物策略(没有隔离级别)

  • Multi 开启事务
  • EXEC 提交事务
  • Watch 可以监听一个或者多个key,在提交事务之前是否有发生了变化 如果发生边了变化就不会提交事务,没有发生变化才可以提交事务 版本号码 乐观锁
  • watch name
  • multi
  • set name xiaoxiao
  • exec
  • Discard 取消提交事务
    注意:Redis官方是没有提供回滚方法, 值提供了取消事务。

redis哨兵实现原理&rediscluster

Redis主从复制

基本概念:
单个Redis如果因为某种原因宕机的话,可能会导致Redis服务不可用,可以使用主从复制实现一主多从,主节点负责写的操作,从节点负责读的操作,主节点会定期将数据同步到从节点中,保证数据一致性的问题。
image-1655368782469

相关配置Redis.conf(从服务器配置)

replicaof <主服务器ip> <主服务器端口>

slaveof 192.168.212.155 6379
masterauth 123456

查看集群节点。 info replication
image-1655370508831
该主从同步方式存在 如果从节点非常多的话,会导致对主节点同步多个从节点压力非常大
可以采用树状类型解决该问题

Redis主从复制的缺陷

如果主节点存在了问题,整个Redis环境是不可以实现写的操作,需要人工更改配置变为主操作
如何解决该问题:使用哨兵机制可以帮助解决Redis集群主从选举策略。

Redis哨兵机制

Redis的哨兵机制就是解决我们以上主从复制存在缺陷(选举问题),解决问题保证我们的Redis高可用,实现自动化故障发现与故障转移。

哨兵机制原理

    1. 哨兵机制每隔10s时间只需要配置监听我们的主节点就可以获取当前整个Redis集群的环境列表,采用info 命令形式。
    1. 哨兵不建议是单机的,最好每个Redis节点都需要配置哨兵监听。
    1. 哨兵集群原理是如何:多个哨兵都执行同一个主的master节点,订阅到相同都通道,有新的哨兵加入都会向通道中发送自己服务的信息,该通道的订阅者可以发现新哨兵的加入,随后相互建立长连接。
    1. Master的故障发现 单个哨兵会向主的master节点发送ping的命令,如果master节点没有及时的响应,哨兵会认为该master节点为“主观不可用状态”会发送给其他都哨兵确认该Master节点是否不可用,当前确认的哨兵节点数>=quorum(可配置),会实现重新选举。

相关核心配置

cp /app/redis/redis/redis-5.0.3/sentinel.conf /usr/local/redis/bin/
cd /usr/local/redis/bin
vi sentinel.conf
sentinel monitor mymaster 192.168.212.160 6379 2(这个ip就是master)
sentinel auth-pass mymaster 123456

集群启动:./redis-sentinel sentinel.conf

RedisCluster集群

Redis3.0开始官方推出了集群模式 RedisCluster,原理采用hash槽的概念,预先分配16384个卡槽,并且将该卡槽分配给具体服务的节点;通过key进行crc16(key)%16384 获取余数,余数就是对应的卡槽的位置,一个卡槽可以存放多个不同的key,从而将读或者写转发到该卡槽的服务的节点。 最大的有点:动态扩容、缩容

RedisCluster集群模式环境搭建


mkdir rediscluster
cd rediscluster/
mkdir redis7000
mkdir redis7001
mkdir redis7002
mkdir redis7003
mkdir redis7004
mkdir redis7005


每个配置文件内容

daemonize yes #后台启动
protected-mode no ; ## 允许外部访问
port 7005 #修改端口号,从7000到7005
cluster-enabled yes #开启cluster,去掉注释
cluster-config-file 7000nodes.conf #自动生成(7000nodes.conf不能一样)
cluster-node-timeout 15000 #节点通信时间
logfile   /usr/rediscluster/redis7000/redis.log(redis7000不能一样)


启动我们的redis

/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7000/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7001/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7002/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7003/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7004/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7005/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7006/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7007/redis.conf

/usr/local/redis/bin/redis-cli --cluster create  192.168.31.185:7000  192.168.31.185:7001  192.168.31.185:7002  192.168.31.185:7003  192.168.31.185:7004  192.168.31.185:7005  --cluster-replicas 1 (建议最好使用服务器的ip地址搭建)

分配我们hash操作

![image-1655690418110](https://logaaaaa.oss-cn-beijing.aliyuncs.com/image-1655690418110.png)

/usr/redis/bin/redis-cli -h 192.168.31.185 -p 7000
![image-1655690442804](https://logaaaaa.oss-cn-beijing.aliyuncs.com/image-1655690442804.png)
/usr/local/redis/bin/redis-cli -h 192.168.31.185 -p 7002 -c

RedisCluster集群模式扩容节点

首先新增两台Redis节点 一主、一从;

/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7006/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7007/redis.conf
案例:
/usr/redis/bin/redis-cli --cluster add-node 新增集群地址:端口 集群中已经存在的任意一台连接地址:端口
image-1655693476940
/usr/local/redis/bin/redis-cli --cluster add-node 192.168.31.185:7007 192.168.31.185:7000 --cluster-slave --cluster-master-id 81c109475bfe3cf0e40cc7ab3f8977e4a7b98d5e

/usr/local/redis/bin/redis-cli --cluster add-node 192.168.31.185:7007 192.168.31.185:7000 --cluster --cluster-master-id 81c109475bfe3cf0e40cc7ab3f8977e4a7b98d5e
image-1655692314215
区分一下rdb文件即可。

image-1655693559088

新增的7006 (master)是没有任何槽位

分配Redis槽位扩容
/usr/local/redis/bin/redis-cli --cluster reshard 192.168.31.185:7000
image-1655693983411

### RedisCluster集群模式缩容节点

/usr/local/redis/bin/redis-cli --cluster reshard 192.168.31.185:7000 --cluster-from 5d94171eb34ed4396bf5b9db8efaab4d96d0cf10 --cluster-to 511058958a3b80dd600e060c2500050c6c5a02ab --cluster-slots

基于redis实现分布式锁

分布式锁的应用场景有那些

  • 分布式任务调度平台保证任务的幂等性。
  • 分布式全局id的生成

Zookeeper实现分布式锁思路

Zookeeper实现分布式锁核心采用临时节点+事件通知,因为zk节点路径是保证全局唯一的,当多个线程同时创建该临时节点,只要谁能够创建成功谁就能够获取到锁。

获取锁:当多个线程同时创建该临时节点,只要谁能够创建成功谁就能够获取到锁。
释放锁:关闭当前Session连接,自动的删除当前的zk节点路径,其他线程重新进入到获取锁阶段。

Redis分布式锁实现思路

Redis实现分布式锁基于SetNx命令,因为在Redis中key是保证是唯一的。所以当多个线程同时的创建setNx时,只要谁能够创建成功谁就能够获取到锁。

Set 命令 每次set时,可以修改原来旧值;
SetNx命令 每次SetNx检查该 key是否已经存在,如果已经存在的话不会执行任何操作。返回为0 如果已经不存在的话直接新增该key。
1:新增key成功 0 失败

获取锁的时候:当多个线程同时创建SetNx k,只要谁能够创建成功谁就能够获取到锁。
释放锁:可以对该key设置一个有效期可以避免死锁的现象。

手动-redis实现分布式锁代码部分(单节点)

依赖

      <dependency>
          <groupId>redis.clients</groupId>
          <artifactId>jedis</artifactId>
          <version>3.6.3</version>
      </dependency>

工具类

package com.gtf.redisfbss;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {
   //protected static Logger logger = Logger.getLogger(RedisUtil.class);
   
   private static String IP = "127.0.0.1";

   //Redis的端口号
   private static int PORT = 6379;

   //可用连接实例的最大数目,默认值为8;
   //如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
   private static int MAX_ACTIVE = 100;

   //控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
   private static int MAX_IDLE = 20;

   //等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
   private static int MAX_WAIT = 3000;

   private static int TIMEOUT = 3000;

   //在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
   private static boolean TEST_ON_BORROW = true;

   //在return给pool时,是否提前进行validate操作;
   private static boolean TEST_ON_RETURN = true;

   private static JedisPool jedisPool = null;

   /**
    * redis过期时间,以秒为单位
    */
   public final static int EXRP_HOUR = 60 * 60; //一小时
   public final static int EXRP_DAY = 60 * 60 * 24; //一天
   public final static int EXRP_MONTH = 60 * 60 * 24 * 30; //一个月

   /**
    * 初始化Redis连接池
    */
   private static void initialPool() {
       try {
           JedisPoolConfig config = new JedisPoolConfig();
           config.setMaxTotal(MAX_ACTIVE);
           config.setMaxIdle(MAX_IDLE);
           config.setMaxWaitMillis(MAX_WAIT);
           config.setTestOnBorrow(TEST_ON_BORROW);

           jedisPool = new JedisPool(config, IP, PORT, TIMEOUT, "123456");
       } catch (Exception e) {
           //logger.error("First create JedisPool error : "+e);
           e.getMessage();
       }
   }


   /**
    * 在多线程环境同步初始化
    */
   private static synchronized void poolInit() {
       if (jedisPool == null) {
           initialPool();
       }
   }


   /**
    * 同步获取Jedis实例
    *
    * @return Jedis
    */
   public synchronized static Jedis getJedis() {
       if (jedisPool == null) {
           poolInit();
       }
       Jedis jedis = null;
       try {
           if (jedisPool != null) {
               jedis = jedisPool.getResource();
           }
       } catch (Exception e) {
           e.getMessage();
           // logger.error("Get jedis error : "+e);
       }
       return jedis;
   }


   /**
    * 释放jedis资源
    *
    * @param jedis
    */
   public static void returnResource(final Jedis jedis) {
       if (jedis != null && jedisPool != null) {
           jedisPool.returnResource(jedis);
       }
   }

   public static Long sadd(String key, String... members) {
       Jedis jedis = null;
       Long res = null;
       try {
           jedis = getJedis();
           res = jedis.sadd(key, members);
       } catch (Exception e) {
           //logger.error("sadd  error : "+e);
           e.getMessage();
       }
       return res;
   }
}

分布式锁实现

@Component
public class RedisLock {
    private static int lockSussess=1;
    /**  获取锁
     * @param lockKey 在redis创建的key
     * @param notLockTime  尝试获取锁的超时时间
     * @return  返回lock成功值
     */
    public String getLock(String lockKey, int notLockTime,int timeout){
        //获取redis连接
        Jedis jedis = RedisUtil.getJedis();
        //计算尝试获取锁的超时时间
        long endTime=System.currentTimeMillis()+notLockTime;
        //没有超时,否则退出循环
        while (System.currentTimeMillis()<endTime){
            String lockValue= UUID.randomUUID().toString();
            Long setnx = jedis.setnx(lockKey, lockValue);
            //当不同的jvm创建相同的key,只要能创建成功。获取锁成功
            if (lockSussess==setnx) {
                //加上有效期
                jedis.expire(lockKey,timeout/1000);
                return lockValue;
            }
            //否则  继续循环
        }
        try {
            if (jedis !=null) {
                jedis.close();
            }
        }catch (Exception e) {

        }finally {
            jedis.close();
        }
        return "";
    }

    /**
     * 释放锁
     * @param lockKey    在redis创建的key
     * @param lockValue  获取到锁的value
     */
    public boolean unlock(String lockKey,String lockValue){
        Jedis jedis = RedisUtil.getJedis();
        try {
            String s = jedis.get(lockValue);
            if (s .equals(lockValue)) {
               return  (jedis.del(lockKey) > 0) ? true : false;
            }
        }catch (Exception e) {

        }finally {
            jedis.close();
        }
        return false;
    }

    public static void main(String[] args) {
        RedisLock redisLock = new RedisLock();
        // 1.获取锁
        String lockValue = redisLock.getLock("lockKey", 5000, 5000);
        if (StringUtils.isEmpty(lockValue)) {
            System.out.println(Thread.currentThread().getName() + ",获取锁失败!");
            return;
        }
        // 2.获取锁成功执行业务逻辑
        System.out.println(Thread.currentThread().getName() + ",获取成功,lockValue:" + lockValue);
//        // 3.释放lock锁
        redisLock.unlock("lockKey", lockValue);

    }
}