PyFlink 開發環境利器:Zeppelin Notebook

語言: CN / TW / HK

摘要:PyFlink 作為 Flink 的 Python 語言入口,其 Python 語言的確很簡單易學,但是 PyFlink 的開發環境卻不容易搭建,稍有不慎,PyFlink 環境就會亂掉,而且很難排查原因。今天給大家介紹一款能夠幫你解決這些問題的 PyFlink 開發環境利器:Zeppelin Notebook。主要內容為:


  1. 準備工作
  2. 搭建 PyFlink 環境
  3. 總結與未來

Tips:點選「閱讀原文」關注第三屆 Flink 極客挑戰賽~

 GitHub 地址 
歡迎大家給 Flink 點贊送 star~


也許你早就聽說過 Zeppelin,但是之前的文章都偏重講述如何在 Zeppelin 裡開發 Flink SQL,今天則來介紹下如何在 Zeppelin 裡高效的開發 PyFlink Job,特別是解決 PyFlink 的環境問題。

一句來總結這篇文章的主題,就是在 Zeppelin notebook 裡利用 Conda 來建立 Python env 自動部署到 Yarn 叢集中,你無需手動在叢集上去安裝任何 PyFlink 的包,並且你可以在一個 Yarn 叢集裡同時使用互相隔離的多個版本的 PyFlink。最後你能看到的效果就是這樣:

■ 1. 能夠在 PyFlink 客戶端使用第三方 Python 庫,比如 matplotlib:



■ 2. 可以在 PyFlink UDF 裡使用第三方 Python 庫,如:



接下來看看如何來實現。

一、準備工作


Step 1.


準備好最新版本的 Zeppelin 的搭建,這個就不在這邊展開了,如果有問題可以加入 Flink on Zeppelin 釘釘群 (34517043) 諮詢。另外需要注意的是,Zeppelin 部署叢集需要是 Linux,如果是 Mac 的話,會導致在 Mac 機器上打的 Conda 環境無法在 Yarn 叢集裡使用 (因為 Conda 包在不同系統間是不相容的)。

Step 2. 

載 Flink 1.13, 需要注意的是,本文的功能只能用在 Flink 1.13 以上版本,然後:


  • flink-Python-*.jar 這個 jar 包 copy 到 Flink 的 lib 資料夾下;


  • opt/Python 這個資料夾 copy 到 Flink 的 lib 資料夾下。


Step 3.


安裝以下軟體 (這些軟體是用於建立 Conda env 的):


  • miniconda:

    http://docs.conda.io/en/latest/miniconda.html


  • conda pack:

    http://conda.github.io/conda-pack/


  • mamba:

    http://github.com/mamba-org/mamba


二、搭建 PyFlink 環境


接下來就可以在 Zeppelin 裡搭建並且使用 PyFlink 了。

Step 1. 製作 JobManager 上的 PyFlink Conda 環境


因為 Zeppelin 天生支援 Shell,所以可以在 Zeppelin 裡用 Shell 來製作 PyFlink 環境。注意這裡的 Python 第三方包是在 PyFlink 客戶端 (JobManager) 需要的包,比如 M atplotlib 這些,並且確保至少安裝了下面這些包:

  • 某個版本的 Python (這裡用的是 3.7)


  • apache-flink (這裡用的是 1.13.1)


  • jupyter,grpcio,protobuf (這三個包是 Zeppelin 需要的)


剩下的包可以根據需要來指定:

   
   
   

%sh

# make sure you have conda and momba installed.
# install miniconda: http://docs.conda.io/en/latest/miniconda.html
# install mamba: http://github.com/mamba-org/mamba

echo "name: pyflink_env
channels:
- conda-forge
- defaults
dependencies:
- Python=3.7
- pip
- pip:
  - apache-flink==1.13.1
- jupyter
- grpcio
- protobuf
- matplotlib
- pandasql
- pandas
- scipy
- seaborn
- plotnine
" > pyflink_env.yml
   
mamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml

執行下面的程式碼打包 PyFlink 的 Conda 環境並且上傳到 HDFS (注意這裡打包出來的檔案格式是 tar.gz):

%sh

rm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz

hadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz



Step 2. 製作 TaskManager 上的 PyFlink Conda 環境


執行下面的程式碼來建立 TaskManager 上的 PyFlink Conda 環境,TaskManager 上的 PyFlink 環境至少包含以下 2 個包:

  • 某個版本的 Python (這裡用的是 3.7)


  • apache-flink (這裡用的是 1.13.1)


