Airflow 基礎系列 - 03 (Operators介紹)

語言: CN / TW / HK

本文翻譯 https://www.astronomer.io/guides/what-is-an-operator

Operators

Operators是DAG的主要構成部分,是DAG中封裝執行邏輯的基本單元。

如果在DAG中建立一個operator的例項,並提供其所需引數,就構成了一個task。一個DAG通常由很多task以及task之間的依賴關係構成。當Airflow根據 execution_date 執行task時,就會產生一個task instance。

BashOperator

t1 = BashOperator(
        task_id='bash_hello_world',
        dag=dag,
        bash_command='echo "Hello World"'
        )

t1

該BashOperator執行了一個bash命令 echo "Hello World"

PythonOperator

def hello(**kwargs):
    print('Hello from {kw}'.format(kw=kwargs['my_keyword']))

t2 = PythonOperator(
        task_id='python_hello',
        dag=dag,
        python_callable=hello,
        op_kwargs={'my_keyword': 'Airflow'}
        )

PythonOperator 呼叫程式碼中定義的 hello 函式, 並通過 PythonOperator op_kwargs 引數傳入函式所需引數。該task執行後將會在控制檯輸出"Hello from Airflow" 。

PostgresOperator

t3 = PostgresOperator(
        task_id='PythonOperator',
        sql='CREATE TABLE my_table (my_column varchar(10));',
        postgres_conn_id='my_postgres_connection',
        autocommit=False
    )

該PostgresOperator向postgres資料庫執行一條sql語句,資料庫連線資訊存在名為 my_postgres_connection 的airflow connection中。如果看PostgresOperator的程式碼,會發現它實際上通過PostgresHook實現與資料庫互動。

SSHOperator

t4 = SSHOperator(
        task_id='SSHOperator',
        ssh_conn_id='my_ssh_connection',
        command='echo "Hello from SSH Operator"'
    )

與BashOperator類似,SSHOperator也是執行bash命令,只不過它是通過SSH到遠端主機執行bash命令。

用於對遠端伺服器進行身份驗證的私鑰儲存在Airflow Connection中,名為 my_ssh_conenction ,它可以在任意DAG中引用,所以在使用SSHOperator時只需要關心bash命令是什麼。

SSHOperator是通過 SSHHook 實現建立SSH連線和執行命令。

S3ToRedshift Operator

t5 = S3ToRedshiftOperator(
        task_id='S3ToRedshift',
        schema='public',
        table='my_table',
        s3_bucket='my_s3_bucket',
        s3_key='{{ ds_nodash }}/my_file.csv',
        redshift_conn_id='my_redshift_connection',
        aws_conn_id='my_aws_connection'
    )

S3ToRedshiftOperator通過Redshift的COPY命令實現從S3載入資料到Redshift,它屬於Transfer Operators,這一類Operator用於從一個系統傳輸資料到另外一個系統。注意S3ToRedshiftOperator的引數中有兩個Airflow connection,一個是Redshift的連線資訊,一個是S3的連線資訊。

這裡也用到了另外一個概念 - templates 。在引數 s3_key 處,有個jinja模板符號 {{ }} ,其中傳入的 ds_nodash 是Airflow引擎傳入模板的預定義變數,其值為DAG Run的邏輯執行日期, nodash 表示日期格式為沒有破折號的格式。所以 s3_key 傳入的值依賴於具體執行的時間,如下是個樣例值 20190711/my_file.csv

templates適合於需要傳入動態變化的引數的場景,也能使DAG程式碼保證冪等(比如上述程式碼中產生的s3檔案都根據各自的日期命名,從而保證了冪等性)。