Airflow 基礎系列-01 (Airflow元件)
Apache Airflow 微信公眾號是由一群Airflow的愛好者一起維護的,我們旨在普及Airflow知識,給廣大Airflow使用者搭建一個交流的平臺。
前期我們會發布一些Airflow的基本知識文章。本文是Apache Airflow 公眾號的開篇之作,主要介紹Airflow的整體架構以及主要的元件。
Airflow 架構
下圖是Airflow的架構圖,
我們可以看到Airflow主要有4個元件:
-
WebServer
-
Scheduler
-
Executor
-
Database
WebServer - Airflow UI
WebServer 實際上是一個Python的Flask app。你可以在Airflow WebServer的UI介面上看到排程作業的狀態。作業的資訊都儲存在資料庫裡,WebServer負責查詢資料庫並在頁面上展示作業資訊。WebServer 元件也負責讀取並展示Remote的作業日誌(Airflow的作業日誌可以存放在S3,Google Cloud Storage,AzureBlobs,ElasticsSearch等等)。
Scheduler - 作業排程器
Scheduler 是一個多執行緒的Python程序。Scheduler 通過檢查DAG的task依賴關係以及資料庫裡各個task的狀態來決定接下來跑哪個task,什麼時候跑以及在哪裡跑。
Executor - 作業執行器
Airflow 支援以下4種類型的 Executor
-
SequentialExecutor 按照線性的方式執行task,沒有併發和並行。一般用在開發環境裡用。
-
LocalExecutor 支援並行和多行程,一般用在單節點的Airflow裡。
-
CeleryExecutor 是在分散式環境下執行器。但是依賴第三方的message queue元件來排程task到worker節點,message queue可以用Redis,RabbitMQ。
-
KubernetesExecutor 是 Airfow 1.10新引入的一個執行器,主要用在K8s環境裡。
Metadata Database - 元資料資料庫
元資料資料庫可以是任何支援 SQLAlchemy的資料庫(比如Postgres,MySql)。Scheduler 通過修改資料庫來更新task狀態,WebServer會讀取資料庫來展示作業狀態
Airflow 是如何排程的
-
Schduler 會掃描dags資料夾,在元資料資料庫裡建立DAG對應的記錄。根據配置,每一個DAG都會分配若干個程序。
-
每個程序都會掃描對應的DAG檔案,根據排程配置引數建立DagRuns。然後每一個滿足被排程條件的Task都會例項出來一個TaskInstance,TaskInstance會被初始化為 Scheduled 的狀態,並在資料庫裡更新。
-
Scheduler 程序查詢資料庫拿到所有 Scheduled 狀態的tasks,並把他們傳送到Executor(對應TaskInstance的狀態更新為QUEUED)
-
Worker會從queue里拉取task並執行。TaskInstance的狀態由 QUEUED 轉變為 RUNNING
-
當一個task結束了,worker會把task狀態更新為對應的結束狀態(FINISHED,FAILED,等等),Scheduler會在資料庫裡更新對應的狀態。
# http://github.com/apache/incubator-airflow/blob/2d50ba43366f646e9391a981083623caa12e8967/airflow/jobs.py#L1386
def _process_dags(self, dagbag, dags, tis_out):
"""
Iterates over the dags and processes them. Processing includes:
1. Create appropriate DagRun(s) in the DB.
2. Create appropriate TaskInstance(s) in the DB.
3. Send emails for tasks that have missed SLAs.
:param dagbag: a collection of DAGs to process
:type dagbag: models.DagBag
:param dags: the DAGs from the DagBag to process
:type dags: DAG
:param tis_out: A queue to add generated TaskInstance objects
:type tis_out: multiprocessing.Queue[TaskInstance]
:return: None
"""
for dag in dags:
dag = dagbag.get_dag(dag.dag_id)
if dag.is_paused:
self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
continue
if not dag:
self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
continue
self.log.info("Processing %s", dag.dag_id)
dag_run = self.create_dag_run(dag)
if dag_run:
self.log.info("Created %s", dag_run)
self._process_task_instances(dag, tis_out)
self.manage_slas(dag)
models.DagStat.update([d.dag_id for d in dags])
Airflow 元件配置
配置Airflow 元件之間的互動是同一個airflow.cfg 檔案控制的,這個檔案裡本身有對各種配置的說明文件,這裡我們介紹一些常用的配置項:
Parallelism
以下3個引數可以用來控制task的並行度。
parallelism
, dag_concurrency
and max_active_runs_per_dag
parallelism是指airflow executor最多執行的task數。dag_concurrenty 是指單個dag最多執行的task數。
max_active_runs_per_dag 是指最多的dag執行例項。
Scheduler
job_heartbeat_sec 是指task接受外部kill signal的頻率(比如你在airflow web頁面kill task),預設是5秒鐘
scheduler_heartbeat_sec 是指scheduler 觸發新task的時間隔間,預設是5秒。
還有更多配置會在後面的文章裡介紹,也希望大家多多支援Airflow公眾號,後續我們更新更多有關Airflow的文章。
本文翻譯 http://www.astronomer.io/guides/airflow-components
- Airflow sensor簡介
- Airflow 基礎系列 - 03 (Operators介紹)
- Airflow 基礎系列 - 02 (Executor詳解)
- 深入理解Delta Lake 實現原理(on Zeppelin)
- Airflow 基礎系列-01 (Airflow元件)
- PyFlink 開發環境利器:Zeppelin Notebook
- 在Zeppelin中如何使用Flink
- 記一次 Centos7.x 安裝、部署 Zeppelin v0.9.0 並配置 PostgreSql 資料庫
- 新版Denodo Platform 8.0加速混合/多雲整合,通過AI/ML實現資料管理自動化,並提高效能
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- Flink x Zeppelin ,Hive Streaming 實戰解析
- Zeppelin整合Flink採坑實錄