萬字長文解析Redis資料傾斜與JD開源hotkey原始碼分析

語言: CN / TW / HK

1 前言

之前旁邊的小夥伴問我熱點資料相關問題,在給他粗略的講解一波redis資料傾斜的案例之後,自己也順道回顧了一些關於熱點資料處理的方法論,同時也想起去年所學習JD開源專案hotkey——專門用來解決熱點資料問題的框架。在這裡結合兩者所關聯到的知識點,通過幾個小圖和部分粗略的講解,來讓大家瞭解相關方法論以及hotkey的原始碼解析。

2 Redis資料傾斜

2.1 定義與危害

先說說資料傾斜的定義,借用百度詞條的解釋:

對於集群系統,一般快取是分散式的,即不同節點負責一定範圍的快取資料。我們把快取資料分散度不夠,導致大量的快取資料集中到了一臺或者幾臺服務節點上,稱為資料傾斜。一般來說資料傾斜是由於負載均衡實施的效果不好引起的。

從上面的定義中可以得知,資料傾斜的原因一般是因為LB的效果不好,導致部分節點資料量非常集中。

那這又會有什麼危害呢?

如果發生了資料傾斜,那麼儲存了大量資料,或者是儲存了熱點資料的例項的處理壓力就會增大,速度變慢,甚至還可能會引起這個例項的記憶體資源耗盡,從而崩潰。這是我們在應用切片叢集時要避免的。

2.2 資料傾斜的分類

2.2.1 資料量傾斜(寫入傾斜)

1.圖示

如圖,在某些情況下,例項上的資料分佈不均衡,某個例項上的資料特別多。

2.bigkey導致傾斜

某個例項上正好儲存了 bigkey。bigkey 的 value 值很大(String 型別),或者是 bigkey 儲存了大量集合元素(集合型別),會導致這個例項的資料量增加,記憶體資源消耗也相應增加。

應對方法

  • 在業務層生成資料時,要儘量避免把過多的資料儲存在同一個鍵值對中。

  • 如果 bigkey 正好是集合型別,還有一個方法,就是把 bigkey 拆分成很多個小的集合型別資料,分散儲存在不同的例項上。

3.Slot分配不均導致傾斜

先簡單的介紹一下slot的概念,slot其實全名是Hash Slot(雜湊槽),在Redis Cluster切片叢集中一共有16384 個 Slot,這些雜湊槽類似於資料分割槽,每個鍵值對都會根據它的 key,被對映到一個雜湊槽中。Redis Cluster 方案採用雜湊槽來處理資料和例項之間的對映關係。

一張圖來解釋,資料、雜湊槽、例項這三者的對映分佈情況。

這裡的CRC16(city)%16384可以簡單的理解為將key1根據CRC16演算法取hash值然後對slot個數取模,得到的就是slot位置為14484,他所對應的例項節點是第三個。

運維在構建切片叢集時候,需要手動分配雜湊槽,並且把16384 個槽都分配完,否則 Redis 叢集無法正常工作。由於是手動分配,則可能會導致部分例項所分配的slot過多,導致資料傾斜。

應對方法

使用CLUSTER SLOTS 命令來檢視slot分配情況,使用CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE這三個命令來進行slot資料的遷移,具體內容不再這裡細說,感興趣的同學可以自行學習一下。

4.Hash Tag導致傾斜

  • Hash Tag 定義 :指當一個key包含 {} 的時候,就不對整個key做hash,而僅對 {} 包括的字串做hash。

  • 假設hash演算法為sha1。對user:{user1}:ids和user:{user1}:tweets,其hash值都等同於sha1(user1)。

  • Hash Tag 優勢 :如果不同 key 的 Hash Tag 內容都是一樣的,那麼,這些 key 對應的資料會被對映到同一個 Slot 中,同時會被分配到同一個例項上。

  • Hash Tag 劣勢 :如果不合理使用,會導致大量的資料可能被集中到一個例項上發生資料傾斜,叢集中的負載不均衡。

2.2.2 資料訪問傾斜(讀取傾斜-熱key問題)

一般來說資料訪問傾斜就是熱key問題導致的,如何處理redis熱key問題也是面試中常會問到的。所以瞭解相關概念及方法論也是不可或缺的一環。

1.圖示

如圖,雖然每個叢集例項上的資料量相差不大,但是某個例項上的資料是熱點資料,被訪問得非常頻繁。

但是為啥會有熱點資料的產生呢?

2.產生熱key的原因及危害

1)使用者消費的資料遠大於生產的資料(熱賣商品、熱點新聞、熱點評論、明星直播)。

在日常工作生活中一些突發的的事件,例如:雙十一期間某些熱門商品的降價促銷,當這其中的某一件商品被數萬次點選瀏覽或者購買時,會形成一個較大的需求量,這種情況下就會造成熱點問題。

同理,被大量刊發、瀏覽的熱點新聞、熱點評論、明星直播等,這些典型的讀多寫少的場景也會產生熱點問題。

2)請求分片集中,超過單 Server 的效能極限。

在服務端讀資料進行訪問時,往往會對資料進行分片切分,此過程中會在某一主機 Server 上對相應的 Key 進行訪問,當訪問超過 Server 極限時,就會導致熱點 Key 問題的產生。

如果熱點過於集中,熱點 Key 的快取過多,超過目前的快取容量時,就會導致快取分片服務被打垮現象的產生。當快取服務崩潰後,此時再有請求產生,會快取到後臺 DB 上,由於DB 本身效能較弱,在面臨大請求時很容易發生請求穿透現象,會進一步導致雪崩現象,嚴重影響裝置的效能。

3.常用的熱key問題解決辦法:

解決方案一: 備份熱key

可以把熱點資料複製多份,在每一個數據副本的 key 中增加一個隨機字尾,讓它和其它副本資料不會被對映到同一個 Slot 中。

這裡相當於把一份資料複製到其他例項上,這樣在訪問的時候也增加隨機字首,將對一個例項的訪問壓力,均攤到其他例項上

例如:

我們在放入快取時就將對應業務的快取key拆分成多個不同的key。如下圖所示,我們首先在更新快取的一側,將key拆成N份,比如一個key名字叫做”good_100”,那我們就可以把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增時都需要去改動這N個key,這一步就是拆key。

對於service端來講,我們就需要想辦法儘量將自己訪問的流量足夠的均勻。

如何給自己即將訪問的熱key上加入字尾?幾種辦法,根據本機的ip或mac地址做hash,之後的值與拆key的數量做取餘,最終決定拼接成什麼樣的key字尾,從而打到哪臺機器上;服務啟動時的一個隨機數對拆key的數量做取餘。

虛擬碼如下:

const M = N * 2
//生成隨機數
random = GenRandom(0, M)
//構造備份新key
bakHotKey = hotKey + “_” + random
data = redis.GET(bakHotKey)
if data == NULL {
data = GetFromDB()
redis.SET(bakHotKey, expireTime + GenRandom(0,5))
}

解決方案二: 本地快取+動態計算自動發現熱點快取

基本流程圖

該方案通過主動發現熱點並對其進行儲存來解決熱點 Key 的問題。首先 Client 也會訪問 SLB,並且通過 SLB 將各種請求分發至 Proxy 中,Proxy 會按照基於路由的方式將請求轉發至後端的 Redis 中。

在熱點 key 的解決上是採用在服務端增加快取的方式進行。具體來說就是在 Proxy 上增加本地快取,本地快取採用 LRU 演算法來快取熱點資料,後端節點增加熱點資料計算模組來返回熱點資料。

Proxy 架構的主要有以下優點:

  • Proxy 本地快取熱點,讀能力可水平擴充套件

  • DB 節點定時計算熱點資料集合

  • DB 反饋 Proxy 熱點資料

  • 對客戶端完全透明,不需做任何相容

熱點資料的發現與儲存

對於熱點資料的發現,首先會在一個週期內對 Key 進行請求統計,在達到請求量級後會對熱點 Key 進行熱點定位,並將所有的熱點 Key 放入一個小的 LRU 連結串列內,在通過 Proxy 請求進行訪問時,若 Redis 發現待訪點是一個熱點,就會進入一個反饋階段,同時對該資料進行標記。

可以使用一個etcd或者zk叢集來儲存反饋的熱點資料,然後本地所有節點監聽該熱點資料,進而載入到本地JVM快取中。

熱點資料的獲取

在熱點 Key 的處理上主要分為寫入跟讀取兩種形式,在資料寫入過程當 SLB 收到資料 K1 並將其通過某一個 Proxy 寫入一個 Redis,完成資料的寫入。

假若經過後端熱點模組計算髮現 K1 成為熱點 key 後, Proxy 會將該熱點進行快取,當下次客戶端再進行訪問 K1 時,可以不經 Redis。

最後由於 proxy 是可以水平擴充的,因此可以任意增強熱點資料的訪問能力。

最佳成熟方案: JD開源hotKey

這是目前較為成熟的自動探測熱key、分散式一致性快取解決方案。原理就是在client端做洞察,然後上報對應hotkey,server端檢測到後,將對應hotkey下發到對應服務端做本地快取,並且能保證本地快取和遠端快取的一致性。

在這裡咱們就不細談了,這篇文章的第三部分:JD開源hotkey原始碼解析裡面會帶領大家瞭解其整體原理。

3 JD開源hotkey—自動探測熱key、分散式一致性快取解決方案

3.1 解決痛點

從上面可知,熱點key問題在併發量比較高的系統中(特別是做秒殺活動)出現的頻率會比較高,對系統帶來的危害也很大。

那麼針對此,hotkey誕生的目的是什麼?需要解決的痛點是什麼?以及它的實現原理。

在這裡引用專案上的一段話來概述:

對任意突發性的無法預先感知的熱點資料,包括並不限於熱點資料(如突發大量請求同一個商品)、熱使用者(如惡意爬蟲刷子)、熱介面(突發海量請求同一個介面)等,進行毫秒級精準探測到。然後對這些熱資料、熱使用者等,推送到所有服務端JVM記憶體中,以大幅減輕對後端資料儲存層的衝擊,並可以由使用者決定如何分配、使用這些熱key(譬如對熱商品做本地快取、對熱使用者進行拒絕訪問、對熱介面進行熔斷或返回預設值)。這些熱資料在整個服務端叢集內保持一致性,並且業務隔離。

核心功能:熱資料探測並推送至叢集各個伺服器

3.2 整合方式

整合方式在這裡就不詳述了,感興趣的同學可以自行搜尋。

3.3 原始碼解析

3.3.1 架構簡介

1.全景圖一覽

流程介紹:

  • 客戶端通過引用hotkey的client包,在啟動的時候上報自己的資訊給worker,同時和worker之間建立長連線。定時拉取配置中心上面的規則資訊和worker叢集資訊。

  • 客戶端呼叫hotkey的ishot()的方法來首先匹配規則,然後統計是不是熱key。

  • 通過定時任務把熱key資料上傳到worker節點。

  • worker叢集在收取到所有關於這個key的資料以後(因為通過hash來決定key 上傳到哪個worker的,所以同一個key只會在同一個worker節點上),在和定義的規則進行匹配後判斷是不是熱key,如果是則推送給客戶端,完成本地快取。

2.角色構成

這裡直接借用作者的描述:

1)etcd叢集

etcd作為一個高效能的配置中心,可以以極小的資源佔用,提供高效的監聽訂閱服務。主要用於存放規則配置,各worker的ip地址,以及探測出的熱key、手工新增的熱key等。

2)client端jar包

就是在服務中新增的引用jar,引入後,就可以以便捷的方式去判斷某key是否熱key。同時,該jar完成了key上報、監聽etcd裡的rule變化、worker資訊變化、熱key變化,對熱key進行本地caffeine快取等。

3) worker端叢集

worker端是一個獨立部署的Java程式,啟動後會連線etcd,並定期上報自己的ip資訊,供client端獲取地址並進行長連線。之後,主要就是對各個client發來的待測key進行累加計算,當達到etcd裡設定的rule閾值後,將熱key推送到各個client。

4) dashboard控制檯

控制檯是一個帶視覺化介面的Java程式,也是連線到etcd,之後在控制檯設定各個APP的key規則,譬如2秒20次算熱。然後當worker探測出來熱key後,會將key發往etcd,dashboard也會監聽熱key資訊,進行入庫儲存記錄。同時,dashboard也可以手工新增、刪除熱key,供各個client端監聽。

3.hotkey工程結構

3.3.2 client端

主要從下面三個方面來解析原始碼:

4.客戶端啟動器

1)啟動方式

@PostConstruct
public void init() {
ClientStarter.Builder builder = new ClientStarter.Builder();
ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();
starter.startPipeline();
}

appName:是這個應用的名稱,一般為${spring.application.name}的值,後續所有的配置都以此為開頭

etcd:是etcd叢集的地址,用逗號分隔,配置中心。

還可以看到ClientStarter實現了建造者模式,使程式碼更為簡介。

2)核心入口

com.jd.platform.hotkey.client.ClientStarter#startPipeline

  1. /**

  2. * 啟動監聽etcd

  3. */

  4. public void startPipeline() {

  5. JdLogger.info(getClass(), "etcdServer:" + etcdServer);

  6. //設定caffeine的最大容量

  7. Context.CAFFEINE_SIZE = caffeineSize;

  8. //設定etcd地址

  9. EtcdConfigFactory.buildConfigCenter(etcdServer);

  10. //開始定時推送

  11. PushSchedulerStarter.startPusher(pushPeriod);

  12. PushSchedulerStarter.startCountPusher(10);

  13. //開啟worker重連器

  14. WorkerRetryConnector.retryConnectWorkers();

  15. registEventBus();

  16. EtcdStarter starter = new EtcdStarter();

  17. //與etcd相關的監聽都開啟

  18. starter.start();

  19. }

該方法主要有五個功能:

① 設定本地快取(caffeine)的最大值,並建立etcd例項

  1. //設定caffeine的最大容量

  2. Context.CAFFEINE_SIZE = caffeineSize;

  3. //設定etcd地址

  4. EtcdConfigFactory.buildConfigCenter(etcdServer);

caffeineSize是本地快取的最大值,在啟動的時候可以設定,不設定預設為200000。

etcdServer是上面說的etcd叢集地址。

Context可以理解為一個配置類,裡面就包含兩個欄位:

  1. public class Context {

  2. public static String APP_NAME;

  3. public static int CAFFEINE_SIZE;

  4. }

EtcdConfigFactory是ectd配置中心的工廠類

  1. public class EtcdConfigFactory {

  2. private static IConfigCenter configCenter;

  3. private EtcdConfigFactory() {}

  4. public static IConfigCenter configCenter() {

  5. return configCenter;

  6. }

  7. public static void buildConfigCenter(String etcdServer) {

  8. //連線多個時,逗號分隔

  9. configCenter = JdEtcdBuilder.build(etcdServer);

  10. }

  11. }

通過其configCenter()方法獲取建立etcd例項物件,IConfigCenter介面封裝了etcd例項物件的行為(包括基本的crud、監控、續約等)

② 建立並啟動定時任務:PushSchedulerStarter

//開始定時推送
PushSchedulerStarter.startPusher(pushPeriod);//每0.5秒推送一次待測key
PushSchedulerStarter.startCountPusher(10);//每10秒推送一次數量統計,不可配置

pushPeriod是推送的間隔時間,可以再啟動的時候設定,最小為0.05s,推送越快,探測的越密集,會越快探測出來,但對client資源消耗相應增大

PushSchedulerStarter類

  1. /**

  2. * 每0.5秒推送一次待測key

  3. */

  4. public static void startPusher(Long period) {

  5. if (period == null || period <= 0) {

  6. period = 500L;

  7. }

  8. @SuppressWarnings("PMD.ThreadPoolCreationRule")

  9. ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));

  10. scheduledExecutorService.scheduleAtFixedRate(() -> {

  11. //熱key的收集器

  12. IKeyCollector<HotKeyModel, HotKeyModel> collectHK = KeyHandlerFactory.getCollector();

  13. //這裡相當於每0.5秒,通過netty來給worker來推送收集到的熱key的資訊,主要是一些熱key的元資料資訊(熱key來源的app和key的型別和是否是刪除事件,還有該熱key的上報次數)

  14. //這裡面還有就是該熱key在每次上報的時候都會生成一個全域性的唯一id,還有該熱key每次上報的建立時間是在netty傳送的時候來生成,同一批次的熱key時間是相同的

  15. List<HotKeyModel> hotKeyModels = collectHK.lockAndGetResult();

  16. if(CollectionUtil.isNotEmpty(hotKeyModels)){

  17. //積攢了半秒的key集合,按照hash分發到不同的worker

  18. KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);

  19. collectHK.finishOnce();

  20. }

  21. },0, period, TimeUnit.MILLISECONDS);

  22. }

  23. /**

  24. * 每10秒推送一次數量統計

  25. */

  26. public static void startCountPusher(Integer period) {

  27. if (period == null || period <= 0) {

  28. period = 10;

  29. }

  30. @SuppressWarnings("PMD.ThreadPoolCreationRule")

  31. ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));

  32. scheduledExecutorService.scheduleAtFixedRate(() -> {

  33. IKeyCollector<KeyHotModel, KeyCountModel> collectHK = KeyHandlerFactory.getCounter();

  34. List<KeyCountModel> keyCountModels = collectHK.lockAndGetResult();

  35. if(CollectionUtil.isNotEmpty(keyCountModels)){

  36. //積攢了10秒的數量,按照hash分發到不同的worker

  37. KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);

  38. collectHK.finishOnce();

  39. }

  40. },0, period, TimeUnit.SECONDS);

  41. }

從上面兩個方法可知,都是通過定時執行緒池來實現定時任務的,都是守護執行緒。

咱們重點關注一下KeyHandlerFactory類,它是client端設計的一個比較巧妙的地方,從類名上直譯為key處理工廠。具體的例項物件是DefaultKeyHandler:

  1. public class DefaultKeyHandler {

  2. //推送HotKeyMsg訊息到Netty的推送者

  3. private IKeyPusher iKeyPusher = new NettyKeyPusher();

  4. //待測key的收集器,這裡麵包含兩個map,key主要是熱key的名字,value主要是熱key的元資料資訊(比如:熱key來源的app和key的型別和是否是刪除事件)

  5. private IKeyCollector<HotKeyModel, HotKeyModel> iKeyCollector = new TurnKeyCollector();

  6. //數量收集器,這裡麵包含兩個map,這裡面key是相應的規則,HitCount裡面是這個規則的總訪問次數和熱後訪問次數

  7. private IKeyCollector<KeyHotModel, KeyCountModel> iKeyCounter = new TurnCountCollector();

  8. public IKeyPusher keyPusher() {

  9. return iKeyPusher;

  10. }

  11. public IKeyCollector<HotKeyModel, HotKeyModel> keyCollector() {

  12. return iKeyCollector;

  13. }

  14. public IKeyCollector<KeyHotModel, KeyCountModel> keyCounter() {

  15. return iKeyCounter;

  16. }

  17. }

這裡面有三個成員物件,分別是封裝推送訊息到netty的NettyKeyPusher、待測key收集器TurnKeyCollector、數量收集器TurnCountCollector,其中後兩者都實現了介面IKeyCollector

,能對hotkey的處理起到有效的聚合,充分體現了程式碼的高內聚。

