得物熱點探測技術架構設計與實踐

語言: CN / TW / HK

1.概述

説到熱點問題,首先我們先理解一下什麼是熱點?

熱點通常意義來説,是指在一段時間內,被廣泛關注的物品或事件,例如微博熱搜,熱賣商品,熱點新聞,明星直播等等,所以熱點產生主要包含2個條件:1.有限時間, 2流量高聚。

640.png

而在互聯網領域,熱點又主要分為2大類:

1.有預期的熱點:比如在電商活動當中推出的爆款聯名限量款的商品,又或者是秒殺的會場活動等

2.無預期的熱點:比如受到了黑客的惡意攻擊,網絡爬蟲頻繁訪問,又或者突發新聞帶來的流量衝擊等

針對於有預期的熱點可以通過熱點數據預熱, 流量限制和異步隊列進行處理。但是對於突發性無感知的熱點數據流量,往往由於請求過於集中,導致訪問數據流量超出的server的正常負載水位,從而出現服務過載不可用的情況,這種問題被稱之為熱點問題。

2.熱點場景

看完關於熱點問題的簡單介紹,我們已經理解了熱點產生的條件是短時間內被頻繁訪問導致流量高聚,而流量高聚就會出現一系列的熱點問題。那被頻繁訪問的Key,就是我們通常所説的熱Key。

接下來我們來看一下哪些場景會導致熱點問題以及對應的熱Key:

  • MySQL中被頻繁訪問的數據 ,如熱門商品的主鍵Id

  • Redis緩存中被密集訪問的Key,如熱門商品的詳情需要get goods$Id

  • 惡意攻擊或機器人爬蟲的請求信息,如特定標識的userId、機器IP

  • 頻繁被訪問的接口地址,如獲取用户信息接口 /userInfo/ + userId

3.熱點探測技術原理

瞭解完什麼是熱點問題和熱Key出現的場景以後,我們會提出一個疑問,如何去提前感知這些熱點數據?這裏就需要聊到熱點探測技術。

3.1 熱點探測可以帶來什麼好處?

3.1.1 提升性能

解決熱點問題通常會使用分佈式緩存,但是在讀取時還是需要進行網絡通訊,就會有額外的時間開銷。那如果能對熱點數據提前進行本地緩存,即本地預熱,就能大幅提升機器讀取數據的性能,減輕下層緩存集羣的壓力。

  • 注意,本地緩存與實時數據存在不一致的風險。需要根據具體業務場景進行評估,緩存級數越多,數據不一致的風險就越大!

3.1.2 規避風險

對於無預期的熱數據(即突發場景下形成的熱Key),可能會對業務系統帶來極大的風險,可將風險分為兩個層次:

  • 對數據層的風險

正常情況下,Redis 緩存單機就可支持十萬左右 QPS,並能通過集羣部署提高整體負載能力。對於併發量一般的系統,用 Redis 做緩存就足夠了。但是對於瞬時過高併發的請求,因為Redis單線程原因會導致正常請求排隊,或者因為熱點集中導致分片集羣壓力過載而癱瘓,從而擊穿到DB引起服務器雪崩。

  • 對應用服務的風險

每個應用在單位時間所能接受和處理的請求量是有限的,如果受到惡意請求的攻擊,讓惡意用户獨自佔用了大量請求處理資源,就會導致其他人畜無害的正常用户的請求無法及時響應。

因此,需要一套動態熱Key 檢測機制,通過對需要檢測的熱Key規則進行配置,實時監聽統計熱Key數據,當無預期的熱點數據出現時,第一時間發現他,並針對這些數據進行特殊處理。如本地緩存、拒絕惡意用户、接口限流 / 降級等。

3.2 如何進行熱點探測?

首先我們要定義一下如何才能算是一個熱點,我們知道熱點產生的條件是2個:一個時間,一個流量。那麼根據這個條件我們可以簡單定義一個規則:比如 1 秒內訪問 1000 次的數據算是熱數據,當然這個數據需要根據具體的業務場景和過往數據進行具體評估。

對於單機應用,檢測熱數據很簡單,直接在本地為每個Key創建一個滑動窗口計數器,統計單位時間內的訪問總數(頻率),並通過一個集合存放檢測到的熱 Key。

6401.png

而對於分佈式應用,對熱 Key 的訪問是分散在不同的機器上的,無法在本地獨立地進行計算,因此,需要一個獨立的、集中的熱Key 計算單元