剩下的包是 Python UDF 需要依賴的包,比如這 裡指定了 pandas:

echo "name: pyflink_tm_env
channels:
- conda-forge
- defaults
dependencies:
- Python=3.7
- pip
- pip:
  - apache-flink==1.13.1
- pandas
" > pyflink_tm_env.yml
   
mamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.yml

執行下面的程式碼打包 PyFlink 的 Conda 環境並且上傳到 HDFS (注意這裡使用的是 zip 格式):

%sh

rm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip

hadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip


Step 3. 在 PyFlink 中使用 Conda 環境


接下來就可以在 Zeppelin 中使用上面建立的 Conda 環境了,首先需要在 Zeppelin 裡配置 Flink,主要配置的選項有:

  • flink.execution.mode 為 yarn-application, 本文所講的方法只適用於 yarn-application 模式;


  • 指定 yarn.ship-archiveszeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 來配置 JobManager 側的 PyFlink Conda 環境;


  • 指定 Python.archives 以及 Python.executable 來指定 TaskManager 側的 PyFlink Conda 環境;


  • 指定其他可選的 Flink 配置,比如這裡的 flink.jm.memoryflink.tm.memory


   
   
   

%flink.conf


flink.execution.mode yarn-application

yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/Python
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz

Python.archives hdfs:///tmp/pyflink_tm_env.zip
Python.executable pyflink_tm_env.zip/bin/Python3.7

flink.jm.memory 2048
flink.tm.memory 2048

接下來就可以如一開始所說的那樣在 Zeppelin 裡使用 PyFlink 以及指定的 Conda 環境了。有 2 種場景:

  • 下面的例子裡,可以在 PyFlink 客戶端 (JobManager 側) 使用上面建立的 JobManager 側的 Conda 環境,比如下邊使用了 Matplotlib。



  • 下面的例子是在 PyFlink UDF 裡使用上面建立的 TaskManager 側 Conda 環境裡的庫,比如下面在 UDF 裡使用 Pandas。



三、總結與未來


本文內容就是在 Zeppelin notebook 裡利用 Conda 來建立 Python env 自動部署到 Yarn 叢集中,無需手動在叢集上去安裝任何 Pyflink 的包,並且可以在一個 Yarn 叢集裡同時使用多個版本的 PyFlink。

每個 PyFlink 的環境都是隔離的,而且可以隨時定製更改 Conda 環境。可以下載下面這個 note 並匯入到 Zeppelin,就可以復現今天講的內容:http://23.254.161.240/#/notebook/2G8N1WTTS

此外還有很多可以改進的地方:

  • 目前我們需要建立 2 個 Conda env ,原因是 Zeppelin 支援 tar.gz 格式,而 Flink 只支援 zip 格式。等後期兩邊統一之後,只要建立一個 Conda env 就可以;


  • apache-flink 現在包含了 Flink 的 jar 包,這就導致打出來的 Conda env 特別大,Yarn container 在初始化的時候耗時會比較長,這個需要 Flink 社群提供一個輕量級的 Python 包 (不包含 Flink jar 包),就可以大大減小 Conda env 的大小。


希望瞭解更多 Flink on Zeppelin 使用的同學可以加入下面的釘釘群來討論。



第三屆 Apache Flink 極客挑戰賽報名開始!
30 萬獎金等你來!

伴隨著海量資料的衝擊,資料處理分析能力在業務中的價值與日俱增,各行各業對於資料處理時效性的探索也在不斷深入,作為主打實時計算的計算引擎 - Apache Flink 應運而生。


為給行業帶來更多實時計算賦能實踐的思路,鼓勵廣大熱愛技術的開發者加深對 Flink 的掌握,Apache Flink 社群聯手阿里雲、英特爾、阿里巴巴人工智慧治理與可持續發展實驗室 (AAIG)、Occlum 聯合舉辦 "第三屆 Apache Flink 極客挑戰賽暨 AAIG CUP" 活動,即日起正式啟動。



▼ 掃描二維碼,瞭解更多賽事資訊 






更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群~
▼ 關注「Flink 中文社群」,獲取更多技術乾貨 

   戳我,瞭解 Flink 挑戰賽資訊~

本文分享自微信公眾號 - Flink 中文社群(gh_5efd76d10a8d)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。