先來看看封裝推送訊息到netty的NettyKeyPusher:

  1. /**

  2. * 將msg推送到netty的pusher

  3. * @author wuweifeng wrote on 2020-01-06

  4. * @version 1.0

  5. */

  6. public class NettyKeyPusher implements IKeyPusher {

  7. @Override

  8. public void send(String appName, List<HotKeyModel> list) {

  9. //積攢了半秒的key集合,按照hash分發到不同的worker

  10. long now = System.currentTimeMillis();

  11. Map<Channel, List<HotKeyModel>> map = new HashMap<>();

  12. for(HotKeyModel model : list) {

  13. model.setCreateTime(now);

  14. Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());

  15. if (channel == null) {

  16. continue;

  17. }

  18. List<HotKeyModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());

  19. newList.add(model);

  20. }

  21. for (Channel channel : map.keySet()) {

  22. try {

  23. List<HotKeyModel> batch = map.get(channel);

  24. HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);

  25. hotKeyMsg.setHotKeyModels(batch);

  26. channel.writeAndFlush(hotKeyMsg).sync();

  27. } catch (Exception e) {

  28. try {

  29. InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();

  30. JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());

  31. } catch (Exception ex) {

  32. JdLogger.error(getClass(),"flush error");

  33. }

  34. }

  35. }

  36. }

  37. @Override

  38. public void sendCount(String appName, List<KeyCountModel> list) {

  39. //積攢了10秒的數量,按照hash分發到不同的worker

  40. long now = System.currentTimeMillis();

  41. Map<Channel, List<KeyCountModel>> map = new HashMap<>();

  42. for(KeyCountModel model : list) {

  43. model.setCreateTime(now);

  44. Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());

  45. if (channel == null) {

  46. continue;

  47. }

  48. List<KeyCountModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());

  49. newList.add(model);

  50. }

  51. for (Channel channel : map.keySet()) {

  52. try {

  53. List<KeyCountModel> batch = map.get(channel);

  54. HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);

  55. hotKeyMsg.setKeyCountModels(batch);

  56. channel.writeAndFlush(hotKeyMsg).sync();

  57. } catch (Exception e) {

  58. try {

  59. InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();

  60. JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());

  61. } catch (Exception ex) {

  62. JdLogger.error(getClass(),"flush error");

  63. }

  64. }

  65. }

  66. }

  67. }

send(String appName, List

list)

主要是將TurnKeyCollector收集的待測key通過netty推送給worker,HotKeyModel物件主要是一些熱key的元資料資訊(熱key來源的app和key的型別和是否是刪除事件,還有該熱key的上報次數)

sendCount(String appName, List

list)

主要是將TurnCountCollector收集的規則所對應的key通過netty推送給worker,KeyCountModel物件主要是一些key所對應的規則資訊以及訪問次數等

WorkerInfoHolder.chooseChannel(model.getRuleKey())

根據hash演算法獲取key對應的伺服器,分發到對應伺服器相應的Channel 連線,所以服務端可以水平無限擴容,毫無壓力問題。

再來分析一下key收集器:TurnKeyCollector與TurnCountCollector:

實現IKeyCollector介面:

  1. /**

  2. * 對hotkey進行聚合

  3. * @author wuweifeng wrote on 2020-01-06

  4. * @version 1.0

  5. */

  6. public interface IKeyCollector<T, V> {

  7. /**

  8. * 鎖定後的返回值

  9. */

  10. List<V> lockAndGetResult();

  11. /**

  12. * 輸入的引數

  13. */

  14. void collect(T t);

  15. void finishOnce();

  16. }

lockAndGetResult()

主要是獲取返回collect方法收集的資訊,並將本地暫存的資訊清空,方便下個統計週期積攢資料。

collect(T t)

顧名思義他是收集api呼叫的時候,將收集的到key資訊放到本地儲存。

finishOnce()

該方法目前實現都是空,無需關注。

待測key收集器:TurnKeyCollector

  1. public class TurnKeyCollector implements IKeyCollector<HotKeyModel, HotKeyModel> {

  2. //這map裡面的key主要是熱key的名字,value主要是熱key的元資料資訊(比如:熱key來源的app和key的型別和是否是刪除事件)

  3. private ConcurrentHashMap<String, HotKeyModel> map0 = new ConcurrentHashMap<>();

  4. private ConcurrentHashMap<String, HotKeyModel> map1 = new ConcurrentHashMap<>();

  5. private AtomicLong atomicLong = new AtomicLong(0);

  6. @Override

  7. public List<HotKeyModel> lockAndGetResult() {

  8. //自增後,對應的map就會停止被寫入,等待被讀取

  9. atomicLong.addAndGet(1);

  10. List<HotKeyModel> list;

  11. //可以觀察這裡與collect方法裡面的相同位置,會發現一個是操作map0一個是操作map1,這樣保證在讀map的時候,不會阻塞寫map,

  12. //兩個map同時提供輪流提供讀寫能力,設計的很巧妙,值得學習

  13. if (atomicLong.get() % 2 == 0) {

  14. list = get(map1);

  15. map1.clear();

  16. } else {

  17. list = get(map0);

  18. map0.clear();

  19. }

  20. return list;

  21. }

  22. private List<HotKeyModel> get(ConcurrentHashMap<String, HotKeyModel> map) {

  23. return CollectionUtil.list(false, map.values());

  24. }

  25. @Override

  26. public void collect(HotKeyModel hotKeyModel) {

  27. String key = hotKeyModel.getKey();

  28. if (StrUtil.isEmpty(key)) {

  29. return;

  30. }

  31. if (atomicLong.get() % 2 == 0) {

  32. //不存在時返回null並將key-value放入,已有相同key時,返回該key對應的value,並且不覆蓋

  33. HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);

  34. if (model != null) {

  35. //增加該hotMey上報的次數

  36. model.add(hotKeyModel.getCount());

  37. }

  38. } else {

  39. HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);

  40. if (model != null) {

  41. model.add(hotKeyModel.getCount());

  42. }

  43. }

  44. }

  45. @Override

  46. public void finishOnce() {}

  47. }

可以看到該類中有兩個ConcurrentHashMap

和一個AtomicLong,通過對AtomicLong來自增,然後對2取模,來分別控制兩個map的讀寫能力,保證每個map都能做讀寫,並且同一個map不能同時讀寫,這樣可以避免併發集合讀寫不阻塞,這一塊無鎖化的設計還是非常巧妙的,極大的提高了收集的吞吐量。

key數量收集器:TurnCountCollector

這裡的設計與TurnKeyCollector大同小異,咱們就不細談了。值得一提的是它裡面有個並行處理的機制,當收集的數量超過DATA_CONVERT_SWITCH_THRESHOLD=5000的閾值時,lockAndGetResult處理是使用java Stream並行流處理,提升處理的效率。

③ 開啟worker重連器

  1. //開啟worker重連器

  2. WorkerRetryConnector.retryConnectWorkers();

  3. public class WorkerRetryConnector {

  4. /**

  5. * 定時去重連沒連上的workers

  6. */

  7. public static void retryConnectWorkers() {

  8. @SuppressWarnings("PMD.ThreadPoolCreationRule")

  9. ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));

  10. //開啟拉取etcd的worker資訊,如果拉取失敗,則定時繼續拉取

  11. scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);

  12. }

  13. private static void reConnectWorkers() {

  14. List<String> nonList = WorkerInfoHolder.getNonConnectedWorkers();

  15. if (nonList.size() == 0) {

  16. return;

  17. }

  18. JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);

  19. NettyClient.getInstance().connect(nonList);//這裡會觸發netty連線方法channelActive

  20. }

  21. }

也是通過定時執行緒來執行,預設時間間隔是30s,不可設定。

通過WorkerInfoHolder來控制client的worker連線資訊,連線資訊是個List,用的CopyOnWriteArrayList,畢竟是一個讀多寫少的場景,類似與元資料資訊。

/**
* 儲存worker的ip地址和Channel的對映關係,這是有序的。每次client傳送訊息時,都會根據該map的size進行hash
* 如key-1就傳送到workerHolder的第1個Channel去,key-2就發到第2個Channel去
*/
private static final List<Server> WORKER_HOLDER = new CopyOnWriteArrayList<>();

④ 註冊EventBus事件訂閱者

private void registEventBus() {
//netty聯結器會關注WorkerInfoChangeEvent事件
EventBusCenter.register(new WorkerChangeSubscriber());
//熱key探測回撥關注熱key事件
EventBusCenter.register(new ReceiveNewKeySubscribe());
//Rule的變化的事件
EventBusCenter.register(new KeyRuleHolder());
}

使用guava的EventBus事件訊息匯流排,利用釋出/訂閱者模式來對專案進行解耦。它可以利用很少的程式碼,來實現多元件間通訊。

基本原理圖如下:

監聽worker資訊變動:WorkerChangeSubscriber

  1. /**

  2. * 監聽worker資訊變動

  3. */

  4. @Subscribe

  5. public void connectAll(WorkerInfoChangeEvent event) {

  6. List<String> addresses = event.getAddresses();

  7. if (addresses == null) {

  8. addresses = new ArrayList<>();

  9. }

  10. WorkerInfoHolder.mergeAndConnectNew(addresses);

  11. }

  12. /**

  13. * 當client與worker的連線斷開後,刪除

  14. */

  15. @Subscribe

  16. public void channelInactive(ChannelInactiveEvent inactiveEvent) {

  17. //獲取斷線的channel

  18. Channel channel = inactiveEvent.getChannel();

  19. InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();

  20. String address = socketAddress.getHostName() + ":" + socketAddress.getPort();

  21. JdLogger.warn(getClass(), "this channel is inactive : " + socketAddress + " trying to remove this connection");

  22. WorkerInfoHolder.dealChannelInactive(address);

  23. }

監聽熱key回撥事件:ReceiveNewKeySubscribe

  1. private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();

  2. @Subscribe

  3. public void newKeyComing(ReceiveNewKeyEvent event) {

  4. HotKeyModel hotKeyModel = event.getModel();

  5. if (hotKeyModel == null) {

  6. return;

  7. }

  8. //收到新key推送

  9. if (receiveNewKeyListener != null) {

  10. receiveNewKeyListener.newKey(hotKeyModel);

  11. }

  12. }

