Apache Druid 資料攝取---本地資料和kafka流式資料

語言: CN / TW / HK

Durid概述

Apache Druid是一個集時間序列資料庫、資料倉庫和全文檢索系統特點於一體的分析性資料平臺。本文將帶你簡單瞭解Druid的特性,使用場景,技術特點和架構。這將有助於你選型資料儲存方案,深入瞭解Druid儲存,深入瞭解時間序列儲存等。

Apache Druid是一個高效能的實時分析型資料庫。

上篇文章,我們瞭解了Druid的載入方式,

咱麼主要說兩種,一種是載入本地資料,一種是通過kafka載入流式資料。

資料攝取

4.1 載入本地檔案

我們匯入演示案例種的演示檔案

4.1.1.1 資料選擇

通過UI選擇local disk

file

並選擇Connect data

file

4.1.1.2 演示資料檢視

演示資料在quickstart/tutorial目錄下的wikiticker-2015-09-12-sampled.json.gz檔案 file

4.1.1.3 選擇資料來源

因為我們是通過imply安裝的,在Base directory輸入絕對路徑/usr/local/imply/imply-2021.05-1/dist/druid/quickstart/tutorial,File filter輸入wikiticker-2015-09-12-sampled.json.gz,並選擇apply應用配置,我們資料已經載入進來了

file

Base directoryFile filter 分開是因為可能需要同時從多個檔案中攝取資料。

4.1.1.4 載入資料

資料定位後,您可以點選"Next: Parse data"來進入下一步。

file

資料載入器將嘗試自動為資料確定正確的解析器。在這種情況下,它將成功確定json。可以隨意使用不同的解析器選項來預覽Druid如何解析您的資料。

4.1.2 資料來源規範配置
4.1.2.1 設定時間列

json 選擇器被選中後,點選 Next:Parse time 進入下一步來決定您的主時間列。

​ Druid的體系結構需要一個主時間列(內部儲存為名為_time的列)。如果您的資料中沒有時間戳,請選擇 固定值(Constant Value) 。在我們的示例中,資料載入器將確定原始資料中的時間列是唯一可用作主時間列的候選者。

這裡可以選擇時間列,以及時間的顯示方式

file

4.1.2.2 設定轉換器

在這裡可以新增虛擬列,將一個列的資料轉換成另一個虛擬列,這裡我們沒有設定,直接跳過

file

4.1.2.3 設定過濾器

這裡可以設定過濾器,對於某些資料可以不進行顯示,這裡我們也跳過

file

4.1.2.4 配置schema

Configure schema 步驟中,您可以配置將哪些維度和指標攝入到Druid中,這些正是資料在被Druid中攝取後出現的樣子。 由於我們的資料集非常小,關掉rollup、確認更改。

file

4.1.2.5 配置Partition

一旦對schema滿意後,點選 Next 後進入 Partition 步驟,該步驟中可以調整資料如何劃分為段檔案的方式,因為我們資料量非常小,這裡我們按照DAY進行分段

file

4.1.3 提交任務
4.1.3.1 釋出資料

點選完成 Tune 步驟,進入到 Publish 步,在這裡我們可以給我們的資料來源命名,這裡我們就命名為druid-sampled

file

點選下一步就可以檢視我們的資料規範

file

​ 這就是您構建的規範,為了檢視更改將如何更新規範是可以隨意返回之前的步驟中進行更改,同樣,您也可以直接編輯規範,並在前面的步驟中看到它。

4.1.3.2 提交任務

對攝取規範感到滿意後,請單擊 Submit,然後將建立一個數據攝取任務。

您可以進入任務檢視,重點關注新建立的任務。任務檢視設定為自動重新整理,請等待任務成功。

file

當一項任務成功完成時,意味著它建立了一個或多個段,這些段現在將由Data伺服器接收。

4.1.3.3 檢視資料來源