我們可以簡單理解為:分佈式應用節點感知熱點規則配置,將熱點數據進行上報,工作節點進行熱點數據統計,對於符合閾值的熱點進行推送給客户端,應用收到熱點信息進行本地緩存等策略這五個步驟:

1.熱點規則:配置熱Key的上報規則,圈出需要重點監測的Key

2.熱點上報:應用服務將自己的熱Key訪問情況上報給集中計算單元

3.熱點統計:收集各應用實例上報的信息,使用滑動窗口算法計算Key的熱度

4.熱點推送:當Key的熱度達到設定值時,推送熱Key信息至所有應用實例

5.熱點緩存:各應用實例收到熱Key信息後,對Key值進行本地緩存(此步驟根據具體業務策略調整) 6402.png

4.Burning

理解完熱點探測原理以後,我們來聊聊得物的熱點探測中間件Burning。

作為潮流互聯網電商平台,得物的電商業務高速發展,突發性的熱點數據不斷的衝擊着我們的系統服務,比如大促秒殺,熱點商品,惡意攻擊等等。針對於這種突發性的大流量,單純的機器擴容並不是一個有效的解決手段,我們需要一個集熱點探測,熱點感知,熱點數據推送,熱點數據預熱,熱點監控分析等功能於一體的熱點探測中間件,因此Burning應運而生。

4.1 價值意義

Burning作為得物的熱點探測中間件,提供可供業務方接入的SDK包和管理台規則配置,用於對熱點數據的實時性監控,探測,操作和本地緩存等。主要解決了以下問題:

  • 實時熱點感知:能實時監控熱點數據,包含熱Key,熱數據,熱接口等,秒級上報集羣統一計算

  • 本地數據預熱:對於特定場景可以通過動態本地緩存配置,防止流量突增導致Redis或DB數據流量壓力過大導致系統雪崩

  • 週期熱點統計:對熱點數據進行週期性統計分析,標記出熱Key規則及分佈比例等,可以幫助業務方進行鍼對性優化治理和營銷策略選擇

  • 系統安全治理:可以通過熱點Key探測分析,對於刷子用户,問題IP,機器人和爬蟲進行標識,可實時熔斷存在安全風險的請求,提高系統安全和可用性

4.2 關鍵指標

為滿足高併發場景,熱點探測中間件Burning在設計的時候,重點關注瞭如下指標:

1.實時性:熱點問題往往具備突發性,客户端必須能夠實時發現可疑熱Key並推送給計算單元進行探測

2.高性能:熱點探測往往需要處理大量的熱點探測請求和熱點計算,因此熱點探測中間件的性能要求較高,才能滿足巨量的併發並有效降低成本

3.準確性:熱點探測需要精準的探測符合規則熱Key,實時監聽規則的變化,正確的進行熱Key上報和熱Key計算

4.一致性:熱點探測需要保證應用實例的本地緩存熱Key一致,當熱Key變更導致value失效時,應用需要同時進行失效來保證數據一致性,不能出現數據錯誤

5.可擴展:熱點探測需要統計和計算的Key量級很大,而且存在突發流量的情況,統一計算集羣需要具備水平擴展的能力

6403.png

4.3 架構設計

Burning的架構設計遵循了以上熱點探測的技術原理,同時借鑑了jd-hotKey的設計思路,主要分為Burning-Admin、Burning-Worker、Burning-Config、Burning-Client四個模塊:

  • Burning-Admin (熱點探測管理台):與Worker節點Netty長鏈接通信,提供不同維度的應用管理和熱點規則配置,提供查詢熱點數據統計,規則和熱點數據監控大盤,提供工作集羣信息查詢及客户端節點信息查詢,提供本地緩存動態配置及熱點信息實時通知

  • Burning-Worker(熱點集中計算單元):無狀態server端,與管理台和客户端進行Netty長鏈接通信,獲取規則,滑動窗口計算熱點,將熱點記錄推送到管理台展示和客户端處理

  • Burning-Config(熱點配置中心):作為熱點、規則配置中心和註冊中心,將規則配置下發到Worker節點和客户端,通過Raft算法進行系統高可用一致性保證

  • Burning-Client(熱點客户端SDK):與Worker節點建立Netty長鏈接通信,監聽配置中心配置變化定時推送熱Key數據,獲取熱Key推送本地內緩存設置,與Redis-client無縫集成及其他ORM框架無縫集成

6404.png