該方法會收到新的熱key訂閱事件之後,會將其加入到KeyHandlerFactory的收集器裡面處理。

核心處理邏輯:DefaultNewKeyListener#newKey:

@Override
public void newKey(HotKeyModel hotKeyModel) {
long now = System.currentTimeMillis();
//如果key到達時已經過去1秒了,記錄一下。手工刪除key時,沒有CreateTime
if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {
JdLogger.warn(getClass(), "the key comes too late : " + hotKeyModel.getKey() + " now " +
+now + " keyCreateAt " + hotKeyModel.getCreateTime());
}
if (hotKeyModel.isRemove()) {
//如果是刪除事件,就直接刪除
deleteKey(hotKeyModel.getKey());
return;
}
//已經是熱key了,又推過來同樣的熱key,做個日誌記錄,並重新整理一下
if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {
JdLogger.warn(getClass(), "receive repeat hot key :" + hotKeyModel.getKey() + " at " + now);
}
addKey(hotKeyModel.getKey());
}
private void deleteKey(String key) {
CacheFactory.getNonNullCache(key).delete(key);
}
private void addKey(String key) {
ValueModel valueModel = ValueModel.defaultValue(key);
if (valueModel == null) {
//不符合任何規則
deleteKey(key);
return;
}
//如果原來該key已經存在了,那麼value就被重置,過期時間也會被重置。如果原來不存在,就新增的熱key
JdHotKeyStore.setValueDirectly(key, valueModel);
}
  1. 如果該HotKeyModel裡面是刪除事件,則獲取RULE_CACHE_MAP裡面該key超時時間對應的caffeine,然後從中刪除該key快取,然後返回(這裡相當於刪除了本地快取)。

  2. 如果不是刪除事件,則在RULE_CACHE_MAP對應的caffeine快取中新增該key的快取。

  3. 這裡有個注意點,如果不為刪除事件,呼叫addKey()方法在caffeine增加快取的時候,value是一個魔術值0x12fcf76,這個值只代表加了這個快取,但是這個快取在查詢的時候相當於為null。

監聽Rule的變化事件:KeyRuleHolder

可以看到裡面有兩個成員屬性:RULE_CACHE_MAP,KEY_RULES

/**
* 儲存超時時間和caffeine的對映,key是超時時間,value是caffeine[(String,Object)]
*/
private static final ConcurrentHashMap<Integer, LocalCache> RULE_CACHE_MAP = new ConcurrentHashMap<>();
/**
* 這裡KEY_RULES是儲存etcd裡面該appName所對應的所有rule
*/
private static final List<KeyRule> KEY_RULES = new ArrayList<>();

ConcurrentHashMap  RULE_CACHE_MAP:

  • 儲存超時時間和caffeine的對映,key是超時時間,value是caffeine[(String,Object)]。

  • 巧妙的設計:這裡將key的過期時間作為分桶策略,這樣同一個過期時間的key就會在一個桶(caffeine)裡面,這裡面每一個caffeine都是client的本地快取,也就是說hotKey的本地快取的KV實際上是儲存在這裡面的。

List  KEY_RULES:

  • 這裡KEY_RULES是儲存etcd裡面該appName所對應的所有rule。

具體監聽KeyRuleInfoChangeEvent事件方法:

  1. @Subscribe

  2. public void ruleChange(KeyRuleInfoChangeEvent event) {

  3. JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());

  4. List<KeyRule> ruleList = event.getKeyRules();

  5. if (ruleList == null) {

  6. return;

  7. }

  8. putRules(ruleList);

  9. }

核心處理邏輯:KeyRuleHolder#putRules:

/**
* 所有的規則,如果規則的超時時間變化了,會重建caffeine
*/
public static void putRules(List<KeyRule> keyRules) {
synchronized (KEY_RULES) {
//如果規則為空,清空規則表
if (CollectionUtil.isEmpty(keyRules)) {
KEY_RULES.clear();
RULE_CACHE_MAP.clear();
return;
}
KEY_RULES.clear();
KEY_RULES.addAll(keyRules);
Set<Integer> durationSet = keyRules.stream().map(KeyRule::getDuration).collect(Collectors.toSet());
for (Integer duration : RULE_CACHE_MAP.keySet()) {
//先清除掉那些在RULE_CACHE_MAP裡存的,但是rule裡已沒有的
if (!durationSet.contains(duration)) {
RULE_CACHE_MAP.remove(duration);
}
}
//遍歷所有的規則
for (KeyRule keyRule : keyRules) {
int duration = keyRule.getDuration();
//這裡如果RULE_CACHE_MAP裡面沒有超時時間為duration的value,則新建一個放入到RULE_CACHE_MAP裡面
//比如RULE_CACHE_MAP本來就是空的,則在這裡來構建RULE_CACHE_MAP的對映關係
//TODO 如果keyRules裡面包含相同duration的keyRule,則也只會建一個key為duration,value為caffeine,其中caffeine是(string,object)
if (RULE_CACHE_MAP.get(duration) == null) {
LocalCache cache = CacheFactory.build(duration);
RULE_CACHE_MAP.put(duration, cache);
}
}
}
}
  • 使用synchronized關鍵字來保證執行緒安全;

  • 如果規則為空,清空規則表(RULE_CACHE_MAP、KEY_RULES);

  • 使用傳遞進來的keyRules來覆蓋KEY_RULES;

  • 清除掉RULE_CACHE_MAP裡面在keyRules沒有的對映關係;

  • 遍歷所有的keyRules,如果RULE_CACHE_MAP裡面沒有相關的超時時間key,則在裡面賦值;

⑤ 啟動EtcdStarter(etcd連線管理器)

  1. EtcdStarter starter = new EtcdStarter();

  2. //與etcd相關的監聽都開啟

  3. starter.start();

  4. public void start() {

  5. fetchWorkerInfo();

  6. fetchRule();

  7. startWatchRule();

  8. //監聽熱key事件,只監聽手工新增、刪除的key

  9. startWatchHotKey();

  10. }

fetchWorkerInfo()

從etcd裡面拉取worker叢集地址資訊allAddress,並更新WorkerInfoHolder裡面的WORKER_HOLDER

  1. /**

  2. * 每隔30秒拉取worker資訊

  3. */

  4. private void fetchWorkerInfo() {

  5. ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

  6. //開啟拉取etcd的worker資訊,如果拉取失敗,則定時繼續拉取

  7. scheduledExecutorService.scheduleAtFixedRate(() -> {

  8. JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");

  9. fetch();

  10. }, 0, 30, TimeUnit.SECONDS);

  11. }

  • 使用定時執行緒池來執行,單執行緒。

  • 定時從etcd裡面獲取,地址/jd/workers/+$appName或default,時間間隔不可設定,預設30秒,這裡面儲存的是worker地址的ip+port。

  • 釋出WorkerInfoChangeEvent事件。

  • 備註:地址有$appName或default,在worker裡面配置,如果把worker放到某個appName下,則該worker只會參與該app的計算。

fetchRule()

定時執行緒來執行,單執行緒,時間間隔不可設定,預設是5秒,當拉取規則配置和手動配置的hotKey成功後,該執行緒被終止(也就是說只會成功執行一次),執行失敗繼續執行

private void fetchRule() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//開啟拉取etcd的worker資訊,如果拉取失敗,則定時繼續拉取
scheduledExecutorService.scheduleAtFixedRate(() -> {
JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");
boolean success = fetchRuleFromEtcd();
if (success) {
//拉取已存在的熱key
fetchExistHotKey();
//這裡如果拉取規則和拉取手動配置的hotKey成功之後,則該定時執行執行緒停止
scheduledExecutorService.shutdown();
}
}, 0, 5, TimeUnit.SECONDS);
}

fetchRuleFromEtcd()

  • 從etcd裡面獲取該appName配置的rule規則,地址/jd/rules/+$appName。

  • 如果查出來規則rules為空,會通過釋出KeyRuleInfoChangeEvent事件來清空本地的rule配置快取和所有的規則key快取。

  • 釋出KeyRuleInfoChangeEvent事件。

fetchExistHotKey()

  • 從etcd裡面獲取該appName手動配置的熱key,地址/jd/hotkeys/+$appName。

  • 釋出ReceiveNewKeyEvent事件,並且內容HotKeyModel不是刪除事件。

startWatchRule()

  1. /**

  2. * 非同步監聽rule規則變化

  3. */

  4. private void startWatchRule() {

  5. ExecutorService executorService = Executors.newSingleThreadExecutor();

  6. executorService.submit(() -> {

  7. JdLogger.info(getClass(), "--- begin watch rule change ----");

  8. try {

  9. IConfigCenter configCenter = EtcdConfigFactory.configCenter();

  10. KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);

  11. //如果有新事件,即rule的變更,就重新拉取所有的資訊

  12. while (watchIterator.hasNext()) {

  13. //這句必須寫,next會讓他卡住,除非真的有新rule變更

  14. WatchUpdate watchUpdate = watchIterator.next();

  15. List<Event> eventList = watchUpdate.getEvents();

  16. JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is " + eventList);

  17. //全量拉取rule資訊

  18. fetchRuleFromEtcd();

  19. }

  20. } catch (Exception e) {

  21. JdLogger.error(getClass(), "watch err");

  22. }

  23. });

  24. }

  • 非同步監聽rule規則變化,使用etcd監聽地址為/jd/rules/+$appName的節點變化。

  • 使用執行緒池,單執行緒,非同步監聽rule規則變化,如果有事件變化,則呼叫fetchRuleFromEtcd()方法。

startWatchHotKey()

