日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Trident State译文

發布時間:2025/3/15 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Trident State译文 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Trident State?譯文

Trident針對狀態化的數據源的讀寫進行了一流的分裝。State可以包含在拓撲中-例如,保存在內存中,有HDFS提供備份-也可以保存在一個外部的數據庫中,像MemcachedCassandra針對以上的所有情況,Trident的API都是一樣的

?

了保證state在更新的過程中遇到重試或者失敗處理時任然能夠具有冪等性,storm采取了必要的容錯。也就是說,storm能夠做到每一條消息僅且僅被處理一次。

?

在進行state更新操作的時候,可以選擇不同等級的容錯方式;在看這些容錯方式之前,讓我們來用一個例子說明如何保證僅且僅被處理一次的語意。假設你正在某個流中進行累加的聚合操作,并且準備把聚合的結果保存在數據庫中。 現在你在數據庫中保存了一個值來表示累加的結果,每處理一個tuple你就對數據庫中的值進行一次累加操作。

?

當失敗處理發生的時候,tuples就會被重放。這就給state的更新操作(還有任何會帶來副作用的操作)帶來了問題--你將無法確定你是否已經基于這個被重發的tuple對state成功地進行了更新操作。也許你還從來沒有處理過這個tuple,在這種情況下你就需要對數據庫中的值進行一次累加操作。也許你已經成功處理過這個tuple并且對數據庫中的值進行過了一次累加操作,但是這個tuple在你更新state之后的某個環節出錯了;在這種情況下,你在接收到這個tuple的時候就不應該對數據庫中的值進行累加操作了。也可能這個tuple曾經出現過,但是在對數據庫中的值進行累加的時候出錯了,在這種情況下你需要對數據庫中的值進行累加操作。

?

僅僅在數據庫中保存累加的值,你永遠無法確定這個tuple是否已經被處理過了。所以你需要更多的信息來幫助你做出正確的決定。Trident提供了一下的語義來幫助用戶獲得僅且僅被處理一次的語義。

?

1.所有的tuple都是一小批一小批的發送的(以batch的方式發送)。

2.每一個批量的tuple都會被賦予一個唯一的"transaction id" (txid);加入該批tuple被重播,那么該批tuple仍然保持相同的txid。

3.State的更新在各個批次的tuple之間是有序的,也就是說,只有第2批成功更新以后,第3批才會執行對state的更新操作。

?

?

有了這些保障,你自己的state就能夠檢測到某一批tuple是否被處理過,并選擇正確的方式來更新state。你到底要采取什么樣的方式來更新state依賴于輸入的spout也就是每一個批量的tuple所提供的一致性語義。Storm提供三種容錯級別的soput:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。同樣的storm也提供了三種容錯級別的state:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。讓我們來看看每一種事務類型的spout,以及通過每種spout你所能獲得的容錯方式。

?

?

?

Transactional spouts

記住,Trident總是一小批一小批的處理tuple,并且每一個批次有一個唯一的事務IDSpout的特性有其鎖提供的保障措施決定;事務型的spout具有一下特性:

?

1.?一個txid所對應的batch永遠是相同的。同一個txid的重放的batch永遠和之前該txid所對應的batch相同。

2.?不同batch中的tuple之間不會存在交集(一個tuple不是屬于這個batch,就是屬于另一個batch,永遠不能同時屬于兩個以上的batch)。

3.?每一個tuple都一定會在一個batch中被發送(沒有任何一個tuple被遺漏)。

?

?

這是一種很容易理解的spout類型,一個流被劃分成固定的批次,并且永遠不會改變。Storm提供了一個針對kafka的事務型spout

?

你也許會問:為什么我們不總是使用transactional spout?這很容易理解。一個原因是并不是所有的地方都需要容錯的。舉例來說,TransactionalTridentKafkaSpout 工作的方式是給定一個txidbatch所包含的一個屬于一個topic的來自于所有Kafka partitiontuple序列。一旦這個batch被發出,在任何時候如果這個batch被重新發出時,它必須包含原來所有的tuple以滿足?transactional spout的語義。現在我們假定一個batchTransactionalTridentKafkaSpout所發出,這個batch沒有被成功處理,并且同時kafka的一個節點也down掉了。你就無法像之前一樣重播一個完全一樣的batch(因為kakfa的節點down掉,該topic的一部分partition可能會無法使用),整個處理會被中斷。

?

這也就是"opaque transactional" spouts(不透明事務spout)存在的原因- 他們對于丟失源節點這種情況是容錯的,仍然能夠幫你達到有且只有一次處理的語義。后面會對這種spout有所介紹。