4.4 鏈路流程

熱點探測主要包含以下幾個主要流程:

1.用户在管理後台(Burning-Admin)進行熱點規則配置並進行熱點數據實時監控

2.管理後台(Burning-Admin)將規則配置信息上傳給配置中心(Burning-Config)

3.配置中心(Burning-Config)將熱點規則下發給客户端(Buring-Client)和工作節點(Burning-Worker)

4.客户端(Burning-Client)獲取到規則, 將指定規則的熱Key定時上報給工作節點(Burning-Worker)

5.工作節點(Burning-Worker)獲取到上報的熱Key後進行滑動時間窗口計算,對於滿足閾值的熱點推送給客户端(Burning-Client)

6.客户端(Burning-Client)拿到熱點數據後,進行對應的本地緩存配置

6405.png

4.5 核心代碼

  • 客户端啟動器ClientStarter,啟動配置中心和註冊中心,Worker建連,註冊事件監聽,設置app_name、port、caffeine緩存大小、cache配置、監控配置等

public synchronized static void startPipeline(BurningCommonProperties burningCommonProperties) {

    if (STARTED.get() == Boolean.FALSE) {
        DwLogger.info("start pipeline");
        // 設置參數上下文
        setToContext(burningCommonProperties);
        // 配置中心啟動
        EtcdConfigFactory.buildConfigCenter(burningCommonProperties.getConfigServer());
        ConfigStarter starter = EtcdConfigStarter.getInstance();
        starter.start();
        // 註冊中心啟動
        RegisterFactory.buildRegisterCenter(burningCommonProperties);
        RegisterStarter registerStarter = RegisterStarter.getInstance();
        registerStarter.start();
        // 熱點探測啟動
        DetectFactory.startDetect(burningCommonProperties.getPushPeriod());
        // 開啟worker重連器
        WorkerRetryConnector.retryConnectWorkers();
        // 註冊事件監聽
        registEventBus();
        // 開啟監控
        MetricsFactory.startMetrics();
        STARTED.set(Boolean.TRUE);
    }

}
  • 客户端進行熱Key判斷,如果符合規則就上報給Worker節點計算,同時進行統計計數
public static Object dynamicGetValue(String key, KeyType keyType) {
    try {
        //如果沒有為該key配置規則,就不用上報key
        Boolean dynamicRule = dynamicRule(key);
        if (dynamicRule == null) {
            return null;
        }
        Object userValue = null;

        ValueModel value = getValueSimple(key);

        if (value == null) {
            HotKeyPusher.push(key, keyType);
        } else {
            //臨近過期了,也發
            if (isNearExpire(value)) {
                HotKeyPusher.push(key, keyType);
            }
            Object object = value.getValue();
            //如果是默認值,也返回null
            if (object instanceof Integer && Constant.MAGIC_NUMBER == (int) object) {
                userValue = null;
            } else if (Boolean.FALSE.equals(dynamicRule)) {
                userValue = null;
            } else {
                userValue = object;
            }
        }

        //統計計數
        MetricsFactory.metrics(new KeyHotModel(key, value != null));

        return userValue;
    } catch (Exception e) {
        DwLogger.error(DwHotKeyStore.class, "get value error");
        return null;
    }
}
  • Worker節點啟動nettyServer,用於各個業務服務實例進行長連接,接收客户端上報數據
public void startNettyServer(int port) throws Exception {
    //boss單線程
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    //worker節點組
    EventLoopGroup WorkerGroup = new NioEventLoopGroup(CpuNum.WorkerCount());
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, WorkerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .option(ChannelOption.SO_BACKLOG, 1024)
                //保持長連接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //出來網絡io事件,如記錄日誌、對消息編解碼等
                .childHandler(new ChildChannelHandler());
        //綁定端口,同步等待成功
        ChannelFuture future = bootstrap.bind(port).sync();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            bossGroup.shutdownGracefully (1000, 3000, TimeUnit.MILLISECONDS);
            WorkerGroup.shutdownGracefully (1000, 3000, TimeUnit.MILLISECONDS);
        }));
        //等待服務器監聽端口關閉
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        DwLogger.error("netty server start error.", e);
    } finally {
        //優雅退出,釋放線程池資源
        bossGroup.shutdownGracefully();
        WorkerGroup.shutdownGracefully();
    }
}
  • Worker節點通過監聽客户端上報,異步消費隊列Client消息
