airflow mysql_Airflow 使用及原理分析
Airflow 入門及使用
什么是 Airflow?Airflow 是一個使用 Python 語言編寫的 Data Pipeline 調度和監控工作流的平臺。
Airflow 是通過 DAG(Directed acyclic graph 有向無環圖)來管理任務流程的任務調度工具,不需要知道業務數據的具體內容,設置任務的依賴關系即可實現任務調度。
這個平臺擁有和 Hive、Presto、MySQL、HDFS、Postgres 等數據源之間交互的能力,并且提供了鉤子(hook)使其擁有很好地擴展性。除了使用命令行,該工具還提供了一個 WebUI 可以可視化的查看依賴關系、監控進度、觸發任務等。
Airflow 的架構在一個可擴展的生產環境中,Airflow 含有以下組件:
元數據庫:這個數據庫存儲有關任務狀態的信息。
調度器:Scheduler 是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務需要被執行以及任務執行優先級的過程。調度器通常作為服務運行。
執行器:Executor 是一個消息隊列進程,它被綁定到調度器中,用于確定實際執行每個任務計劃的工作進程。有不同類型的執行器,每個執行器都使用一個指定工作進程的類來執行任務。例如,LocalExecutor 使用與調度器進程在同一臺機器上運行的并行進程執行任務。其他像 CeleryExecutor 的執行器使用存在于獨立的工作機器集群中的工作進程執行任務。
Workers:這些是實際執行任務邏輯的進程,由正在使用的執行器確定。
Airflow 解決哪些問題通常,在一個運維系統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限于:
時間依賴:任務需要等待某一個時間點觸發。
外部系統依賴:任務依賴外部系統需要調用接口去訪問。
任務間依賴:任務 A 需要在任務 B 完成后啟動,兩個任務互相間會產生影響。
資源環境依賴:任務消耗資源非常多, 或者只能在特定的機器上執行。
crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。
Airflow 是一種 WMS,即:它將任務以及它們的依賴看作代碼,按照那些計劃規范任務執行,并在實際工作進程之間分發需執行的任務。
Airflow 提供了一個用于顯示當前活動任務和過去任務狀態的優秀 UI,并允許用戶手動管理任務的執行和狀態。
Airflow 中的工作流是具有方向性依賴的任務集合。
具體說就是 Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。
DAG 中的每個節點都是一個任務,DAG 中的邊表示的是任務之間的依賴(強制為有向無環,因此不會出現循環依賴,從而導致無限執行循環)。
Airflow 在 ETL 上的實踐ETL,是英文 Extract,Transform,Load 的縮寫,用來描述將數據從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。ETL 一詞較常用在數據倉庫,Airflow 在解決 ETL 任務各種依賴問題上的能力恰恰是我們所需要的。
在現階段的實踐中,我們使用 Airflow 來同步各個數據源數據到數倉,同時定時執行一些批處理任務及帶有數據依賴、資源依賴關系的計算腳本。
本文立意于科普介紹,故在后面的用例中只介紹了 BashOperator,PythonOperator這倆個最為易用且在我們日常使用中最為常見的 Operator。
Airflow 同時也具有不錯的集群擴展能力,可使用 CeleryExecuter 以及多個 Pool 來提高任務并發度。
Airflow在 CeleryExecuter 下可以使用不同的用戶啟動 Worker,不同的 Worker 監聽不同的 Queue,這樣可以解決用戶權限依賴問題。Worker 也可以啟動在多個不同的機器上,解決機器依賴的問題。
Airflow 可以為任意一個 Task 指定一個抽象的 Pool,每個 Pool 可以指定一個 Slot 數。每當一個 Task 啟動時,就占用一個 Slot,當 Slot 數占滿時,其余的任務就處于等待狀態。這樣就解決了資源依賴問題。
Airflow 安裝及初始化假設:你已經安裝好了 Python 及配置好了其包管理工具 pip。
pip?install?apache-airflow
#?初始化數據庫
airflow?initdb
#?上面的命令默認在家目錄下創建?Airflow?文件夾和相關配置文件
#?也可以使用以下命令來指定目錄
export?AIRFLOW_HOME={yourpath}/airflow
#?配置數據庫
#?vim?airflow/airflow.cfg
#?修改?sql_alchemy_conn
#?守護進程運行?webserver,默認端口為8080,也可以通過`-p`來指定端口
airflow?webserver?-D
#?守護進程運行調度器
airflow?scheduler?-D
定義第一個 DAG在 AIRFLOW_HOME 目錄下新建 DAGs 文件夾,后面的所有 DAG 文件都要存儲在這個目錄。
新建 demo.py,語句含義見注釋。
from?datetime?import?datetime,?timedelta
from?airflow?import?DAG
from?airflow.utils.dates?import?days_ago
from?airflow.operators.bash_operator?import?BashOperator
from?airflow.operators.python_operator?import?PythonOperator
from?airflow.operators.dummy_operator?import?DummyOperator
def?default_options():
default_args?=?{
'owner':?'airflow',??#?擁有者名稱
'start_date':?days_ago(1),??#?第一次開始執行的時間,為?UTC?時間(注意不要設置為當前時間)
'retries':?1,#?失敗重試次數
'retry_delay':?timedelta(seconds=5)??#?失敗重試間隔
}
return?default_args
#?定義?DAG
def?test1(dag):
t?=?"echo?'hallo?world'"
#?operator?支持多種類型,?這里使用?BashOperator
task?=?BashOperator(
task_id='test1',??#?task_id
bash_command=t,??#?指定要執行的命令
dag=dag??#?指定歸屬的?DAG
)
return?task
def?hello_world_1():
current_time?=?str(datetime.today())
print('hello?world?at?{}'.format(current_time))
def?test2(dag):
#?PythonOperator
task?=?PythonOperator(
task_id='test2',
python_callable=hello_world_1,??#?指定要執行的函數
dag=dag)
return?task
def?test3(dag):
#?DummyOperator
task?=?DummyOperator(
task_id='test3',
dag=dag)
return?task
with?DAG(
'test_task',??#?dag_id
default_args=default_options(),??#?指定默認參數
schedule_interval="@once"??#?執行周期
)?as?d:
task1?=?test1(d)
task2?=?test2(d)
task3?=?test3(d)
task1?>>?task2?>>?task3??#?指定執行順序
寫完后執行 python $AIRFLOW_HOME/dags/demo.py 檢查是否有錯誤,如果命令行沒有報錯,就表示沒問題。
Web UI打開 localhost:8080。
主視圖:
Airflow 的 WebUI 是其任務調度可視化的體現,可以在這個 WebUI 上監控幾乎所有任務調度運行的實時及歷史數據。一些命令如 Trigger、Clear 均可以在 WebUI 上完成;一些全局參數也可以在主頁面導航欄 Admin 下配置。
點擊 dag_name,進入任務預覽:
任務圖視圖:
任務樹視圖:
其他常用命令#?測試任務,格式:airflow?test?dag_id?task_id?execution_time
airflow?test?test_task?test1?2019-09-10
#?查看生效的?DAGs
airflow?list_dags?-sd?$AIRFLOW_HOME/dags
#?開始運行任務(同?web?界面點?trigger?按鈕)
airflow?trigger_dag?test_task
#?暫停任務
airflow?pause?dag_id
#?取消暫停,等同于在?web?管理界面打開?off?按鈕
airflow?unpause?dag_id
#?查看?task?列表
airflow?list_tasks?dag_id??查看task列表
#?清空任務狀態
airflow?clear?dag_id
#?運行task
airflow?run?dag_id?task_id?execution_date
Airflow 核心原理分析
概念及發展JOB:最上層的工作。分為 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 創建,BackfillJob 由 Backfill 創建,LocalTaskJob 由前面兩種 Job 創建。
DAG:有向無環圖,用來表示工作流。
DAG Run:工作流實例,表示某個工作流的一次運行(狀態)。
Task:任務,工作流的基本組成部分。
TaskInstance:任務實例,表示某個任務的一次運行(狀態)。
在早期版本 Airflow 中,DAG 執行主要有兩種完全獨立的執行途徑:SchedulerJob 和 BackfillJob。在一次較大的重構中增加了 DagRun 方式,以跟蹤 DAG 的執行狀態。
結構關系圖:
DagRun 執行流程描述DagRuns 表示某個時間點 DAG 的狀態(也稱為 DagInstances)。要運行 DAG 或管理 DAG 的執行,必須首先創建一個 DagRun 實例。但是僅創建 DagRun 不足以實際運行 DAG(就像創建 TaskInstance 與實際運行任務并不一樣)。
因此需要一種機制來實現上述流程。結構相當簡單,維護一組要執行的 DagRuns 集合,并循環遍歷該集合,直到所有 DagRuns 成功或失敗為止。
基本的 DagRuns 循環如下所示:
刷新 DAGs
收集新的 DagRuns
執行 DagRuns(包括更新 DagRuns 的狀態為成功或失敗)
喚醒 executor/心跳檢查
Scheduler 的調度邏輯調度器實際上就是一個 airflow.jobs.SchedulerJob 實例 Job 持續運行 run 方法。job.run() 在開始時將自身的信息加入到 Job 表中,并維護狀態和心跳,預期能夠正常結束,將結束時間也更新到表中。但是實際上往往因為異常中斷,導致結束時間為空。不管是如何進行的退出,SchedulerJob 退出時會關閉所有子進程。
這里主要介紹下 Scheduler 的調度邏輯:
遍歷 DAGs 路徑下的所有 DAG 文件,啟動一定數量的進程(進程池),并且給每個進程指派一個 DAG 文件。每個 DagFileProcessor 解析分配給它的 DAG 文件,并根據解析結果在DB中創建 DagRuns 和 TaskInstance。
在 scheduler_loop 中,檢查與活動 DagRun 關聯的 TaskInstance 的狀態,解析 TaskInstance 之間的任何依賴,標識需要被執行的 TaskInstance,然后將它們添加至 executor 隊列,將新排列的 TaskInstance 狀態更新為QUEUED狀態。
每個可用的 executor 從隊列中取一個 TaskInstance,然后開始執行它,將此 TaskInstance 的數據庫記錄更新為SCHEDULED。
當一個 TaskInstance 完成運行,關聯的 executor 就會報告到隊列并更新數據庫中的 TaskInstance 的狀態(例如“完成”、“失敗”等)。
一旦所有的 DAG 處理完畢后,就會進行下一輪循環處理。這里還有一個細節就是上一輪的某個 DAG 的處理時間可能很長,導致到下一輪處理的時候這個 DAG 還沒有處理完成。Airflow 的處理邏輯是在這一輪不為這個 DAG 創建進程,這樣就不會阻塞進程去處理其余 DAG。
文檔原文:
Enumerate the all the files in the DAG directory.
Start a configurable number of processes and for each one, assign a DAG file to process.
In each child process, parse the DAG file, create the necessary DagRuns given the state of the DAG's task instances, and for all the task instances that should run, create a TaskInstance (with the SCHEDULED state) in the ORM.
Back in the main scheduler process, query the ORM for task instances in the SCHEDULED state. If any are found, send them to the executor and set the task instance state to QUEUED.
If any of the child processes have finished, create another process to work on the next file in the series, provided that the number of running processes is less than the configured limit.
Once a process has been launched for all of the files in the DAG directory, the cycle is repeated. If the process to parse a particular DAG file is still running when the file's turn comes up in the next cycle, a new process is not launched and a process for the next file in the series is launched instead. This way, a DAG file that takes a long time to parse does not necessarily block the processing of other DAGs.
Scheduler 模塊代碼結構DagFileProcessor 在子進程中解析 DAG 定義文件。對于發現的 DAG,檢查 DagRun 和 TaskInstance 的狀態。如果有 TaskInstance 可以運行,將狀態標記為 SCHEDULED。為每個 DAG 文件分配一個進程,同時在 DagFileProcessorManager 中保存有 DAG 和 processor 的映射表。在 DAG 沒有被任何 processor 處理的時候,才會給它創建新的處理進程。
DagFileProcessorManager 控制 DagFileProcessors 如何啟動。它追蹤哪些文件應該被處理并且確保一旦有一個 DagFileProcessor 完成解析,下一個 DAG 文件應該得到處理。并且控制 DagFileProcessors 的數量。
SchedulerJob 通過 Agent 獲取 manager 的 DAG 定義文件解析結果,并且將 SCHEDULED 狀態的 TaskInstance 發送給 executor 執行。
DagFileProcessorAgent 作為一個采集代理,scheduler 可以借助 Agent 獲取 manager 獲取到的 DAG 解析結果,并且可以控制manager的行為。
核心類分析Dag
method:
following_schedule() 計算當前 DAG 的下一次調度時間
previous_schedule() 計算當前 DAG 的上一次調度時間
get_dagrun() 返回給定執行日期的 dagrun(如果存在)
create_dagrun() 創建一個包括與此 DAG 相關任務的 dagrun
ckear() 清除指定日期范圍內與當前 DAG 相關的一組任務實例
run() 實例化為 BackfillJob 同時調用 job.run()
DagRun
model:
ID_PREFIX?=?'scheduled__'
ID_FORMAT_PREFIX?=?ID_PREFIX?+?'{0}'
id?=?Column(Integer,?primary_key=True)
dag_id?=?Column(String(ID_LEN))
execution_date?=?Column(UtcDateTime,?default=timezone.utcnow)
start_date?=?Column(UtcDateTime,?default=timezone.utcnow)
end_date?=?Column(UtcDateTime)
_state?=?Column('state',?String(50),?default=State.RUNNING)
run_id?=?Column(String(ID_LEN))
external_trigger?=?Column(Boolean,?default=True)
conf?=?Column(PickleType)
method:
get_dag() 返回與當前 DagRun 相關的 Dag
get_task_instances() 返回與當前 DagRun 的所有 TaskInstances
update_state() 根據 TaskInstances 的狀態確定 DagRun 的總體狀態
get_latest_runs() 返回每個 Dag 的最新一次 DagRun
TaskInstance
model:
__tablename__?=?"task_instance"
task_id?=?Column(String(ID_LEN),?primary_key=True)
dag_id?=?Column(String(ID_LEN),?primary_key=True)
execution_date?=?Column(UtcDateTime,?primary_key=True)
start_date?=?Column(UtcDateTime)
end_date?=?Column(UtcDateTime)
duration?=?Column(Float)
state?=?Column(String(20))
_try_number?=?Column('try_number',?Integer,?default=0)
max_tries?=?Column(Integer)
hostname?=?Column(String(1000))
unixname?=?Column(String(1000))
job_id?=?Column(Integer)
pool?=?Column(String(50),?nullable=False)
queue?=?Column(String(256))
priority_weight?=?Column(Integer)
operator?=?Column(String(1000))
queued_dttm?=?Column(UtcDateTime)
pid?=?Column(Integer)
executor_config?=?Column(PickleType(pickler=dill))
method:
get_dagrun() 返回當前 TaskInstance 的 DagRun
run() TaskInstance run
get_template_context() 通過 Jinja2 模板獲取上下文
xcom_push() 創建一個 XCom 可用于 task 發送參數
xcom_pull() 創建一個 XCom 可用于 task 接收參數
SchedulerJob
def?_execute(self):
"""
The?actual?scheduler?loop.?The?main?steps?in?the?loop?are:
#.?Harvest?DAG?parsing?results?through?DagFileProcessorAgent
#.?Find?and?queue?executable?tasks
#.?Change?task?instance?state?in?DB
#.?Queue?tasks?in?executor
#.?Heartbeat?executor
#.?Execute?queued?tasks?in?executor?ake_aware(execution_date,
self.task.dag.timezone)
"""
self.processor_agent?=?DagFileProcessorAgent()??#?通過檢查當前?processor?數量來控制進程個數
self.executor.start()
#?Start?after?resetting?orphaned?tasks?to?avoid?stressing?out?DB.
self.processor_agent.start()??#?在解析?DAG?文件時,只會對最近修改過的文件進行解析
execute_start_time?=?timezone.utcnow()
#?For?the?execute?duration,?parse?and?schedule?DAGs
while?(timezone.utcnow()?-?execute_start_time).total_seconds()?
self.run_duration?or?self.run_duration?
#?Starting?Loop...
self.processor_agent.heartbeat()??#?控制?DagFileProcessor?解析?DAG?文件的速度
#?Harvesting?DAG?parsing?results
simple_dags?=?self.processor_agent.harvest_simple_dags()
if?len(simple_dags)?>?0:
self._execute_task_instances()
...
#?Call?heartbeats
self.executor.heartbeat()
#?heartbeat()?中根據?parallelism?得出當前可用的?slots?數量,
#?決定?execute_async?多少個?task
#?Process?events?from?the?executor
self._process_executor_events(simple_dag_bag)
#?Ran?scheduling?loop?for?all?tasks?done
...
#?Stop?any?processors
self.processor_agent.terminate()
#?Verify?that?all?files?were?processed,?and?if?so,?deactivate?DAGs?that
#?haven't?been?touched?by?the?scheduler?as?they?likely?have?been
#?deleted.
...
self.executor.end()
method:
create_dag_run() 根據調度周期檢查是否需要為 DAG 創建新的 DagRun。如果已調度,則返回 DagRun,否則返回 None
process_file() 解析 DAG 定義文件
_execute_task_instances() 嘗試執行調度器調度過的 TaskInstances
There are three steps:
Pick TaskInstances by priority with the constraint that they are in the expected states and that we do exceed max_active_runs or pool limits.
Change the state for the TaskInstances above atomically.
Enqueue the TaskInstances in the executor.
reduce_in_chunks() 用來進行小的分批處理
總結本文在第一部分著重介紹了 Airflow 的理念、使用場景及其一般架構。
提供了相對簡單易懂的安裝及操作命令,并附帶了一個使用案例用來介紹代碼如何編排以及 WebUI 的使用。
在第二部分開篇介紹了 Airflow 任務創建、調度和管理的一些基礎概念,以及 Airflow 版本迭代的一些重要變化。Airflow 目前還是處于快速開發中,當前版本有很多遺留問題,版本升級也不是向后兼容的,變動很大。
Scheduler 毫無疑問是整個 Airflow 的核心模塊,邏輯結構復雜。本文從 Scheduler 模塊的主要邏輯入手,分析了控制循環和代碼結構,重點分析了從 dag.py 代碼文件到可調度執行的 TaskInstances 所經歷的階段;以及介紹了并發控制的實現和性能優化。
最后結合源碼介紹了 Airflow 核心類的模型定義和主要方法,以了解各個類所扮演的角色及其實現的功能。
參考https://zhuanlan.zhihu.com/p/90282578
總結
以上是生活随笔為你收集整理的airflow mysql_Airflow 使用及原理分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: debian php mysql 安装_
- 下一篇: mysql关于死锁的优化_mysql死锁