日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Nimbus三Storm源码分析--Nimbus启动过程

發(fā)布時(shí)間:2023/12/20 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Nimbus三Storm源码分析--Nimbus启动过程 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Nimbus server, 首先從啟動(dòng)命令開(kāi)始, 同樣是使用storm命令"storm nimbus”來(lái)啟動(dòng)
看下源碼, 此處和上面client不同, jvmtype="-server", 最終調(diào)用"backtype.storm.daemon.nimbus"的main
nimbus是用clojure實(shí)現(xiàn)的, 但是clojure是基于JVM的, 所以在最終發(fā)布的時(shí)候會(huì)產(chǎn)生nimbus.class,
所以在用戶(hù)使用的時(shí)候完全可以不知道clojure, 看上去所有都是Java. clojure只是用于提高開(kāi)發(fā)效率而已.

1. Nimbus啟動(dòng)過(guò)程

bin/storm

def nimbus(klass="backtype.storm.daemon.nimbus"): """Syntax: [storm nimbus] Launches the nimbus daemon. This command should be run under supervision with a tool like daemontools or monit. See Setting up a Storm cluster for more information. """ cppaths = [CLUSTER_CONF_DIR] jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ "-Dlogfile.name=nimbus.log", "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml", ] exec_storm_class(klass, jvmtype="-server", extrajars=cppaths, jvmopts=jvmopts)

storm-core/backtype/storm/daemon/nimbus.clj

;; 啟動(dòng)nimbus的主方法 (defn -main [] ;; main前面加上-, 表示是public的. 所以bin/storm能直接調(diào)用nimbus.clj的main方法(-launch (standalone-nimbus))) ;; 同樣launch也是一個(gè)public方法. standalone-nimbus是一個(gè)方法, clojure對(duì)于沒(méi)有參數(shù)的方法可以省略()(defn -launch [nimbus] ;; launch的參數(shù)是一個(gè)Nimbus對(duì)象, 所以上面standalone-nimbus方法的返回值是Nimbus(launch-server! (read-storm-config) nimbus))

注意在clojure中的函數(shù)命名規(guī)范,-functionname表示該函數(shù)是public的,如上面的-main,調(diào)用該函數(shù)的時(shí)候,不需要加-,使用main即可。
而與此相對(duì)的是defn-,這個(gè)表示該函數(shù)是私有函數(shù),不可在外部調(diào)用。

1) standalone-nimbus

nimbus的main, 最終會(huì)調(diào)到launch-server!, conf參數(shù)是調(diào)用read-storm-config讀出的配置參數(shù),
而nimbus是INimbus接口(backtype.storm.scheduler.INimbus)的實(shí)現(xiàn), 可以參考standalone-nimbus.
storm-core/backtype/storm/scheduler/INimbus.java

public interface INimbus {void prepare(Map stormConf, String schedulerLocalDir); /**Returns all slots that are available for the next round of scheduling.在下一次調(diào)度中可用的槽位 * A slot is available for scheduling 如果槽位是空閑的且可以被分配的, 或者雖然被使用但可以被重新分配的. 都是可以被調(diào)度的 * if it is free and can be assigned to, or if it is used and can be reassigned. */ Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments); // this is called after the assignment is changed in ZK void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId); // map from node id to supervisor details String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId); IScheduler getForcedScheduler(); } ;; 返回一個(gè)實(shí)現(xiàn)了INimbus接口的對(duì)象. 由于不想創(chuàng)建這種類(lèi)型, 使用reify匿名對(duì)象 (defn standalone-nimbus [] ;; 沒(méi)有參數(shù). clojure中[]使用的地方有: let綁定, 方法的參數(shù), vector(reify INimbus ;; reify: 具體化匿名數(shù)據(jù)類(lèi)型: 需要一個(gè)實(shí)現(xiàn)了某一協(xié)議/接口的對(duì)象,但是不想創(chuàng)建一個(gè)命名的數(shù)據(jù)類(lèi)型. 匿名類(lèi);; 下面的方式都是INimbus接口的實(shí)現(xiàn)方法(prepare [this conf local-dir]) ;; this可以看做是一個(gè)隱式參數(shù), prepare方法實(shí)際只有2個(gè)參數(shù)的(allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments] (->> supervisors (mapcat (fn [^SupervisorDetails s] (for [p (.getMeta s)] (WorkerSlot. (.getId s) p)))) set )) (assignSlots [this topology slots]) (getForcedScheduler [this] nil ) (getHostName [this supervisors node-id] (if-let [^SupervisorDetails supervisor (get supervisors node-id)] (.getHost supervisor))) ))

