日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

大数据调度平台Airflow(五):Airflow使用

發布時間:2023/11/28 生活经验 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据调度平台Airflow(五):Airflow使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Airflow使用

一、Airflow調度Shell命令

1.首先我們需要創建一個python文件,導入需要的類庫

2.實例化DAG

3、定義Task

4、設置task依賴關系

5、上傳python配置腳本

6、重啟Airflow

7、執行airflow

二、DAG調度觸發時間

三、DAG catchup 參數設置

四、DAG調度周期設置

五、DAG任務依賴設置

1、DAG任務依賴設置一

2、???????DAG任務依賴設置二

3、???????DAG任務依賴設置三

4、???????DAG任務依賴設置四

5、???????DAG任務依賴設置五


Airflow使用

上文說到使用Airflow進行任務調度大體步驟如下:

  • 創建python文件,根據實際需要,使用不同的Operator
  • 在python文件不同的Operator中傳入具體參數,定義一系列task
  • 在python文件中定義Task之間的關系,形成DAG
  • 將python文件上傳執行,調度DAG,每個task會形成一個Instance
  • 使用命令行或者WEBUI進行查看和管理

以上python文件就是Airflow python腳本,使用代碼方式指定DAG的結構

一、Airflow調度Shell命令

下面我們以調度執行shell命令為例,來講解Airflow使用。

1.首先我們需要創建一個python文件,導入需要的類庫

# 導入 DAG 對象,后面需要實例化DAG對象
from airflow import DAG# 導入BashOperator Operators,我們需要利用這個對象去執行流程
from airflow.operators.bash import BashOperator

注意:以上代碼可以在開發工具中創建,但是需要在使用的python3.7環境中導入安裝Airflow包。

D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

2.實例化DAG

from datetime import datetime, timedelta# default_args中定義一些參數,在實例化DAG時可以使用,使用python dic 格式定義
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2022, 3, 25),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)

注意:

  • 實例化DAG有三種方式

第一種方式:

with DAG("my_dag_name") as dag:op=XXOperator(task_id="task")

第二種方式(以上采用這種方式):

my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)

第三種方式:

@dag(start_date=days_ago(2))
def generate_dag():op = XXOperator(task_id="task")
dag = generate_dag()
  • baseoperator基礎參數說明:

可以參照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator查看baseopartor中更多參數。

  • DAG參數說明

可以參照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html

查看DAG參數說明,也可以直接在開發工具點擊DAG進入源碼看下對應參數有哪些。

3、定義Task

當實例化Operator時會生成Task任務,從一個Operator中實例化出來對象的過程被稱為一個構造方法,每個構造方法中都有“task_id”充當任務的唯一標識符。

下面我們定義三個Operator,也就是三個Task,每個task_id 不能重復。

# operator 支持多種類型, 這里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)

注意:

  • 每個operator中可以傳入對應的參數,覆蓋DAG默認的參數,例如:last task中“retries”=3 就替代了默認的1。任務參數的優先規則如下:①.顯示傳遞的參數 ②.default_args字典中存在的值③.operator的默認值(如果存在)。
  • BashOperator使用方式參照:http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator

4、設置task依賴關系

#使用 set_upstream、set_downstream 設置依賴關系,不能出現環形鏈路,否則報錯
# middle.set_upstream(first) # middle會在first執行完成之后執行
# last.set_upstream(middle) # last 會在 middle執行完成之后執行#也可以使用位移符來設置依賴關系
first >> middle >>last # first 首先執行,middle次之,last最后
# first >> [middle,last] # first首先執行,middle ,last并行執行

注意:當執行腳本時,如果在DAG中找到一條環形鏈路(例如:A->B->C-A)會引發異常。更多DAG task依賴關系可參照官網:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies?

5、上傳python配置腳本

到目前為止,python配置如下:

# 導入 DAG 對象,后面需要實例化DAG對象
from airflow import DAG# 導入BashOperator Operators,我們需要利用這個對象去執行流程
from airflow.example_dags.example_bash_operator import dagfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# default_args中定義一些參數,在實例化DAG時可以使用,使用python dic 格式定義
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)# operator 支持多種類型, 這里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)#使用 set_upstream、set_downstream 設置依賴關系,不能出現環形鏈路,否則報錯
# middle.set_upstream(first) # middle會在first執行完成之后執行
# last.set_upstream(middle) # last 會在 middle執行完成之后執行#也可以使用位移符來設置依賴關系
first >> middle >>last # first 首先執行,middle次之,last最后
# first >> [middle,last] # first首先執行,middle ,last并行執行