?

(當然,在Kafka開啟replication功能時,transactional spout也是可以做到容錯的;現在的kafka已經完全支持了,所以,上文中所說的當一個節點掛掉以后TransactionalTridentKafkaSpout無法正常工作的情況也就不存在了,也正是因為這樣,大部分時間都選擇使用了TransactionalTridentKafkaSpout,個人感覺在使用kafka的時候"opaque transactional" spouts確實沒有存在的意義)

?

?

在討論?"opaque transactional" spouts之前,讓我們先來看看你該如何為transactional spout設計一個具有僅且僅處理一次的state。這個state的類型被稱為"transactional state"?,它利用任何txid都永遠對應與相同一個批次的tuple的特性。

?

假設你的拓撲是用來統計單詞個數的,并且你將要把統計結果保存在一個key-value數據庫中。Key肯定就是對應的單詞了,值當然就是統計結果。你已經看到只是存儲一個數量是不足以知道你是否已經處理過一個batch的。所以,你需要將txid和統計結果一起保存在值中。那么,當你需要更新統計結果的時候,你只需要比較一下數據庫中的txid和當前batchtxid是否相同;如果相同,你就直接跳過更新操作--因為有強順序的保障,你可以肯定數據庫中的值已經包含了當前batch。如果不相同,你就修改統計結果。這個邏輯之所以能說的通是因為batchtxid永遠不會改變,并且batch之間有序地對state進行更新操作。

?

?

用一個例子來說明這個邏輯為什么行得通,假如你發送了一個txid=3batch,該batch中包含一下的tuple

?

[“man”]

[“man”]

[“dog”]

?

?

假設現在數據庫中保存這如下的key-value數據:

?

man => [count=3,txid=1]

dog => [count=4,txid=3]

apple =>[count=10,txid=2]

?

man相關聯的txid1;由于當前的batchtxid3,那么你就可以肯定這批tupleman 的值還沒有累加到數據庫中。所以你可以給mancount累加2,并且更新txid3。然而,dog對應的txid在數據庫中和當前batch中 一樣,所以你可以肯定對于dog來說當前batch中的值已經在數據庫中增加過了。那么就選擇跳過更新。在該batch更新后,數據庫中的數據如下所示:

?

man => [count=5,txid=3]

dog => [count=4,txid=3]

apple =>[count=10,txid=2]

?

接下來我們一起再來看看 opaque transactional spout以及怎樣去為這種spout設計相應的state

?

?

Opaque transactional spouts

?

opaque transactional spout并不能保證每一個txid永遠對應一個相同的batchopaque transactional spout擁有如下特性:

?

1.?每一個tuple都只會在一個batch中執行通過。也就是說,一個tuple在某一個batch處理失敗了,該tuple可能在之后的另一個新的batch中處理成功。

?

?

OpaqueTridentKafkaSpout就是一個擁有該特性的spout,該spout允許kafka節點掛掉。每當OpaqueTridentKafkaSpout要發送一個新的batch的時候,它將會從上一個batch所成功發送的tuple的結束點開始發送,這就保證了沒有tuple會被遺漏掉,也保證了一個tuple不會被多個batch成功處理。

?

在使用opaque transactional spouts的時候,再使用和transactional spout相同的處理方式:判斷數據庫中存放的txid和當前txid去做對比已經不好用了。這是因為在state的更新過程之間,batch可能已經變了。

?

你能做的就是在數據庫中保存更多的狀態;除了保存值和txid以外,你還需要保存更新前的值(previous value)。讓我們還是用上面的例子來說明這個邏輯。假定你當前batch中的對應count是“2”, 并且我們需要進行一次狀態更新。而當前數據庫中存儲的信息如下:

?

{

??????value = 4,

??prevValue = 1,

??txid = 2

}

?

假設當前的txid3,和數據庫中的txid2)不同。在這種情況下,你把“preValue”設置為“value”,然后將value增加2,并更新txid3。操作過后的數據庫內容變成了下面的樣子:

?

{

??value = 6,

??prevValue = 4,

??txid = 3

}

?

再假設當前的txid2,和數據庫中的txid2)相同。這時你可以確定數據庫中的“value”被之前擁有相同txidbatch更新過,但是之前的batch和現在的batch內容可能不同了。所以你要做的是讓“value”的值等于“preValue”加2,操作過后的數據庫內容變成了下面的樣子:

{

??value = 3,

??prevValue = 1,

??txid = 2

}

