日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Trident API 概览

發布時間:2025/3/15 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Trident API 概览 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Trident API 概覽

? 在網上看到了很多有TRIDENT相關API的翻譯,看來看去,總覺得沒有說清楚很多東西,所以自己結合使用的經驗翻譯了一篇出來;翻譯完以后,也發現

在自己的翻譯中也有很多地方是表達不清楚的··不過多少感覺有些個人的理解編織在里面了。大俠們勿噴哈!

原文地址:http://storm.apache.org/releases/1.1.0/Trident-API-Overview.html

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


Trident中核心的數據模式是“流”,以一系列batch的方式進行處理。一個流分區在集群中不同的節點上,并且對于流的操作在各個分區上并行執行。

?

Trident中有五類操作:

1.在本地的分區上執行的操作,不會引起網絡傳輸;

2.對一個流進行重新分區但并不改變流中的內容(會有網絡傳輸);

3.將網絡傳輸作為操作的一部分的聚合操作;

4.在分組后的流上進行的操作;

5.合并和鏈接(原文:Merges and joins

?

本地分區操作

本地分區操作不引起網絡傳輸,獨立運行于每一個批量分區中

Function:

?

一個函數接收一批輸入字段并且發送零個或者更多的tuple來作為輸出。輸出tuple的字段被追加到原始的輸入tuple的字段后面。假如一個函數不發送任何的tuple,原始輸入的

tuple就會被過濾掉。否則,原始輸入tuple中的字段會被包含在每一個輸出tuple中。假如你有一個像下面一樣的函數:

?

public class MyFunction extends BaseFunction {

????public void execute(TridentTuple tuple, TridentCollector collector) {

????????for(int i=0; i < tuple.getInteger(0); i++) {//獲取原始輸入tuple中的第一個字段的值,???//然后i0開始,如果i小于這個值,那么發送一個新的tuple

//tuple中顯示的發送一個i的值

????????????collector.emit(new Values(i));

????????}

????}

}

現在假設你在mystream(Trident的一個拓撲)變量中有一個流,包含的字段有["a","b","c"],并且有如下的3tuple要經過該函數:

[1, 2, 3]

[4, 1, 6]

[3, 0, 8]

假如你運行如下代碼:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))//輸入流中使用 "b"這個字段的值,輸出流中申明一個"d"字段的值

附加解釋

?運行過程如下: ????????????????????????????????????????????????????????????????????????????????????????????????????????????

首先[1, 2, 3]進去了函數中,取到"b"字段中的值為2,那么可以發送兩個tuple,其中 "d"字段的值分別為0 1 同時由于原始輸入tuple中的字段會被保留,所以輸出的兩個tuple為如下格式:[1,2 ,3 ,0] [1,2,3,1];同理然后[4,1,6]進入函數,輸出流為[4,1,6,0] ????????????????????????????????????????????????????????????????????????????????最后[3,0,8]進入函數,由于不滿足循環條件,沒有輸出tuple;所以[3,0,8]被直接過濾掉了。 ???????????????????????????????????????

進過該函數處理的輸出tuple擁有一下字段 ["a", "b", "c", "d"],輸出的tuple看起來是這個樣子的:

[1, 2, 3, 0]

[1, 2, 3, 1]

[4, 1, 6, 0]

?

Filter:

?

過濾器接收一個tuple,并決定是否保留該tuple。假設你有這樣一個過濾器:

?

public class MyFilter extends BaseFilter {

????public boolean isKeep(TridentTuple tuple) {//返回true就會被保留,返回false就不會被保留了

????????return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;//判斷條件是第一個字段值為1,第二個字段值為2tuple才會被保留下來

????}

}

現在假設你有一些擁有 ["a", "b", "c"]這些字段的tuple,他們的值如下:

[1, 2, 3]

[2, 1, 1]

[2, 3, 4]

那么如果你運行如下代碼:

mystream.filter(new MyFilter())

經過處理后的輸出結果tuple就會變成下面這樣:

[1, 2, 3]

?

mapflatMap(這兩個函數比較新,在比較舊的storm版本中沒有這兩個函數)

?

?map返回一個由提供的mapping函數對原始輸入流作用后的結果所組成的流(注:意味著原來字段對應的值被替換掉了),這種操作可以應用于一個流轉換為另一個流(一對一)

?

?舉個例子,假如這里有一個單詞流,并且你想把這個單詞流中的值轉換為大寫的方式,你就可以像下面這樣定義一個mapping函數了:

???

???注:文中所提到的單詞流,個人理解應該是這樣的 ["world"] -----> [a],[b],[c]

??

public class UpperCase extends MapFunction {

?@Override

?public Values execute(TridentTuple input) {

???return new Values(input.getString(0).toUpperCase());

?}

}

?

這個mapping函數可以被應用于流上,來產生一個把原始輸入流中的單詞轉換為大寫形式的新流;

?

mystream.map(new UpperCase())

注:個人理解------處理后的結果是 ["world"]------> [A],[B],[C]

?flatMapmap很相似,但是擁有將一個流轉換為多個流的能力(一對多),然后把生成的元素平壓到一個新的流中。(注:這句話怎么理解呢?額,不好表達清楚,有厲害的可以幫忙翻譯翻譯;我有一些個人的理解,但是還沒想好怎么組織語言)

?

?舉個例子,有一個句子的流,而且你想你想把這個句子的流轉換為單詞的流,那么你就需要像下面這樣來定義flatMap函數:

?

public class Split extends FlatMapFunction {

??@Override

??public Iterable<Values> execute(TridentTuple input) {//其實函數看起來很簡單

????List<Values> valuesList = new ArrayList<>();

????for (String word : input.getString(0).split(" ")) {

??????valuesList.add(new Values(word));

????}

????return valuesList;

??}

}

?

?這個flatMap函數可以作用于一個句子流,然后生成一個單詞流:

?

mystream.flatMap(new Split())

?

?當然這些操作完全支持鏈式調用,那么你就可以通過如下的方式來將一個句子流轉換一個大寫單詞流:

?

mystream.flatMap(new Split()).map(new UpperCase())

?

?如果你不把輸出字段作為參數傳遞給mapfaltMapmapfaltMap會把輸入字段作為輸出字段使用

?

?假如你想用新的輸出字段來替換舊的輸入字段,那么你可以像下面這樣在調用方法的時候,增加一個Fields參數

?

mystream.map(new UpperCase(), new Fields("uppercased"))

?輸出流會忽略輸入流中的字段,并只保留 "uppercased"這個字段。flatMap同理,例子如下:

mystream.flatMap(new Split(), new Fields("word"))

?

?

?

Peek:(這個函數比較新,在比較舊的storm版本中沒有這個函數)

?

?peek用來對流中流過的每一個Trident tuple做一些額外的操作,這個功能在debug中會很有用,當tuple經過管道中的某個特定點的時候你可以觀察到這些tuple

?

?舉個例子,下面的代碼將會在單詞被轉換為大寫的結果傳遞給groupBy之前打印他們:

?

mystream.flatMap(new Split()).map(new UpperCase())

?.peek(new Consumer() {

??????@Override public void accept(TridentTuple input) {//這個函數中,你只能獲得tuple,然后用這個tuple的數據做一些事情,

//比如打印出來看一看,發送個電子郵件什么的.但是你不可能對流產生任何的影響

?????????System.out.println(input.getString(0)); } })

.groupBy(new Fields("word"))

.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

?

?

?

min minBy?:(這兩個函數比較新,在比較舊的storm版本中沒有這兩個函數)

?

?minminBy可以返回一個trident流中一個分區中一批tuple中的最小值;

?

?假如,一個trident流中包含字段["device-id","count"]并且以分區的方式發送流;

?

?Partition 0: ??????

?[123, 2]

?[113, 54]

?[23, ?28]

?[237, 37]

?[12, ?23]