?將以上python配置文件上傳到$AIRFLOW_HOME/dags目錄下,默認$AIRFLOW_HOME為安裝節點的“/root/airflow”目錄,當前目錄下的dags目錄需要手動創建。

6、重啟Airflow

“ps aux|grep webserver”和“ps aux|grep scheduler”找到對應的airflow進程殺掉,重新啟動Airflow。重啟之后,可以在airflow webui看到對應的DAG ID ”myairflow_execute_bash”。

7、執行airflow

按照如下步驟執行DAG,首先打開工作流,然后“Trigger DAG”執行,隨后可以看到任務執行成功。

查看task執行日志:

二、DAG調度觸發時間

在Airflow中,調度程序會根據DAG文件中指定的“start_date”和“schedule_interval”來運行DAG。特別需要注意的是Airflow計劃程序在計劃時間段的末尾觸發執行DAG,而不是在開始時刻觸發DAG,例如:

default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2022, 3, 25),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)

以上配置的DAG是從世界標準時間2022年3月24號開始調度,每隔1天執行一次,這個DAG的具體運行時間如下圖:?

自動調度DAG 執行日期

自動調度DAG實際執行觸發時間

2022-03-24,00:00:00+00:00

2022-03-25,00:00:00+00:00

2022-03-25,00:00:00+00:00

2022-03-26,00:00:00+00:00

2022-03-26,00:00:00+00:00

2022-03-27,00:00:00+00:00

2022-03-27,00:00:00+00:00

2022-03-28,00:00:00+00:00

2022-03-28,00:00:00+00:00

2022-03-29,00:00:00+00:00

... ...

... ...

以上表格中以第一條數據為例解釋,Airflow正常調度是每天00:00:00 ,假設當天日期為2022-03-24,正常我們認為只要時間到了2022-03-24 00:00:00 就會執行,改調度時間所處于的調度周期為2022-03-24 00:00:00 ~ 2022-03-25 00:00:00 ,在Airflow中實際上是在調度周期末端觸發執行,也就是說2022-03-24 00:00:00 自動觸發執行時刻為 2022-03-25 00:00:00。?

如下圖,在airflow中,“execution_date”不是實際運行時間,而是其計劃周期的開始時間戳。例如:execution_date 是2021-09-04 00:00:00 的DAG 自動調度運行的實際時間為2021-09-05 00:00:00。當然除了自動調度外,我們還可以手動觸發執行DAG執行,要判斷DAG運行時計劃調度(自動調度)還是手動觸發,可以查看“Run Type”。

三、DAG catchup 參數設置

在Airflow的工作計劃中,一個重要的概念就是catchup(追趕),在實現DAG具體邏輯后,如果將catchup設置為True(默認就為True),Airflow將“回填”所有過去的DAG run,如果將catchup設置為False,Airflow將從最新的DAG run時刻前一時刻開始執行 DAG run,忽略之前所有的記錄。

例如:現在某個DAG每隔1分鐘執行一次,調度開始時間為2001-01-01 ,當前日期為2021-10-01 15:23:21,如果catchup設置為True,那么DAG將從2001-01-01 00:00:00 開始每分鐘都會運行當前DAG。如果catchup 設置為False,那么DAG將從2021-10-01 15:22:20(當前2021-10-01 15:23:21前一時刻)開始執行DAG run。

舉例:有first ,second,third三個shell命令任務,按照順序調度,每隔1分鐘執行一次,首次執行時間為2000-01-01。

設置catchup 為True(默認),DAG python配置如下:

from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2001, 1, 1),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}
dag = DAG(dag_id = 'catchup_test1 ', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1), # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒catchup=True # 執行DAG時,將開始時間到目前所有該執行的任務都執行,默認為True
)first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)
middle = BashOperator(task_id='second',bash_command='echo "run second task"',dag=dag
)
last = BashOperator(task_id='third',bash_command='echo "run third task"',dag=dag,retries=3
)
first >> middle >>last

上傳python配置文件到$AIRFLOW_HOME/dags下,重啟airflow,DAG執行調度如下:

設置catchup 為False,DAG python配置如下:

from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2001, 1, 1),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}
dag = DAG(dag_id = 'catchup_test2', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1), # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒catchup=False # 執行DAG時,將開始時間到目前所有該執行的任務都執行,默認為True
)first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)
middle = BashOperator(task_id='second',bash_command='echo "run second task"',dag=dag
)
last = BashOperator(task_id='third',bash_command='echo "run third task"',dag=dag,retries=3
)
first >> middle >>last

