技術乾貨|如何將 Pulsar 資料快速且無縫接入 Apache Doris

語言: CN / TW / HK

導讀:Apache Doris Routine Load 支援了將 Kafka 資料接入 Apache Doris,並保障了資料接入過程中的事務性操作。Apache Pulsar 定位為一個雲原生時代企業級的訊息釋出和訂閱系統。那麼 Apache Pulsar 使用者如何將資料接入 Apache Doris 呢?本次分享將介紹利用 KoP 如何將 Pulsar 資料快速且無縫接入 Apache Doris。

KoP 架構介紹:


KoP 是 Kafka on Pulsar 的簡寫,顧名思義就是如何在 Pulsar 上實現對 Kafka 資料的讀寫。KoP 將 Kafka 協議處理外掛引入 Pulsar Broker 來實現 Apache Pulsar 對 Apache Kafka 協議的支援。將 KoP 協議處理外掛新增到現有 Pulsar 集群后,使用者不用修改程式碼就可以將現有的 Kafka 應用程式和服務遷移到 Pulsar。

Apache Pulsar 主要特點如下:

  • 利用企業級多租戶特性簡化運營。
  • 避免資料搬遷,簡化操作。
  • 利用 Apache BookKeeper 和分層儲存持久保留事件流。
  • 利用 Pulsar Functions 進行無伺服器化事件處理。

KoP 架構如下圖,通過圖可以看到 KoP 引入一個新的協議處理外掛,該協議處理外掛利用 Pulsar 的現有元件(例如 Topic 發現、分散式日誌庫-ManagedLedger、cursor 等)來實現 Kafka 傳輸協議。

Routine Load 訂閱 Pulsar 資料思路


Apache Doris Routine Load 支援了將 Kafka 資料接入 Apache Doris,並保障了資料接入過程中的事務性操作。Apache Pulsar 定位為一個雲原生時代企業級的訊息釋出和訂閱系統,已經在很多線上服務使用。那麼 Apache Pulsar 使用者如何將資料資料接入 Apache Doris 呢,答案是通過 KoP 實現。

由於 Kop 直接在 Pulsar 側提供了對 Kafka 的相容,那麼對於 Apache Doris 來說可以像使用 Kafka 一樣使用 Plusar。整個過程對於 Apache Doris 來說無需任務改變,就能將 Pulsar 資料接入 Apache Doris,並且可以獲得 Routine Load 的事務性保障。

--------------------------
|     Apache Doris       |
|     ---------------    |
|     | Routine Load |   |
|     ---------------    |
--------------------------
            |Kafka Protocol(librdkafka)
------------v--------------
|     ---------------    |
|     |     KoP      |   |
|     ---------------    |
|       Apache Pulsar    |
--------------------------

操作實戰


1. Pulsar Standalone 安裝環境準備:

  1. JDK 安裝:略
  2. 下載 Pulsar 二進位制包,並解壓:
#下載
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#解壓並進入安裝目錄
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0

2. KoP 元件編譯和安裝:

  1. 下載 KoP 原始碼
git clone https://github.com/streamnative/kop.git
cd kop

  1. 編譯 KoP 專案:
mvn clean install -DskipTests

  1. protocols 配置:在解壓後的 apache-pulsar 目錄下建立 protocols文 件夾,並把編譯好的 nar 包複製到 protocols 資料夾中。
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols

  1. 新增後的結果檢視:
[[email protected] apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

3. KoP 配置新增:

  1. 在 standalone.conf 或者 broker.conf 新增如下配置
#kop適配的協議
messagingProtocols=kafka
#kop 的NAR檔案路徑
protocolHandlerDirectory=./protocols
#是否允許自動建立topic
allowAutoTopicCreationType=partitioned

  1. 新增如下服務監聽配置
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false

當出現如下錯誤:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

新增如下配置,開啟 transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true

這個錯誤一定要修復,不然看到的現象就是使用 Kafka 自帶的工具:bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh在Pulsar 上進行資料的生產和消費正常,但是在 Apache Doris 中資料無法同步過來。

4. Pulsar 啟動

#前臺啟動
#bin/pulsar standalone
#後臺啟動
pulsar-daemon start standalone

5. 建立 Doris 資料庫和建表

#進入Doris
mysql -u root  -h 127.0.0.1 -P 9030
# 建立資料庫
create database pulsar_doris;
#切換資料庫
use pulsar_doris;
#建立clicklog表
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
    `clickTime` DATETIME NOT NULL COMMENT "點選時間",
    `type` String NOT NULL COMMENT "點選型別",
    `id`  VARCHAR(100) COMMENT "唯一id",
    `user` VARCHAR(100) COMMENT "使用者名稱稱",
    `city` VARCHAR(50) COMMENT "所在城市"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

6. 建立 Routine Load 任務

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "test",
    "property.group.id" = "doris"
 );

