Storm Trident拓扑中的错误处理
這篇文章總結了我在設計Storm Trident拓撲時當前的錯誤處理方法。 我在這里關注代碼設計,而不是監督或冗余之類的部署良好實踐。
由于Storm的實時流性質,當面對大多數錯誤時,我們最終將不得不移至下一個數據。 在這種情況下,錯誤處理歸結為(或沒有)報告此錯誤,并在以后(或沒有)重試處理失敗的輸入數據。 這篇文章的第1部分是關于這方面的。
這意味著在處理元組時,通常很難確定它是我們第一次遇到它還是它的內容已經部分地應用于持久性。 因此,我們需要使狀態更新操作成為冪等,這是本文的第二部分。
不要對這篇文章的大小印象深刻,Storm實際上為我們完成了大部分工作。 真正需要做的只是了解如何以合理的方式插入東西。
這篇文章基于Storm 0.9,Cassandra 2.0.4和Kafka 0.7。 我在github上放置了一個玩具項目 ,以說明下面討論的幾點。 該項目實際上是根據我在上一篇文章中介紹的“房間存在”示例改編的 。
第1部分:處理錯誤情況
決定何時要求重試
第一個簡單的錯誤處理策略是簡單地接受運行時錯誤導致的計算質量下降。 例如,如果拓撲在最近的滑動窗口上計算一些實時趨勢估計,或者如果我們已經在處理諸如Twitter公開流之類的采樣數據,則可能是這種情況。 如果我們選擇忽略此類錯誤,則實現起來非常簡單,只需用大量的try / catch包裝拓撲邏輯,以某種方式報告錯誤,并且不要讓任何事情冒充Storm。
但是,在大多數情況下,我們關心一致性,因此必須對嘗試重試或不嘗試失敗的數據做出謹慎的決定。
運行時錯誤的一個典型示例是入站數據格式問題。 在那種情況下,重試當然是沒有意義的,因為它不會第二次變得更好。 相反,我們應該記錄故障數據,并可能要求某些人進行調查。 這是我的玩具項目中BytesToString Storm函數的一個簡單示例:
public class BytesToString extends BaseFunction { @Overridepublic void execute(TridentTuple tuple, TridentCollector tridentCollector) { try { String asString = new String((byte[]) tuple.getValueByField("bytes"), "UTF-8"); tridentCollector.emit(new Values(asString)); } catch (UnsupportedEncodingException e) { logger.err("ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string)", e); } }另一方面,如果錯誤與某些不可訪問的外部數據源有關,例如由網絡分區引起的錯誤,我們應按下一節所述觸發重試。
除上述兩種錯誤外,還有許多其他類型的錯誤,但要點仍然是:區分可重試錯誤與不可重試錯誤并做出相應反應很重要。
最后一點,當您決定不報告在IBackingMap的multiget中發生的錯誤時,請格外小心 ,因為該函數必須返回與輸入鍵列表大小相同的列表。 因此,如果出現不可重試的錯誤,我們必須以某種方式返回某些結果。 在大多數情況下,如果我們選擇不重試這種情況下的錯誤,那是因為某些過去的錯誤已經在持久性方面破壞了某些內容,并且為時已晚。 在下面的示例中,由于對從DB讀取的某些數據進行的解析失敗而發生錯誤,并且代碼僅返回null值,這等同于考慮到持久性沒有任何作用(至少沒有用處)。 另請參閱下面的第3部分,以了解針對這種情況的可能解決方案。
@Override public List<OpaqueValue> multiGet(List<List<Object>> keys) { try { return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class); } catch (IOException e) { logger.err("error while trying to deserialize data from json => giving up (data is lost!)", e); return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist => destroys data! } }(好吧,來自TimelineBackingMap的這段代碼實際上將所有數據替換為null,這使情況變得更糟,但這是一個玩具項目……)
導致三叉戟元組被重播…
一旦確定觸發元組重播是合理的,我們只需要詢問它,Storm就會做其他所有事情(只需插入正確的噴嘴,請參閱下一節)。 從技術上講,這很簡單:從功能或過濾器之類的Trident原語中觸發重試就像拋出FailedException一樣簡單,就像玩具項目中的TimeLineBackingMap中一樣,其中包括重試和非重試錯誤的示例(請注意,代碼下面來自TimelineBackingMap的示例假定任何數據庫錯誤都是可重試的,這過于簡化了):
@Override public void multiPut(List<List<Object>> keys, List<OpaqueValue> timelines) {;List<OpaqueValue> jsonOpaqueTimelines; try { jsonOpaqueTimelines = Utils.opaqueValuesToOpaqueJson(timelines); } catch (IOException e) { System.err.println("error while trying to serialize data to json => giving up (data is lost!)"); return; }if (jsonOpaqueTimelines != null) { try { DB.put("room_timelines", toSingleKeys(keys), jsonOpaqueTimelines); } catch (Exception e) { logger.err("error while storing timelines to cassandra, triggering a retry...", e); throw new FailedException("could not store data into Cassandra, triggering a retry...", e); } } };然后,Storm會將錯誤傳播回噴嘴,以強制重播元組。 如果我們希望在Storm UI中報告錯誤,則可以拋出ReportedFailedException。
我強烈不建議使用的另一種方法是讓任何其他類型的RuntimeException冒泡到Storm。 這本質上以更高的性能成本實現了相同的結果:它將觸發工作節點崩潰,并且Nimbus將自動重啟,并且所有spout將恢復從最新的已知成功索引中讀取(spout實現(如Kafka spout將其最新成功處理的偏移存儲在zookeeper中)為了這個目的)。 這種快速失敗策略是Storm設計的一部分(請參閱有關工人監督和容錯的文檔)。 從本質上講,這實現了與讓spout重播某些元組相同的一致性保證,但是對性能的影響當然更大,因為我們具有完整的JVM重新啟動并重置了所有當前正在運行的拓撲實例。 因此,切勿故意這樣做。 仍然令人放心的是,如果我們的節點崩潰,數據不會中斷,并且流量自然會繼續。
Storm決定重播元組的第三種情況是它們是否在配置的超時之前未到達拓撲的末尾。 更確切地說,如果未按時收到ACK,則該機制實際上是由發出該元組的spout觸發的,因此,如果元組成功處理但由于某些網絡分區ACK無法到達該spout,則也可以觸發這些重播。 用于控制此設置的Storm參數是topology.enable.message.timeouts和topology.message.timeout.secs ,根據defaults.yaml的默認值為“ true”和30秒。 這只是為什么拓撲中的冪等性如此重要的又一個原因。
…并實際上重播元組
一旦失敗通知到達噴嘴(或在超時情況下由通知生成),我們需要確保失敗的元組將被重播。 除非您自己開發噴嘴,否則只能歸結為選擇正確的噴嘴口味 。 此選擇會影響元組的重播(或不重播)方式,因此它必須與適當的策略保持一致,以處理拓撲中的已重播的元組,這是下一部分的主題。 有3種噴口:
- 非事務性:無保證,但如果您選擇的實現提供“至少一次”保證,在某些情況下它們仍然有用
- 事務性的:不建議使用,因為它們在某些分區情況下可能會阻止拓撲
- opaque(不透明):就重播而言,它們達到元組至少會被播放一次,但在重播方面提供了弱保證,但在重播的情況下,發出的批次可能會不同。 在實踐中,使用它們時,我所建議的所有重要事項是確保拓撲對于這種靈活的重放具有魯棒性,這將在下一部分中進行討論。
關于元組和批處理重播的最后說明
我在元組級別上進行了討論,因為這使設計決策更簡單。 實際上,要求Storm重播單個元組將觸發同一批中包含的許多其他元組的重播,其中一些可能沒有錯誤。
第2部分:重播元組的冪等處理
故事的另一面是,既然我們知道元組可能會被處理幾次,請確保拓撲是冪等的,即,發送相同元組的次數不會使狀態不一致。 沒有副作用的拓撲部分當然不受元組重播的影響。
關于狀態一致性的Storm Trident文檔非常清楚,因此我在這里僅添加一些內容。
如果我們的狀態更新操作已經冪等
如果狀態更新操作本質上已經是冪等的,那么它已經具有元組重播的彈性,并且不需要Storm特殊機制。
如果id值完全基于入站元組內容,則任何“按id存儲”操作都是這種情況。 例如,在我的玩具項目中,我存儲了占用會話,這些會話的主鍵是從入站事件中找到的相關ID派生的,因此在這種情況下,寫操作已經可以重播了,因為任何重播都只會覆蓋相同的現有數據信息而不會破壞任何數據(假設我們有訂購保證,在這種情況下是正確的)。
public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) { DB.upsertPeriods(newOrUpdatedPeriods); }在CassandraDB.java中:
try { PreparedStatement statement = getSession().prepare("INSERT INTO presence (id, payload) values (?,?)"); execute(new BoundStatement(statement).bind(rpp.getId(), periodJson)); } catch (Exception e) { logger.error("error while contacting Cassandra, triggering a retry...", e); new FailedException("error while trying to record room presence in Cassandra ", e); }同時使read-update-write操作成為冪等
我在先前的博客文章中描述了Storm如何使我們能夠實現執行以下操作而不需要DB鎖并且仍然避免出現競爭情況:
- 從數據庫讀取以前的狀態,
- 根據新的元組數據更新內存中的狀態,
- 將新狀態保存到數據庫
風暴的美麗之處在于,為了處理重播的元組而不破壞狀態,我們只需要調整步驟1和3。這是非常重要的:我們現在可以在步驟2中實現所有處理邏輯,就像每個元組只被播放一次,然后根本不關心重播(只要我們是“純”的,請參見下面的評論…)。 這就是“風暴只有一次語義”的含義。
而且,如果我們在內部實現1和3,則使它們重播即可,只是將它們與現有的Storm類包裝在一起即可。 最健壯的方式是使用Opaque邏輯,但代價是每個狀態存儲兩次狀態,如Trident文檔中關于transaction spout的說明 。
更好的是,已經有很多不透明的BackingMap實現可用于Storm-contrib中的諸如Cassandra或Mysql的許多后端,因此,在大多數情況下,除了選擇正確的之外,實際上沒有任何其他事情可做。
最重要的一點是,要使用處理重播元組的不透明BackingMap,必須使用尊重不透明先決條件的噴嘴,如本矩陣所述 。
如果由于某種原因需要實現自己的BackingMap,我們唯一要做的就是使它存儲數據的當前和先前版本以及交易ID。 這是我的玩具項目中的一個簡單示例(但實際上,在編寫類似代碼之前,請考慮一下Storm-contrib ):
public void put(String table, List<String> keys, List<OpaqueValue> opaqueStrings) {;// this should be optimized with C* batches... for (Pair<String, OpaqueValue> keyValue : Utils.zip(keys, opaqueStrings)) { PreparedStatement statement = getSession().prepare(format("INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?)", table)); OpaqueValue opaqueVal = keyValue.getValue(); execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev())); } } public List<OpaqueValue> get(String table, List<String> keys) {;List<OpaqueValue> vals = new ArrayList<>(keys.size()); ResultSet rs = execute(format("select id, txid, prev, curr from %s where id in ( %s ) ", table, toCsv(keys) )); Map<String, OpaqueValue> data = toMapOfOpaque(rs); for (String key: keys){ vals.add(data.get(key)); }return vals; }然后,要真正獲得Trident的一次語義,唯一要做的就是將其包裝在OpaqueMap中,如下所示:
public static StateFactory FACTORY = new StateFactory() { public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf))); } }幕后發生的事情是, OpaqueMap將根據與當前批處理元組關聯的事務ID和在持久性中找到的事務ID,選擇要顯示給我們的更新邏輯的先前存儲的狀態(“ curr”或“ prev”)。 該事務ID是由噴嘴提供的,因此這就是保持噴嘴與狀態選擇對齊如此重要的原因:狀態對每個事務ID的含義進行假設。
不要破壞前一個實例!
讓我們回到上面提到的read-update-write序列的步驟2。 既然我們知道不透明邏輯需要存儲任何狀態的新版本和舊版本,請查看以下Reducer代碼并嘗試確定其損壞原因:
public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;if (ENTER == event.getEventType()) { curr.setStartTime(event.getTime()); // buggy code } else { curr.setEndTme(event.getTime()); // buggy code } return curr; }函數式編程的專家稱其為“不純”方法,因為它會修改其輸入參數。 它破壞Storm不透明邏輯的原因是,現在“當前”和“先前” java引用實際上都引用內存中的同一實例。 因此,當不透明邏輯同時保留某個狀態的先前版本和當前版本時,實際上它保存的是新版本的兩倍,因此先前的版本丟失了。
更好的實現可能是這樣的
public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;RoomPresencePeriod updated = new RoomPresencePeriod(curr); // copy constructor if (ENTER == event.getEventType()) { updated.setStartTime(event.getTime()); } else { updated.setEndTme(event.getTime()); } return updated; }第3部分:人為錯誤:全部重播
最后一點,我們必須謙虛地意識到,無論我們采取了多少上述努力和保障,我們仍然會在生產環境中部署錯誤(對此,我發誓,抱歉!)。 對于數據處理平臺,錯誤可能意味著破壞數據的錯誤,當數據是我們的業務時,這是很糟糕的。 在某些情況下,我們只會發現事實之后數據已損壞,就像上面有關multiget的注釋中所述。
內森·馬茲(Nathan Marz)在他的《 大數據》書中 ,描述了一個簡單的基于Lambda架構的“重播所有”想法,以解決該想法。 這本書的簡短摘要也可以在這里找到 。
翻譯自: https://www.javacodegeeks.com/2014/02/error-handling-in-storm-trident-topologies.html
總結
以上是生活随笔為你收集整理的Storm Trident拓扑中的错误处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 与詹金斯一起连续交付Heroku
- 下一篇: 在JUnit中测试预期的异常