日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm Trident拓扑中的错误处理

發(fā)布時間:2023/12/3 编程问答 53 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm Trident拓扑中的错误处理 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

這篇文章總結(jié)了我在設(shè)計Storm Trident拓?fù)鋾r當(dāng)前的錯誤處理方法。 我在這里關(guān)注代碼設(shè)計,而不是監(jiān)督或冗余之類的部署良好實踐。

由于Storm的實時流性質(zhì),當(dāng)面對大多數(shù)錯誤時,我們最終將不得不移至下一個數(shù)據(jù)。 在這種情況下,錯誤處理歸結(jié)為(或沒有)報告此錯誤,并在以后(或沒有)重試處理失敗的輸入數(shù)據(jù)。 這篇文章的第1部分是關(guān)于這方面的。

這意味著在處理元組時,通常很難確定它是我們第一次遇到它還是它的內(nèi)容已經(jīng)部分地應(yīng)用于持久性。 因此,我們需要使?fàn)顟B(tài)更新操作成為冪等,這是本文的第二部分。

不要對這篇文章的大小印象深刻,Storm實際上為我們完成了大部分工作。 真正需要做的只是了解如何以合理的方式插入東西。

這篇文章基于Storm 0.9,Cassandra 2.0.4和Kafka 0.7。 我在github上放置了一個玩具項目 ,以說明下面討論的幾點。 該項目實際上是根據(jù)我在上一篇文章中介紹的“房間存在”示例改編的 。

第1部分:處理錯誤情況

決定何時要求重試

第一個簡單的錯誤處理策略是簡單地接受運行時錯誤導(dǎo)致的計算質(zhì)量下降。 例如,如果拓?fù)湓谧罱幕瑒哟翱谏嫌嬎阋恍崟r趨勢估計,或者如果我們已經(jīng)在處理諸如Twitter公開流之類的采樣數(shù)據(jù),則可能是這種情況。 如果我們選擇忽略此類錯誤,則實現(xiàn)起來非常簡單,只需用大量的try / catch包裝拓?fù)溥壿?#xff0c;以某種方式報告錯誤,并且不要讓任何事情冒充Storm。

但是,在大多數(shù)情況下,我們關(guān)心一致性,因此必須對嘗試重試或不嘗試失敗的數(shù)據(jù)做出謹(jǐn)慎的決定。

運行時錯誤的一個典型示例是入站數(shù)據(jù)格式問題。 在那種情況下,重試當(dāng)然是沒有意義的,因為它不會第二次變得更好。 相反,我們應(yīng)該記錄故障數(shù)據(jù),并可能要求某些人進(jìn)行調(diào)查。 這是我的玩具項目中BytesToString Storm函數(shù)的一個簡單示例:

public class BytesToString extends BaseFunction { @Overridepublic void execute(TridentTuple tuple, TridentCollector tridentCollector) { try { String asString = new String((byte[]) tuple.getValueByField("bytes"), "UTF-8"); tridentCollector.emit(new Values(asString)); } catch (UnsupportedEncodingException e) { logger.err("ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string)", e); } }

另一方面,如果錯誤與某些不可訪問的外部數(shù)據(jù)源有關(guān),例如由網(wǎng)絡(luò)分區(qū)引起的錯誤,我們應(yīng)按下一節(jié)所述觸發(fā)重試。

除上述兩種錯誤外,還有許多其他類型的錯誤,但要點仍然是:區(qū)分可重試錯誤與不可重試錯誤并做出相應(yīng)反應(yīng)很重要。

最后一點,當(dāng)您決定不報告在IBackingMap的multiget中發(fā)生的錯誤時,請格外小心 ,因為該函數(shù)必須返回與輸入鍵列表大小相同的列表。 因此,如果出現(xiàn)不可重試的錯誤,我們必須以某種方式返回某些結(jié)果。 在大多數(shù)情況下,如果我們選擇不重試這種情況下的錯誤,那是因為某些過去的錯誤已經(jīng)在持久性方面破壞了某些內(nèi)容,并且為時已晚。 在下面的示例中,由于對從DB讀取的某些數(shù)據(jù)進(jìn)行的解析失敗而發(fā)生錯誤,并且代碼僅返回null值,這等同于考慮到持久性沒有任何作用(至少沒有用處)。 另請參閱下面的第3部分,以了解針對這種情況的可能解決方案。

@Override public List<OpaqueValue> multiGet(List<List<Object>> keys) { try { return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class); } catch (IOException e) { logger.err("error while trying to deserialize data from json => giving up (data is lost!)", e); return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist => destroys data! } }

