使用Storm进行可扩展的实时状态更新
在本文中,我將說明如何借助Storm框架以可擴展且無鎖定的方式在數據庫中維護實時事件驅動流程的當前狀態。
Storm是基于事件的數據處理引擎。 它的模型依賴于基本原語,例如事件轉換,過濾,聚合……,我們將它們組合成拓撲 。 拓撲的執行通常分布在多個節點上,并且風暴群集還可以并行執行給定拓撲的多個實例。 因此,在設計時,務必牢記哪些Storm原語在分區范圍內執行,即在一個群集節點的級別上執行,以及哪些在群集范圍內執行(又稱為重新分區操作) ,因為它們涉及將事件從分區到分區)。 Storm Trident API文檔明確提到了哪些功能做什么,作用范圍如何。 Storm的分區概念與Kafka隊列的分區概念保持一致, Kafka隊列是入站事件的常見來源。
拓撲通常需要維護一些執行的持續狀態。 例如,這可以是一些傳感器值的滑動窗口平均值,從推文中提取的近期情緒,在不同位置出現的人數。……由于某些狀態更新操作具有分區范圍(例如partitionAggregate ),因此可伸縮性模型在這里尤為重要。其他則具有集群范圍(例如groupby + perstitentAggregate的組合)。 這篇文章中說明了這一點。
示例代碼可在githup上獲得 。 它基于Storm 0.8.2,Cassandra 1.2.5和JDK 1.7.0。 請注意,此示例未包含適當的錯誤處理:噴口或螺栓均不支持重試失敗的元組,我將在以后的文章中解決。 另外,我使用Java序列化將數據存儲在元組中,因此,即使Storm支持多種語言,我的示例也是特定于Java的。
實際示例:出席事件
我的示例是模擬一個跟蹤人們在建筑物內位置的系統。 每當用戶進入或離開房間時,每個房間入口處的傳感器都會發出如下事件:
{"eventType": "ENTER", "userId": "John_5", "time": 1374922058918, "roomId": "Cafetaria", "id": "bf499c0bd09856e7e0f68271336103e0A", "corrId": "bf499c0bd09856e7e0f68271336103e0"} {"eventType": "ENTER", "userId": "Zoe_15", "time": 1374915978294, "roomId": "Conf1", "id": "3051649a933a5ca5aeff0d951aa44994A", "corrId": "3051649a933a5ca5aeff0d951aa44994"} {"eventType": "LEAVE", "userId": "Jenny_6", "time": 1374934783522, "roomId": "Conf1", "id": "6abb451d45061968d9ca01b984445ee8B", "corrId": "6abb451d45061968d9ca01b984445ee8"} {"eventType": "ENTER", "userId": "Zoe_12", "time": 1374921990623, "roomId": "Hall", "id": "86a691490fff3fd4d805dce39f832b31A", "corrId": "86a691490fff3fd4d805dce39f832b31"} {"eventType": "LEAVE", "userId": "Marie_11", "time": 1374927215277, "roomId": "Conf1", "id": "837e05916349b42bc4c5f65c0b2bca9dB", "corrId": "837e05916349b42bc4c5f65c0b2bca9d"} {"eventType": "ENTER", "userId": "Robert_8", "time": 1374911746598, "roomId": "Annex1", "id": "c461a50e236cb5b4d6b2f45d1de5cbb5A", "corrId": "c461a50e236cb5b4d6b2f45d1de5cbb5"}對應于一個房間內一個用戶的一個使用周期的(“ ENTER”和“ LEAVE”)對中的每個事件具有相同的相關性ID。 這可能對傳感器提出了很多要求,但是出于本示例的目的,這使我的生活更加輕松 。
為了使事情變得有趣,讓我們想象一下,不能保證到達我們服務器的事件遵守時間順序(請參閱生成事件的python腳本中的shuffle()調用)。
我們將構建一個Storm拓撲,該拓撲將構建每個房間的每分鐘每分鐘的占用時間線,如本文結尾處的時間圖所示。 在數據庫中,房間時間線被切成一個小時的時間段,這些時間段被獨立存儲和更新。 這是Cafetaria占用1小時的示例:
{"roomId":"Cafetaria","sliceStartMillis":1374926400000,"occupancies":[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25, 22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]}為了實現這一點,我們的拓撲需要:
- 根據correlationIDID重新組合“ ENTER”和“ LEAVE”事件,并為此用戶在此房間中產生相應的存在時間
- 將每個在場期間的影響應用于房間入住時間表
順便說一句,Cassandra提供了Counter列 ,盡管我可以很好地替代它們,但我在這里不使用它們。 但是,我的目的是說明Storm功能,即使它會使方法有些虛構。
分組依據/ persistentAggregate / iBackingMap說明
在查看示例代碼之前,讓我們澄清一下這些“三叉戟風暴”原語如何協同工作。
想象一下,我們從上午9:47到上午10:34收到了兩個描述用戶在roomA中存在??的事件。 更新會議室的時間表需要:
- 從數據庫加載兩個受影響的時間軸切片:[9.00am,10:00 am]和[10.00am,11:00 am]
- 在這兩個時間軸切片中添加此用戶的狀態
- 將它們保存到數據庫
但是,像這樣天真地實現這一目標遠非最優,首先是因為它每個事件使用兩個DB請求,其次是因為這種“讀取-更新-寫入”序列通常需要一種鎖定機制,該鎖定機制通常無法很好地擴展。
為了解決第一點,我們想對幾個事件重新組合數據庫操作。 在Storm中,事件(或元組 )被成批處理。 IBackingMap是一個我們可以實現的原語,它使我們可以立即查看整批元組。 我們將使用它在批處理的開始(multiget)和結束時的所有DB-write操作(multiput)重新分組。 但是,multiget不允許我們查看元組本身,而只能查看“查詢鍵”,這是從元組內容中計算出來的,如下所述。
原因在于上面提到的關于天真的實現的第二點:我們想并行執行幾個[multiget +更新邏輯+ multiput]流,而不依賴鎖。 這是通過確保那些并行子進程更新不相交的數據集來實現的。 這就要求定義拆分為并行流的拓撲元素還控制在每個流內的DB中加載和更新哪些數據。 該元素是Storm groupBy原語:它通過按字段值對元組進行分組來定義拆分,并且它通過將“ groupedBy”值作為對multiget的查詢關鍵字來控制每個并行流更新的數據。
下圖在房間占用示例中對此進行了說明(簡化為每個房間僅存儲一個時間軸,而不是每個一小時片段存儲一個時間軸):
但是,并行性并沒有完全發生(例如,當前的Storm實現在分組流中依次調用每個reducer / combiner),但這是設計拓撲時要牢記的一個好模型。
有趣的是,在groupBy和multiget之間發生了一些Storm魔術。 回想一下,Storm旨在進行大規模分布,這意味著每個流在多個節點上并行執行,并從諸如Hadoop HDFS或分布式Kafka隊列之類的分布式數據源獲取輸入數據。 這意味著groupBy()同時在幾個節點上執行,所有可能處理的事件都需要組合在一起。 groupBy是一種重新分區操作 ,可確保將所有需要分組的事件都發送到同一節點,并由IBackingMap +組合器或化簡器的同一實例處理,因此不會發生爭用情況。
同樣,Storm要求我們將IBackingMap包裝到可用的Storm MapState原語(或我們自己的原語)之一中,通常用于處理失敗/重播的元組。 如上所述,我不在本文中討論這一方面。
使用這種方法,我們必須實現IBackingMap,以便它尊重以下屬性:
- 對于不同的鍵值,由multiget讀取和由IBackingMap的multiput操作寫入的數據庫行必須是不同的。
我想這就是他們將這些值稱為“關鍵”的原因 (盡管任何尊重此屬性的方法都可以)。
回到例子
讓我們看看這在實踐中是如何工作的。 該示例的主要拓撲在此處可用:
// reading events .newStream("occupancy", new SimpleFileStringSpout("data/events.json", "rawOccupancyEvent")) .each(new Fields("rawOccupancyEvent"), new EventBuilder(), new Fields("occupancyEvent"))第一部分只是讀取JSON格式的輸入事件(我使用的是簡單的文件輸出),對它們進行反序列化,然后使用Java序列化將它們放入稱為“ occupancyEvent”的元組字段中。 這些元組中的每一個都描述了用戶在房間內或房間外的“ ENTER”或“ LEAVE”事件。
// gathering "enter" and "leave" events into "presence periods" .each(new Fields("occupancyEvent"), new ExtractCorrelationId(), new Fields("correlationId")) .groupBy(new Fields("correlationId")) .persistentAggregate( PeriodBackingMap.FACTORY, new Fields("occupancyEvent"), new PeriodBuilder(), new Fields("presencePeriod")) .newValuesStream()當我們遇到correlationId的不同值時,groupBy原語會創建盡可能多的元組組(這可能意味著很多,因為通常最多兩個事件具有相同的correlationId)。 當前批處理中具有相同相關ID的所有元組將重新組合在一起,并且一組或幾組元組將一起顯示給persistentAggregate中定義的元素。 PeriodBackingMap是IBackingMap的實現,其中實現了multiget方法,該方法將接收將在后續步驟中處理的元組組的所有相關ID(例如:{“ roomA”,“ roomB”,“ Hall ”},如上圖所示。
public List<RoomPresencePeriod> multiGet(List<List<Object>> keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys)); }該代碼只需要從數據庫中檢索每個相關ID的潛在存在期間即可。 因為我們在一個元組字段上執行了groupBy,所以每個List在這里都包含一個字符串:correlationId。 請注意,我們返回的列表必須與鍵列表的大小完全相同,以便Storm知道哪個周期對應于哪個鍵。 因此,對于數據庫中不存在的任何鍵,我們只需在結果列表中放置一個空值即可。
一旦加載,Storm就會將一個具有相同相關性ID的元組一個一個地呈現給我們的化簡器PeriodBuilder 。 在我們的例子中,我們知道在此批處理中每個唯一的RelationshipId最多被調用兩次,但是一般來說可能會更多,或者如果當前批處理中不存在其他ENTER / LEAVE事件,則只能調用一次。 在對muliget()/ multiput()的調用與我們的reducer之間,借助我們選擇的MapState實現,Storm讓我們可以插入適當的邏輯來重放以前失敗的元組。 在以后的文章中有更多關于…
一旦我們減少了每個元組序列,Storm就會將結果傳遞給IBackingMap的mulitput(),在這里我們將所有內容“追加”到數據庫:
public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods); }Storm persistenceAggregate會使用我們的reducer提供給multitput()的值自動向拓撲元組的后續部分發出。 這意味著我們剛剛建立的在線狀態很容易作為元組字段使用,我們可以使用它們直接更新會議室時間線:
// building room timeline .each(new Fields("presencePeriod"), new IsPeriodComplete()) .each(new Fields("presencePeriod"), new BuildHourlyUpdateInfo(), new Fields("roomId", "roundStartTime")) .groupBy(new Fields("roomId", "roundStartTime")) .persistentAggregate( TimelineBackingMap.FACTORY, new Fields("presencePeriod","roomId", "roundStartTime"), new TimelineUpdater(), new Fields("hourlyTimeline"))第一行只是過濾掉尚未包含“ ENTER”和“ LEAVE”事件的任何期間。
然后, BuildHourlyUpdateInfo實現一對多的元組發射邏輯:對于每個占用期,它僅在“開始時間”內發射一個元組。 例如,從9:47 am到10:34 am在roomA中的占用將在此處觸發針對roomA的9.00am時間軸切片的元組,以及另一個針對10.00am的元組的發射。
下一部分實現與以前相同的groupBy / IBackingMap方法,只是這次使用兩個分組鍵而不是一個(因此,mulitget中的List <Object>將包含兩個值:一個String和一個Long)。 由于我們存儲一個小時的時間軸塊,因此上面提到的IBackingMap的必要屬性受到尊重。 多重獲取為每個(“ roomId”,“開始時間”)對檢索時間線塊,然后TimelineUpdater (再次是reducer)用與當前批次中找到的該時間線片相對應的每個存在時間更新時間線片(這就是BuildHourlyUpdateInfo的一對多元組發射邏輯)和multiput()僅保存結果。
導致咖啡廳占用
當我們看著它時,一切總是更加美麗,因此讓我們來繪制房間的占用情況 。 稍后,用一些R代碼 ,我們可以看到每分鐘的房間占用情況(由于所有數據都是隨機的,所以意義不大,但是……):
結論
希望本文提供了一種維護Storm拓撲狀態的有用方法。 我還嘗試說明了將處理邏輯實現為小型拓撲元素的實現,這些拓撲元素彼此插入,而不是將一些“冗長的”螺栓捆綁在冗長而復雜的邏輯部分上。
Storm的一個重要方面是它的可擴展性,很可能會在任何地方插入該子類或子類的子類來調整其行為。 春天十年前有這種聰明有趣的感覺(哦,該死,我現在有點老了……^ __ ^)
參考:來自Svend博客的 JCG合作伙伴 Svend Vanderveken 使用Storm進行的可伸縮實時狀態更新 。翻譯自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html
總結
以上是生活随笔為你收集整理的使用Storm进行可扩展的实时状态更新的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 房价备案价格还可以还价吗(房价备案价格)
- 下一篇: 在Amazon EMR上运行Hadoop