伴隨著海量資料的衝擊,資料處理分析能力在業務中的價值與日俱增,各行各業對於資料處理時效性的探索也在不斷深入,作為主打實時計算的計算引擎 - Apache Flink 應運而生。
PyFlink 開發環境利器:Zeppelin Notebook
摘要:PyFlink 作為 Flink 的 Python 語言入口,其 Python 語言的確很簡單易學,但是 PyFlink 的開發環境卻不容易搭建,稍有不慎,PyFlink 環境就會亂掉,而且很難排查原因。今天給大家介紹一款能夠幫你解決這些問題的 PyFlink 開發環境利器:Zeppelin Notebook。主要內容為:
-
準備工作 -
搭建 PyFlink 環境 -
總結與未來
■ 1. 能夠在 PyFlink 客戶端使用第三方 Python 庫,比如 matplotlib:
■ 2. 可以在 PyFlink UDF 裡使用第三方 Python 庫,如:
一、準備工作
Step 1.
把 flink-Python-*.jar 這個 jar 包 copy 到 Flink 的 lib 資料夾下;
把 opt/Python 這個資料夾 copy 到 Flink 的 lib 資料夾下。
Step 3.
miniconda:
http://docs.conda.io/en/latest/miniconda.html
conda pack:
http://conda.github.io/conda-pack/
mamba:
http://github.com/mamba-org/mamba
二、搭建 PyFlink 環境
Step 1. 製作 JobManager 上的 PyFlink Conda 環境
某個版本的 Python (這裡用的是 3.7)
apache-flink (這裡用的是 1.13.1)
jupyter,grpcio,protobuf (這三個包是 Zeppelin 需要的)
%sh
# make sure you have conda and momba installed.
# install miniconda: http://docs.conda.io/en/latest/miniconda.html
# install mamba: http://github.com/mamba-org/mamba
echo "name: pyflink_env
channels:
- conda-forge
- defaults
dependencies:
- Python=3.7
- pip
- pip:
- apache-flink==1.13.1
- jupyter
- grpcio
- protobuf
- matplotlib
- pandasql
- pandas
- scipy
- seaborn
- plotnine
" > pyflink_env.yml
mamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml
%sh
rm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz
hadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz
Step 2. 製作 TaskManager 上的 PyFlink Conda 環境
某個版本的 Python (這裡用的是 3.7)
apache-flink (這裡用的是 1.13.1)
echo "name: pyflink_tm_env
channels:
- conda-forge
- defaults
dependencies:
- Python=3.7
- pip
- pip:
- apache-flink==1.13.1
- pandas
" > pyflink_tm_env.yml
mamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.yml
%sh
rm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip
hadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip
Step 3. 在 PyFlink 中使用 Conda 環境
flink.execution.mode 為 yarn-application, 本文所講的方法只適用於 yarn-application 模式;
指定 yarn.ship-archives,zeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 來配置 JobManager 側的 PyFlink Conda 環境;
指定 Python.archives 以及 Python.executable 來指定 TaskManager 側的 PyFlink Conda 環境;
指定其他可選的 Flink 配置,比如這裡的 flink.jm.memory 和 flink.tm.memory。
%flink.conf
flink.execution.mode yarn-application
yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/Python
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz
Python.archives hdfs:///tmp/pyflink_tm_env.zip
Python.executable pyflink_tm_env.zip/bin/Python3.7
flink.jm.memory 2048
flink.tm.memory 2048
下面的例子裡,可以在 PyFlink 客戶端 (JobManager 側) 使用上面建立的 JobManager 側的 Conda 環境,比如下邊使用了 Matplotlib。
-
下面的例子是在 PyFlink UDF 裡使用上面建立的 TaskManager 側 Conda 環境裡的庫,比如下面在 UDF 裡使用 Pandas。
三、總結與未來
目前我們需要建立 2 個 Conda env ,原因是 Zeppelin 支援 tar.gz 格式,而 Flink 只支援 zip 格式。等後期兩邊統一之後,只要建立一個 Conda env 就可以;
apache-flink 現在包含了 Flink 的 jar 包,這就導致打出來的 Conda env 特別大,Yarn container 在初始化的時候耗時會比較長,這個需要 Flink 社群提供一個輕量級的 Python 包 (不包含 Flink jar 包),就可以大大減小 Conda env 的大小。
為給行業帶來更多實時計算賦能實踐的思路,鼓勵廣大熱愛技術的開發者加深對 Flink 的掌握,Apache Flink 社群聯手阿里雲、英特爾、阿里巴巴人工智慧治理與可持續發展實驗室 (AAIG)、Occlum 聯合舉辦 "第三屆 Apache Flink 極客挑戰賽暨 AAIG CUP" 活動,即日起正式啟動。
▼ 掃描二維碼,瞭解更多賽事資訊 ▼
戳我,瞭解 Flink 挑戰賽資訊~
本文分享自微信公眾號 - Flink 中文社群(gh_5efd76d10a8d)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。
- 基於 Flink 流計算實現的股票交易實時資產應用
- 愛奇藝統一實時計算平臺建設
- Apache Flink 社群 2022 年度報告:Evolution, Diversity, Connection
- 流批一體在京東的探索與實踐
- 美團基於 Flink 的實時數倉平臺建設新進展
- 基於 Flink 打造的伴魚實時計算平臺 Palink 的設計與實現
- 官宣|Apache Flink 1.15 釋出公告
- Flink 在米哈遊的落地實踐
- 伴魚基於 Flink 構建資料整合平臺的設計與實現
- 百萬TPS高吞吐、秒級低延遲,阿里搜尋離線平臺如何實現?
- Flink CDC 2.1 正式釋出,穩定性大幅提升,新增 Oracle,MongoDB 支援
- 順豐科技 Hudi on Flink 實時數倉實踐
- 2021 年網易雲音樂實時計算平臺發展和挑戰
- 官宣|Apache Flink 1.14.0 釋出公告
- 5 年迭代 5 次,抖音基於 Flink 的推薦系統演進歷程
- 37 手遊基於 Flink CDC Hudi 湖倉一體方案實踐
- PyFlink 開發環境利器:Zeppelin Notebook
- 30 萬獎金等你來!第三屆 Apache Flink 極客挑戰賽暨 AAIG CUP 報名開始
- Flink 助力美團數倉增量生產
- Flink SQL 在位元組跳動的優化與實踐