日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm累计求和Demo并且在集群上运行

發(fā)布時間:2025/3/19 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm累计求和Demo并且在集群上运行 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?打成jar包放在主節(jié)點上去運行.

1 import java.util.Map; 2 3 import backtype.storm.Config; 4 import backtype.storm.StormSubmitter; 5 import backtype.storm.generated.AlreadyAliveException; 6 import backtype.storm.generated.InvalidTopologyException; 7 import backtype.storm.spout.SpoutOutputCollector; 8 import backtype.storm.task.OutputCollector; 9 import backtype.storm.task.TopologyContext; 10 import backtype.storm.topology.OutputFieldsDeclarer; 11 import backtype.storm.topology.TopologyBuilder; 12 import backtype.storm.topology.base.BaseRichBolt; 13 import backtype.storm.topology.base.BaseRichSpout; 14 import backtype.storm.tuple.Fields; 15 import backtype.storm.tuple.Tuple; 16 import backtype.storm.tuple.Values; 17 import backtype.storm.utils.Utils; 18 19 /** 20 * 在集群運行 21 * 數(shù)字累加求和 22 * 先添加storm依賴 23 * 24 * @author Administrator 25 * 26 */ 27 public class StormTopologySum { 28 29 30 /** 31 * spout需要繼承baserichspout,實現(xiàn)未實現(xiàn)的方法 32 * @author Administrator 33 * 34 */ 35 public static class MySpout extends BaseRichSpout{ 36 private Map conf; 37 private TopologyContext context; 38 private SpoutOutputCollector collector; 39 40 /** 41 * 初始化方法,只會執(zhí)行一次 42 * 在這里面可以寫一個初始化的代碼 43 * Map conf:其實里面保存的是topology的一些配置信息 44 * TopologyContext context:topology的上下文,類似于servletcontext 45 * SpoutOutputCollector collector:發(fā)射器,負責向外發(fā)射數(shù)據(jù)(tuple) 46 */ 47 @Override 48 public void open(Map conf, TopologyContext context, 49 SpoutOutputCollector collector) { 50 this.conf = conf; 51 this.context = context; 52 this.collector = collector; 53 } 54 55 int num = 1; 56 /** 57 * 這個方法是spout中最重要的方法, 58 * 這個方法會被storm框架循環(huán)調用,可以理解為這個方法是在一個while循環(huán)之內 59 * 每調用一次,會向外發(fā)射一條數(shù)據(jù) 60 */ 61 @Override 62 public void nextTuple() { 63 System.out.println("spout發(fā)射:"+num); 64 //把數(shù)據(jù)封裝到values中,稱為一個tuple,發(fā)射出去 65 this.collector.emit(new Values(num++)); 66 Utils.sleep(1000); 67 } 68 69 /** 70 * 聲明輸出字段 71 */ 72 @Override 73 public void declareOutputFields(OutputFieldsDeclarer declarer) { 74 //給values中的數(shù)據(jù)起個名字,方便后面的bolt從這個values中取數(shù)據(jù) 75 //fields中定義的參數(shù)和values中傳遞的數(shù)值是一一對應的 76 declarer.declare(new Fields("num")); 77 } 78 79 } 80 81 82 /** 83 * 自定義bolt需要實現(xiàn)baserichbolt 84 * @author Administrator 85 * 86 */ 87 public static class MyBolt extends BaseRichBolt{ 88 private Map stormConf; 89 private TopologyContext context; 90 private OutputCollector collector; 91 92 /** 93 * 和spout中的open方法意義一樣 94 */ 95 @Override 96 public void prepare(Map stormConf, TopologyContext context, 97 OutputCollector collector) { 98 this.stormConf = stormConf; 99 this.context = context; 100 this.collector = collector; 101 } 102 103 int sum = 0; 104 /** 105 * 是bolt中最重要的方法,當spout發(fā)射一個tuple出來,execute也會被調用,需要對spout發(fā)射出來的tuple進行處理 106 */ 107 @Override 108 public void execute(Tuple input) { 109 //input.getInteger(0);//也可以根據(jù)角標獲取tuple中的數(shù)據(jù) 110 Integer value = input.getIntegerByField("num"); 111 sum+=value; 112 System.out.println("和:"+sum); 113 } 114 115 /** 116 * 聲明輸出字段 117 */ 118 @Override 119 public void declareOutputFields(OutputFieldsDeclarer declarer) { 120 //在這沒必要定義了,因為execute方法中沒有向外發(fā)射tuple,所以就不需要聲明了。 121 //如果nextTuple或者execute方法中向外發(fā)射了tuple,那么declareOutputFields必須要聲明,否則不需要聲明 122 } 123 124 } 125 /** 126 * 注意:在組裝topology的時候,組件的id在定義的時候,名稱不能以__開頭。__是系統(tǒng)保留的 127 * @param args 128 */ 129 public static void main(String[] args) { 130 //組裝topology 131 TopologyBuilder topologyBuilder = new TopologyBuilder(); 132 topologyBuilder.setSpout("spout1", new MySpout()); 133 //.shuffleGrouping("spout1"); 表示讓MyBolt接收MySpout發(fā)射出來的tuple 134 topologyBuilder.setBolt("bolt1", new MyBolt()).setNumTasks(2).shuffleGrouping("spout1"); 135 136 //創(chuàng)建本地storm集群 137 /*LocalCluster localCluster = new LocalCluster(); 138 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());*/ 139 140 //在集群運行 141 String simpleName = StormTopologySum.class.getSimpleName(); 142 try { 143 Config stormConf = new Config(); 144 //stormConf.setNumWorkers(2); 145 StormSubmitter.submitTopology(simpleName, stormConf, topologyBuilder.createTopology()); 146 } catch (AlreadyAliveException e) { 147 e.printStackTrace(); 148 } catch (InvalidTopologyException e) { 149 e.printStackTrace(); 150 } 151 } 152 }

?

轉載于:https://www.cnblogs.com/DreamDrive/p/5786198.html

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的Storm累计求和Demo并且在集群上运行的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 中国妇女做爰视频 | а√天堂www在线天堂小说 | 亚洲伊人久久综合 | 奶水喷溅 在线播放 | 久久y | 久久精品一级片 | 香蕉视频链接 | 亚洲精品在线视频 | 久久久久性 | 日本xxx在线播放 | 亚洲一区二区免费看 | 欧美人妻一区二区三区 | 久久中文字幕高清 | 伊人手机在线视频 | 玖玖色在线 | 快播色图 | 六月综合激情 | 牛牛在线免费视频 | 国产高清视频 | 天天综合天天做天天综合 | 尤物视频在线观看 | 污的视频在线观看 | 又欲又污又肉又黄短文 | 怨女1988国语版在线观看高清 | 国产亚洲一区二区三区四区 | 久久久久久久毛片 | 动漫av在线免费观看 | 91精品国产欧美一区二区 | 亚洲在线不卡 | 久久涩涩 | 天堂va欧美ⅴa亚洲va一国产 | 五月网婷婷 | 秋霞毛片少妇激情免费 | 欧美福利影院 | 狠狠爱视频| 少妇精品无码一区二区免费视频 | 国产精品福利视频 | 激情高潮呻吟抽搐喷水 | av一级网站 | 亚洲国产精品久久久久婷婷老年 | 国产一二三区精品 | 波多野结衣在线影院 | 久久91精品国产 | 靠逼视频免费网站 | 久久a级片 | 久久精品免费一区二区 | 天天干夜操 | 三年电影在线观看 | www黄色av| 久久中文免费视频 | 美女隐私黄www网站动漫 | 青草精品视频 | 人人揉人人 | 黄色片免费视频 | 国产热视频 | 国产永久免费视频 | 春意影院福利社 | 手机看片久久 | 91av一区二区三区 | 亚洲国产成人精品91久久久 | 69xx欧美| 精品视频在线免费观看 | 日本一本在线观看 | 婷婷在线影院 | 久久久全国免费视频 | 亚洲好骚综合 | 欧洲成人av| 玩弄人妻少妇500系列视频 | 日日弄天天弄美女bbbb | 91色影院 | 天天射日日 | 成年人免费在线 | 亚洲aⅴ乱码精品成人区 | 在线天堂6 | 亚洲污网站 | 亚洲欧美激情小说另类 | 免费人妻一区二区三区 | 在线日韩中文字幕 | 日韩av在线播放一区 | 国产精品一区二区无码免费看片 | 天天操网 | 久久午夜鲁丝片午夜精品 | 精品国产欧美一区二区三区成人 | 国产综合视频一区二区 | 99精品无码一区二区 | 国产女主播在线播放 | 欧美亚洲高清 | 国产亚洲色婷婷久久99精品 | 蜜桃av在线播放 | 男男肉耽高h彩漫 | 国产亚洲精品久久久久久青梅 | 好吊视频一区二区三区四区 | 高跟鞋调教—视频|vk | 国产精品手机在线观看 | 国产精品19p | 亚洲区小说区图片区 | 亚洲奶汁xxxx哺乳期 | 亚洲午夜精品 | 大陆极品少妇内射aaaaaa |