一文搞懂Redis

語言: CN / TW / HK

作者: 京東物流 劉麗俠 姚再毅 康睿 劉斌 李振

一、Redis的特性

1.1 Redis為什麼快?

  • 基於內存操作,操作不需要跟磁盤交互,單次執行很快
  • 命令執行是單線程,因為是基於內存操作,單次執行的時間快於線程切換時間,同時通信採用多路複用
  • Redis本身就是一個k-v結構,類似於hashMap,所以查詢性能接近於O(1)
  • 同時redis自己底層數據結構支持,比如跳錶、SDS
  • lO多路複用,單個線程中通過記錄跟蹤每一個sock(I/O流)的狀態來管理多個I/O流

1.2 Redis其他特性

  • 更豐富的數據類型,雖然都是k、v結構,value可以存儲很多的數據類型
  • 完善的內存管理機制、保證數據一致性:持久化機制、過期策略
  • 支持多種編程語言
  • 高可用,集羣、保證高可用

1.3 Redis高可用

  • 很完善的內存管理機制,過期、淘汰、持久化
  • 集羣模式,主從、哨兵、cluster集羣

二、Redis數據類型以及使用場景

Redis的數據類型有String、Hash、Set、List、Zset、bitMap(基於String類型)、 Hyperloglog(基於String類型)、Geo(地理位置)、Streams流。

2.1 String

2.1.1 基本指令

``` // 批量設置

mset key1 value1 key2 value2 // 批量獲取 mget key1 key2 // 獲取長度 strlen key
// 字符串追加內容 append key xxx // 獲取指定區間的字符 getrange key 0 5 // 整數值遞增 (遞增指定的值) incr intkey (incrby intkey 10) // 當key 存在時將覆蓋 SETEX key seconds value // 將 key 的值設為 value ,當且僅當 key 不存在。 SETNX key value ```

2.1.2 應用場景

  1. 緩存相關場景 緩存、 token(跟過期屬性完美契合)
  1. 線程安全的計數場景 (軟限流、分佈式ID等)

2.2 Hash

2.2.1 基本指令

``` // 將哈希表 key 中的域 field 的值設為 value 。

HSET key field value // 返回哈希表 key 中給定域 field 的值。 HGET key field // 返回哈希表 key 中的所有域。 HKEYS key // 返回哈希表 key 中所有域的值。 HVALS key // 為哈希表 key 中的域 field 的值加上增量 increment 。 HINCRBY key field increment // 查看哈希表 key 中,給定域 field 是否存在。 HEXISTS key field ```

2.2.2 應用場景

  1. 存儲對象類的數據(官網説的)比如一個對象下有多個字段
  1. 統計類的數據,可以對單個統計數據進行單獨操作

2.3 List

存儲有序的字符串列表,元素可以重複。列表的最大長度為 2^32 - 1 個元素(4294967295,每個列表超過 40 億個元素)。

2.3.1 基本指令

``` // 將一個或多個值 value 插入到列表 key 的表頭

LPUSH key value [value ...] // 將一個或多個值 value 插入到列表 key 的表尾(最右邊)。 RPUSH key value [value ...] // 移除並返回列表 key 的頭元素。 LPOP key // 移除並返回列表 key 的尾元素。 RPOP key // BLPOP 是列表的阻塞式(blocking)彈出原語。 BLPOP key [key ...] timeout // BRPOP 是列表的阻塞式(blocking)彈出原語。 BRPOP key [key ...] timeout // 獲取指點位置元素 LINDEX key index ```

2.3.2 應用場景

  1. 用户消息時間線

因為list是有序的,所以我們可以先進先出,先進後出的列表都可以做

2.4 Set

2.4.1 基本指令

``` // 將一個或多個 member 元素加入到集合 key 當中,已經存在於集合的 member 元素將被忽略。

SADD key member [member ...] // 返回集合 key 中的所有成員。 SMEMBERS key // 返回集合 key 的基數(集合中元素的數量)。 SCARD key // 如果命令執行時,只提供了 key 參數,那麼返回集合中的一個隨機元素。 SRANDMEMBER key [count] // 移除並返回集合中的一個隨機元素。 SPOP key // 移除集合 key 中的一個或多個 member 元素,不存在的 member 元素會被忽略。 SREM key member [member ...] // 判斷 member 元素是否集合 key 的成員。 SISMEMBER key member // 獲取前一個集合有而後面1個集合沒有的 sdiff huihuiset huihuiset1 // 獲取交集 sinter huihuiset huihuiset1 // 獲取並集 sunion huihuiset huihuiset1 ```

2.4.2 應用場景

  1. 抽獎 spop跟srandmember隨機彈出或者獲取元素
  1. 點贊、簽到等,sadd集合存儲
  1. 交集並集 關注等場景

2.5 ZSet(SortedSet)

sorted set,有序的set,每個元素有個score。

score相同時,按照key的ASCII碼排序。

2.5.1 基本指令

``` //將一個或多個 member 元素及其 score 值加入到有序集 key 當中。

ZADD key score member [[score member] [score member] ...] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞增(從小到大)來排序 ZRANGE key start stop [WITHSCORES] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞減(從大到小)來排列。 ZREVRANGE key start stop [WITHSCORES] // 返回有序集 key 中,所有 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員。有序集成員按 score 值遞增(從小到大)次序排列。 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] // 移除有序集 key 中的一個或多個成員,不存在的成員將被忽略。 ZREM key member [member ...] // 返回有序集 key 的基數。 ZCARD key // 為有序集 key 的成員 member 的 score 值加上增量 increment 。 ZINCRBY key increment member // 返回有序集 key 中, score 值在 min 和 max 之間(默認包括 score 值等於 min 或 max )的成員的數量。 ZCOUNT key min max // 返回有序集 key 中成員 member 的排名。其中有序集成員按 score 值遞增(從小到大)順序排列。 ZRANK key member ```

2.5.2 應用場景

  1. 排行榜