非同步開始監聽熱key變化資訊,使用etcd監聽地址字首為/jd/hotkeys/+$appName

  1. /**

  2. * 非同步開始監聽熱key變化資訊,該目錄裡只有手工新增的key資訊

  3. */

  4. private void startWatchHotKey() {

  5. ExecutorService executorService = Executors.newSingleThreadExecutor();

  6. executorService.submit(() -> {

  7. JdLogger.info(getClass(), "--- begin watch hotKey change ----");

  8. IConfigCenter configCenter = EtcdConfigFactory.configCenter();

  9. try {

  10. KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);

  11. //如果有新事件,即新key產生或刪除

  12. while (watchIterator.hasNext()) {

  13. WatchUpdate watchUpdate = watchIterator.next();

  14. List<Event> eventList = watchUpdate.getEvents();

  15. KeyValue keyValue = eventList.get(0).getKv();

  16. Event.EventType eventType = eventList.get(0).getType();

  17. try {

  18. //從這個地方可以看出,etcd給的返回是節點的全路徑,而我們需要的key要去掉字首

  19. String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");

  20. //如果是刪除key,就立刻刪除

  21. if (Event.EventType.DELETE == eventType) {

  22. HotKeyModel model = new HotKeyModel();

  23. model.setRemove(true);

  24. model.setKey(key);

  25. EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

  26. } else {

  27. HotKeyModel model = new HotKeyModel();

  28. model.setRemove(false);

  29. String value = keyValue.getValue().toStringUtf8();

  30. //新增熱key

  31. JdLogger.info(getClass(), "etcd receive new key : " + key + " --value:" + value);

  32. //如果這是一個刪除指令,就什麼也不幹

  33. //TODO 這裡有個疑問,監聽到worker自動探測發出的惰性刪除指令,這裡之間跳過了,但是本地快取沒有更新吧?

  34. //TODO 所以我猜測在客戶端使用判斷快取是否存在的api裡面,應該會判斷相關快取的value值是否為"#[DELETE]#"刪除標記

  35. //解疑:這裡確實只監聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的

  36. if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {

  37. continue;

  38. }

  39. //手工建立的value是時間戳

  40. model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));

  41. model.setKey(key);

  42. EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

  43. }

  44. } catch (Exception e) {

  45. JdLogger.error(getClass(), "new key err :" + keyValue);

  46. }

  47. }

  48. } catch (Exception e) {

  49. JdLogger.error(getClass(), "watch err");

  50. }

  51. });

  52. }

  • 使用執行緒池,單執行緒,非同步監聽熱key變化

  • 使用etcd監聽字首地址的當前節點以及子節點的所有變化值

    • 刪除節點動作

    • 釋出ReceiveNewKeyEvent事件,並且內容HotKeyModel是刪除事件

    • 新增or更新節點動作

    • 事件變化的value值為刪除標記#[DELETE]#

    • 如果是刪除標記的話,代表是worker自動探測或者client需要刪除的指令。

    • 如果是刪除標記則什麼也不做,直接跳過(這裡從HotKeyPusher#push方法可以看到,做刪除事件的操作時候,他會給/jd/hotkeys/+$appName的節點裡面增加一個值為刪除標記的節點,然後再刪除相同路徑的節點,這樣就可以觸發上面的刪除節點事件,所以這裡判斷如果是刪除標記直接跳過)。

    • 不為刪除標記

    • 釋出ReceiveNewKeyEvent事件,事件內容HotKeyModel裡面的createTime是kv對應的時間戳

疑問: 這裡程式碼註釋裡面說只監聽手工新增或者刪除的hotKey,難道說/jd/hotkeys/+$appName地址只是手工配置的地址嗎?

解疑: 這裡確實只監聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的

5.API解析

1)流程圖示

① 查詢流程

② 刪除流程:

從上面的流程圖中,大家應該知道該熱點key在程式碼中是如何扭轉的,這裡再給大家講解一下核心API的原始碼解析,限於篇幅的原因,咱們不一個個貼相關原始碼了,只是單純的告訴你它的內部邏輯是怎麼樣的。

2)核心類:JdHotKeyStore

JdHotKeyStore是封裝client呼叫的api核心類,包含上面10個公共方法,咱們重點解析其中6個重要方法:

① isHotKey(String key)

判斷是否在規則內,如果不在返回false

判斷是否是熱key,如果不是或者是且過期時間在2s內,則給TurnKeyCollector#collect收集

最後給TurnCountCollector#collect做統計收集

② get(String key)

從本地caffeine取值

如果取到的value是個魔術值,只代表加入到caffeine快取裡面了,查詢的話為null

③ smartSet(String key, Object value)

判斷是否是熱key,這裡不管它在不在規則內,如果是熱key,則給value賦值,如果不為熱key什麼也不做

④ forceSet(String key, Object value)

強制給value賦值

如果該key不在規則配置內,則傳遞的value不生效,本地快取的賦值value會被變為null

⑤ getValue(String key, KeyType keyType)

獲取value,如果value不存在則呼叫HotKeyPusher#push方法發往netty

如果沒有為該key配置規則,就不用上報key,直接返回null

如果取到的value是個魔術值,只代表加入到caffeine快取裡面了,查詢的話為null

⑥ remove(String key)

刪除某key(本地的caffeine快取),會通知整個叢集刪除(通過etcd來通知叢集刪除)

3)client上傳熱key入口呼叫類:HotKeyPusher

核心方法:

  1. public static void push(String key, KeyType keyType, int count, boolean remove) {

  2. if (count <= 0) {

  3. count = 1;

  4. }

  5. if (keyType == null) {

  6. keyType = KeyType.REDIS_KEY;

  7. }

  8. if (key == null) {

  9. return;

  10. }

  11. //這裡之所以用LongAdder是為了保證多執行緒計數的執行緒安全性,雖然這裡是在方法內呼叫的,但是在TurnKeyCollector的兩個map裡面,

  12. //儲存了HotKeyModel的例項物件,這樣在多個執行緒同時修改count的計數屬性時,會存線上程安全計數不準確問題

  13. LongAdder adderCnt = new LongAdder();

  14. adderCnt.add(count);

  15. HotKeyModel hotKeyModel = new HotKeyModel();

  16. hotKeyModel.setAppName(Context.APP_NAME);

  17. hotKeyModel.setKeyType(keyType);

  18. hotKeyModel.setCount(adderCnt);

  19. hotKeyModel.setRemove(remove);

  20. hotKeyModel.setKey(key);

  21. if (remove) {

  22. //如果是刪除key,就直接發到etcd去,不用做聚合。但是有點問題現在,這個刪除只能刪手工新增的key,不能刪worker探測出來的

  23. //因為各個client都在監聽手工新增的那個path,沒監聽自動探測的path。所以如果手工的那個path下,沒有該key,那麼是刪除不了的。

  24. //刪不了,就達不到叢集監聽刪除事件的效果,怎麼辦呢?可以通過新增的方式,新增一個熱key,然後刪除它

  25. //TODO 這裡為啥不直接刪除該節點,難道worker自動探測處理的hotKey不會往該節點增加新增事件嗎?

  26. //釋疑:worker根據探測配置的規則,當判斷出某個key為hotKey後,確實不會往keyPath裡面加入節點,他只是單純的往本地快取裡面加入一個空值,代表是熱點key

  27. EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);

  28. EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 這裡很巧妙待補充描述

  29. //也刪worker探測的目錄

  30. EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));

  31. } else {

  32. //如果key是規則內的要被探測的key,就積累等待傳送

  33. if (KeyRuleHolder.isKeyInRule(key)) {

  34. //積攢起來,等待每半秒傳送一次

  35. KeyHandlerFactory.getCollector().collect(hotKeyModel);

  36. }

  37. }

  38. }

從上面的原始碼中可知:

  • 這裡之所以用LongAdder是為了保證多執行緒計數的執行緒安全性,雖然這裡是在方法內呼叫的,但是在TurnKeyCollector的兩個map裡面,儲存了HotKeyModel的例項物件,這樣在多個執行緒同時修改count的計數屬性時,會存線上程安全計數不準確問題。

  • 如果是remove刪除型別,在刪除手動配置的熱key配置路徑的同時,還會刪除dashboard展示熱key的配置路徑。

  • 只有在規則配置的key,才會被積攢探測傳送到worker內進行計算。

6.通訊機制(與worker互動)

1)NettyClient:netty聯結器

  1. public class NettyClient {

  2. private static final NettyClient nettyClient = new NettyClient();

  3. private Bootstrap bootstrap;

  4. public static NettyClient getInstance() {

  5. return nettyClient;

  6. }

  7. private NettyClient() {

  8. if (bootstrap == null) {

  9. bootstrap = initBootstrap();

  10. }

  11. }

  12. private Bootstrap initBootstrap() {

  13. //少執行緒

  14. EventLoopGroup group = new NioEventLoopGroup(2);

  15. Bootstrap bootstrap = new Bootstrap();

  16. NettyClientHandler nettyClientHandler = new NettyClientHandler();

  17. bootstrap.group(group).channel(NioSocketChannel.class)

  18. .option(ChannelOption.SO_KEEPALIVE, true)

  19. .option(ChannelOption.TCP_NODELAY, true)

  20. .handler(new ChannelInitializer<SocketChannel>() {

  21. @Override

  22. protected void initChannel(SocketChannel ch) {

  23. ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());

  24. ch.pipeline()

  25. .addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))//這裡就是定義TCP多個包之間的分隔符,為了更好的做拆包

  26. .addLast(new MsgDecoder())

  27. .addLast(new MsgEncoder())

  28. //30秒沒訊息時,就發心跳包過去

  29. .addLast(new IdleStateHandler(0, 0, 30))

  30. .addLast(nettyClientHandler);

  31. }

  32. });

  33. return bootstrap;

  34. }

  35. }

  • 使用Reactor執行緒模型,只有2個工作執行緒,沒有單獨設定主執行緒

  • 長連線,開啟TCP_NODELAY

  • netty的分隔符”$( )$”,類似TCP報文分段的標準,方便拆包

  • Protobuf序列化與反序列化

  • 30s沒有訊息發給對端的時候,傳送一個心跳包判活

  • 工作執行緒處理器NettyClientHandler