(好吧,來自TimelineBackingMap的這段代碼實際上將所有數(shù)據(jù)替換為null,這使情況變得更糟,但這是一個玩具項目……)

導(dǎo)致三叉戟元組被重播…

一旦確定觸發(fā)元組重播是合理的,我們只需要詢問它,Storm就會做其他所有事情(只需插入正確的噴嘴,請參閱下一節(jié))。 從技術(shù)上講,這很簡單:從功能或過濾器之類的Trident原語中觸發(fā)重試就像拋出FailedException一樣簡單,就像玩具項目中的TimeLineBackingMap中一樣,其中包括重試和非重試錯誤的示例(請注意,代碼下面來自TimelineBackingMap的示例假定任何數(shù)據(jù)庫錯誤都是可重試的,這過于簡化了):

@Override public void multiPut(List<List<Object>> keys, List<OpaqueValue> timelines) {;List<OpaqueValue> jsonOpaqueTimelines; try { jsonOpaqueTimelines = Utils.opaqueValuesToOpaqueJson(timelines); } catch (IOException e) { System.err.println("error while trying to serialize data to json => giving up (data is lost!)"); return; }if (jsonOpaqueTimelines != null) { try { DB.put("room_timelines", toSingleKeys(keys), jsonOpaqueTimelines); } catch (Exception e) { logger.err("error while storing timelines to cassandra, triggering a retry...", e); throw new FailedException("could not store data into Cassandra, triggering a retry...", e); } } };

然后,Storm會將錯誤傳播回噴嘴,以強制重播元組。 如果我們希望在Storm UI中報告錯誤,則可以拋出ReportedFailedException。

我強烈不建議使用的另一種方法是讓任何其他類型的RuntimeException冒泡到Storm。 這本質(zhì)上以更高的性能成本實現(xiàn)了相同的結(jié)果:它將觸發(fā)工作節(jié)點崩潰,并且Nimbus將自動重啟,并且所有spout將恢復(fù)從最新的已知成功索引中讀取(spout實現(xiàn)(如Kafka spout將其最新成功處理的偏移存儲在zookeeper中)為了這個目的)。 這種快速失敗策略是Storm設(shè)計的一部分(請參閱有關(guān)工人監(jiān)督和容錯的文檔)。 從本質(zhì)上講,這實現(xiàn)了與讓spout重播某些元組相同的一致性保證,但是對性能的影響當(dāng)然更大,因為我們具有完整的JVM重新啟動并重置了所有當(dāng)前正在運行的拓?fù)鋵嵗?因此,切勿故意這樣做。 仍然令人放心的是,如果我們的節(jié)點崩潰,數(shù)據(jù)不會中斷,并且流量自然會繼續(xù)。

Storm決定重播元組的第三種情況是它們是否在配置的超時之前未到達(dá)拓?fù)涞哪┪病?更確切地說,如果未按時收到ACK,則該機制實際上是由發(fā)出該元組的spout觸發(fā)的,因此,如果元組成功處理但由于某些網(wǎng)絡(luò)分區(qū)ACK無法到達(dá)該spout,則也可以觸發(fā)這些重播。 用于控制此設(shè)置的Storm參數(shù)是topology.enable.message.timeouts和topology.message.timeout.secs ,根據(jù)defaults.yaml的默認(rèn)值為“ true”和30秒。 這只是為什么拓?fù)渲械膬绲刃匀绱酥匾挠忠粋€原因。

…并實際上重播元組

