一文搞懂Redis
作者: 京東物流 劉麗俠 姚再毅 康睿 劉斌 李振
一、Redis的特性
1.1 Redis為什麼快?
- 基於記憶體操作,操作不需要跟磁碟互動,單次執行很快
- 命令執行是單執行緒,因為是基於記憶體操作,單次執行的時間快於執行緒切換時間,同時通訊採用多路複用
- Redis本身就是一個k-v結構,類似於hashMap,所以查詢效能接近於O(1)
- 同時redis自己底層資料結構支援,比如跳錶、SDS
- lO多路複用,單個執行緒中通過記錄跟蹤每一個sock(I/O流)的狀態來管理多個I/O流
1.2 Redis其他特性
- 更豐富的資料型別,雖然都是k、v結構,value可以儲存很多的資料型別
- 完善的記憶體管理機制、保證資料一致性:持久化機制、過期策略
- 支援多種程式語言
- 高可用,叢集、保證高可用
1.3 Redis高可用
- 很完善的記憶體管理機制,過期、淘汰、持久化
- 叢集模式,主從、哨兵、cluster叢集
二、Redis資料型別以及使用場景
Redis的資料型別有String、Hash、Set、List、Zset、bitMap(基於String型別)、 Hyperloglog(基於String型別)、Geo(地理位置)、Streams流。
2.1 String
2.1.1 基本指令
``` // 批量設定
mset key1 value1 key2 value2 // 批量獲取 mget key1 key2 // 獲取長度 strlen key
// 字串追加內容 append key xxx // 獲取指定區間的字元 getrange key 0 5 // 整數值遞增 (遞增指定的值) incr intkey (incrby intkey 10) // 當key 存在時將覆蓋 SETEX key seconds value // 將 key 的值設為 value ,當且僅當 key 不存在。 SETNX key value ```
2.1.2 應用場景
- 快取相關場景 快取、 token(跟過期屬性完美契合)
- 執行緒安全的計數場景 (軟限流、分散式ID等)
2.2 Hash
2.2.1 基本指令
``` // 將雜湊表 key 中的域 field 的值設為 value 。
HSET key field value // 返回雜湊表 key 中給定域 field 的值。 HGET key field // 返回雜湊表 key 中的所有域。 HKEYS key // 返回雜湊表 key 中所有域的值。 HVALS key // 為雜湊表 key 中的域 field 的值加上增量 increment 。 HINCRBY key field increment // 檢視雜湊表 key 中,給定域 field 是否存在。 HEXISTS key field ```
2.2.2 應用場景
- 儲存物件類的資料(官網說的)比如一個物件下有多個欄位
- 統計類的資料,可以對單個統計資料進行單獨操作
2.3 List
儲存有序的字串列表,元素可以重複。列表的最大長度為 2^32 - 1 個元素(4294967295,每個列表超過 40 億個元素)。
2.3.1 基本指令
``` // 將一個或多個值 value 插入到列表 key 的表頭
LPUSH key value [value ...] // 將一個或多個值 value 插入到列表 key 的表尾(最右邊)。 RPUSH key value [value ...] // 移除並返回列表 key 的頭元素。 LPOP key // 移除並返回列表 key 的尾元素。 RPOP key // BLPOP 是列表的阻塞式(blocking)彈出原語。 BLPOP key [key ...] timeout // BRPOP 是列表的阻塞式(blocking)彈出原語。 BRPOP key [key ...] timeout // 獲取指點位置元素 LINDEX key index ```
2.3.2 應用場景
- 使用者訊息時間線
因為list是有序的,所以我們可以先進先出,先進後出的列表都可以做
2.4 Set
2.4.1 基本指令
``` // 將一個或多個 member 元素加入到集合 key 當中,已經存在於集合的 member 元素將被忽略。
SADD key member [member ...] // 返回集合 key 中的所有成員。 SMEMBERS key // 返回集合 key 的基數(集合中元素的數量)。 SCARD key // 如果命令執行時,只提供了 key 引數,那麼返回集合中的一個隨機元素。 SRANDMEMBER key [count] // 移除並返回集合中的一個隨機元素。 SPOP key // 移除集合 key 中的一個或多個 member 元素,不存在的 member 元素會被忽略。 SREM key member [member ...] // 判斷 member 元素是否集合 key 的成員。 SISMEMBER key member // 獲取前一個集合有而後面1個集合沒有的 sdiff huihuiset huihuiset1 // 獲取交集 sinter huihuiset huihuiset1 // 獲取並集 sunion huihuiset huihuiset1 ```
2.4.2 應用場景
- 抽獎 spop跟srandmember隨機彈出或者獲取元素
- 點贊、簽到等,sadd集合儲存
- 交集並集 關注等場景
2.5 ZSet(SortedSet)
sorted set,有序的set,每個元素有個score。
score相同時,按照key的ASCII碼排序。
2.5.1 基本指令
``` //將一個或多個 member 元素及其 score 值加入到有序集 key 當中。
ZADD key score member [[score member] [score member] ...] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞增(從小到大)來排序 ZRANGE key start stop [WITHSCORES] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞減(從大到小)來排列。 ZREVRANGE key start stop [WITHSCORES] // 返回有序集 key 中,所有 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員。有序整合員按 score 值遞增(從小到大)次序排列。 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] // 移除有序集 key 中的一個或多個成員,不存在的成員將被忽略。 ZREM key member [member ...] // 返回有序集 key 的基數。 ZCARD key // 為有序集 key 的成員 member 的 score 值加上增量 increment 。 ZINCRBY key increment member // 返回有序集 key 中, score 值在 min 和 max 之間(預設包括 score 值等於 min 或 max )的成員的數量。 ZCOUNT key min max // 返回有序集 key 中成員 member 的排名。其中有序整合員按 score 值遞增(從小到大)順序排列。 ZRANK key member ```
2.5.2 應用場景
- 排行榜
三、Redis的事務
3.1 事務基本操作
// 1. multi命令開啟事務,exec命令提交事務
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
// 2. 組隊的過程中可以通過discard來放棄組隊。
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> set k4 v4
QUEUED
127.0.0.1:6379(TX)> discard
OK
3.2 Redis事務特性
- 單獨的隔離操作
事務中的所有命令都會序列化、按順序地執行。
事務在執行過程中,不會被其他客戶端傳送來的命令請求所打斷。
- 沒有隔離級別的概念
佇列中的命令沒有提交之前,都不會被實際地執行,因為事務提交之前任何指令都不會被實際執行,也就不存在“事務內的查詢要看到事務裡的更新,在事務外查詢不能看到”這個讓人萬分頭疼的問題。
- 不保證原子性
Redis同一個事務中如果有一條命令執行失敗,其後的命令仍然會被執行,沒有回滾。
四、Redis 分散式鎖
4.1 Lua 實現分散式鎖
local lockSet = redis.call('exists', KEYS[1])
if lockSet == 0
then
redis.call('set', KEYS[1], ARG[1])
redis.call('expire', KEYS[1], ARG[2])
end
return lockSet
五、Redis總體資料結構
5.1 Redis dict字典
5.1.1 RedisDb資料介面(server.h檔案)
資料最外層的結構為RedisDb
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */ //資料庫的鍵
dict *expires; /* Timeout of keys with a timeout set */ //設定了超時時間的鍵
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客戶端等待的keys
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */ //所在數 據庫ID
long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,僅用於統計
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
5.1.2 dict資料結構(dict.h檔案)
我們發現數據儲存在dict中
typedef struct dict {
dictType *type; //理解為面向物件思想,為支援不同的資料型別對應dictType抽象方法,不同的資料型別可以不同實現
void *privdata; //也可不同的資料型別相關,不同型別特定函式的可選引數。
dictht ht[2]; //2個hash表,用來資料儲存 2個dictht
long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash標記 -1代表不再rehash
unsigned long iterators; /* number of iterators currently running */
} dict;
5.1.3 dictht結構(dict.h檔案)
typedef struct dictht {
dictEntry **table; //dictEntry 陣列
unsigned long size; //陣列大小 預設為4 #define DICT_HT_INITIAL_SIZE 4
unsigned long sizemask; //size-1 用來取模得到資料的下標值
unsigned long used; //改hash表中已有的節點資料
} dictht;
5.1.4 dictEntry節點結構(dict.h檔案)
typedef struct dictEntry {
void *key; //key
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; //value
struct dictEntry *next; //指向下一個節點
} dictEntry;
5.1.5 RedisObject(service.h檔案)
``` / Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. /
define OBJ_ENCODING_RAW 0 / Raw representation /
define OBJ_ENCODING_INT 1 / Encoded as integer /
define OBJ_ENCODING_HT 2 / Encoded as hash table /
define OBJ_ENCODING_ZIPMAP 3 / Encoded as zipmap /
define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old
list encoding. */
define OBJ_ENCODING_ZIPLIST 5 / Encoded as ziplist /
define OBJ_ENCODING_INTSET 6 / Encoded as intset /
define OBJ_ENCODING_SKIPLIST 7 / Encoded as skiplist /
define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string
encoding */
define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list
of ziplists */
define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree
of listpacks */
define LRU_BITS 24
define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of
obj->lru */
define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution
in ms */
define OBJ_SHARED_REFCOUNT INT_MAX /* Global object
never destroyed. */
define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object
allocated in the stack. */
define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject { unsigned type:4; //資料型別 string hash list unsigned encoding:4; //底層的資料結構 跳錶 unsigned lru:LRU_BITS; / LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). / int refcount; //用於回收,引用計數法 void *ptr; //指向具體的資料結構的記憶體地址 比如 跳錶、壓縮列表 } robj; ```
5.2 Redis資料結構圖
5.3 Redis擴容機制
因為我們的dictEntry陣列預設大小是4,如果不進行擴容,那麼我們所有的資料都會以連結串列的形式新增至陣列下標 隨著資料量越來越大,之前只需要hash取模就能得到下標位置,現在得去迴圈我下標的連結串列,所以效能會越來越慢,當我們的資料量達到一定程度後,我們就得去觸發擴容操作。
5.3.1 什麼時候擴容
5.3.1.1 擴容機制
- 當沒有fork子程序在進行RDB或AOF持久化時,ht[0]的used大於size時,觸發擴容
- 如果有子程序在進行RDB或者AOF時,ht[0]的used大於等於size的5倍的時候,會觸發擴容
5.3.1.2 原始碼驗證
擴容,肯定是在新增資料的時候才會擴容,所以我們找一個新增資料的入口。
- 入口,新增或替換dictReplace (dict.c檔案)
int dictReplace(dict *d, void *key, void *val) {
dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will succeed. */
entry = dictAddRaw(d,key,&existing);
if (entry) {
dictSetVal(d, entry, val);
return 1;
}
/* Set the new value and free the old one. Note that
it is important
* to do that in this order, as the value may just be
exactly the same
* as the previous one. In this context, think to
reference counting,
* you want to increment (set), and then decrement
(free), and not the
* reverse. */
auxentry = *existing;
dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
- 進入dictAddRaw方法 (dict.c檔案)
``` dictEntry dictAddRaw(dict d, void key, dictEntry existing){ long index; dictEntry entry; dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 進行漸進式hash 按步rehash
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that
in a database
* system it is more likely that recently added
entries are accessed
* more frequently. */
//如果在hash 放在ht[1] 否則放在ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
//採用頭插法插入hash桶
entry->next = ht->table[index];
ht->table[index] = entry;
//已使用++
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
} ```
- 得到資料下標的方法 _dictKeyIndex (dict.c檔案)
``` static long _dictKeyIndex(dict d, const void key,uint64_t hash, dictEntry existing){ unsigned long idx, table; dictEntry he; if (existing) existing = NULL;
/* Expand the hash table if needed */
//擴容如果需要
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
//比較是否存在這個值
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain
the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key,he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return idx;
} ```
- _dictExpandIfNeeded 驗證是否需要擴容 (dict.c檔案)
``` / Expand the hash table if needed / static int _dictExpandIfNeeded(dict d){ / Incremental rehashing already in progress. Return.*/ if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回
/* If the hash table is empty expand it to the initialsize. */
//如果ht[0]的長度為0,設定預設長度4 dictExpand為擴容的關鍵方法
if (d->ht[0].size == 0)
return dictExpand(d,DICT_HT_INITIAL_SIZE);
//擴容條件 hash表中已使用的數量大於等於 hash表的大小
//並且dict_can_resize為1 或者 已使用的大小大於hash表大小的 5倍,大於等於1倍的時候,下面2個滿足一個條件即可
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
//擴容成原來的2倍
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
} ```
5.3.2 怎麼擴容
5.3.2.1 擴容步驟
- 當滿足我擴容條件,觸發擴容時,判斷是否在擴容,如果在擴容,或者 擴容的大小跟我現在的ht[0].size一樣,這次擴容不做
- new一個新的dictht,大小為ht[0].used * 2(但是必須向上2的冪,比 如6 ,那麼大小為8) ,並且ht[1]=新建立的dictht
- 我們有個更大的table了,但是需要把資料遷移到ht[1].table ,所以將 dict的rehashidx(資料遷移的偏移量)賦值為0 ,代表可以進行資料遷 移了,也就是可以rehash了
- 等待資料遷移完成,資料不會馬上遷移,而是採用漸進式rehash,慢慢的把資料遷移到ht[1]
- 當資料遷移完成,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
- 把dict的rehashidex=-1
5.3.2.2 原始碼驗證
- dictExpand方法在_dictExpandIfNeeded 方法中呼叫 (dict.c檔案),為擴容的關鍵方法
``` int dictExpand(dict d, unsigned long size){ / the size is invalid if it is smaller than the number of * elements already inside the hash table */ //正在擴容,不允許第二次擴容,或者ht[0]的資料量大於擴容後的數量,沒有意義,這次不擴容 if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR;
dictht n; /* the new hash table */
//得到最接近2的冪 跟hashMap思想一樣
unsigned long realsize = _dictNextPower(size);
/* Rehashing to the same table size is not useful. */
//如果跟ht[0]的大小一致 直接返回錯誤
if (realsize == d->ht[0].size) return DICT_ERR;
/* Allocate the new hash table and initialize all
pointers to NULL */
//為新的dictht賦值
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not
really a rehashing
* we just set the first hash table so that it can
accept keys. */
//ht[0].table為空,代表是初始化
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
//將擴容後的dictht賦值給ht[1]
d->ht[1] = n;
//標記為0,代表可以開始rehash
d->rehashidx = 0;
return DICT_OK;
} ```
5.3.3 資料怎麼遷移(漸進式hash)
5.3.3.1 遷移方式
假如一次性把資料全部遷移會很耗時間,會讓單條指令等待很久,會形成阻塞。
所以,redis採用漸進式Rehash,所謂漸進式,就是慢慢的,不會一次性把所有資料遷移。
那什麼時候會進行漸進式資料遷移?
- 每次進行key的crud操作都會進行一個hash桶的資料遷移
- 定時任務,進行部分資料遷移
5.3.3.2 原始碼驗證
CRUD觸發都會觸發_dictRehashStep(漸進式hash)
- 每次增刪改的時候都會呼叫_dictRehashStep方法
if (dictIsRehashing(d)) _dictRehashStep(d); //這個程式碼在增刪改查的方法中都會呼叫
- _dictRehashStep方法
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
- dictRehash方法
``` int dictRehash(dict d, int n) { int empty_visits = n10; / Max number of empty buckets to visit. / //要訪問的最大的空桶數 防止此次耗時過長
if (!dictIsRehashing(d)) return 0; //如果沒有在rehash返回0
//迴圈,最多1次,並且ht[0]的資料沒有遷移完 進入迴圈
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are
sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
//rehashidx rehash的索引,預設0 如果hash桶為空 進入自旋 並且判斷是否到了最大的訪問空桶數
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下標為rehashidx桶
/* Move all the keys in this bucket from the old
to the new hash HT */
while(de) { //迴圈hash桶的連結串列 並且轉移到ht[1]
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d-
>ht[1].sizemask;
de->next = d->ht[1].table[h]; //頭插
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
//轉移完後 將ht[0]相對的hash桶設定為null
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
//ht[0].used=0 代表rehash全部完成
if (d->ht[0].used == 0) {
//清空ht[0]table
zfree(d->ht[0].table);
//將ht[0] 重新指向已經完成rehash的ht[1]
d->ht[0] = d->ht[1];
//將ht[1]所有資料重新設定
_dictReset(&d->ht[1]);
//設定-1,代表rehash完成
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
//將ht[1]的屬性復位 static void _dictReset(dictht *ht) { ht->table = NULL; ht->size = 0; ht->sizemask = 0; ht->used = 0; } ```
定時任務觸發
再講定時任務之前,我們要知道Redis中有個時間事件驅動,在 server.c檔案下有個serverCron 方法。
serverCron 每隔多久會執行一次,執行頻率根據redis.conf中的hz配置,serverCorn中有個方法databasesCron()
- 定時方法serverCron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//.......
/* We need to do a few operations on clients
asynchronously. */
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron(); //處理Redis資料庫上的後臺操作。
//.......
}
- databasesCron方法
void databasesCron(void) {
/* Expire keys by random sampling. Not required for
slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only
if there are no
* other processes saving the DB on disk. Otherwise
rehashing is bad
* as will cause a lot of copy-on-write of memory
pages. */
if (!hasActiveChildProcess()) {
/* We use global counters so if we stop the
computation at a given
* DB we'll be able to start from the successive
in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum)
dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */ //Rehash邏輯
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
//進入incrementallyRehash
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop
here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash,
we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
- 進入incrementallyRehash方法(server.c)
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1;
/* already used our millisecond for this
loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this
loop... */
}
return 0;
}
- 進入dictRehashMilliseconds(dict.c)
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
//進入rehash方法 dictRehash 去完成漸進時HASH
while(dictRehash(d,100)) {
rehashes += 100;
//判斷是否超時
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
5.4 String型別資料結構
Redis中String的底層沒有用c的char來實現,而是採用了SDS(simple Dynamic String)的資料結構來實現。並且提供了5種不同的型別
5.4.1 SDS資料結構定義(sds.h檔案)
``` typedef char sds; / Note: sdshdr5 is never used, we just access the flags byte directly. * However is here to document the layout of type 5 SDS strings. / struct attribute ((packed)) sdshdr5 { unsigned char flags; / 3 lsb of type, and 5 msb of string length */ char buf[]; }
struct attribute ((packed)) sdshdr8 { uint8_t len; / used / //已使用的長度 uint8_t alloc; / excluding the header and null terminator / //分配的總容量 不包含頭和空終止符 unsigned char flags; / 3 lsb of type, 5 unused bits / //低三位型別 高5bit未使用 char buf[]; //資料buf 儲存字串陣列 };
struct attribute ((packed)) sdshdr16 { uint16_t len; / used / uint16_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; };
struct attribute ((packed)) sdshdr32 { uint32_t len; / used / uint32_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; };
struct attribute ((packed)) sdshdr64 { uint64_t len; / used / uint64_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; }; ```
5.4.2 SDS資料結構的優勢
- char字串不記錄自身長度,所以獲取一個字串長度的複雜度是O(n),但是SDS記錄分配的長度alloc,已使用的長度len,獲取長度為 O(1)
- 減少修改字串帶來的記憶體重分配次數
- 空間預分配,SDS長度如果小於1MB,預分配跟長度一樣的,大於1M,每次跟len的大小多1M
- 惰性空間釋放,擷取的時候不馬上釋放空間,供下次使用!同時提供相應的釋放SDS未使用空間的API
- 二進位制安全
5.5 Hash型別資料結構
Redis的字典相當於Java語言中的HashMap,他是無序字典。內部結構上同樣是陣列 + 連結串列二維結構。第一維hash的陣列位置碰撞時,就會將碰撞的元素使用連結串列串起來。
5.5.1 Hash資料結構圖
5.5.2 Hash的壓縮列表
壓縮列表會根據存入的資料的不同型別以及不同大小,分配不同大小的空間。所以是為了節省記憶體而採用的。
因為是一塊完整的記憶體空間,當裡面的元素髮生變更時,會產生連鎖更新,嚴重影響我們的訪問效能。所以,只適用於資料量比較小的場景。
所以,Redis會有相關配置,Hashes只有小資料量的時候才會用到ziplist,當hash物件同時滿足以下兩個條件的時候,使用ziplist編碼:
- 雜湊物件儲存的鍵值對數量<512個
- 所有的鍵值對的鍵和值的字串長度都<64byte(一個英文字母一個位元組)
hash-max-ziplist-value 64 // ziplist中最大能存放的值長度
hash-max-ziplist-entries 512 // ziplist中最多能存放的entry節點數量
5.6 List型別資料結構
5.6.1 quickList快速列表
兼顧了ziplist的節省記憶體,並且一定程度上解決了連鎖更新的問題,我們的 quicklistNode節點裡面是個ziplist,每個節點是分開的。那麼就算髮生了連鎖更新,也只會發生在一個quicklistNode節點。
quicklist.h檔案
typedef struct{
struct quicklistNode *prev; //前指標
struct quicklistNode *next; //後指標
unsigned char *zl; //資料指標 指向ziplist結果
unsigned int sz; //ziplist大小 /* ziplist
size in bytes */
unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素
unsigned int encoding : 2; /* RAW==1 or LZF==2 */ //
是否壓縮, 1沒有壓縮 2 lzf壓縮
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //預留容器欄位
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */ //預留欄位
} quicklistNode;
5.6.2 list資料結構圖
5.7 Set型別資料結構
Redis用intset或hashtable儲存set。滿足下面條件,就用inset儲存。
- 如果不是整數型別,就用dictht hash表(陣列+連結串列)
- 如果元素個數超過512個,也會用hashtable儲存。跟一個配置有關:
set-max-intset-entries 512
不滿足上述條件的,都使用hash表儲存,value存null。
5.8 ZSet資料結構
5.8.1 ZipList
預設使用ziplist編碼。
在ziplist的內部,按照score排序遞增來儲存。插入的時候要移動之後的資料。
如果元素數量大於等於128個,或者任一member長度大於等於64位元組使用 skiplist+dict儲存。
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
如果不滿足條件,採用跳錶。
5.8.2 跳錶(skiplist)
結構定義(servier.h)
``` / ZSETs use a specialized version of Skiplists / typedef struct zskiplistNode { sds ele; //sds資料 double score; //score struct zskiplistNode backward; //後退指標 //層級陣列 struct zskiplistLevel { struct zskiplistNode forward; //前進指標 unsigned long span; //跨度 } level[]; } zskiplistNode;
//跳錶列表 typedef struct zskiplist { struct zskiplistNode header, tail; //頭尾節點 unsigned long length; //節點數量 int level; //最大的節點層級 } zskiplist; ```
zslInsert 新增節點方法 (t_zset.c)
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds
ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert
position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
咕泡出品 必屬精品精品屬精品 咕泡出品 必屬精品 咕泡出品 必屬精品 咕泡咕泡
(x->level[i].forward->score == score
&&
sdscmp(x->level[i].forward->ele,ele) <
0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since
we allow duplicated
* scores, reinserting the same element should never
happen since the
* caller of zslInsert() should test in the hash table
if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
//不同層級建立連結串列聯絡
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is
inserted here */
x->level[i].span = update[i]->level[i].span -
(rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) +
1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL :
update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
ZSKIPLIST_MAXLEVEL預設32 定義在server.h
```
define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64
elements */ ```
六、Redis過期策略
6.1 惰性過期
所謂惰性過期,就是在每次訪問操作key的時候,判斷這個key是不是過期了,過期了就刪除。
6.1.1 原始碼驗證
- expireIfNeeded方法(db.c檔案)
int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave,
instead of
* evicting the expired key from the database, we
return ASAP:
* the slave key expiration is controlled by the
master that will
* send us synthesized DEL operations for expired
keys.
*
* Still we try to return the right information to the
caller,
* that is, 0 if we think the key should be still
valid, 1 if
* we think the key is expired at this time. */
// 如果配置有masterhost,說明是從節點,那麼不操作刪除
if (server.masterhost != NULL) return 1;
/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
//是否是非同步刪除 防止單個Key的資料量很大 阻塞主執行緒 是4.0之後新增的新功能,預設關閉
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
}
每次呼叫到相關指令時,才會執行expireIfNeeded判斷是否過期,平時不會去判斷是否過期。
該策略可以最大化的節省CPU資源。
但是卻對記憶體非常不友好。因為如果沒有再次訪問,就可能一直堆積再記憶體中。佔用記憶體
所以就需要另一種策略來配合使用,就是定期過期
6.2 定期過期
那麼多久去清除一次,我們在講Rehash的時候,有個方法是serverCron,執行頻率根據redis.conf中的hz配置。
這方法除了做Rehash以外,還會做很多其他的事情,比如:
- 清理資料庫中的過期鍵值對
- 更新伺服器的各類統計資訊,比如時間、記憶體佔用、資料庫佔用情況等
- 關閉和清理連線失效的客戶端
- 嘗試進行持久化操作
6.2.1 實現流程
- 定時serverCron方法去執行清理,執行頻率根據redis.cong中的hz配置的值(預設是10,也就是1s執行10次,100ms執行一次)
- 執行清理的時候,不是去掃描所有的key,而是去掃描所有設定了過期時間的key redisDB.expires
- 如果每次去把所有的key都拿出來,那麼假如過期的key很多,就會很慢,所以也不是一次性拿出所有的key
- 根據hash桶的維度去掃描key,掃到20(可配置)個key為止。假如第一個桶是15個key沒有滿足20,繼續掃描第二個桶,第二個桶20個key,由於是以hash桶的維度掃描的,所以第二個掃到了就會全掃,總共掃描35個key
- 找到掃描的key裡面過期的key,並進行刪除
- 如果取了400個空桶(最多拿400個桶),或者掃描的刪除比例跟掃描的總數超過10%,繼續執行4、5步
- 也不能無限的迴圈,迴圈16次後回去檢測時間,超過指定時間會跳出
6.2.2 原始碼驗證
- 入口serverCrom
``` // serverCron方法呼叫databasesCron方法(server.c) / Handle background operations on Redis databases. / databasesCron();
void databasesCron(void) { / Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. / if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); //過期刪除方法 } else { expireSlaveKeys(); } } } ```
七、Redis淘汰策略
由於Redis記憶體是有大小的,並且我可能裡面的資料都沒有過期,當快滿的時候,我又沒有過期的資料進行淘汰,那麼這個時候記憶體也會滿。記憶體滿了,Redis也會不能放入新的資料。所以,我們不得已需要一些策略來解決這個問題來保證可用性。
7.1 淘汰策略
7.1.1 noeviction
New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.
預設,不淘汰 能讀不能寫
7.1.2 allkeys-lru
Keeps most recently used keys; removes least recently used (LRU) keys
基於偽LRU演算法,在所有的key中去淘汰
7.1.3 allkeys-lfu
Keeps frequently used keys; removes least frequently used (LFU) keys.
基於偽LFU演算法,在所有的key中去淘汰
7.1.4 volatile-lru
Removes least recently used keys with the expire field set to true.
基於偽LRU演算法,在設定了過期時間的key中去淘汰
7.1.5 volatile-lfu
Removes least frequently used keys with the expire field set to true.
基於偽LFU演算法 在設定了過期時間的key中去淘汰
7.1.6 allkeys-random
Randomly removes keys to make space for the new data added.
基於隨機演算法,在所有的key中去淘汰
7.1.7 volatile-random
Randomly removes keys with expire field set to true.
基於隨機演算法 在設定了過期時間的key中去淘汰
7.1.8 volatile-ttl
Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.
根據過期時間來,淘汰即將過期的
7.2 淘汰流程
- 首先,我們會有個淘汰池,預設大小是16,並且裡面的資料是末尾淘汰制
- 每次指令操作的時候,自旋會判斷當前記憶體是否滿足指令所需要的記憶體
- 如果當前不滿足,會從淘汰池的尾部拿一個最適合淘汰的資料
- 會取樣(配置 maxmemory-samples),從Redis中獲取隨機獲取到取樣的資料,解決一次性讀取所有的資料慢的問題
- 在取樣的資料中,根據淘汰演算法找到最適合淘汰的資料
- 將最合適的那個資料跟淘汰池中的資料進行比較,是否比淘汰池的資料更適合淘汰,如果更適合,放入淘汰池
- 按照適合的程度進行排序,最適合淘汰的放入尾部
- 將需要淘汰的資料從Redis刪除,並且從淘汰池移除
7.3 LRU演算法
Lru,Least Recently Used 翻譯過來是最久未使用,根據時間軸來走,仍很久沒用的資料。只要最近有用過,我就預設是有效的。
那麼它的一個衡量標準是啥?時間!根據使用時間,從近到遠,越遠的越容易淘汰。
7.3.1 實現原理
需求:得到物件隔多久沒訪問,隔的時間越久,越容易被淘汰
- 首先,LRU是根據這個物件的訪問操作時間來進行淘汰的,那麼我們需要知道這個物件最後操作的訪問時間
- 知道了物件的最後操作訪問時間後,我們只需要跟當前系統時間來進行對比,就能算出物件多久沒訪問了
7.3.2 原始碼驗證
redis中,物件都會被redisObject物件包裝,其中有個欄位叫lru。
redisObject物件 (server.h檔案)
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
看LRU的欄位的說明,那麼我們大概能猜出來,redis去實現lru淘汰演算法肯定跟我們這個物件的lru相關
並且這個欄位的大小為24bit,那麼這個欄位記錄的是物件操作訪問時候的秒單位時間的後24bit
相當於:
long timeMillis=System.currentTimeMillis();
System.out.println(timeMillis/1000); //獲取當前秒
System.out.println(timeMillis/1000 & ((1<<24)-1)); //獲取秒的後24位
我們知道了這個物件的最後操作訪問的時間。如果我們要得到這個物件多久沒訪問了,我們是不是就很簡單,用我當前的時間-這個物件的訪問時間就可以了!但是這個訪問時間是秒單位時間的後24bit 所以,也是用當前的時間的秒單位的後24bit去減。
estimateObjectIdleTime方法(evict.c)
unsigned long long estimateObjectIdleTime(robj *o) {
//獲取秒單位時間的最後24位
unsigned long long lruclock = LRU_CLOCK();
//因為只有24位,所有最大的值為2的24次方-1
//超過最大值從0開始,所以需要判斷lruclock(當前系統時間)跟快取物件的lru欄位的大小
if (lruclock >= o->lru) {
//如果lruclock>=robj.lru,返回lruclock-o->lru,再轉換單位
return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
} else {
//否則採用lruclock + (LRU_CLOCK_MAX - o->lru),得到對
象的值越小,返回的值越大,越大越容易被淘汰
return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
LRU_CLOCK_RESOLUTION;
}
}
7.3.3 LRU總結
用lruclock與 redisObject.lru進行比較,因為lruclock只獲取了當前秒單位時間的後24位,所以肯定有個輪詢。
所以,我們會用lruclock跟 redisObject.lru進行比較,如果lruclock>redisObject.lru,那麼我們用lruclock- redisObject.lru,否則lruclock+(24bit的最大值-redisObject.lru),得到lru越小,那麼返回的資料越大,相差越大的越優先會被淘汰!
http://www.processon.com/view/link/5fbe0541e401fd2d6ed8fc38
7.4 LFU演算法
LFU,英文 Least Frequently Used,翻譯成中文就是最不常用的優先淘汰。
不常用,它的衡量標準就是次數,次數越少的越容易被淘汰。
這個實現起來應該也很簡單,物件被操作訪問的時候,去記錄次數,每次操作訪問一次,就+1;淘汰的時候,直接去比較這個次數,次數越少的越容易淘汰!
7.4.1 LFU的時效性問題
何為時效性問題?就是隻會去考慮數量,但是不會去考慮時間。
ps:去年的某個新聞很火,點選量3000W,今年有個新聞剛出來點選量100次,本來我們是想保留今年的新的新聞的,但是如果根據LFU來做的話,我們發現淘汰的是今年的新聞,明顯是不合理的
導致的問題:新的資料進不去,舊的資料出不來。
那麼如何解決呢,且往下看
7.4.2 原始碼分析
我們還是來看redisObject(server.h)
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
我們看它的lru,它如果在LFU演算法的時候!它前面16位代表的是時間,後8位代表的是一個數值,frequency是頻率,應該就是代表這個物件的訪問次數,我們先給它叫做counter。
前16bits時間有啥用?
跟時間相關,我猜想應該是跟解決時效性相關的,那麼怎麼解決的?從生活中找例子
大家應該充過一些會員,比如我這個年紀的,小時候喜歡充騰訊的黃鑽、綠鑽、藍鑽等等。但是有一個點,假如哪天我沒充錢了的話,或者沒有續VIP的時候,我這個鑽石等級會隨著時間的流失而降低。比如我本來是黃鑽V6,但是一年不充錢的話,可能就變成了V4。
那麼回到Redis,大膽的去猜測,那麼這個時間是不是去記錄這個物件有多久沒訪問了,如果多久沒訪問,我就去減少對應的次數。
LFUDecrAndReturn方法(evict.c)
unsigned long LFUDecrAndReturn(robj *o) {
//lru欄位右移8位,得到前面16位的時間
unsigned long ldt = o->lru >> 8;
//lru欄位與255進行&運算(255代表8位的最大值),
//得到8位counter值
unsigned long counter = o->lru & 255;
//如果配置了lfu_decay_time,用LFUTimeElapsed(ldt) 除以配置的值
//LFUTimeElapsed(ldt)原始碼見下
//總的沒訪問的分鐘時間/配置值,得到每分鐘沒訪問衰減多少
unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
//不能減少為負數,非負數用couter值減去衰減值
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
衰減因子由配置
lfu-decay-time 1 //多少分鐘沒操作訪問就去衰減一次
後8bits的次數,最大值是255,夠不夠?
肯定不夠,一個物件的訪問操作次數肯定不止255次。
但是我們可以讓資料達到255的很難。那麼怎麼做的?這個比較簡單,我們先看原始碼,然後再總結
LFULogIncr方法 (evict.c 檔案)
uint8_t LFULogIncr(uint8_t counter) {
//如果已經到最大值255,返回255 ,8位的最大值
if (counter == 255) return 255;
//得到隨機數(0-1)
double r = (double)rand()/RAND_MAX;
//LFU_INIT_VAL表示基數值(在server.h配置)
double baseval = counter - LFU_INIT_VAL;
//如果達不到基數值,表示快不行了,baseval =0
if (baseval < 0) baseval = 0;
//如果快不行了,肯定給他加counter
//不然,按照機率是否加counter,同時跟baseval與lfu_log_factor相關
//都是在分子,所以2個值越大,加counter機率越小
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}
八、Redis 持久化
8.1 RBD
8.1.1 何時觸發RBD
自動觸發
- 配置觸發
save 900 1 900s檢查一次,至少有1個key被修改就觸發
save 300 10 300s檢查一次,至少有10個key被修改就觸發
save 60 10000 60s檢查一次,至少有10000個key被修改
- shutdown正常關閉
任何元件在正常關閉的時候,都會去完成應該完成的事。比如Mysql 中的Redolog持久化,正常關閉的時候也會去持久化。
- flushall指令觸發
資料清空指令會觸發RDB操作,並且是觸發一個空的RDB檔案,所以, 如果在沒有開啟其他的持久化的時候,flushall是可以刪庫跑路的,在生產環境慎用。
8.1.2 RDB的優劣
優勢
- 是個非常緊湊型的檔案,非常適合備份與災難恢復
- 最大限度的提升了效能,會fork一個子程序,父程序永遠不會產於磁碟 IO或者類似操作
- 更快的重啟
不足
- 資料安全性不是很高,因為是根據配置的時間來備份,假如每5分鐘備份 一次,也會有5分鐘資料的丟失
- 經常fork子程序,所以比較耗CPU,對CPU不是很友好
8.2 AOF
由於RDB的資料可靠性非常低,所以Redis又提供了另外一種持久化方案: Append Only File 簡稱:AOF
AOF預設是關閉的,你可以在配置檔案中進行開啟:
``` appendonly no //預設關閉,可以進行開啟
The name of the append only file (default:"appendonly.aof")
appendfilename "appendonly.aof" //AOF檔名 ```
追加檔案,即每次更改的命令都會附加到我的AOF檔案中。
8.2.1 同步機制
AOF會記錄每個寫操作,那麼問題來了。我難道每次操作命令都要和磁碟互動?
當然不行,所以redis支援幾種策略,由使用者來決定要不要每次都和磁碟互動
```
appendfsync always 表示每次寫入都執行fsync(重新整理)函式 效能會非常非常慢 但是非常安全
appendfsync everysec 每秒執行一次fsync函式 可能丟失1s的資料
appendfsync no 由作業系統保證資料同步到磁碟,速度最快 你的資料只需要交給作業系統就行
```
預設1s一次,最多有1s丟失
8.2.2 重寫機制
由於AOF是追加的形式,所以檔案會越來越大,越大的話,資料載入越慢。 所以我們需要對AOF檔案進行重寫。
何為重寫
比如 我們的incr指令,假如我們incr了100次,現在資料是100,但是我們的aof檔案中會有100條incr指令,但是我們發現這個100條指令用處不大, 假如我們能把最新的記憶體裡的資料儲存下來的話。所以,重寫就是做了這麼一件事情,把當前記憶體的資料重寫下來,然後把之前的追加的檔案刪除。
重寫流程
-
在Redis7之前
-
Redis fork一個子程序,在一個臨時檔案中寫入新的AOF(當前記憶體的資料生成的新的AOF)
- 那麼在寫入新的AOF的時候,主程序還會有指令進入,那麼主程序會在記憶體快取區中累計新的指令 (但是同時也會寫在舊的AOF檔案中,就算 重寫失敗,也不會導致AOF損壞或者資料丟失)
- 如果子程序重寫完成,父程序會收到完成訊號,並且把記憶體快取中的指令追加到新的AOF檔案中
-
替換舊的AOF檔案 ,並且將新的指令附加到重寫好的AOF檔案中
-
在Redis7之後,AOF檔案不再是一個,所以會有臨時清單的概念
-
子程序開始在一個臨時檔案中寫入新的基礎AOF
- 父級開啟一個新的增量 AOF 檔案以繼續寫入更新。如果重寫失敗,舊的基礎和增量檔案(如果有的話)加上這個新開啟的增量檔案就代表了完整的更新資料集,所以我們是安全的
- 當子程序完成基礎檔案的重寫後,父程序收到一個訊號,並使用新開啟 的增量檔案和子程序生成的基礎檔案來構建臨時清單,並將其持久化
- 現在 Redis 對清單檔案進行原子交換,以便此 AOF 重寫的結果生效。 Redis 還會清理舊的基礎檔案和任何未使用的增量檔案
但是重寫是把當前記憶體的資料,寫入一個新的AOF檔案,如果當前資料比較大,然後以指令的形式寫入的話,會很慢很慢
所以在4.0之後,在重寫的時候是採用RDB的方式去生成新的AOF檔案,然 後後續追加的指令,還是以指令追加的形式追加的檔案末尾
aof-use-rdb-preamble yes //是否開啟RDB與AOF混合模式
什麼時候重寫
配置檔案redis.conf
```
重寫觸發機制
auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb 就算達到了第一個百分比的大小,也必須大於 64M
在 aof 檔案小於64mb的時候不進行重寫,當到達64mb的時候,就重寫一 次。重寫後的 aof 檔案可能是40mb。上面配置了auto-aof-rewritepercentag為100,即 aof 檔案到了80mb的時候,進行重寫。 ```
8.2.3 AOF的優勢與不足
優勢
- 安全性高,就算是預設的持久化同步機制,也最多隻會丟失1s資料
- AOF由於某些原因,比如磁碟滿了等導致追加失敗,也能通過redischeck-aof 工具來修復
- 格式都是追加的日誌,所以可讀性更高
不足
- 資料集一般比RDB大
- 持久化跟資料載入比RDB更慢
- 在7.0之前,重寫的時候,因為重寫的時候,新的指令會快取在記憶體區,所以會導致大量的記憶體使用
- 並且重寫期間,會跟磁碟進行2次IO,一個是寫入老的AOF檔案,一個是寫入新的AOF檔案
九、Redis常見問題總結
9.1 Redis資料丟失場景
9.1.1 持久化丟失
採用RDB或者不持久化, 會有資料丟失,因為是手動或者配置以快照的形式來進行備份。
解決:啟用AOF,以命令追加的形式進行備份,但是預設也會有1s丟失, 這是在效能與資料安全性中尋求的一個最適合的方案,如果為了保證資料 一致性,可以將配置更改為always,但是效能很慢,一般不用。
9.1.2 主從切換
因為Redis的資料是主非同步同步給從的,提升了效能,但是由於是非同步同步到從。所以存在資料丟失的可能
- master寫入資料k1 ,由於是非同步同步到slave,當master沒有同步給slave的時候,master掛了
- slave會成為新的master,並且沒有同步k1
- master重啟,會成為新master的slave,同步資料會清空自己的資料,從新的master載入
- k1丟失
9.2 Redis快取雪崩、穿透、擊穿問題分析
9.2.1 快取雪崩
快取雪崩就是Redis的大量熱點資料同時過期(失效),因為設定了相同的過期時間,剛好這個時候Redis請求的併發量又很大,就會導致所有的請求落到資料庫。
- 保證Redis的高可用,防止由於Redis不可用導致全部打到DB
- 加互斥鎖或者使用佇列,針對同一個key只允許一個執行緒到資料庫查詢
- 快取定時預先更新,避免同時失效
- 通過加隨機數,使key在不同的時間過期
9.2.2 快取穿透
快取穿透是指快取和資料庫中都沒有的資料,但是使用者一直請求不存在的資料!這時的使用者很可能就是攻擊者,惡意搞你們公司的,攻擊會導致資料庫壓力過大。
解決方案:布隆過濾器
布隆過濾器的思想:既然是因為你Redis跟DB都沒有,然後使用者惡意一直訪問不存在的key,然後全部打到Redis跟DB。那麼我們能不能單獨把DB的資料單獨存到另外一個地方,但是這個地方只存一個簡單的標記,標記這個key存不存在,如果存在,就去訪問Redis跟DB,否則直接返回。並且這個記憶體最好很小。
9.2.3 快取擊穿
單個key過期的時候有大量併發,使用互斥鎖,回寫redis,並且採用雙重檢查鎖來提升效能!減少對DB的訪問。
- 應用健康度隱患刨析解決系列之資料庫時區設定
- 對於Vue3和Ts的心得和思考
- 一文詳解擴散模型:DDPM
- zookeeper的Leader選舉原始碼解析
- 一文帶你搞懂如何優化慢SQL
- 京東金融Android瘦身探索與實踐
- 微前端框架single-spa子應用載入解析
- cookie時效無限延長方案
- 聊聊前端效能指標那些事兒
- Spring竟然可以建立“重複”名稱的bean?—一次專案中存在多個bean名稱重複問題的排查
- 京東金融Android瘦身探索與實踐
- Spring原始碼核心剖析
- 深入淺出RPC服務 | 不同層的網路協議
- 安全測試之探索windows遊戲掃雷
- 關於資料庫分庫分表的一點想法
- 對於Vue3和Ts的心得和思考
- Bitmap、RoaringBitmap原理分析
- 京東小程式CI工具實踐
- 測試用例設計指南
- 當你對 redis 說你中意的女孩是 Mia