Airflow sensor簡介
本文譯自:
-
http://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html
-
http://airflow.apache.org/docs/apache-airflow/stable/concepts/smart-sensors.html
Sensors
Sensors是一種特殊型別的Operator。歸納起來,這種型別的Operator只做一件事,即等待某件事情發生。這件事情可能是到達某個時間點、檔案寫入完成或是接收到某個外部系統事件。無論如何,當這件事情發生的時候,Sensors即執行成功,並繼續觸發下游任務的執行。
按照以上定義,我們可以得知Sensors在大多數時間內都處於空轉狀態。基於此特性,Sensors在Airflow中有三種不同的實現方式。
-
poke (預設模式): Sensor在執行結束前將持續佔用一個slot。
-
reschedule : Sensor僅在執行檢查邏輯時佔用一個slot,在執行結束前重新排程一個新任務,在兩次檢查邏輯執行之間不會佔用slot。
-
smart sensor : 使用一箇中心化的Sensor執行所有相同型別的Sensor。
在所有模式中,poke和reschedule這2種模式可以在初始化sensor時直接進行配置,在這兩種模式之間選擇的主要參考因素是檢查邏輯的重複執行時間間隔。如果檢查邏輯需要在較短時間間隔內執行則建議使用poke模式,否則使用reschedule模式。
如果要使用smart sensor則需要進行額外的配置,我們將在下面的章節進行詳細介紹。
Smart Sensors簡介
注意事項
smart sensor是一個airflow2中的早期特性。在未來的Airflow迭代過程中,可能會對該功能進行非相容性變更。但這並不意味著smart sensor是一個試驗性的特性,事實上,該特性已於2020年早些時候在Airbnb內部的生產排程中上線執行,並且顯著降低了sensor的工作負載。
預設sensor實現
smart sensor實現
smart sensor通過將若干個輕量級的sensor合併入同一個例項,運行於一個內建的DAG程序中,從而極大的降低Airflow sensor的工作負載。
當我們在Airflow中開啟smart sensor功能後,Airflow將會建立一些特殊的內建DAG。這些DAG包含了SmartSensorOperator,該Operator將從sensor_instance表中批量獲取sensor任務並執行相關邏輯。值得注意的是,使用者無需變更原有DAG邏輯即可使用該特性。
如何使用Smart Sensor
要開啟smart sensor,我們需要在airflow.cfg配置檔案中新增以下配置
[smart_sensor] use_smart_sensor = true shard_code_upper_limit = 10000 # Users can change the following config based on their requirements shards = 5 sensors_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
-
use_smart_sensor : smart sensor開關
-
shards : 中心化smart sensor程序的並行度
-
sensors_enabled : 允許加入smart sensor的Sensor型別
開啟和禁用smart sensor服務是Airflow系統級別的配置變更。改變更對於使用者來說應該是透明的,現有的DAG無需隨smart sensor的開啟或禁用而變更,且配置變更不會引起任何使用者sensor任務的失敗。
如何開發支援smart sensor的Operator
-
首先在sensor類中定義 poke_context_fields 屬性。 poke_context_fields 需要包含初始化sensor需要的所有屬性名。
-
在 airflow.cfg 中新增該sensor類名到 smart_sensor.sensors_enabled 中 ,所有sensor類名均已逗號分隔。
- Airflow sensor簡介
- Airflow 基礎系列 - 03 (Operators介紹)
- Airflow 基礎系列 - 02 (Executor詳解)
- 深入理解Delta Lake 實現原理(on Zeppelin)
- Airflow 基礎系列-01 (Airflow元件)
- PyFlink 開發環境利器:Zeppelin Notebook
- 在Zeppelin中如何使用Flink
- 記一次 Centos7.x 安裝、部署 Zeppelin v0.9.0 並配置 PostgreSql 資料庫
- 新版Denodo Platform 8.0加速混合/多雲整合,通過AI/ML實現資料管理自動化,並提高效能
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- Flink x Zeppelin ,Hive Streaming 實戰解析
- Zeppelin整合Flink採坑實錄