日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Strom+Kafka + redis实时计算单词出现频率的案例

發布時間:2024/9/27 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Strom+Kafka + redis实时计算单词出现频率的案例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

案例要實現的目標

在Kafka的shell 客戶端中輸入內容,通過Storm實時去kafka中取數據并進行計算單詞出現的次數,并且實時把這些數據信息存儲到redis中。

代碼編寫

編寫Pom文件,代碼如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.storm.kafkastormredis</groupId><artifactId>kafkastormredis</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><!--<scope>provided</scope>--><version>1.1.0</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>1.1.0</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.8.2</artifactId><version>0.8.1</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!--告訴運行的主類是哪個,注意根據自己的情況,下面的包名做相應的修改--><mainClass>cn.toto.strom.wordcount.StormTopologyDriver</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build> </project>

在strom案例中需要有spout接收數據。在一些常規學習用的案例中通常從一個文件中獲取數據。通常的代碼如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/20.*/import org.apache.commons.lang.StringUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields;import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.ArrayList; import java.util.List; import java.util.Map;/*** 這個類是模擬從文件中讀取數據的代碼。在本案例的strom + kafka + redis的案例中將用不到。** @author tuzq* @create 2017-06-20 23:41*/ public class MyLocalFileSpout extends BaseRichSpout {private SpoutOutputCollector collector;private BufferedReader bufferedReader;/*** 初始化方法* @param map* @param context* @param collector*/@Overridepublic void open(Map map, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;try {this.bufferedReader = new BufferedReader(new FileReader(new File("E:/wordcount/input/1.txt")));} catch (Exception e) {e.printStackTrace();}}/*** Strom實時計算的特性就是對數據一條一條的處理* while(true) {* this.nextTuple();* }*/@Overridepublic void nextTuple() {//每被調用一次就會發送一條數據出去try {String line = bufferedReader.readLine();if (StringUtils.isNotBlank(line)) {List<Object> arrayList = new ArrayList<Object>();arrayList.add(line);collector.emit(arrayList);}} catch(Exception e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("juzi"));}}

在spout編寫完成之后,通常通過Bolt來進行文本的切割。在下面的切割代碼中,模擬的是從kafka中獲取數據,并進行切割。代碼如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;/*** 這個Bolt模擬從kafkaSpout接收數據,并把數據信息發送給MyWordCountAndPrintBolt的過程。** @author tuzq* @create 2017-06-21 9:14*/ public class MySplitBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {//1、數據如何獲取//如果StormTopologyDriver中的spout配置的是MyLocalFileSpout,則用的是declareOutputFields中的juzi這個key//byte[] juzi = (byte[]) input.getValueByField("juzi");//2、這里用這個是因為StormTopologyDriver這個里面的spout用的是KafkaSpout,而KafkaSpout中的declareOutputFields返回的是bytes,所以下面用bytes,這個地方主要模擬的是從kafka中獲取數據byte[] juzi = (byte[]) input.getValueByField("bytes");//2、進行切割String[] strings = new String(juzi).split(" ");//3、發送數據for (String word : strings) {//Values對象幫我們生成一個listcollector.emit(new Values(word,1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","num"));} }

對文本信息進行切割之后,需要對數據進行統計,這里使用另外一個Bolt來完成,代碼如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; import redis.clients.jedis.Jedis;import java.util.HashMap; import java.util.Map;/*** 用于統計分析,并且把統計分析的結果存儲到redis中。** @author tuzq* @create 2017-06-21 9:22*/ public class MyWordCountAndPrintBolt extends BaseBasicBolt {private Jedis jedis;private Map<String,String> wordCountMap = new HashMap<String,String>();@Overridepublic void prepare(Map stormConf, TopologyContext context) {//連接redis---代表可以連接任何事物jedis = new Jedis("hadoop11",6379);super.prepare(stormConf,context);}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String word = (String) input.getValueByField("word");Integer num = (Integer) input.getValueByField("num");//1、查看單詞對應的value是否存在Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));if (integer == null || integer.intValue() == 0) {wordCountMap.put(word,num + "");} else {wordCountMap.put(word,(integer.intValue() + num) + "");}//2、保存到redisSystem.out.println(wordCountMap);//redis key wordcount:-->Mapjedis.hmset("wordcount",wordCountMap);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {//todo 不需要定義輸出的字段} }

接下來通過一個Driver串聯起Spout、Bolt實現實時計算,代碼如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.topology.TopologyBuilder;/*** 這個Driver使Kafka、strom、redis進行串聯起來。** 這個代碼執行前需要創建kafka的topic,創建代碼如下:* [root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 3 --topic wordCount** 接著還要向kafka中傳遞數據,打開一個shell的producer來模擬生產數據* [root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount* 接著輸入數據** @author tuzq* @create 2017-06-21 9:39*/ public class StormTopologyDriver {public static void main(String[] args) throws Exception {//1、準備任務信息TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("KafkaSpout",new KafkaSpout(new SpoutConfig(new ZkHosts("hadoop11:2181"),"wordCount","/wordCount","wordCount")),2);topologyBuilder.setBolt("bolt1",new MySplitBolt(),4).shuffleGrouping("KafkaSpout");topologyBuilder.setBolt("bolt2",new MyWordCountAndPrintBolt(),2).shuffleGrouping("bolt1");//2、任務提交//提交給誰?提交內容Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();//本地模式LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("wordcount",config,stormTopology);//集群模式//StormSubmitter.submitTopology("wordcount1",config,stormTopology);} }

運行程序
1、啟動Kafka集群,啟動方式參考博文:
http://blog.csdn.net/tototuzuoquan/article/details/73430874
2、啟動redis,啟動和安裝方式參考博文:
http://blog.csdn.net/tototuzuoquan/article/details/43611535
3、在kafka上創建topic,參考博文:
http://blog.csdn.net/tototuzuoquan/article/details/73432256
這里我們使用:

//創建kafka的topic

[root@hadoop1 ~]# cd $KAFKA_HOME [root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 3 --topic wordCount

接下來創建producer,來發送數據到kafka:

[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount

在上面輸入數據。

4、運行程序,進入StormTopologyDriver,右鍵run.最后的效果如下:

5、最后如果想看MyWordCountAndPrintBolt中記錄到redis的wordcount內容,可以編寫如下代碼案例:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import redis.clients.jedis.Jedis;import java.util.Map;/*** 代碼說明** @author tuzq* @create 2017-06-21 10:13*/ public class TestRedis {public static void main(String[] args) {Jedis jedis = new Jedis("hadoop11",6379);Map<String,String> wordcount = jedis.hgetAll("wordcount");System.out.println(wordcount);} }

運行后的結果如下:

總結

以上是生活随笔為你收集整理的Strom+Kafka + redis实时计算单词出现频率的案例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 美女污污网站 | 中文字幕人妻伦伦 | 九九精品在线视频 | 中文字幕一区二区人妻痴汉电车 | 美女黄色一级视频 | 亚洲精品.www | 欧美精品123区 | 欧美毛片免费看 | 欧美日本中文字幕 | 成人精品一区二区三区 | gogo人体做爰大胆视频 | 四川黄色一级片 | 亚洲一区二区三区免费看 | 亚洲精品成人久久 | 91视频地址 | 日本一区二区在线播放 | 放几个免费的毛片出来看 | 免费搞黄网站 | 久久免费国产精品 | 国产精品偷伦视频免费观看了 | 国产美女激情视频 | 九九天堂 | 日韩色网站 | 国产精品久久精品三级 | 日本成人免费视频 | 欧美极度另类 | 久热精品在线观看视频 | 亚洲а∨天堂久久精品2021 | 亚洲视频二区 | 久久久一区二区三区四区 | 欧美 日韩 国产 成人 在线 | 国产精品久久久精品三级 | 人人狠狠 | 丰满少妇乱子伦精品看片 | 国产成人精品123区免费视频 | 国产91丝袜在线18 | 中文字字幕第183页 欧美特级一级片 | 欧美精品一区二区三区久久久竹菊 | 亚色av | 国产日视频| 午夜精品久久久久 | 精品日韩在线视频 | 欧美r级在线 | 亚洲一区二区三区婷婷 | 欧美特黄一区二区三区 | 牛牛精品一区 | 国产午夜福利在线播放 | 狠狠干天天 | 下面一进一出好爽视频 | 国产性―交―乱―色―情人 | 视频在线观看99 | 少妇被爽到高潮动态图 | 欧美91av| 亚洲 欧美 激情 小说 另类 | 精品伦精品一区二区三区视频密桃 | 国产精品成人一区二区网站软件 | 国产精品精品国产色婷婷 | 色月婷婷 | 中文字幕精品亚洲 | 99re这里只有精品在线观看 | 欧美欧美欧美 | 日韩少妇一区二区三区 | 欧美xxx在线观看 | 美女福利影院 | 人人艹在线观看 | 无码乱人伦一区二区亚洲 | 国产精品久久久久久久久久久久久久久 | 成人国产精品入口免费视频 | 天天舔天天爱 | 女人裸体无遮挡 | 男女插孔视频 | 国产乱真实合集 | 成年人黄色大全 | 女人的黄色片 | 日日夜夜伊人 | 神马午夜一区二区 | 国产又粗又猛又爽又黄的网站 | 中国美女一级黄色片 | 久久久性色精品国产免费观看 | 日日爽夜夜操 | 天堂男人av | 精品视频日韩 | 激情小说亚洲色图 | 99re在线视频精品 | 中文字幕视频在线观看 | 国产在线播放一区 | 久久久久一级片 | 99re免费视频精品全部 | av性天堂网| 在线观看天堂av | 国产对白刺激视频 | 好吊操这里有精品 | 艳妇臀荡乳欲伦交换在线播放 | 黄色网页免费 | 在线免费观看一区二区 | 中国xxxx性xxxx产国 | 亚洲一区精品在线观看 | 日韩精品在线观看中文字幕 | 91看片免费看 |