在Zeppelin中如何使用Flink

語言: CN / TW / HK

Zeppelin 版本:0.9.0

1. 簡介

當前 Zeppelin 穩定版本為 0.9.0 版本,重構了 Flink Interpreter 以支援最新版本的 Flink。目前僅支援 Flink 1.10+ 版本,不支援舊版本的 Flink。此外也僅支援 scala-2.11,還不支援 scala-2.12。在此 Flink 我們選擇適用於 scala-2.11 的 1.12.4 版本。

0.9.0 版本不支援 Flink 1.13 版本

本文中我們只介紹如何在 Zeppelin 上設定 flink 並以 3 種不同的執行模式執行基本的 Flink 程式。Zeppelin 的安裝可以參考Zeppelin 安裝與啟動博文,Flink 的安裝參考 Flink 安裝與啟動 博文。

2. 配置引數

接下來就是進入 Interpreter 設定頁面,配置 flink 直譯器。在 flink 直譯器中可以設定很多屬性,現在我們簡單瞭解一下在 Zeppelin 中使用 Flink 所需要的一些配置引數。Flink Interpreter 可以使用 Zeppelin 提供的屬性進行配置,如下所示。除此之外也可以新增和設定表中未列出的其他 flink 屬性。

在不同執行模式下使用的配置引數是不一樣的

3. 執行模式

接下來就需要配置 Flink Interpreter。在 Zeppelin 中可以使用 3 種不同的 Flink 叢集模式:

  • Local

  • Remote

  • Yarn

下面將分別說明如何在這 3 種模式下配置 Flink Interpreter。

3.1 Local 模式

Local 模式是最簡單的 Flink 執行模式,我們只需要下載 Flink 1.10 或更新版本,然後解壓縮即可。不需要在 Flink 的 lib 檔案下下載任何 connector jar,也不需要修改 flink-conf.xml,我們從最簡單的配置開始,以防出現奇怪問題,讓問題排查變得複雜。最重要的是指定 FLINK_HOME 以及 設定 flink.execution.mode 為 local 即可:

3.2 Remote 模式

Flink 的 Remote 模式會連線一個已經建立好的 Flink 叢集。我們可以先啟動一個 Flink 叢集,然後在 Zeppelin 中指定叢集的地址。除了配置上面我們說的 FLINK_HOME 以及 flink.execution.mode(唯一不同的是在這裡需要指定為 remote )外,還需要配置 flink.execution.remote.host 和 flink.execution.remote.port (對應 Flink 叢集的 rest.port 配置)來指定 JobManager 的 Rest API 地址(Zeppelin是通過這個 Rest API 來提交 Flink Job):

3.3 Yarn 模式

Flink 的 Yarn 模式會在 Yarn 叢集中動態建立一個 Flink Cluster,然後你就可以往這個 Flink Session Cluster 提交 Flink Job 了。除了配置上面我們說的 FLINK_HOME 以及 flink.execution.mode(唯一不同的是在這裡需要指定為 yarn)外,還需要配置 HADOOP_CONF_DIR,並且要確保 Zeppelin 這臺機器可以訪問你的 Hadoop 叢集:

請確保 Hadoop 命令在我們的 PATH 環境變數彙總。因為 Flink 內部會呼叫 Hadoop CLASSPATH 並在 Flink Interpreter程序中載入所有與 Hadoop 相關的 jar。

4. 執行 Flink 程式

配置完 flink Interpreter 後,我們就可以在 Zeppelin 中執行 flink 程式了。flink Interpreter 中有 6 個子直譯器,每個子直譯器用於不同的場景:

Zeppelin 自帶的 demo 程式中預設使用的 %flink。如果使用 SQL 來完成 flink 程式,可以使用 %flink.ssql 或者 %flink.bsql,後面我們會詳細講解如何在 Zeppelin 中使用 Flink SQL。

現在我們分別看一下在不同執行模式下執行 flink 的情況。

4.1 Local 模式

如下所示我們執行 flink Streaming WordCount 程式:

在這種模式會在本地啟動一個 MiniCluster(本地新生成一個叢集,會以執行緒的方式跑在 Flink Interpreter 程序中),不會直接使用我們已經建立的 Flink 叢集:

停止 Flink Interpreter 會銷燬 Flink Cluster。

在這種情況下,MiniCluster 的 JobManager 需要使用 8081 埠作為 Rest API 的埠,如果這個埠被其他程序佔用,那麼就會碰到如下錯誤:

一種比較大的可能性是你正好在本地啟動了 Flink 的 Standalone cluster。因為 Standalone 模式下的 JobManager 預設也是使用 8081 埠。所以如果是碰到這種錯誤,那麼檢查下你是否在本地起了一個 Flink Standalone 叢集,或者有沒有其他程式使用了8081埠。

一般情況下會修改 rest.port 的預設埠號。

4.2 Remote 模式

Remote 模式跟 Local 模式不一樣,不是 Zeppelin 幫我們建立的 Cluster,是我們在外部單獨啟動的 Flink Cluster,停止 Flink Interpreter 並不會銷燬 Flink Cluster。如下所示我們執行 flink Streaming WordCount 程式:

啟動程式之後,在我們單獨啟動的 Flink Cluster 上就會多一個 flink 作業:

4.3 Yarn 模式

在 Yarn 模式下,當我們啟動 Flink Interpreter 的時候就會在 Yarn 中建立 Yarn Session Cluster,當你停止或者重啟 Flink Interpreter 的時候就會銷燬這個 Yarn Session Cluster。

如下所示我們執行 flink Streaming WordCount 程式:

啟動程式之後,在我們 Yarn Session Cluster 上就會多一個 flink 作業:

也可以通過 Yarn 中的 ApplicationMaster URL 地址跳轉到 Flink UI 上:

歡迎關注我的公眾號和部落格:

參考:

  • Flink interpreter for Apache Zeppelin

  • Flink 執行模式