Storm中并行度原来是这样计算的(1.0.1版本)
==思考問題1==
向集群提交一個(gè)拓?fù)涞臅r(shí)候,Storm是如何計(jì)算Task數(shù)以及Executor數(shù)的?
具體有多少個(gè)worker,多少個(gè)executor,每個(gè)executor負(fù)責(zé)多少個(gè)task?
?
==思考問題2:==
構(gòu)建拓?fù)涞臅r(shí)候,有3個(gè)地方會(huì)影響并行度,這3個(gè)地方之間有什么關(guān)系?
builder.setSpout("spout", new RandomSentenceSpout(), 5); //parallelism-hint builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1); builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);?
==3個(gè)參數(shù)的信息==
1、parallelism-hint:
構(gòu)建拓?fù)鋾r(shí),可以通過setSpout或setBolt的函數(shù)參數(shù)中指定。為初始executor數(shù)。
如:builder.setSpout("spout", new RandomSentenceSpout(), 5);
?
2、?TOPOLOGY-TASKS:
構(gòu)建拓?fù)鋾r(shí),通過Spout/Bolt的setNumTasks()方法來指定。為component的task數(shù)(Spout或Bolt)。
如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1);
?
3、TOPOLOGY-MAX-TASK-PARALLELISM:
構(gòu)建拓?fù)鋾r(shí),通過Spout/Bolt的setMaxTaskParallelism()方法來指定。為component的最大并行度。通常用于測試,在本地模式時(shí)使用。
如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);
?
==結(jié)論1:Executor數(shù)是多少?==
對應(yīng)topology代碼中, 為每個(gè)component指定的parallelism-hint數(shù)(通過setBolt和setSpout的參數(shù))
?
==結(jié)論2:Task數(shù)是多少?==
版本號:apache-storm-1.0.1
代碼路徑:org/apache/storm/daemon/nimbus.clj
?
?
這里有一個(gè)函數(shù)非常重要,看了之后上面的3個(gè)關(guān)系多少會(huì)清晰很多。
該函數(shù)返回計(jì)算之后的真實(shí)的Task數(shù)。
(defn- component-parallelism [storm-conf component](let [storm-conf (merge storm-conf (component-conf component))num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)](if max-parallelism(min max-parallelism num-tasks)num-tasks)))?
這個(gè)代碼是用clojure語言編寫的,沒有用過的人估計(jì)會(huì)非常蛋疼,
為了方便理解,用偽代碼(方便理解)翻譯之后,大概思路是這個(gè)樣子的:
num-tasks = (TOPOLOGY-TASKS != null ? TOPOLOGY-TASKS : parallelism-hint); max-parallelism = TOPOLOGY-MAX-TASK-PARALLELISM;if (max-parallelism != null) {//取兩者較小return min(num-tasks, max-parallelism); } else { return num-tasks; }?
如果將3個(gè)參數(shù)進(jìn)行排列組合之后,獲得結(jié)果如下:
?
簡單理解來說:
1、暫時(shí)不考慮TOPOLOGY-MAX-TASK-PARALLELIS。(測試用的玩意兒,弄出來影響思路)
2、TOPOLOGY-TASKS優(yōu)先于parallelism-hint。
?
==Executor與Task是如何匹配的?==
下面的代碼是分配的代碼
(defn- compute-executors [nimbus storm-id](let [conf (:conf nimbus)blob-store (:blob-store nimbus)storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)component->executors (:component->executors storm-base)storm-conf (read-storm-conf-as-nimbus storm-id blob-store)topology (read-storm-topology-as-nimbus storm-id blob-store)task->component (storm-task-info topology storm-conf)](->> (storm-task-info topology storm-conf)reverse-map(map-val sort)(join-maps component->executors)(map-val (partial apply partition-fixed))(mapcat second)(map to-executor-id))))?
理解這個(gè)代碼之前,我們首先把注意力放在storm-task-info這個(gè)函數(shù)上,看看它都干了些什么。
代碼位置:org/apache/storm/daemon/common.clj
(defn storm-task-info"Returns map from task -> component id"[^StormTopology user-topology storm-conf](->> (system-topology! storm-conf user-topology)all-components(map-val (comp #(get % TOPOLOGY-TASKS) component-conf))(sort-by first) (mapcat (fn [[c num-tasks]] (repeat num-tasks c))) (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1))) (into {}) ))?
來看看廣大網(wǎng)友的解讀版。參考博客:https://www.cnblogs.com/ierbar0604/p/4386480.html
這個(gè)函數(shù), 首先讀出所有components ,對每個(gè)component, 讀出TOPOLOGY-TASKS(已經(jīng)過標(biāo)準(zhǔn)化之后的TASK數(shù),具體參照前面的內(nèi)容),
最后用遞增序列產(chǎn)生taskid, 并最終生成component和task的對應(yīng)關(guān)系。
(如果不設(shè)置TOPOLOGY-TASKS,task數(shù)等于executor數(shù),后面分配就很容易,否則就涉及task分配問題)
?
storm-task-info函數(shù)的輸出,是這個(gè)樣子的:
{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
?
然后,我們把注意力返回到compute-executors函數(shù)(調(diào)用storm-task-info函數(shù)的調(diào)用處)。
還是用上面博客中,網(wǎng)友解讀的版本來幫助我們理解。(注意:需要對照源碼,確認(rèn)當(dāng)前版本代碼是否有變化)
?
==我的筆記==
?
最后,從程序與StormUI界面對比來看看并行度的分配結(jié)果。
(拓?fù)涑绦?#xff09;
?
?(UI界面)
?
==簡單總結(jié)==
1、有3個(gè)地方可以影響Task數(shù),根據(jù)3個(gè)參數(shù)的結(jié)果決定Task數(shù)。
2、executor數(shù) = 所有組件的parallelism-hint總數(shù)。
3、task數(shù)在生命周期內(nèi)不變,executor數(shù)可能改變。
?
==rebalance命令==
storm運(yùn)行過程中,而已使用rebalance命令動(dòng)態(tài)調(diào)整拓?fù)涞膚orker數(shù)及并發(fā)度。
命令模板:storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*? (*表示可以設(shè)置多個(gè))
## 重新配置拓?fù)?"mytopology",使得該拓?fù)鋼碛?5 個(gè) worker processes, ## 另外,配置名為 "blue-spout" 的 spout 使用 3 個(gè) executor, ## 配置名為 "yellow-bolt" 的 bolt 使用 10 個(gè) executor。$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=1-w:標(biāo)記覆蓋Storm在禁用與關(guān)閉期間等待的時(shí)間長度。
?
==其他疑問==
1、網(wǎng)上總是能看到,“不推薦使用setNumTasks”的方式來提高并發(fā)度。至于原因確實(shí)是一直沒有搞明白。
答:如果只單純的使用setNumTasks,不調(diào)整parallelism-hint,會(huì)造成多個(gè)Task運(yùn)行在1個(gè)executor的結(jié)果。并不一定能夠提高性能。
?
2、如果task數(shù)比executor數(shù)多,是否會(huì)有閑置executor?(需要用代碼驗(yàn)證)
答:不會(huì)有閑置executor。
?
-------------
參考博客:
https://www.cnblogs.com/ierbar0604/p/4386480.html
http://lib.csdn.net/article/60/42875
轉(zhuǎn)載于:https://www.cnblogs.com/quchunhui/p/8271349.html
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的Storm中并行度原来是这样计算的(1.0.1版本)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: POJ - 1509 Glass Be
- 下一篇: React的思想