上述命令中的引數解釋如下:

  • pulsar_doris :Routine Load 任務所在的資料庫
  • load_from_pulsar_test:Routine Load 任務名稱
  • clicklog:Routine Load 任務的目標表,也就是配置 Routine Load 任務將資料匯入到 Doris 哪個表中。
  • strict_mode:匯入是否為嚴格模式,這裡設定為 False。
  • format:匯入資料的型別,這裡配置為 Json。
  • kafka_broker_list:Kafka Broker 服務的地址
  • kafka_broker_list:Kafka Topic 名稱,也就是同步哪個 Topic 上的資料。
  • property.group.id:消費組 ID

7. 資料匯入和測試

  1. 資料匯入

    構造一個 ClickLog 的資料結構,並呼叫 Kafka 的 Producer 傳送 5000 萬條資料到 Pulsar。

ClickLog 資料結構如下:

public class ClickLog {
    private String id;
    private String user;
    private String city;
    private String clickTime;
    private String type;
    ... //省略getter和setter
   }

訊息構造和傳送的核心程式碼邏輯如下:

       String strDateFormat = "yyyy-MM-dd HH:mm:ss";
       @Autowired
       private Producer producer;
        try {
            for(int j =0 ; j<50000;j++){
              int batchSize = 1000;
                for(int i = 0 ; i<batchSize ;i++){
                    ClickLog clickLog  = new ClickLog();
                    clickLog.setId(UUID.randomUUID().toString());
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
                    clickLog.setClickTime(simpleDateFormat.format(new Date()));
                    clickLog.setType("webset");
                    clickLog.setUser("user"+ new Random().nextInt(1000) +i);
                    producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

  1. ROUTINE LOAD 任務檢視 執行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;命令,檢視匯入任務的狀態。
mysql>  SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
                  Id: 87873
                Name: load_from_pulsar_test
          CreateTime: 2022-05-31 12:03:34
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:pulsar_doris
           TableName: clicklog1
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
    CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
           Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
            Progress: {"0":"51139566"}
                 Lag: {"0":0}
ReasonOfStateChanged: 
        ErrorLogUrls: 
            OtherMsg: 
1 row in set (0.00 sec)
ERROR: 
No query specified

從上面結果可以看到 totalRows 為 50000000,errorRows 為 0。說明資料不丟不重的匯入 Apache Doris 了。

  1. 資料統計驗證 執行如下命令統計表中的資料,發現統計的結果也是 50000000,符合預期。
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql> 

通過 KoP 我們實現了將 Apache Pulsar 資料無縫接入 Apache Doris ,無需對 Routine Load 任務進行任何修改,並保障了資料匯入過程中的事務性。與此同時,Apache Doris 社群已經啟動了 Apache Pulsar 原生匯入支援的設計,相信在不久後就可以直接訂閱 Pulsar 中的訊息資料,並保證資料匯入過程中的 Exactly-Once 語義。

加入社群

如果你對 Apache Doris 感興趣,請加入 Doris 社群交流群。歡迎更多的開源技術愛好者加入 Apache Doris 社群,攜手成長,共建社群生態。

圖片

圖片

SelectDB 是一家開源技術公司,致力於為 Apache Doris 社群提供一個由全職工程師、產品經理和支援工程師組成的團隊,繁榮開源社群生態,打造實時分析型資料庫領域的國際工業界標準。基於 Apache Doris 研發的新一代雲原生實時數倉 SelectDB,運行於多家雲上,為使用者和客戶提供開箱即用的能力。

相關連結:

SelectDB 官方網站:

https://selectdb.com

Apache Doris 官方網站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 開發者郵件組:

[email protected]

「其他文章」