public void beginConsume() {
    while (true) {
        try {
            HotKeyModel model = QUEUE.take();
            if (model.isRemove()) {
                iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
            } else {
                iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
            }
            //處理完畢,將數量加1
            totalDealCount.increment();
        } catch (Exception e) {
            DwLogger.error("consumer error.", e);
        }
    }
}
  • 如果是新增一個Key,就生成滑動窗口,基於時間窗口數據判斷是否熱Key
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
    //cache裏的key
    String key = buildKey(hotKeyModel);
    String name = StringUtils.isBlank(hotKeyModel.getGroup()) ? hotKeyModel.getAppName() : hotKeyModel.getGroup();

    //判斷是不是剛熱不久
    Object o = hotCache.getIfPresent(key);
    if (o != null) {
        return;
    }
    SlidingWindow slidingWindow = checkWindow(hotKeyModel, key, name);
    //看看hot沒
    boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

    if (!hot) {
        //如果沒hot,重新put,cache會自動刷新過期時間
        CaffeineCacheHolder.getCache(name).put(key, slidingWindow);
    } else {
        hotCache.put(key, 1);

        //刪掉該key
        CaffeineCacheHolder.getCache(name).invalidate(key);

        //開啟推送
        hotKeyModel.setCreateTime(SystemClock.now());

        //當開關打開時,打印日誌。大促時關閉日誌,就不打印了
        if (ConfigStarter.LOGGER_ON) {
            DwLogger.info(NEW_KEY_EVENT + hotKeyModel.getKey());
        }

        //分別推送到各client和etcd
        for (IPusher pusher : iPushers) {
            pusher.push(hotKeyModel);
        }

    }

}
  • 如果是刪除一個Key,這裏刪除包含客户端發消息刪除,本地線程掃描過期Key和管理台刪除
@Override
public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
    //cache裏的key
    String key = buildKey(hotKeyModel);
    String name = StringUtils.isBlank(hotKeyModel.getGroup()) ? hotKeyModel.getAppName() : hotKeyModel.getGroup();
    hotCache.invalidate(key);
    CaffeineCacheHolder.getCache(name).invalidate(key);

    //推送所有client刪除
    hotKeyModel.setCreateTime(SystemClock.now());
    DwLogger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());

    for (IPusher pusher : iPushers) {
        pusher.remove(hotKeyModel);
    }

}
  • Worker計算完成後將結果異步推送給Client,通過app進行分組批量推送
@PostConstruct
public void batchPushToClient() {
    AsyncPool.asyncDo(() -> {
        while (true) {
            try {
                List<HotKeyModel> tempModels = new ArrayList<>();
                //每10ms推送一次
                Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);
                if (CollectionUtil.isEmpty(tempModels)) {
                    continue;
                }

                Map<String, List<HotKeyModel>> allAppHotKeyModels = Maps.newHashMap();
                Map<String, List<HotKeyModel>> allGroupHotKeyModels = Maps.newHashMap();

                //拆分出每個app的熱key集合,按app分堆
                for (HotKeyModel hotKeyModel : tempModels) {
                    if (StringUtils.isNotBlank(hotKeyModel.getGroup())) {
                        List<HotKeyModel> groupModels = allGroupHotKeyModels.computeIfAbsent(hotKeyModel.getGroup(), (key) -> new ArrayList<>());
                        groupModels.add(hotKeyModel);
                    } else {
                        List<HotKeyModel> oneAppModels = allAppHotKeyModels.computeIfAbsent(hotKeyModel.getAppName(), (key) -> new ArrayList<>());
                        oneAppModels.add(hotKeyModel);
                    }
                }

                CustomizedMetricsProcessor processor = CustomizedMetricsProcessor.builder(MetricsConstant.BURNING_NETTY_OUT).build();

                // group hot key push
                pushGroup(processor, allGroupHotKeyModels);

                // app hot key push
                pushApp(processor, allAppHotKeyModels);

            } catch (Exception e) {
                DwLogger.error("push to client error.", e);
            }
        }
    });
}

4.6 最佳實踐

Burning提供了2種使用方式,一是通過原生方法調用,二是通過聲明式註解@EnableBurning , 以下對使用註解進行熱點探測的部分場景提供最佳實踐:

1.進行熱點判斷,用於熱點攔截和自定義處理實現

@Component
public class Cache {
    @EnableBurning(prefix = "hot_Key_", cache = false, hitHandler = ExceptionHitHandler.class)
    public String getResult2(String Key) {
        return "這是一個測試結果" + Key;
    }
}

