Storm Trident API
在Storm Trident中有五種操作類型
- Apply Locally:本地操作,所有操作應用在本地節(jié)點數(shù)據(jù)上,不會產(chǎn)生網(wǎng)絡(luò)傳輸??? ?
- Repartitioning:數(shù)據(jù)流重定向,單純的改變數(shù)據(jù)流向,不會改變數(shù)據(jù)內(nèi)容,這部分會有網(wǎng)絡(luò)傳輸
- Aggragation:聚合操作,會有網(wǎng)絡(luò)傳輸
- Grouped streams上的操作
- Merge和Join
一Apply Locally
1.functions函數(shù)操作
函數(shù)的作用是接收一個tuple(需指定接收tuple的哪個字段),輸出0個或多個tuples。輸出的新字段值會被追加到原始輸入tuple的后面,如果一個function不輸出tuple,那就意味這這個tuple被過濾掉了,例如下面的例子:
1 class AddAndSubFuction extends BaseFunction{ 2 3 public void execute(TridentTuple tuple, TridentCollector collector) { 4 int res1 = tuple.getInteger(0); 5 int res2 = tuple.getInteger(1); 6 int sub = res1 > res2 ? res1 - res2 : res2 - res1; 7 collector.emit(new Values(res1+res2,sub)); 8 } 9 }?
2.Filter過濾操作
Filters很簡單,接收一個tuple,并決定是否保留這個tuple,例如
1 class ScoreFilter extends BaseFilter{ 2 3 public boolean isKeep(TridentTuple tuple) { 4 return tuple.getInteger(0) >= 60; 5 } 6 }上述Filter過濾調(diào)成績小于60的tuple.
3.partitionAggregate
PartitionAggregate的作用對每個Partition中的tuple進行聚合,與前面的函數(shù)在原tuple后面追加數(shù)據(jù)不同,PartitionAggregate的輸出會直接替換掉輸入的tuple,僅數(shù)據(jù)PartitionAggregate中發(fā)射的tuple。
TridentAPI提供了三個聚合器接口:CombinerAggregator,ReducerAggregator,Aggregator
我們先來看一看CombinerAggregator,CombinerAggregator接口的定義如下:
?
CombinerAggregator接口只返回一個tuple,并且這個tuple也只包含一個field。init方法會先執(zhí)行,它負責預處理每一個接收到的tuple,然后再執(zhí)行combine函數(shù)來計算收到的tuples直到最后一個tuple到達,當所有tuple處理完時,CombinerAggregator會發(fā)射zero函數(shù)的輸出,比如CombinerAggregator的實現(xiàn)類Count的定義如下:
?
當你使用aggregate?方法代替PartitionAggregate時,CombinerAggregator的好處就體現(xiàn)出來了,因為Trident會自動優(yōu)化計算,在網(wǎng)絡(luò)傳輸tuples之前做局部聚合。
我們再來看一下ReducerAggregator,ReducerAggregator的定義如下:
?
ReducerAggregator通過init方法提供一個初始值,然后為輸入的每個tuple迭代這個值,最終產(chǎn)生一個唯一的tuple并輸出,定義一個實例如下:
?
最后看一下通用的聚合器Aggregator,它的定義如下:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); } Aggregator接口可以發(fā)射含任意數(shù)量屬性的任意數(shù)據(jù)量的tuples,并且可以在執(zhí)行過程中的任何時候發(fā)射:
init:在處理數(shù)據(jù)之前被調(diào)用,它的返回值會作為一個狀態(tài)值傳遞給aggregate和complete方法
aggregate:用來處理每一個輸入的tuple,它可以更新狀態(tài)值也可以發(fā)射tuple
complete:當所有tuple都被處理完成后被調(diào)用
有時候我們需要執(zhí)行多個聚合器,這在Trident中稱為chaining
4.projection投影操作
投影操作的作用是僅僅保留stream指定字段的數(shù)據(jù),和關(guān)系數(shù)據(jù)庫中投影的概念類似
二Repartitioning重定向操作
重定向操作是如何在各個任務間對tuples進行分區(qū)。分區(qū)的數(shù)量也有可能改變重定向的結(jié)果。重定向需要網(wǎng)絡(luò)傳輸,下面介紹下重定向函數(shù):
?
三Aggragation聚合操作
Trident有aggregate和 persistentAggregate方法來做聚合操作。aggregate是獨立的運行在Stream的每個Batch上的,而persistentAggregate則是運行在Stream的所有Batch上并把運算結(jié)果存儲在state source中。 運行aggregate方法做全局聚合。當你用到 ReducerAggregator或Aggregator時,Stream首先被重定向到一個分區(qū)中,然后其中的聚合函數(shù)便在這個分區(qū)上運行。當你用到CombinerAggregator時,Trident會首先在每個分區(qū)上做局部聚合,然后把局部聚合后的結(jié)果重定向到一個分區(qū),因此使用CombinerAggregator會更高效,可能的話我們需要優(yōu)先考慮使用它。
四Grouped streams
GroupBy操作是根據(jù)特定的字段對流進行重定向的,還有,在一個分區(qū)內(nèi)部,每個相同字段的tuple也會被Group到一起。如果你在grouped Stream上面運行aggregators,聚合操作會運行在每個Group中而不是整個Batch。persistentAggregate也能運行在GroupedSteam上,不過結(jié)果會被保存在MapState中,其中的key便是分組的字段。 當然,aggregators在GroupedStreams上也可以串聯(lián)。
五Merge和Join
api的最后一部分便是如何把各種流匯聚到一起。最簡單的方式就是把這些流匯聚成一個流。我們可以這么做:
topology.merge(stream1, stream2, stream3);?
另一種合并流的方式就是join。一個標準的join就像是一個sql,必須有標準的輸入,因此,join只針對符合條件的Stream。join應用在來自Spout的每一個小Batch中。join時候的tuple會包含:
1.join的字段,如Stream1中的key和Stream2中的x
2.所有非join的字段,根據(jù)傳入join方法的順序,a和b分別代表steam1的val1和val2,c代表Stream2的val1
當join的是來源于不同Spout的stream時,這些Spout在發(fā)射數(shù)據(jù)時需要同步,一個Batch所包含的tuple會來自各個Spout。
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/senlinyang/p/8081447.html
總結(jié)
以上是生活随笔為你收集整理的Storm Trident API的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第一周冲刺_周三总结
- 下一篇: 零元学Expression Blend