Twitter Storm Source Code Walkthrough (2)
An Analysis of the Topology Submission Process
Overview
A storm cluster can be pictured as a factory. nimbus takes orders from the outside world and also turns those external orders into internal work assignments, so it doubles as the dispatch office. supervisor is the middle manager, the foreman of a production workshop: his day job is to wait for the dispatch office to hand him new work. As foreman, the supervisor does not do the work himself; he has a crew of ordinary workers, and he only ever shouts two things at them: start work, and stop work. Note that "stop work" does not mean a worker has finished what is in his hands, only that he goes into an idle state.
The topology submission process involves the following roles.
- storm client: submits the topology the user built to nimbus
- nimbus: receives the submitted topology through its thrift interface
- supervisor: downloads the latest work assignment announced through zookeeper, and is responsible for launching workers
- worker: a worker runs tasks; each task is either of bolt type or of spout type
- executor: an executor is a running thread; all tasks inside one executor are of the same type, i.e. the tasks in one thread are either all bolts or all spouts
A worker corresponds to a process and an executor to a thread; one thread can run one or more tasks. Before version 0.8.0 each task had its own thread. Version 0.8.0 introduced the executor, which removed the one-to-one mapping between task and thread; the tasks subtree that used to exist on the zookeeper server disappeared along with it. For details on this change see http://storm-project.net/2012/08/02/storm080-released.html
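To make the worker/executor/task relationship concrete, here is a tiny sketch (plain Python, not Storm code; the function name and the numbers are mine) of the even-split idea Storm applies when a component has more tasks than executors:

```python
def split_tasks_among_executors(task_ids, num_executors):
    """Spread one component's task ids over its executors as evenly as
    possible. Every executor ends up running tasks of a single
    component, i.e. all bolts or all spouts."""
    return [task_ids[i::num_executors] for i in range(num_executors)]

# A hypothetical bolt with 6 tasks spread over 4 executors (threads):
print(split_tasks_among_executors([1, 2, 3, 4, 5, 6], 4))
# → [[1, 5], [2, 6], [3], [4]]
```

Under the 0.8.0 model the parallelism hint chooses the number of executors, while the task count can be set separately, so the two may differ.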
storm client
To hand a topology over to the storm cluster, the storm client runs the command below. Suppose the jar file is storm-starter-0.0.1-SNAPSHOT-standalone.jar, the main class is storm.starter.ExclamationTopology, and the topology is to be named exclamationTopology:

```
#./storm jar $HOME/working/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamationTopology
```

What does this one short line actually mean for the storm client? The source code keeps no secrets, so open the storm client script:
```python
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=["-Dstorm.jar=" + jarfile])

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args) # replaces the current process and never returns
```

exec_storm_class boils down to running the main method of the class passed in. Taking WordCountTopology as an example, its main looks like this:
```java
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);
    if (args != null && args.length > 0) {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}
```

For the storm client side, the key class StormSubmitter has now shown its face; submitTopology is what we really need to study.
```java
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
        throws AlreadyAliveException, InvalidTopologyException {
    if (!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if (localNimbus != null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if (topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if (opts != null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch (InvalidTopologyException e) {
                LOG.warn("Topology submission exception", e);
                throw e;
            } catch (AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch (TException e) {
        throw new RuntimeException(e);
    }
}
```

submitTopology does essentially two things: it uploads the jar file to the storm cluster, and then it tells the cluster that the upload is finished and the named topology may now be run.
Look first at submitJar, the function behind the jar upload; its call chain is shown in the figure below.
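The essence of submitJar is a chunked upload over the same thrift connection: beginFileUpload asks nimbus for an upload location, uploadChunk is called in a loop, and finishFileUpload closes the transfer. A rough Python rendering (the `client` object, function name, and chunk size are my assumptions, mirroring the thrift Nimbus interface):

```python
def submit_jar(client, jar_path, chunk_size=307200):
    """Sketch of the submitJar upload loop against a hypothetical
    thrift client exposing the Nimbus file-upload methods."""
    upload_location = client.beginFileUpload()  # nimbus picks a target path in its inbox
    with open(jar_path, "rb") as jar:
        while True:
            chunk = jar.read(chunk_size)
            if not chunk:
                break
            client.uploadChunk(upload_location, chunk)
    client.finishFileUpload(upload_location)
    return upload_location  # later passed along as uploadedJarLocation
```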
Now the call chain of the second step. (The figures were drawn with tikz/pgf and generated as pdf.)
In the two call-chain figures above, the functions sitting at the leaves of the tree were all declared in storm.thrift; if that has slipped your mind, go back to the description of storm.thrift in section 1.3. On the client side, these functions are all generated automatically by thrift.
For reasons of space and time we skip TopologyBuilder.java here, although its source also plays a very important part on the storm client side when a topology is submitted.
nimbus
The storm client sends the jar to nimbus over the thrift interface, and the uploaded topology is handled by the predefined submitTopologyWithOpts. How, then, does nimbus receive the file step by step, break the work down, and finally hand it to the supervisors?
submitTopologyWithOpts
Everything again starts from thrift. service-handler in nimbus.clj implements the Nimbus interface defined by thrift; the full code is not listed here, as it would take too much space. Let us focus on how it implements submitTopologyWithOpts:
```clojure
(^void submitTopologyWithOpts
  [this ^String storm-name ^String uploadedJarLocation ^String serializedConf
   ^StormTopology topology ^SubmitOptions submitOptions]
  (try
    (assert (not-nil? submitOptions))
    (validate-topology-name! storm-name)
    (check-storm-active! nimbus storm-name false)
    (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
               storm-name
               (from-json serializedConf)
               topology)
    (swap! (:submitted-count nimbus) inc)
    (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
          storm-conf (normalize-conf
                       conf
                       (-> serializedConf
                           from-json
                           (assoc STORM-ID storm-id)
                           (assoc TOPOLOGY-NAME storm-name))
                       topology)
          total-storm-conf (merge conf storm-conf)
          topology (normalize-topology total-storm-conf topology)
          topology (if (total-storm-conf TOPOLOGY-OPTIMIZE)
                     (optimize-topology topology)
                     topology)
          storm-cluster-state (:storm-cluster-state nimbus)]
      (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
      (log-message "Received topology submission for " storm-name " with conf " storm-conf)
      ;; lock protects against multiple topologies being submitted at once and
      ;; cleanup thread killing topology in b/w assignment and starting the topology
      (locking (:submit-lock nimbus)
        (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
        (.setup-heartbeats! storm-cluster-state storm-id)
        (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                        TopologyInitialStatus/ACTIVE :active}]
          (start-storm nimbus storm-name storm-id
                       (thrift-status->kw-status (.get_initial_status submitOptions))))
        (mk-assignments nimbus)))
    (catch Throwable e
      (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
      (throw e))))
```

The directory structure that the storm cluster creates on the zookeeper server is shown further below; the source file related to that layout is config.clj.
In plain words, the function above first runs the necessary checks on the uploaded topology, covering its name and the file's content and format, much like the physical exam you take before joining a company. Once those pass, execution enters the critical section, which is exactly why the lock is taken.
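One detail worth pulling out of the code above: the storm-id, which names the topology everywhere on zookeeper and on disk, is simply the topology name plus a submission counter plus a timestamp. In Python terms:

```python
import time

def make_storm_id(storm_name, submitted_count):
    """Mirrors (str storm-name "-" submitted-count "-" (current-time-secs))
    from submitTopologyWithOpts."""
    return "%s-%d-%d" % (storm_name, submitted_count, int(time.time()))

print(make_storm_id("exclamationTopology", 1))
```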
normalize-topology
```clojure
(defn all-components [^StormTopology topology]
  (apply merge {}
         (for [f thrift/STORM-TOPOLOGY-FIELDS]
           (.getFieldValue topology f))))
```

Once all the components are listed, the configuration of each component can be read out.
mk-assignments
The heart of what happens inside this critical section is the function mk-assignments. mk-assignments has two main jobs: first, compute how many tasks there are, i.e. how many spouts and how many bolts; second, on the basis of that computation, write the assignment through the zookeeper client interface so that supervisors can notice there is new work to claim.
Take the second point first, since its logic is simple. mk-assignments runs the following code to set the corresponding data in zookeeper so that supervisors can sense that a new assignment has been produced:
```clojure
(doseq [[topology-id assignment] new-assignments
        :let [existing-assignment (get existing-assignments topology-id)
              topology-details (.getById topologies topology-id)]]
  (if (= existing-assignment assignment)
    (log-debug "Assignment for " topology-id " hasn't changed")
    (do
      (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
      (.set-assignment! storm-cluster-state topology-id assignment))))
```

The call chain is shown in the figure below.
The computation behind the first point is comparatively involved and deserves a careful walk-through. Its central question is how work is dispatched, i.e. scheduling.
Perhaps you have already noticed the directory src/clj/backtype/storm/scheduler, or the scheduler-related entries in storm.yaml. So when does this scheduler actually come into play? mk-assignments indirectly calls a function with a truly odd-looking name, compute-new-topology->executor->node+port, and it is inside this oddly named function that the scheduler is invoked.
The assignments computed by the scheduler are kept in Cluster.java, which is why new-scheduler-assignment reads its data from there. With the assignment in hand, the corresponding node and port can be computed, that is, which worker on which supervisor a task should be handed to.
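To illustrate what the scheduler's output looks like, the sketch below round-robins a topology's executors over the free node+port slots. The data shapes are invented for illustration, but the spirit matches Storm's default EvenScheduler:

```python
from itertools import cycle

def even_assignment(executors, free_slots):
    """Round-robin executors over the available (node, port) worker
    slots; returns a map in the shape the nimbus code calls
    executor->node+port."""
    if not free_slots:
        return {}
    slot_iter = cycle(free_slots)
    return {executor: next(slot_iter) for executor in executors}

slots = [("node1", 6700), ("node1", 6701), ("node2", 6700)]
print(even_assignment([(1, 1), (2, 2), (3, 3), (4, 4)], slots))
# → {(1, 1): ('node1', 6700), (2, 2): ('node1', 6701),
#    (3, 3): ('node2', 6700), (4, 4): ('node1', 6700)}
```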
The directory structure storm creates on the zookeeper server is shown in the figure below.
Given this directory structure, the questions to answer are: which of these directories are written when a topology is submitted? A new directory for the freshly submitted topology is created under assignments, and what does the data written into it look like?
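From memory of the Assignment record in common.clj, the data written under the assignments subtree for one topology is roughly the following map. It is rendered as a Python dict purely for illustration; treat the field names as an approximation to be checked against the source, and the values are invented:

```python
# Approximate shape of one assignment node in zookeeper (illustrative only).
assignment = {
    "master-code-dir": "/stormdist/exclamationTopology-1-1357050807",
    "node->host": {"supervisor-node-id": "host1.example.com"},
    "executor->node+port": {(1, 1): ("supervisor-node-id", 6700),
                            (2, 2): ("supervisor-node-id", 6700)},
    "executor->start-time-secs": {(1, 1): 1357050807, (2, 2): 1357050807},
}
print(sorted(assignment))
```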
supervisor
As soon as a new assignment is written into zookeeper, the supervisor's callback mk-synchronize-supervisor wakes up and runs.
Its main logic is to read the full set of assignments now on the zookeeper server, compare it with the assignments already running on this machine, and work out which ones are new. The workers that will run the actual tasks are then brought up in sync-processes.
To explain what the supervisor has to do during topology submission, the key is to understand the processing logic of the following two functions.
- mk-synchronize-supervisor: when the contents of the assignments subtree on the zookeeper server change, the supervisor receives a notification; the callback that handles it is mk-synchronize-supervisor. It reads all assignments, including those not meant for this node, along with their details, then works out which assignments belong to this node and, among those, which are new. For each new assignment it downloads the jar file from the corresponding directory on nimbus; the user's own processing code is never uploaded to the zookeeper server, it sits on the disk of the machine running nimbus.
- sync-processes: once mk-synchronize-supervisor has finished the assignment-related preprocessing, it hands the actual launching of workers to the event-manager, which runs in a separate thread; the main function executed in that thread is sync-processes. sync-processes kills the workers whose assignments are no longer valid, then brings workers up again with the newly specified run parameters.
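The diffing described above boils down to comparing what zookeeper says this node should run against what is already running locally. A schematic version (data shapes and names invented; keyed by worker port, as the supervisor's bookkeeping is):

```python
def diff_assignments(assigned, running):
    """Split this supervisor's view into workers to (re)start and
    workers to stop: start whatever is assigned but not running in
    exactly that form, stop whatever runs but is no longer assigned."""
    to_start = {port: a for port, a in assigned.items()
                if running.get(port) != a}
    to_stop = [port for port in running if port not in assigned]
    return to_start, to_stop

assigned = {6700: "topo-1 executors [1 2]", 6701: "topo-1 executors [3]"}
running  = {6700: "topo-1 executors [1 2]", 6702: "old-topo executors [7]"}
print(diff_assignments(assigned, running))
# → ({6701: 'topo-1 executors [3]'}, [6702])
```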
Summary
To recap the submission path: the storm client runs the jar's main, which calls StormSubmitter.submitTopology; the client uploads the jar over thrift and then calls submitTopologyWithOpts. nimbus validates the topology, stores its code locally, computes assignments in mk-assignments (invoking the scheduler), and writes them to zookeeper. Each supervisor's mk-synchronize-supervisor callback then picks out the assignments meant for it, downloads the jar from nimbus, and sync-processes launches the workers.