這里面有好幾個(gè)語(yǔ)法點(diǎn): ->>, mapcat if-let
mapcat, (mapcat f & colls) 和普通map不同的是, 會(huì)對(duì)map執(zhí)行的結(jié)果執(zhí)行concat操作等于(concat (map f &colls))
依次對(duì)colls中的每個(gè)集合運(yùn)用函數(shù)f, 最后將每個(gè)結(jié)果合并起來(lái). (mapcat f collections)的map不是數(shù)據(jù)結(jié)構(gòu)意義的映射. 而是一個(gè)遍歷操作.
普通的map版本是: (map f collection), 用java來(lái)描述就是for(Object o : collection) func(o). 集合中的每個(gè)元素會(huì)作為函數(shù)f的參數(shù).
上面的(mapcat (fn [s] ...))并沒(méi)有看到collections. 這個(gè)要結(jié)合->> supervisors來(lái)一起分析.
->> supervisors (mapcat fun) 實(shí)際上等價(jià)于(mapcat fun supervisors). 由于mapcat的返回值是map,根據(jù)接口的定義返回值是一個(gè)集合Collection
所以(mapcat)表達(dá)式后面的set的意思是將(mapcat)表達(dá)式的返回值轉(zhuǎn)換為set, (mapcat)表達(dá)式的返回值會(huì)跟在set后面作為最后一個(gè)Item.
達(dá)到連續(xù)調(diào)用的功能. ->>和->的區(qū)別是->是將返回值作為下一個(gè)表達(dá)式的第二個(gè)Item, 而->>是作為下一個(gè)表達(dá)式的最后一個(gè)Item.

supervisors不是Supervisor列表, 其類(lèi)型是SupervisorDetails. mapcat后面緊跟的函數(shù)的參數(shù)類(lèi)型對(duì)應(yīng)的是collections=supervisors的類(lèi)型.
WorkerSlot需要兩個(gè)參數(shù)id和port. 所以這個(gè)方法返回的是Collection, 對(duì)應(yīng)接口INimbus的返回類(lèi)型.

getHostName的參數(shù)supervisors和allSlotsAvailableForScheduling的supervisors是一樣的.
通過(guò)supervisors.get(node-id)獲取對(duì)應(yīng)的supervisor. 所以我們可以猜測(cè)supervisors是一個(gè)Map.
storm-core/backtype/storm/scheduler/SupervisorDetails.java

public class SupervisorDetails {String id;String host; // hostname of this supervisor Object meta; Object schedulerMeta; // meta data configured for this supervisor Set<Integer> allPorts; // all the ports of the supervisor }

Nimbus要分配任務(wù)給Supervisor上的Worker進(jìn)行工作, 而每個(gè)Supervisor會(huì)有多個(gè)worker. 配置文件中可以為一個(gè)supervisor配置多個(gè)slot port.

2) read-storm-config

