Redis缓存框架基本介绍
Redisredis 是完全开源免费的,是一个高性能的key-value数据库,目前市面上主流的数据库
Redis、Memcache、Tair(淘宝自研发)
Redis的官网:https://redis.io/
内存数据库(nosql数据库)、mysql、sqlserver
关系数据库存放在硬盘中 查询实现io操作
非关系数据库 Redis 持久化机制 淘汰策略
Jvm内置缓存框架 ECACH os cache
Redis应用场景
- 分布式锁
- token令牌生成
- 缓存使用
- 短信验证码code
- 分布式消息中间件(发布订阅)
redis线程模型
redis采用nio的io多路复用原则,一个线程去维护多个客户端链接。从而处理并发安全效率
redis穿透雪崩以及击穿(redis安全)
缓存穿透
- 使用不存在的key,频繁的进行查询。缓存无法命中,导致对数据库压力过大。
对接口实现限流
从数据库和redis都查询不到,将数据库null数值,写入到缓存中。
使用布隆过滤器解决缓存穿透
布隆过滤器基本介绍
隆过滤器适用于判断一个元素在集合中是否存在,但是可能会存在误判的问题。
实现的原理采用二进制向量数组和随机映射hash函数。
缓存击穿
- 热点key在某个时间点过期的时候,而恰好在这个时间点对这个Key有大量的并发请求过来,从而大量的请求打到db,并同时写入到redis
使用分布式锁
缓存雪崩
- 大量缓存集中的实效
过期时间随机,设置不一样的过期时间。
redis存储对象json和二进制
存放redis的对象不跨语言二进制就可以,如果跨语言,就需要使用json对象。
redis六大淘汰策略
redis数据存放在内存里面,如果高并发情况下 ,内存不够。有可能会爆。
防止内存内存爆,都会有淘汰策略。
- noeviction:当内存使用达到阈值的时候,所有引起申请内存的命令会报错。
- allkeys-lru:在主键空间中,优先移除最近未使用的key。(推荐)
- volatile-lru:在设置了过期时间的键空间中,优先移除最近未使用的key。
- allkeys-random:在主键空间中,随机移除某个key。
- volatile-random:在设置了过期时间的键空间中,随机移除某个key。
- volatile-ttl:在设置了过期时间的键空间中,具有更早过期时间的key优先移除。
如何配置Redis淘汰策略
在redis.conf文件中, 设置Redis 内存大小的限制,
我们可以设置maxmemory
当数据达到限定大小后,会选择配置的策略淘汰数据
比如:maxmemory 300mb。
通过配置 设置Redis的淘汰策略。比如:maxmemory-policy volatile-lru
Linux环境下安装Redis
推荐使用腾讯云的redis
环境安装Redis
- 上传Redis的安装包
https://logaaaaa.oss-cn-beijing.aliyuncs.com/redis-5.0.6.tar.gz - 解压我们的Redis安装包
2. tar -zxvf redis-5.0.6.tar.gz
3. mkdir /usr/redis
4. make install PREFIX=/usr/redis
5. 启动Redis cd /usr/redis/bin ./redis-server
环境核心配置
将Redis设置为后台启动
cp /usr/redis-5.0.6/redis.conf /usr/redis/bin
vi redis.conf daemonize yes
./redis-server ./redis.conf 重启启动Redis
ps aux | grep 'redis'
设置Redis账号密码
搜索:# requirepass foobared
requirepass 123456
客户端连接:auth 123456
设置Reids允许ip访问
注释掉bind 127.0.0.1
protected-mode no ###允许外界访问
mysql与redis数据不同步的问题
方案一:直接清理redis缓存,重新查询数据库
方案二:直接使用mq订阅mysql日志文件,增量同步redis中
方案三:使用阿里canal
redis持久化机制
数据都会缓存在-》redis内存
分为rdb与aof
rdb(默认)
定时持久化(全量同步),保存快照,是个dump.rdp,会有一个临时的rdb,以及最终的edb文件(一直在覆盖)。
在redis.conf中有三个数值
aof
增量操作日志(增量同步)
两种策略
在Redis的配置文件中存在三种同步方式,它们分别是:
appendfsync always #每次有数据修改发生时都会写入AOF文件,能够保证数据不丢失,但是效率非常低。
appendfsync everysec #每秒钟同步一次,可能会丢失1s内的数据,但是效率非常高。
appendfsync no #从不同步。高效但是数据不会被持久化。
直接修改redis.conf中 appendonly yes
建议最好还是使用everysec 既能够保证数据的同步、效率也还可以。
Aof是以执行命令的形式实现同步
redis过期回调
将订单id或者token,当为key,value为订单id,有效期为30分钟,对该key绑定一个事件回调,执行回调方法。主动查询数据库表。
使用Redis Key自动过期机制
当我们的key失效时,可以执行我们的客户端回调监听的方法。
需要在Redis中配置:
notify-keyspace-events “Ex”
springboot整合redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
@Component
public class RedisUtils {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void setString(String key, String value) {
setString(key, value, null);
}
public void setString(String key, String value, Long timeOut) {
stringRedisTemplate.opsForValue().set(key, value);
if (timeOut != null) {
stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
}
}
public String getString(String key) {
return stringRedisTemplate.opsForValue().get(key);
}
}
@Component
public class RedisTemplateUtils {
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void setObject(String key, Object object) {
redisTemplate.opsForValue().set(key, object);
}
public Object getObjet(String key) {
return redisTemplate.opsForValue().get(key);
}
public void setObject(String key, Object value, Long timeOut) {
redisTemplate.opsForValue().set(key, value);
if (timeOut != null) {
redisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
}
}
}
spring:
redis:
host: 192.168.212.155
password: 123456
port: 6379
SpringBoot整合key失效监听
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* Redis失效事件 key
*
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
//失效打印
String expiraKey = message.toString();
}
}
redis事物策略(没有隔离级别)
- Multi 开启事务
- EXEC 提交事务
- Watch 可以监听一个或者多个key,在提交事务之前是否有发生了变化 如果发生边了变化就不会提交事务,没有发生变化才可以提交事务 版本号码 乐观锁
- watch name
- multi
- set name xiaoxiao
- exec
- Discard 取消提交事务
注意:Redis官方是没有提供回滚方法, 值提供了取消事务。
redis哨兵实现原理&rediscluster
Redis主从复制
基本概念:
单个Redis如果因为某种原因宕机的话,可能会导致Redis服务不可用,可以使用主从复制实现一主多从,主节点负责写的操作,从节点负责读的操作,主节点会定期将数据同步到从节点中,保证数据一致性的问题。
相关配置Redis.conf(从服务器配置)
replicaof <主服务器ip> <主服务器端口>
slaveof 192.168.212.155 6379
masterauth 123456
查看集群节点。 info replication
该主从同步方式存在 如果从节点非常多的话,会导致对主节点同步多个从节点压力非常大
可以采用树状类型解决该问题
Redis主从复制的缺陷
如果主节点存在了问题,整个Redis环境是不可以实现写的操作,需要人工更改配置变为主操作
如何解决该问题:使用哨兵机制可以帮助解决Redis集群主从选举策略。
Redis哨兵机制
Redis的哨兵机制就是解决我们以上主从复制存在缺陷(选举问题),解决问题保证我们的Redis高可用,实现自动化故障发现与故障转移。
哨兵机制原理
-
- 哨兵机制每隔10s时间只需要配置监听我们的主节点就可以获取当前整个Redis集群的环境列表,采用info 命令形式。
-
- 哨兵不建议是单机的,最好每个Redis节点都需要配置哨兵监听。
-
- 哨兵集群原理是如何:多个哨兵都执行同一个主的master节点,订阅到相同都通道,有新的哨兵加入都会向通道中发送自己服务的信息,该通道的订阅者可以发现新哨兵的加入,随后相互建立长连接。
-
- Master的故障发现 单个哨兵会向主的master节点发送ping的命令,如果master节点没有及时的响应,哨兵会认为该master节点为“主观不可用状态”会发送给其他都哨兵确认该Master节点是否不可用,当前确认的哨兵节点数>=quorum(可配置),会实现重新选举。
相关核心配置
cp /app/redis/redis/redis-5.0.3/sentinel.conf /usr/local/redis/bin/
cd /usr/local/redis/bin
vi sentinel.conf
sentinel monitor mymaster 192.168.212.160 6379 2(这个ip就是master)
sentinel auth-pass mymaster 123456
集群启动:./redis-sentinel sentinel.conf
RedisCluster集群
Redis3.0开始官方推出了集群模式 RedisCluster,原理采用hash槽的概念,预先分配16384个卡槽,并且将该卡槽分配给具体服务的节点;通过key进行crc16(key)%16384 获取余数,余数就是对应的卡槽的位置,一个卡槽可以存放多个不同的key,从而将读或者写转发到该卡槽的服务的节点。 最大的有点:动态扩容、缩容
RedisCluster集群模式环境搭建
mkdir rediscluster
cd rediscluster/
mkdir redis7000
mkdir redis7001
mkdir redis7002
mkdir redis7003
mkdir redis7004
mkdir redis7005
每个配置文件内容
daemonize yes #后台启动
protected-mode no ; ## 允许外部访问
port 7005 #修改端口号,从7000到7005
cluster-enabled yes #开启cluster,去掉注释
cluster-config-file 7000nodes.conf #自动生成(7000nodes.conf不能一样)
cluster-node-timeout 15000 #节点通信时间
logfile /usr/rediscluster/redis7000/redis.log(redis7000不能一样)
启动我们的redis
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7000/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7001/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7002/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7003/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7004/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7005/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7006/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7007/redis.conf
/usr/local/redis/bin/redis-cli --cluster create 192.168.31.185:7000 192.168.31.185:7001 192.168.31.185:7002 192.168.31.185:7003 192.168.31.185:7004 192.168.31.185:7005 --cluster-replicas 1 (建议最好使用服务器的ip地址搭建)
分配我们hash操作
![image-1655690418110](https://logaaaaa.oss-cn-beijing.aliyuncs.com/image-1655690418110.png)
/usr/redis/bin/redis-cli -h 192.168.31.185 -p 7000
![image-1655690442804](https://logaaaaa.oss-cn-beijing.aliyuncs.com/image-1655690442804.png)
/usr/local/redis/bin/redis-cli -h 192.168.31.185 -p 7002 -c
RedisCluster集群模式扩容节点
首先新增两台Redis节点 一主、一从;
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7006/redis.conf
/usr/local/redis/bin/redis-server /usr/local/rediscluster/redis7007/redis.conf
案例:
/usr/redis/bin/redis-cli --cluster add-node 新增集群地址:端口 集群中已经存在的任意一台连接地址:端口
/usr/local/redis/bin/redis-cli --cluster add-node 192.168.31.185:7007 192.168.31.185:7000 --cluster-slave --cluster-master-id 81c109475bfe3cf0e40cc7ab3f8977e4a7b98d5e
/usr/local/redis/bin/redis-cli --cluster add-node 192.168.31.185:7007 192.168.31.185:7000 --cluster --cluster-master-id 81c109475bfe3cf0e40cc7ab3f8977e4a7b98d5e
区分一下rdb文件即可。
新增的7006 (master)是没有任何槽位
分配Redis槽位扩容
/usr/local/redis/bin/redis-cli --cluster reshard 192.168.31.185:7000
### RedisCluster集群模式缩容节点
/usr/local/redis/bin/redis-cli --cluster reshard 192.168.31.185:7000 --cluster-from 5d94171eb34ed4396bf5b9db8efaab4d96d0cf10 --cluster-to 511058958a3b80dd600e060c2500050c6c5a02ab --cluster-slots
基于redis实现分布式锁
分布式锁的应用场景有那些
- 分布式任务调度平台保证任务的幂等性。
- 分布式全局id的生成
Zookeeper实现分布式锁思路
Zookeeper实现分布式锁核心采用临时节点+事件通知,因为zk节点路径是保证全局唯一的,当多个线程同时创建该临时节点,只要谁能够创建成功谁就能够获取到锁。
获取锁:当多个线程同时创建该临时节点,只要谁能够创建成功谁就能够获取到锁。
释放锁:关闭当前Session连接,自动的删除当前的zk节点路径,其他线程重新进入到获取锁阶段。
Redis分布式锁实现思路
Redis实现分布式锁基于SetNx命令,因为在Redis中key是保证是唯一的。所以当多个线程同时的创建setNx时,只要谁能够创建成功谁就能够获取到锁。
Set 命令 每次set时,可以修改原来旧值;
SetNx命令 每次SetNx检查该 key是否已经存在,如果已经存在的话不会执行任何操作。返回为0 如果已经不存在的话直接新增该key。
1:新增key成功 0 失败
获取锁的时候:当多个线程同时创建SetNx k,只要谁能够创建成功谁就能够获取到锁。
释放锁:可以对该key设置一个有效期可以避免死锁的现象。
手动-redis实现分布式锁代码部分(单节点)
依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.6.3</version>
</dependency>
工具类
package com.gtf.redisfbss;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
//protected static Logger logger = Logger.getLogger(RedisUtil.class);
private static String IP = "127.0.0.1";
//Redis的端口号
private static int PORT = 6379;
//可用连接实例的最大数目,默认值为8;
//如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
private static int MAX_ACTIVE = 100;
//控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
private static int MAX_IDLE = 20;
//等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int MAX_WAIT = 3000;
private static int TIMEOUT = 3000;
//在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
private static boolean TEST_ON_BORROW = true;
//在return给pool时,是否提前进行validate操作;
private static boolean TEST_ON_RETURN = true;
private static JedisPool jedisPool = null;
/**
* redis过期时间,以秒为单位
*/
public final static int EXRP_HOUR = 60 * 60; //一小时
public final static int EXRP_DAY = 60 * 60 * 24; //一天
public final static int EXRP_MONTH = 60 * 60 * 24 * 30; //一个月
/**
* 初始化Redis连接池
*/
private static void initialPool() {
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(MAX_ACTIVE);
config.setMaxIdle(MAX_IDLE);
config.setMaxWaitMillis(MAX_WAIT);
config.setTestOnBorrow(TEST_ON_BORROW);
jedisPool = new JedisPool(config, IP, PORT, TIMEOUT, "123456");
} catch (Exception e) {
//logger.error("First create JedisPool error : "+e);
e.getMessage();
}
}
/**
* 在多线程环境同步初始化
*/
private static synchronized void poolInit() {
if (jedisPool == null) {
initialPool();
}
}
/**
* 同步获取Jedis实例
*
* @return Jedis
*/
public synchronized static Jedis getJedis() {
if (jedisPool == null) {
poolInit();
}
Jedis jedis = null;
try {
if (jedisPool != null) {
jedis = jedisPool.getResource();
}
} catch (Exception e) {
e.getMessage();
// logger.error("Get jedis error : "+e);
}
return jedis;
}
/**
* 释放jedis资源
*
* @param jedis
*/
public static void returnResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnResource(jedis);
}
}
public static Long sadd(String key, String... members) {
Jedis jedis = null;
Long res = null;
try {
jedis = getJedis();
res = jedis.sadd(key, members);
} catch (Exception e) {
//logger.error("sadd error : "+e);
e.getMessage();
}
return res;
}
}
分布式锁实现
@Component
public class RedisLock {
private static int lockSussess=1;
/** 获取锁
* @param lockKey 在redis创建的key
* @param notLockTime 尝试获取锁的超时时间
* @return 返回lock成功值
*/
public String getLock(String lockKey, int notLockTime,int timeout){
//获取redis连接
Jedis jedis = RedisUtil.getJedis();
//计算尝试获取锁的超时时间
long endTime=System.currentTimeMillis()+notLockTime;
//没有超时,否则退出循环
while (System.currentTimeMillis()<endTime){
String lockValue= UUID.randomUUID().toString();
Long setnx = jedis.setnx(lockKey, lockValue);
//当不同的jvm创建相同的key,只要能创建成功。获取锁成功
if (lockSussess==setnx) {
//加上有效期
jedis.expire(lockKey,timeout/1000);
return lockValue;
}
//否则 继续循环
}
try {
if (jedis !=null) {
jedis.close();
}
}catch (Exception e) {
}finally {
jedis.close();
}
return "";
}
/**
* 释放锁
* @param lockKey 在redis创建的key
* @param lockValue 获取到锁的value
*/
public boolean unlock(String lockKey,String lockValue){
Jedis jedis = RedisUtil.getJedis();
try {
String s = jedis.get(lockValue);
if (s .equals(lockValue)) {
return (jedis.del(lockKey) > 0) ? true : false;
}
}catch (Exception e) {
}finally {
jedis.close();
}
return false;
}
public static void main(String[] args) {
RedisLock redisLock = new RedisLock();
// 1.获取锁
String lockValue = redisLock.getLock("lockKey", 5000, 5000);
if (StringUtils.isEmpty(lockValue)) {
System.out.println(Thread.currentThread().getName() + ",获取锁失败!");
return;
}
// 2.获取锁成功执行业务逻辑
System.out.println(Thread.currentThread().getName() + ",获取成功,lockValue:" + lockValue);
// // 3.释放lock锁
redisLock.unlock("lockKey", lockValue);
}
}