?

--------------------------------------------------------------------------------------------------------------------------

注:這里理解起來可能有些晦澀,舉個例子吧。

?

假設一個batch的大小為3,有下面這么多tuple要進行累加:

[dog] [dog]?[man] [man] [man]

?

假設數據庫中現在的信息為:

?

dog =>{value=2,prevValue=1,txid=1}

man =>{value=3,prevValue=1,txid=1}

?

然后發送一個txid2batch {[dog] [dog] [man]}

?

然后進行保存操作,

man 成功保存,但是dog保存的時候發生了錯誤,所以數據庫中的信息變成了

?

dog =>{value=2,prevValue=1,txid=1}

man =>{value=4,prevValue=3,txid=2}

?

那么失敗了,就會有batch的重發,恰好這是負責發送第一個 [dog]kafka節點壞掉了,batch無法獲得第一個[dog]了,那么就只能從第二個dog開始發了,所以發送的batchtxid依然為2,內容為{[dog] [man] [man]}

?

到這里,dog 的兩個txid不同,更新;但是man txid相同了,所以用prevValue+2來更新value;從這里應該可以看出,為什么是這樣做了。

?

更新后的結果如下:

dog =>{value=3,prevValue=2,txid=2}

man =>{value=5,prevValue=3,txid=2}

?

---------------------------------------------------------------------------------------------------------------------------------

因為Trident保證了batch之間的強順序性,因此這種方法是有效的。一旦Trident去處理一個新的batch,它就不會重新回到之前的任何一個batch。并且由于opaque transactional spout確保在各個batch之間是沒有共同成員的,每個tuple只會在一個batch中被成功處理,你可以安全的在之前的值上進心更新。

?

?

?

Non-transactional spouts

?

Non-transactional spout(非事務spout)不提供任何的保障。所以在tuple處理失敗后不進行重發的情況下,一個tuple可能是最多被處理一次的。同時他也可能會是至少處理一次的,如果tuple在不同的batch中被多次成功處理的時候。無論怎樣,這種spout是不可能實現有且只有一次被成功處理的語義的。

?

?

Summary of spout and state types

?

這個圖展示了哪些spoutstate的組合能夠實現有且只有一次被成功處理的語義:

?

?

?

?

Opaque transactional state有著最為強大的容錯性。但是這是以存儲更多的信息作為代價的。Transactional states 需要存儲較少的狀態信息,但是僅能和 transactional spouts協同工作. 最后, non-transactional state所需要存儲的信息最少,但是卻不能實現有且只有一次被成功處理的語義。

StateSpout類型的選擇其實是一種在容錯性和存儲消耗之間的權衡,你的應用的需要會決定那種組合更適合你。

?

?

?

?

?

State APIs

在前面你已經看到了一些用來實現僅且僅執行一次語義的復雜方法,有一個關于Trident的好消息就是,Trident把所有容錯的邏輯都在state內部實現了。那么作為一個用戶,你就從比較txid,保存多余的值到數據庫中,或者任何像它們兩個那樣的苦差事中脫離了出來。你只需要像下面這樣寫代碼就可以了:

?

?

TridentTopology topology = new TridentTopology(); ???????

TridentState wordCounts =

??????topology.newStream("spout1", spout)

????????.each(new Fields("sentence"), new Split(), new Fields("word"))

????????.groupBy(new Fields("word"))

????????.persistentAggregate(MemcachedState.opaque(serverLocations),new Count(),new Fields("count")) //重點就是這句了,這里其實使用了mapState,用來做批量的聚合結果的保//存 ??????????????

????????.parallelismHint(6);

?

?

所有管理opaque transactional state的必要邏輯都在MemcachedState.opaque方法內部實現了。另外,更新操作是批量進行的,以減少對數據庫的壓力。

?

基礎的state接口只有兩個方法:

?

public interface State {

????void beginCommit(Long txid); // can be null for things like partitionPersist occurring off //a DRPC stream(放生在DRPC流中的partitionPersist操作中,txid可能為空)

????void commit(Long txid);

}

?

?

在這個接口所提供的兩個方法中,你可以知道什么時候開了對state的更新操作,什么時候完成了對state的更新操作,在每個方法中你都能夠獲得txidTrident對你的state是如何工作的沒有做出任何的假設(也就是說,你要自己寫更新和查詢方法)。

?

加入你自己有一套數據庫,并且希望通過Trident來在其中更新、查詢用戶的位置信息。那么你自己實現的state中就要自己去寫更新和查詢的方法了:

?

public class LocationDB implements State {

????public void beginCommit(Long txid) { ???

????}

?

????public void commit(Long txid) { ???

????}

?

????public void setLocation(long userId, String location) {

??????// code to access database and set location

?//自己寫的向數據庫中保存用戶位置信息的方法,這個方法會在你自己實現的

//BaseStateUpdater中調用(呵呵,自己實現然后自己調用)

????}

?

????public String getLocation(long userId) {

??????// code to get location from database

??????????//自己寫的從數據庫中查找用戶位置信息的方法,這個方法會在你自己實現的

//BaseQueryFunction中調用(也是自己實現自己調用)

????}}

?

?

然后你就要實現一個Trdient定義的StateFactory?,使你能夠在Trienttask中創建你自己的state。下面是為LocationDB?實現的StateFactory

?

public class LocationDBFactory implements StateFactory {

???public State makeState(Map conf, int partitionIndex, int numPartitions) {

??????return new LocationDB();

???} }

?

?

Trident提供了QueryFunction?用來對state進行查詢,提供了StateUpdater?用來對state進行更新操作。讓我們來寫一個QueryLocation的操作,該操作從LocationDB中查詢用戶的位置信息。首先來看看那你該如何在拓撲中使用QueryLocation操作。假設你的拓撲接收一個用戶的id的輸入流。

?

TridentTopology topology = new TridentTopology();

TridentState locations = topology.newStaticState(new LocationDBFactory());

topology.newStream("myspout", spout)

.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))

?

//.stateQuey就是查詢了,第一個參數指定了要查詢的state(這個state使用LocationDBFactory來創建的,這就是為什么要為你的state建立一個stateFactory了,因為你無法在TridentAPI中直接new你的state,你只能new stateFactory,然后Trident會調用其中的makeState方法來創建state);第二個參數就是輸入的流的字段,這里把userId輸入到操作中;第三個參數就是你自己實現的QueryFunction?用來執行查詢操作;第四個參數是輸出字段。

?

?

好了,現在可以來看看如何來實現一個自己的QueryFunction?了。

?

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {

????

???public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {

//查詢的方法,下面的代碼都是要自己寫的

????????List<String> ret = new ArrayList();

????????for(TridentTuple input: inputs) {

????????????ret.add(state.getLocation(input.getLong(0)));//每次查詢一個,效率不高

????????}

????????return ret;//這個ret的類型是你自己定義好的泛型(在類的開始處)

//返回的ret會循環調用下面的execute方法來發送每一個location

????}

?

????public void execute(TridentTuple tuple, String location, TridentCollector collector) {

//發送輸出數據的方法,輸出字段的定義在上面已經完成了,說白了還是一個bolt節點 ps:在新的版本中String location已經變成了一個List了,也就是ret一次都傳進來了,在execute方法中進行遍歷

????????collector.emit(new Values(location));

????} ???

}

?

?

QueryFunction的執行分為兩步:第一步,Trident會收集一個batch的輸入數據然后把他們傳遞給batchRetrieve。在這個例子中,batchRetrieve會接收到很多的用戶IDBatchRetrieve方法需要返回和接收到的batch中的tuple的數量相同的一個list數據。List中的第一個元素對應第一個tuple查詢的結果,第二個元素對應第二個tuple查詢的結果,以此類推。

?

也許你會看出上面的代碼中沒有利用Trident所提供的batch的優勢,因為它每次只從LocationDB?中查詢一條數據。所以可以把LocationDB?向下面這樣優化一下:

public class LocationDB implements State {

????public void beginCommit(Long txid) { ???

????}

?

????public void commit(Long txid) { ???

????}

?

????public void setLocationsBulk(List<Long> userIds, List<String> locations) {

??????// set locations in bulk批量進行更新

????}

?

????public List<String> bulkGetLocations(List<Long> userIds) {

??????// get locations in bulk批量進行查詢

}}

有了上面優化后的LocationDB?,那么QueryLocation?就也需要修改一下了:

?

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {

????public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {

????????List<Long> userIds = new ArrayList<Long>();

????????for(TridentTuple input: inputs) {

????????????userIds.add(input.getLong(0));

????????}

????????return state.bulkGetLocations(userIds);//一次查一批...

????}

?

????public void execute(TridentTuple tuple, String location, TridentCollector collector) {

????????collector.emit(new Values(location));

????} ???}

?

?

QueryLocation?修改為上面的樣子以后,就可以大大減少對數據庫的請求了。

?

?