一旦失敗通知到達(dá)噴嘴(或在超時情況下由通知生成),我們需要確保失敗的元組將被重播。 除非您自己開發(fā)噴嘴,否則只能歸結(jié)為選擇正確的噴嘴口味 。 此選擇會影響元組的重播(或不重播)方式,因此它必須與適當(dāng)?shù)牟呗员3忠恢?#xff0c;以處理拓?fù)渲械囊阎夭サ脑M,這是下一部分的主題。 有3種噴口:

  • 非事務(wù)性:無保證,但如果您選擇的實現(xiàn)提供“至少一次”保證,在某些情況下它們?nèi)匀挥杏?
  • 事務(wù)性的:不建議使用,因為它們在某些分區(qū)情況下可能會阻止拓?fù)?
  • opaque(不透明):就重播而言,它們達(dá)到元組至少會被播放一次,但在重播方面提供了弱保證,但在重播的情況下,發(fā)出的批次可能會不同。 在實踐中,使用它們時,我所建議的所有重要事項是確保拓?fù)鋵τ谶@種靈活的重放具有魯棒性,這將在下一部分中進(jìn)行討論。

關(guān)于元組和批處理重播的最后說明

我在元組級別上進(jìn)行了討論,因為這使設(shè)計決策更簡單。 實際上,要求Storm重播單個元組將觸發(fā)同一批中包含的許多其他元組的重播,其中一些可能沒有錯誤。

第2部分:重播元組的冪等處理

故事的另一面是,既然我們知道元組可能會被處理幾次,請確保拓?fù)涫莾绲鹊?#xff0c;即,發(fā)送相同元組的次數(shù)不會使?fàn)顟B(tài)不一致。 沒有副作用的拓?fù)洳糠之?dāng)然不受元組重播的影響。

關(guān)于狀態(tài)一致性的Storm Trident文檔非常清楚,因此我在這里僅添加一些內(nèi)容。

如果我們的狀態(tài)更新操作已經(jīng)冪等

如果狀態(tài)更新操作本質(zhì)上已經(jīng)是冪等的,那么它已經(jīng)具有元組重播的彈性,并且不需要Storm特殊機制。

如果id值完全基于入站元組內(nèi)容,則任何“按id存儲”操作都是這種情況。 例如,在我的玩具項目中,我存儲了占用會話,這些會話的主鍵是從入站事件中找到的相關(guān)ID派生的,因此在這種情況下,寫操作已經(jīng)可以重播了,因為任何重播都只會覆蓋相同的現(xiàn)有數(shù)據(jù)信息而不會破壞任何數(shù)據(jù)(假設(shè)我們有訂購保證,在這種情況下是正確的)。

public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) { DB.upsertPeriods(newOrUpdatedPeriods); }

在CassandraDB.java中:

try { PreparedStatement statement = getSession().prepare("INSERT INTO presence (id, payload) values (?,?)"); execute(new BoundStatement(statement).bind(rpp.getId(), periodJson)); } catch (Exception e) { logger.error("error while contacting Cassandra, triggering a retry...", e); new FailedException("error while trying to record room presence in Cassandra ", e); }

同時使read-update-write操作成為冪等

我在先前的博客文章中描述了Storm如何使我們能夠?qū)崿F(xiàn)執(zhí)行以下操作而不需要DB鎖并且仍然避免出現(xiàn)競爭情況:

  • 從數(shù)據(jù)庫讀取以前的狀態(tài),
  • 根據(jù)新的元組數(shù)據(jù)更新內(nèi)存中的狀態(tài),
  • 將新狀態(tài)保存到數(shù)據(jù)庫

風(fēng)暴的美麗之處在于,為了處理重播的元組而不破壞狀態(tài),我們只需要調(diào)整步驟1和3。這是非常重要的:我們現(xiàn)在可以在步驟2中實現(xiàn)所有處理邏輯,就像每個元組只被播放一次,然后根本不關(guān)心重播(只要我們是“純”的,請參見下面的評論…)。 這就是“風(fēng)暴只有一次語義”的含義。

而且,如果我們在內(nèi)部實現(xiàn)1和3,則使它們重播即可,只是將它們與現(xiàn)有的Storm類包裝在一起即可。 最健壯的方式是使用Opaque邏輯,但代價是每個狀態(tài)存儲兩次狀態(tài),如Trident文檔中關(guān)于transaction spout的說明 。

更好的是,已經(jīng)有很多不透明的BackingMap實現(xiàn)可用于Storm-contrib中的諸如Cassandra或Mysql的許多后端,因此,在大多數(shù)情況下,除了選擇正確的之外,實際上沒有任何其他事情可做。

最重要的一點是,要使用處理重播元組的不透明BackingMap,必須使用尊重不透明先決條件的噴嘴,如本矩陣所述 。

