大数据调度平台Airflow(五):Airflow使用
目錄
Airflow使用
一、Airflow調度Shell命令
1.首先我們需要創建一個python文件,導入需要的類庫
2.實例化DAG
3、定義Task
4、設置task依賴關系
5、上傳python配置腳本
6、重啟Airflow
7、執行airflow
二、DAG調度觸發時間
三、DAG catchup 參數設置
四、DAG調度周期設置
五、DAG任務依賴設置
1、DAG任務依賴設置一
2、???????DAG任務依賴設置二
3、???????DAG任務依賴設置三
4、???????DAG任務依賴設置四
5、???????DAG任務依賴設置五
Airflow使用
上文說到使用Airflow進行任務調度大體步驟如下:
- 創建python文件,根據實際需要,使用不同的Operator
- 在python文件不同的Operator中傳入具體參數,定義一系列task
- 在python文件中定義Task之間的關系,形成DAG
- 將python文件上傳執行,調度DAG,每個task會形成一個Instance
- 使用命令行或者WEBUI進行查看和管理
以上python文件就是Airflow python腳本,使用代碼方式指定DAG的結構
一、Airflow調度Shell命令
下面我們以調度執行shell命令為例,來講解Airflow使用。
1.首先我們需要創建一個python文件,導入需要的類庫
# 導入 DAG 對象,后面需要實例化DAG對象
from airflow import DAG# 導入BashOperator Operators,我們需要利用這個對象去執行流程
from airflow.operators.bash import BashOperator
注意:以上代碼可以在開發工具中創建,但是需要在使用的python3.7環境中導入安裝Airflow包。
D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
2.實例化DAG
from datetime import datetime, timedelta# default_args中定義一些參數,在實例化DAG時可以使用,使用python dic 格式定義
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2022, 3, 25), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)
注意:
- 實例化DAG有三種方式
第一種方式:
with DAG("my_dag_name") as dag:op=XXOperator(task_id="task")
第二種方式(以上采用這種方式):
my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)
第三種方式:
@dag(start_date=days_ago(2))
def generate_dag():op = XXOperator(task_id="task")
dag = generate_dag()
- baseoperator基礎參數說明:
可以參照:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator查看baseopartor中更多參數。
- DAG參數說明
可以參照:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html
查看DAG參數說明,也可以直接在開發工具點擊DAG進入源碼看下對應參數有哪些。
3、定義Task
當實例化Operator時會生成Task任務,從一個Operator中實例化出來對象的過程被稱為一個構造方法,每個構造方法中都有“task_id”充當任務的唯一標識符。
下面我們定義三個Operator,也就是三個Task,每個task_id 不能重復。
# operator 支持多種類型, 這里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)
注意:
- 每個operator中可以傳入對應的參數,覆蓋DAG默認的參數,例如:last task中“retries”=3 就替代了默認的1。任務參數的優先規則如下:①.顯示傳遞的參數 ②.default_args字典中存在的值③.operator的默認值(如果存在)。
- BashOperator使用方式參照:http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator
4、設置task依賴關系
#使用 set_upstream、set_downstream 設置依賴關系,不能出現環形鏈路,否則報錯
# middle.set_upstream(first) # middle會在first執行完成之后執行
# last.set_upstream(middle) # last 會在 middle執行完成之后執行#也可以使用位移符來設置依賴關系
first >> middle >>last # first 首先執行,middle次之,last最后
# first >> [middle,last] # first首先執行,middle ,last并行執行
注意:當執行腳本時,如果在DAG中找到一條環形鏈路(例如:A->B->C-A)會引發異常。更多DAG task依賴關系可參照官網:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies?
5、上傳python配置腳本
到目前為止,python配置如下:
# 導入 DAG 對象,后面需要實例化DAG對象
from airflow import DAG# 導入BashOperator Operators,我們需要利用這個對象去執行流程
from airflow.example_dags.example_bash_operator import dagfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# default_args中定義一些參數,在實例化DAG時可以使用,使用python dic 格式定義
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)# operator 支持多種類型, 這里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)#使用 set_upstream、set_downstream 設置依賴關系,不能出現環形鏈路,否則報錯
# middle.set_upstream(first) # middle會在first執行完成之后執行
# last.set_upstream(middle) # last 會在 middle執行完成之后執行#也可以使用位移符來設置依賴關系
first >> middle >>last # first 首先執行,middle次之,last最后
# first >> [middle,last] # first首先執行,middle ,last并行執行
?將以上python配置文件上傳到$AIRFLOW_HOME/dags目錄下,默認$AIRFLOW_HOME為安裝節點的“/root/airflow”目錄,當前目錄下的dags目錄需要手動創建。
6、重啟Airflow
“ps aux|grep webserver”和“ps aux|grep scheduler”找到對應的airflow進程殺掉,重新啟動Airflow。重啟之后,可以在airflow webui看到對應的DAG ID ”myairflow_execute_bash”。
7、執行airflow
按照如下步驟執行DAG,首先打開工作流,然后“Trigger DAG”執行,隨后可以看到任務執行成功。
查看task執行日志:
二、DAG調度觸發時間
在Airflow中,調度程序會根據DAG文件中指定的“start_date”和“schedule_interval”來運行DAG。特別需要注意的是Airflow計劃程序在計劃時間段的末尾觸發執行DAG,而不是在開始時刻觸發DAG,例如:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2022, 3, 25), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)
以上配置的DAG是從世界標準時間2022年3月24號開始調度,每隔1天執行一次,這個DAG的具體運行時間如下圖:?
| 自動調度DAG 執行日期 | 自動調度DAG實際執行觸發時間 |
| 2022-03-24,00:00:00+00:00 | 2022-03-25,00:00:00+00:00 |
| 2022-03-25,00:00:00+00:00 | 2022-03-26,00:00:00+00:00 |
| 2022-03-26,00:00:00+00:00 | 2022-03-27,00:00:00+00:00 |
| 2022-03-27,00:00:00+00:00 | 2022-03-28,00:00:00+00:00 |
| 2022-03-28,00:00:00+00:00 | 2022-03-29,00:00:00+00:00 |
| ... ... | ... ... |
以上表格中以第一條數據為例解釋,Airflow正常調度是每天00:00:00 ,假設當天日期為2022-03-24,正常我們認為只要時間到了2022-03-24 00:00:00 就會執行,改調度時間所處于的調度周期為2022-03-24 00:00:00 ~ 2022-03-25 00:00:00 ,在Airflow中實際上是在調度周期末端觸發執行,也就是說2022-03-24 00:00:00 自動觸發執行時刻為 2022-03-25 00:00:00。?
如下圖,在airflow中,“execution_date”不是實際運行時間,而是其計劃周期的開始時間戳。例如:execution_date 是2021-09-04 00:00:00 的DAG 自動調度運行的實際時間為2021-09-05 00:00:00。當然除了自動調度外,我們還可以手動觸發執行DAG執行,要判斷DAG運行時計劃調度(自動調度)還是手動觸發,可以查看“Run Type”。
三、DAG catchup 參數設置
在Airflow的工作計劃中,一個重要的概念就是catchup(追趕),在實現DAG具體邏輯后,如果將catchup設置為True(默認就為True),Airflow將“回填”所有過去的DAG run,如果將catchup設置為False,Airflow將從最新的DAG run時刻前一時刻開始執行 DAG run,忽略之前所有的記錄。
例如:現在某個DAG每隔1分鐘執行一次,調度開始時間為2001-01-01 ,當前日期為2021-10-01 15:23:21,如果catchup設置為True,那么DAG將從2001-01-01 00:00:00 開始每分鐘都會運行當前DAG。如果catchup 設置為False,那么DAG將從2021-10-01 15:22:20(當前2021-10-01 15:23:21前一時刻)開始執行DAG run。
舉例:有first ,second,third三個shell命令任務,按照順序調度,每隔1分鐘執行一次,首次執行時間為2000-01-01。
設置catchup 為True(默認),DAG python配置如下:
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2001, 1, 1), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}
dag = DAG(dag_id = 'catchup_test1 ', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1), # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒catchup=True # 執行DAG時,將開始時間到目前所有該執行的任務都執行,默認為True
)first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)
middle = BashOperator(task_id='second',bash_command='echo "run second task"',dag=dag
)
last = BashOperator(task_id='third',bash_command='echo "run third task"',dag=dag,retries=3
)
first >> middle >>last
上傳python配置文件到$AIRFLOW_HOME/dags下,重啟airflow,DAG執行調度如下:
設置catchup 為False,DAG python配置如下:
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2001, 1, 1), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}
dag = DAG(dag_id = 'catchup_test2', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1), # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒catchup=False # 執行DAG時,將開始時間到目前所有該執行的任務都執行,默認為True
)first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)
middle = BashOperator(task_id='second',bash_command='echo "run second task"',dag=dag
)
last = BashOperator(task_id='third',bash_command='echo "run third task"',dag=dag,retries=3
)
first >> middle >>last
上傳python配置文件到$AIRFLOW_HOME/dags下,重啟airflow,DAG執行調度如下:
有兩種方式在Airflow中配置catchup:
- 全局配置
在airflow配置文件airflow.cfg的scheduler部分下,設置catchup_by_default=True(默認)或False,這個設置是全局性的設置。
- DAG文件配置
在python代碼配置中設置DAG對象的參數:dag.catchup=True或False。
dag = DAG(dag_id = 'myairflow_execute_bash',
default_args = default_args,
catchup=False,schedule_interval = timedelta(days=1))
四、DAG調度周期設置
每個DAG可以有或者沒有調度執行周期,如果有調度周期,我們可以在python代碼DAG配置中設置“schedule_interval”參數來指定調度DAG周期,可以通過以下三種方式來設置。
- 預置的Cron調度
Airflow預置了一些Cron調度周期,可以參照:
DAG Runs — Airflow Documentation,如下圖:
?
在python配置文件中使用如下:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = '@daily' # 使用預置的Cron調度,每天0點0分調度
- Cron
這種方式就是寫Linux系統的crontab定時任務命令,可以在https://crontab.guru/網站先生成對應的定時調度命令,其格式如下:
minute hour day month week
minute:表示分鐘,可以從0~59之間的任意整數。
hour:表示小時,可以是從0到23之間的任意整數。
day:表示日期,可以是1到31之間的任何整數。
month:表示月份,可以是從1到12之間的任何整數。
week:表示星期幾,可以是從0到7之間的任何整數,這里的0或7代表星期日。
以上各個字段中還可以使用特殊符號代表不同意思:
星號(*):代表所有可能的值,例如month字段如果是星號,則表示在滿足其它字段的制約條件后每月都執行該命令操作。
逗號(,):可以用逗號隔開的值指定一個列表范圍,例如,”1,2,5,7,8,9”
中杠(-):可以用整數之間的中杠表示一個整數范圍,例如”2-6”表示”2,3,4,5,6”
正斜線(/):可以用正斜線指定時間的間隔頻率,步長,例如”0-23/2”表示每兩小時執行一次。
在python配置文件中使用如下:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = '* * * * *' # 使用Crontab 定時任務命令,每分鐘運行一次
)
- datetime.timedelta
timedelta是使用python timedelta 設置調度周期,可以配置天、周、小時、分鐘、秒、毫秒。在python配置文件中使用如下:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=5) # 使用python timedelta 設置調度周期,可以配置天、周、小時、分鐘、秒、毫秒
)
五、???????DAG任務依賴設置
1、???????DAG任務依賴設置一
- DAG調度流程圖
- task執行依賴
A >> B >>C
- 完整代碼
'''
airflow 任務依賴關系設置一'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_1', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)A >> B >>C
2、???????DAG任務依賴設置二
- DAG調度流程圖
- task執行依賴
[A,B] >>C >>D
- 完整代碼
'''
airflow 任務依賴關系設置二'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_2', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)[A,B] >>C >>D
3、???????DAG任務依賴設置三
- DAG調度流程圖
- task執行依賴
[A,B,C] >>D >>[E,F]
- 完整代碼
'''
airflow 任務依賴關系設置三'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_3', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)[A,B,C] >>D >>[E,F]
4、???????DAG任務依賴設置四
- DAG調度流程圖
?
- task執行依賴
A >>B>>C>>D
A >>E>>F
- 完整代碼
'''
airflow 任務依賴關系設置四'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_4', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)A >>[B,C,D]
A >>[E,F]
5、???????DAG任務依賴設置五
- DAG調度流程圖
?
- task執行依賴
A >>B>>E
C >>D>>E
- 完整代碼
'''
airflow 任務依賴關系設置五'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執行的時間,為 UTC 時間'retries': 1, # 失敗重試次數'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_5', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)A >>B>>E
C >>D>>E
- 📢博客主頁:https://lansonli.blog.csdn.net
- 📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
- 📢本文由 Lansonli 原創,首發于 CSDN博客🙉
- 📢大數據系列文章會每天更新,停下休息的時候不要忘了別人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活??
總結
以上是生活随笔為你收集整理的大数据调度平台Airflow(五):Airflow使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 大数据调度平台Airflow(三):Ai
- 下一篇: 大数据调度平台Airflow(八):Ai