日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

storm消息可靠机制(ack)的原理和使用

發布時間:2024/7/23 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 storm消息可靠机制(ack)的原理和使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

關于storm的基礎,參照我這篇文章:流式計算storm
關于并發和并行,參照我這篇文章:并發和并行
關于storm的并行度解釋,參照我這篇文章:storm的并行度解釋
關于storm的流分組策略,參照我這篇文章:storm的流分組策略
關于storm的消息可靠機制,參照我這篇文章:storm的消息可靠機制

storm的消息可靠機制可以確保spout發出的每條tuple消息都會被完整的處理;
主要是由spout和bolt共同完成的.
本文主要討論storm的消息可靠機制的原理和使用

storm的可靠機制,是storm的一大亮點,那么他是如何實現的呢? 先看效果:1.spout每發一條消息,就新建一個唯一的msgId(比如UUID),然后將這條消息和這個唯一id存在map中;2.每個bolt在處理tuple后,emit的時候帶上tulpe,成功,就調用ack方法,代表成功,失敗就調用fail方法,代表失敗;這樣編寫代碼后,你會發現,失敗的消息spout會重新發送,效果就出來了 實現原理:原理很簡單,使用了異或的知識點.我們知道,任意兩個相同的數字,異或的結果都是0.例如:1^1=0現在請跟著我的思路想:1.首先想象有個服務,叫ack,他的主要作用就是判斷每條tuple信息是否都成功處理2.每個spout發送和接收成功,都要給ack發送一個數字,最后由ack計算,判斷整條鏈路是否成功處理3.spout作為發送方,假設他要給3個bolt發送消息,分別是bolt1,bolt2,bolt3;4.假設這3個bolt最后都發給bolt4;5.假設本次要處理的消息叫做root_id;6.開始發送了;7.spout給bolt1發送消息<root_id,1>8.spout給bolt2發送消息<root_id,2>9.spout給bolt3發送消息<root_id,3>10.發送完spout再給ack發送1^2^311.bolt1收到<root_id,1>,處理成功再給bolt4發送<root_id,4>;12.發送完bolt1再給ack發送1^4,處理不成功就不發送了;13.bolt2收到<root_id,2>,處理成功再給bolt4發送<root_id,5>;14.發送完bolt2再給ack發送2^5,處理不成功就不發送了;15.bolt3收到<root_id,3>,處理成功再給bolt4發送<root_id,6>;16.發送完bolt3再給ack發送3^6,處理不成功就不發送了;17.bolt4收到前3個bolt的消息,<root_id,4>,<root_id,5>,<root_id,6>,處理成功后分別給ack發送4,5,6,處理不成功就不發送了;18.我們站在ack的角度來看,對于root_id這條消息來說,如果所有spout和bolt都成功,那么應該會收到:1^2^3,1^4,2^5,3^6,4,5,6;19.將所有收到的數字異或操作,即:1^2^3^1^4^2^5^3^6^4^5^6,由于相同數字異或結果為0,即上面的式子的結果就是0,任意少收到哪個值,最終的結果都不會為0;20.如果ack最終計算的結果是0,那么就代表這個消息root_id處理成功了21.如果ack最終計算結果不為0,那么就代表這個消息root_id處理失敗了

如何使用

舉個項目中的例子:
spout中:

這個類 extends BaseRichSpoutprivate OutputCollector collector;private ConcurrentHashMap<UUID, Values> pending;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {this.collector = collector;this.pending = new ConcurrentHashMap<>();}@Overridepublic void nextTuple() {//具體業務...Values value = new Values("要傳的業務數據");UUID msgId = UUID.randomUUID();this.pending.put(msgId, value);this.collector.emit(value, msgId);}@Overridepublic void ack(Object msgId) {//收到成功消息,就刪除這條msgIdthis.pending.remove(msgId);}@Overridepublic void fail(Object msgId) {//收到失敗消息就重新發送一遍//一般成熟的做法是會再記錄個失敗次數,不會一直失敗重發的this.collector.emit(this.pending.get(msgId), msgId);}

bolt中:

這個類 extends BaseRichBoltprivate OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {this.collector=collector;}@Overridepublic void execute(Tuple tuple) {try {//具體業務...//注意,這里發送的時候,一定要帶上tuplethis.collector.emit(tuple,new Values("業務數據"));collector.ack(tuple);} catch (Exception e) {collector.fail(tuple);e.printStackTrace();}}

總結

以上是生活随笔為你收集整理的storm消息可靠机制(ack)的原理和使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。