一文搞懂Redis
作者: 京東物流 劉麗俠 姚再毅 康睿 劉斌 李振
一、Redis的特性
1.1 Redis為什麼快?
- 基於內存操作,操作不需要跟磁盤交互,單次執行很快
- 命令執行是單線程,因為是基於內存操作,單次執行的時間快於線程切換時間,同時通信採用多路複用
- Redis本身就是一個k-v結構,類似於hashMap,所以查詢性能接近於O(1)
- 同時redis自己底層數據結構支持,比如跳錶、SDS
- lO多路複用,單個線程中通過記錄跟蹤每一個sock(I/O流)的狀態來管理多個I/O流
1.2 Redis其他特性
- 更豐富的數據類型,雖然都是k、v結構,value可以存儲很多的數據類型
- 完善的內存管理機制、保證數據一致性:持久化機制、過期策略
- 支持多種編程語言
- 高可用,集羣、保證高可用
1.3 Redis高可用
- 很完善的內存管理機制,過期、淘汰、持久化
- 集羣模式,主從、哨兵、cluster集羣
二、Redis數據類型以及使用場景
Redis的數據類型有String、Hash、Set、List、Zset、bitMap(基於String類型)、 Hyperloglog(基於String類型)、Geo(地理位置)、Streams流。
2.1 String
2.1.1 基本指令
``` // 批量設置
mset key1 value1 key2 value2 // 批量獲取 mget key1 key2 // 獲取長度 strlen key
// 字符串追加內容 append key xxx // 獲取指定區間的字符 getrange key 0 5 // 整數值遞增 (遞增指定的值) incr intkey (incrby intkey 10) // 當key 存在時將覆蓋 SETEX key seconds value // 將 key 的值設為 value ,當且僅當 key 不存在。 SETNX key value ```
2.1.2 應用場景
- 緩存相關場景 緩存、 token(跟過期屬性完美契合)
- 線程安全的計數場景 (軟限流、分佈式ID等)
2.2 Hash
2.2.1 基本指令
``` // 將哈希表 key 中的域 field 的值設為 value 。
HSET key field value // 返回哈希表 key 中給定域 field 的值。 HGET key field // 返回哈希表 key 中的所有域。 HKEYS key // 返回哈希表 key 中所有域的值。 HVALS key // 為哈希表 key 中的域 field 的值加上增量 increment 。 HINCRBY key field increment // 查看哈希表 key 中,給定域 field 是否存在。 HEXISTS key field ```
2.2.2 應用場景
- 存儲對象類的數據(官網説的)比如一個對象下有多個字段
- 統計類的數據,可以對單個統計數據進行單獨操作
2.3 List
存儲有序的字符串列表,元素可以重複。列表的最大長度為 2^32 - 1 個元素(4294967295,每個列表超過 40 億個元素)。
2.3.1 基本指令
``` // 將一個或多個值 value 插入到列表 key 的表頭
LPUSH key value [value ...] // 將一個或多個值 value 插入到列表 key 的表尾(最右邊)。 RPUSH key value [value ...] // 移除並返回列表 key 的頭元素。 LPOP key // 移除並返回列表 key 的尾元素。 RPOP key // BLPOP 是列表的阻塞式(blocking)彈出原語。 BLPOP key [key ...] timeout // BRPOP 是列表的阻塞式(blocking)彈出原語。 BRPOP key [key ...] timeout // 獲取指點位置元素 LINDEX key index ```
2.3.2 應用場景
- 用户消息時間線
因為list是有序的,所以我們可以先進先出,先進後出的列表都可以做
2.4 Set
2.4.1 基本指令
``` // 將一個或多個 member 元素加入到集合 key 當中,已經存在於集合的 member 元素將被忽略。
SADD key member [member ...] // 返回集合 key 中的所有成員。 SMEMBERS key // 返回集合 key 的基數(集合中元素的數量)。 SCARD key // 如果命令執行時,只提供了 key 參數,那麼返回集合中的一個隨機元素。 SRANDMEMBER key [count] // 移除並返回集合中的一個隨機元素。 SPOP key // 移除集合 key 中的一個或多個 member 元素,不存在的 member 元素會被忽略。 SREM key member [member ...] // 判斷 member 元素是否集合 key 的成員。 SISMEMBER key member // 獲取前一個集合有而後面1個集合沒有的 sdiff huihuiset huihuiset1 // 獲取交集 sinter huihuiset huihuiset1 // 獲取並集 sunion huihuiset huihuiset1 ```
2.4.2 應用場景
- 抽獎 spop跟srandmember隨機彈出或者獲取元素
- 點贊、簽到等,sadd集合存儲
- 交集並集 關注等場景
2.5 ZSet(SortedSet)
sorted set,有序的set,每個元素有個score。
score相同時,按照key的ASCII碼排序。
2.5.1 基本指令
``` //將一個或多個 member 元素及其 score 值加入到有序集 key 當中。
ZADD key score member [[score member] [score member] ...] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞增(從小到大)來排序 ZRANGE key start stop [WITHSCORES] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞減(從大到小)來排列。 ZREVRANGE key start stop [WITHSCORES] // 返回有序集 key 中,所有 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員。有序集成員按 score 值遞增(從小到大)次序排列。 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] // 移除有序集 key 中的一個或多個成員,不存在的成員將被忽略。 ZREM key member [member ...] // 返回有序集 key 的基數。 ZCARD key // 為有序集 key 的成員 member 的 score 值加上增量 increment 。 ZINCRBY key increment member // 返回有序集 key 中, score 值在 min 和 max 之間(默認包括 score 值等於 min 或 max )的成員的數量。 ZCOUNT key min max // 返回有序集 key 中成員 member 的排名。其中有序集成員按 score 值遞增(從小到大)順序排列。 ZRANK key member ```
2.5.2 應用場景
- 排行榜
三、Redis的事務
3.1 事務基本操作
// 1. multi命令開啟事務,exec命令提交事務
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
// 2. 組隊的過程中可以通過discard來放棄組隊。
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> set k4 v4
QUEUED
127.0.0.1:6379(TX)> discard
OK
3.2 Redis事務特性
- 單獨的隔離操作
事務中的所有命令都會序列化、按順序地執行。
事務在執行過程中,不會被其他客户端發送來的命令請求所打斷。
- 沒有隔離級別的概念
隊列中的命令沒有提交之前,都不會被實際地執行,因為事務提交之前任何指令都不會被實際執行,也就不存在“事務內的查詢要看到事務裏的更新,在事務外查詢不能看到”這個讓人萬分頭疼的問題。
- 不保證原子性
Redis同一個事務中如果有一條命令執行失敗,其後的命令仍然會被執行,沒有回滾。
四、Redis 分佈式鎖
4.1 Lua 實現分佈式鎖
local lockSet = redis.call('exists', KEYS[1])
if lockSet == 0
then
redis.call('set', KEYS[1], ARG[1])
redis.call('expire', KEYS[1], ARG[2])
end
return lockSet
五、Redis總體數據結構
5.1 Redis dict字典
5.1.1 RedisDb數據接口(server.h文件)
數據最外層的結構為RedisDb
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */ //數據庫的鍵
dict *expires; /* Timeout of keys with a timeout set */ //設置了超時時間的鍵
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客户端等待的keys
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */ //所在數 據庫ID
long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,僅用於統計
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
5.1.2 dict數據結構(dict.h文件)
我們發現數據存儲在dict中
typedef struct dict {
dictType *type; //理解為面向對象思想,為支持不同的數據類型對應dictType抽象方法,不同的數據類型可以不同實現
void *privdata; //也可不同的數據類型相關,不同類型特定函數的可選參數。
dictht ht[2]; //2個hash表,用來數據存儲 2個dictht
long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash標記 -1代表不再rehash
unsigned long iterators; /* number of iterators currently running */
} dict;
5.1.3 dictht結構(dict.h文件)
typedef struct dictht {
dictEntry **table; //dictEntry 數組
unsigned long size; //數組大小 默認為4 #define DICT_HT_INITIAL_SIZE 4
unsigned long sizemask; //size-1 用來取模得到數據的下標值
unsigned long used; //改hash表中已有的節點數據
} dictht;
5.1.4 dictEntry節點結構(dict.h文件)
typedef struct dictEntry {
void *key; //key
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; //value
struct dictEntry *next; //指向下一個節點
} dictEntry;
5.1.5 RedisObject(service.h文件)
``` / Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. /
define OBJ_ENCODING_RAW 0 / Raw representation /
define OBJ_ENCODING_INT 1 / Encoded as integer /
define OBJ_ENCODING_HT 2 / Encoded as hash table /
define OBJ_ENCODING_ZIPMAP 3 / Encoded as zipmap /
define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old
list encoding. */
define OBJ_ENCODING_ZIPLIST 5 / Encoded as ziplist /
define OBJ_ENCODING_INTSET 6 / Encoded as intset /
define OBJ_ENCODING_SKIPLIST 7 / Encoded as skiplist /
define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string
encoding */
define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list
of ziplists */
define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree
of listpacks */
define LRU_BITS 24
define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of
obj->lru */
define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution
in ms */
define OBJ_SHARED_REFCOUNT INT_MAX /* Global object
never destroyed. */
define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object
allocated in the stack. */
define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject { unsigned type:4; //數據類型 string hash list unsigned encoding:4; //底層的數據結構 跳錶 unsigned lru:LRU_BITS; / LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). / int refcount; //用於回收,引用計數法 void *ptr; //指向具體的數據結構的內存地址 比如 跳錶、壓縮列表 } robj; ```
5.2 Redis數據結構圖
5.3 Redis擴容機制
因為我們的dictEntry數組默認大小是4,如果不進行擴容,那麼我們所有的數據都會以鏈表的形式添加至數組下標 隨着數據量越來越大,之前只需要hash取模就能得到下標位置,現在得去循環我下標的鏈表,所以性能會越來越慢,當我們的數據量達到一定程度後,我們就得去觸發擴容操作。
5.3.1 什麼時候擴容
5.3.1.1 擴容機制
- 當沒有fork子進程在進行RDB或AOF持久化時,ht[0]的used大於size時,觸發擴容
- 如果有子進程在進行RDB或者AOF時,ht[0]的used大於等於size的5倍的時候,會觸發擴容
5.3.1.2 源碼驗證
擴容,肯定是在添加數據的時候才會擴容,所以我們找一個添加數據的入口。
- 入口,添加或替換dictReplace (dict.c文件)
int dictReplace(dict *d, void *key, void *val) {
dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will succeed. */
entry = dictAddRaw(d,key,&existing);
if (entry) {
dictSetVal(d, entry, val);
return 1;
}
/* Set the new value and free the old one. Note that
it is important
* to do that in this order, as the value may just be
exactly the same
* as the previous one. In this context, think to
reference counting,
* you want to increment (set), and then decrement
(free), and not the
* reverse. */
auxentry = *existing;
dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
- 進入dictAddRaw方法 (dict.c文件)
``` dictEntry dictAddRaw(dict d, void key, dictEntry existing){ long index; dictEntry entry; dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 進行漸進式hash 按步rehash
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that
in a database
* system it is more likely that recently added
entries are accessed
* more frequently. */
//如果在hash 放在ht[1] 否則放在ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
//採用頭插法插入hash桶
entry->next = ht->table[index];
ht->table[index] = entry;
//已使用++
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
} ```
- 得到數據下標的方法 _dictKeyIndex (dict.c文件)
``` static long _dictKeyIndex(dict d, const void key,uint64_t hash, dictEntry existing){ unsigned long idx, table; dictEntry he; if (existing) existing = NULL;
/* Expand the hash table if needed */
//擴容如果需要
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
//比較是否存在這個值
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain
the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key,he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return idx;
} ```
- _dictExpandIfNeeded 驗證是否需要擴容 (dict.c文件)
``` / Expand the hash table if needed / static int _dictExpandIfNeeded(dict d){ / Incremental rehashing already in progress. Return.*/ if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回
/* If the hash table is empty expand it to the initialsize. */
//如果ht[0]的長度為0,設置默認長度4 dictExpand為擴容的關鍵方法
if (d->ht[0].size == 0)
return dictExpand(d,DICT_HT_INITIAL_SIZE);
//擴容條件 hash表中已使用的數量大於等於 hash表的大小
//並且dict_can_resize為1 或者 已使用的大小大於hash表大小的 5倍,大於等於1倍的時候,下面2個滿足一個條件即可
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
//擴容成原來的2倍
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
} ```
5.3.2 怎麼擴容
5.3.2.1 擴容步驟
- 當滿足我擴容條件,觸發擴容時,判斷是否在擴容,如果在擴容,或者 擴容的大小跟我現在的ht[0].size一樣,這次擴容不做
- new一個新的dictht,大小為ht[0].used * 2(但是必須向上2的冪,比 如6 ,那麼大小為8) ,並且ht[1]=新創建的dictht
- 我們有個更大的table了,但是需要把數據遷移到ht[1].table ,所以將 dict的rehashidx(數據遷移的偏移量)賦值為0 ,代表可以進行數據遷 移了,也就是可以rehash了
- 等待數據遷移完成,數據不會馬上遷移,而是採用漸進式rehash,慢慢的把數據遷移到ht[1]
- 當數據遷移完成,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
- 把dict的rehashidex=-1
5.3.2.2 源碼驗證
- dictExpand方法在_dictExpandIfNeeded 方法中調用 (dict.c文件),為擴容的關鍵方法
``` int dictExpand(dict d, unsigned long size){ / the size is invalid if it is smaller than the number of * elements already inside the hash table */ //正在擴容,不允許第二次擴容,或者ht[0]的數據量大於擴容後的數量,沒有意義,這次不擴容 if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR;
dictht n; /* the new hash table */
//得到最接近2的冪 跟hashMap思想一樣
unsigned long realsize = _dictNextPower(size);
/* Rehashing to the same table size is not useful. */
//如果跟ht[0]的大小一致 直接返回錯誤
if (realsize == d->ht[0].size) return DICT_ERR;
/* Allocate the new hash table and initialize all
pointers to NULL */
//為新的dictht賦值
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not
really a rehashing
* we just set the first hash table so that it can
accept keys. */
//ht[0].table為空,代表是初始化
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
//將擴容後的dictht賦值給ht[1]
d->ht[1] = n;
//標記為0,代表可以開始rehash
d->rehashidx = 0;
return DICT_OK;
} ```
5.3.3 數據怎麼遷移(漸進式hash)
5.3.3.1 遷移方式
假如一次性把數據全部遷移會很耗時間,會讓單條指令等待很久,會形成阻塞。
所以,redis採用漸進式Rehash,所謂漸進式,就是慢慢的,不會一次性把所有數據遷移。
那什麼時候會進行漸進式數據遷移?
- 每次進行key的crud操作都會進行一個hash桶的數據遷移
- 定時任務,進行部分數據遷移
5.3.3.2 源碼驗證
CRUD觸發都會觸發_dictRehashStep(漸進式hash)
- 每次增刪改的時候都會調用_dictRehashStep方法
if (dictIsRehashing(d)) _dictRehashStep(d); //這個代碼在增刪改查的方法中都會調用
- _dictRehashStep方法
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
- dictRehash方法
``` int dictRehash(dict d, int n) { int empty_visits = n10; / Max number of empty buckets to visit. / //要訪問的最大的空桶數 防止此次耗時過長
if (!dictIsRehashing(d)) return 0; //如果沒有在rehash返回0
//循環,最多1次,並且ht[0]的數據沒有遷移完 進入循環
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are
sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
//rehashidx rehash的索引,默認0 如果hash桶為空 進入自旋 並且判斷是否到了最大的訪問空桶數
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下標為rehashidx桶
/* Move all the keys in this bucket from the old
to the new hash HT */
while(de) { //循環hash桶的鏈表 並且轉移到ht[1]
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d-
>ht[1].sizemask;
de->next = d->ht[1].table[h]; //頭插
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
//轉移完後 將ht[0]相對的hash桶設置為null
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
//ht[0].used=0 代表rehash全部完成
if (d->ht[0].used == 0) {
//清空ht[0]table
zfree(d->ht[0].table);
//將ht[0] 重新指向已經完成rehash的ht[1]
d->ht[0] = d->ht[1];
//將ht[1]所有數據重新設置
_dictReset(&d->ht[1]);
//設置-1,代表rehash完成
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
//將ht[1]的屬性復位 static void _dictReset(dictht *ht) { ht->table = NULL; ht->size = 0; ht->sizemask = 0; ht->used = 0; } ```
定時任務觸發
再講定時任務之前,我們要知道Redis中有個時間事件驅動,在 server.c文件下有個serverCron 方法。
serverCron 每隔多久會執行一次,執行頻率根據redis.conf中的hz配置,serverCorn中有個方法databasesCron()
- 定時方法serverCron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//.......
/* We need to do a few operations on clients
asynchronously. */
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron(); //處理Redis數據庫上的後台操作。
//.......
}
- databasesCron方法
void databasesCron(void) {
/* Expire keys by random sampling. Not required for
slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only
if there are no
* other processes saving the DB on disk. Otherwise
rehashing is bad
* as will cause a lot of copy-on-write of memory
pages. */
if (!hasActiveChildProcess()) {
/* We use global counters so if we stop the
computation at a given
* DB we'll be able to start from the successive
in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum)
dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */ //Rehash邏輯
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
//進入incrementallyRehash
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop
here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash,
we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
- 進入incrementallyRehash方法(server.c)
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1;
/* already used our millisecond for this
loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this
loop... */
}
return 0;
}
- 進入dictRehashMilliseconds(dict.c)
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
//進入rehash方法 dictRehash 去完成漸進時HASH
while(dictRehash(d,100)) {
rehashes += 100;
//判斷是否超時
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
5.4 String類型數據結構
Redis中String的底層沒有用c的char來實現,而是採用了SDS(simple Dynamic String)的數據結構來實現。並且提供了5種不同的類型
5.4.1 SDS數據結構定義(sds.h文件)
``` typedef char sds; / Note: sdshdr5 is never used, we just access the flags byte directly. * However is here to document the layout of type 5 SDS strings. / struct attribute ((packed)) sdshdr5 { unsigned char flags; / 3 lsb of type, and 5 msb of string length */ char buf[]; }
struct attribute ((packed)) sdshdr8 { uint8_t len; / used / //已使用的長度 uint8_t alloc; / excluding the header and null terminator / //分配的總容量 不包含頭和空終止符 unsigned char flags; / 3 lsb of type, 5 unused bits / //低三位類型 高5bit未使用 char buf[]; //數據buf 存儲字符串數組 };
struct attribute ((packed)) sdshdr16 { uint16_t len; / used / uint16_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; };
struct attribute ((packed)) sdshdr32 { uint32_t len; / used / uint32_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; };
struct attribute ((packed)) sdshdr64 { uint64_t len; / used / uint64_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; }; ```
5.4.2 SDS數據結構的優勢
- char字符串不記錄自身長度,所以獲取一個字符串長度的複雜度是O(n),但是SDS記錄分配的長度alloc,已使用的長度len,獲取長度為 O(1)
- 減少修改字符串帶來的內存重分配次數
- 空間預分配,SDS長度如果小於1MB,預分配跟長度一樣的,大於1M,每次跟len的大小多1M
- 惰性空間釋放,截取的時候不馬上釋放空間,供下次使用!同時提供相應的釋放SDS未使用空間的API
- 二進制安全
5.5 Hash類型數據結構
Redis的字典相當於Java語言中的HashMap,他是無序字典。內部結構上同樣是數組 + 鏈表二維結構。第一維hash的數組位置碰撞時,就會將碰撞的元素使用鏈表串起來。
5.5.1 Hash數據結構圖
5.5.2 Hash的壓縮列表
壓縮列表會根據存入的數據的不同類型以及不同大小,分配不同大小的空間。所以是為了節省內存而採用的。
因為是一塊完整的內存空間,當裏面的元素髮生變更時,會產生連鎖更新,嚴重影響我們的訪問性能。所以,只適用於數據量比較小的場景。
所以,Redis會有相關配置,Hashes只有小數據量的時候才會用到ziplist,當hash對象同時滿足以下兩個條件的時候,使用ziplist編碼:
- 哈希對象保存的鍵值對數量<512個
- 所有的鍵值對的鍵和值的字符串長度都<64byte(一個英文字母一個字節)
hash-max-ziplist-value 64 // ziplist中最大能存放的值長度
hash-max-ziplist-entries 512 // ziplist中最多能存放的entry節點數量
5.6 List類型數據結構
5.6.1 quickList快速列表
兼顧了ziplist的節省內存,並且一定程度上解決了連鎖更新的問題,我們的 quicklistNode節點裏面是個ziplist,每個節點是分開的。那麼就算髮生了連鎖更新,也只會發生在一個quicklistNode節點。
quicklist.h文件
typedef struct{
struct quicklistNode *prev; //前指針
struct quicklistNode *next; //後指針
unsigned char *zl; //數據指針 指向ziplist結果
unsigned int sz; //ziplist大小 /* ziplist
size in bytes */
unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素
unsigned int encoding : 2; /* RAW==1 or LZF==2 */ //
是否壓縮, 1沒有壓縮 2 lzf壓縮
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //預留容器字段
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */ //預留字段
} quicklistNode;
5.6.2 list數據結構圖
5.7 Set類型數據結構
Redis用intset或hashtable存儲set。滿足下面條件,就用inset存儲。
- 如果不是整數類型,就用dictht hash表(數組+鏈表)
- 如果元素個數超過512個,也會用hashtable存儲。跟一個配置有關:
set-max-intset-entries 512
不滿足上述條件的,都使用hash表存儲,value存null。
5.8 ZSet數據結構
5.8.1 ZipList
默認使用ziplist編碼。
在ziplist的內部,按照score排序遞增來存儲。插入的時候要移動之後的數據。
如果元素數量大於等於128個,或者任一member長度大於等於64字節使用 skiplist+dict存儲。
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
如果不滿足條件,採用跳錶。
5.8.2 跳錶(skiplist)
結構定義(servier.h)
``` / ZSETs use a specialized version of Skiplists / typedef struct zskiplistNode { sds ele; //sds數據 double score; //score struct zskiplistNode backward; //後退指針 //層級數組 struct zskiplistLevel { struct zskiplistNode forward; //前進指針 unsigned long span; //跨度 } level[]; } zskiplistNode;
//跳錶列表 typedef struct zskiplist { struct zskiplistNode header, tail; //頭尾節點 unsigned long length; //節點數量 int level; //最大的節點層級 } zskiplist; ```
zslInsert 添加節點方法 (t_zset.c)
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds
ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert
position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
咕泡出品 必屬精品精品屬精品 咕泡出品 必屬精品 咕泡出品 必屬精品 咕泡咕泡
(x->level[i].forward->score == score
&&
sdscmp(x->level[i].forward->ele,ele) <
0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since
we allow duplicated
* scores, reinserting the same element should never
happen since the
* caller of zslInsert() should test in the hash table
if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
//不同層級建立鏈表聯繫
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is
inserted here */
x->level[i].span = update[i]->level[i].span -
(rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) +
1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL :
update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
ZSKIPLIST_MAXLEVEL默認32 定義在server.h
```
define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64
elements */ ```
六、Redis過期策略
6.1 惰性過期
所謂惰性過期,就是在每次訪問操作key的時候,判斷這個key是不是過期了,過期了就刪除。
6.1.1 源碼驗證
- expireIfNeeded方法(db.c文件)
int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave,
instead of
* evicting the expired key from the database, we
return ASAP:
* the slave key expiration is controlled by the
master that will
* send us synthesized DEL operations for expired
keys.
*
* Still we try to return the right information to the
caller,
* that is, 0 if we think the key should be still
valid, 1 if
* we think the key is expired at this time. */
// 如果配置有masterhost,説明是從節點,那麼不操作刪除
if (server.masterhost != NULL) return 1;
/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
//是否是異步刪除 防止單個Key的數據量很大 阻塞主線程 是4.0之後添加的新功能,默認關閉
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
}
每次調用到相關指令時,才會執行expireIfNeeded判斷是否過期,平時不會去判斷是否過期。
該策略可以最大化的節省CPU資源。
但是卻對內存非常不友好。因為如果沒有再次訪問,就可能一直堆積再內存中。佔用內存
所以就需要另一種策略來配合使用,就是定期過期
6.2 定期過期
那麼多久去清除一次,我們在講Rehash的時候,有個方法是serverCron,執行頻率根據redis.conf中的hz配置。
這方法除了做Rehash以外,還會做很多其他的事情,比如:
- 清理數據庫中的過期鍵值對
- 更新服務器的各類統計信息,比如時間、內存佔用、數據庫佔用情況等
- 關閉和清理連接失效的客户端
- 嘗試進行持久化操作
6.2.1 實現流程
- 定時serverCron方法去執行清理,執行頻率根據redis.cong中的hz配置的值(默認是10,也就是1s執行10次,100ms執行一次)
- 執行清理的時候,不是去掃描所有的key,而是去掃描所有設置了過期時間的key redisDB.expires
- 如果每次去把所有的key都拿出來,那麼假如過期的key很多,就會很慢,所以也不是一次性拿出所有的key
- 根據hash桶的維度去掃描key,掃到20(可配置)個key為止。假如第一個桶是15個key沒有滿足20,繼續掃描第二個桶,第二個桶20個key,由於是以hash桶的維度掃描的,所以第二個掃到了就會全掃,總共掃描35個key
- 找到掃描的key裏面過期的key,並進行刪除
- 如果取了400個空桶(最多拿400個桶),或者掃描的刪除比例跟掃描的總數超過10%,繼續執行4、5步
- 也不能無限的循環,循環16次後回去檢測時間,超過指定時間會跳出
6.2.2 源碼驗證
- 入口serverCrom
``` // serverCron方法調用databasesCron方法(server.c) / Handle background operations on Redis databases. / databasesCron();
void databasesCron(void) { / Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. / if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); //過期刪除方法 } else { expireSlaveKeys(); } } } ```
七、Redis淘汰策略
由於Redis內存是有大小的,並且我可能裏面的數據都沒有過期,當快滿的時候,我又沒有過期的數據進行淘汰,那麼這個時候內存也會滿。內存滿了,Redis也會不能放入新的數據。所以,我們不得已需要一些策略來解決這個問題來保證可用性。
7.1 淘汰策略
7.1.1 noeviction
New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.
默認,不淘汰 能讀不能寫
7.1.2 allkeys-lru
Keeps most recently used keys; removes least recently used (LRU) keys
基於偽LRU算法,在所有的key中去淘汰
7.1.3 allkeys-lfu
Keeps frequently used keys; removes least frequently used (LFU) keys.
基於偽LFU算法,在所有的key中去淘汰
7.1.4 volatile-lru
Removes least recently used keys with the expire field set to true.
基於偽LRU算法,在設置了過期時間的key中去淘汰
7.1.5 volatile-lfu
Removes least frequently used keys with the expire field set to true.
基於偽LFU算法 在設置了過期時間的key中去淘汰
7.1.6 allkeys-random
Randomly removes keys to make space for the new data added.
基於隨機算法,在所有的key中去淘汰
7.1.7 volatile-random
Randomly removes keys with expire field set to true.
基於隨機算法 在設置了過期時間的key中去淘汰
7.1.8 volatile-ttl
Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.
根據過期時間來,淘汰即將過期的
7.2 淘汰流程
- 首先,我們會有個淘汰池,默認大小是16,並且裏面的數據是末尾淘汰制
- 每次指令操作的時候,自旋會判斷當前內存是否滿足指令所需要的內存
- 如果當前不滿足,會從淘汰池的尾部拿一個最適合淘汰的數據
- 會取樣(配置 maxmemory-samples),從Redis中獲取隨機獲取到取樣的數據,解決一次性讀取所有的數據慢的問題
- 在取樣的數據中,根據淘汰算法找到最適合淘汰的數據
- 將最合適的那個數據跟淘汰池中的數據進行比較,是否比淘汰池的數據更適合淘汰,如果更適合,放入淘汰池
- 按照適合的程度進行排序,最適合淘汰的放入尾部
- 將需要淘汰的數據從Redis刪除,並且從淘汰池移除
7.3 LRU算法
Lru,Least Recently Used 翻譯過來是最久未使用,根據時間軸來走,仍很久沒用的數據。只要最近有用過,我就默認是有效的。
那麼它的一個衡量標準是啥?時間!根據使用時間,從近到遠,越遠的越容易淘汰。
7.3.1 實現原理
需求:得到對象隔多久沒訪問,隔的時間越久,越容易被淘汰
- 首先,LRU是根據這個對象的訪問操作時間來進行淘汰的,那麼我們需要知道這個對象最後操作的訪問時間
- 知道了對象的最後操作訪問時間後,我們只需要跟當前系統時間來進行對比,就能算出對象多久沒訪問了
7.3.2 源碼驗證
redis中,對象都會被redisObject對象包裝,其中有個字段叫lru。
redisObject對象 (server.h文件)
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
看LRU的字段的説明,那麼我們大概能猜出來,redis去實現lru淘汰算法肯定跟我們這個對象的lru相關
並且這個字段的大小為24bit,那麼這個字段記錄的是對象操作訪問時候的秒單位時間的後24bit
相當於:
long timeMillis=System.currentTimeMillis();
System.out.println(timeMillis/1000); //獲取當前秒
System.out.println(timeMillis/1000 & ((1<<24)-1)); //獲取秒的後24位
我們知道了這個對象的最後操作訪問的時間。如果我們要得到這個對象多久沒訪問了,我們是不是就很簡單,用我當前的時間-這個對象的訪問時間就可以了!但是這個訪問時間是秒單位時間的後24bit 所以,也是用當前的時間的秒單位的後24bit去減。
estimateObjectIdleTime方法(evict.c)
unsigned long long estimateObjectIdleTime(robj *o) {
//獲取秒單位時間的最後24位
unsigned long long lruclock = LRU_CLOCK();
//因為只有24位,所有最大的值為2的24次方-1
//超過最大值從0開始,所以需要判斷lruclock(當前系統時間)跟緩存對象的lru字段的大小
if (lruclock >= o->lru) {
//如果lruclock>=robj.lru,返回lruclock-o->lru,再轉換單位
return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
} else {
//否則採用lruclock + (LRU_CLOCK_MAX - o->lru),得到對
象的值越小,返回的值越大,越大越容易被淘汰
return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
LRU_CLOCK_RESOLUTION;
}
}
7.3.3 LRU總結
用lruclock與 redisObject.lru進行比較,因為lruclock只獲取了當前秒單位時間的後24位,所以肯定有個輪詢。
所以,我們會用lruclock跟 redisObject.lru進行比較,如果lruclock>redisObject.lru,那麼我們用lruclock- redisObject.lru,否則lruclock+(24bit的最大值-redisObject.lru),得到lru越小,那麼返回的數據越大,相差越大的越優先會被淘汰!
https://www.processon.com/view/link/5fbe0541e401fd2d6ed8fc38
7.4 LFU算法
LFU,英文 Least Frequently Used,翻譯成中文就是最不常用的優先淘汰。
不常用,它的衡量標準就是次數,次數越少的越容易被淘汰。
這個實現起來應該也很簡單,對象被操作訪問的時候,去記錄次數,每次操作訪問一次,就+1;淘汰的時候,直接去比較這個次數,次數越少的越容易淘汰!
7.4.1 LFU的時效性問題
何為時效性問題?就是隻會去考慮數量,但是不會去考慮時間。
ps:去年的某個新聞很火,點擊量3000W,今年有個新聞剛出來點擊量100次,本來我們是想保留今年的新的新聞的,但是如果根據LFU來做的話,我們發現淘汰的是今年的新聞,明顯是不合理的
導致的問題:新的數據進不去,舊的數據出不來。
那麼如何解決呢,且往下看
7.4.2 源碼分析
我們還是來看redisObject(server.h)
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
我們看它的lru,它如果在LFU算法的時候!它前面16位代表的是時間,後8位代表的是一個數值,frequency是頻率,應該就是代表這個對象的訪問次數,我們先給它叫做counter。
前16bits時間有啥用?
跟時間相關,我猜想應該是跟解決時效性相關的,那麼怎麼解決的?從生活中找例子
大家應該充過一些會員,比如我這個年紀的,小時候喜歡充騰訊的黃鑽、綠鑽、藍鑽等等。但是有一個點,假如哪天我沒充錢了的話,或者沒有續VIP的時候,我這個鑽石等級會隨着時間的流失而降低。比如我本來是黃鑽V6,但是一年不充錢的話,可能就變成了V4。
那麼回到Redis,大膽的去猜測,那麼這個時間是不是去記錄這個對象有多久沒訪問了,如果多久沒訪問,我就去減少對應的次數。
LFUDecrAndReturn方法(evict.c)
unsigned long LFUDecrAndReturn(robj *o) {
//lru字段右移8位,得到前面16位的時間
unsigned long ldt = o->lru >> 8;
//lru字段與255進行&運算(255代表8位的最大值),
//得到8位counter值
unsigned long counter = o->lru & 255;
//如果配置了lfu_decay_time,用LFUTimeElapsed(ldt) 除以配置的值
//LFUTimeElapsed(ldt)源碼見下
//總的沒訪問的分鐘時間/配置值,得到每分鐘沒訪問衰減多少
unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
//不能減少為負數,非負數用couter值減去衰減值
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
衰減因子由配置
lfu-decay-time 1 //多少分鐘沒操作訪問就去衰減一次
後8bits的次數,最大值是255,夠不夠?
肯定不夠,一個對象的訪問操作次數肯定不止255次。
但是我們可以讓數據達到255的很難。那麼怎麼做的?這個比較簡單,我們先看源碼,然後再總結
LFULogIncr方法 (evict.c 文件)
uint8_t LFULogIncr(uint8_t counter) {
//如果已經到最大值255,返回255 ,8位的最大值
if (counter == 255) return 255;
//得到隨機數(0-1)
double r = (double)rand()/RAND_MAX;
//LFU_INIT_VAL表示基數值(在server.h配置)
double baseval = counter - LFU_INIT_VAL;
//如果達不到基數值,表示快不行了,baseval =0
if (baseval < 0) baseval = 0;
//如果快不行了,肯定給他加counter
//不然,按照機率是否加counter,同時跟baseval與lfu_log_factor相關
//都是在分子,所以2個值越大,加counter機率越小
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}
八、Redis 持久化
8.1 RBD
8.1.1 何時觸發RBD
自動觸發
- 配置觸發
save 900 1 900s檢查一次,至少有1個key被修改就觸發
save 300 10 300s檢查一次,至少有10個key被修改就觸發
save 60 10000 60s檢查一次,至少有10000個key被修改
- shutdown正常關閉
任何組件在正常關閉的時候,都會去完成應該完成的事。比如Mysql 中的Redolog持久化,正常關閉的時候也會去持久化。
- flushall指令觸發
數據清空指令會觸發RDB操作,並且是觸發一個空的RDB文件,所以, 如果在沒有開啟其他的持久化的時候,flushall是可以刪庫跑路的,在生產環境慎用。
8.1.2 RDB的優劣
優勢
- 是個非常緊湊型的文件,非常適合備份與災難恢復
- 最大限度的提升了性能,會fork一個子進程,父進程永遠不會產於磁盤 IO或者類似操作
- 更快的重啟
不足
- 數據安全性不是很高,因為是根據配置的時間來備份,假如每5分鐘備份 一次,也會有5分鐘數據的丟失
- 經常fork子進程,所以比較耗CPU,對CPU不是很友好
8.2 AOF
由於RDB的數據可靠性非常低,所以Redis又提供了另外一種持久化方案: Append Only File 簡稱:AOF
AOF默認是關閉的,你可以在配置文件中進行開啟:
``` appendonly no //默認關閉,可以進行開啟
The name of the append only file (default:"appendonly.aof")
appendfilename "appendonly.aof" //AOF文件名 ```
追加文件,即每次更改的命令都會附加到我的AOF文件中。
8.2.1 同步機制
AOF會記錄每個寫操作,那麼問題來了。我難道每次操作命令都要和磁盤交互?
當然不行,所以redis支持幾種策略,由用户來決定要不要每次都和磁盤交互
```
appendfsync always 表示每次寫入都執行fsync(刷新)函數 性能會非常非常慢 但是非常安全
appendfsync everysec 每秒執行一次fsync函數 可能丟失1s的數據
appendfsync no 由操作系統保證數據同步到磁盤,速度最快 你的數據只需要交給操作系統就行
```
默認1s一次,最多有1s丟失
8.2.2 重寫機制
由於AOF是追加的形式,所以文件會越來越大,越大的話,數據加載越慢。 所以我們需要對AOF文件進行重寫。
何為重寫
比如 我們的incr指令,假如我們incr了100次,現在數據是100,但是我們的aof文件中會有100條incr指令,但是我們發現這個100條指令用處不大, 假如我們能把最新的內存裏的數據保存下來的話。所以,重寫就是做了這麼一件事情,把當前內存的數據重寫下來,然後把之前的追加的文件刪除。
重寫流程
-
在Redis7之前
-
Redis fork一個子進程,在一個臨時文件中寫入新的AOF(當前內存的數據生成的新的AOF)
- 那麼在寫入新的AOF的時候,主進程還會有指令進入,那麼主進程會在內存緩存區中累計新的指令 (但是同時也會寫在舊的AOF文件中,就算 重寫失敗,也不會導致AOF損壞或者數據丟失)
- 如果子進程重寫完成,父進程會收到完成信號,並且把內存緩存中的指令追加到新的AOF文件中
-
替換舊的AOF文件 ,並且將新的指令附加到重寫好的AOF文件中
-
在Redis7之後,AOF文件不再是一個,所以會有臨時清單的概念
-
子進程開始在一個臨時文件中寫入新的基礎AOF
- 父級打開一個新的增量 AOF 文件以繼續寫入更新。如果重寫失敗,舊的基礎和增量文件(如果有的話)加上這個新打開的增量文件就代表了完整的更新數據集,所以我們是安全的
- 當子進程完成基礎文件的重寫後,父進程收到一個信號,並使用新打開 的增量文件和子進程生成的基礎文件來構建臨時清單,並將其持久化
- 現在 Redis 對清單文件進行原子交換,以便此 AOF 重寫的結果生效。 Redis 還會清理舊的基礎文件和任何未使用的增量文件
但是重寫是把當前內存的數據,寫入一個新的AOF文件,如果當前數據比較大,然後以指令的形式寫入的話,會很慢很慢
所以在4.0之後,在重寫的時候是採用RDB的方式去生成新的AOF文件,然 後後續追加的指令,還是以指令追加的形式追加的文件末尾
aof-use-rdb-preamble yes //是否開啟RDB與AOF混合模式
什麼時候重寫
配置文件redis.conf
```
重寫觸發機制
auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb 就算達到了第一個百分比的大小,也必須大於 64M
在 aof 文件小於64mb的時候不進行重寫,當到達64mb的時候,就重寫一 次。重寫後的 aof 文件可能是40mb。上面配置了auto-aof-rewritepercentag為100,即 aof 文件到了80mb的時候,進行重寫。 ```
8.2.3 AOF的優勢與不足
優勢
- 安全性高,就算是默認的持久化同步機制,也最多隻會丟失1s數據
- AOF由於某些原因,比如磁盤滿了等導致追加失敗,也能通過redischeck-aof 工具來修復
- 格式都是追加的日誌,所以可讀性更高
不足
- 數據集一般比RDB大
- 持久化跟數據加載比RDB更慢
- 在7.0之前,重寫的時候,因為重寫的時候,新的指令會緩存在內存區,所以會導致大量的內存使用
- 並且重寫期間,會跟磁盤進行2次IO,一個是寫入老的AOF文件,一個是寫入新的AOF文件
九、Redis常見問題總結
9.1 Redis數據丟失場景
9.1.1 持久化丟失
採用RDB或者不持久化, 會有數據丟失,因為是手動或者配置以快照的形式來進行備份。
解決:啟用AOF,以命令追加的形式進行備份,但是默認也會有1s丟失, 這是在性能與數據安全性中尋求的一個最適合的方案,如果為了保證數據 一致性,可以將配置更改為always,但是性能很慢,一般不用。
9.1.2 主從切換
因為Redis的數據是主異步同步給從的,提升了性能,但是由於是異步同步到從。所以存在數據丟失的可能
- master寫入數據k1 ,由於是異步同步到slave,當master沒有同步給slave的時候,master掛了
- slave會成為新的master,並且沒有同步k1
- master重啟,會成為新master的slave,同步數據會清空自己的數據,從新的master加載
- k1丟失
9.2 Redis緩存雪崩、穿透、擊穿問題分析
9.2.1 緩存雪崩
緩存雪崩就是Redis的大量熱點數據同時過期(失效),因為設置了相同的過期時間,剛好這個時候Redis請求的併發量又很大,就會導致所有的請求落到數據庫。
- 保證Redis的高可用,防止由於Redis不可用導致全部打到DB
- 加互斥鎖或者使用隊列,針對同一個key只允許一個線程到數據庫查詢
- 緩存定時預先更新,避免同時失效
- 通過加隨機數,使key在不同的時間過期
9.2.2 緩存穿透
緩存穿透是指緩存和數據庫中都沒有的數據,但是用户一直請求不存在的數據!這時的用户很可能就是攻擊者,惡意搞你們公司的,攻擊會導致數據庫壓力過大。
解決方案:布隆過濾器
布隆過濾器的思想:既然是因為你Redis跟DB都沒有,然後用户惡意一直訪問不存在的key,然後全部打到Redis跟DB。那麼我們能不能單獨把DB的數據單獨存到另外一個地方,但是這個地方只存一個簡單的標記,標記這個key存不存在,如果存在,就去訪問Redis跟DB,否則直接返回。並且這個內存最好很小。
9.2.3 緩存擊穿
單個key過期的時候有大量併發,使用互斥鎖,回寫redis,並且採用雙重檢查鎖來提升性能!減少對DB的訪問。
- 應用健康度隱患刨析解決系列之數據庫時區設置
- 對於Vue3和Ts的心得和思考
- 一文詳解擴散模型:DDPM
- zookeeper的Leader選舉源碼解析
- 一文帶你搞懂如何優化慢SQL
- 京東金融Android瘦身探索與實踐
- 微前端框架single-spa子應用加載解析
- cookie時效無限延長方案
- 聊聊前端性能指標那些事兒
- Spring竟然可以創建“重複”名稱的bean?—一次項目中存在多個bean名稱重複問題的排查
- 京東金融Android瘦身探索與實踐
- Spring源碼核心剖析
- 深入淺出RPC服務 | 不同層的網絡協議
- 安全測試之探索windows遊戲掃雷
- 關於數據庫分庫分表的一點想法
- 對於Vue3和Ts的心得和思考
- Bitmap、RoaringBitmap原理分析
- 京東小程序CI工具實踐
- 測試用例設計指南
- 當你對 redis 説你中意的女孩是 Mia