在线实时大数据平台Storm输入源共享试验
1、背景:topology程序提交集群模式運行試驗,驗證在同一文件輸入源情況下,worker之間是否會重復輸入處理,以及數據變量能否在不同worker之間共享,如果文件新增數據,topology會不會獲取最新數據處理。
2、試驗代碼:
package cn.wc;import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class TopologyMain {public static void main(String[] args) throws InterruptedException {//ConfigurationConfig conf = new Config();conf.setNumWorkers(3);//設置3個進程conf.put("wordsFile", args[0]);conf.put("output", args[1]);//Topology definitionTopologyBuilder builder = new TopologyBuilder();builder.setSpout("word-reader",new WordReader(),3);builder.setBolt("word-normalizer",new WordNormalizer(),3).setNumTasks(6).shuffleGrouping("word-reader");builder.setBolt("word-counter", new WordCounter(),3).fieldsGrouping("word-normalizer", new Fields("word"));//集群模式try {StormSubmitter.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());} catch (AlreadyAliveException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InvalidTopologyException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (AuthorizationException e) {// TODO Auto-generated catch blocke.printStackTrace();} } } package cn.wc;import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;//Spout作為數據源,它實現了IRichSpout接口,功能是讀取一個文本文件并把它的每一行內容發送給bolt。 public class WordReader extends BaseRichSpout {private SpoutOutputCollector collector;private FileReader fileReader;private boolean completed = false;public void ack(Object msgId) {System.out.println("OK:"+msgId);}public void close() {}public 
void fail(Object msgId) {System.out.println("FAIL:"+msgId);}/*** The only thing that the methods will do It is emit each * file line* spout最主要的方法,讀取文本文件,并把它的每一行發射出去(給bolt) * 這個方法會不斷被調用,為了降低它對CPU的消耗,當任務完成時讓它sleep一下 */public void nextTuple() {/*** The nextuple it is called forever, so if we have been readed the file* we will wait and then return*/if(completed){try {Thread.sleep(1000);} catch (InterruptedException e) {//Do nothing}return;}String str;//Open the readerBufferedReader reader = new BufferedReader(fileReader);try{//Read all lineswhile((str = reader.readLine()) != null){/*** By each line emmit a new value with the line as a their* 發射每一行,Values是一個ArrayList的實現 */this.collector.emit(new Values(str),str);}}catch(Exception e){throw new RuntimeException("Error reading tuple",e);}finally{completed = true;}}/*** We will create the file and get the collector object* 三個參數,第一個是創建Topology時的配置,第二個是所有的Topology數據,第三個是用來把Spout的數據發射給bolt * */public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {//獲取創建Topology時指定的要讀取的文件路徑 this.fileReader = new FileReader(conf.get("wordsFile").toString());} catch (FileNotFoundException e) {throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");}//初始化發射器this.collector = collector;}/*** Declare the output field "word"*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("line"));} }package cn.wc;import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;//Spout已經成功讀取文件并把每一行作為一個tuple(在Storm數據以tuple的形式傳遞)發射過來,我們這里需要創建兩個bolt分別來負責解析每一行和對單詞計數。 //Bolt中最重要的是execute方法,每當一個tuple傳過來時它便會被調用。 public class WordNormalizer extends BaseBasicBolt {public void cleanup() {}/*** The bolt will receive the line from the* words file and process it to Normalize 
this line* * The normalize will be put the words in lower case* and split the line to get all words in this * bolt中最重要的方法,每當接收到一個tuple時,此方法便被調用 * 這個方法的作用就是把文本文件中的每一行切分成一個個單詞,并把這些單詞發射出去(給下一個bolt處理) */public void execute(Tuple input, BasicOutputCollector collector) {String sentence = input.getString(0);String[] words = sentence.split(" ");for(String word : words){word = word.trim();if(!word.isEmpty()){word = word.toLowerCase();collector.emit(new Values(word));}}}/*** The bolt will only emit the field "word" */public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));} } package cn.wc;import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map;import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple;public class WordCounter extends BaseBasicBolt {Integer id;String name;Map<String, Integer> counters;String output=null;/*** At the end of the spout (when the cluster is shutdown* We will show the word counters* Topology執行完畢的清理工作,比如關閉連接、釋放資源等操作都會寫在這里 */@Overridepublic void cleanup() {/*System.out.println("-- Word Counter ["+name+"-"+id+"] --");for(Map.Entry<String, Integer> entry : counters.entrySet()){System.out.println(entry.getKey()+": "+entry.getValue());}*/}/*** On create */@Overridepublic void prepare(Map stormConf, TopologyContext context) {this.counters = new HashMap<String, Integer>();this.name = context.getThisComponentId();this.id = context.getThisTaskId();output=stormConf.get("output").toString();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String str = input.getString(0);/*** If the word dosn't exist in the map we will create* this, if not We will add 1 
*/if(!counters.containsKey(str)){counters.put(str, 1);}else{Integer c = counters.get(str) + 1;counters.put(str, c);}//寫入文件try{File file=new File(output);if(!file.exists())file.createNewFile();FileOutputStream out=new FileOutputStream(file,true); for(Map.Entry<String, Integer> entry : counters.entrySet()){StringBuffer sb=new StringBuffer();sb.append(entry.getKey()+": "+entry.getValue());sb.append("\r\n");out.write(sb.toString().getBytes("utf-8"));} }catch (IOException e){e.printStackTrace();}} }
3、結果分析:
集群環境下執行:storm jar /mnt/wc.jar cn.wc.TopologyMain /mnt/words.txt /tmp/topo.log
/*并行和通信試驗:
 * 設置worker為3,啟動3個進程來服務這個topology
 * spout/bolt的線程數均設置為3,每個線程默認對應一個task,即一個線程跑一個task,總共有9個;
 * 現在對word-normalizer這個bolt設置任務6個,那就是每個線程分2個task,現在總共12個task;
 * 總的來說:worker進程有3個,executor線程有9個,task任務有12個;
 * 輸入:/mnt/words.txt 輸出:/tmp/topo.log
 */
1)storm list發現task是15個,不是12個;多出的3個應是Storm自帶的acker系統任務(默認每個worker一個,負責tuple的ack跟蹤),12+3=15,數目就對上了;
2)輸入的詞匯,明顯被重復統計3次,也就是說3個executor在同一文件輸入源下,不會自動去協調輸入記錄從而排斥;
3)topology程序中設置的變量,無法在executor之間共享;
4)輸入的文件新增詞匯,topology沒有及時去獲取統計,當然topology仍然在集群中運行
4、總結:
? ? ? 1)一個topology被提交到不同節點的不同worker(進程)分布執行,要按照獨立進程來看;
? ? ? 2)worker內要有自己唯一的輸入源,同時要確保輸入源是持續提供;
? ? ? 3)要在worker之間共享數據變量,只能通過其他辦法,如redis來存儲;
? ? ? 也就是說:topology被提交到集群分布式執行,不同worker之間是獨立進程運作。
? ? ??
總結
以上是生活随笔為你收集整理的在线实时大数据平台Storm输入源共享试验的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: centos磁盘空间满查询和移动命令小记
- 下一篇: 在线实时大数据平台Storm并行度试验