Getting Started with Airflow
What Airflow can do
Airflow is a workflow scheduling and management system: it manages task pipelines as directed acyclic graphs (DAGs), letting you define task dependencies and time-based schedules.
Airflow is independent of the tasks it runs; you only need to register each task's name and how to run it with Airflow as a task.
Installation and usage
Simplest installation
Run the following commands in a Linux terminal (Python 2.x and pip must already be installed):
```
pip install airflow
pip install "airflow[crypto, password]"
```

Once installation succeeds, run the three steps below and Airflow is ready to use. The default executor is SequentialExecutor, which can only run tasks one after another.
- Initialize the database: `airflow initdb` [required step]
- Start the web server: `airflow webserver -p 8080` [convenient for managing DAGs visually]
- Start the scheduler: `airflow scheduler` [once the scheduler is running, DAGs in the DAG directory are launched on their configured schedules]
- You can also test a single DAG task directly, e.g. for the DAG at the end of this article: `airflow test ct1 print_date 2016-05-14`
The latest version of Airflow can be downloaded from https://github.com/apache/incubator-airflow; unpack it and install it the same way as any other Python package.
Configure MySQL to enable LocalExecutor and CeleryExecutor
- Install MySQL support

```
yum install mysql mysql-server
pip install airflow[mysql]
```

- Set the password of the MySQL root user

```
ct@server:~/airflow: mysql -uroot       # log in to MySQL as root; there is no password by default
mysql> SET PASSWORD=PASSWORD("passwd");
mysql> FLUSH PRIVILEGES;                # note the semicolon at the end of each SQL statement
```

- Create a user and a database

```
# Create a database named `airflow`
mysql> CREATE DATABASE airflow;
# Create user `ct` with password `152108`; this user gets full privileges on the `airflow` database
mysql> GRANT all privileges on airflow.* TO 'ct'@'localhost' IDENTIFIED BY '152108';
mysql> FLUSH PRIVILEGES;
```
- Modify the airflow configuration file to support MySQL
  - The airflow.cfg file is usually in the ~/airflow directory
  - Change the database connection string

```
sql_alchemy_conn = mysql://ct:152108@localhost/airflow
```

    The fields correspond to: dialect+driver://username:password@host:port/database

  - Initialize the database: `airflow initdb`
  - Once the database has been initialized successfully, you can log in to MySQL and inspect the newly created tables.

```
ct@server:~/airflow: mysql -uct -p152108
mysql> USE airflow;
mysql> SHOW TABLES;
+-------------------+
| Tables_in_airflow |
+-------------------+
| alembic_version   |
| chart             |
| connection        |
| dag               |
| dag_pickle        |
| dag_run           |
| import_error      |
| job               |
| known_event       |
| known_event_type  |
| log               |
| sla_miss          |
| slot_pool         |
| task_instance     |
| users             |
| variable          |
| xcom              |
+-------------------+
17 rows in set (0.00 sec)
```
- CentOS 7 ships MariaDB instead of MySQL, but all the commands are run the same way.

```
yum install mariadb mariadb-server
systemctl start mariadb          # start mariadb
systemctl enable mariadb         # start on boot
mysql_secure_installation        # set the root password and related options
mysql -uroot -p123456            # test the login
```

- Upgrading MariaDB

```
curl -sS https://downloads.mariadb.com/MariaDB/mariadb_repo_setup | bash
cat <<EOF >/etc/yum.repos.d/MariaDB.repo
[mariadb]
name = MariaDB-10.3.14
baseurl=http://yum.mariadb.org/10.3.14/centos7-amd64
# alternative: baseurl=http://archive.mariadb.org/mariadb-10.3.14/yum/centos7-amd64
gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
gpgcheck=1
EOF
rpm --import https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
yum install MariaDB-server galera-4 MariaDB-client MariaDB-shared MariaDB-backup MariaDB-common
```

Configure LocalExecutor
Note: this step is only for testing and can be skipped; the final production setup uses CeleryExecutor. If configuring CeleryExecutor is inconvenient, LocalExecutor also works.
The database is already configured above, so to use LocalExecutor you only need to edit the airflow configuration file. airflow.cfg is usually in the ~/airflow directory; open it and set executor = LocalExecutor, and the configuration is complete.
Copy a few of the DAG files from the TASK section at the end of this article into ~/airflow/dags, run the commands below in order, then open http://127.0.0.1:8080 to monitor task activity in real time:
```
ct@server:~/airflow: airflow initdb           # skip this if it was already run earlier
ct@server:~/airflow: airflow webserver --debug &
ct@server:~/airflow: airflow scheduler
```

Configure CeleryExecutor (with RabbitMQ support)
- Install Airflow's Celery and RabbitMQ components

```
pip install airflow[celery]
pip install airflow[rabbitmq]
```

- Install Erlang and RabbitMQ
  - If they can be installed directly with yum or apt-get, you are all set.
  - On the CentOS 6 machine I used that was not possible, so the following workaround was needed:

```
# (CentOS 6, REF: http://www.rabbitmq.com/install-rpm.html)
wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_18.3-1~centos~6_amd64.rpm
yum install esl-erlang_18.3-1~centos~6_amd64.rpm
wget https://github.com/jasonmcintosh/esl-erlang-compat/releases/download/1.1.1/esl-erlang-compat-18.1-1.noarch.rpm
yum install esl-erlang-compat-18.1-1.noarch.rpm
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
yum install rabbitmq-server-3.6.1-1.noarch.rpm
```
- Configure RabbitMQ
  - Start RabbitMQ: `rabbitmq-server -detached`
  - Start RabbitMQ on boot: `chkconfig rabbitmq-server on`
  - Configure RabbitMQ (REF)

```
rabbitmqctl add_user ct 152108
rabbitmqctl add_vhost ct_airflow
rabbitmqctl set_user_tags ct airflow
rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"
rabbitmq-plugins enable rabbitmq_management   # not used here
```
- Modify the airflow configuration file to support Celery
  - The airflow.cfg file is usually in the ~/airflow directory
  - Change the executor to `executor = CeleryExecutor`
  - Change broker_url

```
broker_url = amqp://ct:152108@localhost:5672/ct_airflow

Format explanation: transport://userid:password@hostname:port/virtual_host
```

  - Change celery_result_backend (it can be the same as broker_url)

```
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow

Format explanation: transport://userid:password@hostname:port/virtual_host
```
Testing

- Start the web server: `airflow webserver --debug`
- Start the Celery worker (it must not be run as root): `airflow worker`
- Start the scheduler: `airflow scheduler`
- Tips:
  - While testing, keep an eye on the logs printed in the three windows running the commands above.
  - If something inexplicable happens, consider wiping the airflow backend database; `airflow resetdb` clears it.
  - After deleting a DAG file, its information may still show up in the webserver; restart the webserver and refresh the page.
  - Stop the webserver: `ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}`
A script to start and restart the airflow system

```bash
#!/bin/bash

#set -x
#set -e
set -u

usage() {
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to start or restart webserver service.

${txtbld}OPTIONS${txtrst}:
	-S	Start airflow system [${bldred}Default FALSE${txtrst}]
	-s	Restart airflow server only [${bldred}Default FALSE${txtrst}]
	-a	Restart all airflow programs including webserver, worker and
		scheduler. [${bldred}Default FALSE${txtrst}]
EOF
}

start_all=
server_only=
all=

while getopts "hs:S:a:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		S)
			start_all=$OPTARG
			;;
		s)
			server_only=$OPTARG
			;;
		a)
			all=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z "$server_only" ] && [ -z "$all" ] && [ -z "${start_all}" ]; then
	usage
	exit 1
fi

if [ "$server_only" == "TRUE" ]; then
	ps -ef | grep -Ei '(airflow-webserver)' | grep master | \
		awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >webserver.log 2>&1 &
fi

if [ "$all" == "TRUE" ]; then
	ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi

if [ "${start_all}" == "TRUE" ]; then
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi
```

Other airflow.cfg settings
- dags_folder

  The dags_folder directory supports subdirectories and symbolic links, so different DAGs can be stored in their own folders.
- Configure the outgoing email (SMTP) service

```
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
smtp_user = username@163.com
smtp_port = 25
smtp_password = userpasswd
smtp_mail_from = username@163.com
```
- Multi-user login (apparently only supported with CeleryExecutor)
  - Change the following three lines in airflow.cfg

```
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
filter_by_owner = True
```

  - Add a user (run this in Python on the server where airflow is installed)

```python
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'ehbio'
user.email = 'mail@ehbio.com'
user.password = 'ehbio'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```
TASK

- Parameter explanations
  - depends_on_past

    Airflow assumes idempotent tasks that operate on immutable data chunks. It also assumes that all task instances (each task for each schedule) need to run. If your tasks need to be executed sequentially, you need to tell Airflow: use the depends_on_past=True flag on the tasks that require sequential execution.

    If a task that should have run did not run, or the schedule_interval is @once, depends_on_past=False is recommended. When running DAGs I sometimes saw the upstream task finish while the downstream task never started, leaving the whole DAG stuck; setting depends_on_past=False resolves this kind of problem.
  - Timestamps use a format like 2016-01-01T00:03:00
  - If a command called by a task fails, you have to click "run" in the website's Graph View to restart it manually. To make it easier to get a task running smoothly again after it has been fixed, a workable compromise is (a default_args sketch illustrating these settings follows this list):
    - set email_on_retry: True
    - set a fairly long retry_delay, so that after receiving the email there is time to deal with the problem
    - then change retry_delay back to a shorter value so the task restarts quickly
  - After writing a task DAG, always remember to check it for syntax errors first with `python dag.py`.
- Test file 1: ct1.py

```python
from airflow import DAG
from airflow.operators import BashOperator, MySqlOperator
from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                               datetime.min.time())

default_args = {
    'owner': 'airflow',
    # For easier testing, start_date is usually the current time minus schedule_interval
    'start_date': datetime(2016, 5, 29, 8, 30),
    'email': ['chentong_biology@163.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    #'queue': 'bash_queue',
    #'pool': 'backfill',
    #'priority_weight': 10,
    #'end_date': datetime(2016, 5, 29, 11, 30),
}

# The DAG id 'ct1' must be unique within airflow and usually matches the file name.
# With multiple users, the user name can be added as a tag.
dag = DAG('ct1', default_args=default_args,
          schedule_interval="@once")

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

#cmd = "/home/test/test.bash "   # note the trailing space
t2 = BashOperator(
    task_id='echo',
    bash_command='echo "test" ',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(2) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': "Parameter I passed in"},
    dag=dag)

# This means that t2 will depend on t1 running successfully to run.
# It is equivalent to t1.set_downstream(t2)
t2.set_upstream(t1)
t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
```
- Test file 2: ct2.py

```python
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                               datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': one_min_ago,
    'email': ['chentong_biology@163.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 5,
    'retry_delay': timedelta(hours=30),
    #'queue': 'bash_queue',
    #'pool': 'backfill',
    #'priority_weight': 10,
    #'end_date': datetime(2016, 5, 29, 11, 30),
}

dag = DAG('ct2', default_args=default_args,
          schedule_interval="@once")

t1 = BashOperator(
    task_id='run1',
    bash_command='(cd /home/ct/test; bash run1.sh -f ct_t1) ',
    dag=dag)

t2 = BashOperator(
    task_id='run2',
    bash_command='(cd /home/ct/test; bash run2.sh -f ct_t1) ',
    dag=dag)

t2.set_upstream(t1)
```
- run1.sh

```bash
#!/bin/bash

#set -x
set -e
set -u

usage() {
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
	-z	Is there a header[${bldred}Default TRUE${txtrst}]
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		z)
			header=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z $file ]; then
	usage
	exit 1
fi

cat <<END >$file
A
B
C
D
E
F
G
END

sleep 20s
```
- run2.sh

```bash
#!/bin/bash

#set -x
set -e
set -u

usage() {
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z $file ]; then
	usage
	exit 1
fi

awk 'BEGIN{OFS=FS="\t"}{print $0, "53"}' $file >${file}.out
```
Other issues

- The DagRun object has room for a conf parameter that gets exposed in the "context" (templates, operators, ...). That is the place where you would associate parameters to a specific run. For now this is only possible in the context of an externally triggered DAG run. The way the TriggerDagRunOperator works, you can fill in the conf param during the execution of the callable that you pass to the operator (see the sketch below).

  If you are looking to change the shape of your DAG through parameters, we recommend doing that using "singleton" DAGs (using a "@once" schedule_interval), meaning that you would write a Python program that generates multiple dag_ids, one for each run, probably based on metadata stored in a config file or elsewhere.

  The idea is that if you use parameters to alter the shape of your DAG, you break some of the assumptions around continuity of the schedule. Things like visualizing the tree view or how to perform a backfill become unclear and mushy. So if the shape of your DAG changes radically based on parameters, we consider those to be different DAGs, and you generate each one in your pipeline file.
- Completely delete all information about a DAG

```sql
set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
```
- Manage the processes automatically with supervisord

```
[program:airflow_webserver]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow webserver
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-webserver.err.log
stdout_logfile=/var/log/airflow-webserver.out.log

[program:airflow_worker]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow worker
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-worker.err.log
stdout_logfile=/var/log/airflow-worker.out.log

[program:airflow_scheduler]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow scheduler
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-scheduler.err.log
stdout_logfile=/var/log/airflow-scheduler.out.log
```
- In particular situations, after modifying a DAG you can use backfill to fill in a specific time range of tasks, so that runs before the current date are not actually executed (an example follows below):

```
airflow backfill -s START -e END --mark_success DAG_ID
```
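For instance, to mark the runs of the test DAG ct1 between two dates as successful without executing them (the dates here are placeholders):

```
airflow backfill -s 2016-05-01 -e 2016-05-10 --mark_success ct1
```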
Port forwarding

- All of the configuration above was done on an internal server that only exposes SSH port 22. So I applied the same configuration on another machine and set up port forwarding, mapping RabbitMQ's port 5672 on the external server to the corresponding port on the internal server, and then started airflow to connect through it.

```
ssh -v -4 -NF -R 5672:127.0.0.1:5672 aliyun
```

- The command above follows the format

  ssh -R <local port>:<remote host>:<remote port> <SSH hostname>

  where <local port> is a port on the SSH hostname:

  Remote connections from LOCALHOST:5672 forwarded to local address 127.0.0.1:5672

- -v: turn it on while testing
- -4: force the ssh client to use IPv4 when the error "bind: Cannot assign requested address" appears
- If "Warning: remote port forwarding failed for listen port 52698" appears, close the other ssh tunnels.
Using airflow across different machines

- On the external server (used as the task-dispatch server), configure the same airflow modules as on the internal server.
- Use the port forwarding described above so that the external server can reach RabbitMQ's port 5672 through the internal server's firewall.
- When airflow webserver and scheduler are started on the external server and airflow worker on the internal server, task execution state gets lost. I will keep studying Celery to solve this problem.
Installing redis

- Download http://download.redis.io/releases/redis-3.2.0.tar.gz
- tar xvzf redis-3.2.0.tar.gz and make
- Start redis with redis-server
- Check that the background process exists with ps -ef | grep 'redis'
- Check that port 6379 is being listened on with netstat -lntp | grep 6379
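If redis is meant to replace RabbitMQ as the Celery broker (the likely reason for installing it here), the corresponding airflow.cfg changes would look roughly like the following sketch; the host, port, and database numbers assume a default local install and are not taken from the original setup:

```
executor = CeleryExecutor
broker_url = redis://127.0.0.1:6379/0
celery_result_backend = redis://127.0.0.1:6379/1
```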
Fixing errors that appear with newer redis versions
Possible reasons why tasks do not run as expected

- Check whether start_date and end_date fall within a suitable time range
- Check the output of airflow worker, airflow scheduler, and airflow webserver --debug for any task behaving abnormally
- Check the log output in the logs folder under the airflow configuration path
- If none of the above reveals a problem, consider data conflicts; solutions include wiping the database or giving the current DAG a new dag_id
Notice
This article was originally written at http://blog.genesino.com/2016/05/airflow/. Please credit the source when reposting.