精讀 RocketMQ 源碼系列(1)--- NameServer

語言: CN / TW / HK

theme: channing-cyan

一、前言

「本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!」

相信看了 https://juejin.cn/post/6982206285462634510 中推薦的官方中文文檔的同學一定已經對 NameServer 有了初步的瞭解。這裏我們總結一下:

  1. NameServer 是路由註冊中心
  2. NameServer 的主要功能是:註冊發現和路由刪除

在閲讀源碼時,我希望自己是帶着問題去看源碼。我也認為,學習的第一步就是要學會提問,有時候一個好的問題比一個精彩的答案更重要。

在瞭解 NameServer 的技術架構、主要功能之後,可能會提出這麼幾個問題:

  1. NameServer 是怎麼實現註冊發現和路由刪除功能的?
  2. NameServer 是集羣部署的,但彼此之間不通信,不同 NameServer 之間產生的數據不一致問題,怎麼解決?
  3. 為什麼不選擇使用 zookeeper 作為註冊中心,而選擇自研 NameServer?

針對這三個問題,嘗試去源碼中尋找答案吧......

二、啟動流程

面對一個東西,我們的第一個問題往往是:它從何而來,生命週期的源頭在哪? 對於一些組件,這個問題就是:它是如何啟動的?

接下來就先來看看 NameServer 是如何啟動的。

啟動類:org/apache/rocketmq/namesrv/NamesrvStartup.java

nameserver.png

可以參考上面這張流程圖,自己將這部分源碼過一遍。

2.1 NameSrvStartup#main0

首先看到 main0方法:

```java public static NamesrvController main0(String[] args) {

    try {
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

```

這個方法就做了兩件事:

  1. 創建一個 NamesrvController 實例
  2. 啟動該實例

從名字上可以看出 NamesrvControllerNameServer 的核心控制器,因此 NamesrvController 的啟動,主要也是在啟動它。

2.2 NameSrvStartup#createNamesrvController

填充配置對象的屬性

該方法首先是對兩個配置對象進行屬性填充,填充方式有兩種:

  • -c:後跟配置文件路徑
  • -p:表示通過--屬性名 屬性值的形式配置

相關屬性如下:

```java public class NamesrvConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

// rocketmq 主目錄,通過設置 ROCKETMQ_HOME 配置主目錄,在源碼環境搭建的過程中有這一步
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// 用於存儲 KV配置的持久化路徑
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// 默認配置文件路徑,不生效。若需要在啟動時配置 NameServer 啟動屬性,使用 -c 配置文件路徑 的方法
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 是否支持順序消息,默認不支持,沒大用,就是在處理客户端獲取路由數據時標識下,如果支持順序消息,需要返回對應 topic 的順序消息配置屬性
private boolean orderMessageEnable = false;

// getter and setter ... } ```

```java public class NettyServerConfig implements Cloneable { private int listenPort = 8888; // NameServer 監聽端口,默認會被初始化為 9876 private int serverWorkerThreads = 8; // Netty 業務線程池線程個數 private int serverCallbackExecutorThreads = 0; // Netty public 任務線程池線程個數,Netty 網絡設計,根據業務類型會創建不同的線程池 // 比如處理消息發送,消息消費、心跳檢測等。如果該業務類型未註冊線程池,則由public線程池執行 private int serverSelectorThreads = 3; // IO 線程池個數,主要是 NameServer、Broker 端解析請求、返回相的線程個數,這類線程主要是處理 // 網絡請求的,解析請求包,然後轉發到各個業務線程池完成具體的操作,然後將結果返回給調用方 private int serverOnewaySemaphoreValue = 256; // send oneway 消息請求併發度(broker 端參數) private int serverAsyncSemaphoreValue = 64; // 異步消息發送最大併發度 private int serverChannelMaxIdleTimeSeconds = 120; // 網絡連接最大空閒時間

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 網絡 socket 發送緩衝區大小
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // 網絡接收端緩存區大小
private boolean serverPooledByteBufAllocatorEnable = true; // bytebuffer是否開啟緩存

/**
 * make make install
 *
 *
 * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
 * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
 */
private boolean useEpollNativeSelector = false; // 是否啟用 Epoll IO 模型

```

創建 NameServerController 對象

根據以上填充好的配置創建對象,並將配置備份在 NameServerController

```java final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); ```

2.3 NameSrvStartup#start

首先執行 initialize 方法,這個方法還是做了很多事:

