Getting Started with Airflow
What Airflow Can Do
Airflow is a workflow distribution and management system. It manages task flows as directed acyclic graphs (DAGs), with configurable task dependencies and time-based scheduling.

Airflow is independent of the jobs we want to run; we only need to hand each job's name and invocation to Airflow as a task.
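To make the DAG idea concrete, here is a small self-contained Python sketch (a toy model, not Airflow code; the task names are made up) of how dependency-driven ordering works: each task lists its upstream tasks, and a task only becomes runnable once everything upstream of it has finished.

```python
# Toy illustration of dependency-driven scheduling (not Airflow internals).
# upstream maps each task to the tasks it depends on; we produce an
# execution order that respects the graph (a topological sort).
from collections import deque

def run_order(upstream):
    """Return a valid execution order for a DAG given {task: [upstream tasks]}."""
    downstream = {t: [] for t in upstream}
    pending = {t: len(ups) for t, ups in upstream.items()}
    for task, ups in upstream.items():
        for up in ups:
            downstream[up].append(task)
    ready = deque(sorted(t for t, n in pending.items() if n == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for down in downstream[task]:
            pending[down] -= 1
            if pending[down] == 0:
                ready.append(down)
    if len(order) != len(upstream):
        raise ValueError("cycle detected: not a DAG")
    return order

# print_date runs first; echo and templated both depend on it
print(run_order({'print_date': [], 'echo': ['print_date'], 'templated': ['print_date']}))
```

A real scheduler does this continuously and in parallel, but the ordering constraint it enforces is exactly this one.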
Installation and Usage
Minimal Installation
Run the following commands in a Linux terminal (python2.x and pip must already be installed):

```bash
pip install airflow
pip install "airflow[crypto, password]"
```

After installation succeeds, the three steps below are all it takes to start using Airflow. The default executor is SequentialExecutor, which can only run tasks one at a time.
- Initialize the database: airflow initdb [required step]
- Start the web server: airflow webserver -p 8080 [convenient for managing DAGs visually]
- Start the scheduler: airflow scheduler [once the scheduler is running, the dags in the DAG directory are triggered according to their schedules]
- You can also test a single task directly, e.g. for the DAG at the end of this article: airflow test ct1 print_date 2016-05-14
The latest version of Airflow can be downloaded from https://github.com/apache/incubator-airflow; unpack it and install it the same way as any other Python package.
Configuring MySQL to Enable LocalExecutor and CeleryExecutor
- Install MySQL and Airflow's MySQL support:

```bash
yum install mysql mysql-server
pip install airflow[mysql]
```
- Set a password for the MySQL root user:

```bash
ct@server:~/airflow: mysql -uroot        # log in to MySQL as root; no password by default
mysql> SET PASSWORD=PASSWORD("passwd");
mysql> FLUSH PRIVILEGES;                 # note the semicolon at the end of each SQL statement
```
- Create the user and database:

```sql
# Create a database named `airflow`
mysql> CREATE DATABASE airflow;
# Create user `ct` with password `152108`, with full privileges on the `airflow` database
mysql> GRANT all privileges on airflow.* TO 'ct'@'localhost' IDENTIFIED BY '152108';
mysql> FLUSH PRIVILEGES;
```
- Edit the Airflow config file to use MySQL:
  - The airflow.cfg file is usually under ~/airflow.
  - Change the database connection string:

```
sql_alchemy_conn = mysql://ct:152108@localhost/airflow
```

  The fields correspond to: dialect+driver://username:password@host:port/database
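As a quick sanity check of that field layout, Python's standard urlsplit can take such a connection string apart (a sketch; the credentials are just the example values from this tutorial):

```python
# Decompose the example sql_alchemy_conn value into its fields.
# Layout: dialect+driver://username:password@host:port/database
from urllib.parse import urlsplit

conn = "mysql://ct:152108@localhost/airflow"
parts = urlsplit(conn)

print(parts.scheme)            # dialect (no explicit +driver here)
print(parts.username)          # ct
print(parts.password)          # 152108
print(parts.hostname)          # localhost
print(parts.port)              # None -> the driver's default port is used
print(parts.path.lstrip('/'))  # database name: airflow
```

This is handy when a connection fails and you want to confirm which part of the URL Airflow will actually hand to the database driver.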
- Initialize the database: airflow initdb
- Once initialization succeeds, you can log in to MySQL and inspect the newly created tables:

```
ct@server:~/airflow: mysql -uct -p152108
mysql> USE airflow;
mysql> SHOW TABLES;
+-------------------+
| Tables_in_airflow |
+-------------------+
| alembic_version   |
| chart             |
| connection        |
| dag               |
| dag_pickle        |
| dag_run           |
| import_error      |
| job               |
| known_event       |
| known_event_type  |
| log               |
| sla_miss          |
| slot_pool         |
| task_instance     |
| users             |
| variable          |
| xcom              |
+-------------------+
17 rows in set (0.00 sec)
```
- CentOS 7 replaces mysql with mariadb, but all of the commands are run the same way:

```bash
yum install mariadb mariadb-server
systemctl start mariadb        # start mariadb
systemctl enable mariadb       # start on boot
mysql_secure_installation      # set the root password and related options
mysql -uroot -p123456          # test the login
```
Upgrading mariadb

```bash
curl -sS https://downloads.mariadb.com/MariaDB/mariadb_repo_setup | bash
cat <<EOF >/etc/yum.repos.d/MariaDB.repo
[mariadb]
name = MariaDB-10.3.14
baseurl=http://yum.mariadb.org/10.3.14/centos7-amd64
# alternative: baseurl=http://archive.mariadb.org/mariadb-10.3.14/yum/centos7-amd64
gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
gpgcheck=1
EOF
rpm --import https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
yum install MariaDB-server galera-4 MariaDB-client MariaDB-shared MariaDB-backup MariaDB-common
```

Configuring LocalExecutor
Note: for testing purposes this step can be skipped; the final production setup uses CeleryExecutor. If CeleryExecutor proves inconvenient to configure, LocalExecutor can be used instead.

With the database already configured above, enabling LocalExecutor only requires editing the Airflow config file. airflow.cfg usually lives under ~/airflow; open it and set executor = LocalExecutor, and the configuration is done.
Copy a few of the DAG files from the TASK section below into the ~/airflow/dags directory, run the commands below in order, then open http://127.0.0.1:8080 to monitor task activity in real time:

```bash
ct@server:~/airflow: airflow initdb             # skip if already run above
ct@server:~/airflow: airflow webserver --debug &
ct@server:~/airflow: airflow scheduler
```

Configuring CeleryExecutor (with RabbitMQ)
- Install Airflow's celery and rabbitmq components:

```bash
pip install airflow[celery]
pip install airflow[rabbitmq]
```
- Install erlang and rabbitmq:
  - If yum or apt-get can install them directly, you are all set.
  - On the CentOS 6 machine I was using they could not, which required the following workaround:

```bash
# (CentOS 6, [REF](http://www.rabbitmq.com/install-rpm.html))
wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_18.3-1~centos~6_amd64.rpm
yum install esl-erlang_18.3-1~centos~6_amd64.rpm
wget https://github.com/jasonmcintosh/esl-erlang-compat/releases/download/1.1.1/esl-erlang-compat-18.1-1.noarch.rpm
yum install esl-erlang-compat-18.1-1.noarch.rpm
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
yum install rabbitmq-server-3.6.1-1.noarch.rpm
```
- Configure rabbitmq:
  - Start rabbitmq: rabbitmq-server -detached
  - Start rabbitmq on boot: chkconfig rabbitmq-server on
  - Configure rabbitmq (REF):

```bash
rabbitmqctl add_user ct 152108
rabbitmqctl add_vhost ct_airflow
rabbitmqctl set_user_tags ct airflow
rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"
rabbitmq-plugins enable rabbitmq_management   # management UI; not used here
```
- Edit the Airflow config file to use Celery:
  - The airflow.cfg file is usually under ~/airflow.
  - Set the executor: executor = CeleryExecutor
  - Set broker_url:

```
broker_url = amqp://ct:152108@localhost:5672/ct_airflow
```

    Format explanation: transport://userid:password@hostname:port/virtual_host

  - Set celery_result_backend (it can be the same as broker_url):

```
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow
```

    Format explanation: transport://userid:password@hostname:port/virtual_host
- Testing
  - Start the web server: airflow webserver --debug
  - Start a celery worker (must not run as root): airflow worker
  - Start the scheduler: airflow scheduler
  - Tips:
    - While testing, watch the log output in the three windows running the commands above.
    - If something inexplicable happens, consider wiping the Airflow backend database; airflow resetdb will clear it.
    - After deleting a DAG file, stale information about it may linger in the webserver; restart the webserver and refresh the page.
    - Stop the webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
A script to start or restart the airflow system:

```bash
#!/bin/bash
#set -x
#set -e
set -u

usage() {
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to start or restart webserver service.

${txtbld}OPTIONS${txtrst}:
	-S	Start airflow system [${bldred}Default FALSE${txtrst}]
	-s	Restart airflow server only [${bldred}Default FALSE${txtrst}]
	-a	Restart all airflow programs including webserver, worker and
		scheduler. [${bldred}Default FALSE${txtrst}]
EOF
}

start_all=
server_only=
all=

while getopts "hs:S:a:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		S)
			start_all=$OPTARG
			;;
		s)
			server_only=$OPTARG
			;;
		a)
			all=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z "$server_only" ] && [ -z "$all" ] && [ -z "${start_all}" ]; then
	usage
	exit 1
fi

if [ "$server_only" == "TRUE" ]; then
	ps -ef | grep -Ei '(airflow-webserver)' | grep master | \
		awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >webserver.log 2>&1 &
fi

if [ "$all" == "TRUE" ]; then
	ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi

if [ "${start_all}" == "TRUE" ]; then
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi
```

Other airflow.cfg Settings
- dags_folder

  The dags_folder directory supports subdirectories and symlinks, so different DAGs can be stored and organized by category.
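How such a nested dags_folder can be scanned is sketched below (the directory names are hypothetical; Airflow's own DAG discovery is more involved, but it follows the same recursive walk, including symlinked directories):

```python
# Sketch: find candidate DAG files under a dags_folder that is organized
# into subdirectories (and, with followlinks=True, symlinked directories).
import os

def find_dag_files(dags_folder):
    """Return the sorted paths of all .py files under dags_folder."""
    found = []
    for root, dirs, files in os.walk(dags_folder, followlinks=True):
        for name in files:
            if name.endswith('.py'):
                found.append(os.path.join(root, name))
    return sorted(found)
```

For example, with ~/airflow/dags/etl/ct1.py and ~/airflow/dags/reports/ct2.py, both files would be picked up even though neither sits at the top level.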
- Configure the mail (SMTP) service:

```
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
smtp_user = username@163.com
smtp_port = 25
smtp_password = userpasswd
smtp_mail_from = username@163.com
```
- Multi-user login setup (apparently only supported with CeleryExecutor):
  - Change the following three lines in airflow.cfg:

```
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
filter_by_owner = True
```

  - Add a user (run in Python on the server hosting Airflow):

```python
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'ehbio'
user.email = 'mail@ehbio.com'
user.password = 'ehbio'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```
TASK
- Parameter notes
  - depends_on_past

    Airflow assumes idempotent tasks that operate on immutable data chunks. It also assumes that all task instances (each task for each schedule) need to run. If your tasks need to be executed sequentially, you need to tell Airflow: use the depends_on_past=True flag on the tasks that require sequential execution.

    If a task did not run when it should have, or when the interval is set to @once, depends_on_past=False is recommended. When running DAGs I occasionally see an upstream task finish while its downstream task never starts, leaving the whole DAG stuck; setting depends_on_past=False resolves this kind of problem.
  - Timestamps use a format like 2016-01-01T00:03:00.
  - When a command called by a task fails, it has to be restarted manually by clicking "run" in the site's Graph View. To let modified tasks rerun smoothly, one workable compromise is:
    - set email_on_retry: True
    - set a fairly long retry_delay, so there is time to react after the email arrives
    - then switch back to a shorter retry_delay for quick restarts
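The timing behind that compromise is easy to sketch (retry_times is a hypothetical helper; Airflow applies retry_delay itself, this just shows when retries would fire):

```python
# Sketch: at what times would each retry start, for a given retry_delay?
from datetime import datetime, timedelta

def retry_times(failed_at, retries, retry_delay):
    """Return the times at which each of `retries` retries would start."""
    return [failed_at + retry_delay * (i + 1) for i in range(retries)]

failed_at = datetime(2016, 5, 29, 8, 30)
# Long delay while debugging: time to react to the retry email...
print(retry_times(failed_at, retries=1, retry_delay=timedelta(hours=2)))
# ...then a short delay again for fast restarts.
print(retry_times(failed_at, retries=3, retry_delay=timedelta(minutes=5)))
```

With the long delay, a failure at 08:30 retries at 10:30, leaving a two-hour window to fix the task before it reruns; with the short delay, the retries land at 08:35, 08:40 and 08:45.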
  - After writing a task DAG, always check it for syntax errors first: python dag.py
- Test file 1: ct1.py

```python
from airflow import DAG
from airflow.operators import BashOperator, MySqlOperator
from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                               datetime.min.time())

default_args = {
    'owner': 'airflow',
    # For easy testing, start_date is usually the current time minus schedule_interval
    'start_date': datetime(2016, 5, 29, 8, 30),
    'email': ['chentong_biology@163.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    #'queue': 'bash_queue',
    #'pool': 'backfill',
    #'priority_weight': 10,
    #'end_date': datetime(2016, 5, 29, 11, 30),
}

# The DAG id 'ct1' must be unique within Airflow; it usually matches the
# file name. With multiple users, a username can be added as a tag.
dag = DAG('ct1', default_args=default_args,
          schedule_interval="@once")

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

# cmd = "/home/test/test.bash "  # note the trailing space
t2 = BashOperator(
    task_id='echo',
    bash_command='echo "test" ',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(2) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': "Parameter I passed in"},
    dag=dag)

# This means that t2 will depend on t1 running successfully to run.
# It is equivalent to t1.set_downstream(t2)
t2.set_upstream(t1)
t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
```
- Test file 2: ct2.py

```python
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                               datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': one_min_ago,
    'email': ['chentong_biology@163.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 5,
    'retry_delay': timedelta(hours=30),
    #'queue': 'bash_queue',
    #'pool': 'backfill',
    #'priority_weight': 10,
    #'end_date': datetime(2016, 5, 29, 11, 30),
}

dag = DAG('ct2', default_args=default_args,
          schedule_interval="@once")

t1 = BashOperator(
    task_id='run1',
    bash_command='(cd /home/ct/test; bash run1.sh -f ct_t1) ',
    dag=dag)

t2 = BashOperator(
    task_id='run2',
    bash_command='(cd /home/ct/test; bash run2.sh -f ct_t1) ',
    dag=dag)

t2.set_upstream(t1)
```
- run1.sh

```bash
#!/bin/bash
#set -x
set -e
set -u

usage() {
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
	-z	Is there a header[${bldred}Default TRUE${txtrst}]
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		z)
			header=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z $file ]; then
	usage
	exit 1
fi

cat <<END >$file
A
B
C
D
E
F
G
END

sleep 20s
```
- run2.sh

```bash
#!/bin/bash
#set -x
set -e
set -u

usage() {
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z $file ]; then
	usage
	exit 1
fi

awk 'BEGIN{OFS=FS="\t"}{print $0, "53"}' $file >${file}.out
```
Other Issues
- The DagRun object has room for a conf parameter that gets exposed in the "context" (templates, operators, ...). That is the place where you would associate parameters to a specific run. For now this is only possible in the context of an externally triggered DAG run. The way the TriggerDagRunOperator works, you can fill in the conf param during the execution of the callable that you pass to the operator.

  If you are looking to change the shape of your DAG through parameters, we recommend doing that using "singleton" DAGs (using a "@once" schedule_interval), meaning that you would write a Python program that generates multiple dag_ids, one for each run, probably based on metadata stored in a config file or elsewhere.

  The idea is that if you use parameters to alter the shape of your DAG, you break some of the assumptions around continuity of the schedule. Things like visualizing the tree view or how to perform a backfill become unclear and mushy. So if the shape of your DAG changes radically based on parameters, we consider those to be different DAGs, and you generate each one in your pipeline file.
- Completely delete all records for a DAG:

```sql
set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
```
- Manage the airflow processes automatically with supervisord:

```ini
[program:airflow_webserver]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow webserver
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-webserver.err.log
stdout_logfile=/var/log/airflow-webserver.out.log

[program:airflow_worker]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow worker
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-worker.err.log
stdout_logfile=/var/log/airflow-worker.out.log

[program:airflow_scheduler]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow scheduler
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-scheduler.err.log
stdout_logfile=/var/log/airflow-scheduler.out.log
```
- In certain situations, after modifying a DAG you can use backfill to fill in tasks for a particular time window and avoid rerunning tasks from before the current date:
  - airflow backfill -s START -e END --mark_success DAG_ID
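To see what a backfill window covers, here is a small sketch computing the run dates between -s and -e for a daily schedule (the dates are examples, not from the article):

```python
# Sketch: enumerate the run dates a daily backfill between -s and -e covers.
from datetime import date, timedelta

def backfill_dates(start, end):
    """All daily run dates from start to end, inclusive."""
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days + 1)]

runs = backfill_dates(date(2016, 5, 10), date(2016, 5, 14))
print([d.isoformat() for d in runs])
# ['2016-05-10', '2016-05-11', '2016-05-12', '2016-05-13', '2016-05-14']
```

With --mark_success, each of these run dates is recorded as successful without actually executing the tasks, which is what keeps the pre-modification history from being rerun.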
Port Forwarding
- All of the configuration above was done on an intranet server that only exposes SSH port 22. So I tried using the same configuration on another machine and set up port forwarding, mapping the rabbitmq port 5672 on the external server to the corresponding port on the intranet server, and then started airflow and connected.
- ssh -v -4 -NF -R 5672:127.0.0.1:5672 aliyun
- The command above has the general form:

  ssh -R <local port>:<remote host>:<remote port> <SSH hostname>

  Here "local port" means the port on the SSH hostname. Remote connections from LOCALHOST:5672 are forwarded to the local address 127.0.0.1:5672.
- -v: enable while testing
- -4: when the error "bind: Cannot assign requested address" appears, force the ssh client to use IPv4
- If "Warning: remote port forwarding failed for listen port 52698" appears, close the other ssh tunnels.
Using Airflow Across Machines
- Configure the same airflow modules on the external server (used as the task distribution server) as on the intranet server.
- Use the port forwarding described above so the external server can reach the rabbitmq port 5672 through the intranet server's firewall.
- Starting airflow webserver and scheduler on the external server and airflow worker on the intranet server, I found that task execution state was lost. I am continuing to study Celery to solve this problem.
Installing redis
- http://download.redis.io/releases/redis-3.2.0.tar.gz
- tar xvzf redis-3.2.0.tar.gz and make
- redis-server starts redis
- use ps -ef | grep 'redis' to check that the background process exists
- check whether port 6379 is listening: netstat -lntp | grep 6379
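The same listening check can be done from Python with a plain socket (a sketch; it starts its own throwaway listener so it is self-contained rather than requiring a running redis-server):

```python
# Sketch: check whether a TCP port is accepting connections, demonstrated
# against a throwaway local listener standing in for redis-server.
import socket

def is_listening(host, port, timeout=1.0):
    """Return True if something is accepting TCP connections at host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Throwaway listener on an OS-assigned port (port 0 = pick a free one).
server = socket.socket()
server.bind(('127.0.0.1', 0))
server.listen(1)
port = server.getsockname()[1]
print(is_listening('127.0.0.1', port))  # True while the listener is up
server.close()
```

For the real service you would call is_listening('127.0.0.1', 6379), which mirrors the netstat check above without needing root or netstat installed.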
Fixing errors seen with newer redis versions
Possible Reasons Tasks Do Not Run as Expected
- Check whether start_date and end_date fall within the intended time range.
- Check the output of airflow worker, airflow scheduler and airflow webserver --debug for tasks that ran abnormally.
- Check the log output under the logs folder in the airflow config path.
- If none of the above shows a problem, consider data conflicts; solutions include clearing the database or giving the current DAG a new dag_id.
Statement

This article was originally published at http://blog.genesino.com/2016/05/airflow/. Please credit the source when reposting.