從標題導航到 Datasources 檢視,一旦看到綠色(完全可用)圓圈,就可以查詢資料來源。此時,您可以轉到 Query 檢視以對資料來源執行SQL查詢。

file

4.1.3.4 查詢資料

可以轉到查詢頁面進行資料查詢,這裡在sql視窗編寫sql後點擊執行就可以查詢資料了

file

4.2 kafka載入流式資料

4.2.1 安裝Kafka

這裡我們使用docker-compose的方式啟動kafka

4.2.1.1 編輯資源清單
vi docker-compose.yml

version: '2'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports: 
      - 2181:2181
  kafka:
    image: wurstmeister/kafka       ## 映象
    volumes: 
        - /etc/localtime:/etc/localtime ## 掛載位置(kafka映象和宿主機器之間時間保持一直)
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.64.190   ## 修改:宿主機IP
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       ## 卡夫卡執行是基於zookeeper的
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000

4.2.2.2 啟動容器
docker-compose up -d

docker-compose ps

file

4.2.3 驗證kafka

啟動kafka後需要驗證kafka是否可用

4.2.3.1 登入容器

登入容器並進入指定目錄

#進入容器
docker exec -it kafka_kafka_1 bash

#進入 /opt/kafka_2.13-2.7.0/bin/ 目錄下
cd /opt/kafka_2.13-2.7.0/bin/

file

4.2.3.2 傳送訊息

執行客戶端傳送訊息,注意這裡的連線地址需要寫我們配置的宿主機地址

#執行kafka生產者傳送訊息
./kafka-console-producer.sh --broker-list 192.168.64.173:9092 --topic test

傳送的資料如下

