kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息
本文屬于介紹?NWPC 消息平臺?系列文章。
本文介紹如何在基于 ecFlow 構建的數值預報業務系統中發送 NWPC 消息平臺的?產品事件消息。
介紹
數值預報業務系統產品制作一般分為三個步驟:
監視輸出:檢測模式積分任務是否輸出計算結果,使用單一任務循環檢查文件是否存在,一般運行在 HPC 的登錄節點
生成產品:根據模式輸出結果制作產品,一般運行在 HPC 的串行和并行計算節點中
分發產品:文件拷貝或 FTP 傳輸,需要與 HPC 外部通訊,一般運行在 HPC 的串行節點或登錄節點
數值預報業務系統產品制作流程
NWPC 消息平臺目前使用兩種產品事件消息,對應于上述不同步驟:
面向 NMC 監控平臺的表示 GRIB 2 產品已生成的消息,對應步驟 2
面向 NWPC 消息系統的表示 GRIB 2 產品已上傳到二級存儲的消息,對應步驟 3
數值預報業務系統根據事件消息的應用場景,使用不同方式發送以上兩種產品事件消息。
方案
本節介紹將事件消息發送集成到數值預報業務系統中的兩種方式。
腳本集成
將事件消息發送命令加入到已有任務腳本中,是最簡單的集成方式,無需對現有系統的結構進行修改。
GRIB 2 產品生成消息目前使用這種方式。該消息用于統計產品生成的時間,需要在產品生成后盡快發送。最直觀的方法就是將消息發送命令放到產品制作任務腳本的最后。但實踐中我們往往將產品生成與分發拆分為兩個相關聯的任務:產品制作任務只負責產品生成,不與系統外部進行交互;產品分發任務由產品制作任務驅動,與外部進行通訊。產品制作任務可能運行在無法與外界進行通訊的計算節點上。所以 NWPC 消息平臺將產品生成消息發送命令放到分發任務腳本的開頭,既符合將產品生成與分發相分離的要求,也能在第一時間發送消息,不受后續分發操作的影響。
這種方式存在一定的弊端。
(1) 如果產品分發任務執行失敗并重新運行,對于同一個產品文件,會產生多條消息記錄,需要后端應用對重復消息進行額外處理。
(2) 業務系統為了保證分發任務執行穩定,會限制同時運行的任務數。分發任務可能會因作業數限制而延遲啟動,影響產品生成消息按時發送。另外,某些極端情況下,上傳任務持續處于運行狀態,后續任務無法運行,導致產品生成消息無法發送。
消息發送任務
為產品事件創建單獨的發送任務可以解決消息重復發送的問題。
GRIB 2 產品完成上傳二級存儲的消息目前使用這種方式。該消息計劃用于驅動部署在氣象大數據云平臺加工流水線上的產品制作任務,使后續系統無需重復監視二級存儲中文件的到達情況。產品上傳任務一般運行在 HPC 的串行計算節點或其它專用傳輸節點上,受隊列節點數限制。而消息上傳任務運行時間通常在 2 秒以內,資源消耗較少,所以可以直接在 ecFlow 服務運行節點(即 HPC 登錄節點)上運行,不占用計算節點,不會因作業排隊而帶來額外的時間延遲。
這種方式也存在一定的弊端。
(1) 額外創建任務需要重新設計現有 ecFlow 系統,不方便進行集成。
(2) 如果為發送消息增加大量任務,可能會給整個 ecFlow 系統帶來額外負擔,包括處理依賴關系、生成作業腳本等,同時也不利于系統維護。
關鍵技術實現
消息命令行工具
NWPC 的數值預報業務系統任務腳本都是 shell 腳本,在 ecFlow 系統中發送產品事件消息最合適的方式就是使用命令行程序。
NWPC 消息平臺分別為 NMC 和 NWPC 兩套消息系統開發命令行程序,實現在 CMA-PI 上向消息中間件 Kafka 和 RabbitMQ 發送事件消息。
nmc-message-client
nmc_monitor_client send?命令使用?kafka-go?庫連接 NMC 監控平臺的 Kafka 服務,發送產品生成消息,支持多個 Kafka 節點。
https://github.com/segmentio/kafka-go
產品生成消息中的各個參數由命令參數提供:
source:系統名稱
type:消息類型
status:事件狀態,只發送完成狀態 (0)
file-name:文件名
absolute-data-name:文件絕對路徑
start-time:起報時次
forecast-time:預報時效
為了保證消息發送異常時不影響后面的產品上傳任務,增加?--ignore-error?選項忽略命令執行過程中的所有錯誤。
下面是 GRAPES GFS 后處理系統中發送產品生成消息的命令調用。
/g1/u/nwp_pd/nmc_message_client/bin/nmc_monitor_client send \ --target "host1:9092,host2:9092,host3:9092" \ --source nwpc_grapes_gfs \ --type prod_grib \ --status 0 \ --file-name gmf.gra.${init_time}${forecast_time}.grb2 \ --absolute-data-name ${run_dir}/output/grib2_orig/gmf.gra.${init_time}${forecast_time}.grb2 \ --start-time ${init_time} \ --forecast-time ${forecast_time} \ --ignore-error \ --debugnwpc-message-client
nwpc_message_client production?命令使用?amqp?庫向 NWPC 消息系統的 RabbitMQ 服務發送產品完成上傳消息。
https://github.com/streadway/amqp
產品完成上傳消息中的各個參數由命令參數提供:
system:系統名
production-stream:產品流
production-type:產品類型
production-name:產品名稱
event:事件名
start-time:起報時次
forecast-time:預報時效
下面是 GRAPES GFS 后處理系統中發送產品完成上傳二級存儲消息的命令調用。
/path/to/nwpc_message_client production \ --system grapes_gfs_gmf \ --production-stream oper \ --production-type grib2 \ --production-name orig \ --event storage \ --status complete \ --start-time ${init_time} \ --forecast-time "${forecast_hour}h" \ --rabbitmq-server ${NWPC_MESSAGE_CLIENT_RABBITMQ_ADDRESS} \ --broker-address ${NWPC_MESSAGE_CLIENT_BROKER_ADDRESSS}腳本集成
在上傳任務腳本中集成消息發送命令,為了保證消息發送與上傳任務互不影響,將發送命令放到腳本開頭,設置超時時間,并忽略發送命令的錯誤。使用與產品上傳一樣的標識變量控制是否發送消息,避免在測試期間發送無效消息。
if [ ${FLAG_UPLOAD_ORIG} == ".true." ]; then/g1/u/nwp_pd/nmc_message_client/bin/nmc_monitor_client send \ # ...skip commands...
fi
if [ ${FLAG_UPLOAD_ORIG} == ".true." ]; then
# do some ftp
fi
消息發送任務
單獨的消息發送任務由其他任務(比如產品上傳)觸發,在 ecFlow 服務所在的節點上直接運行,不占用計算資源。
GRAPES GFS 產品后處理系統中使用單獨的任務發送產品上傳消息
同樣也使用標識變量控制消息發送,并忽略發送消息時發生的任何錯誤。
if [ ${FLAG_STORAGE_ARCHIVE_GRIB2} == ".true." ]; thenecho "Send message to nwpc message broker and ignore any errors..."
set +e
source ${NWPC_MESSAGE_CLIENT_CONFIG_SCRIPT}
${NWPC_MESSAGE_CLIENT_BIN} production \ # ...skip commands...
set -e
fi
應用
目前已在 GRAPES 確定性模式系統中應用產品事件消息。其中 GRAPES GFS、GRAPES MESO 10KM、GRAPES 3KM 發送兩種消息,GRAPES TYM 僅發送產品上傳消息。
GRAPES 模式業務系統中產品事件消息發送的兩種方式
參考
NWPC 消息平臺項目
nwpc-oper/nwpc-message-client
https://github.com/nwpc-oper/nwpc-message-client
nwpc-oper/nmc-message-client
https://github.com/nwpc-oper/nmc-message-client
nwpc-oper/nwpc-message-tool
https://github.com/nwpc-oper/nmc-message-tool
產品事件消息
《NWPC消息平臺:產品事件消息》
《適用于NMC監控平臺的數值預報產品消息》
命名行工具
《使用Cobra構建命令行程序》
《動態解析命令行參數》
《使用kafka-go連接Kafka》
題圖由 renategranade0 在 Pixabay 上發布。
總結
以上是生活随笔為你收集整理的kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: for遍历list scala_面试官问
- 下一篇: 基于单片机自动升旗系统_基于视觉定位的机