```java public boolean initialize() {

    // 1. 加載 kv 配置
    this.kvConfigManager.load();

    // 2. 創建 netty 服務端
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 3. 創建接收客户端請求的線程池
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            // 4. 創建一個定時任務,每隔 10 秒掃描不活躍的 Broker
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

... } ```

然後做了一件很重要的事:註冊了一個鈎子方法,監聽 JVM 退出事件,在退出時進行 controller 的資源釋放。

然後啟動 controller

```java // 註冊了一個鈎子方法,監聽 JVM 退出事件,在退出時進行 controller 的資源釋放 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() { @Override public Void call() throws Exception { controller.shutdown(); return null; } }));

controller.start(); ```

“如果代碼中使用了線程池,一種優雅停機的方式就是註冊一個 JVM 鈎子函數,在 JVM 關閉之前,先將線程池關閉,及時釋放資源”

——《RocketMQ 技術內幕》

到這還沒結束,我們繼續往下看,controller.start()

```java public void start() throws Exception { // 啟動 netty 服務端,用於接收客户端請求 this.remotingServer.start();

    if (this.fileWatchService != null) {
        // 啟動監聽TLS配置文件的線程
        this.fileWatchService.start();
    }
}

```

所以最後可以看到,啟動 NameServer 就是為了啟動這個 netty 服務端,然後就可以接收來自 broker 的註冊請求和客户端的路由發現請求。

對於fileWatchService,它監聽的是 TLS(Transport Layer Security,傳輸層安全協議) 配置文件,並不涉及業務邏輯。因此就不詳細深入了,對網絡安全協議感興趣的同學可以上網學習。

至此,啟動流程分析完畢。最後來小結下:

  1. 首先通過命令行參數、配置文件、默認配置填充NamesrvConfigNettyServerConfig
  2. 根據以上兩個配置對象創建NamesrvController,並備份配置信息到NamesrvController
  3. 啟動NamesrvController實例,實際上啟動的是 netty 服務端

怎麼樣,其實並不是很難對不對,接下來我們繼續問問題。

啟動之後,NameServer 又做了哪些事呢?本文一開始就已經給出了答案:註冊發現和路由刪除,緊接着文章開頭其實就提出了一個問題:

NameServer 是怎麼實現註冊發現和路由刪除功能的?

繼續從源碼中尋找答案......

三、註冊發現

註冊和發現其實是兩個動作,但針對的都是 broker。註冊是指 broker 註冊到 NameServer,發現是指 producer 和 consumer 通過 NameServer 發現 broker。

3.1 註冊

既然是“broker 註冊到 NameServer”,那當然要先去 broker 的源碼中尋找答案。問題是 broker 源碼那麼多,我該從哪下手呢?這時候可以停下來想想,如果是我自己去設計一個系統,這個系統需要將自己的存活狀態上報至一個註冊中心,我會選擇在什麼時候去註冊呢?

應該容易想到,最好是啟動成功的時候就馬上去註冊,然後與註冊中心建立一個心跳機制,一直不停的吿訴註冊中心:我還活着!

不管這個想法對不對,但至少這個時候有方向了,我知道應該去 broker 的一大堆源碼中,先找它的啟動流程源碼。

這就是一直強調的帶着問題去讀源碼,通過問問題,讓自己閲讀源碼時更具有目的性;通過對問題的思考,可以提出自己的猜想,如果猜想是對的,恭喜你,你將會收穫成就感;如果猜想是錯的,更要恭喜你,你可以對比自己的猜想和源碼的實現,看看差在哪個地方,這個地方就是你提升的空間。

3.1.1 broker 啟動流程

代碼位置:BrokerStartup#main

流程圖如下:

broker_startup.png

以上流程圖的大致步驟是:

  1. 創建 broker 的核心控制器 BrokerController
  2. 啟動 BrokerController:這一步會啟動很多服務,如消息存儲服務、netty 服務端、fileWatchService、以及我們重點關心的給 NameServer 發送心跳的服務

這裏我們重點分析圖中的第 18~20:

```java // 1. 在 broker 啟動時,先做一次註冊 if (!messageStoreConfig.isEnableDLegerCommitLog()) { startProcessorByHa(messageStoreConfig.getBrokerRole()); handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); this.registerBrokerAll(true, false, true); }

    // 2. 接下來先延遲 10s,然後每隔30s(brokerConfig.getRegisterNameServerPeriod()我配置的是30s)進行一次註冊
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

```

為什麼要先延遲 10s?

