Trident API 概览
Trident API 概覽
? 在網(wǎng)上看到了很多有TRIDENT相關(guān)API的翻譯,看來(lái)看去,總覺(jué)得沒(méi)有說(shuō)清楚很多東西,所以自己結(jié)合使用的經(jīng)驗(yàn)翻譯了一篇出來(lái);翻譯完以后,也發(fā)現(xiàn)
在自己的翻譯中也有很多地方是表達(dá)不清楚的··不過(guò)多少感覺(jué)有些個(gè)人的理解編織在里面了。大俠們勿噴哈!
原文地址:http://storm.apache.org/releases/1.1.0/Trident-API-Overview.html
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
在Trident中核心的數(shù)據(jù)模式是“流”,以一系列batch的方式進(jìn)行處理。一個(gè)流分區(qū)在集群中不同的節(jié)點(diǎn)上,并且對(duì)于流的操作在各個(gè)分區(qū)上并行執(zhí)行。
?
在Trident中有五類操作:
1.在本地的分區(qū)上執(zhí)行的操作,不會(huì)引起網(wǎng)絡(luò)傳輸;
2.對(duì)一個(gè)流進(jìn)行重新分區(qū)但并不改變流中的內(nèi)容(會(huì)有網(wǎng)絡(luò)傳輸);
3.將網(wǎng)絡(luò)傳輸作為操作的一部分的聚合操作;
4.在分組后的流上進(jìn)行的操作;
5.合并和鏈接(原文:Merges and joins)
?
本地分區(qū)操作
本地分區(qū)操作不引起網(wǎng)絡(luò)傳輸,獨(dú)立運(yùn)行于每一個(gè)批量分區(qū)中
Function:
?
一個(gè)函數(shù)接收一批輸入字段并且發(fā)送零個(gè)或者更多的tuple來(lái)作為輸出。輸出tuple的字段被追加到原始的輸入tuple的字段后面。假如一個(gè)函數(shù)不發(fā)送任何的tuple,原始輸入的
tuple就會(huì)被過(guò)濾掉。否則,原始輸入tuple中的字段會(huì)被包含在每一個(gè)輸出tuple中。假如你有一個(gè)像下面一樣的函數(shù):
?
public class MyFunction extends BaseFunction {
????public void execute(TridentTuple tuple, TridentCollector collector) {
????????for(int i=0; i < tuple.getInteger(0); i++) {//獲取原始輸入tuple中的第一個(gè)字段的值,???//然后i從0開(kāi)始,如果i小于這個(gè)值,那么發(fā)送一個(gè)新的tuple;
//tuple中顯示的發(fā)送一個(gè)i的值
????????????collector.emit(new Values(i));
????????}
????}
}
現(xiàn)在假設(shè)你在mystream(Trident的一個(gè)拓?fù)?/span>)變量中有一個(gè)流,包含的字段有["a","b","c"],并且有如下的3個(gè)tuple要經(jīng)過(guò)該函數(shù):
[1, 2, 3]
[4, 1, 6]
[3, 0, 8]
假如你運(yùn)行如下代碼:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))//輸入流中使用 "b"這個(gè)字段的值,輸出流中申明一個(gè)"d"字段的值
附加解釋
?運(yùn)行過(guò)程如下: ????????????????????????????????????????????????????????????????????????????????????????????????????????????
首先[1, 2, 3]進(jìn)去了函數(shù)中,取到"b"字段中的值為2,那么可以發(fā)送兩個(gè)tuple,其中 "d"字段的值分別為0 ,1 同時(shí)由于原始輸入tuple中的字段會(huì)被保留,所以輸出的兩個(gè)tuple為如下格式:[1,2 ,3 ,0] [1,2,3,1];同理然后[4,1,6]進(jìn)入函數(shù),輸出流為[4,1,6,0] ????????????????????????????????????????????????????????????????????????????????最后[3,0,8]進(jìn)入函數(shù),由于不滿足循環(huán)條件,沒(méi)有輸出tuple;所以[3,0,8]被直接過(guò)濾掉了。 ???????????????????????????????????????
進(jìn)過(guò)該函數(shù)處理的輸出tuple擁有一下字段 ["a", "b", "c", "d"],輸出的tuple看起來(lái)是這個(gè)樣子的:
[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
?
Filter:
?
過(guò)濾器接收一個(gè)tuple,并決定是否保留該tuple。假設(shè)你有這樣一個(gè)過(guò)濾器:
?
public class MyFilter extends BaseFilter {
????public boolean isKeep(TridentTuple tuple) {//返回true就會(huì)被保留,返回false就不會(huì)被保留了
????????return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;//判斷條件是第一個(gè)字段值為1,第二個(gè)字段值為2的tuple才會(huì)被保留下來(lái)
????}
}
現(xiàn)在假設(shè)你有一些擁有 ["a", "b", "c"]這些字段的tuple,他們的值如下:
[1, 2, 3]
[2, 1, 1]
[2, 3, 4]
那么如果你運(yùn)行如下代碼:
mystream.filter(new MyFilter())
經(jīng)過(guò)處理后的輸出結(jié)果tuple就會(huì)變成下面這樣:
[1, 2, 3]
?
map和flatMap(這兩個(gè)函數(shù)比較新,在比較舊的storm版本中沒(méi)有這兩個(gè)函數(shù))
?
?map返回一個(gè)由提供的mapping函數(shù)對(duì)原始輸入流作用后的結(jié)果所組成的流(注:意味著原來(lái)字段對(duì)應(yīng)的值被替換掉了),這種操作可以應(yīng)用于一個(gè)流轉(zhuǎn)換為另一個(gè)流(一對(duì)一);
?
?舉個(gè)例子,假如這里有一個(gè)單詞流,并且你想把這個(gè)單詞流中的值轉(zhuǎn)換為大寫的方式,你就可以像下面這樣定義一個(gè)mapping函數(shù)了:
???
???注:文中所提到的單詞流,個(gè)人理解應(yīng)該是這樣的 ["world"] -----> [a],[b],[c]
??
public class UpperCase extends MapFunction {
?@Override
?public Values execute(TridentTuple input) {
???return new Values(input.getString(0).toUpperCase());
?}
}
?
這個(gè)mapping函數(shù)可以被應(yīng)用于流上,來(lái)產(chǎn)生一個(gè)把原始輸入流中的單詞轉(zhuǎn)換為大寫形式的新流;
?
mystream.map(new UpperCase())
注:個(gè)人理解------處理后的結(jié)果是 ["world"]------> [A],[B],[C]
?flatMap和map很相似,但是擁有將一個(gè)流轉(zhuǎn)換為多個(gè)流的能力(一對(duì)多),然后把生成的元素平壓到一個(gè)新的流中。(注:這句話怎么理解呢?額,不好表達(dá)清楚,有厲害的可以幫忙翻譯翻譯;我有一些個(gè)人的理解,但是還沒(méi)想好怎么組織語(yǔ)言)
?
?舉個(gè)例子,有一個(gè)句子的流,而且你想你想把這個(gè)句子的流轉(zhuǎn)換為單詞的流,那么你就需要像下面這樣來(lái)定義flatMap函數(shù):
?
public class Split extends FlatMapFunction {
??@Override
??public Iterable<Values> execute(TridentTuple input) {//其實(shí)函數(shù)看起來(lái)很簡(jiǎn)單
????List<Values> valuesList = new ArrayList<>();
????for (String word : input.getString(0).split(" ")) {
??????valuesList.add(new Values(word));
????}
????return valuesList;
??}
}
?
?這個(gè)flatMap函數(shù)可以作用于一個(gè)句子流,然后生成一個(gè)單詞流:
?
mystream.flatMap(new Split())
?
?當(dāng)然這些操作完全支持鏈?zhǔn)秸{(diào)用,那么你就可以通過(guò)如下的方式來(lái)將一個(gè)句子流轉(zhuǎn)換一個(gè)大寫單詞流:
?
mystream.flatMap(new Split()).map(new UpperCase())
?
?如果你不把輸出字段作為參數(shù)傳遞給map和faltMap,map和faltMap會(huì)把輸入字段作為輸出字段使用
?
?假如你想用新的輸出字段來(lái)替換舊的輸入字段,那么你可以像下面這樣在調(diào)用方法的時(shí)候,增加一個(gè)Fields參數(shù)
?
mystream.map(new UpperCase(), new Fields("uppercased"))
?輸出流會(huì)忽略輸入流中的字段,并只保留 "uppercased"這個(gè)字段。flatMap同理,例子如下:
mystream.flatMap(new Split(), new Fields("word"))
?
?
?
Peek:(這個(gè)函數(shù)比較新,在比較舊的storm版本中沒(méi)有這個(gè)函數(shù))
?
?peek用來(lái)對(duì)流中流過(guò)的每一個(gè)Trident tuple做一些額外的操作,這個(gè)功能在debug中會(huì)很有用,當(dāng)tuple經(jīng)過(guò)管道中的某個(gè)特定點(diǎn)的時(shí)候你可以觀察到這些tuple。
?
?舉個(gè)例子,下面的代碼將會(huì)在單詞被轉(zhuǎn)換為大寫的結(jié)果傳遞給groupBy之前打印他們:
?
mystream.flatMap(new Split()).map(new UpperCase())
?.peek(new Consumer() {
??????@Override public void accept(TridentTuple input) {//這個(gè)函數(shù)中,你只能獲得tuple,然后用這個(gè)tuple的數(shù)據(jù)做一些事情,
//比如打印出來(lái)看一看,發(fā)送個(gè)電子郵件什么的.但是你不可能對(duì)流產(chǎn)生任何的影響
?????????System.out.println(input.getString(0)); } })
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
?
?
?
min 和 minBy?:(這兩個(gè)函數(shù)比較新,在比較舊的storm版本中沒(méi)有這兩個(gè)函數(shù))
?
?min和minBy可以返回一個(gè)trident流中一個(gè)分區(qū)中一批tuple中的最小值;
?
?假如,一個(gè)trident流中包含字段["device-id","count"]并且以分區(qū)的方式發(fā)送流;
?
?Partition 0: ??????
?[123, 2]
?[113, 54]
?[23, ?28]
?[237, 37]
?[12, ?23]
?[62, ?17]
?[98, ?42]
?
?Partition 1:
?[64, ?18]
?[72, ?54]
?[2, ??28]
?[742, 71]
?[98, ?45]
?[62, ?12]
?[19, ?174]
?
?Partition 2:
?[27, ?94]
?[82, ?23]
?[9, ??86]
?[74, ?37]
?[51, ?49]
?[37, ?98]
?
?
?當(dāng)binBy操作像下面這樣應(yīng)用與上面的流中的tuple上時(shí),結(jié)果是在每個(gè)分區(qū)上發(fā)送count是最小值的tuple。
?
mystream.minBy(new Fields("count"))
?
?上面代碼在3個(gè)分區(qū)中運(yùn)行的結(jié)果是:
?
?Partition 0:
?[123, 2]
?
?Partition 1:
?[62, ?12]
?
?Partition 2:
?[82, ?23]
?
?你可以在public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)和public Stream min(Comparator<TridentTuple> comparator)方法中查看其他min和minBy操作;
?下面的例子演示了這些API是如何使用不同的比較器來(lái)找出一批tuple中的最小值的:
?
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
?
TridentTopology topology = new TridentTopology();
????????
Stream vehiclesStream = topology.newStream("spout1", spout).each(allFields,new Debug("##### vehicles"));
Stream slowVehiclesStream =vehiclesStream .min(new SpeedComparator()) // Comparator w.r.t speed on received tuple..each(vehicleField, new Debug("#### slowest vehicle"));
vehiclesStream.minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple..each(vehicleField, new Debug("#### least efficient vehicle"));
?
//這兩個(gè)類的地址在:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
????????@Override
????????public int compare(TridentTuple tuple1, TridentTuple tuple2) {
????????????Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
????????????Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
????????????return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
????????}
???????}
static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
@Override
????????public int compare(TridentTuple tuple1, TridentTuple tuple2) {
????????????Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
????????????Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
????????????return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
????????}
????}
?
?
?
max和maxBy:(這兩個(gè)函數(shù)比較新,在比較舊的storm版本中沒(méi)有這兩個(gè)函數(shù))
?
?max 和 maxBy 可以返回一個(gè)trident流中一個(gè)分區(qū)中一批tuple中的最大值;
?
?假如,一個(gè)trident流中包含字段["device-id","count"]并且以分區(qū)的方式發(fā)送流;
?
Partition 0: ??????
[123, 2]
[113, 54]
[23, ?28]
[237, 37]
[12, ?23]
[62, ?17]
[98, ?42]
?
Partition 1:
[64, ?18]
[72, ?54]
[2, ??28]
[742, 71]
[98, ?45]
[62, ?12]
[19, ?174]
?
Partition 2:
[27, ?94]
[82, ?23]
[9, ??86]
[74, ?37]
[51, ?49]
[37, ?98]
?當(dāng)maxBy操作像下面這樣應(yīng)用與上面的流中的tuple上時(shí),結(jié)果是在每個(gè)分區(qū)上發(fā)送count是最大值的tuple。
?
mystream.maxBy(new Fields("count"))
?
??上面代碼在3個(gè)分區(qū)中運(yùn)行的結(jié)果是:
??
??Partition 0:
??[113, 54]
?
??Partition 1:
??[19, ?174]
?
??Partition 2:
??[37, ?98]
??
??
??你可以在public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)和public Stream max(Comparator<TridentTuple> comparator)方法中查看其他max和maxBy操作;
??下面的例子演示了這些API是如何使用不同的比較器來(lái)找出一批tuple中的最大值的:
??
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
?
TridentTopology topology = new TridentTopology();
?
Stream vehiclesStream = topology.newStream("spout1", spout). each(allFields, new Debug("##### vehicles"));
?
????????vehiclesStream
????????????????.max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
????????????????.each(vehicleField, new Debug("#### fastest vehicle"))
????????????????.project(driverField)
????????????????.each(driverField, new Debug("##### fastest driver"));
?
????????vehiclesStream
????????????????.maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
????????????????.each(vehicleField, new Debug("#### most efficient vehicle"));
?
########兩個(gè)比較器和min、minBy中用到的比較器是一樣的
?
?
?
Windowing:
?
Trident可以處理在同一個(gè)window中的一批一批的tuple,并且將匯總結(jié)果發(fā)送到下一個(gè)操作。這里有兩種類型的window,分別是Tumblingwindow和SlidingWindow;兩者都支持基于處理時(shí)間的或者是基于tuple的個(gè)數(shù)
?的Window劃分。
?
?Tumbling window:
?
?tuple被基于處理時(shí)間或者tuple的count值,分配在一個(gè)單獨(dú)的Window中;任何的tuple都只可能屬于一個(gè)Window。
?
????/**
?????* 返回一個(gè)包含tuple個(gè)數(shù)為windowCount的tummbling window中每一個(gè)tuple的匯總結(jié)果所組成的流
?????*/
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);
?
????/**
?????* 返回一個(gè)時(shí)間跨度為windowDuration的tummbling window中每一個(gè)tuple的匯總結(jié)果所組成的流
?????*/
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);
?
?
?Sliding window:
?
?Tuple被分組到各個(gè)window中,并且window每隔一定的時(shí)間間隔進(jìn)行一次滑動(dòng)。一個(gè)tuple可以屬于一個(gè)或者多個(gè)window。
?
????/**
?????* 數(shù)為windowCount的sliding window中每一個(gè)tuple的匯總結(jié)果所組成的流,并將sliding window向后滑動(dòng)slideCount
?????*/
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
?
????/**
?????* 返回一個(gè)時(shí)間跨度為window向后滑動(dòng)windowDuration的sliding window中每一個(gè)tuple的匯總結(jié)果所組成的流,并將sliding window向后滑動(dòng)windowDuration
?????*/
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
?
?
Common windowing API:
?下面是接受任何被支持的windowing configuration的公共windowing API:
?
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)
?
windowConfig 可以是下面這幾種類型:
1.SlidingCountWindow.of(int windowCount, int slidingCount)
2.SlidingDurationWindow.of(BaseWindowedBolt.Duration?windowDuration,BaseWindowedBolt.Duration slidingDuration)
3.TumblingCountWindow.of(int windowLength)
4.TumblingCountWindow.of(int windowLength)
?
Trident windowing APIS 需要WindowsStoreFactory 來(lái)保存接收到的tuple和匯總值;現(xiàn)在已經(jīng)提供的一個(gè)基礎(chǔ)的工廠是基于hbase的HBaseWindowsStoreFactory;它可以被擴(kuò)展,用來(lái)支持不同場(chǎng)景的應(yīng)用。
HBaseWindowStoreFactory 的例子如下:
?
// window-state table should already be created with cf:tuples column(要在hbase中提前建立好一個(gè)表叫window-state,并且已經(jīng)在cf列族中添加了tuples列)
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
????
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
????????????new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
????????????new Values("how many apples can you eat"), new Values("to be or not to be the person"));
????spout.setCycle(true);
?
TridentTopology topology = new TridentTopology();
?
Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word"))
?????????????????????????.window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
????????????.peek(new Consumer() {
????????????????@Override
????????????????public void accept(TridentTuple input) {
????????????????????LOG.info("Received tuple: [{}]", input);
????????????????}
????????????});
?
????StormTopology stormTopology = ?topology.build();
?
partitionAggregate:
?
?partitionAggregate 在每一個(gè)批次的tuple的每一個(gè)分區(qū)上運(yùn)行一個(gè)函數(shù),和function(第一個(gè)介紹的那個(gè))不同,partitionAggregate 處理后發(fā)送的tuple覆蓋了他所接收到的tuple;來(lái)看看這個(gè)例子:
?
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
?
?假設(shè)輸入的流中的tuple包含字段 ["a", "b"],并且有一下這樣幾個(gè)分區(qū)的tuple流入了sum函數(shù):
?
Partition 0:
["a", 1]
["b", 2]
?
Partition 1:
["a", 3]
["c", 8]
?
Partition 2:
["e", 1]
["d", 9]
["d", 10]
?
?
?然后輸出的流中只包含一個(gè)字段'sum',tuple的內(nèi)容如下:
?
Partition 0:
[3]
?
Partition 1:
[11]
?
Partition 2:
[20]
?
?這里有三種不同的接口用來(lái)定義聚合器:CombinerAggregator, ReducerAggregator, 和 Aggregator.
?
?CombinerAggregator接口定義的內(nèi)容如下:
?
public interface CombinerAggregator<T> extends Serializable {
????T init(TridentTuple tuple);
????T combine(T val1, T val2);
????T zero();
}
?
?一個(gè)CombinerAggregator只能返回一個(gè)tuple并且該tuple只有一個(gè)字段。CombinerAggregator在每一個(gè)輸入的tuple上都會(huì)運(yùn)行init方法來(lái)初始化值,然后使用combine方法來(lái)combine所有的值,直到只剩下一個(gè)
?值為止。如果分區(qū)內(nèi)沒(méi)有任何tuple,CombinerAggregator就發(fā)送zero方法產(chǎn)生的值。例如,這是Count的實(shí)現(xiàn):
?
public class Count implements CombinerAggregator<Long> {
????public Long init(TridentTuple tuple) {
????????return 1L;
????}
?
????public Long combine(Long val1, Long val2) {
????????return val1 + val2;
????}
?
????public Long zero() {
????????return 0L;
????}
}
?
?
?當(dāng)在aggregate方法中而不是在partitionAggregate方法中使用CombinerAggregators 的時(shí)候,你就能感受到它的好處了。在aggregate方法中,Trident會(huì)通過(guò)在網(wǎng)絡(luò)之間傳遞tuple之前進(jìn)行局部分區(qū)聚合的
方式來(lái)優(yōu)化計(jì)算。
?
?
?ReducerAggregator接口的定義如下:
?
public interface ReducerAggregator<T> extends Serializable {
????T init();
????T reduce(T curr, TridentTuple tuple);
}
?ReducerAggregator在初始化的時(shí)候設(shè)置一個(gè)初始值,然后迭代每一個(gè)輸入的tuple的value來(lái)產(chǎn)生一個(gè)只有一個(gè)值的單一tuple來(lái)作為輸出。例如,下面是使用ReducerAggregator實(shí)現(xiàn)的Count函數(shù):
?
?
public class Count implements ReducerAggregator<Long> {
????public Long init() {
????????return 0L;
????}
?
????public Long reduce(Long curr, TridentTuple tuple) {
????????return curr + 1;
????}
}
?
?
ReducerAggregator也可以被使用在persistentAggregate方法中,稍后你將會(huì)看到。
?
?最最通用的聚合接口就是Aggregator了,接口定義如下:
?
public interface Aggregator<T> extends Operation {
????T init(Object batchId, TridentCollector collector);
????void aggregate(T state, TridentTuple tuple, TridentCollector collector);
????void complete(T state, TridentCollector collector);
}
?
Aggregator們可以發(fā)送帶有任意數(shù)量字段的任意數(shù)量的tuple。在其方法執(zhí)行的任何地方都可以發(fā)送tuple,Aggregator們按照如下的方式來(lái)執(zhí)行:
?
1.初始化方法在處理一個(gè)batch之前被調(diào)用,返回結(jié)果是一個(gè)用來(lái)表示聚合狀態(tài)的對(duì)象,并且會(huì)被傳遞給aggregate和complete方法中。
2.aggregate 方法在每一個(gè)batch分區(qū)的tuple上運(yùn)行,這個(gè)方法可以更新初始化的那個(gè)狀態(tài)對(duì)象,并可以選擇性地發(fā)送一些消息。
3.complete 方法在batch分區(qū)上的所有tuple都被Aggregator處理后調(diào)用。
?
下面的例子演示如何使用Aggregator來(lái)實(shí)現(xiàn)一個(gè)Count:
?
public class CountAgg extends BaseAggregator<CountState> {
????static class CountState {
????????long count = 0;
????}
?
????public CountState init(Object batchId, TridentCollector collector) {
????????return new CountState();
????}
?
????public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
????????state.count+=1;
????}
?
????public void complete(CountState state, TridentCollector collector) {
????????collector.emit(new Values(state.count));
????}
}
?
有時(shí)候你想同時(shí)執(zhí)行很多個(gè)aggregator,這種方式被稱為鏈?zhǔn)秸{(diào)用,可以像下面這樣使用:
mystream.chainedAgg()
????????.partitionAggregate(new Count(), new Fields("count"))
????????.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
????????.chainEnd()
?
?上面的代碼會(huì)在每一個(gè)分區(qū)上運(yùn)行Count和Sum聚合器,輸出將會(huì)只有一個(gè)tuple,并包含 ["count", "sum"]字段。
?
?
?
stateQuery 和 partitionPersist:
?
?stateQuery 和 partitionPersist 分別可以查詢和更新作為數(shù)據(jù)資源的state;相關(guān)介紹在Trident state doc中。
?
?
projection:
?projection方法作用在流上后,可以使流中只包含方法中指定的字段;假如你有一個(gè)包含字段["a", "b", "c", "d"]的流,然后你運(yùn)行下面的方法:
?
mystream.project(new Fields("b", "d"))
?
?那么輸出的流中就會(huì)只包含字段 ["b", "d"]。
?
?
Repartitioning operations:
?
?Repartitioning operations(重分區(qū)操作)運(yùn)行一個(gè)可以改變tuple在各個(gè)任務(wù)之間是如何分區(qū)的函數(shù),該函數(shù)的運(yùn)行結(jié)果可能會(huì)改變分區(qū)的數(shù)目;例如,并行處理數(shù)大于重新分區(qū)后的分區(qū)數(shù)的時(shí)候。
重分區(qū)操作需要進(jìn)行網(wǎng)絡(luò)傳輸,下面是提供的重分區(qū)函數(shù):
?
?1.shuffle:隨機(jī)分配tuple到所有的目標(biāo)分區(qū)中;
?2.broadcast:每一個(gè)tuple都會(huì)被重復(fù)的發(fā)送到每一個(gè)目標(biāo)分區(qū)中;這在DRPC操作用很有用,例如:你需要在每一個(gè)分區(qū)的data上進(jìn)行stateQuery的時(shí)候
?3.partitionBy:該函數(shù)接收一批字段,然后根據(jù)這批字段進(jìn)行分區(qū)。這批字段會(huì)被進(jìn)行哈希運(yùn)算然后根據(jù)分區(qū)個(gè)數(shù)取模,然后根據(jù)運(yùn)算結(jié)果進(jìn)行分區(qū)。該函數(shù)保證相同一批字段的tuple一定會(huì)去到同一個(gè)分區(qū)
中。
?4.global:所有的tuple都被發(fā)送到同一個(gè)分區(qū)中。流中所有的batch都會(huì)選擇同一個(gè)分區(qū)。
?5.batchGlobal:在batch中的所有tuple都會(huì)進(jìn)入同一個(gè)分區(qū),但是不同的batch中的tuple可能會(huì)進(jìn)入到不同的分區(qū)中。
?6.partition:該函數(shù)接收一個(gè)本地化的分區(qū)方法,本地化的分區(qū)方法需要實(shí)現(xiàn)org.apache.storm.grouping.CustomStreamGrouping。
?
?
Aggregation operations(集合操作):
?
?Trident有aggregate 和persistentAggregate 方法來(lái)提供在一個(gè)流上進(jìn)行聚合操作;aggregate 獨(dú)立地運(yùn)行在流中的每一個(gè)batch上,persistentAggregate 會(huì)運(yùn)行在流中的所有batch上,并且會(huì)把結(jié)果
保存在state中。
?
?運(yùn)行aggregate方法會(huì)在流上進(jìn)行全局的聚合。當(dāng)你使用ReducerAggregator 和ReducerAggregator 的時(shí)候,首先流會(huì)被重新分組到一個(gè)單獨(dú)的分區(qū)中,然后分區(qū)函數(shù)在這個(gè)單獨(dú)的分區(qū)中運(yùn)行;然而當(dāng)你
使用CombinerAggregator的時(shí)候,Trident首先會(huì)在每一個(gè)分區(qū)上進(jìn)行聚合,然后把每個(gè)分區(qū)的聚合結(jié)果重新分區(qū)到一個(gè)獨(dú)立的分區(qū)中,然后在完成網(wǎng)絡(luò)傳輸后完成全局聚合操作。CombinerAggregator比較
高效,你應(yīng)該盡量的使用它。
?
?這里有一個(gè)例子展示如何使用aggregate 來(lái)獲得某個(gè)batch中的全局count:
?
mystream.aggregate(new Count(), new Fields("count"))
?
?像partitionAggregate一樣,aggregate中的聚合器可以以鏈?zhǔn)降姆绞竭M(jìn)行調(diào)用;然而,如果你把一個(gè)CombinerAggregator 和一個(gè)不是CombinerAggregator 的聚合器鏈在一起后,storm就無(wú)法進(jìn)行在每個(gè)分區(qū)
中預(yù)先進(jìn)行聚合操作的優(yōu)化了。
?
?你可以在Trident state doc中查看persistentAggregate的使用方式。
?
?
Operations on grouped streams(在分組流上的操作)
?
?groupBy 操作根據(jù)特定的字段運(yùn)行一個(gè)partitionBy 操作來(lái)對(duì)流進(jìn)行重新分區(qū),然后在每一個(gè)分區(qū)中,把特定字段相同的tuple放到一個(gè)組中。下面是一個(gè)示例圖:
?
?
?
?
?如果你在一個(gè)分組后的流上運(yùn)行aggregators ,那么聚合操作會(huì)在每一個(gè)組中運(yùn)行,而不是在每個(gè)batch中運(yùn)行。persistentAggregate 也可以運(yùn)行在一個(gè)分組后的流上,在這種情況下
聚合后的結(jié)果會(huì)被保存在一個(gè) MapState中,該 MapState使用用來(lái)分組的字段作為key。在Trident state doc中你可以找到更多答案。
?
?和普通的流一樣,運(yùn)行在分組后的流上的aggregators 也可以進(jìn)行鏈?zhǔn)秸{(diào)用。
?
?
Merges and joins:
?
?
?API的最后一部分就是把不同的流結(jié)合在一起,最最簡(jiǎn)單的結(jié)合流的方式就是把幾個(gè)不同的流合并到同一個(gè)流中。你可以通過(guò)merge 方法(像下面這樣)來(lái)達(dá)到目的:
?
topology.merge(stream1, stream2, stream3);
?
?Trident會(huì)用第一個(gè)流的字段來(lái)重新命名其他合并的流的字段,在作為新的輸出流的字段。
?
?另一種合結(jié)合流的方式就是join操作,現(xiàn)在來(lái)看一個(gè)標(biāo)準(zhǔn)的join操作,就像在SQL中的join操作一樣,join要求輸入是有限的,所以對(duì)于無(wú)限地不停地發(fā)送的流是不起作用的。在Trident
中的join操作僅僅作用于每一個(gè)有spout發(fā)出的很小的batch中;
?
?下面的例子在包含字段["key", "val1", "val2"] 的流和包含字段["x", "val1"]的另一個(gè)流上進(jìn)行join操作:
?
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
?
?
?上面的代碼中,把stream1和stream2通過(guò)key和xjoin在一起,Trident要求輸出流中所有的輸出字段都要起名字,因?yàn)樵谳斎肓髦锌赡軙?huì)用重復(fù)的字段名稱。由join操作發(fā)出的tuple
會(huì)包含如下內(nèi)容:
?
1.首先是鏈接字段的列表。在這里key等同以stream1中的key也等同于stream2中的x;
2.然后就是所有流中沒(méi)有進(jìn)行join的字段,這些字段按照傳遞進(jìn)來(lái)的順序排序;在這個(gè)例子中,a=stream1.val1,b=stream1.val2,c=stream2.val1.
?當(dāng)來(lái)自不同的spout的流和并的時(shí)候,這些spout會(huì)在發(fā)送batch上進(jìn)行同步。也就是說(shuō)一個(gè)要處理的batch會(huì)包含所有的參與的spout所發(fā)送的tuple。
?你也許會(huì)好奇,該如何實(shí)現(xiàn)一個(gè)類似"windowed join"的操作,也就是說(shuō),來(lái)自一方的tuple和來(lái)自另一方的最近一小時(shí)的tuple進(jìn)行join操作。
?要實(shí)現(xiàn)這樣的功能,你需要利用partitionPersist 和stateQuery,最近一小時(shí)tuple會(huì)被保存并且循環(huán)迭代在一個(gè)state中,以join操作的field作為key。然后stateQuery 將會(huì)通過(guò)join的字段查詢state中的數(shù)據(jù)來(lái)進(jìn)行join操作。
總結(jié)
以上是生活随笔為你收集整理的Trident API 概览的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 蓝桥杯第六届省赛JAVA真题----打印
- 下一篇: 提权真的很难吗?