trident原理及编程指南
trident原理及編程指南
@(STORM)[storm, 大數據]
- trident原理及編程指南
- 一理論介紹
- 一trident是什么
- 二trident處理單位
- 三事務類型
- 1spout類型
- 2state類型
- 3實現恰好一次的spout與state組合類型
- 二編程指南
- 1定義輸入流
- 2統計單詞數量
- 3輸出統計結果
- 4split的字義
- 三使用kafka作為數據源
- 1定義kafka相關配置
- 2從kafka中讀取消息并處理
- 3提交拓撲
- 四State示例
- 1主類
- 2Aggregator的用法
- 1Aggregator接口
- 2init方法
- 3aggregate方法
- 4complete方法
- 3state的用法
- 1拓撲定義
- 2工廠類NameSumStateFactory
- 3更新類NameSumUpdater
- 4狀態類NameSumState
- 4state應用思路總結
一、理論介紹
(一)trident是什么?
Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you’re familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.
簡單的說,trident是storm的更高層次抽象,相對storm,它主要提供了3個方面的好處:
(1)提供了更高層次的抽象,將常用的count,sum等封裝成了方法,可以直接調用,不需要自己實現。
(2)以批次代替單個元組,每次處理一個批次的數據。
(3)提供了事務支持,可以保證數據均處理且只處理了一次。
(二)trident處理單位
trident每次處理消息均為batch為單位,即一次處理多個元組。
(三)事務類型
關于事務類型,有2個比較容易混淆的概念:spout的事務類型以及事務狀態。
它們都有3種類型,分別為:事務型、非事務型和透明事務型。
1、spout類型
spout的類型指定了由于下游出現問題(fail被調用,或者超時無回復)導致元組需要重放時,應該怎么發送元組。
事務型spout:重放時能保證同一個批次發送同一批元組。可以保證每一個元組都被發送且只發送一個,且同一個批次所發送的元組是一樣的。
非事務型spout:沒有任何保障,發完就算。
透明事務型spout:同一個批次發送的元組有可能不同的,它可以保證每一個元組都被發送且只發送一次,但不能保證重放時同一個批次的數據是一樣的。這對于部分失效的情況尤其有用,假如以kafka作為spout,當一個topic的某個分區失效時,可以用其它分區的數據先形成一個批次發送出去,如果是事務型spout,則必須等待那個分區恢復后才能繼續發送。
這三種類型可以分別通過實現ITransactionalSpout、ITridentSpout、IOpaquePartitionedTridentSpout接口來定義。
2、state類型
state的類型指定了如果將storm的中間輸出或者最終輸出持久化到某個地方(如內存),當某個批次的數據重放時應該如果更新狀態。state對于下游出現錯誤的情況尤其有用。
事務型狀態:同一批次tuple提供的結果是相同的。
非事務型狀態:沒有回滾能力,更新操作是永久的。
透明事務型狀態:更新操作基于先前的值,這樣由于這批數據發生變化,對應的結果也會發生變化。透明事務型狀態除了保存當前數據外,還要保存上一批數據,當數據重放時,可以基于上一批數據作更新。
注意,此處的狀態應該是原子性的,比如將狀態寫入hbase,則應該全部寫入,或者全部沒寫入,不能說寫入一半,另一半沒寫入,這連事務型也無法保證恰好一次了。比如說寫入本地磁盤,就有可能導致這種情況,如果寫到一半出錯,則無法保證恰好一次了,因為磁盤沒有類似于數據庫的commit、rollback操作。
3、實現恰好一次的spout與state組合類型
由上表可以看出:
(1)當spout與state均為transcational或者均為opaque時,可以實現恰好一次。
(2)當spout為tansactional,state為opaque時,也可以實現恰好一次。
(3)但當spout為opaque,state為transactional時,不可以實現恰好一次。因此opaque spout重發時,它的內容可能與上一次不同,而state如果在上個批次已經更新過但這個批次最終fail了,則spout重發時,會在已經fail掉的批次上更新,而上一個批次是不應該計算在內的。如果state是transactional的,則它同時保存了上一次狀態及當前狀態,所以可以基于上一次的狀態作更新,就不會有這個問題。
二、編程指南
代碼如下
package org.ljh.tridentdemo;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple;public class TridentWordCount {public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}}}public static StormTopology buildTopology(LocalDRPC drpc) {FixedBatchSpout spout =new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);//創建拓撲對象TridentTopology topology = new TridentTopology();//這個流程用于統計單詞數據,結果將被保存在wordCounts中TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);//這個流程用于查詢上面的統計結果topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();}public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));}} }實例實現了最基本的wordcount功能,然后將結果輸出。關鍵步驟如下:
1、定義輸入流
FixedBatchSpout spout =new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);(1)使用FixedBatchSpout創建一個輸入spout,spout的輸出字段為sentence,每3個元組作為一個batch。
(2)數據不斷的重復發送。
2、統計單詞數量
TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);這個流程用于統計單詞數據,結果將被保存在wordCounts中。6行代碼的含義分別為:
(1)首先從spout中讀取消息,spout1定義了zookeeper中用于保存這個拓撲的節點名稱。
(2)并行度設置為16,即16個線程同時從spout中讀取消息。
(3)each中的三個參數分別為:輸入字段名稱,處理函數,輸出字段名稱。即從字段名稱叫sentence的數據流中讀取數據,然后經過new Split()處理后,以word作為字段名發送出去。其中new Split()后面介紹,它的功能就是將輸入的內容以空格為界作了切分。
(4)將字段名稱為word的數據流作分組,即相同值的放在一組。
(5)將已經分好組的數據作統計,結果放到MemoryMapState,然后以count作為字段名稱將結果發送出去。這步驟會同時存儲數據及狀態,并將返回TridentState對象。
(6)并行度設置。
3、輸出統計結果
topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));這個流程從上述的wordCounts對象中讀取結果,并返回。6行代碼的含義分別為:
(1)等待一個drpc調用,從drpc服務器中接受words的調用來提供消息。調用代碼如下:
drpc.execute("words", "cat the dog jumped")(2)輸入為上述調用中提供的參數,經過Split()后,以word作為字段名稱發送出去。
(3)以word的值作分組。
(4)從wordCounts對象中查詢結果。4個參數分別代表:數據來源,輸入數據,內置方法(用于從map中根據key來查找value),輸出名稱。
(5)過濾掉空的查詢結果,如本例中,cat和dog都沒有結果。
(6)將結果作統計,并以sum作為字段名稱發送出去,這也是DRPC調用所返回的結果。如果沒有這一行,最后的輸出結果
加上這一行后,結果為:
DRPC RESULT: [[180]]4、split的字義
public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}} }注意它最后會發送數據。
5、創建并啟動拓撲
public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));} }(1)當無參數運行時,啟動一個本地的集群,及自已創建一個drpc對象來輸入。
(2)當有參數運行時,設置worker數量為3,然后提交拓撲到集群,并等待遠程的drpc調用。
三、使用kafka作為數據源
package com.netease.sytopology;import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.Arrays;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import storm.kafka.BrokerHosts; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import storm.kafka.trident.OpaqueTridentKafkaSpout; import storm.kafka.trident.TridentKafkaConfig; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;/** 本類完成以下內容*/ public class SyTopology {public static final Logger LOG = LoggerFactory.getLogger(SyTopology.class);private final BrokerHosts brokerHosts;public SyTopology(String kafkaZookeeper) {brokerHosts = new ZkHosts(kafkaZookeeper);}public StormTopology buildTopology() {TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());// TransactionalTridentKafkaSpout kafkaSpout = new// TransactionalTridentKafkaSpout(kafkaConfig);OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);TridentTopology topology = new TridentTopology();// TridentState wordCounts =topology.newStream("kafka4", kafkaSpout).each(new Fields("str"), new Split(),new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);// .persistentAggregate(new HazelCastStateFactory(), new Count(),// new Fields("aggregates_words")).parallelismHint(2);return topology.build();}public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {String kafkaZk = args[0];SyTopology topology = new SyTopology(kafkaZk);Config config = new Config();config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);String name = args[1];String dockerIp = args[2];config.setNumWorkers(9);config.setMaxTaskParallelism(5);config.put(Config.NIMBUS_HOST, dockerIp);config.put(Config.NIMBUS_THRIFT_PORT, 6627);config.put(Config.STORM_ZOOKEEPER_PORT, 2181);config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));StormSubmitter.submitTopology(name, config, topology.buildTopology());}static class Split extends BaseFunction {public void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(",")) {try {FileWriter fw = new FileWriter(new File("/home/data/test/ma30/ma30.txt"),true);fw.write(word);fw.flush();fw.close();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}collector.emit(new Values(word));}}} }本例將從kafka中讀取消息,然后對消息根據“,”作拆分,并寫入一個本地文件。
1、定義kafka相關配置
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);其中ma30是訂閱的topic名稱。
2、從kafka中讀取消息并處理
topology.newStream("kafka4", kafkaSpout).each(new Fields("str"), new Split(),new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);(1)指定了數據來源,并指定zookeeper中用于保存數據的位置,即保存在/transactional/kafka4。
(2)指定處理方法及發射的字段。
(3)根據word作分組。
(4)計數后將狀態寫入MemoryMapState
3、提交拓撲:
storm jar target/sytopology2-0.0.1-SNAPSHOT.jar com.netease.sytopology.SyTopology 192.168.172.98:2181/kafka test3 192.168.172.98此時可以在/home/data/test/ma30/ma30.txt看到split的結果
四、State示例
trident通過spout的事務性與state的事務處理,保證了恰好一次的語義。這里介紹了如何使用state。
完整代碼請見 https://github.com/lujinhong/tridentdemo
1、主類
主類定義了拓撲的整體邏輯,這個拓撲通過一個固定的spout循環產生數據,然后統計消息中每個名字出現的次數。
拓撲中先將消息中的內容提取出來成name, age, title, tel4個field,然后通過project只保留name字段供統計,接著按照name分區后,為每個分區進行聚合,最后將聚合結果通過state寫入map中。
storm.trident.Stream Origin_Stream = topology.newStream("tridentStateDemoId", spout).parallelismHint(3).shuffle().parallelismHint(3).each(new Fields("msg"), new Splitfield(),new Fields("name", "age", "title", "tel")).parallelismHint(3).project(new Fields("name")) //其實沒什么必要,上面就不需要發射BCD字段,但可以示范一下project的用法.parallelismHint(3).partitionBy(new Fields("name")); //根據name的值作分區Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue")).partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());2、Aggregator的用法
這里涉及了一些trident常用的API,但project等相對容易理解,這里只介紹partitionAggregate的用法。
再看看上面代碼中對partitionAggregate的使用:
Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))第一,三個參數分別表示輸入流的名稱與輸出流的名稱。中間的NameCountAggregator是一個Aggregator的對象,它定義了如何對輸入流進行聚合。我們看一下它的代碼:
public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判斷某個名字是否已經存在于map中,若無,則put,若有,則遞增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//將聚合后的結果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}(1)Aggregator接口
它實現了Aggregator接口,這個接口有3個方法:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); }init方法:在處理batch之前被調用。init的返回值是一個表示聚合狀態的對象,該對象會被傳遞到aggregate和complete方法。
aggregate方法:為每個在batch分區的輸入元組所調用,更新狀態
complete方法:當batch分區的所有元組已經被aggregate方法處理完后被調用。
除了實現Aggregator接口,還可以實現ReducerAggregator或者CombinerAggregator,它們使用更方便。詳見《從零開始學storm》或者官方文檔
https://storm.apache.org/documentation/Trident-API-Overview.html
下面我們看一下這3個方法的實現。
(2)init方法
@Override public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>(); }僅初始化了一個HashMap對象,這個對象會作為參數傳給aggregate和complete方法。對一個batch只執行一次。
(3)aggregate方法
aggregate方法對于batch內的每一個tuple均執行一次。這里將這個batch內的名字出現的次數放到init方法所初始化的map中。
@Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);} }(4)complete方法
這里在complete將aggregate處理完的結果發送出去,實際上可以在任何地方emit,比如在aggregate里面。
這個方法對于一個batch也只執行一次。
3、state的用法
(1)拓撲定義
先看一下主類中如何將結果寫入state:
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());它的定義為:
TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)其中的第二個參數比較容易理解,就是輸入流的名稱,這里是名字與它出現的個數。下面先看一下Facotry。
(2)工廠類:NameSumStateFactory
很簡單,它實現了StateFactory,只有一個方法makeState,返回一個State類型的對象。
public class NameSumStateFactory implements StateFactory {private static final long serialVersionUID = 8753337648320982637L;@Overridepublic State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {return new NameSumState(); } }(3)更新類:NameSumUpdater
這個類繼承自BaseStateUpdater,它的updateState對batch的內容進行處理,這里是將batch的內容放到一個map中,然后調用setBulk方法
public class NameSumUpdater extends BaseStateUpdater<NameSumState> {private static final long serialVersionUID = -6108745529419385248L;public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {Map<String,Integer> map=new HashMap<String,Integer>();for(TridentTuple t: tuples) {map.put(t.getString(0), t.getInteger(1));}state.setBulk(map);} }(4)狀態類:NameSumState
這是state最核心的類,它實現了大部分的邏輯。NameSumState實現了State接口:
public interface State {void beginCommit(Long txid); void commit(Long txid); }分別在提交之前與提交成功的時候調用,在這里只打印了一些信息。
另外NameSumState還定義了如何處理NameSumUpdater傳遞的消息:
public void setBulk(Map<String, Integer> map) {// 將新到的tuple累加至map中for (Entry<String, Integer> entry : map.entrySet()) {String key = entry.getKey();if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}}System.out.println("-------");// 將map中的當前狀態打印出來。for (Entry<String, Integer> entry : this.map.entrySet()) {String Key = entry.getKey();Integer Value = entry.getValue();System.out.println(Key + "|" + Value);} }即將NameSumUpdater傳送過來的內容寫入一個HashMap中,并打印出來。
此處將state記錄在一個HashMap中,如果需要記錄在其它地方,如mysql,則使用jdbc寫入mysql代替下面的map操作即可。
事實上,這個操作不一定要在state中執行,可以在任何類中,但建議還是在state類中實現。
4、state應用思路總結
(1)使用state,你不再需要比較事務id,在數據庫中同時寫入多個值等內容,而是專注于你的邏輯實現
(2)除了實現State接口,更常用的是實現MapState接口,下次補充。
(3)在拓撲中指定了StateFactory,這個工廠類找到相應的State類。而Updater則每個批次均會調用它的方法。State中則定義了如何保存數據,這里將數據保存在內存中的一個HashMap,還可以保存在mysql, hbase等等。
(4)trident會自動比較txid的值,如果和當前一樣,則不更改狀態,如果是當前txid的下一個值,則更新狀態。這種邏輯不需要用戶處理。
(5)如果需要實現透明事務狀態,則需要保存當前值與上一個值,在update的時候2個要同時處理。即邏輯由自己實現。在本例子中,大致思路是在NameSumState中創建2個HashMap,分別對應當前與上一個狀態的值,而NameSumUpdater每次更新這2個Map。
總結
以上是生活随笔為你收集整理的trident原理及编程指南的全部內容,希望文章能夠幫你解決所遇到的問題。