Storm学习二
為什么80%的碼農都做不了架構師?>>>
?
Nimbus
功能:對Topology任務進行分配調度,接收用戶的命令做相應的處理,submit,kill,activate,deactivate,rebalance
nimbus數據結構
java數據結構和Clojure數據結構
Nimbus中除了主服務線程之外,還有個計時器線程
作用如下:
1調用mk-assignment啟動新一輪的任務分配,調用do-cleanup方法清理storm元數據,操作每隔(NIMBUS-MONITOR-FREQ-SECS)10秒執行一次。
2調用clean-inbox方法清理nimbus本地目錄中topology的jar包(Cleanup-inbox-freq-secs 600s)執行一次
3執行topology的狀態轉移事件kill,rebalance等
?
mk-assignments方法會將所有的分配信息保存或更新到zookeeper中,supervisor會周期性地檢查和分配這些信息,并根據這些信息做相應的調度處理
executor->node+port信息
do-cleanup
clean-inbox 清除本地目錄
Topology狀態轉移
transition!-name->transition!
delay-event方法表示延遲一段時間后再處理轉移事件,參數包括nimbus-data,storm-id,延遲執行的時間及轉移事件。schedule方法
kill-transition方法定義了一個方法參數為kill-time;
rebalance-transition也返回了一個方法
?
啟動Nimbus服務
涉及兩個方法
launch-server!定義了核心的處理邏輯,啟動nimbus服務
service-handler 方法是nimbus真正處理請求的地方,定義了一些數據結構,以及用于啟動任務調度和數據清理的線程,它還會返回一個實現了Nimbus$Iface接口,Shutdownable接口以及DaemonCommon接口的對象,nimbus-data方法構建nimbus數據結構,調用cleanup-corrupt-topologies!方法清除哪些在ZooKeeper上還有元數據但在nimbus本地目錄中沒有對應文件夾的Topology,將它們遺留在ZooKepper中的記錄徹底刪除。
將當前所有處于活躍狀態的Topology調用transition!方法,設置Topology的狀態:start-up
?
關閉Nimbus服務
包括殺掉計時器線程,釋放zookeeper連接,以及清除nimbus-data中上傳下載的緩存。
?
?
Nimbus主要服務方法:
Topology的提交
submitTopology:提交一個新的Topology,并為topology創建topology-id設置一些必要的元數據,最后用mk-assignments方法為Topology分配任務
?
jar文件的上傳與下載
nimbus作為服務器,一方面接收用戶提交的Topology jar 包,另一方面還要向supervisor下達任務分配的jar包.
?
文件上傳beginFileUpload,uploadChunk finishFileUpload
文件下載beginFileDownload和downloadChunk
?
UI信息
Nimbus服務器本身記錄了當前集群的任務和調度信息
getClusterInfo當前集群的統計信息 :系統的資源占用情況,Nimbus服務運行了多少時間,以及當前系統中所有Topology的運行統計;
<supervisor-id,SupervisorInfo>信息構造supervisorSummary對象,參數分別為主機名,啟動時間,所有可用的端口數目,使用的端口號的數目以及supervisor-id,最后返回一個SupervisorSummary集合
<topology-id,stormBase>集合
根據topology-id獲取其任務分配信息,構建TopologySummary對象,其參數依次為topology-id,storm-name,所有的Task數目,所有的Executor數目,所有被占用的slot數目;
根據supervisorSummary集合,nimbus的啟動時間以及TopologySummary集合,創建ClusterSummary對象并返回。
?
獲得storm配置項和topology對象獲取等基本工作
getNimbusConf直接返回JSON序列化后的nimbus-data中保存的nimbus使用的storm配置項
getTopology方法獲得系統中所有的topology信息
?
?
?
?
輔助方法
system-topology!
驗證提交的topology,同時添加系統組件和流
?
normalize-topology
計算提交的Topology中每個組件的并行度并更新該組件的Topology-tasks配置項
?
component-parallelism方法,用來計算組件并行度
?
compute-new-topology->executor->node+por
根據系統當前已經存在的分配情況,結合當前系統的運行情況找出需要進行任務分配的Topology集合,并為他們分配任務。
即<topology-id,<executor,[node,port]>>每個topology對應的任務分配情況,計算出新的集合結果
?
compute-executors根據當前topology設置的組件的并行度創建對應的executor.
nimbus:nimbus-data對象
storm-id:topology-id
?
?
?
?
Scheduler
是storm調度器,為topology分配當前集群中可用的資源
IScheduler接口
prepare方法
scheduler方法
?
Storm提供了3種scheduler-EvenScheduler,DefaultScheduler和IsolationScheduler;
?
?
EvenScheduler:將可用資源均勻地分配給當前小任務分配的多個Topology;
?
DefaultScheduler:是Storm默認的任務調度器首先釋放掉其他topology不再需要的資源,然后調用evenScheduler方法為topology均勻分配資源;
?
IsolationScheduler:提供一種機制來確保集群中的某些Topology有足夠的運行資源,可以單獨為某個Topology指定需要的資源;
sort-slots資源列表排序
?
Supervisor
可以理解為單擊任務調度器,負責箭筒nimbus的任務調度器,啟動相應的worker對nimbus分配的任務進行處理,同時也會監聽由他啟動的worker的工作狀態
?
與supervisor相關的數據結構
standalone-supervisor方法:返回一個實現了 ISupervisor接口的對象,獲取和創建supervisor的id
supervisor-data方法:定義了整個supervisor代碼共享數據結構,很多常用的成員變量
?
本地數據存儲,使用LocalState在本地保存相關的信息,LocalState保存重要的數據,保證supervisor失敗重啟后能夠正常運行
?
?
1 supervisor id
2 localAssigment
3 Approved Workers有效的<work-id,port>映射集合
?
?
Supervisor中的線程
計時器線程和兩個時間線程。
計數器線程負責維持心跳,得到各個Supervisor的最新狀態,同時也負責每隔一段時間將事件線程要執行的時間添加到對應的隊列中。
同步nimbus任務的線程通過不斷執行mk-synchronize-supervisor函數來保證supervisor與nimbus的任務同步,獲取新的任務,移除舊任務
管理Worker進程的線程
?
?
啟動Supervisor
通過mk-supervisor方法來啟動服務Supervisor
關閉Supervisor
將運行狀態設置為false,關閉計時器線程,關閉Supervisor與Nimbus同步任務的線程,關閉管理Worker的線程,釋放掉與ZooJeeper的連接
?
?
幾個Supervisor中重要的輔助方法
launch-worker啟動worker進程,分分布式和本地模式
read-allocated-workers 用于獲得worker及其對應的心跳信息,并根據心跳信息判斷worker狀態
wait-for-worker-launch啟動worker時被調用,保證直到Worker成功啟動起來后才返回
shutdown-worker該方法用于關閉Worker進程并清理Worker的本地文件夾
download-storm-code這個方法用于從Nimbus下載與分配給當前Supervisor的任務相對應的Topology信息。跟launch-worker方法類似,該方法也有兩種模式— Local模式和分布式模式
?
?
?
?
轉載于:https://my.oschina.net/iioschina/blog/812358
總結
- 上一篇: Linux操作系统安装---centos
- 下一篇: centos7开放端口