2.命中熱點規則處理類,可進行自定義實現hitHandler接口(注意cache=false)

public class ExceptionHitHandler implements HitHandler {
   @Override
   public Object handle(String Key, ProceedingJoinPoint joinPoint) {
       //此處可自定義實現
      throw new RuntimeException("對不起,您沒有權限訪問: " + Key);
   }
}

3.用於Redis緩存熱點探測

@Component
public class Cache {
​
    @Resource
    private RedisTemplate<String, String> RedisTemplate;
​
    @EnableBurning
    public String getResult(String Key) {
        return RedisTemplate.opsForValue().get(Key);
    }
}

4.用於MySQL熱數據緩存

@Repository
public class SmsSignRepo {
​
   @Autowired
   private SmsSignMapper smsSignMapper;
​
   @EnableBurning(prefix = "SMS_SIGN", dynamic = false, KeyType = DATABASE_Key)
   public List<SmsSign> getAll() {
      Example example = new Example(SmsSign.class);
      Example.Criteria criteria = example.createCriteria();
      criteria.andEqualTo("status", 1);
      return smsSignMapper.selectByExample(example);
   }
}

4.7 性能表現

4.7.1 Worker節點性能壓測

上游40個測試調用實例共同調用的場景下,併發數800,遞進壓測

640.jpg

壓測結果:1個4C8G工作節點每秒可平穩處理約15W個key的熱點探測,成功率大於99.999%,worker節點CPU平均佔用為80%,內存佔用60%

4.7.2 Client業務應用性能壓測

  • DB場景壓測

Client配置為4C8G,120個併發請求,壓測時長10min

1.原生未接入Burning的DB操作接口場景

640.png

壓測結果:未接入burning,處理總請求數約112萬,平均TPS約1500,平均RT約63MS。CPU在壓測滿載情況下100%,內存平均使用48%

2.接入Burning的DB操作接口場景

6402.jpg

壓測結果:接入burning後,處理總請求數457萬(對比未接入Burning增加345萬),平均TPS約5800(對比未接入Burning增加4300),平均RT約8MS(對比未接入Burning下降55MS)。CPU在壓測滿載情況下100%,內存平均使用50%(對比未接入上升2%,本地緩存消耗

  • Redis場景壓測

Client配置為4C8G,120個併發請求,壓測時長10min

1.原生未接入Burning的Redis操作接口場景

6401.png

壓測結果:未接入burning,處理總請求數約298萬,平均TPS約3800,平均RT約14MS。CPU在壓測滿載情況下100%,內存平均使用48%

2.已接入Burning的Redis操作接口場景

6402.png

壓測結果:已接入burning,處理總請求數約443萬(對比未接入增加145萬),平均TPS約5700(對比未接入上升1900),平均RT約8MS(對比未接入下降6ms)。CPU在壓測滿載情況下100%,內存平均使用48%,基本持平

4.7.3 壓測報告

  • Burning工作節點單機每秒處理15萬個key的探測請求,CPU穩定在80%左右,基本無任何異常

  • 客户端應用接入burning後,對應用實例本身CPU負載基本無影響,內存佔用上升主要取決於指定的本地緩存大小,接入後接口性能提升明顯,QPS明顯上升,RT明顯下降

5.總結

熱點問題在互聯網場景中屢屢出現,特別是電商業務的需求場景,例如對於大促期間或者活動搶購期間的某個爆品,可能會出現在幾秒時間內流入大量的流量,由於商品數據在Redis cluster場景下會按照hash規則被存放在某個Redis分片上,那麼這個瞬間流量也有可能出現打掛Redis分片,導致系統雪崩。所以我們要善於利用熱點探測中間件進行熱Key探測,通過預置本地緩存解決突發流量導致的系統瓶頸,也能通過熱點數據監控分析進行鍼對性的系統調優。

得物熱點探測組件Burning上線至今,支持了數十個交易核心鏈路服務,在滿足基礎熱點探測的前提下,Burning還支持本地緩存壓測標/染色標識別能力,客户端本地Ecache/Caffeine緩存模式選擇,熱點規則Group聚合統計等擴展能力。應用服務接入Burning後對於熱點數據探測及數據獲取性能顯著提高,通過預熱&實時本地緩存,極大的降低了下層緩存集羣和數據庫的負載壓力,為業務服務的健康運作保駕護航。

文/Leo