日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm的StreamID使用样例(版本1.0.2)

發布時間:2025/7/14 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm的StreamID使用样例(版本1.0.2) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

隨手嘗試了一下StreamID的的用法。留個筆記。

?

==數據樣例==

{"Address": "小橋鎮小橋中學對面","CityCode": "511300","CountyCode": "511322","EnterpriseCode": "YUNDA","MailNo": "667748320345","Mobile": "183****5451","Name": "王***","ProvCode": "510000","Weight": "39" }

?

==拓撲結構==

?

==程序源碼==

<Spout1>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import common.simulate.DataRandom; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;import java.util.Map;public class Spout1 extends BaseRichSpout {private SpoutOutputCollector _collector = null;private DataRandom _dataRandom = null;private int _timeInterval = 1000;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("Stream1", new Fields("json"));declarer.declareStream("Stream2", new Fields("json"));}@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_dataRandom = DataRandom.getInstance();if (conf.containsKey(Constants.SpoutInterval)) {_timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));}}@Overridepublic void nextTuple() {try {Thread.sleep(_timeInterval);} catch (InterruptedException e) {e.printStackTrace();}JSONObject jsonObject = _dataRandom.getRandomExpressData();System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");_collector.emit("Stream1", new Values(jsonObject.toJSONString()));_collector.emit("Stream2", new Values(jsonObject.toJSONString()));} }

?

<CountBolt1>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map;public class CountBolt1 extends BaseRichBolt {private OutputCollector _collector = null;private int taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("Stream3", new Fields("company", "count"));}@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {_collector = collector;taskId = context.getThisTaskId();}@Overridepublic void execute(Tuple input) {String str = input.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String company = jsonObject.getString(Constants.EnterpriseCode);int count = 0;if (_map.containsKey(company)) {count = _map.get(company);}count++;_map.put(company, count);_collector.emit("Stream3", new Values(company, count));System.out.print("[---CountBolt1---]" +"taskId=" + taskId + ", company=" + company + ", count=" + count + "\n");} }

?

<CountBolt2>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map; import java.util.UUID;public class CountBolt2 extends BaseRichBolt {private OutputCollector _collector = null;private int _taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {_collector = outputCollector;_taskId = topologyContext.getThisTaskId();}@Overridepublic void execute(Tuple tuple) {String str = tuple.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String prov = jsonObject.getString(Constants.ProvCode);int count = 0;if (_map.containsKey(prov)) {count = _map.get(prov);}count++;_map.put(prov, count);_collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));System.out.print("[---CountBolt2---]" +"taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "\n");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));} }

?

<CountBolt3>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map; import java.util.UUID;public class CountBolt3 extends BaseRichBolt {private OutputCollector _collector = null;private int _taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {_collector = outputCollector;_taskId = topologyContext.getThisTaskId();}@Overridepublic void execute(Tuple tuple) {String str = tuple.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String city = jsonObject.getString(Constants.CityCode);int count = 0;if (_map.containsKey(city)) {count = _map.get(city);}count++;_map.put(city, count);_collector.emit("Stream4", new Values(city, count));System.out.print("[---CountBolt3---]" +"taskId=" + _taskId + ", city=" + city + ", count=" + count + "\n");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));} }

?

<TopBolt>

package test;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple;import java.util.List; import java.util.Map;public class TopBolt extends BaseRichBolt {@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {}@Overridepublic void execute(Tuple tuple) {System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");List<Object> values = tuple.getValues();for(Object value : values) {System.out.print("[---TopBolt---]value=" + value + "\n");}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {} }

?

<TestTopology>

package test;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class TestTopology {public static void main(String[] args)throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("Spout1", new Spout1());builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");builder.setBolt("Top", new TopBolt()).fieldsGrouping("Count1", "Stream3", new Fields("company")).fieldsGrouping("Count2", "Stream4", new Fields("prov")).fieldsGrouping("Count3", "Stream4", new Fields("city"));Config config = new Config();config.setNumWorkers(1);config.put(common.constants.Constants.SpoutInterval, args[1]);if (Boolean.valueOf(args[0])) {StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());} else {LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("TestTopology1", config, builder.createTopology());}} }

?

==結果日志==

[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小橋鎮小橋中學對面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"} [---CountBolt1---]taskId=1, company=YUNDA, count=1 [---CountBolt3---]taskId=3, city=511300, count=1 [---CountBolt2---]taskId=2, prov=510000, count=1 [---TopBolt---]StreamID=Stream4 [---TopBolt---]value=510000 [---TopBolt---]value=1 [---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616 [---TopBolt---]StreamID=Stream4 [---TopBolt---]value=511300 [---TopBolt---]value=1 [---TopBolt---]StreamID=Stream3 [---TopBolt---]value=YUNDA [---TopBolt---]value=1

?

轉載于:https://www.cnblogs.com/quchunhui/p/8302192.html

總結

以上是生活随笔為你收集整理的Storm的StreamID使用样例(版本1.0.2)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。