上傳python配置文件到$AIRFLOW_HOME/dags下,重啟airflow,DAG執行調度如下:

有兩種方式在Airflow中配置catchup:

  • 全局配置

在airflow配置文件airflow.cfg的scheduler部分下,設置catchup_by_default=True(默認)或False,這個設置是全局性的設置。

  • DAG文件配置

在python代碼配置中設置DAG對象的參數:dag.catchup=True或False。

dag = DAG(dag_id = 'myairflow_execute_bash',
default_args = default_args,    
catchup=False,schedule_interval = timedelta(days=1))

四、DAG調度周期設置

每個DAG可以有或者沒有調度執行周期,如果有調度周期,我們可以在python代碼DAG配置中設置“schedule_interval”參數來指定調度DAG周期,可以通過以下三種方式來設置。

  • 預置的Cron調度

Airflow預置了一些Cron調度周期,可以參照:

DAG Runs — Airflow Documentation,如下圖:

?

在python配置文件中使用如下:

default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = '@daily' # 使用預置的Cron調度,每天0點0分調度

  • Cron

這種方式就是寫Linux系統的crontab定時任務命令,可以在https://crontab.guru/網站先生成對應的定時調度命令,其格式如下:

minute  hour  day  month  week
minute:表示分鐘,可以從0~59之間的任意整數。
hour:表示小時,可以是從0到23之間的任意整數。
day:表示日期,可以是1到31之間的任何整數。
month:表示月份,可以是從1到12之間的任何整數。
week:表示星期幾,可以是從0到7之間的任何整數,這里的0或7代表星期日。

以上各個字段中還可以使用特殊符號代表不同意思:

星號(*):代表所有可能的值,例如month字段如果是星號,則表示在滿足其它字段的制約條件后每月都執行該命令操作。
逗號(,):可以用逗號隔開的值指定一個列表范圍,例如,”1,2,5,7,8,9”
中杠(-):可以用整數之間的中杠表示一個整數范圍,例如”2-6”表示”2,3,4,5,6”
正斜線(/):可以用正斜線指定時間的間隔頻率,步長,例如”0-23/2”表示每兩小時執行一次。

在python配置文件中使用如下:

default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = '* * * * *' # 使用Crontab 定時任務命令,每分鐘運行一次
)

  • datetime.timedelta

timedelta是使用python timedelta 設置調度周期,可以配置天、周、小時、分鐘、秒、毫秒。在python配置文件中使用如下:

default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=5) # 使用python timedelta 設置調度周期,可以配置天、周、小時、分鐘、秒、毫秒
)

五、???????DAG任務依賴設置

1、???????DAG任務依賴設置一

  • DAG調度流程圖

  • task執行依賴
A >> B >>C
  • 完整代碼
'''
airflow 任務依賴關系設置一'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_1', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)A >> B >>C

2、???????DAG任務依賴設置二

  • DAG調度流程圖

  • task執行依賴
[A,B] >>C >>D
  • 完整代碼

'''
airflow 任務依賴關系設置二'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_2', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)[A,B] >>C >>D

3、???????DAG任務依賴設置三

  • DAG調度流程圖

  • task執行依賴
[A,B,C] >>D >>[E,F]
  • 完整代碼

'''
airflow 任務依賴關系設置三'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_3', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)[A,B,C] >>D >>[E,F]

4、???????DAG任務依賴設置四

  • DAG調度流程圖

?

  • task執行依賴
A >>B>>C>>D
A >>E>>F

  • 完整代碼
'''
airflow 任務依賴關系設置四'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_4', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)A >>[B,C,D]
A >>[E,F]

5、???????DAG任務依賴設置五

  • DAG調度流程圖

?

  • task執行依賴
A >>B>>E
C >>D>>E

  • 完整代碼
'''
airflow 任務依賴關系設置五'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22),  # 第一次開始執行的時間,為 UTC 時間'retries': 1,  # 失敗重試次數'retry_delay': timedelta(minutes=5),  # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_5', #DAG id ,必須完全由字母、數字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)A >>B>>E
C >>D>>E

  • 📢博客主頁:https://lansonli.blog.csdn.net
  • 📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
  • 📢本文由 Lansonli 原創,首發于 CSDN博客🙉
  • 📢大數據系列文章會每天更新,停下休息的時候不要忘了別人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活??

總結

以上是生活随笔為你收集整理的大数据调度平台Airflow(五):Airflow使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。