?[62, ?17]

?[98, ?42]

?

?Partition 1:

?[64, ?18]

?[72, ?54]

?[2, ??28]

?[742, 71]

?[98, ?45]

?[62, ?12]

?[19, ?174]

?

?Partition 2:

?[27, ?94]

?[82, ?23]

?[9, ??86]

?[74, ?37]

?[51, ?49]

?[37, ?98]

?

?

?binBy操作像下面這樣應用與上面的流中的tuple上時,結果是在每個分區上發送count是最小值的tuple

?

mystream.minBy(new Fields("count"))

?

?上面代碼在3個分區中運行的結果是:

?

?Partition 0:

?[123, 2]

?

?Partition 1:

?[62, ?12]

?

?Partition 2:

?[82, ?23]

?

?你可以在public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)public Stream min(Comparator<TridentTuple> comparator)方法中查看其他minminBy操作;

?下面的例子演示了這些API是如何使用不同的比較器來找出一批tuple中的最小值的:

?

FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));

?

TridentTopology topology = new TridentTopology();

????????

Stream vehiclesStream = topology.newStream("spout1", spout).each(allFields,new Debug("##### vehicles"));

Stream slowVehiclesStream =vehiclesStream .min(new SpeedComparator()) // Comparator w.r.t speed on received tuple..each(vehicleField, new Debug("#### slowest vehicle"));

vehiclesStream.minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple..each(vehicleField, new Debug("#### least efficient vehicle"));

?

//這兩個類的地址在:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java

static class SpeedComparator implements Comparator<TridentTuple>, Serializable {

????????@Override

????????public int compare(TridentTuple tuple1, TridentTuple tuple2) {

????????????Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);

????????????Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);

????????????return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);

????????}

???????}

static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {

@Override

????????public int compare(TridentTuple tuple1, TridentTuple tuple2) {

????????????Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);

????????????Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);

????????????return Double.compare(vehicle1.efficiency, vehicle2.efficiency);

????????}

????}

?

?

?

maxmaxBy:(這兩個函數比較新,在比較舊的storm版本中沒有這兩個函數)

?

?max maxBy 可以返回一個trident流中一個分區中一批tuple中的最大值;

?

?假如,一個trident流中包含字段["device-id","count"]并且以分區的方式發送流;

?

Partition 0: ??????

[123, 2]

[113, 54]

[23, ?28]

[237, 37]

[12, ?23]

[62, ?17]

[98, ?42]

?

Partition 1:

[64, ?18]

[72, ?54]

[2, ??28]

[742, 71]

[98, ?45]

[62, ?12]

[19, ?174]

?

Partition 2:

[27, ?94]

[82, ?23]

[9, ??86]

[74, ?37]

[51, ?49]

[37, ?98]

?maxBy操作像下面這樣應用與上面的流中的tuple上時,結果是在每個分區上發送count是最大值的tuple

?

mystream.maxBy(new Fields("count"))

?

??上面代碼在3個分區中運行的結果是:

??

??Partition 0:

??[113, 54]

?

??Partition 1:

??[19, ?174]

?

??Partition 2:

??[37, ?98]

??

??

??你可以在public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)public Stream max(Comparator<TridentTuple> comparator)方法中查看其他maxmaxBy操作;

??下面的例子演示了這些API是如何使用不同的比較器來找出一批tuple中的最大值的:

??

FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));

?

TridentTopology topology = new TridentTopology();

?

Stream vehiclesStream = topology.newStream("spout1", spout). each(allFields, new Debug("##### vehicles"));

?

????????vehiclesStream

????????????????.max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.

????????????????.each(vehicleField, new Debug("#### fastest vehicle"))

????????????????.project(driverField)

????????????????.each(driverField, new Debug("##### fastest driver"));

?

????????vehiclesStream

????????????????.maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.

????????????????.each(vehicleField, new Debug("#### most efficient vehicle"));

?

########兩個比較器和minminBy中用到的比較器是一樣的

?

?

?

Windowing:

?

