Storm Parallelism Experiments on an Online Real-Time Big Data Platform
Cluster-mode experiments: how the same input file is processed, and whether data variables can be shared.
1) Cluster mode: one worker, one spout, one bolt
jps: 1 worker
storm list: 1 worker, 4 tasks (the task counts reported here and below appear to include one acker task per worker, which Storm starts by default)
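For reference, here is a minimal sketch of the topology configuration that would produce these numbers, assuming a parallelism hint of 1 per component and reusing the classes and imports from the full listing at the end of this post (the listing itself uses hints of 2; the hints used in each individual run are not shown there):

// Presumed setup for experiment 1, reusing the components from the full listing below.
Config conf = new Config();
conf.setNumWorkers(1);                                          // one worker process -> "jps: 1 worker"

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader(), 1);           // 1 spout task
builder.setBolt("word-normalizer", new WordNormalizer(), 1)
       .shuffleGrouping("word-reader");                         // 1 normalizer task
builder.setBolt("word-counter", new WordCounter(), 1)
       .fieldsGrouping("word-normalizer", new Fields("word"));  // 1 counter task
// 3 component tasks + 1 acker per worker = 4 tasks reported by "storm list"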
2) Cluster mode: one worker, one spout, two bolts
jps: 1 worker
storm list: 1 worker, 6 tasks
The different bolt threads read and write the shared counter variable under mutual exclusion. The experiment shows that multiple bolts can operate on the same variable at the same time.
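The listing at the end of this post keeps a separate counters map inside every WordCounter task, so the following is only a sketch of how such sharing could be arranged: bolt tasks in the same worker are threads of one JVM, so a static, concurrency-safe map lets them update one shared counter. SharedWordCounter is a hypothetical class, not part of the original code.

package cn.wc;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical variant of WordCounter: the map is static, so it exists once per
// worker JVM and is shared by every counter task (thread) running in that worker.
public class SharedWordCounter extends BaseBasicBolt {

    private static final ConcurrentHashMap<String, Integer> COUNTERS = new ConcurrentHashMap<>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getString(0);
        // merge() updates the entry atomically, so concurrent bolt threads in the
        // same worker do not lose increments.
        COUNTERS.merge(word, 1, Integer::sum);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}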
3) Cluster mode: one worker, two spouts, two bolts
jps: 1 worker
storm list: 1 worker, 7 tasks
The experiment shows that multiple spouts reading the same input file process the data repeatedly; spout threads do not coordinate (mutually exclude) their reads of the input source.
Taken together, these three experiments show that within one process (worker), spout threads do not exclude each other on the input (the data is processed repeatedly, so each spout has to be given its own input source), while bolt threads do access a shared variable under mutual exclusion. This is understandable: inside a process, access to a variable shared by multiple threads can be controlled, but external data is not under that control, and the spout is what pulls in external data.
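One way to give each spout its own input source is sketched below. This is a hypothetical change to WordReader.open() from the listing at the end of this post, not part of the original code: the task index is used to pick a distinct file per spout task (the ".0"/".1" file-name suffix is an assumption for illustration).

// Hypothetical variant of WordReader.open(): derive a separate input file for each
// spout task from its task index, so two spout tasks never read the same file.
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    int taskIndex = context.getThisTaskIndex();                      // 0, 1, ... among "word-reader" tasks
    String path = conf.get("inpath").toString() + "." + taskIndex;   // e.g. /tmp/topoin.txt.0, /tmp/topoin.txt.1
    try {
        this.fileReader = new FileReader(path);
    } catch (FileNotFoundException e) {
        throw new RuntimeException("Error reading file [" + path + "]");
    }
    this.collector = collector;
}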
4) Cluster mode: two workers, one spout, one bolt
jps: 2 workers
storm list: 2 workers, 5 tasks
This experiment is not very significant in itself; it is mainly for observing the worker and task counts. With only one spout, the data is not processed repeatedly.
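Presumably the only configuration change relative to the one-worker runs is the worker count, along the lines of:

// Presumed change for the two-worker experiments: request two worker processes.
// Storm spreads the component executors across the two JVMs and, by default, also
// starts one acker per worker, which accounts for the extra task in the counts above.
Config conf = new Config();
conf.setNumWorkers(2);   // -> "jps: 2 workers"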
5) Cluster mode: two workers, one spout, two bolts
jps: 2 workers
storm list: 2 workers, 7 tasks
This experiment is also mainly for observing the worker and task counts. With only one spout the data is not processed repeatedly, and the bolts can access the shared variable under mutual exclusion.
6) Cluster mode: two workers, two spouts, two bolts
jps: 2 workers
storm list: 2 workers, 8 tasks
Multiple spouts read the same input source repeatedly, and variables cannot be shared across processes.
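A minimal sketch of why this is so, assuming standard JVM semantics: each worker is its own JVM process, so even a static field exists once per worker. State that both workers must see has to live outside the JVMs, which is why the WordCounter in the listing below appends its counts to a file. The class name here is hypothetical.

// Each Storm worker is a separate JVM process. A static field like this exists once
// per worker, so setting it in one worker is invisible to the other. State that both
// workers must observe has to go to external storage (a file, a database, etc.).
public class WorkerLocalFlag {
    public static volatile boolean SEEN_A = false;   // per-worker copy, not cluster-wide
}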
The experiments above lead to the following conclusions:
1) Processes (workers) cannot share a variable or access it under mutual exclusion;
2) Threads (spouts) do not coordinate reads of the same input file, so the same file is read more than once;
3) Threads (bolts) can share a variable and access it under mutual exclusion.
The figure above makes this easier to understand:
1) Multiple spouts must be given different input sources, otherwise the same file is processed repeatedly;
2) Multiple bolts can aggregate and count data on the same topic emitted by different spouts (see the sketch after this paragraph).
Even after these experiments, the spout/bolt framework is still not fully understood: which parts of the code are controlled by the Storm framework and which are controlled by the user. Working that out is the key to designing a topology, in particular how variables are defined and whether data has to be persisted to disk in order to be shared.
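The routing that makes this aggregation possible is already in the full listing below: the fields grouping on the "word" field guarantees that every occurrence of a given word, regardless of which spout task it originated from, is delivered to the same word-counter task.

// All tuples carrying the same value in the "word" field are routed to the same
// word-counter task, so one task sees every occurrence of that word and can keep
// a single count for it, even when the tuples come from different spout tasks.
builder.setBolt("word-counter", new WordCounter(), 2)
       .fieldsGrouping("word-normalizer", new Fields("word"));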
The full code is as follows:
package cn.wc;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Configuration
        Config conf = new Config();
        conf.setNumWorkers(1);           // number of worker processes (1 or 2 depending on the experiment)
        conf.put("inpath", args[0]);     // input file path
        conf.put("outpath", args[1]);    // output file path

        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader(), 2);
        builder.setBolt("word-normalizer", new WordNormalizer(), 2).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2).fieldsGrouping("word-normalizer", new Fields("word"));

        // Cluster mode
        try {
            // storm jar /mnt/wc.jar cn.wc.TopologyMain /tmp/topoin.txt /tmp/topoout.log
            StormSubmitter.submitTopology("topoword", conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }
    }
}

package cn.wc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// The spout is the data source. It reads a text file and emits each line to the bolts.
public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    boolean ass = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The core spout method: read the text file and emit each line to the bolts.
     * nextTuple() is called over and over, so once the file has been fully read
     * we sleep briefly to avoid burning CPU.
     */
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines and emit each one; Values is an ArrayList-backed tuple
            while ((str = reader.readLine()) != null) {
                if ("a".equals(str) && ass) {
                    return;                      // the line "a" has already been handled, so stop
                }
                if ("a".equals(str) && !ass) {   // used to check whether the flag is shared across processes
                    ass = true;
                    this.collector.emit(new Values(str), str);
                } else {
                    this.collector.emit(new Values(str), str);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * open() receives the topology configuration, the topology context, and the
     * collector used to emit the spout's tuples to the bolts.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            // Path of the input file, as specified when the topology was created
            this.fileReader = new FileReader(conf.get("inpath").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("inpath") + "]");
        }
        // Initialize the collector
        this.collector = collector;
    }

    /**
     * Declare the output field "line"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

package cn.wc;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// The spout emits each file line as a tuple (in Storm, data travels as tuples).
// Two bolts process them: this one splits each line into words, and WordCounter counts them.
// execute() is the key method of a bolt; it is called once for every incoming tuple.
public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {}

    /**
     * Split each incoming line into words, normalize them to lower case,
     * and emit each word to the next bolt.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt only emits the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

package cn.wc;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;
    String outpath = null;

    /**
     * Called when the topology is shut down; cleanup work such as closing
     * connections and releasing resources goes here.
     */
    @Override
    public void cleanup() {
        /*
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
        */
    }

    /**
     * Called when the bolt is created
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        outpath = stormConf.get("outpath").toString();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        Integer c = 1;
        // If the word is not yet in the map, add it with count 1; otherwise increment it
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            c = counters.get(str) + 1;
            counters.put(str, c);
        }
        // Append the running count to the output file
        try {
            File file = new File(outpath);
            if (!file.exists()) {
                file.createNewFile();
            }
            FileOutputStream out = new FileOutputStream(file, true);
            StringBuffer sb = new StringBuffer();
            sb.append(str + ": " + c);
            sb.append("\r\n");
            out.write(sb.toString().getBytes("utf-8"));
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}