Zeppelin整合Flink採坑實錄

語言: CN / TW / HK

I.前言

前兩天轉了章大的zeppelin系列教程(以下簡稱“教程”),我也好好的研究學習了一波。

我曾無數次鼓吹基於Jupyter的應用,也相信在未來資料分析領域,他會有自己的一席之地. 對話式的管家服務,真是誰用誰知道...

以下內容摘自 “教程

下面是Zeppelin和Flink的故事。

Flink問:雖然我提供了多種語言支援,有SQL,Java,Scala還有Python,但是每種語言都有自己的入口,使用者很難多種語言混著用。比如在sql-client中只能執行Sql,不能寫UDF,在pyflink shell裡,只能用python的udf,不能用scala和java的udf。有沒有誰能幫我把這些語言全部打通。

Zeppelin答:我可以。

Flink問:我的一個很大的使用場景是實時大屏,但是我一個人辦不到,往往需要藉助第三方儲存,還需要前端開發,有沒有誰能讓使用者不用寫前端程式碼就實現實時大屏

Zeppelin答:我可以。

Flink問:我的Sql已經很強大了,但是使用者在sql-client裡不能寫comment,而且不支援執行多條sql語句,有誰能幫我把這些功能補齊下。

Zeppelin答:我可以。

Flink問 :好多初學者說要跑一個flink job實在是太難了,好多東西需要配置,還要學習各種命令列,有沒有誰能讓使用者更容易得提交和管理Flink Job。

Zeppelin答 :我可以。

Flink問 :Flink Job提交目前只能一個個提交,一個job跑完跑另外一個,有些使用者想並行執行多個Flink Job,誰能幫我搞定這個需求?

Zeppelin答 :我可以。

Flink問 :我有豐富的connector,但是使用者每次都要把connector打包到uber jar裡,或者copy到flink的lib下,但是這樣會把各種connector jar混在一起,容易發生衝突,很難管理,有誰能提供一個乾淨點的方案?

Zeppelin答 :我可以。

II.填坑

Zepplin 0.9版本雖然已經做的足夠出色了,但是還是有很多隱含的條件(坑),對新人還是不那麼友好的,我在研習“教程”的時候,也就稍微總結了一下:

  • 版本:

目前zepplin 0.9 preview 整合flink,只能使用 Apache Flink 1.10.1 for Scala 2.11  ,不能使用scala2.12

  • 環境:

    實驗的話,需要在linux下嘗試,windows是不支援,儘管他都有windows下的啟動指令碼.

  • FLINK_HOME

在interpret裡設定FLINK_HOME,指向你的Flink,切記1.10.1 scala2.11版本

  • Kafka Connect Datagen

    使用提供的docker映象來做kafka叢集,提供資料,安裝docker不在這裡說了,可能啟動正常,但是沒有datagenconnector

說明,docker-compose.yml裡這兩句沒起作用

- ./plugins:/tmp/connect-plugins

- ./data:/tmp/data

執行下面語句就ok了。

setenforce 0 

另外,預設配置裡

KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'

CONNECT_BOOTSTRAP_SERVERS: '192.168.16.3:9092'

可能不剩下,使用下面語句,找到broker的ip,替換broker

docker exec -it ID/NAMES ip addr

建立kafka connector時,使用官方語句可能不太好事,可以使用UI即本機IP:8000 來進行配置,原文提供的 connect.source.datagen.json 也有些不太好使,我修改了一下,如下:

{

"connector.class": "com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector",

"tasks.max": "1",

"random.fields": "status:foo|bar|baz, direction:up|down|left|right",

"event.timestamp.field": "event_ts",

"poll.size": "10",

"key.converter.schemas.enable": "false",

"poll.interval.ms": "5000",

"value.converter.schemas.enable": "false",

"topic.name": "generated.events",

"value.converter": "org.apache.kafka.connect.storage.StringConverter",

"message.template": "{\"status\":\"foo\",\"direction\":\"up\"}",

"key.converter": "org.apache.kafka.connect.storage.StringConverter"

}

如下圖所示

可以在Topic UI裡可以看到資料,就證明這裡配置ok了

  • Streaming ETL

記住先執行:

%flink.conf # You need to run this paragraph first before running any flink code. flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10.1,org.apache.flink:flink-connector-kafka-base_2.11:1.10.1,org.apache.flink:flink-json:1.10.1

上面的坑都填好了,就能愉快的玩耍了

感謝 章大 在釘釘群裡耐心的解答。

目前只踩到這裡,繼續加油,奧利給!!!