Trident可以處理在同一個window中的一批一批的tuple,并且將匯總結果發送到下一個操作。這里有兩種類型的window,分別是TumblingwindowSlidingWindow;兩者都支持基于處理時間的或者是基于tuple的個數

?Window劃分。

?

?Tumbling window

?

?tuple被基于處理時間或者tuplecount值,分配在一個單獨的Window中;任何的tuple都只可能屬于一個Window

?

????/**

?????* 返回一個包含tuple個數為windowCounttummbling window中每一個tuple的匯總結果所組成的流

?????*/

public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);

?

????/**

?????* 返回一個時間跨度為windowDurationtummbling window中每一個tuple的匯總結果所組成的流

?????*/

public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);

?

?

?Sliding window:

?

?Tuple被分組到各個window中,并且window每隔一定的時間間隔進行一次滑動。一個tuple可以屬于一個或者多個window

?

????/**

?????* 數為windowCountsliding window中每一個tuple的匯總結果所組成的流,并將sliding window向后滑動slideCount

?????*/

public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);

?

????/**

?????* 返回一個時間跨度為window向后滑動windowDurationsliding window中每一個tuple的匯總結果所組成的流,并將sliding window向后滑動windowDuration

?????*/

public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);

?

?

Common windowing API

?下面是接受任何被支持的windowing configuration的公共windowing API

?

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)

?

windowConfig 可以是下面這幾種類型:

1.SlidingCountWindow.of(int windowCount, int slidingCount)

2.SlidingDurationWindow.of(BaseWindowedBolt.Duration?windowDuration,BaseWindowedBolt.Duration slidingDuration)

3.TumblingCountWindow.of(int windowLength)

4.TumblingCountWindow.of(int windowLength)

?

Trident windowing APIS 需要WindowsStoreFactory 來保存接收到的tuple和匯總值;現在已經提供的一個基礎的工廠是基于hbaseHBaseWindowsStoreFactory;它可以被擴展,用來支持不同場景的應用。

HBaseWindowStoreFactory 的例子如下:

?

// window-state table should already be created with cf:tuples column(要在hbase中提前建立好一個表叫window-state,并且已經在cf列族中添加了tuples)

HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));

????

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),

????????????new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),

????????????new Values("how many apples can you eat"), new Values("to be or not to be the person"));

????spout.setCycle(true);

?

TridentTopology topology = new TridentTopology();

?

Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word"))

?????????????????????????.window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

????????????.peek(new Consumer() {

????????????????@Override

????????????????public void accept(TridentTuple input) {

????????????????????LOG.info("Received tuple: [{}]", input);

????????????????}

????????????});

?

????StormTopology stormTopology = ?topology.build();

?

partitionAggregate

?

?partitionAggregate 在每一個批次的tuple的每一個分區上運行一個函數,和function(第一個介紹的那個)不同,partitionAggregate 處理后發送的tuple覆蓋了他所接收到的tuple來看看這個例子:

?

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

?

?假設輸入的流中的tuple包含字段 ["a", "b"],并且有一下這樣幾個分區的tuple流入了sum函數:

?

Partition 0:

["a", 1]

["b", 2]

?

Partition 1:

["a", 3]

["c", 8]

?

Partition 2:

["e", 1]

["d", 9]

["d", 10]

?

?

?然后輸出的流中只包含一個字段'sum'tuple的內容如下:

?

Partition 0:

[3]

?

Partition 1:

[11]

?

Partition 2:

[20]

?

?這里有三種不同的接口用來定義聚合器:CombinerAggregator, ReducerAggregator, Aggregator.

?

?CombinerAggregator接口定義的內容如下:

?

public interface CombinerAggregator<T> extends Serializable {

????T init(TridentTuple tuple);

????T combine(T val1, T val2);

????T zero();

}

?

?一個CombinerAggregator只能返回一個tuple并且該tuple只有一個字段。CombinerAggregator在每一個輸入的tuple上都會運行init方法來初始化值,然后使用combine方法來combine所有的值,直到只剩下一個