查詢說完了,下面就是如何來更新state了。你要利用StateUpdater?接口來實現自己的目的。下面是例子:

?

?

public class LocationUpdater extends BaseStateUpdater<LocationDB> {

????public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {//很簡單

????????List<Long> ids = new ArrayList<Long>();

????????List<String> locations = new ArrayList<String>();

????????for(TridentTuple t: tuples) {

????????????ids.add(t.getLong(0));

????????????locations.add(t.getString(1));

????????}

????????state.setLocationsBulk(ids, locations);

????}}

?

有了上面的代碼,你就可以像下面這樣在Trident中來更新state

?

TridentTopology topology = new TridentTopology();

TridentState locations =

topology.newStream("locations", locationsSpout)

.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())

第一個參數就是LocationDB對應的stateFactory;第二個參數是輸入的流的字段;第三個就是上面寫的更新操作了。

?

?

partitionPersist 操作會更新一個State。其內部是將?State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操作。

在這段代碼中,只是簡單的從輸入的tuple中提取處userid和對應的location,并一起更新到State中。

partitionPersist 會返回一個TridentState對象來表示被這個Trident topoloy更新過的locationDB。 然后你就可以使用這個statetopology的任何地方進行查詢操作了。

?

同時,你也可以看到我們傳了一個TridentCollectorStateUpdatersemit到這個collectortuple就會去往一個新的stream。在這個例子中,我們并沒有去往一個新的stream的需要,但是如果你在做一些事情,比如說更新數據庫中的某個count,你可以emit更新的count到這個新的stream。然后你可以通過調用TridentState#newValuesStream方法來訪問這個新的stream來進行其他的處理。

?

persistentAggregate

?

persistentAggregate是另一個用來更新state的方法,?你在之前的word count例子中應該已經見過了,如下:

?

TridentTopology topology = new TridentTopology();

TridentState wordCounts =

??????topology.newStream("spout1", spout)

????????.each(new Fields("sentence"), new Split(), new Fields("word"))

????????.groupBy(new Fields("word"))

????????.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

?

persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎么去使用一個Trident 聚合器來更新State。在這個例子當中,因為這是一個group好的streamTrident會期待你提供的state是實現了MapState接口的。用來進行group的字段會以key的形式存在于State當中,聚合后的結果會以value的形式存儲在State當中。MapState接口看上去如下所示:

?

public interface ?MapState<T> extends State {

????List<T> multiGet(List<List<Object>> keys);

????List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);

????void multiPut(List<List<Object>> keys, List<T> vals);}

?

?

?

?

當你在一個未經過groupstream上面進行聚合的話,Trident會期待你的state實現Snapshottable接口:

?

public interface ?Snapshottable<T> extends State {

????T get();

????T update(ValueUpdater updater);

????void set(T o);

}

?

MemoryMapState??MemcachedState?都實現了上面的2個接口。(自己寫的mapState也會實現上面的兩個接口)

?

Implementing Map States

Trident中實現MapState是非常簡單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, NonTransactionalMap 類實現了所有相關的邏輯,包括容錯的邏輯。你只需要將一個IBackingMap 的實現提供給這些類就可以了。IBackingMap接口看上去如下所示:

?

public interface IBackingMap<T> {

????List<T> multiGet(List<List<Object>> keys);

????void multiPut(List<List<Object>> keys, List<T> vals);

}

?

OpaqueMap's會用OpaqueValuevalue來調用multiPut方法,TransactionalMap's會提供TransactionalValue中的value,而NonTransactionalMaps只是簡單的把從Topology獲取的object傳遞給multiPut

?

Trident還提供了一種CachedMap類來進行自動的LRU 緩存。

?

另外,Trident 提供了?SnapshottableMap?類將一個MapState 轉換成一個 Snapshottable 對象.(用來對沒有進行group by 的流進行全局匯總)

?

大家可以看看?MemcachedState的實現,從而學習一下怎樣將這些工具組合在一起形成一個高性能的MapState實現。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語義的。

?

?

Ps:翻譯的內容就這么多了,其實網上翻譯的很多,但是看了以后并不能給很多新手帶來一些幫助(原文寫的太高深了)。努力翻譯了一下,但是還是覺得有很多沒有說清楚,下面會抽時間把storm官方提供的 hbase相關的trident state的源代碼解讀一下,我覺得只有解讀一下這個源代碼,才會讓人更加清晰 state當地怎么用,以及如何寫自己的state

?

總結

以上是生活随笔為你收集整理的Trident State译文的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。