因為 broker 剛剛已經發送了註冊請求,沒有必要立馬再進行註冊,所以定時任務線程池先延遲了10s。這種設計很細節,但在業務上是有效的,避免不必要的資源浪費。

要正確理解 registerBrokerAll方法的意思:這個方法並不是把“所有的broker”都註冊,而是把該 broker 註冊到所有的 NameServer 上,這一點在後面的源碼中可以得到驗證。

3.1.2 registerBrokerAll

這個方法有三個參數:

java boolean checkOrderConfig, // 是否校驗 順序消息配置 boolean oneway, // 是否是 單向發送,單向發送不接收返回值 boolean forceRegister // 是否強制註冊

```java public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { // topicConfigWrapper 中封裝了 該broker 上的 topic 信息和 dataVersion TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // 這塊代碼的作用是將 topicConfigWrapper 中的值取出來重新封裝一遍,又再塞回 topicConfigWrapper,我理解是為了將 this.brokerConfig.getBrokerPermission()
    // 的屬性值 set 進去。不過這並不是很重要的細節,我們只要知道 topicConfigWrapper 至少包含了該broker 上的 topic 信息和 dataVersion 即可
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }

    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

```

重點看一下最下面的這塊if語句的內容,由於forceRigister == true,所以後面的 needRegister方法的邏輯並沒有機會執行。但這裏還是簡單講一下這個方法:

  1. broker 請求 NameServer,查詢 broker 相關的配置在 NameServer 端的數據版本,請求類型:QUERY_DATA_VERSION = 322
  2. NameServer 接收請求,DefaultRequestProcessor#processRequest,會將從 broker 發送過來的 dataVersion 和 NameServer 存儲的進行比較,如果不相等,則需要在 NameServer 端更新 broker 的心跳更新時間lastUpdateTimestamp;如果相等返回changed == false的結果
  3. Broker 端處理所有 NameServer 返回的結果,只要有一個 changed == true,那麼needRegister == true

也就是需要執行doRegisterBrokerAll

當然在當前 broker 啟動過程中,是一定會執行 doRegisterBrokerAll的。

3.1.3 doRegisterBrokerAll

這個方法主要做了兩件事:

  1. 調用 brokerOuterAPI.registerBrokerAll進行註冊
  2. 處理註冊結果 registerBrokerResultList:進行 master 地址的更新、順序消息Topic的配置更新

重點在第一步。

BrokerOuterAPI 這個類是 broker 對外交互的類,其中封裝了 RemotingClient remotingClient ,在這裏是作為客户端向 NameServer 發送真正的註冊請求。

看一下registerBrokerAll 中核心的一塊代碼

```java // CountDownLatch 使得只有所有 nameServer 的響應結果都返回時才會繼續執行後續的邏輯 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 遍歷所有的 NameServer,並將註冊任務 registerBroker 丟進 brokerOuterExecutor 線程池中執行 for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        // 每返回一個結果,減1
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            // 主線程阻塞在此,直到所有的 countDownLatch 減為0
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }

```

至於registerBroker,裏面的邏輯就是去調用 netty 客户端的 invokeSyncinvokeOneway,去向 NameServer 發送請求。具體的通信過程和原理涉及到 netty,不是本文的重點。筆者也計劃後續會進行 netty 源碼的解析,敬請期待。

值得一提的是,上面這段代碼採用多線程的方式,使用到了:

  • CountDownLatch
  • BrokerFixedThreadPoolExecutor :父類是 ThreadPoolExecutor
  • CopyOnWriteArrayList:存儲線程執行結果,因為存在多線程的寫操作,所以需要使用併發安全的容器

對併發編程感興趣的同學可以學習下這裏的用法,同時有興趣瞭解其原理的,可以查看 JDK 源碼。

到這一步,對於 註冊 這件事來説,broker 端算是完成了它的工作,後續就是 NameServer 接收到請求去處理的事了。

3.1.4 NameServer 處理註冊請求

我們再回到 NameServer ,看看是怎麼處理註冊請求的。我們稍微思考下,這個處理請求的代碼入口在哪呢?(實際過程中是通過代碼調試得知入口的,但代碼調試得到的結果實際上有點像翻答案,我們可以嘗試自己先思考下)

當然首先是 netty 服務端先接收到請求,因此我們先去看一下 NettyRemotingServer,看了一圈發現這個類裏並沒有類似處理請求的方法。但是這個類集成了 NettyRemotingAbstract,我們繼續在這裏找一下,發現了這個類裏有個方法叫 processRequestCommand