?值為止。如果分區內沒有任何tupleCombinerAggregator就發送zero方法產生的值。例如,這是Count的實現:

?

public class Count implements CombinerAggregator<Long> {

????public Long init(TridentTuple tuple) {

????????return 1L;

????}

?

????public Long combine(Long val1, Long val2) {

????????return val1 + val2;

????}

?

????public Long zero() {

????????return 0L;

????}

}

?

?

?當在aggregate方法中而不是在partitionAggregate方法中使用CombinerAggregators 的時候,你就能感受到它的好處了。在aggregate方法中,Trident會通過在網絡之間傳遞tuple之前進行局部分區聚合的

方式來優化計算。

?

?

?ReducerAggregator接口的定義如下:

?

public interface ReducerAggregator<T> extends Serializable {

????T init();

????T reduce(T curr, TridentTuple tuple);

}

?ReducerAggregator在初始化的時候設置一個初始值,然后迭代每一個輸入的tuplevalue來產生一個只有一個值的單一tuple來作為輸出。例如,下面是使用ReducerAggregator實現的Count函數:

?

?

public class Count implements ReducerAggregator<Long> {

????public Long init() {

????????return 0L;

????}

?

????public Long reduce(Long curr, TridentTuple tuple) {

????????return curr + 1;

????}

}

?

?

ReducerAggregator也可以被使用在persistentAggregate方法中,稍后你將會看到。

?

?最最通用的聚合接口就是Aggregator了,接口定義如下:

?

public interface Aggregator<T> extends Operation {

????T init(Object batchId, TridentCollector collector);

????void aggregate(T state, TridentTuple tuple, TridentCollector collector);

????void complete(T state, TridentCollector collector);

}

?

Aggregator們可以發送帶有任意數量字段的任意數量的tuple。在其方法執行的任何地方都可以發送tupleAggregator們按照如下的方式來執行:

?

1.初始化方法在處理一個batch之前被調用,返回結果是一個用來表示聚合狀態的對象,并且會被傳遞給aggregatecomplete方法中。

2.aggregate 方法在每一個batch分區的tuple上運行,這個方法可以更新初始化的那個狀態對象,并可以選擇性地發送一些消息。

3.complete 方法在batch分區上的所有tuple都被Aggregator處理后調用。

?

下面的例子演示如何使用Aggregator來實現一個Count

?

public class CountAgg extends BaseAggregator<CountState> {

????static class CountState {

????????long count = 0;

????}

?

????public CountState init(Object batchId, TridentCollector collector) {

????????return new CountState();

????}

?

????public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {

????????state.count+=1;

????}

?

????public void complete(CountState state, TridentCollector collector) {

????????collector.emit(new Values(state.count));

????}

}

?

有時候你想同時執行很多個aggregator,這種方式被稱為鏈式調用,可以像下面這樣使用:

mystream.chainedAgg()

????????.partitionAggregate(new Count(), new Fields("count"))

????????.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

????????.chainEnd()

?

?上面的代碼會在每一個分區上運行CountSum聚合器,輸出將會只有一個tuple,并包含 ["count", "sum"]字段。

?

?

?

stateQuery partitionPersist:

?

?stateQuery partitionPersist 分別可以查詢和更新作為數據資源的state;相關介紹在Trident state doc中。

?

?

projection

?projection方法作用在流上后,可以使流中只包含方法中指定的字段;假如你有一個包含字段["a", "b", "c", "d"]的流,然后你運行下面的方法:

?

mystream.project(new Fields("b", "d"))

?

?那么輸出的流中就會只包含字段 ["b", "d"]

?

?

Repartitioning operations:

?

?Repartitioning operations(重分區操作)運行一個可以改變tuple在各個任務之間是如何分區的函數,該函數的運行結果可能會改變分區的數目;例如,并行處理數大于重新分區后的分區數的時候。

重分區操作需要進行網絡傳輸,下面是提供的重分區函數:

?