閱讀源碼其實(shí)都會(huì)遵循一個(gè)范式,那就是程序的入口在哪,配置文件是在什么時(shí)候讀入的。那么好,現(xiàn)在就來(lái)講配置參數(shù)的讀入,在上面的-launch函數(shù)中,
已經(jīng)可以見(jiàn)到用以讀取配置文件的函數(shù)了,那就是read-storm-config。非常狗血的是, 在 nimbus.clj 中有一個(gè)名稱(chēng)非常類(lèi)似的函數(shù)稱(chēng)為read-storm-conf,這個(gè)可不是來(lái)讀取storm cluster的配置信息,它其實(shí)是用來(lái)讀取Topology的配置內(nèi)容的。read-storm-config定義于config.clj中,此時(shí)你會(huì)說(shuō)等等,沒(méi)見(jiàn)到有地方
import或是use backtype.storm.config啊。這一切都被包裝了,它們統(tǒng)統(tǒng)被放到bootstrap.clj中了。注意到這行沒(méi) (bootstrap)
好了, 上述有關(guān)文件引用的疑問(wèn)解決之后, 還是回到正題, 看看read-storm-config的定義吧。storm默認(rèn)的配置文件使用的是yaml格式,一定要找到使用yaml parser的地方。
storm-core/backtype/storm/config.clj

(defn read-storm-config [](let [conf (clojurify-structure (Utils/readStormConfig))] ;; let中參數(shù)conf被賦值為右側(cè)的表達(dá)式的值. 和java方法參數(shù)不同, let中參數(shù)可以被計(jì)算 (validate-configs-with-schemas conf) ;; 對(duì)conf進(jìn)行驗(yàn)證 conf)) ;; 返回這個(gè)conf

真正實(shí)現(xiàn)對(duì)配置文件storm.yaml進(jìn)行讀取的是由java代碼來(lái)實(shí)現(xiàn)的,readStormConfig定義于Utils.java中。
storm-core/backtype/storm/utils/Utils.java

public static Map readDefaultConfig() { return findAndReadConfigFile("defaults.yaml", true); } public static Map readStormConfig() { Map ret = readDefaultConfig(); // 首先讀取defaults.yaml的配置 String confFile = System.getProperty("storm.conf.file"); Map storm; if (confFile==null || confFile.equals("")) { storm = findAndReadConfigFile("storm.yaml", false); } else { storm = findAndReadConfigFile(confFile, true); } ret.putAll(storm); // 其次讀取storm.yaml中的配置 ret.putAll(readCommandLineOpts()); // 最后是命令行的參數(shù), 這個(gè)優(yōu)先級(jí)更高 return ret; } public static Map findAndReadConfigFile(String name, boolean mustExist) { HashSet<URL> resources = new HashSet<URL>(findResources(name)); URL resource = resources.iterator().next(); Yaml yaml = new Yaml(); Map ret = (Map) yaml.load(new InputStreamReader(resource.openStream())); if(ret==null) ret = new HashMap(); return new HashMap(ret); // 解析storm.yaml文件, 返回HashMap } public static List<URL> findResources(String name) { Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name); List<URL> ret = new ArrayList<URL>(); while(resources.hasMoreElements()) { ret.add(resources.nextElement()); } return ret; }

終于看到神秘的Yaml了,那么Yaml這個(gè)類(lèi)又是由誰(shuí)提供的呢,看看Utils.java的 開(kāi)頭部分有這么一句話: import org.yaml.snakeyaml.Yaml;
再看看在storm-core/project.clj中定義的dependencies: [org.yaml/snakeyaml "1.11"]
至此,yaml文件的解析及其依賴(lài)關(guān)系的解決探索完畢。在新版本的storm中使用了maven管理. 可以查看pom.xml

3) storm.yaml

conf/storm.yaml

# storm.zookeeper.servers: # - "server1" # - "server2" # # nimbus.host: "nimbus" # # ##### These may optionally be filled in: # ## List of custom serializations # topology.kryo.register: # - org.mycompany.MyType # - org.mycompany.MyType2: org.mycompany.MyType2Serializer # ## List of custom kryo decorators # topology.kryo.decorators: # - org.mycompany.MyDecorator # ## Locations of the drpc servers # drpc.servers: # - "server1" # - "server2" ## Metrics Consumers # topology.metrics.consumer.register: # - class: "backtype.storm.metrics.LoggingMetricsConsumer" # parallelism.hint: 1 # - class: "org.mycompany.MyMetricsConsumer" # parallelism.hint: 1 # argument: # - endpoint: "metrics-collector.mycompany.org" storm.zookeeper.servers: - 127.0.0.1 storm.zookeeper.port: 2181 nimbus.host: "127.0.0.1" storm.local.dir: "/home/hadoop/data/storm" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703