這個類最後會調用到 NameServer 的 DefaultRequestProcessor#processRequest,這個方法中幫助 NameServer 處理來自客户端和 Broker 的各種請求。

流程圖:

namesrv_process_request.png

首先方法一進來就是一個 switch case,找到 REGISTER_BROKER

java switch (request.getCode()) { ... case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); // 根據版本區分,實際兩個方法沒有很大的區別 if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } }

中間的請求crc校驗、請求參數的處理過程我們不詳細看了。值得一提的是,NameServer 中管理路由信息的類是 RouteInfoManager,其維護了五張表:

java private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;// Topic 消息隊列的路由信息,消息發送時根據此表進行負載均衡 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;// broker 地址信息 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;// 集羣信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;// 存活的broker private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

看一下更新 brokerLiveTable 的邏輯,注意:其它幾張表也會被更新

java BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr));

把當前系統時間作為 lastUpdateTimestamp broker 上報心跳的時間。

後續會將處理結果封裝並返回到請求響應中,通過 netty 再返回給 broker,與上面分析的 broker 端的流程形成閉環。

至此,關於註冊的源碼分析全部完成,最後來小結一下

3.1.5 小結

  1. broker 服務器在啟動時會向所有的 NameServer 註冊,並建立長連接,之後每隔30s發送一次心跳,內容包括 brokerId,broker 地址,名稱和集羣信息
  2. NameServer 接收到心跳包後,會將整個消息集羣的數據存入到 RouteInfoManager 的幾個HashMap中,並更新 lastUpdateTimestamp

其中涉及到的一些可深入的技術點:

  1. 併發編程,包括線程池的使用、併發組件(CountDownLatch、CopyOnWriteArrayList)、鎖的使用(NameServer 更新幾張路由表的時候)
  2. netty 相關的網絡編程知識

3.2 發現

客户端在從 NameServer 中獲取 broker 相關信息,這個過程就是路由發現。我們以生產者為例分析路由發現。

路由信息在生產者中存放在 DefaultMQProducerImpl.topicPublishInfoTable

java private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

是一個併發安全的容器,為什麼要使用ConcurrentMap呢,因為它的寫入口實際上有兩個,也就是生產者路由發現的時機有兩個:

  1. 發送消息時會去檢查 topicPublishInfoTable 是否為空或可用,不符合條件則去 NameServer 中查詢
  2. DefaultMQProducer#send,實際調用的是 DefaultMQProducerImpl#tryToFindTopicPublishInfo
  3. 生產者啟動時也會啟動一個定時任務,定時從 NameServer 上拉取 topic 信息
  4. 這個代碼路徑是:DefaultMQProducerImpl#start -> mQClientFactory.start() -> MQClientInstance#this.startScheduledTask()-> MQClientInstance#this.updateTopicRouteInfoFromNameServer();

為什麼要有兩個入口呢?

  • 這是因為路由信息變更時,nameserver不會主動推送,需要客户端主動拉取路由信息才能將客户端上路由信息進行更新。請求類型 GET_ROUTEINFO_BY_TOPIC,調用RouteInfoManagerpickupTopicRouteData方法,這樣設計的目的是降低 NameServer 的複雜度。因此第2種方式是必不可少的。
  • 而第一種方式,就更好理解了,按需更新,這裏的需是指發消息的需求。

兩個方法都會調用下面這個方法MQClientInstance

java public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)

最終調用 MQClientAPIImpl#getTopicRouteInfoFromNameServer

```java public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic);

  // 請求code: GET_ROUTEINFO_BY_TOPIC,最終又會前面提到的 processRequest 方法中,根據code找到處理邏輯
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

```

NameServer 端處理請求的邏輯比較簡單,就是查詢一下路由信息然後返回,這裏就不再贅述。

至此,NameServer 的註冊發現分析完畢

四、路由刪除

路由刪除有兩個觸發點:

  1. broker 非正常關閉,NameServer 發現 broker 無響應,將其刪除。詳細過程:
  2. Broker 每隔 30s 向 nameserver 發送心跳包,並更新 brokerLiveTable 中的信息,尤其是 lastUpdateTimestamp。
  3. namaserver 每隔10s會掃描 brokerLiveTable,如果發現 lastUpdateTimestamp 距離當前時間已經超過了120s,則認為 Broker 宕機,會進行路由刪除操作。
  4. Broker 正常關閉時,與 NameServer 斷開長連接,會執行 unregisterBroker 指令