?1.shuffle:隨機分配tuple到所有的目標分區中;

?2.broadcast:每一個tuple都會被重復的發送到每一個目標分區中;這在DRPC操作用很有用,例如:你需要在每一個分區的data上進行stateQuery的時候

?3.partitionBy:該函數接收一批字段,然后根據這批字段進行分區。這批字段會被進行哈希運算然后根據分區個數取模,然后根據運算結果進行分區。該函數保證相同一批字段的tuple一定會去到同一個分區

中。

?4.global:所有的tuple都被發送到同一個分區中。流中所有的batch都會選擇同一個分區。

?5.batchGlobal:在batch中的所有tuple都會進入同一個分區,但是不同的batch中的tuple可能會進入到不同的分區中。

?6.partition:該函數接收一個本地化的分區方法,本地化的分區方法需要實現org.apache.storm.grouping.CustomStreamGrouping

?

?

Aggregation operations(集合操作):

?

?Tridentaggregate persistentAggregate 方法來提供在一個流上進行聚合操作;aggregate 獨立地運行在流中的每一個batch上,persistentAggregate 會運行在流中的所有batch上,并且會把結果

保存在state中。

?

?運行aggregate方法會在流上進行全局的聚合。當你使用ReducerAggregator ReducerAggregator 的時候,首先流會被重新分組到一個單獨的分區中,然后分區函數在這個單獨的分區中運行;然而當你

使用CombinerAggregator的時候,Trident首先會在每一個分區上進行聚合,然后把每個分區的聚合結果重新分區到一個獨立的分區中,然后在完成網絡傳輸后完成全局聚合操作。CombinerAggregator比較

高效,你應該盡量的使用它。

?

?這里有一個例子展示如何使用aggregate 來獲得某個batch中的全局count

?

mystream.aggregate(new Count(), new Fields("count"))

?

?partitionAggregate一樣,aggregate中的聚合器可以以鏈式的方式進行調用;然而,如果你把一個CombinerAggregator 和一個不是CombinerAggregator 的聚合器鏈在一起后,storm就無法進行在每個分區

中預先進行聚合操作的優化了。

?

?你可以在Trident state doc中查看persistentAggregate的使用方式。

?

?

Operations on grouped streams(在分組流上的操作)

?

?groupBy 操作根據特定的字段運行一個partitionBy 操作來對流進行重新分區,然后在每一個分區中,把特定字段相同的tuple放到一個組中。下面是一個示例圖:

?

?

?

?

?如果你在一個分組后的流上運行aggregators ,那么聚合操作會在每一個組中運行,而不是在每個batch中運行。persistentAggregate 也可以運行在一個分組后的流上,在這種情況下

聚合后的結果會被保存在一個 MapState中,該 MapState使用用來分組的字段作為key。在Trident state doc中你可以找到更多答案。

?

?和普通的流一樣,運行在分組后的流上的aggregators 也可以進行鏈式調用。

?

?

Merges and joins:

?

?

?API的最后一部分就是把不同的流結合在一起,最最簡單的結合流的方式就是把幾個不同的流合并到同一個流中。你可以通過merge 方法(像下面這樣)來達到目的:

?

topology.merge(stream1, stream2, stream3);

?

?Trident會用第一個流的字段來重新命名其他合并的流的字段,在作為新的輸出流的字段

?

?另一種合結合流的方式就是join操作,現在來看一個標準的join操作,就像在SQL中的join操作一樣,join要求輸入是有限的,所以對于無限地不停地發送的流是不起作用的。在Trident

中的join操作僅僅作用于每一個有spout發出的很小的batch中;

?

?下面的例子在包含字段["key", "val1", "val2"] 的流和包含字段["x", "val1"]的另一個流上進行join操作:

?

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

?

?

?上面的代碼中,把stream1stream2通過keyxjoin在一起,Trident要求輸出流中所有的輸出字段都要起名字,因為在輸入流中可能會用重復的字段名稱。由join操作發出的tuple