JDhotkey的tcp協議設計就是收發字串,每個tcp訊息包使用特殊字元$( )$來分割

優點:這樣實現非常簡單。

獲得訊息包後進行json或者protobuf反序列化。

缺點:是需要,從位元組流-》反序列化成字串-》反序列化成訊息物件,兩層序列化損耗了一部分效能。

protobuf還好序列化很快,但是json序列化的速度只有幾十萬每秒,會損耗一部分效能。

2)NettyClientHandler:工作執行緒處理器

  1. @ChannelHandler.Sharable

  2. public class NettyClientHandler extends SimpleChannelInboundHandler<HotKeyMsg> {

  3. @Override

  4. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

  5. if (evt instanceof IdleStateEvent) {

  6. IdleStateEvent idleStateEvent = (IdleStateEvent) evt;

  7. //這裡表示如果讀寫都掛了

  8. if (idleStateEvent.state() == IdleState.ALL_IDLE) {

  9. //向服務端傳送訊息

  10. ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));

  11. }

  12. }

  13. super.userEventTriggered(ctx, evt);

  14. }

  15. //在Channel註冊EventLoop、繫結SocketAddress和連線ChannelFuture的時候都有可能會觸發ChannelInboundHandler的channelActive方法的呼叫

  16. //類似TCP三次握手成功之後觸發

  17. @Override

  18. public void channelActive(ChannelHandlerContext ctx) {

  19. JdLogger.info(getClass(), "channelActive:" + ctx.name());

  20. ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));

  21. }

  22. //類似TCP四次揮手之後,等待2MSL時間之後觸發(大概180s),比如channel通道關閉會觸發(channel.close())

  23. //客戶端channel主動關閉連線時,會向服務端傳送一個寫請求,然後服務端channel所在的selector會監聽到一個OP_READ事件,然後

  24. //執行資料讀取操作,而讀取時發現客戶端channel已經關閉了,則讀取資料位元組個數返回-1,然後執行close操作,關閉該channel對應的底層socket,

  25. //並在pipeline中,從head開始,往下將InboundHandler,並觸發handler的channelInactive和channelUnregistered方法的執行,以及移除pipeline中的handlers一系列操作。

  26. @Override

  27. public void channelInactive(ChannelHandlerContext ctx) throws Exception {

  28. super.channelInactive(ctx);

  29. //斷線了,可能只是client和server斷了,但都和etcd沒斷。也可能是client自己斷網了,也可能是server斷了

  30. //釋出斷線事件。後續10秒後進行重連,根據etcd裡的worker資訊來決定是否重連,如果etcd裡沒了,就不重連。如果etcd裡有,就重連

  31. notifyWorkerChange(ctx.channel());

  32. }

  33. private void notifyWorkerChange(Channel channel) {

  34. EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));

  35. }

  36. @Override

  37. protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {

  38. if (MessageType.PONG == msg.getMessageType()) {

  39. JdLogger.info(getClass(), "heart beat");

  40. return;

  41. }

  42. if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {

  43. JdLogger.info(getClass(), "receive new key : " + msg);

  44. if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {

  45. return;

  46. }

  47. for (HotKeyModel model : msg.getHotKeyModels()) {

  48. EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

  49. }

  50. }

  51. }

  52. }

userEventTriggered

  • 收到對端發來的心跳包,返回new HotKeyMsg(MessageType.PING, Context.APP_NAME)

channelActive

  • 在Channel註冊EventLoop、繫結SocketAddress和連線ChannelFuture的時候都有可能會觸發ChannelInboundHandler的channelActive方法的呼叫

  • 類似TCP三次握手成功之後觸發,給對端傳送new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)

channelInactive

  • 類似TCP四次揮手之後,等待2MSL時間之後觸發(大概180s),比如channel通道關閉會觸發(channel.close())該方法,釋出ChannelInactiveEvent事件,來10s後重連

channelRead0

  • 接收PONG訊息型別時,打個日誌返回

  • 接收RESPONSE_NEW_KEY訊息型別時,釋出ReceiveNewKeyEvent事件

3.3.3 worker端

1.入口啟動載入:7個@PostConstruct

1)worker端對etcd相關的處理:EtcdStarter

① 第一個@PostConstruct:watchLog()

@PostConstruct
public void watchLog() {
AsyncPool.asyncDo(() -> {
try {
//取etcd的是否開啟日誌配置,地址/jd/logOn
String loggerOn = configCenter.get(ConfigConstant.logToggle);
LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
//監聽etcd地址/jd/logOn是否開啟日誌配置,並實時更改開關
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List<Event> eventList = watchUpdate.getEvents();
KeyValue keyValue = eventList.get(0).getKv();
logger.info("log toggle changed : " + keyValue);
String value = keyValue.getValue().toStringUtf8();
LOGGER_ON = "true".equals(value) || "1".equals(value);
}
});
}
  • 放到執行緒池裡面非同步執行

  • 取etcd的是否開啟日誌配置,地址/jd/logOn,預設true

  • 監聽etcd地址/jd/logOn是否開啟日誌配置,並實時更改開關

  • 由於有etcd的監聽,所以會一直執行,而不是執行一次結束

② 第二個@PostConstruct:watch()

/**
* 啟動回撥監聽器,監聽rule變化
*/
@PostConstruct
public void watch() {
AsyncPool.asyncDo(() -> {
KvClient.WatchIterator watchIterator;
if (isForSingle()) {
watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);
} else {
watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);
}
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List<Event> eventList = watchUpdate.getEvents();
KeyValue keyValue = eventList.get(0).getKv();
logger.info("rule changed : " + keyValue);
try {
ruleChange(keyValue);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
* rule發生變化時,更新快取的rule
*/
private synchronized void ruleChange(KeyValue keyValue) {
String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");
if (StrUtil.isEmpty(appName)) {
return;
}
String ruleJson = keyValue.getValue().toStringUtf8();
List<KeyRule> keyRules = FastJsonUtils.toList(ruleJson, KeyRule.class);
KeyRuleHolder.put(appName, keyRules);
}

通過etcd.workerPath配置,來判斷該worker是否為某個app單獨服務的,預設為”default”,如果是預設值,代表該worker參與在etcd上所有app client的計算,否則只為某個app來服務計算

使用etcd來監聽rule規則變化,如果是共享的worker,監聽地址字首為”/jd/rules/“,如果為某個app獨享,監聽地址為”/jd/rules/“+$etcd.workerPath

如果規則變化,則修改對應app在本地儲存的rule快取,同時清理該app在本地儲存的KV快取

KeyRuleHolder:rule快取本地儲存

  • Map<string, list

  • 相對於client的KeyRuleHolder的區別:worker是儲存所有app規則,每個app對應一個規則桶,所以用map

CaffeineCacheHolder:key快取本地儲存

  • Map<string, cache

  • 相對於client的caffeine,第一是worker沒有做快取介面比如LocalCache,第二是client的map的kv分別是超時時間、以及相同超時時間所對應key的快取桶

放到執行緒池裡面非同步執行,由於有etcd的監聽,所以會一直執行,而不是執行一次結束

③ 第三個@PostConstruct:watchWhiteList()

/**
* 啟動回撥監聽器,監聽白名單變化,只監聽自己所在的app,白名單key不參與熱key計算,直接忽略
*/
@PostConstruct
public void watchWhiteList() {
AsyncPool.asyncDo(() -> {
//從etcd配置中獲取所有白名單
fetchWhite();
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
logger.info("whiteList changed ");
try {
fetchWhite();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
  • 拉取並監聽etcd白名單key配置,地址為/jd/whiteList/+$etcd.workerPath

  • 在白名單的key,不參與熱key計算,直接忽略

  • 放到執行緒池裡面非同步執行,由於有etcd的監聽,所以會一直執行,而不是執行一次結束

    ④ 第四個@PostConstruct:makeSureSelfOn()

/**
* 每隔一會去check一下,自己還在不在etcd裡
*/
@PostConstruct
public void makeSureSelfOn() {
//開啟上傳worker資訊
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
if (canUpload) {
uploadSelfInfo();
}
} catch (Exception e) {
//do nothing
}
}, 0, 5, TimeUnit.SECONDS);
}
  • 線上程池裡面非同步執行,定時執行,時間間隔為5s

  • 將本機woker的hostName,ip+port以kv的形式定時上報給etcd,地址為/jd/workers/+$etcd.workPath+”/“+$hostName,續期時間為8s

  • 有一個canUpload的開關來控制worker是否向etcd來定時續期,如果這個開關關閉了,代表worker不向etcd來續期,這樣當上面地址的kv到期之後,etcd會刪除該節點,這樣client迴圈判斷worker資訊變化了

2)將熱key推送到dashboard供入庫:DashboardPusher

① 第五個@PostConstruct:uploadToDashboard()

  1. @Component

  2. public class DashboardPusher implements IPusher {

  3. /**

  4. * 熱key集中營

  5. */

  6. private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();

  7. @PostConstruct

  8. public void uploadToDashboard() {

  9. AsyncPool.asyncDo(() -> {

  10. while (true) {

  11. try {

  12. //要麼key達到1千個,要麼達到1秒,就彙總上報給etcd一次

  13. List<HotKeyModel> tempModels = new ArrayList<>();

  14. Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);

  15. if (CollectionUtil.isEmpty(tempModels)) {

  16. continue;

  17. }

  18. //將熱key推到dashboard

  19. DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));

  20. } catch (Exception e) {

  21. e.printStackTrace();

  22. }

  23. }

  24. });

  25. }

  26. }

  • 當熱key的數量達到1000或者每隔1s,把熱key的資料通過與dashboard的netty通道來發送給dashboard,資料型別為REQUEST_HOT_KEY

  • LinkedBlockingQueue

    hotKeyStoreQueue:worker計算的給dashboard熱key的集中營,所有給dashboard推送熱key儲存在裡面

    3)推送到各客戶端伺服器:AppServerPusher

    ① 第六個@PostConstruct:batchPushToClient()

  1. public class AppServerPusher implements IPusher {

  2. /**

  3. * 熱key集中營

  4. */

  5. private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();

  6. /**

  7. * 和dashboard那邊的推送主要區別在於,給app推送每10ms一次,dashboard那邊1s一次

  8. */

  9. @PostConstruct

  10. public void batchPushToClient() {

  11. AsyncPool.asyncDo(() -> {

  12. while (true) {

  13. try {

  14. List<HotKeyModel> tempModels = new ArrayList<>();

  15. //每10ms推送一次

  16. Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);

  17. if (CollectionUtil.isEmpty(tempModels)) {

  18. continue;

  19. }

  20. Map<String, List<HotKeyModel>> allAppHotKeyModels = new HashMap<>();

  21. //拆分出每個app的熱key集合,按app分堆

  22. for (HotKeyModel hotKeyModel : tempModels) {

  23. List<HotKeyModel> oneAppModels = allAppHotKeyModels.computeIfAbsent(hotKeyModel.getAppName(), (key) -> new ArrayList<>());

  24. oneAppModels.add(hotKeyModel);

  25. }

  26. //遍歷所有app,進行推送

  27. for (AppInfo appInfo : ClientInfoHolder.apps) {

  28. List<HotKeyModel> list = allAppHotKeyModels.get(appInfo.getAppName());

  29. if (CollectionUtil.isEmpty(list)) {

  30. continue;

  31. }

  32. HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);

  33. hotKeyMsg.setHotKeyModels(list);

  34. //整個app全部發送

  35. appInfo.groupPush(hotKeyMsg);

  36. }

  37. //推送完,及時清理不使用記憶體

  38. allAppHotKeyModels = null;

  39. } catch (Exception e) {

  40. e.printStackTrace();

  41. }

  42. }

  43. });

  44. }

  45. }

  • 會按照key的appName來進行分組,然後通過對應app的channelGroup來推送

  • 當熱key的數量達到10或者每隔10ms,把熱key的資料通過與app的netty通道來發送給app,資料型別為RESPONSE_NEW_KEY

  • LinkedBlockingQueue

    hotKeyStoreQueue:worker計算的給client熱key的集中營,所有給client推送熱key儲存在裡面

    4)client例項節點處理:NodesServerStarter

    ① 第七個@PostConstruct:start()

  1. public class NodesServerStarter {

  2. @Value("${netty.port}")

  3. private int port;

  4. private Logger logger = LoggerFactory.getLogger(getClass());

  5. @Resource

  6. private IClientChangeListener iClientChangeListener;

  7. @Resource

  8. private List<INettyMsgFilter> messageFilters;

  9. @PostConstruct

  10. public void start() {

  11. AsyncPool.asyncDo(() -> {

  12. logger.info("netty server is starting");

  13. NodesServer nodesServer = new NodesServer();

  14. nodesServer.setClientChangeListener(iClientChangeListener);

  15. nodesServer.setMessageFilters(messageFilters);

  16. try {

  17. nodesServer.startNettyServer(port);

  18. } catch (Exception e) {

  19. e.printStackTrace();

  20. }

  21. });

  22. }

  23. }

  • 執行緒池裡面非同步執行,啟動client端的nettyServer

  • iClientChangeListener和messageFilters這兩個依賴最終會被傳遞到netty訊息處理器裡面,iClientChangeListener會作為channel下線處理來刪除ClientInfoHolder下線或者超時的通道,messageFilters會作為netty收到事件訊息的處理過濾器(責任鏈模式)

    ② 依賴的bean:IClientChangeListener iClientChangeListener

