公司公众网站微信平台建设方案网站的建设项目是什么
公司公众网站微信平台建设方案,网站的建设项目是什么,网站栏目功能,wordpress电脑手机端同时一、概述
最近几天公司项目开发上线完成#xff0c;做个收获总结吧~ 今天记录Redis的收获和提升。
二、Redis异步队列
Redis做异步队列一般使用 list 结构作为队列#xff0c;rpush 生产消息#xff0c;lpop 消费消息。当 lpop 没有消息的时候#xff0c;要适当sleep再…一、概述
最近几天公司项目开发上线完成做个收获总结吧~ 今天记录Redis的收获和提升。
二、Redis异步队列
Redis做异步队列一般使用 list 结构作为队列rpush 生产消息lpop 消费消息。当 lpop 没有消息的时候要适当sleep再重试。若不用 sleeplist 还有个指令叫 blpop在没有消息的时候它会阻塞队列直到消息到来。
若要生产一次消费多次的需求可以使用 pub/sub 主题订阅者模式可以实现 1:N 的消息队列。但是缺点在消费者下线的情况下生产的消息会丢失还得使用专业的消息队列如 RabbitMQ等。
若要redis实现延时队列使用 sortedset拿时间戳作为score消息内容作为 key 调用 zadd 来生产消息消费者用 zrangebyscore 指令获取 N 秒之前的数据轮询进行处理。
三、Redis分布式锁
核心思路是先用setnx来争抢锁抢到之后再用 expire 给锁加一个过期时间防止锁忘记释放。若在 setnx 之后执行expire之前进程意外崩溃或者要重启维护导致锁永远得不到释放就要采用 set 指令配合非常复杂的参数可以同时把 setnx 和 expire 合成一条指令来用
SET key value NX EX 103.1 Redis分布式锁的实现
pom.xml引入依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-redis/artifactIdversion1.4.7.RELEASE/version
/dependency配置Redis spring.redis.host127.0.0.1
spring.redis.port6379
spring.redis.database0
spring.redis.password123456
spring.redis.timeout10000# 设置jedis连接池
spring.redis.jedis.pool.max-active50
spring.redis.jedis.pool.min-idle20配置RedisConfig属性
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;Configuration
public class RedisConfig {Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws Exception {RedisTemplate redisTemplate new RedisTemplate();redisTemplate.setConnectionFactory(redisConnectionFactory);// 3.创建 序列化类GenericToStringSerializer genericToStringSerializer new GenericToStringSerializer(Object.class);redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(genericToStringSerializer);
// redisTemplate.setKeySerializer(new StringRedisSerializer());
// redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());redisTemplate.setDefaultSerializer(new StringRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}
}RedisLock工具类
import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;Slf4j
Component
public class RedisLock {Resourceprivate RedisTemplate redisTemplate;private static MapString, LockInfo lockInfoMap new ConcurrentHashMap();private static final Long SUCCESS 1L;Datapublic static class LockInfo {private String key;private String value;private int expireTime;//更新时间private long renewalTime;//更新间隔private long renewalInterval;public static LockInfo getLockInfo(String key, String value, int expireTime) {LockInfo lockInfo new LockInfo();lockInfo.setKey(key);lockInfo.setValue(value);lockInfo.setExpireTime(expireTime);lockInfo.setRenewalTime(System.currentTimeMillis());lockInfo.setRenewalInterval(expireTime * 2000 / 3);return lockInfo;}}/*** Lua脚本* // 加锁* if* redis.call(setNx,KEYS[1],ARGV[1])* then* if redis.call(get,KEYS[1])ARGV[1]* return redis.call(expire,KEYS[1],ARGV[2])* else* return 0* end* end** // 解锁* redis.call(get, KEYS[1]) ARGV[1]* then* return redis.call(del, KEYS[1])* else* return 0** //更新时间* if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(expire, KEYS[1], ARGV[2]) else return 0 end*//*** 使用lua脚本加锁** param lockKey 锁* param value 身份标识保证锁不会被其他人释放* param expireTime 锁的过期时间单位秒* Desc 注意事项redisConfig配置里面必须使用 genericToStringSerializer序列化,否则获取不了返回值*/public boolean tryLock(String lockKey, String value, int expireTime) {String luaScript if redis.call(setNx,KEYS[1],ARGV[1]) then if redis.call(get,KEYS[1])ARGV[1] then return redis.call(expire,KEYS[1],ARGV[2]) else return 0 end end;DefaultRedisScriptBoolean redisScript new DefaultRedisScript();redisScript.setResultType(Boolean.class);redisScript.setScriptText(luaScript);ListString keys new ArrayList();keys.add(lockKey);//Object result redisTemplate.execute(redisScript, Collections.singletonList(lockKey),value,expireTime );// Object result redisTemplate.execute(redisScript, new StringRedisSerializer(), new StringRedisSerializer(), Collections.singletonList(lockKey), identity, expireTime);Object result redisTemplate.execute(redisScript, keys, value, expireTime);log.info(已获取到{}对应的锁!, lockKey);if (expireTime 10) {lockInfoMap.put(lockKey value, LockInfo.getLockInfo(lockKey, value, expireTime));}return (boolean) result;}/*** 使用lua脚本释放锁** param lockKey* param value* return 成功返回true, 失败返回false*/public boolean unlock(String lockKey, String value) {lockInfoMap.remove(lockKey value);String luaScript if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end;DefaultRedisScriptBoolean redisScript new DefaultRedisScript();redisScript.setResultType(Boolean.class);redisScript.setScriptText(luaScript);ListString keys new ArrayList();keys.add(lockKey);Object result redisTemplate.execute(redisScript, keys, value);log.info(解锁成功{}, result);return (boolean) result;}/*** 使用lua脚本更新redis锁的过期时间** param lockKey* param value* return 成功返回true, 失败返回false*/public boolean renewal(String lockKey, String value, int expireTime) {String luaScript if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(expire, KEYS[1], ARGV[2]) else return 0 end;DefaultRedisScriptBoolean redisScript new DefaultRedisScript();redisScript.setResultType(Boolean.class);redisScript.setScriptText(luaScript);ListString keys new ArrayList();keys.add(lockKey);Object result redisTemplate.execute(redisScript, keys, value, expireTime);log.info(更新redis锁的过期时间{}, result);return (boolean) result;}/*** redisTemplate加锁** param lockKey 锁* param value 身份标识保证锁不会被其他人释放* param expireTime 锁的过期时间单位秒* return 成功返回true, 失败返回false*/public boolean lock(String lockKey, String value, long expireTime) {return redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);}/*** redisTemplate解锁** param key* param value* return 成功返回true, 失败返回false*/public boolean unlock2(String key, String value) {Object currentValue redisTemplate.opsForValue().get(key);boolean result false;if (StringUtils.isNotEmpty(String.valueOf(currentValue)) currentValue.equals(value)) {result redisTemplate.opsForValue().getOperations().delete(key);}return result;}/*** 定时去检查redis锁的过期时间* Param* Return*/Scheduled(fixedRate 5000L)Async(redisExecutor)public void renewal() {long now System.currentTimeMillis();for (Map.EntryString, LockInfo lockInfoEntry : lockInfoMap.entrySet()) {LockInfo lockInfo lockInfoEntry.getValue();if (lockInfo.getRenewalTime() lockInfo.getRenewalInterval() now) {renewal(lockInfo.getKey(), lockInfo.getValue(), lockInfo.getExpireTime());lockInfo.setRenewalTime(now);log.info(lockInfo {}, JSON.toJSONString(lockInfo));}}}/*** 分布式锁设置单独线程池* Param* Return*/Bean(redisExecutor)public Executor redisExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setQueueCapacity(1);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix(redis-renewal-);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());return executor;}
}开启定时任务在启动类上加 EnableScheduling 注解
SpringBootApplication
MapperScan(value com.example.recordlog.mapper)
EnableAspectJAutoProxy(proxyTargetClass true)
//开启定时任务
EnableScheduling
public class RecordLogApplication {public static void main(String[] args) {SpringApplication.run(RecordLogApplication.class, args);}
}Controller测试接口 import com.example.recordlog.tools.RedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import java.util.List;
import java.util.UUID;RestController
RequestMapping(/api)
public class OperateController {Autowired
private RedisLock redisLock;RequestMapping(value /locked, method {RequestMethod.GET})
public void lockedTest() {String key String.format(data-mining:task_statistic:%d, System.currentTimeMillis());String requestId UUID.randomUUID().toString();boolean locked false;try {locked redisLock.tryLock(key, requestId, 30);if (!locked) {return;}//执行业务逻辑System.out.println(----------------执行业务逻辑);} finally {if (locked) {redisLock.unlock(key, requestId);}}}
}3.2 锁过期问题
上述代码提到使用lua脚本处理锁更新就顺带记录当Redis 分布式锁过期但任务未完成时的三种处理方案。
锁过期问题是在使用Redis分布式锁时锁恰好到过期时间但业务逻辑还没有处理完毕这可能导致多个进程同时进入临界区造成数据不一致或业务逻辑冲突、资源浪费等其他问题。 假设分布式锁使用的简单逻辑如下
获取锁
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;public class RedisLock {private Jedis jedis;private String lockKey;private int lockExpire;public RedisLock(Jedis jedis, String lockKey, int lockExpire) {this.jedis jedis;this.lockKey lockKey;this.lockExpire lockExpire;}public boolean tryLock(String requestId) {SetParams params new SetParams();params.nx().px(lockExpire);String result jedis.set(lockKey, requestId, params);return OK.equals(result);}public void unlock(String requestId) {String script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end;jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));}
}使用锁 Jedis jedis new Jedis(localhost);
RedisLock redisLock new RedisLock(jedis, my_lock, 5000);String requestId UUID.randomUUID().toString();
if (redisLock.tryLock(requestId)) {try {// 业务逻辑Thread.sleep(6000); // 模拟业务逻辑处理时间超过锁过期时间} catch (InterruptedException e) {e.printStackTrace();} finally {redisLock.unlock(requestId);}
} else {System.out.println(获取锁失败);
}解决方案 1自动续期当锁接近过期时自动延长锁的过期时间确保业务逻辑在锁持有期间不会被其他进程获取。使用 ScheduledExecutorService 定期检查并延长锁的过期时间确保锁在业务逻辑处理完之前不会过期。提供简单代码思路
public class RedisLockWithRenewal extends RedisLock {private ScheduledExecutorService scheduler;public RedisLockWithRenewal(Jedis jedis, String lockKey, int lockExpire) {super(jedis, lockKey, lockExpire);scheduler Executors.newScheduledThreadPool(1);}Overridepublic boolean tryLock(String requestId) {boolean locked super.tryLock(requestId);if (locked) {startAutoRenewal(requestId);}return locked;}private void startAutoRenewal(String requestId) {scheduler.scheduleAtFixedRate(() - {String script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(pexpire, KEYS[1], ARGV[2]) else return 0 end;jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(lockExpire)));}, lockExpire / 3, lockExpire / 3, TimeUnit.MILLISECONDS);}Overridepublic void unlock(String requestId) {scheduler.shutdown();super.unlock(requestId);}
}2锁重入机制锁重入机制允许持有锁的线程可以再次获取锁而不被阻塞这可以通过记录锁持有者的信息来实现。同一线程可以多次获取锁直到所有业务逻辑执行完毕最后一次调用 unlock 时才真正释放锁。
import java.util.concurrent.ConcurrentHashMap;public class ReentrantRedisLock extends RedisLock {private ConcurrentHashMapString, Integer lockHolderMap new ConcurrentHashMap();public ReentrantRedisLock(Jedis jedis, String lockKey, int lockExpire) {super(jedis, lockKey, lockExpire);}Overridepublic synchronized boolean tryLock(String requestId) {if (lockHolderMap.containsKey(requestId)) {lockHolderMap.put(requestId, lockHolderMap.get(requestId) 1);return true;} else {boolean locked super.tryLock(requestId);if (locked) {lockHolderMap.put(requestId, 1);}return locked;}}Overridepublic synchronized void unlock(String requestId) {if (lockHolderMap.containsKey(requestId)) {int count lockHolderMap.get(requestId);if (count 1) {lockHolderMap.put(requestId, count - 1);} else {lockHolderMap.remove(requestId);super.unlock(requestId);}}}
}3使用 Redisson 库Redisson是Redis客户端提供分布式锁的实现支持自动续期、锁重入等高级特性简化分布式锁的使用。
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;import java.util.concurrent.TimeUnit;public class RedissonLockExample {public static void main(String[] args) {Config config new Config();config.useSingleServer().setAddress(redis://127.0.0.1:6379);RedissonClient redissonClient Redisson.create(config);RLock lock redissonClient.getLock(my_lock);try {if (lock.tryLock(0, 10, TimeUnit.SECONDS)) {try {// 业务逻辑Thread.sleep(6000); // 模拟业务逻辑处理时间} finally {lock.unlock();}} else {System.out.println(获取锁失败);}} catch (InterruptedException e) {e.printStackTrace();} finally {redissonClient.shutdown();}}
}四、Redis不立刻删除已过期数据的原因
比较有印象的经历曾经尝试监听redis key过期来实现延迟消息发送后来发现这个延迟非常不稳定明明key已经过期但还是没有触发后来了解到过期key不会立马删除…这个问题需要对Redis 内存管理有了解下面通过四个问题来考虑吧
4.1 Redis 给缓存数据设置过期时间有什么用
内存是有限且珍贵的如果不对缓存数据设置过期时间那内存占用就会一直增长最终可能会导致 OOM 问题。通过设置合理的过期时间Redis 会自动删除暂时不需要的数据为新的缓存数据腾出空间。
Redis 自带给缓存数据设置过期时间的功能比如
127.0.0.1:6379 expire key 60 # 数据在 60s 后过期
(integer) 1
127.0.0.1:6379 setex key 60 value # 数据在 60s 后过期 (setex:[set] [ex]pire)
OK
127.0.0.1:6379 ttl key # 查看数据还有多久过期
(integer) 56Redis 中除字符串类型是独有设置过期时间的命令 setex 外其他类型都需要依靠 expire 命令来设置过期时间 。另外 persist 命令可以移除一个键的过期时间。
当然除缓解内存的消耗还有其他用途很多时候业务场景就是需要某个数据只在某一时间段内存在比如短信验证码可能只在 1 分钟内有效用户登录的 Token 可能只在 1 天内有效秒杀系统等。
若用传统的数据库来处理的话基本是SQL判断过期更麻烦且性能很差。
4.2 Redis 如何判断数据是否过期
Redis 通过过期字典可看作是 hash 表来保存数据过期的时间。过期字典的键指向 Redis 数据库中的某个 key(键)过期字典的值是一个 long long 类型的整数这个整数保存 key 所指向的数据库键的过期时间毫秒精度的 UNIX 时间戳。 过期字典是存储在 redisDb 这个结构里的
typedef struct redisDb {...dict *dict; //数据库键空间,保存着数据库中所有键值对dict *expires // 过期字典,保存着键的过期时间...
} redisDb;在查询一个 key 的时候Redis 首先检查该 key 是否存在于过期字典中时间复杂度为 O(1)如果不在就直接返回在的话需要判断一下这个 key 是否过期过期直接删除 key 然后返回 null。
4.3 Redis 过期 key 删除策略
如果设置一批 key 只能存活 1 分钟那1分钟后Redis 对这批 key 进行删除就需要遵循删除策略。常用的过期数据的删除策略有四种
1惰性删除只会在取出/查询 key 的时候才对数据进行过期检查。这种方式对CPU最友好但是可能会造成太多过期key没有被删除。
2定期删除周期性地随机从设置过期时间的 key 中抽查一批然后逐个检查这些 key 是否过期过期就删除 key。相比于惰性删除定期删除对内存更友好对 CPU 不太友好。
3延迟队列把设置过期时间的 key 放到一个延迟队列里到期之后就删除 key。这种方式可以保证每个过期 key 都能被删除但维护延迟队列太麻烦队列本身也要占用资源。
4定时删除每个设置过期时间的 key 都会在设置的时间到达时立即被删除。这种方法可以确保内存中不会有过期的键但是它对 CPU 的压力最大因为它需要为每个键都设置一个定时器。Redis 采用的是 定期删除惰性/懒汉式删除 结合的策略这也是大部分缓存框架的选择。定期删除对内存更加友好惰性删除对 CPU 更加友好。两者各有优势结合起来使用既能兼顾 CPU 友好又能兼顾内存友好。
Redis 的定期删除过程是随机的周期性地随机从设置过期时间的 key 中抽查一批也因此并不保证所有过期键都会被立即删除。这也就解释为什么有的 key 已经过期但并没有被删除。并且Redis 底层会通过限制删除操作执行的时长和频率来减少删除操作对 CPU 时间的影响。 另外定期删除还受到执行时间和过期 key 的比例的影响
执行时间已经超过阈值那就中断本次定期删除循环避免使用过多占用CPU 。如果这批过期 key比例超过一个比例就会重复执行此删除流程更积极地清理过期 key。相应地若过期的 key 比例低于这个比例就会中断这次定期删除循环避免做过多的工作而获得很少的内存回收。
查看源码所知Redis7.2版本的执行时间阈值是25ms过期key比例设定值是 10%。
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which we do extra efforts. */每次随机抽查数量在expire.c中定义Redis 7.2 版本为 20 即每次会随机选择 20 个已设置过期时间的 key 判断是否过期。
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */那如何控制定期删除的执行频率
在 Redis 中定期删除的频率是由 hz 参数控制的。hz 默认为 10代表每秒执行 10 次也就是每秒钟进行 10 次尝试来查找并删除过期的 key。hz 的取值范围为 1~500。增大 hz 参数的值会提升定期删除的频率。如果想要更频繁地执行定期删除任务可以适当增加 hz 的值但这会加 CPU 的使用率。根据 Redis 官方建议hz 的值不建议超过100其实使用默认的 10 就已足够。
下面是 hz 参数的官方注释Redis 7.2 版本redis.conf中的重要信息。 类似的参数还有 “dynamic-hz”redis.conf中这个参数开启之后 Redis 就会在 hz 的基础上动态计算一个值。Redis 提供并默认启用使用自适应 hz 值的能力
# 默认为 10
hz 10
# 默认开启
dynamic-hz yes另外hz 参数控制着定期删除过期 key 的定期任务之外还控制一些其他定期任务例如关闭超时的客户端连接、更新统计信息等。
最后定期删除不是把所有过期 key 都删除 若key 数量非常庞大的话挨个遍历检查是非常耗时的会严重影响性能。Redis 设计这种策略的目的就是平衡内存和性能。 而 key 过期之后不立马删除是因为成本太高不太好办到就算使用延迟队列作为删除策略这样存在问题
1队列本身的开销可能很大key 多的情况下一个延迟队列可能无法容纳。
2维护延迟队列太麻烦修改 key 的过期时间就需要调整期在延迟队列中的位置并且还需要引入并发控制。4.4 如何防止大量 key 集中过期
如果存在大量 key 集中过期的问题可能会使 Redis 的请求延迟变高。可选方案
1尽量避免 key 集中过期在设置键的过期时间时尽量随机一点。
2对过期的 key 开启 lazyfree 机制修改 redis.conf 中的 lazyfree-lazy-expire参数即可这样会在后台异步删除过期的 key不会阻塞主线程的运行。五、Redis bigkey处理
若key 对应的 value 所占用的内存比较大那这个 key 就可以看作是 bigkey。
String 类型的 value 超过 1MB
复合类型List、Hash、Set、Sorted Set 等的 value 包含的元素超过 5000 个bigkey会引发客户端超时阻塞、网络阻塞、工作线程阻塞
1.使用 Redis 自带的 --bigkeys 参数来查找
# redis-cli -p 6379 --bigkeys# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type. You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).[00.00%] Biggest string found so far ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20 with 4437 bytes
[00.00%] Biggest list found so far my-list with 17 items-------- summary -------Sampled 5 keys in the keyspace!
Total key length in bytes is 264 (avg len 52.80)Biggest list found my-list has 17 items
Biggest string found ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20 has 4437 bytes1 lists with 17 items (20.00% of keys, avg size 17.00)
0 hashs with 0 fields (00.00% of keys, avg size 0.00)
4 strings with 4831 bytes (80.00% of keys, avg size 1207.75)
0 streams with 0 entries (00.00% of keys, avg size 0.00)
0 sets with 0 members (00.00% of keys, avg size 0.00)
0 zsets with 0 members (00.00% of keys, avg size 0.002.使用 Redis 自带的 SCAN 命令 3.借助开源工具分析 RDB 文件 4.借助公有云的 Redis 分析服务
处理方案
(1)分割 bigkey将一个 bigkey 分割为多个小 key。例如将一个含有上万字段数量的 Hash 按照一定策略比如二次哈希拆分为多个 Hash。
(2)手动清理Redis 4.0 可以使用 UNLINK 命令来异步删除一个或多个指定的 key。Redis 4.0 以下可以考虑使用 SCAN 命令结合 DEL 命令来分批次删除。
(3)采用合适的数据结构例如文件二进制数据不使用 String 保存、使用 HyperLogLog 统计页面 UV、Bitmap 保存状态信息0/1。
(4)开启 lazy-free惰性删除/延迟释放 lazy-free 特性是 Redis 4.0 开始引入的指的是让 Redis 采用异步方式延迟释放 key 使用的内存将该操作交给单独的子线程处理避免阻塞主线程。六、Redis持久化机制
使用缓存的时候需要对内存中的数据进行持久化也就是将内存中的数据写入到硬盘中。主要是为之后重用数据比如重启机器、机器故障之后恢复数据或者是为做数据同步比如 Redis 集群的主从节点通过 RDB 文件同步数据。 Redis 支持持久化方式
快照snapshottingRDB
只追加文件append-only file, AOF
RDB和AOF混合持久化(Redis4.0新增通过配置项 aof-use-rdb-preamble 开启AOF 重写的时候就直接把 RDB 的内容写到 AOF 文件开头) 6.1 RDB持久化
通过创建快照来获得存储在内存里面的数据在 某个时间点 上的副本然后将快照复制到其他服务器从而创建具有相同数据的服务器副本Redis 主从结构主要用来提高 Redis 性能也可将快照留存本地以便重启服务器时使用。
快照持久化是 Redis 默认采用的持久化方式在 redis.conf 配置文件中配置
save 900 1 #在900秒(15分钟)之后如果至少有1个key发生变化Redis就会自动触发bgsave命令创建快照。
save 300 10 #在300秒(5分钟)之后如果至少有10个key发生变化Redis就会自动触发bgsave命令创建快照。
save 60 10000 #在60秒(1分钟)之后如果至少有10000个key发生变化Redis就会自动触发bgsave命令创建快照。Redis 提供两个命令来生成 RDB 快照文件
save : 同步保存操作会阻塞 Redis 主线程(Redis启动后是通过单线程的方式工作)
bgsave : fork 出一个子进程子进程执行不会阻塞 Redis 主线程默认选项。6.2 AOF持久化
AOF持久化的实时性比RDB好通过 appendonly 参数开启
appendonly yes开启 AOF 持久化后每执行一条会更改 Redis 中的数据的命令Redis 就会将该命令写入到 AOF 缓冲区 server.aof_buf 中然后再写入到 AOF 文件中此时还在系统内核缓存区未同步到磁盘依然存在数据丢失的风险最后再根据持久化方式 fsync策略的配置来决定何时将系统内核缓存区的数据同步到硬盘中完成持久化保存。
AOF 文件的保存位置和 RDB 文件的位置相同都通过 dir 参数设置默认的文件名是 appendonly.aof。
6.3 AOF工作流程
AOF 持久化功能的实现可以简单分为 5 步
1命令追加append所有的写命令会追加到 AOF 缓冲区中。
2文件写入write将 AOF 缓冲区的数据写入到 AOF 文件中。这一步需要调用write函数系统调用write将数据写入系统内核缓冲区之后直接返回延迟写。注意此时并没有同步到磁盘。
3文件同步fsyncAOF 缓冲区根据对应的持久化方式 fsync 策略向硬盘做同步操作。这一步需要调用 fsync 函数系统调用 fsync 针对单个文件操作对其进行强制硬盘同步fsync 将阻塞直到写入磁盘完成后返回保证了数据持久化。
4文件重写rewrite随着 AOF 文件越来越大需要定期对 AOF 文件进行重写达到压缩的目的。
5重启加载load当 Redis 重启时可以加载 AOF 文件进行数据恢复。名词解释
1系统调用syscallLinux系统直接提供用于对文件和设备进行访问和控制的函数
2write写入系统内核缓冲区之后直接返回仅是写到缓冲区不会立即同步到硬盘。虽然提高效率但也有数据丢失的风险。同步硬盘操作通常依赖于系统调度机制Linux 内核通常为 30s 同步一次具体值取决于写出的数据量和 I/O 缓冲区的状态。
3fsyncfsync用于强制刷新系统内核缓冲区同步到到磁盘确保写磁盘操作结束才会返回。AOF 工作流程表示为
6.4 AOF持久化方式
Redis的配置文件中存在三种不同的 AOF 持久化方式 fsync策略主要区别在于 fsync 同步 AOF 文件的时机刷盘。
1appendfsync always主线程调用 write 执行写操作后后台线程aof_fsync线程立即调用fsyn 函数同步AOF文件刷盘fsync完成后线程返回这样会严重降低Redis的性能writefsync.2appendfsync everysec主线程调用write执行写操作后立即返回由后台线程aof_fsync线程每秒钟调用fsync函数系统调用同步一次 AOF文件writefsyncfsync间隔为1秒3appendfsync no主线程调用write执行写操作后立即返回让操作系统决定何时进行同步Linux下一般为30秒一次write但不fsyncfsync的时机由操作系统决定因此要兼顾数据和写入性能可以考虑 appendfsync everysec 策略 让Redis每秒同步一次 AOF 文件Redis 性能受到的影响较小。而且这样即使出现系统崩溃最多只会丢失一秒之内产生的数据。当硬盘忙于执行写入操作时Redis 还会适当放慢速度以适应硬盘的最大写入速度。
ps有个了解知识点详情参考《Redis 7.0 Multi Part AOF 的设计和实现》
从Redis7.0开始Redis使用Multi Part AOF机制将原来的单个AOF文件拆分成多个AOF文件。在Multi Part AOF中AOF文件被分为三种类型分别为
1BASE表示基础AOF文件一般由子进程通过重写产生该文件最多只有一个。
2INCR表示增量AOF文件一般会在AOFRW开始执行时被创建该文件可能存在多个。
3HISTORY表示历史AOF文件它由BASE和INCR AOF整合每次AOFRW成功完成时本次 AOFRW 前对应的BASE和INCR AOF 将变为HISTORYHISTORY类型的AOF会被Redis自动删除。相关 issueRedis 的 AOF 方式 #783
6.5 AOF日志记录
关系型数据库如 MySQL通常都是执行命令之前记录日志方便故障恢复而 Redis AOF 持久化机制是在执行完命令之后再记录日志。 后记录日志的原因
避免额外的检查开销AOF记录日志不会对命令进行语法检查
在命令执行完之后再记录不会阻塞当前的命令执行。当然前面提到过风险
如果刚执行完命令 Redis 就宕机会导致对应的修改丢失
可能会阻塞后续其他命令的执行AOF 记录日志是在 Redis 主线程中进行的。6.6 AOF重写
当AOF存储量太大时Redis能够在后台自动重写AOF产生一个新AOF文件这个新AOF文件和原本AOF文件所保存的数据库状态一样但体积更小。
Redis 将AOF重写程序放到子进程里执行避免大量的写入操作对Redis正常处理命令请求造成影响这个重写操作是redis通过读取数据库中的键值对来实现的与Java描述的重写不同。 AOF文件重写期间Redis还会维护一个AO 重写缓冲区该缓冲区会在子进程创建新 AOF文件期间记录服务器执行的所有写命令。当子进程完成创建新AOF文件的工作之后服务器会将重写缓冲区中的所有内容追加到新AOF文件末尾使得新AOF件保存的数据库状态与现有的数据库状态一致。最后服务器用新AOF文件替换旧的AOF文件来完成AOF文件重写操作。
开启AOF重写功能可以调用BGREWRITEAOF命令手动执行也可设置两个配置项让程序自动决定触发时机
auto-aof-rewrite-min-size如果 AOF 文件大小小于该值则不会触发 AOF 重写。默认值为 64 MB;
auto-aof-rewrite-percentage执行 AOF 重写时当前 AOF 大小aof_current_size和上一次重写时 AOF 大小aof_base_size的比值。如果当前 AOF 文件大小增加了这个百分比值将触发 AOF 重写。将此值设置为 0 将禁用自动 AOF 重写。默认值为 100。AOF重写机制优化改进可参考《从 Redis7.0 发布看 Redis 的过去与未来》
6.7 AOF校验机制
AOF校验机制是Redis在启动时对AOF文件进行检查判断文件是否完整是否有损坏或者丢失的数据。通过使用校验和checksum 对整个 AOF 文件内容进行 CRC64 算法计算得出的数字来验证 AOF 文件。如果文件内容发生变化那么校验和也会随之改变。因此Redis 在启动时会比较计算出的校验和与文件末尾保存的校验和计算的时候会把最后一行保存校验和的内容忽略从而判断 AOF 文件是否完整。若发现文件有问题Redis就会拒绝启动并提供相应的错误信息。AOF校验机制简单有效提高 Redis 数据的可靠性。
类似地RDB 文件也有类似的校验机制来保证 RDB 文件的正确性。
6.8 如何选择 RDB 和 AOF
1Redis保存的数据丢失一些也没什么影响的话可以选择使用RDB。
2不建议单独使用AOF因为随时创建一个RDB快照可以进行数据库备份、更快的重启以及解决AOF引擎错误。
3如果保存的数据要求安全性比较高的话建议同时开启RDB和AOF两种持久化或者开启RDB和AOF混合持久化。其实了解计算机底层原理的都知道宕机数据丢失无法避免要么是以性能很差的方式来解决。没有完美的方案只有均衡的方案。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/88627.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!