會包含如下內容:

?

1.首先是鏈接字段的列表。在這里key等同以stream1中的key也等同于stream2中的x

2.然后就是所有流中沒有進行join的字段,這些字段按照傳遞進來的順序排序;在這個例子中,a=stream1.val1,b=stream1.val2,c=stream2.val1.

?當來自不同的spout的流和并的時候,這些spout會在發送batch上進行同步。也就是說一個要處理的batch會包含所有的參與的spout所發送的tuple

?你也許會好奇,該如何實現一個類似"windowed join"的操作,也就是說,來自一方的tuple和來自另一方的最近一小時的tuple進行join操作。

?要實現這樣的功能,你需要利用partitionPersist stateQuery,最近一小時tuple會被保存并且循環迭代在一個state中,以join操作的field作為key。然后stateQuery 將會通過join的字段查詢state中的數據來進行join操作。

總結

以上是生活随笔為你收集整理的Trident API 概览的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 日本一二三区在线视频 | 性色一区二区 | 人人妻人人澡人人爽人人精品 | 国内精品卡一卡二卡三 | av影视天堂 | 看av免费毛片手机播放 | 91精品久久久久久久 | 天堂成人在线观看 | 91不卡在线 | 秋霞影院一区二区 | 一二三区免费 | 久久在线播放 | 色综合天 | 免费看黄色片网站 | 女教师痴汉调教hd中字 | 人与禽一级全黄 | av噜噜 | 国产在线视频不卡 | 亚洲久操| 叶爱在线 | 操操操综合 | 国产高清一区二区三区 | 一区二区在线观看免费视频 | 91精品国产综合久久香蕉 | 天堂网av2018| 中文字幕日韩在线播放 | 好看的黄色网址 | www.日韩一区| 麻豆系列 | 精品国产免费一区二区三区 | 亚洲人成在线播放 | 欧美一级免费黄色片 | 麻豆回家视频区一区二 | 噜噜噜视频 | 日本一区二区免费看 | 久久久久亚洲精品中文字幕 | 老司机深夜福利在线观看 | 天天操人人射 | 少妇性l交大片免潘金莲 | 精品人伦一区二区 | 欧美国产一区二区三区 | 久久久噜噜噜久久中文字幕色伊伊 | 中文字幕精品国产 | 国产另类xxxxhd高清 | 在线观看国产精品视频 | 91视频最新地址 | 爱爱三级视频 | 日韩少妇一区二区 | 国产在线操 | 亚洲欧美激情另类 | 色欲久久久天天天综合网 | 永久免费av在线 | 鲁丝一区二区 | 日韩黄色高清视频 | 越南黄色一级片 | 国产艳情片 | 一级性爱视频 | 激情亚洲色图 | 91成人在线观看喷潮 | 亚洲综合在线视频 | 免费黄在线看 | 清冷学长被爆c躁到高潮失禁 | 国产一级二级三级视频 | 少妇免费看 | 蜜臀va | 999久久久 | 亚洲精品久久久久久动漫器材一区 | 日韩成人精品在线观看 | 国产骚b| 日韩精品成人av | 精品人妻在线一区二区三区 | 91欧美成人| 精品无码一区二区三区在线 | 亚洲图片欧美视频 | 久久夜色精品国产欧美乱极品 | 亚洲精品综合精品自拍 | 91久久精品国产91性色tv | 国产欧美在线观看 | 中国大陆一级毛片 | 91蝌蚪91密月 | 亚洲成av人影院 | 国产经典一区 | 日一区二区 | avav国产 | 国内自拍一区 | 日日夜夜天天干 | 亚洲a级精品 | 91亚洲网 | av大片免费看 | 中文字幕精 | 国产中文字幕在线观看 | 午夜影院18 | av日韩在线播放 | 另类三区 | www.com捏胸挤出奶 | 国产91免费在线观看 | 久久精品无码一区二区三区毛片 | 少妇人妻无码专区视频 | 中文字幕一区二区三区在线观看 |