在配置文件中需要至少回答以下三個(gè)問(wèn)題
1. zookeeper server在哪臺(tái)機(jī)器上運(yùn)行,具體就來(lái)說(shuō)就是ip地址啦
2. nimbus在哪運(yùn)行,可以填寫(xiě)ip地址或域名
3. 在每臺(tái)supervisor運(yùn)行的機(jī)器上可以啟幾個(gè)slot,指定這些slot監(jiān)聽(tīng)的端 口號(hào)
2. thrift RPC
1) thrift
網(wǎng)絡(luò)結(jié)點(diǎn)之間的消息交互一般會(huì)牽涉到兩個(gè)基本的問(wèn)題,
? 消息通道的建立
? 消息的編解碼
如果每變化一個(gè)需求就手工來(lái)重寫(xiě)一次,一是繁瑣,二是易錯(cuò)。為了一勞永逸的解決此類(lèi)問(wèn)題,神一樣的工具就出現(xiàn)了,如google protolbuffer,如thrift.
thrift的使用步驟如下

編寫(xiě)后綴名為thrift的文件,使用工具生成對(duì)應(yīng)語(yǔ)言的源碼,thrift支持的語(yǔ)言很多的,什么c,c++,java,python等,統(tǒng)統(tǒng)不是問(wèn)題。
實(shí)現(xiàn)thrift client
實(shí)現(xiàn)thrift server
thrift server需要實(shí)現(xiàn)thrift文件中定義的service接口。更為具體的信息可以通過(guò)閱讀官方文檔來(lái)獲得。這里有個(gè)thrift java的示例.

(1). 編寫(xiě)thrift文件:add.thrift

namespace java com.zqh.code.thrift.server // defines the namespace typedef i32 int // typedefs to get convenient names for your types service AdditionService { // defines the service to add two numbers int add(1:int n1, 2:int n2), // defines a method }

(2). 編譯:thrift --gen java add.thrift 會(huì)在當(dāng)前目錄生成gen-java/$namespace$/AdditionService
(3). Service:Interface的實(shí)現(xiàn)類(lèi)

public class AdditionServiceHandler implements AdditionService.Iface { public int add(int n1, int n2) throws TException { return n1 + n2; } }

實(shí)現(xiàn)類(lèi)具體實(shí)現(xiàn)了thrift文件定義的接口方法.
(4). Server

