Kafka Connect | 無縫結合Kafka構建高效ETL方案

語言: CN / TW / HK

很多同學可能沒有接觸過 Kafka Connect,大家要注意不是Connector。

Kafka Connect 是一款可擴充套件並且可靠地在 Apache Kafka 和其他系統之間進行資料傳輸的工具。

背景

Kafka connect是Confluent公司(當時開發出Apache Kafka的核心團隊成員出來創立的新公司)開發的confluent platform的核心功能。可以很簡單的快速定義 connectors 將大量資料從 Kafka 移入和移出. Kafka Connect 可以攝取資料庫資料或者收集應用程式的 metrics 儲存到 Kafka topics,使得資料可以用於低延遲的流處理。一個匯出的 job 可以將來自 Kafka topic 的資料傳輸到二級儲存,用於系統查詢或者批量進行離線分析。

大家都知道現在資料的ETL過程經常會選擇kafka作為訊息中介軟體應用在離線和實時的使用場景中,而kafka的資料上游和下游一直沒有一個。無縫銜接的pipeline來實現統一,比如會選擇flume 或者 logstash 採集資料到kafka,然後kafka又通過其他方式pull或者push資料到目標儲存。而kafka connect旨在圍繞kafka構建一個可伸縮的,可靠的資料流通道,通過 Kafka connect可以快速實現大量資料進出kafka從而和其他源資料來源或者目標資料來源進行互動構造一個低延遲的資料pipeline.給個圖更直觀點,大家感受下。

Kafka Connect 功能包括:

  • Kafka connectors 通用框架:- Kafka Connect 將其他資料系統和Kafka整合標準化,簡化了 connector 的開發,部署和管理

  • 分散式和單機模式 - 可以擴充套件成一個集中式的管理服務,也可以單機方便的開發,測試和生產環境小型的部署。

  • REST 介面 - 通過易於使用的REST API提交和管理connectors到您的Kafka Connect叢集

  • offset 自動管理 - 只需要connectors 的一些資訊,Kafka Connect 可以自動管理offset 提交的過程,因此開發人員無需擔心開發中offset提交出錯的這部分。

  • 分散式的並且可擴充套件 - Kafka Connect 構建在現有的 group 管理協議上。Kafka Connect 叢集可以擴充套件新增更多的workers。

  • 整合流處理/批處理 - 利用 Kafka 已有的功能,Kafka Connect 是一個橋接stream 和批處理系統理想的方式。

Kafka Connect的適用場景

聯結器和普通的生產者消費者模式有什麼區別呢?似乎兩種方式都可以達到目的。可能第一次接觸connect的人都會由此疑問。在《kafka權威指南》這本書裡,作者給出了建議:

如果你是開發人員,你會使用 Kafka 客戶端將應用程式連線到Kafka ,井修改應用程式的程式碼,將資料推送到 Kafka 或者從 Kafka 讀取資料。如果要將 Kafka 連線到資料儲存系統,可以使用 Connect,因為這些系統不是你開發的,構建資料管道 I 10s你無能或者也不想修改它們的程式碼。Connect 可以用於從外部資料儲存系統讀取資料, 或者將資料推送到外部儲存系統。如果資料儲存系統提供了相應的聯結器,那麼非開發人員就可以通過配置聯結器的方式來使用 Connect。

如果你要連線的資料儲存系統沒有相應的聯結器,那麼可以考慮使用客戶端 API 或 Connect API 開發一個應用程式。我們建議首選 Connect,因為它提供了一些開箱即用的特性,比如配置管理、偏移量儲存、井行處理、錯誤處理,而且支援多種資料型別和標準的 REST 管理 API。開發一個連線 Kafka 和外部資料儲存系統的小應用程式看起來很簡單,但其實還有很多細節需要處理,比如資料型別和配置選項,這些無疑加大了開發的複雜性一Connect 處理了大部分細節,讓你可以專注於資料的傳輸。

Kafka Connect架構和元件

Kafka connect的幾個重要的概念包括:connectors、tasks、workers、converters和transformers。

  • Connectors-通過管理任務來細條資料流的高階抽象

  • Tasks- 資料寫入kafka和資料從kafka讀出的實現

  • Workers-執行connectors和tasks的程序

  • Converters- kafka connect和其他儲存系統直接傳送或者接受資料之間轉換資料

1) Connectors:在kafka connect中,connector決定了資料應該從哪裡複製過來以及資料應該寫入到哪裡去,一個connector例項是一個需要負責在kafka和其他系統之間複製資料的邏輯作業,connector plugin是jar檔案,實現了kafka定義的一些介面來完成特定的任務。

