

Input-Source Sharing Experiment on the Storm Real-Time Big Data Platform

Published: 2025/4/16
This article on the Storm input-source sharing experiment was collected and shared by 生活随笔 for reference.

1. Background: submit a topology to the cluster and verify, with a single shared input file, (a) whether workers process the input redundantly, (b) whether data variables can be shared between different workers, and (c) whether the topology picks up and processes data newly appended to the file.

2. Experiment code:

package cn.wc;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Configuration
        Config conf = new Config();
        conf.setNumWorkers(3);              // run the topology in 3 worker processes
        conf.put("wordsFile", args[0]);     // input file path
        conf.put("output", args[1]);        // output file path

        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader(), 3);
        builder.setBolt("word-normalizer", new WordNormalizer(), 3)
               .setNumTasks(6)
               .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 3)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        // Cluster mode: submit the topology to the cluster
        try {
            StormSubmitter.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }
}

package cn.wc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// The spout is the data source: it reads a text file and emits each of its lines to the bolts.
public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {
    }

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The core spout method: read the text file and emit each line (to the bolts).
     * nextTuple() is called in a loop forever, so once the file has been fully
     * read we sleep briefly to reduce CPU consumption.
     */
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // Emit each line; Values is an ArrayList implementation.
                // The line itself doubles as the message id for ack/fail.
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the file and keep the collector. The three parameters are the
     * configuration given when the topology was created, the topology context,
     * and the collector used to emit the spout's tuples to the bolts.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            // Path of the input file, as set when the topology was created
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        // Initialize the collector
        this.collector = collector;
    }

    /** Declare the output field "line". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
package cn.wc;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// The spout emits each file line as a tuple (in Storm data travels as tuples);
// two bolts then split each line into words and count the words.
// The key bolt method is execute(), called once for every incoming tuple.
public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {
    }

    /**
     * Receive a line from the words file and normalize it: lower-case it,
     * split it into words, and emit each non-empty word to the next bolt.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /** The bolt only emits the field "word". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

package cn.wc;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;
    String output = null;

    /**
     * Called when the topology shuts down; clean-up work such as closing
     * connections or releasing resources belongs here.
     */
    @Override
    public void cleanup() {
        /*
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
        */
    }

    /** On create. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        output = stormConf.get("output").toString();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        // If the word is not in the map yet, create it with count 1; otherwise add 1
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        // Append the current counts to the output file
        try {
            File file = new File(output);
            if (!file.exists()) {
                file.createNewFile();
            }
            FileOutputStream out = new FileOutputStream(file, true);
            for (Map.Entry<String, Integer> entry : counters.entrySet()) {
                StringBuffer sb = new StringBuffer();
                sb.append(entry.getKey() + ": " + entry.getValue());
                sb.append("\r\n");
                out.write(sb.toString().getBytes("utf-8"));
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3. Result analysis:

Run on the cluster: storm jar /mnt/wc.jar cn.wc.TopologyMain /mnt/words.txt /tmp/topo.log
/* Parallelism and communication setup:
 * - setNumWorkers(3): three worker processes serve this topology.
 * - Each spout/bolt is given a parallelism hint of 3; by default each executor runs one task, so there are 9 executors and, nominally, 9 tasks.
 * - word-normalizer is additionally given setNumTasks(6), so its 3 executors run 2 tasks each, bringing the total to 12 tasks.
 * - In total: 3 worker processes, 9 executors, 12 tasks.
 * - Input: /mnt/words.txt   Output: /tmp/topo.log
 */
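The worker/executor/task arithmetic above can be sketched as a quick self-check (plain Java, outside Storm; the numbers simply mirror the settings in TopologyMain, Storm itself does the real assignment):

```java
// Quick arithmetic check of the topology's parallelism settings
public class ParallelismCheck {

    /** Returns {workers, executors, tasks} for the settings used in TopologyMain. */
    static int[] planned() {
        int workers = 3;          // conf.setNumWorkers(3)
        int spout = 3;            // setSpout("word-reader", ..., 3)
        int normalizer = 3;       // setBolt("word-normalizer", ..., 3)
        int counter = 3;          // setBolt("word-counter", ..., 3)
        int executors = spout + normalizer + counter;

        // One task per executor by default; word-normalizer overrides this
        // with setNumTasks(6), so its 3 executors run 2 tasks each.
        int tasks = spout + 6 + counter;
        return new int[] { workers, executors, tasks };
    }

    public static void main(String[] args) {
        int[] p = planned();
        System.out.println("workers=" + p[0] + " executors=" + p[1] + " tasks=" + p[2]);
    }
}
```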

1) storm list reports 15 tasks, not 12, which was puzzling at first. (The extra 3 are most likely Storm's system acker tasks, which by default run one per worker.)

2) The input words are clearly each counted 3 times. With the same file as the input source, the 3 spout executors do not coordinate to partition the input records among themselves; each one reads and emits the entire file.

3) Variables set in the topology program cannot be shared between executors.

4) Words appended to the input file are never picked up and counted, even though the topology itself keeps running in the cluster.
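Finding 4 follows directly from the spout code: nextTuple() sets completed = true after the first full read, so appended lines are never revisited. A minimal sketch of a tailing read loop that would keep picking up new lines (plain Java, outside Storm; the seen list stands in for collector.emit):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TailingReader {
    private final BufferedReader reader;
    final List<String> seen = new ArrayList<>();   // stands in for collector.emit

    TailingReader(String path) throws IOException {
        this.reader = new BufferedReader(new FileReader(path));
    }

    /** Emit any lines appended since the last call; never mark the file "completed". */
    void poll() throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            seen.add(line);                        // in a spout: collector.emit(...)
        }
        // readLine() returned null: no new data yet. A spout would simply
        // sleep and let Storm call nextTuple() again, instead of setting
        // completed = true and never re-reading.
    }
}
```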


4. Conclusions:

1) A topology is distributed for execution across workers (processes) on different nodes; each worker must be reasoned about as an independent process.

2) Each worker needs its own unique input source, and that source must supply data continuously.

3) Sharing data variables between workers requires some other mechanism, such as storing them in Redis.

In other words: once a topology is submitted to the cluster for distributed execution, its workers operate as independent processes.
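One common way to satisfy conclusion 2 when several spout tasks must share one file is to partition the lines by task index, so each task only emits its own slice. A minimal sketch of the idea (plain Java, not Storm API; in a real spout the task index and total task count would come from the TopologyContext passed to open()):

```java
import java.util.ArrayList;
import java.util.List;

public class LinePartitioner {

    /**
     * Return only the lines this task is responsible for: line i belongs
     * to the task whose index equals i modulo the total task count.
     */
    static List<String> myShare(List<String> lines, int taskIndex, int totalTasks) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            if (i % totalTasks == taskIndex) {
                mine.add(lines.get(i));
            }
        }
        return mine;
    }
}
```

With 3 spout tasks, every line of the shared file is then processed exactly once instead of three times.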
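Conclusion 3 in practice: each WordCounter task would push its increments to an external store instead of its local HashMap. A minimal sketch of the merge logic, with an in-memory ConcurrentHashMap standing in for the external store (in a real deployment this would be, say, a Redis INCR through a client library; the store itself is the only shared piece):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedCounter {
    // Stand-in for the external store (e.g. a Redis instance reachable by all workers)
    static final Map<String, Integer> STORE = new ConcurrentHashMap<>();

    /** Atomically increment the shared count for a word and return the new total. */
    static int increment(String word) {
        return STORE.merge(word, 1, Integer::sum);
    }
}
```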