public class MyServer {public static void start(AdditionService.Processor<AdditionServiceHandler> processor) { TServerTransport serverTransport = new TServerSocket(9090); // 服務(wù)端Socket TServer server = new TSimpleServer(new Args(serverTransport).processor(processor)); System.out.println("Starting the simple server..."); server.serve(); } public static void main(String[] args) { start(new AdditionService.Processor<AdditionServiceHandler>(new AdditionServiceHandler())); } }

服務(wù)端通過(guò)TServerSocket暴露出服務(wù)端口, 客戶(hù)端要通過(guò)這個(gè)端口連接.
實(shí)現(xiàn)類(lèi)Handler的實(shí)例要作為生成的AdditionService.Processor的參數(shù).
Args需要TServerTransport作為參數(shù), 然后調(diào)用processor方法, 該方法需要AdditionServiceProcessor參數(shù).
這個(gè)過(guò)程類(lèi)似于將自定義實(shí)現(xiàn)類(lèi)Handler注冊(cè)到服務(wù)端上. 接著啟動(dòng)服務(wù)器.
(5). Client

public class AdditionClient {public static void main(String[] args) { TTransport transport = new TSocket("localhost", 9090); transport.open(); TProtocol protocol = new TBinaryProtocol(transport); AdditionService.Client client = new AdditionService.Client(protocol); System.out.println(client.add(100, 200)); transport.close(); } }

客戶(hù)端要建立到服務(wù)端的連接, 需要提供Server的host和port. 根據(jù)TTransport構(gòu)造出和服務(wù)端進(jìn)行通訊的一個(gè)協(xié)議.
這個(gè)協(xié)議傳給自動(dòng)生成的AdditionService的Client內(nèi)部類(lèi), 會(huì)生成一個(gè)類(lèi)似服務(wù)端的代理對(duì)象.
接著就可以使用這個(gè)代理對(duì)象調(diào)用thrift協(xié)議提供的方法.

分布式測(cè)試: 可以在兩臺(tái)機(jī)器上測(cè)試. 第一二步都需要在兩臺(tái)機(jī)器上操作: 編寫(xiě)thrift文件, 編譯.
然后在第一臺(tái)機(jī)器操作3: 自定義實(shí)現(xiàn)類(lèi); 4: Server. 在第二臺(tái)機(jī)器上操作5: Client. 最后分別運(yùn)行兩臺(tái)機(jī)器的Server和Client.

2) nimbus thrift server

有了thrift這個(gè)背景,我們?cè)僦匦率捌鹕鲜龅拇a執(zhí)行路徑。上頭講到程序執(zhí)行至

(defn -launch [nimbus] ;; launch的參數(shù)是一個(gè)Nimbus對(duì)象, 所以上面standalone-nimbus方法的返回值是Nimbus(launch-server! (read-storm-config) nimbus))(defn launch-server! [conf nimbus] ;; 讓nimbus作為一個(gè)thrift server運(yùn)行起來(lái)(validate-distributed-mode! conf) ;; 分布式模式下才會(huì)啟動(dòng)thrift server(let [service-handler (service-handler conf nimbus) ;; 自定義實(shí)現(xiàn)類(lèi), 實(shí)現(xiàn)storm.thrift中service Nimbus定義的接口方法options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) ;; 服務(wù)端的ServerSocket(THsHaServer$Args.) ;; TServerSocket作為T(mén)Server.Args內(nèi)部類(lèi)的參數(shù). 創(chuàng)建了Args args對(duì)象 ->表示插入第二個(gè)位置(.workerThreads 64) ;; 上面new Args(TServerSocket)會(huì)作為這里的第二個(gè)位置, 即args.workerThreads(64) (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE))) (.processor (Nimbus$Processor. service-handler)) ;; args作為這里的第二個(gè)位置,即調(diào)用了args.processor ;; new Nimbus.Processor(service-handler), 自定義實(shí)現(xiàn)類(lèi)作為Nimbus.Processor的參數(shù), ;; processor會(huì)作為參數(shù)再傳給args.processor() ) ;; 最終返回的是TServer.AbstractServerArgs, 會(huì)作為T(mén)Server構(gòu)造函數(shù)的參數(shù) server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))] (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server)))) (log-message "Starting Nimbus server...") ;; 上面添加了一個(gè)關(guān)閉鉤子. 類(lèi)似回調(diào)函數(shù). 當(dāng)關(guān)閉Nimbus的thrift服務(wù)時(shí), 會(huì)觸發(fā)這個(gè)函數(shù)執(zhí)行 (.serve server))) ;; 啟動(dòng)TServer, 即啟動(dòng)Nimbus的thrift服務(wù)