路由刪除比較簡單,大家可以對照源碼驗證下這裏講的兩個觸發點。

五、前言中提到的幾個問題

現在來回顧一下文章開頭我們提到的三個問題。第一個問題已經解答完了,重點看一下第二和第三個問題

5.1 NameServer 如何解決數據不一致的問題

首先要理解為什麼 NameServer 會有數據不一致的問題。因為 NameServer 雖然是集羣部署,但各個節點之間是相互獨立不進行通信的。那麼在進行路由註冊、刪除時,不同節點之間存在不一樣數據的情況是必然存在的

如何解決呢?事實上,RocketMQ 並不認為這是一個需要去解決的問題。因為 Topic 路由信息本身就不需要追求集羣中各個節點的強一致性,只需要做到最終一致性。

説白了,NameServer 的各個節點根本不關心自己的數據和別的節點是不是一致。關心這件事的人是生產者和消費者。而客户端關心這件事的本質其實是:我希望我拿到的路由信息儘量是正確的,可用的,也就是我根據獲取到路由信息選擇了一個 broker 去發送消息,這個 broker 是能正常接收到的。

那這就有問題了,因為上面講路由刪除的時候,提到了: NameServer 發現 lastUpdateTimestamp 距離當前時間已經超過了120s,才認為 Broker 宕機,會進行路由刪除操作。也就是説,會有 2 分鐘的空檔,這 2 分鐘,很可能生產者會向一個已經宕機的 broker 發送消息。那這種情況怎麼辦呢?

這個問題先按下不表,因為答案並不在 NameServer,而是在 producer 中。重要的是,現在我又有了一個好問題!

5.2 為什麼rocketmq選擇自己開發一個NameServer,而不是使用zk

事實上,在RocketMQ的早期版本,即MetaQ 1.x和MetaQ 2.x階段,也是依賴Zookeeper的。但MetaQ 3.x(即RocketMQ)卻去掉了ZooKeeper依賴,轉而採用自己的NameServer。

因為 RocketMQ 的設計理念是簡單高效,並且 RocketMQ 的架構設計決定了它只需要一個輕量級的元數據服務器就足夠了,只需要保持最終一致,而不需要Zookeeper這樣的強一致性解決方案。這麼做的好處很明顯:不需要再依賴另一箇中間件,從而減少整體維護成本。

這裏可以稍微擴展下,其實選擇 NameServer 還是選擇 Zookeeper 代表了在分佈式系統中的設計側重點。

根據CAP理論, RocketMQ 在註冊中心這個模塊的設計上選擇了 AP 模式的 NameServer,而不是 CP 模式的 Zookeeper

不使用 zookeeper 的原因是當 RocketMQ 沒滿足A(可用性)帶來的影響比較大,影響穩定性

Zookeeper CP 的適用場景:

  • 分佈式選主,主備高可用切換等場景下有不可替代的作用,而這些需求往往多集中在大數據、離線任務等相關的業務領域,因為大數據領域,講究分割數據集,並且大部分時間分任務多進程 / 線程並行處理這些數據集,但是總是有一些點上需要將這些任務和進程統一協調,這時候就是 ZooKeeper 發揮巨大作用的用武之地。
  • 但是在交易場景交易鏈路上,在主業務數據存取,大規模服務發現、大規模健康監測等方面有天然的短板,應該竭力避免在這些場景下引入 ZooKeeper,在生產實踐中,應用對 ZooKeeper 申請使用的時候要進行嚴格的場景、容量、SLA 需求的評估。

NameServer 的適用場景:

  • NameServer作為一個名稱服務,需要提供服務註冊、服務剔除、服務發現這些基本功能,但是NameServer節點之間並不通信,容忍在某個時刻各個節點數據可能不一致的情況下 所以可以使用 CP,也可以使用AP,但是大數據使用CP,在線服務則AP,分佈式協調、選主使用CP,服務發現使用 AP

參考資料:

  • RocketMQ 4.8.0 源碼
  • https://github.com/apache/rocketmq/tree/master/docs/cn
  • https://github.com/DillonDong/notes/blob/master/RocketMQ/RocketMQ-03.md
  • 《RocketMQ 技術內幕》

最後

  • 如果覺得有收穫,三連支持下;
  • 文章若有錯誤,歡迎評論留言指出,也歡迎轉載,轉載請註明出處;
  • 個人vx:Listener27, 交流技術、面試、學習資料、幫助一線互聯網大廠內推等