2) Tasks:task是kafka connect資料模型的主角,每一個connector都會協調一系列的task去執行任務,connector可以把一項工作分割成許多的task,然後再把task分發到各個worker中去執行(分散式模式下),task不自己儲存自己的狀態資訊,而是交給特定的kafka 主題去儲存(config.storage.topic 和status.storage.topic)。在分散式模式下有一個概念叫做任務再平衡(Task Rebalancing),當一個connector第一次提交到叢集時,所有的worker都會做一個task rebalancing從而保證每一個worker都運行了差不多數量的工作,而不是所有的工作壓力都集中在某個worker程序中,而當某個程序掛了之後也會執行task rebalance。

3) Workers:connectors和tasks都是邏輯工作單位,必須安排在程序中執行,而在kafka connect中,這些程序就是workers,分別有兩種worker:standalone和distributed。這裡不對standalone進行介紹,具體的可以檢視官方文件。我個人覺得distributed worker很棒,因為它提供了可擴充套件性以及自動容錯的功能,你可以使用一個group.ip來啟動很多worker程序,在有效的worker程序中它們會自動的去協調執行connector和task,如果你新加了一個worker或者掛了一個worker,其他的worker會檢測到然後在重新分配connector和task。

4) Converters:converter會把bytes資料轉換成kafka connect內部的格式,也可以把kafka connect內部儲存格式的資料轉變成bytes,converter對connector來說是解耦的,所以其他的connector都可以重用,例如,使用了avro converter,那麼jdbc connector可以寫avro格式的資料到kafka,當然,hdfs connector也可以從kafka中讀出avro格式的資料。

5) Connector可以配置轉換,以便對單個訊息進行簡單且輕量的修改。這對於小資料的調整和事件路由十分方便,且可以在connector配置中將多個轉換連結在一起。然而,應用於多個訊息的更復雜的轉換最好使用KSQL和Kafka Stream實現。轉換是一個簡單的函式,輸入一條記錄,並輸出一條修改過的記錄。Kafka Connect提供許多轉換,它們都執行簡單但有用的修改。可以使用自己的邏輯定製實現轉換介面,將它們打包為Kafka Connect外掛,將它們與connector一起使用。當轉換與source connector一起使用時,Kafka Connect通過第一個轉換傳遞connector生成的每條源記錄,第一個轉換對其進行修改並輸出一個新的源記錄。將更新後的源記錄傳遞到鏈中的下一個轉換,該轉換再生成一個新的修改後的源記錄。最後更新的源記錄會被轉換為二進位制格式寫入到kafka。轉換也可以與sink connector一起使用。

安裝和初體驗

Kafka Connect 當前支援兩種執行方式,單機(單個程序)和分散式。

1、單機模式

./connect-standalone.sh ../config/connect-file.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties

2、分散式

  • 下載相應的第三方Connect後打包編譯。

  • 將jar丟到Kafka的libs目錄下。

  • 啟動connector。

  • 使用Rest API提交connector配置。

./connect-distributed.sh ../config/connect-distributed.properties

由於Kafka Connect 旨在作為服務執行,它還提供了一個用於管理 connectors 的REST API。預設情況下,此服務在埠8083上執行,支援的一些介面列表如圖:

下面我們按照官網的步驟來實現Kafka Connect官方案例,使用Kafka Connect把Source(test.txt)轉為流資料再寫入到Destination(test.sink.txt)中。如下圖所示:

本例使用到了兩個Connector:

  • FileStreamSource:從test.txt中讀取併發布到Broker中

  • FileStreamSink:從Broker中讀取資料並寫入到test.sink.txt檔案中

其中的Source使用到的配置檔案是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

其中的Sink使用到的配置檔案是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Broker使用到的配置檔案是${KAFKA_HOME}/config/connect-standalone.properties

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

然後我們分別啟動Source Connector和Sink Connector:

./bin/connect-standalone.sh 

config/connect-standalone.properties 

config/connect-file-source.properties 

config/connect-file-sink.properties

然後我們向檔案寫入一些資料:

echo 'hello flink01' >> test.txt 

echo 'hello flink02' >> test.txt

然後我們就可以在目標檔案中看到:

cat test.sink.txt

hello flink01
hello flink02

我們在下篇文章中將更為詳細的介紹Kafka Connect在實際生產中的應用以及在各大公司的使用情況。

本文分享自微信公眾號 - 浪尖聊大資料(bigdatatip)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。