Airflow 基礎系列 - 03 (Operators介紹)
本文翻譯 http://www.astronomer.io/guides/what-is-an-operator
Operators
Operators是DAG的主要構成部分,是DAG中封裝執行邏輯的基本單元。
如果在DAG中建立一個operator的例項,並提供其所需引數,就構成了一個task。一個DAG通常由很多task以及task之間的依賴關係構成。當Airflow根據 execution_date 執行task時,就會產生一個task instance。
BashOperator
t1 = BashOperator( task_id='bash_hello_world', dag=dag, bash_command='echo "Hello World"' )
t1
該BashOperator執行了一個bash命令 echo "Hello World" 。
PythonOperator
def hello(**kwargs): print('Hello from {kw}'.format(kw=kwargs['my_keyword'])) t2 = PythonOperator( task_id='python_hello', dag=dag, python_callable=hello, op_kwargs={'my_keyword': 'Airflow'} )
該 PythonOperator 呼叫程式碼中定義的 hello 函式, 並通過 PythonOperator 的 op_kwargs 引數傳入函式所需引數。該task執行後將會在控制檯輸出"Hello from Airflow" 。
PostgresOperator
t3 = PostgresOperator( task_id='PythonOperator', sql='CREATE TABLE my_table (my_column varchar(10));', postgres_conn_id='my_postgres_connection', autocommit=False )
該PostgresOperator向postgres資料庫執行一條sql語句,資料庫連線資訊存在名為 my_postgres_connection 的airflow connection中。如果看PostgresOperator的程式碼,會發現它實際上通過PostgresHook實現與資料庫互動。
SSHOperator
t4 = SSHOperator( task_id='SSHOperator', ssh_conn_id='my_ssh_connection', command='echo "Hello from SSH Operator"' )
與BashOperator類似,SSHOperator也是執行bash命令,只不過它是通過SSH到遠端主機執行bash命令。
用於對遠端伺服器進行身份驗證的私鑰儲存在Airflow Connection中,名為 my_ssh_conenction ,它可以在任意DAG中引用,所以在使用SSHOperator時只需要關心bash命令是什麼。
SSHOperator是通過 SSHHook 實現建立SSH連線和執行命令。
S3ToRedshift Operator
t5 = S3ToRedshiftOperator( task_id='S3ToRedshift', schema='public', table='my_table', s3_bucket='my_s3_bucket', s3_key='{{ ds_nodash }}/my_file.csv', redshift_conn_id='my_redshift_connection', aws_conn_id='my_aws_connection' )
S3ToRedshiftOperator通過Redshift的COPY命令實現從S3載入資料到Redshift,它屬於Transfer Operators,這一類Operator用於從一個系統傳輸資料到另外一個系統。注意S3ToRedshiftOperator的引數中有兩個Airflow connection,一個是Redshift的連線資訊,一個是S3的連線資訊。
這裡也用到了另外一個概念 - templates 。在引數 s3_key 處,有個jinja模板符號 {{ }} ,其中傳入的 ds_nodash 是Airflow引擎傳入模板的預定義變數,其值為DAG Run的邏輯執行日期, nodash 表示日期格式為沒有破折號的格式。所以 s3_key 傳入的值依賴於具體執行的時間,如下是個樣例值 20190711/my_file.csv 。
templates適合於需要傳入動態變化的引數的場景,也能使DAG程式碼保證冪等(比如上述程式碼中產生的s3檔案都根據各自的日期命名,從而保證了冪等性)。
- Airflow sensor簡介
- Airflow 基礎系列 - 03 (Operators介紹)
- Airflow 基礎系列 - 02 (Executor詳解)
- 深入理解Delta Lake 實現原理(on Zeppelin)
- Airflow 基礎系列-01 (Airflow元件)
- PyFlink 開發環境利器:Zeppelin Notebook
- 在Zeppelin中如何使用Flink
- 記一次 Centos7.x 安裝、部署 Zeppelin v0.9.0 並配置 PostgreSql 資料庫
- 新版Denodo Platform 8.0加速混合/多雲整合,通過AI/ML實現資料管理自動化,並提高效能
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- Flink x Zeppelin ,Hive Streaming 實戰解析
- Zeppelin整合Flink採坑實錄