public interface IClientChangeListener {
/**
* 發現新連線
*/
void newClient(String appName, String channelId, ChannelHandlerContext ctx);
/**
* 客戶端掉線
*/
void loseClient(ChannelHandlerContext ctx);
}

對客戶端的管理,新來(newClient)(會觸發netty的連線方法channelActive)、斷線(loseClient)(會觸發netty的斷連方法channelInactive())的管理

client的連線資訊主要是在ClientInfoHolder裡面

  • List

     apps,這裡面的AppInfo主要是appName和對應的channelGroup
  • 對apps的add和remove主要是通過新來(newClient)、斷線(loseClient)

    ③ 依賴的bean:List

     messageFilters
/**
* 對netty來的訊息,進行過濾處理
* @author wuweifeng wrote on 2019-12-11
* @version 1.0
*/
public interface INettyMsgFilter {
boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);
}

對client發給worker的netty訊息,進行過濾處理,共有四個實現類,也就是說底下四個過濾器都是收到client傳送的netty訊息來做處理

④ 各個訊息處理的型別:MessageType

APP_NAME((byte) 1),
REQUEST_NEW_KEY((byte) 2),
RESPONSE_NEW_KEY((byte) 3),
REQUEST_HIT_COUNT((byte) 7), //命中率
REQUEST_HOT_KEY((byte) 8), //熱key,worker->dashboard
PING((byte) 4), PONG((byte) 5),
EMPTY((byte) 6);

順序1:HeartBeatFilter

  • 當訊息型別為PING,則給對應的client示例返回PONG

順序2:AppNameFilter

  • 當訊息型別為APP_NAME,代表client與worker建立連線成功,然後呼叫iClientChangeListener的newClient方法增加apps元資料資訊

順序3:HotKeyFilter

  • 處理接收訊息型別為REQUEST_NEW_KEY

  • 先給HotKeyFilter.totalReceiveKeyCount原子類增1,該原子類代表worker例項接收到的key的總數

  • publishMsg方法,將訊息通過自建的生產者消費者模型(KeyProducer,KeyConsumer),來把訊息給發到生產者中分發消費

    • 接收到的訊息HotKeyMsg裡面List

    • 首先判斷HotKeyModel裡面的key是否在白名單內,如果在則跳過,否則將HotKeyModel通過KeyProducer傳送

順序4:KeyCounterFilter

  • 處理接收型別為REQUEST_HIT_COUNT

  • 這個過濾器是專門給dashboard來彙算key的,所以這個appName直接設定為該worker配置的appName

  • 該過濾器的資料來源都是client的NettyKeyPusher#sendCount(String appName, List

     list),這裡面的資料都是預設積攢10s的,這個10s是可以配置的,這一點在client裡面有講
  • 將構造的new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞佇列LinkedBlockingQueue

     COUNTER_QUEUE中,然後讓CounterConsumer來消費處理,消費邏輯是單執行緒的
  • CounterConsumer:熱key統計消費者

    • 放在公共執行緒池中,來單執行緒執行

    • 從阻塞佇列COUNTER_QUEUE裡面取資料,然後將裡面的key的統計資料釋出到etcd的/jd/keyHitCount/+ appName + “/“ + IpUtils.getIp() + “-“ + System.currentTimeMillis()裡面,該路徑是worker服務的client叢集或者default,用來存放客戶端hotKey訪問次數和總訪問次數的path,然後讓dashboard來訂閱統計展示

2.三個定時任務:3個@Scheduled

1)定時任務1:EtcdStarter#pullRules()

/**
* 每隔1分鐘拉取一次,所有的app的rule
*/
@Scheduled(fixedRate = 60000)
public void pullRules() {
try {
if (isForSingle()) {
String value = configCenter.get(ConfigConstant.rulePath + workerPath);
if (!StrUtil.isEmpty(value)) {
List<KeyRule> keyRules = FastJsonUtils.toList(value, KeyRule.class);
KeyRuleHolder.put(workerPath, keyRules);
}
} else {
List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.rulePath);
for (KeyValue keyValue : keyValues) {
ruleChange(keyValue);
}
}
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
}

每隔1分鐘拉取一次etcd地址為/jd/rules/的規則變化,如果worker所服務的app或者default的rule有變化,則更新規則的快取,並清空該appName所對應的本地key快取

2)定時任務2:EtcdStarter#uploadClientCount()

/**
* 每隔10秒上傳一下client的數量到etcd中
*/
@Scheduled(fixedRate = 10000)
public void uploadClientCount() {
try {
String ip = IpUtils.getIp();
for (AppInfo appInfo : ClientInfoHolder.apps) {
String appName = appInfo.getAppName();
int count = appInfo.size();
//即便是full gc也不能超過3秒,因為這裡給的過期時間是13s,由於該定時任務每隔10s執行一次,如果full gc或者說上報給etcd的時間超過3s,
//則在dashboard查詢不到client的數量
configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);
}
configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);
//上報每秒QPS(接收key數量、處理key數量)
String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));
configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);
logger.info(totalCount + " expireCount:" + expireTotalCount + " offerCount:" + totalOfferCount);
//如果是穩定一直有key傳送的應用,建議開啟該監控,以避免可能發生的網路故障
if (openMonitor) {
checkReceiveKeyCount();
}
//            configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);
} catch (Exception ex) {
logger.error(ETCD_DOWN);
}
}
  • 每個10s將worker計算儲存的client資訊上報給etcd,來方便dashboard來查詢展示,比如/jd/count/對應client數量,/jd/caffeineSize/對應caffeine快取的大小,/jd/totalKeyCount/對應該worker接收的key總量和處理的key總量

  • 可以從程式碼中看到,上面所有etcd的節點租期時間都是13s,而該定時任務是每10s執行一次,意味著如果full gc或者說上報給etcd的時間超過3s,則在dashboard查詢不到client的相關彙算資訊

  • 長時間不收到key,判斷網路狀態不好,斷開worker給etcd地址為/jd/workers/+$workerPath節點的續租,因為client會迴圈判斷該地址的節點是否變化,使得client重新連線worker或者斷開失聯的worker

    3)定時任務3:EtcdStarter#fetchDashboardIp()