launch-server!說(shuō)白了,就是讓nimbus作為一個(gè)thrift server運(yùn)行起來(lái), 那么storm.thrift中service指定的各個(gè)接口函數(shù)實(shí)現(xiàn)在service-handler中完成。
對(duì)比clojure版本的創(chuàng)建thrift server的過(guò)程, 其實(shí)和上面java示例是一樣的, 只不過(guò)換了不同的實(shí)現(xiàn)類(lèi). 以下是java-clojure的代碼對(duì)比.
new AdditionServiceHandler() (service-handler conf nimbus)
new AdditionService.Processor(new AdditionServiceHandler()) (Nimbus$Processor. service-handler)
TServerTransport serverTransport = new TServerSocket(9090); (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
new Args(serverTransport) -> (TNonblockingServerSocket...) (THsHaServer$Args.)
new Args(serverTransport).processor(processor) -> (TNonblockingServerSocket...) (THsHaServer$Args.) (.processor (Nimbus$Processor. ..))
TServer server = new TSimpleServer(new Args(serverTransport).processor(processor)); server (THsHaServer… options)
server.serve(); (.serve server)

service-handler可是一個(gè)大家伙。對(duì)比一下 service-handler可以發(fā)現(xiàn),在storm.thrift中的定義的Nimbus服務(wù),
其接口在 service-handler中一一得以實(shí)現(xiàn)。 以下是storm.thrift中關(guān)于service Nimbus的聲明。
storm-core/storm.thrift

namespace java backtype.storm.generated service Nimbus {void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws ...; void killTopology(1: string name) throws (1: NotAliveException e); void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e); void activate(1: string name) throws (1: NotAliveException e); void deactivate(1: string name) throws (1: NotAliveException e); void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite); // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs string beginFileUpload(); void uploadChunk(1: string location, 2: binary chunk); void finishFileUpload(1: string location); string beginFileDownload(1: string file); binary downloadChunk(1: string id); //can stop downloading chunks when receive 0-length byte array back string getNimbusConf(); // returns json ClusterSummary getClusterInfo(); // stats functions TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e); string getTopologyConf(1: string id) throws (1: NotAliveException e); //returns json StormTopology getTopology(1: string id) throws (1: NotAliveException e); StormTopology getUserTopology(1: string id) throws (1: NotAliveException e); }

這個(gè)文件還指定了其他一些struct結(jié)構(gòu)的數(shù)據(jù)類(lèi)型, 比如StormTopology, TopologySummary, ClusterSummary, TopologyInfo等.
編譯storm.thrift文件生成的代碼在namespace指定的位置: backtype.storm.generated
storm-core/genthrift.sh

thrift7 --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift

現(xiàn)在來(lái)回顧下storm的thrift RPC的整體流程.
1. 編寫(xiě) storm.thrift
2. 編譯 genthrift.sh, 會(huì)在backtype.storm.generated生成Nimbus.java接口類(lèi). 其中含有內(nèi)部類(lèi)Iface(Service), Processor(Server), Client(Client)
3. Service服務(wù)類(lèi): nimbus.clj中的service-handler方法的返回值. 其應(yīng)該實(shí)現(xiàn)Nimbus.Iface接口. 所以service-handler使用reify Nimbus$Iface
4. Server服務(wù)端: launch-server!中創(chuàng)建thrift的TServer, 并啟動(dòng). 使用了Nimbus.Processor, 傳入service-handler自定義服務(wù)實(shí)現(xiàn)類(lèi)
5. Client客戶(hù)端: StormSubmitter中l(wèi)ocalNimbus!=null時(shí), 使用NimbusClient即Nimbus.Client調(diào)用RPC定義的接口方法

注意: 對(duì)于本地模式, 在StormSubmitter中直接使用Nimbus.Iface localNimbus對(duì)象. 這個(gè)對(duì)象的實(shí)現(xiàn)類(lèi)應(yīng)該就是service-handler.
對(duì)于分布式模式, StormSubmitter作為客戶(hù)端, 會(huì)通過(guò)client調(diào)用RPC定義的接口方法. 即storm.thrift中定義的方法. 所以service-handler要實(shí)現(xiàn)這些方法!

  • 2015年04月14日發(fā)布

轉(zhuǎn)載于:https://www.cnblogs.com/catkins/p/5252480.html

總結(jié)

以上是生活随笔為你收集整理的Nimbus三Storm源码分析--Nimbus启动过程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。