如果由于某種原因需要實現(xiàn)自己的BackingMap,我們唯一要做的就是使它存儲數(shù)據(jù)的當(dāng)前和先前版本以及交易ID。 這是我的玩具項目中的一個簡單示例(但實際上,在編寫類似代碼之前,請考慮一下Storm-contrib ):

public void put(String table, List<String> keys, List<OpaqueValue> opaqueStrings) {;// this should be optimized with C* batches... for (Pair<String, OpaqueValue> keyValue : Utils.zip(keys, opaqueStrings)) { PreparedStatement statement = getSession().prepare(format("INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?)", table)); OpaqueValue opaqueVal = keyValue.getValue(); execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev())); } } public List<OpaqueValue> get(String table, List<String> keys) {;List<OpaqueValue> vals = new ArrayList<>(keys.size()); ResultSet rs = execute(format("select id, txid, prev, curr from %s where id in ( %s ) ", table, toCsv(keys) )); Map<String, OpaqueValue> data = toMapOfOpaque(rs); for (String key: keys){ vals.add(data.get(key)); }return vals; }

然后,要真正獲得Trident的一次語義,唯一要做的就是將其包裝在OpaqueMap中,如下所示:

public static StateFactory FACTORY = new StateFactory() { public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf))); } }

幕后發(fā)生的事情是, OpaqueMap將根據(jù)與當(dāng)前批處理元組關(guān)聯(lián)的事務(wù)ID和在持久性中找到的事務(wù)ID,選擇要顯示給我們的更新邏輯的先前存儲的狀態(tài)(“ curr”或“ prev”)。 該事務(wù)ID是由噴嘴提供的,因此這就是保持噴嘴與狀態(tài)選擇對齊如此重要的原因:狀態(tài)對每個事務(wù)ID的含義進(jìn)行假設(shè)。

不要破壞前一個實例!

讓我們回到上面提到的read-update-write序列的步驟2。 既然我們知道不透明邏輯需要存儲任何狀態(tài)的新版本和舊版本,請查看以下Reducer代碼并嘗試確定其損壞原因:

public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;if (ENTER == event.getEventType()) { curr.setStartTime(event.getTime()); // buggy code } else { curr.setEndTme(event.getTime()); // buggy code } return curr; }

函數(shù)式編程的專家稱其為“不純”方法,因為它會修改其輸入?yún)?shù)。 它破壞Storm不透明邏輯的原因是,現(xiàn)在“當(dāng)前”和“先前” java引用實際上都引用內(nèi)存中的同一實例。 因此,當(dāng)不透明邏輯同時保留某個狀態(tài)的先前版本和當(dāng)前版本時,實際上它保存的是新版本的兩倍,因此先前的版本丟失了。

更好的實現(xiàn)可能是這樣的

public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;RoomPresencePeriod updated = new RoomPresencePeriod(curr); // copy constructor if (ENTER == event.getEventType()) { updated.setStartTime(event.getTime()); } else { updated.setEndTme(event.getTime()); } return updated; }

第3部分:人為錯誤:全部重播

最后一點,我們必須謙虛地意識到,無論我們采取了多少上述努力和保障,我們?nèi)匀粫谏a(chǎn)環(huán)境中部署錯誤(對此,我發(fā)誓,抱歉!)。 對于數(shù)據(jù)處理平臺,錯誤可能意味著破壞數(shù)據(jù)的錯誤,當(dāng)數(shù)據(jù)是我們的業(yè)務(wù)時,這是很糟糕的。 在某些情況下,我們只會發(fā)現(xiàn)事實之后數(shù)據(jù)已損壞,就像上面有關(guān)multiget的注釋中所述。

內(nèi)森·馬茲(Nathan Marz)在他的《 大數(shù)據(jù)》書中 ,描述了一個簡單的基于Lambda架構(gòu)的“重播所有”想法,以解決該想法。 這本書的簡短摘要也可以在這里找到 。

參考:來自Svend博客的 JCG合作伙伴 Svend Vanderveken 在Storm Trident拓?fù)渲械腻e誤處理 。

翻譯自: https://www.javacodegeeks.com/2014/02/error-handling-in-storm-trident-topologies.html

總結(jié)

以上是生活随笔為你收集整理的Storm Trident拓扑中的错误处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。