/**
* 每隔30秒去獲取一下dashboard的地址
*/
@Scheduled(fixedRate = 30000)
public void fetchDashboardIp() {
try {
//獲取DashboardIp
List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.dashboardPath);
//是空,給個警告
if (CollectionUtil.isEmpty(keyValues)) {
logger.warn("very important warn !!! Dashboard ip is null!!!");
return;
}
String dashboardIp = keyValues.get(0).getValue().toStringUtf8();
NettyClient.getInstance().connect(dashboardIp);
} catch (Exception e) {
e.printStackTrace();
}
}

每隔30s拉取一次etcd字首為/jd/dashboard/的dashboard連線ip的值,並且判斷DashboardHolder.hasConnected裡面是否為未連線狀態,如果是則重新連線worker與dashboard的netty通道

3.自建的生產者消費者模型(KeyProducer,KeyConsumer)

一般生產者消費者模型包含三大元素:生產者、消費者、訊息儲存佇列

這裡訊息儲存佇列是DispatcherConfig裡面的QUEUE,使用LinkedBlockingQueue,預設大小為200W

1)KeyProducer

  1. @Component

  2. public class KeyProducer {

  3. public void push(HotKeyModel model, long now) {

  4. if (model == null || model.getKey() == null) {

  5. return;

  6. }

  7. //5秒前的過時訊息就不處理了

  8. if (now - model.getCreateTime() > InitConstant.timeOut) {

  9. expireTotalCount.increment();

  10. return;

  11. }

  12. try {

  13. QUEUE.put(model);

  14. totalOfferCount.increment();

  15. } catch (InterruptedException e) {

  16. e.printStackTrace();

  17. }

  18. }

  19. }

判斷接收到的HotKeyModel是否超出”netty.timeOut”配置的時間,如果是將expireTotalCount紀錄過期總數給自增,然後返回

2)KeyConsumer

  1. public class KeyConsumer {

  2. private IKeyListener iKeyListener;

  3. public void setKeyListener(IKeyListener iKeyListener) {

  4. this.iKeyListener = iKeyListener;

  5. }

  6. public void beginConsume() {

  7. while (true) {

  8. try {

  9. //從這裡可以看出,這裡的生產者消費者模型,本質上還是拉模式,之所以不使用EventBus,是因為需要佇列來做緩衝

  10. HotKeyModel model = QUEUE.take();

  11. if (model.isRemove()) {

  12. iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);

  13. } else {

  14. iKeyListener.newKey(model, KeyEventOriginal.CLIENT);

  15. }

  16. //處理完畢,將數量加1

  17. totalDealCount.increment();

  18. } catch (InterruptedException e) {

  19. e.printStackTrace();

  20. }

  21. }

  22. }

  23. }

  24. @Override

  25. public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {

  26. //cache裡的key,appName+keyType+key

  27. String key = buildKey(hotKeyModel);

  28. hotCache.invalidate(key);

  29. CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);

  30. //推送所有client刪除

  31. hotKeyModel.setCreateTime(SystemClock.now());

  32. logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());

  33. for (IPusher pusher : iPushers) {

  34. //這裡可以看到,刪除熱key的netty訊息只給client端發了過去,沒有給dashboard發過去(DashboardPusher裡面的remove是個空方法)

  35. pusher.remove(hotKeyModel);

  36. }

  37. }

  38. @Override

  39. public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {

  40. //cache裡的key

  41. String key = buildKey(hotKeyModel);

  42. //判斷是不是剛熱不久

  43. //hotCache對應的caffeine有效期為5s,也就是說該key會儲存5s,在5s內不重複處理相同的hotKey。

  44. //畢竟hotKey都是瞬時流量,可以避免在這5s內重複推送給client和dashboard,避免無效的網路開銷

  45. Object o = hotCache.getIfPresent(key);

  46. if (o != null) {

  47. return;

  48. }

  49. //********** watch here ************//

  50. //該方法會被InitConstant.threadCount個執行緒同時呼叫,存在多執行緒問題

  51. //下面的那句addCount是加了鎖的,代表給Key累加數量時是原子性的,不會發生多加、少加的情況,到了設定的閾值一定會hot

  52. //譬如閾值是2,如果多個執行緒累加,在沒hot前,hot的狀態肯定是對的,譬如thread1 加1,thread2加1,那麼thread2會hot返回true,開啟推送

  53. //但是極端情況下,譬如閾值是10,當前是9,thread1走到這裡時,加1,返回true,thread2也走到這裡,加1,此時是11,返回true,問題來了

  54. //該key會走下面的else兩次,也就是2次推送。

  55. //所以出現問題的原因是hotCache.getIfPresent(key)這一句在併發情況下,沒return掉,放了兩個key+1到addCount這一步時,會有問題

  56. //測試程式碼在TestBlockQueue類,直接執行可以看到會同時hot

  57. //那麼該問題用解決嗎,NO,不需要解決,1 首先要發生的條件極其苛刻,很難觸發,以京東這樣高的併發量,線上我也沒見過觸發連續2次推送同一個key的

  58. //2 即便觸發了,後果也是可以接受的,2次推送而已,毫無影響,客戶端無感知。但是如果非要解決,就要對slidingWindow例項加鎖了,必然有一些開銷

  59. //所以只要保證key數量不多計算就可以,少計算了沒事。因為熱key必然頻率高,漏計幾次沒事。但非熱key,多計算了,被幹成了熱key就不對了

  60. SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);//從這裡可知,每個app的每個key都會對應一個滑動視窗

  61. //看看hot沒

  62. boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

  63. if (!hot) {

  64. //如果沒hot,重新put,cache會自動重新整理過期時間

  65. CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);

  66. } else {

  67. //這裡之所以放入的value為1,是因為hotCache是用來專門儲存剛生成的hotKey

  68. //hotCache對應的caffeine有效期為5s,也就是說該key會儲存5s,在5s內不重複處理相同的hotKey。

  69. //畢竟hotKey都是瞬時流量,可以避免在這5s內重複推送給client和dashboard,避免無效的網路開銷

  70. hotCache.put(key, 1);

  71. //刪掉該key

  72. //這個key從實際上是專門針對slidingWindow的key,他的組合邏輯是appName+keyType+key,而不是給client和dashboard推送的hotKey

  73. CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);

  74. //開啟推送

  75. hotKeyModel.setCreateTime(SystemClock.now());

  76. //當開關開啟時,列印日誌。大促時關閉日誌,就不列印了

  77. if (EtcdStarter.LOGGER_ON) {

  78. logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());

  79. }

  80. //分別推送到各client和etcd

  81. for (IPusher pusher : iPushers) {

  82. pusher.push(hotKeyModel);

  83. }

  84. }

  85. }

“thread.count”配置即為消費者個數,多個消費者共同消費一個QUEUE佇列

生產者消費者模型,本質上還是拉模式,之所以不使用EventBus,是因為需要佇列來做緩衝

根據HotKeyModel裡面是否是刪除訊息型別

  • 刪除訊息型別

    • 根據HotKeyModel裡面的appName+keyType+key的名字,來構建caffeine裡面的newkey,該newkey在caffeine裡面主要是用來與slidingWindow滑動時間窗對應

    • 刪除hotCache裡面newkey的快取,放入的快取kv分別是newKey和1,hotCache作用是用來儲存該生成的熱key,hotCache對應的caffeine有效期為5s,也就是說該key會儲存5s,在5s內不重複處理相同的hotKey。畢竟hotKey都是瞬時流量,可以避免在這5s內重複推送給client和dashboard,避免無效的網路開銷

    • 刪除CaffeineCacheHolder裡面對應appName的caffeine裡面的newKey,這裡面儲存的是slidingWindow滑動視窗

    • 推送給該HotKeyModel對應的所有client例項,用來讓client刪除該HotKeyModel

  • 非刪除訊息型別

    • 根據HotKeyModel裡面的appName+keyType+key的名字,來構建caffeine裡面的newkey,該newkey在caffeine裡面主要是用來與slidingWindow滑動時間窗對應

    • 通過hotCache來判斷該newkey是否剛熱不久,如果是則返回

    • 根據滑動時間視窗來計算判斷該key是否為hotKey(這裡可以學習一下滑動時間視窗的設計),並返回或者生成該newKey對應的滑動視窗

    • 如果沒有達到熱key的標準

      通過CaffeineCacheHolder重新put,cache會自動重新整理過期時間

    • 如果達到了熱key標準

      向hotCache裡面增加newkey對應的快取,value為1表示剛為熱key。

    刪除CaffeineCacheHolder裡面對應newkey的滑動視窗快取。

    向該hotKeyModel對應的app的client推送netty訊息,表示新產生hotKey,使得client本地快取,但是推送的netty訊息只代表為熱key,client本地快取不會儲存key對應的value值,需要呼叫JdHotKeyStore裡面的api來給本地快取的value賦值

    向dashboard推送hotKeyModel,表示新產生hotKey

    3)計算熱key滑動視窗的設計

    限於篇幅的原因,這裡就不細談了,直接貼出專案作者對其寫的說明文章:Java簡單實現滑動視窗

    3.3.4 dashboard端

    這個沒啥可說的了,就是連線etcd、mysql,增刪改查,不過京東的前端框架很方便,直接返回list就可以成列表。

    4 總結

    文章第二部分為大家講解了redis資料傾斜的原因以及應對方案,並對熱點問題進行了深入,從發現熱key到解決熱key的兩個關鍵問題的總結。

    文章第三部分是熱key問題解決方案——JD開源hotkey的原始碼解析,分別從client端、worker端、dashboard端來進行全方位講解,包括其設計、使用及相關原理。

    希望通過這篇文章,能夠使大家不僅學習到相關方法論,也能明白其方法論具體的落地方案,一起學習,一起成長。

    -End-