三、Redis的事務

3.1 事務基本操作

// 1. multi命令開啟事務,exec命令提交事務 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> set k1 v1 QUEUED 127.0.0.1:6379(TX)> set k2 v2 QUEUED 127.0.0.1:6379(TX)> exec 1) OK 2) OK // 2. 組隊的過程中可以通過discard來放棄組隊。 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> set k3 v3 QUEUED 127.0.0.1:6379(TX)> set k4 v4 QUEUED 127.0.0.1:6379(TX)> discard OK

3.2 Redis事務特性

  1. 單獨的隔離操作

事務中的所有命令都會序列化、按順序地執行。

事務在執行過程中,不會被其他客户端發送來的命令請求所打斷。

  1. 沒有隔離級別的概念

隊列中的命令沒有提交之前,都不會被實際地執行,因為事務提交之前任何指令都不會被實際執行,也就不存在“事務內的查詢要看到事務裏的更新,在事務外查詢不能看到”這個讓人萬分頭疼的問題。

  1. 不保證原子性

Redis同一個事務中如果有一條命令執行失敗,其後的命令仍然會被執行,沒有回滾。

四、Redis 分佈式鎖

4.1 Lua 實現分佈式鎖

local lockSet = redis.call('exists', KEYS[1]) if lockSet == 0 then redis.call('set', KEYS[1], ARG[1]) redis.call('expire', KEYS[1], ARG[2]) end return lockSet

五、Redis總體數據結構

5.1 Redis dict字典

5.1.1 RedisDb數據接口(server.h文件)

數據最外層的結構為RedisDb

