Airflow 基礎系列-01 (Airflow元件)

語言: CN / TW / HK

Apache Airflow 微信公眾號是由一群Airflow的愛好者一起維護的,我們旨在普及Airflow知識,給廣大Airflow使用者搭建一個交流的平臺。

前期我們會發布一些Airflow的基本知識文章。本文是Apache Airflow 公眾號的開篇之作,主要介紹Airflow的整體架構以及主要的元件。

Airflow 架構

下圖是Airflow的架構圖,

我們可以看到Airflow主要有4個元件:

  • WebServer

  • Scheduler

  • Executor

  • Database

WebServer - Airflow UI

WebServer 實際上是一個Python的Flask app。你可以在Airflow WebServer的UI介面上看到排程作業的狀態。作業的資訊都儲存在資料庫裡,WebServer負責查詢資料庫並在頁面上展示作業資訊。WebServer 元件也負責讀取並展示Remote的作業日誌(Airflow的作業日誌可以存放在S3,Google Cloud Storage,AzureBlobs,ElasticsSearch等等)。

Scheduler - 作業排程器

Scheduler 是一個多執行緒的Python程序。Scheduler 通過檢查DAG的task依賴關係以及資料庫裡各個task的狀態來決定接下來跑哪個task,什麼時候跑以及在哪裡跑。

Executor - 作業執行器

Airflow   支援以下4種類型的 Executor

  • SequentialExecutor 按照線性的方式執行task,沒有併發和並行。一般用在開發環境裡用。

  • LocalExecutor 支援並行和多行程,一般用在單節點的Airflow裡。

  • CeleryExecutor 是在分散式環境下執行器。但是依賴第三方的message queue元件來排程task到worker節點,message queue可以用Redis,RabbitMQ。

  • KubernetesExecutor 是 Airfow 1.10新引入的一個執行器,主要用在K8s環境裡。

Metadata Database - 元資料資料庫

元資料資料庫可以是任何支援 SQLAlchemy的資料庫(比如Postgres,MySql)。Scheduler 通過修改資料庫來更新task狀態,WebServer會讀取資料庫來展示作業狀態

Airflow 是如何排程的

  1. Schduler 會掃描dags資料夾,在元資料資料庫裡建立DAG對應的記錄。根據配置,每一個DAG都會分配若干個程序。

  2. 每個程序都會掃描對應的DAG檔案,根據排程配置引數建立DagRuns。然後每一個滿足被排程條件的Task都會例項出來一個TaskInstance,TaskInstance會被初始化為 Scheduled 的狀態,並在資料庫裡更新。

  3. Scheduler 程序查詢資料庫拿到所有 Scheduled 狀態的tasks,並把他們傳送到Executor(對應TaskInstance的狀態更新為QUEUED)

  4. Worker會從queue里拉取task並執行。TaskInstance的狀態由 QUEUED 轉變為 RUNNING

  5. 當一個task結束了,worker會把task狀態更新為對應的結束狀態(FINISHED,FAILED,等等),Scheduler會在資料庫裡更新對應的狀態。

# https://github.com/apache/incubator-airflow/blob/2d50ba43366f646e9391a981083623caa12e8967/airflow/jobs.py#L1386


def _process_dags(self, dagbag, dags, tis_out):
"""
Iterates over the dags and processes them. Processing includes:
1. Create appropriate DagRun(s) in the DB.


2. Create appropriate TaskInstance(s) in the DB.


3. Send emails for tasks that have missed SLAs.


:param dagbag: a collection of DAGs to process
:type dagbag: models.DagBag
:param dags: the DAGs from the DagBag to process
:type dags: DAG
:param tis_out: A queue to add generated TaskInstance objects
:type tis_out: multiprocessing.Queue[TaskInstance]
:return: None
"""
for dag in dags:
dag = dagbag.get_dag(dag.dag_id)
if dag.is_paused:
self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
continue


if not dag:
self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
continue


self.log.info("Processing %s", dag.dag_id)


dag_run = self.create_dag_run(dag)
if dag_run:
self.log.info("Created %s", dag_run)
self._process_task_instances(dag, tis_out)
self.manage_slas(dag)


models.DagStat.update([d.dag_id for d in dags])

Airflow 元件配置

配置Airflow 元件之間的互動是同一個airflow.cfg 檔案控制的,這個檔案裡本身有對各種配置的說明文件,這裡我們介紹一些常用的配置項:

Parallelism

以下3個引數可以用來控制task的並行度。

parallelismdag_concurrency and  max_active_runs_per_dag

parallelism是指airflow executor最多執行的task數。dag_concurrenty 是指單個dag最多執行的task數。

max_active_runs_per_dag 是指最多的dag執行例項。

Scheduler

job_heartbeat_sec 是指task接受外部kill signal的頻率(比如你在airflow web頁面kill task),預設是5秒鐘

scheduler_heartbeat_sec 是指scheduler 觸發新task的時間隔間,預設是5秒。

還有更多配置會在後面的文章裡介紹,也希望大家多多支援Airflow公眾號,後續我們更新更多有關Airflow的文章。

本文翻譯 https://www.astronomer.io/guides/airflow-components