日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm的BaseBasicBolt源码解析ack机制

發布時間:2023/12/10 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm的BaseBasicBolt源码解析ack机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我們在學習ack機制的時候,我們知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit數據的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。
在使用BaseRichBolt需要在emit數據的時候,顯示指定該數據的源tuple要加上第二個參數anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);并且需要在execute執行成功后調用OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple);

那么我們來看看BasicBolt的源碼是不是這樣的,不能因為看到別人的帖子說是這樣的,我們就這樣任務,以訛傳訛,我們要To see is to believe。

?

為了方便看源代碼,我先上我們的繼承類:

public class SplitSentenceBolt extends BaseBasicBolt { public void prepare(Map stormConf, TopologyContext context) {super.prepare(stormConf, context);}//5:執行我們自己的邏輯處理方法,接收傳入的參數。public void execute(Tuple input, BasicOutputCollector collector) {String sentence = (String)input.getValueByField("sentence");String[] words = sentence.split(" ");for (String word : words) {word = word.trim();word = word.toLowerCase();collector.emit(new Values(word,1));//這個地方就是調用OutputCollector的包裝類,來發消息}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","num"));} }

通過打斷點,我們發現,bolt的task會創建這個類下面會標準執行順序

public class BasicBoltExecutor implements IRichBolt {public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class); private IBasicBolt _bolt;private transient BasicOutputCollector _collector;//1:創建該對象,然后把我們寫的SplitSentenceBolt對象賦給父類IBasicBolt。public BasicBoltExecutor(IBasicBolt bolt) {_bolt = bolt;}public void declareOutputFields(OutputFieldsDeclarer declarer) {_bolt.declareOutputFields(declarer);//這里就是調用SplitSentenceBolt對象的方法了。}//2:給BasicOutputCollector _collector字段賦值,BasicOutputCollector就是對OutputCollector類的包裝。public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {_bolt.prepare(stormConf, context);_collector = new BasicOutputCollector(collector);}//3:然后程序執行該方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]public void execute(Tuple input) {_collector.setContext(input);//把接收到的tuple值設置給BasicOutputCollector中inputTuple字段。try {_bolt.execute(input, _collector);//這個地方是調用我們實現類SplitSentenceBolt的ececute方法。_collector.getOutputter().ack(input);//這個地方就是響應} catch(FailedException e) {if(e instanceof ReportedFailedException) {_collector.reportError(e);}_collector.getOutputter().fail(input);//這個地方就是響應}}public void cleanup() {_bolt.cleanup();}public Map<String, Object> getComponentConfiguration() {return _bolt.getComponentConfiguration();} } public class BasicOutputCollector implements IBasicOutputCollector {private OutputCollector out;private Tuple inputTuple;public BasicOutputCollector(OutputCollector out) {this.out = out;}//4:把收到的tuple數據賦值給inputTuple,這個時候BasicOutputCollector對象的字段都具有值了。public void setContext(Tuple inputTuple) {this.inputTuple = inputTuple;}//6:這里我們發送新的(轉換后的)tuple數據,看他內部的調用,其實他也會發送一個anchor tuple來保持tracker鏈路, 而這個anchor tuple就是bolt接收到轉換前的源tuple數據。public List<Integer> emit(List<Object> tuple) { return emit(Utils.DEFAULT_STREAM_ID, tuple); }public List<Integer> emit(String streamId, List<Object> tuple) {return out.emit(streamId, inputTuple, tuple);}public void emitDirect(int taskId, String streamId, List<Object> tuple) {out.emitDirect(taskId, streamId, inputTuple, tuple);}public void emitDirect(int taskId, List<Object> tuple) {emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);}protected IOutputCollector getOutputter() {return out;}public void reportError(Throwable t) {out.reportError(t);} }

?這里大家不要糾結bolt的啟動時從哪里開始的,我后面會講的,這里我們關注的是,BasicBoltExecutor對象創建后的執行過程,以這我們來看執行的過程。在BasicBoltExecutor的execute方法中,我們看到了ack和fail方法會被自動調用的,當我們的程序拋出異常則會執行fail方法的。

轉發:http://www.cnblogs.com/intsmaze/p/5924873.html

?

?

總結

以上是生活随笔為你收集整理的Storm的BaseBasicBolt源码解析ack机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。