typedef struct redisDb { dict *dict; /* The keyspace for this DB */ //數據庫的鍵 dict *expires; /* Timeout of keys with a timeout set */ //設置了超時時間的鍵 dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客户端等待的keys dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ //所在數 據庫ID long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,僅用於統計 unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb;

5.1.2 dict數據結構(dict.h文件)

我們發現數據存儲在dict中

typedef struct dict { dictType *type; //理解為面向對象思想,為支持不同的數據類型對應dictType抽象方法,不同的數據類型可以不同實現 void *privdata; //也可不同的數據類型相關,不同類型特定函數的可選參數。 dictht ht[2]; //2個hash表,用來數據存儲 2個dictht long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash標記 -1代表不再rehash unsigned long iterators; /* number of iterators currently running */ } dict;

5.1.3 dictht結構(dict.h文件)

typedef struct dictht { dictEntry **table; //dictEntry 數組 unsigned long size; //數組大小 默認為4 #define DICT_HT_INITIAL_SIZE 4 unsigned long sizemask; //size-1 用來取模得到數據的下標值 unsigned long used; //改hash表中已有的節點數據 } dictht;

5.1.4 dictEntry節點結構(dict.h文件)

typedef struct dictEntry { void *key; //key union { void *val; uint64_t u64; int64_t s64; double d; } v; //value struct dictEntry *next; //指向下一個節點 } dictEntry;

5.1.5 RedisObject(service.h文件)

``` / Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. /

define OBJ_ENCODING_RAW 0 / Raw representation /

define OBJ_ENCODING_INT 1 / Encoded as integer /

define OBJ_ENCODING_HT 2 / Encoded as hash table /

define OBJ_ENCODING_ZIPMAP 3 / Encoded as zipmap /

define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old

list encoding. */

define OBJ_ENCODING_ZIPLIST 5 / Encoded as ziplist /

define OBJ_ENCODING_INTSET 6 / Encoded as intset /

define OBJ_ENCODING_SKIPLIST 7 / Encoded as skiplist /

define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string

encoding */

define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list

of ziplists */

define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree

of listpacks */

define LRU_BITS 24

define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of

obj->lru */

define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution

in ms */

define OBJ_SHARED_REFCOUNT INT_MAX /* Global object

never destroyed. */

define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object

allocated in the stack. */

define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT

typedef struct redisObject { unsigned type:4; //數據類型 string hash list unsigned encoding:4; //底層的數據結構 跳錶 unsigned lru:LRU_BITS; / LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). / int refcount; //用於回收,引用計數法 void *ptr; //指向具體的數據結構的內存地址 比如 跳錶、壓縮列表 } robj; ```

5.2 Redis數據結構圖

2.jpg

5.3 Redis擴容機制

因為我們的dictEntry數組默認大小是4,如果不進行擴容,那麼我們所有的數據都會以鏈表的形式添加至數組下標 隨着數據量越來越大,之前只需要hash取模就能得到下標位置,現在得去循環我下標的鏈表,所以性能會越來越慢,當我們的數據量達到一定程度後,我們就得去觸發擴容操作。

5.3.1 什麼時候擴容

5.3.1.1 擴容機制

  1. 當沒有fork子進程在進行RDB或AOF持久化時,ht[0]的used大於size時,觸發擴容
  1. 如果有子進程在進行RDB或者AOF時,ht[0]的used大於等於size的5倍的時候,會觸發擴容

5.3.1.2 源碼驗證

擴容,肯定是在添加數據的時候才會擴容,所以我們找一個添加數據的入口。

  1. 入口,添加或替換dictReplace (dict.c文件)

int dictReplace(dict *d, void *key, void *val) { dictEntry *entry, *existing, auxentry; /* Try to add the element. If the key * does not exists dictAdd will succeed. */ entry = dictAddRaw(d,key,&existing); if (entry) { dictSetVal(d, entry, val); return 1; } /* Set the new value and free the old one. Note that it is important * to do that in this order, as the value may just be exactly the same * as the previous one. In this context, think to reference counting, * you want to increment (set), and then decrement (free), and not the * reverse. */ auxentry = *existing; dictSetVal(d, existing, val); dictFreeVal(d, &auxentry); return 0; }

  1. 進入dictAddRaw方法 (dict.c文件)

``` dictEntry dictAddRaw(dict d, void key, dictEntry existing){ long index; dictEntry entry; dictht *ht;

if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 進行漸進式hash 按步rehash

/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
    return NULL;

/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that
in a database
* system it is more likely that recently added
entries are accessed
* more frequently. */
//如果在hash 放在ht[1] 否則放在ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
//採用頭插法插入hash桶
entry->next = ht->table[index];
ht->table[index] = entry;
//已使用++
ht->used++;

/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;

} ```

  1. 得到數據下標的方法 _dictKeyIndex (dict.c文件)

``` static long _dictKeyIndex(dict d, const void key,uint64_t hash, dictEntry existing){ unsigned long idx, table; dictEntry he; if (existing) existing = NULL;

/* Expand the hash table if needed */
//擴容如果需要
if (_dictExpandIfNeeded(d) == DICT_ERR)
    return -1;
//比較是否存在這個值
for (table = 0; table <= 1; table++) {
    idx = hash & d->ht[table].sizemask;
    /* Search if this slot does not already contain
    the given key */
    he = d->ht[table].table[idx];
    while(he) {
        if (key==he->key || dictCompareKeys(d, key,he->key)) {
            if (existing) *existing = he;
                return -1;
            }
        he = he->next;
    }
    if (!dictIsRehashing(d)) break;
}
return idx;

} ```

  1. _dictExpandIfNeeded 驗證是否需要擴容 (dict.c文件)

``` / Expand the hash table if needed / static int _dictExpandIfNeeded(dict d){ / Incremental rehashing already in progress. Return.*/ if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回

/* If the hash table is empty expand it to the initialsize. */
//如果ht[0]的長度為0,設置默認長度4 dictExpand為擴容的關鍵方法
if (d->ht[0].size == 0) 
        return dictExpand(d,DICT_HT_INITIAL_SIZE);

//擴容條件 hash表中已使用的數量大於等於 hash表的大小
//並且dict_can_resize為1 或者 已使用的大小大於hash表大小的 5倍,大於等於1倍的時候,下面2個滿足一個條件即可
if (d->ht[0].used >= d->ht[0].size &&
    (dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
    //擴容成原來的2倍
    return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;

} ```

5.3.2 怎麼擴容

5.3.2.1 擴容步驟

  1. 當滿足我擴容條件,觸發擴容時,判斷是否在擴容,如果在擴容,或者 擴容的大小跟我現在的ht[0].size一樣,這次擴容不做
  1. new一個新的dictht,大小為ht[0].used * 2(但是必須向上2的冪,比 如6 ,那麼大小為8) ,並且ht[1]=新創建的dictht
  1. 我們有個更大的table了,但是需要把數據遷移到ht[1].table ,所以將 dict的rehashidx(數據遷移的偏移量)賦值為0 ,代表可以進行數據遷 移了,也就是可以rehash了
  1. 等待數據遷移完成,數據不會馬上遷移,而是採用漸進式rehash,慢慢的把數據遷移到ht[1]
  1. 當數據遷移完成,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
  1. 把dict的rehashidex=-1

5.3.2.2 源碼驗證

  1. dictExpand方法在_dictExpandIfNeeded 方法中調用 (dict.c文件),為擴容的關鍵方法

``` int dictExpand(dict d, unsigned long size){ / the size is invalid if it is smaller than the number of * elements already inside the hash table */ //正在擴容,不允許第二次擴容,或者ht[0]的數據量大於擴容後的數量,沒有意義,這次不擴容 if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR;

dictht n; /* the new hash table */
//得到最接近2的冪 跟hashMap思想一樣
unsigned long realsize = _dictNextPower(size);

/* Rehashing to the same table size is not useful. */
//如果跟ht[0]的大小一致 直接返回錯誤
if (realsize == d->ht[0].size) return DICT_ERR;

/* Allocate the new hash table and initialize all
pointers to NULL */
//為新的dictht賦值
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;

/* Is this the first initialization? If so it's not
really a rehashing
* we just set the first hash table so that it can
accept keys. */
//ht[0].table為空,代表是初始化
if (d->ht[0].table == NULL) {
    d->ht[0] = n;
    return DICT_OK;
}

/* Prepare a second hash table for incremental rehashing */
//將擴容後的dictht賦值給ht[1]
d->ht[1] = n;
//標記為0,代表可以開始rehash
d->rehashidx = 0;
return DICT_OK;

} ```

5.3.3 數據怎麼遷移(漸進式hash)

5.3.3.1 遷移方式

假如一次性把數據全部遷移會很耗時間,會讓單條指令等待很久,會形成阻塞。

所以,redis採用漸進式Rehash,所謂漸進式,就是慢慢的,不會一次性把所有數據遷移。

那什麼時候會進行漸進式數據遷移?

  1. 每次進行key的crud操作都會進行一個hash桶的數據遷移
  1. 定時任務,進行部分數據遷移

5.3.3.2 源碼驗證

CRUD觸發都會觸發_dictRehashStep(漸進式hash)
  1. 每次增刪改的時候都會調用_dictRehashStep方法

if (dictIsRehashing(d)) _dictRehashStep(d); //這個代碼在增刪改查的方法中都會調用

  1. _dictRehashStep方法

static void _dictRehashStep(dict *d) { if (d->iterators == 0) dictRehash(d,1); }

  1. dictRehash方法

``` int dictRehash(dict d, int n) { int empty_visits = n10; / Max number of empty buckets to visit. / //要訪問的最大的空桶數 防止此次耗時過長

if (!dictIsRehashing(d)) return 0; //如果沒有在rehash返回0

//循環,最多1次,並且ht[0]的數據沒有遷移完 進入循環
while(n-- && d->ht[0].used != 0) {
    dictEntry *de, *nextde;
    /* Note that rehashidx can't overflow as we are
    sure there are more
    * elements because ht[0].used != 0 */
    assert(d->ht[0].size > (unsigned long)d->rehashidx);
    //rehashidx rehash的索引,默認0 如果hash桶為空 進入自旋 並且判斷是否到了最大的訪問空桶數
    while(d->ht[0].table[d->rehashidx] == NULL) {
        d->rehashidx++;
        if (--empty_visits == 0) return 1;
    }
    de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下標為rehashidx桶
    /* Move all the keys in this bucket from the old
    to the new hash HT */
    while(de) { //循環hash桶的鏈表 並且轉移到ht[1]
        uint64_t h;
        nextde = de->next;
        /* Get the index in the new hash table */
        h = dictHashKey(d, de->key) & d-
        >ht[1].sizemask;
        de->next = d->ht[1].table[h]; //頭插
        d->ht[1].table[h] = de;
        d->ht[0].used--;
        d->ht[1].used++;
        de = nextde;
    }
    //轉移完後 將ht[0]相對的hash桶設置為null
    d->ht[0].table[d->rehashidx] = NULL;
    d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
//ht[0].used=0 代表rehash全部完成
if (d->ht[0].used == 0) {
    //清空ht[0]table
    zfree(d->ht[0].table);
    //將ht[0] 重新指向已經完成rehash的ht[1]
    d->ht[0] = d->ht[1];
    //將ht[1]所有數據重新設置
    _dictReset(&d->ht[1]);
    //設置-1,代表rehash完成
    d->rehashidx = -1;
    return 0;
}
/* More to rehash... */
return 1;

}

//將ht[1]的屬性復位 static void _dictReset(dictht *ht) { ht->table = NULL; ht->size = 0; ht->sizemask = 0; ht->used = 0; } ```

定時任務觸發

再講定時任務之前,我們要知道Redis中有個時間事件驅動,在 server.c文件下有個serverCron 方法。

serverCron 每隔多久會執行一次,執行頻率根據redis.conf中的hz配置,serverCorn中有個方法databasesCron()

  1. 定時方法serverCron

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { //....... /* We need to do a few operations on clients asynchronously. */ clientsCron(); /* Handle background operations on Redis databases. */ databasesCron(); //處理Redis數據庫上的後台操作。 //....... }

  1. databasesCron方法

void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); } } /* Defrag keys gradually. */ activeDefragCycle(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ if (!hasActiveChildProcess()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ static unsigned int resize_db = 0; static unsigned int rehash_db = 0; int dbs_per_call = CRON_DBS_PER_CALL; int j; /* Don't test more DBs than we have. */ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; /* Resize */ for (j = 0; j < dbs_per_call; j++) { tryResizeHashTables(resize_db % server.dbnum); resize_db++; } /* Rehash */ //Rehash邏輯 if (server.activerehashing) { for (j = 0; j < dbs_per_call; j++) { //進入incrementallyRehash int work_done = incrementallyRehash(rehash_db); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ break; } else { /* If this db didn't need rehash, we'll try the next one. */ rehash_db++; rehash_db %= server.dbnum; } } } } }

  1. 進入incrementallyRehash方法(server.c)

int incrementallyRehash(int dbid) { /* Keys dictionary */ if (dictIsRehashing(server.db[dbid].dict)) { dictRehashMilliseconds(server.db[dbid].dict,1); return 1; /* already used our millisecond for this loop... */ } /* Expires */ if (dictIsRehashing(server.db[dbid].expires)) { dictRehashMilliseconds(server.db[dbid].expires,1); return 1; /* already used our millisecond for this loop... */ } return 0; }

  1. 進入dictRehashMilliseconds(dict.c)

int dictRehashMilliseconds(dict *d, int ms) { long long start = timeInMilliseconds(); int rehashes = 0; //進入rehash方法 dictRehash 去完成漸進時HASH while(dictRehash(d,100)) { rehashes += 100; //判斷是否超時 if (timeInMilliseconds()-start > ms) break; } return rehashes; }

5.4 String類型數據結構

Redis中String的底層沒有用c的char來實現,而是採用了SDS(simple Dynamic String)的數據結構來實現。並且提供了5種不同的類型

5.4.1 SDS數據結構定義(sds.h文件)

``` typedef char sds; / Note: sdshdr5 is never used, we just access the flags byte directly. * However is here to document the layout of type 5 SDS strings. / struct attribute ((packed)) sdshdr5 { unsigned char flags; / 3 lsb of type, and 5 msb of string length */ char buf[]; }

struct attribute ((packed)) sdshdr8 { uint8_t len; / used / //已使用的長度 uint8_t alloc; / excluding the header and null terminator / //分配的總容量 不包含頭和空終止符 unsigned char flags; / 3 lsb of type, 5 unused bits / //低三位類型 高5bit未使用 char buf[]; //數據buf 存儲字符串數組 };

struct attribute ((packed)) sdshdr16 { uint16_t len; / used / uint16_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; };

struct attribute ((packed)) sdshdr32 { uint32_t len; / used / uint32_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; };

struct attribute ((packed)) sdshdr64 { uint64_t len; / used / uint64_t alloc; / excluding the header and null terminator / unsigned char flags; / 3 lsb of type, 5 unused bits / char buf[]; }; ```

5.4.2 SDS數據結構的優勢

  1. char字符串不記錄自身長度,所以獲取一個字符串長度的複雜度是O(n),但是SDS記錄分配的長度alloc,已使用的長度len,獲取長度為 O(1)
  1. 減少修改字符串帶來的內存重分配次數
  1. 空間預分配,SDS長度如果小於1MB,預分配跟長度一樣的,大於1M,每次跟len的大小多1M
  1. 惰性空間釋放,截取的時候不馬上釋放空間,供下次使用!同時提供相應的釋放SDS未使用空間的API
  1. 二進制安全

5.5 Hash類型數據結構

Redis的字典相當於Java語言中的HashMap,他是無序字典。內部結構上同樣是數組 + 鏈表二維結構。第一維hash的數組位置碰撞時,就會將碰撞的元素使用鏈表串起來。

5.5.1 Hash數據結構圖

3.png

5.5.2 Hash的壓縮列表

壓縮列表會根據存入的數據的不同類型以及不同大小,分配不同大小的空間。所以是為了節省內存而採用的。

因為是一塊完整的內存空間,當裏面的元素髮生變更時,會產生連鎖更新,嚴重影響我們的訪問性能。所以,只適用於數據量比較小的場景。

所以,Redis會有相關配置,Hashes只有小數據量的時候才會用到ziplist,當hash對象同時滿足以下兩個條件的時候,使用ziplist編碼:

  1. 哈希對象保存的鍵值對數量<512個
  1. 所有的鍵值對的鍵和值的字符串長度都<64byte(一個英文字母一個字節)

hash-max-ziplist-value 64 // ziplist中最大能存放的值長度 hash-max-ziplist-entries 512 // ziplist中最多能存放的entry節點數量

5.6 List類型數據結構

5.6.1 quickList快速列表

兼顧了ziplist的節省內存,並且一定程度上解決了連鎖更新的問題,我們的 quicklistNode節點裏面是個ziplist,每個節點是分開的。那麼就算髮生了連鎖更新,也只會發生在一個quicklistNode節點。

quicklist.h文件

typedef struct{ struct quicklistNode *prev; //前指針 struct quicklistNode *next; //後指針 unsigned char *zl; //數據指針 指向ziplist結果 unsigned int sz; //ziplist大小 /* ziplist size in bytes */ unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素 unsigned int encoding : 2; /* RAW==1 or LZF==2 */ // 是否壓縮, 1沒有壓縮 2 lzf壓縮 unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //預留容器字段 unsigned int recompress : 1; /* was this node previous compressed? */ unsigned int attempted_compress : 1; /* node can't compress; too small */ unsigned int extra : 10; /* more bits to steal for future usage */ //預留字段 } quicklistNode;

5.6.2 list數據結構圖

4.png

5.7 Set類型數據結構

Redis用intset或hashtable存儲set。滿足下面條件,就用inset存儲。

  1. 如果不是整數類型,就用dictht hash表(數組+鏈表)
  1. 如果元素個數超過512個,也會用hashtable存儲。跟一個配置有關:

set-max-intset-entries 512

不滿足上述條件的,都使用hash表存儲,value存null。

5.8 ZSet數據結構

5.8.1 ZipList

默認使用ziplist編碼。

在ziplist的內部,按照score排序遞增來存儲。插入的時候要移動之後的數據。

如果元素數量大於等於128個,或者任一member長度大於等於64字節使用 skiplist+dict存儲。

zset-max-ziplist-entries 128 zset-max-ziplist-value 64

如果不滿足條件,採用跳錶。

5.8.2 跳錶(skiplist)

結構定義(servier.h)

``` / ZSETs use a specialized version of Skiplists / typedef struct zskiplistNode { sds ele; //sds數據 double score; //score struct zskiplistNode backward; //後退指針 //層級數組 struct zskiplistLevel { struct zskiplistNode forward; //前進指針 unsigned long span; //跨度 } level[]; } zskiplistNode;

//跳錶列表 typedef struct zskiplist { struct zskiplistNode header, tail; //頭尾節點 unsigned long length; //節點數量 int level; //最大的節點層級 } zskiplist; ```

zslInsert 添加節點方法 (t_zset.c)

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || 咕泡出品 必屬精品精品屬精品 咕泡出品 必屬精品 咕泡出品 必屬精品 咕泡咕泡 (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); //不同層級建立鏈表聯繫 for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; }

ZSKIPLIST_MAXLEVEL默認32 定義在server.h

```

define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64

elements */ ```

六、Redis過期策略

6.1 惰性過期

所謂惰性過期,就是在每次訪問操作key的時候,判斷這個key是不是過期了,過期了就刪除。

6.1.1 源碼驗證

  1. expireIfNeeded方法(db.c文件)

int expireIfNeeded(redisDb *db, robj *key) { if (!keyIsExpired(db,key)) return 0; /* If we are running in the context of a slave, instead of * evicting the expired key from the database, we return ASAP: * the slave key expiration is controlled by the master that will * send us synthesized DEL operations for expired keys. * * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ // 如果配置有masterhost,説明是從節點,那麼不操作刪除 if (server.masterhost != NULL) return 1; /* Delete the key */ server.stat_expiredkeys++; propagateExpire(db,key,server.lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); //是否是異步刪除 防止單個Key的數據量很大 阻塞主線程 是4.0之後添加的新功能,默認關閉 int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); if (retval) signalModifiedKey(NULL,db,key); return retval; }

每次調用到相關指令時,才會執行expireIfNeeded判斷是否過期,平時不會去判斷是否過期。

該策略可以最大化的節省CPU資源。

但是卻對內存非常不友好。因為如果沒有再次訪問,就可能一直堆積再內存中。佔用內存

所以就需要另一種策略來配合使用,就是定期過期

6.2 定期過期

那麼多久去清除一次,我們在講Rehash的時候,有個方法是serverCron,執行頻率根據redis.conf中的hz配置。

這方法除了做Rehash以外,還會做很多其他的事情,比如:

  1. 清理數據庫中的過期鍵值對
  1. 更新服務器的各類統計信息,比如時間、內存佔用、數據庫佔用情況等
  1. 關閉和清理連接失效的客户端
  1. 嘗試進行持久化操作

6.2.1 實現流程

  1. 定時serverCron方法去執行清理,執行頻率根據redis.cong中的hz配置的值(默認是10,也就是1s執行10次,100ms執行一次)
  1. 執行清理的時候,不是去掃描所有的key,而是去掃描所有設置了過期時間的key redisDB.expires
  1. 如果每次去把所有的key都拿出來,那麼假如過期的key很多,就會很慢,所以也不是一次性拿出所有的key
  1. 根據hash桶的維度去掃描key,掃到20(可配置)個key為止。假如第一個桶是15個key沒有滿足20,繼續掃描第二個桶,第二個桶20個key,由於是以hash桶的維度掃描的,所以第二個掃到了就會全掃,總共掃描35個key
  1. 找到掃描的key裏面過期的key,並進行刪除
  1. 如果取了400個空桶(最多拿400個桶),或者掃描的刪除比例跟掃描的總數超過10%,繼續執行4、5步
  1. 也不能無限的循環,循環16次後回去檢測時間,超過指定時間會跳出

6.2.2 源碼驗證

  1. 入口serverCrom

``` // serverCron方法調用databasesCron方法(server.c) / Handle background operations on Redis databases. / databasesCron();

void databasesCron(void) { / Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. / if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); //過期刪除方法 } else { expireSlaveKeys(); } } } ```

七、Redis淘汰策略

由於Redis內存是有大小的,並且我可能裏面的數據都沒有過期,當快滿的時候,我又沒有過期的數據進行淘汰,那麼這個時候內存也會滿。內存滿了,Redis也會不能放入新的數據。所以,我們不得已需要一些策略來解決這個問題來保證可用性。

7.1 淘汰策略

7.1.1 noeviction

New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.

默認,不淘汰 能讀不能寫

7.1.2 allkeys-lru

Keeps most recently used keys; removes least recently used (LRU) keys

基於偽LRU算法,在所有的key中去淘汰

7.1.3 allkeys-lfu

Keeps frequently used keys; removes least frequently used (LFU) keys.

基於偽LFU算法,在所有的key中去淘汰

7.1.4 volatile-lru

Removes least recently used keys with the expire field set to true.

基於偽LRU算法,在設置了過期時間的key中去淘汰

7.1.5 volatile-lfu

Removes least frequently used keys with the expire field set to true.

基於偽LFU算法 在設置了過期時間的key中去淘汰

7.1.6 allkeys-random

Randomly removes keys to make space for the new data added.

基於隨機算法,在所有的key中去淘汰

7.1.7 volatile-random

Randomly removes keys with expire field set to true.

基於隨機算法 在設置了過期時間的key中去淘汰

7.1.8 volatile-ttl

Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.

根據過期時間來,淘汰即將過期的

7.2 淘汰流程

  1. 首先,我們會有個淘汰池,默認大小是16,並且裏面的數據是末尾淘汰制
  1. 每次指令操作的時候,自旋會判斷當前內存是否滿足指令所需要的內存
  1. 如果當前不滿足,會從淘汰池的尾部拿一個最適合淘汰的數據
  1. 會取樣(配置 maxmemory-samples),從Redis中獲取隨機獲取到取樣的數據,解決一次性讀取所有的數據慢的問題
  1. 在取樣的數據中,根據淘汰算法找到最適合淘汰的數據
  1. 將最合適的那個數據跟淘汰池中的數據進行比較,是否比淘汰池的數據更適合淘汰,如果更適合,放入淘汰池
  1. 按照適合的程度進行排序,最適合淘汰的放入尾部
  1. 將需要淘汰的數據從Redis刪除,並且從淘汰池移除

7.3 LRU算法

Lru,Least Recently Used 翻譯過來是最久未使用,根據時間軸來走,仍很久沒用的數據。只要最近有用過,我就默認是有效的。

那麼它的一個衡量標準是啥?時間!根據使用時間,從近到遠,越遠的越容易淘汰。

7.3.1 實現原理

需求:得到對象隔多久沒訪問,隔的時間越久,越容易被淘汰

  1. 首先,LRU是根據這個對象的訪問操作時間來進行淘汰的,那麼我們需要知道這個對象最後操作的訪問時間
  1. 知道了對象的最後操作訪問時間後,我們只需要跟當前系統時間來進行對比,就能算出對象多久沒訪問了

7.3.2 源碼驗證

redis中,對象都會被redisObject對象包裝,其中有個字段叫lru。

redisObject對象 (server.h文件)

typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj;

看LRU的字段的説明,那麼我們大概能猜出來,redis去實現lru淘汰算法肯定跟我們這個對象的lru相關

並且這個字段的大小為24bit,那麼這個字段記錄的是對象操作訪問時候的秒單位時間的後24bit

相當於:

long timeMillis=System.currentTimeMillis(); System.out.println(timeMillis/1000); //獲取當前秒 System.out.println(timeMillis/1000 & ((1<<24)-1)); //獲取秒的後24位

我們知道了這個對象的最後操作訪問的時間。如果我們要得到這個對象多久沒訪問了,我們是不是就很簡單,用我當前的時間-這個對象的訪問時間就可以了!但是這個訪問時間是秒單位時間的後24bit 所以,也是用當前的時間的秒單位的後24bit去減。

estimateObjectIdleTime方法(evict.c)

unsigned long long estimateObjectIdleTime(robj *o) { //獲取秒單位時間的最後24位 unsigned long long lruclock = LRU_CLOCK(); //因為只有24位,所有最大的值為2的24次方-1 //超過最大值從0開始,所以需要判斷lruclock(當前系統時間)跟緩存對象的lru字段的大小 if (lruclock >= o->lru) { //如果lruclock>=robj.lru,返回lruclock-o->lru,再轉換單位 return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION; } else { //否則採用lruclock + (LRU_CLOCK_MAX - o->lru),得到對 象的值越小,返回的值越大,越大越容易被淘汰 return (lruclock + (LRU_CLOCK_MAX - o->lru)) * LRU_CLOCK_RESOLUTION; } }

7.3.3 LRU總結

用lruclock與 redisObject.lru進行比較,因為lruclock只獲取了當前秒單位時間的後24位,所以肯定有個輪詢。

所以,我們會用lruclock跟 redisObject.lru進行比較,如果lruclock>redisObject.lru,那麼我們用lruclock- redisObject.lru,否則lruclock+(24bit的最大值-redisObject.lru),得到lru越小,那麼返回的數據越大,相差越大的越優先會被淘汰!

http://www.processon.com/view/link/5fbe0541e401fd2d6ed8fc38

7.4 LFU算法

LFU,英文 Least Frequently Used,翻譯成中文就是最不常用的優先淘汰。

不常用,它的衡量標準就是次數,次數越少的越容易被淘汰。

這個實現起來應該也很簡單,對象被操作訪問的時候,去記錄次數,每次操作訪問一次,就+1;淘汰的時候,直接去比較這個次數,次數越少的越容易淘汰!

7.4.1 LFU的時效性問題

何為時效性問題?就是隻會去考慮數量,但是不會去考慮時間。

ps:去年的某個新聞很火,點擊量3000W,今年有個新聞剛出來點擊量100次,本來我們是想保留今年的新的新聞的,但是如果根據LFU來做的話,我們發現淘汰的是今年的新聞,明顯是不合理的

導致的問題:新的數據進不去,舊的數據出不來。

那麼如何解決呢,且往下看

7.4.2 源碼分析

我們還是來看redisObject(server.h)

typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj;

我們看它的lru,它如果在LFU算法的時候!它前面16位代表的是時間,後8位代表的是一個數值,frequency是頻率,應該就是代表這個對象的訪問次數,我們先給它叫做counter。

前16bits時間有啥用?

跟時間相關,我猜想應該是跟解決時效性相關的,那麼怎麼解決的?從生活中找例子

大家應該充過一些會員,比如我這個年紀的,小時候喜歡充騰訊的黃鑽、綠鑽、藍鑽等等。但是有一個點,假如哪天我沒充錢了的話,或者沒有續VIP的時候,我這個鑽石等級會隨着時間的流失而降低。比如我本來是黃鑽V6,但是一年不充錢的話,可能就變成了V4。

那麼回到Redis,大膽的去猜測,那麼這個時間是不是去記錄這個對象有多久沒訪問了,如果多久沒訪問,我就去減少對應的次數。

LFUDecrAndReturn方法(evict.c)

unsigned long LFUDecrAndReturn(robj *o) { //lru字段右移8位,得到前面16位的時間 unsigned long ldt = o->lru >> 8; //lru字段與255進行&運算(255代表8位的最大值), //得到8位counter值 unsigned long counter = o->lru & 255; //如果配置了lfu_decay_time,用LFUTimeElapsed(ldt) 除以配置的值 //LFUTimeElapsed(ldt)源碼見下 //總的沒訪問的分鐘時間/配置值,得到每分鐘沒訪問衰減多少 unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0; if (num_periods) //不能減少為負數,非負數用couter值減去衰減值 counter = (num_periods > counter) ? 0 : counter - num_periods; return counter; }

衰減因子由配置

lfu-decay-time 1 //多少分鐘沒操作訪問就去衰減一次

後8bits的次數,最大值是255,夠不夠?

肯定不夠,一個對象的訪問操作次數肯定不止255次。

但是我們可以讓數據達到255的很難。那麼怎麼做的?這個比較簡單,我們先看源碼,然後再總結

LFULogIncr方法 (evict.c 文件)

uint8_t LFULogIncr(uint8_t counter) { //如果已經到最大值255,返回255 ,8位的最大值 if (counter == 255) return 255; //得到隨機數(0-1) double r = (double)rand()/RAND_MAX; //LFU_INIT_VAL表示基數值(在server.h配置) double baseval = counter - LFU_INIT_VAL; //如果達不到基數值,表示快不行了,baseval =0 if (baseval < 0) baseval = 0; //如果快不行了,肯定給他加counter //不然,按照機率是否加counter,同時跟baseval與lfu_log_factor相關 //都是在分子,所以2個值越大,加counter機率越小 double p = 1.0/(baseval*server.lfu_log_factor+1); if (r < p) counter++; return counter; }

八、Redis 持久化

8.1 RBD

8.1.1 何時觸發RBD

自動觸發

  1. 配置觸發

save 900 1 900s檢查一次,至少有1個key被修改就觸發 save 300 10 300s檢查一次,至少有10個key被修改就觸發 save 60 10000 60s檢查一次,至少有10000個key被修改

  1. shutdown正常關閉

任何組件在正常關閉的時候,都會去完成應該完成的事。比如Mysql 中的Redolog持久化,正常關閉的時候也會去持久化。

  1. flushall指令觸發

數據清空指令會觸發RDB操作,並且是觸發一個空的RDB文件,所以, 如果在沒有開啟其他的持久化的時候,flushall是可以刪庫跑路的,在生產環境慎用。

8.1.2 RDB的優劣

優勢

  1. 是個非常緊湊型的文件,非常適合備份與災難恢復
  1. 最大限度的提升了性能,會fork一個子進程,父進程永遠不會產於磁盤 IO或者類似操作
  1. 更快的重啟

不足

  1. 數據安全性不是很高,因為是根據配置的時間來備份,假如每5分鐘備份 一次,也會有5分鐘數據的丟失
  1. 經常fork子進程,所以比較耗CPU,對CPU不是很友好

8.2 AOF

由於RDB的數據可靠性非常低,所以Redis又提供了另外一種持久化方案: Append Only File 簡稱:AOF

AOF默認是關閉的,你可以在配置文件中進行開啟:

``` appendonly no //默認關閉,可以進行開啟

The name of the append only file (default:"appendonly.aof")

appendfilename "appendonly.aof" //AOF文件名 ```

追加文件,即每次更改的命令都會附加到我的AOF文件中。

8.2.1 同步機制

AOF會記錄每個寫操作,那麼問題來了。我難道每次操作命令都要和磁盤交互?

當然不行,所以redis支持幾種策略,由用户來決定要不要每次都和磁盤交互

```

appendfsync always 表示每次寫入都執行fsync(刷新)函數 性能會非常非常慢 但是非常安全

appendfsync everysec 每秒執行一次fsync函數 可能丟失1s的數據

appendfsync no 由操作系統保證數據同步到磁盤,速度最快 你的數據只需要交給操作系統就行

```

默認1s一次,最多有1s丟失

8.2.2 重寫機制

由於AOF是追加的形式,所以文件會越來越大,越大的話,數據加載越慢。 所以我們需要對AOF文件進行重寫。

何為重寫

比如 我們的incr指令,假如我們incr了100次,現在數據是100,但是我們的aof文件中會有100條incr指令,但是我們發現這個100條指令用處不大, 假如我們能把最新的內存裏的數據保存下來的話。所以,重寫就是做了這麼一件事情,把當前內存的數據重寫下來,然後把之前的追加的文件刪除。

重寫流程

  • 在Redis7之前

  • Redis fork一個子進程,在一個臨時文件中寫入新的AOF(當前內存的數據生成的新的AOF)

  1. 那麼在寫入新的AOF的時候,主進程還會有指令進入,那麼主進程會在內存緩存區中累計新的指令 (但是同時也會寫在舊的AOF文件中,就算 重寫失敗,也不會導致AOF損壞或者數據丟失)
  1. 如果子進程重寫完成,父進程會收到完成信號,並且把內存緩存中的指令追加到新的AOF文件中
  1. 替換舊的AOF文件 ,並且將新的指令附加到重寫好的AOF文件中

  2. 在Redis7之後,AOF文件不再是一個,所以會有臨時清單的概念

  3. 子進程開始在一個臨時文件中寫入新的基礎AOF

  1. 父級打開一個新的增量 AOF 文件以繼續寫入更新。如果重寫失敗,舊的基礎和增量文件(如果有的話)加上這個新打開的增量文件就代表了完整的更新數據集,所以我們是安全的
  1. 當子進程完成基礎文件的重寫後,父進程收到一個信號,並使用新打開 的增量文件和子進程生成的基礎文件來構建臨時清單,並將其持久化
  1. 現在 Redis 對清單文件進行原子交換,以便此 AOF 重寫的結果生效。 Redis 還會清理舊的基礎文件和任何未使用的增量文件

但是重寫是把當前內存的數據,寫入一個新的AOF文件,如果當前數據比較大,然後以指令的形式寫入的話,會很慢很慢

所以在4.0之後,在重寫的時候是採用RDB的方式去生成新的AOF文件,然 後後續追加的指令,還是以指令追加的形式追加的文件末尾

aof-use-rdb-preamble yes //是否開啟RDB與AOF混合模式

什麼時候重寫

配置文件redis.conf

```

重寫觸發機制

auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb 就算達到了第一個百分比的大小,也必須大於 64M

在 aof 文件小於64mb的時候不進行重寫,當到達64mb的時候,就重寫一 次。重寫後的 aof 文件可能是40mb。上面配置了auto-aof-rewritepercentag為100,即 aof 文件到了80mb的時候,進行重寫。 ```

8.2.3 AOF的優勢與不足

優勢

  1. 安全性高,就算是默認的持久化同步機制,也最多隻會丟失1s數據
  1. AOF由於某些原因,比如磁盤滿了等導致追加失敗,也能通過redischeck-aof 工具來修復
  1. 格式都是追加的日誌,所以可讀性更高

不足

  1. 數據集一般比RDB大
  1. 持久化跟數據加載比RDB更慢
  1. 在7.0之前,重寫的時候,因為重寫的時候,新的指令會緩存在內存區,所以會導致大量的內存使用
  1. 並且重寫期間,會跟磁盤進行2次IO,一個是寫入老的AOF文件,一個是寫入新的AOF文件

九、Redis常見問題總結

9.1 Redis數據丟失場景

9.1.1 持久化丟失

採用RDB或者不持久化, 會有數據丟失,因為是手動或者配置以快照的形式來進行備份。

解決:啟用AOF,以命令追加的形式進行備份,但是默認也會有1s丟失, 這是在性能與數據安全性中尋求的一個最適合的方案,如果為了保證數據 一致性,可以將配置更改為always,但是性能很慢,一般不用。

9.1.2 主從切換

因為Redis的數據是主異步同步給從的,提升了性能,但是由於是異步同步到從。所以存在數據丟失的可能

  1. master寫入數據k1 ,由於是異步同步到slave,當master沒有同步給slave的時候,master掛了
  1. slave會成為新的master,並且沒有同步k1
  1. master重啟,會成為新master的slave,同步數據會清空自己的數據,從新的master加載
  1. k1丟失

9.2 Redis緩存雪崩、穿透、擊穿問題分析

9.2.1 緩存雪崩

緩存雪崩就是Redis的大量熱點數據同時過期(失效),因為設置了相同的過期時間,剛好這個時候Redis請求的併發量又很大,就會導致所有的請求落到數據庫。

  1. 保證Redis的高可用,防止由於Redis不可用導致全部打到DB
  1. 加互斥鎖或者使用隊列,針對同一個key只允許一個線程到數據庫查詢
  1. 緩存定時預先更新,避免同時失效
  1. 通過加隨機數,使key在不同的時間過期

9.2.2 緩存穿透

緩存穿透是指緩存和數據庫中都沒有的數據,但是用户一直請求不存在的數據!這時的用户很可能就是攻擊者,惡意搞你們公司的,攻擊會導致數據庫壓力過大。

解決方案:布隆過濾器

布隆過濾器的思想:既然是因為你Redis跟DB都沒有,然後用户惡意一直訪問不存在的key,然後全部打到Redis跟DB。那麼我們能不能單獨把DB的數據單獨存到另外一個地方,但是這個地方只存一個簡單的標記,標記這個key存不存在,如果存在,就去訪問Redis跟DB,否則直接返回。並且這個內存最好很小。

9.2.3 緩存擊穿

單個key過期的時候有大量併發,使用互斥鎖,回寫redis,並且採用雙重檢查鎖來提升性能!減少對DB的訪問。