{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

file

4.2.3.3 消費訊息

執行消費者消費訊息

./kafka-console-consumer.sh --bootstrap-server 192.168.64.173:9092 --topic test --from-beginning

file

有資料列印說明我們kafka安裝是沒有問題的

4.2.4 傳送資料到kafka
4.2.4.1 編寫程式碼

編寫程式碼傳送訊息到kafka中

@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 傳送訊息到kafka
     *
     * @param topic   主題
     * @param message 內容體
     */
    public void sendMsg(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

@RestController
@RequestMapping("/taxi")
public class KafkaController {
    @Autowired
    private KafkaSender kafkaSender;

    @RequestMapping("/batchTask/{num}")
    public String batchAdd(@PathVariable("num") int num) {
        for (int i = 0; i < num; i++) {
            Message message = Utils.getRandomMessage();
            kafkaSender.sendMsg("message", JSON.toJSONString(message));
        }
        return "OK";
    }
}

4.2.4.2 傳送訊息

使用postman 傳送訊息到kafka,訊息地址:http://localhost:8010/taxi/batchTask/10,訊息資料如下

file

顯示OK說明訊息已經發送到了kafka中

4.2.5 資料選擇
4.2.51 kafka資料檢視

在load頁面選擇kafka,進行資料攝取模式選擇

file

4.2.5.2 選擇資料來源

在這裡輸入ZK的地址以及需要選擇資料的topic

116.62.213.90:10903,116.62.213.90:10904

file

4.2.5.3 載入資料

點選apply應用配置,設定載入資料來源

file

4.2.6 資料來源規範配置
4.2.6.1 設定時間列

json 選擇器被選中後,點選 Next:Parse time 進入下一步來決定您的主時間列。

​ 因為我們的時間列有兩個建立時間以及打車時間,我們配置時間列為trvelDate

file

4.2.6.2 設定轉換器

在這裡可以新增虛擬列,將一個列的資料轉換成另一個虛擬列,這裡我們增加一個狀態的虛擬列,來顯示狀態的中文名稱我們定義 0:測試資料, 1:發起打車,2:排隊中,3:司機接單,4:乘客上車,5:完成打車

我們使用case_simple來實現判斷功能,更多判斷功能參考

case_simple(status,0,'測試資料',1,'發起打車',2,'排隊中',3,'司機接單',4,'完成打車','狀態錯誤')

在這裡我們新建了一個status_text的虛擬列來展示需要中文顯示的列

file

配置年齡預設值,如果為空我們設定為25

nvl(age,25)

file

配置性別設定,我們需要設定為男女,0:男,1:女,如果為null,我們設定為男

case_simple(nvl(sex,0),0,'男',1,'女','男')

file

4.2.6.3 設定過濾器

這裡可以設定過濾器,對於某些資料不展示,這裡我們使用區間過濾器選擇顯示status>=1的資料,具體表達式可用參考

 {
  "type" : "bound",
  "dimension" : "status",
  "ordering": "numeric",
  "lower": "1",
 }

因為我們把資料是0的測試資料不顯示了,所以只顯示了一條資料為1的資料

file

4.2.6.4 配置schema

Configure schema 步驟中,您可以配置將哪些維度和指標攝入到Druid中,這些正是資料在被Druid中攝取後出現的樣子。 由於我們的資料集非常小,關掉rollup、確認更改。

file

4.2.6.5 配置Partition

一旦對schema滿意後,點選 Next 後進入 Partition 步驟,該步驟中可以調整資料如何劃分為段檔案的方式,因為我們打車一般按照小時來算的,我們設定為分割槽為``hour

file

4.2.6.6 配置拉取方式

這裡設定kafka的拉取方式,主要設定偏移量的一些配置

​ 在 Tune 步驟中,將 Use earliest offset 設定為 True 非常重要,因為我們需要從流的開始位置消費資料。 其他沒有任何需要更改的地方,進入到 Publish

file

4.5.7 提交任務
4.2.7.1 釋出資料

點選完成 Tune 步驟,進入到 Publish 步,在這裡我們可以給我們的資料來源命名,這裡我們就命名為taxi-messagefile

點選下一步就可以檢視我們的資料規範

file

​ 這就是您構建的規範,為了檢視更改將如何更新規範是可以隨意返回之前的步驟中進行更改,同樣,您也可以直接編輯規範,並在前面的步驟中看到它。

4.2.7.2 提交任務

對攝取規範感到滿意後,請單擊 Submit,然後將建立一個數據攝取任務。

您可以進入任務檢視,重點關注新建立的任務。任務檢視設定為自動重新整理,請等待任務成功。

file

當一項任務成功完成時,意味著它建立了一個或多個段,這些段現在將由Data伺服器接收。

4.2.7.3 檢視資料來源

從標題導航到 Datasources 檢視,一旦看到綠色(完全可用)圓圈,就可以查詢資料來源。此時,您可以轉到 Query 檢視以對資料來源執行SQL查詢。

file

4.2.7.4 查詢資料

可以轉到查詢頁面進行資料查詢,這裡在sql視窗編寫sql後點擊執行就可以查詢資料了

file

4.2.7.5 動態新增資料

傳送一條資料到kafka

file

druid 查詢資料,發現新的資料已經進來了

file

4.2.8 清理資料
4.2.8.1 關閉叢集
# 進入impl安裝目錄
cd /usr/local/imply/imply-2021.05-1
# 關閉叢集
./bin/service --down

file

4.2.8.2 等待關閉服務

通過程序檢視,檢視服務是否已經關閉

 ps -ef|grep druid

file

4.2.8.3 清理資料

通過刪除druid軟體包下的var目錄的內容來重置叢集狀態

ll
rm -rf var

file

4.2.8.4 重新啟動叢集
 nohup bin/supervise -c conf/supervise/quickstart.conf > logs/quickstart.log 2>&1 &

4.2.8.5 檢視資料來源

登入後檢視資料來源,我們發現已經被重置了

file

本文由育博學谷狂野架構師釋出 如果本文對您有幫助,歡迎關注和點贊;如果您有任何建議也可留言評論或私信,您的支援是我堅持創作的動力 轉載請註明出處!