[Storm] storm-starter for Spouts, Grouping Strategies, Parallelism Explained, and Website Page-View and Visitor Counting

Published: 2024/3/24

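Install Maven first.

This section covers how to use storm-starter.

1. Download the official storm-starter example package from GitHub. It is a Maven project.

      URL: https://github.com/nathanmarz/storm-starter

2. Unzip it and copy it into your workspace directory. From a command prompt (cmd) in that directory, run mvn eclipse:eclipse to generate the files Eclipse needs, which turns the Maven project into a project Eclipse can open.

3. Import it into Eclipse. Create a new source folder named lesson, then copy the whole lesson package from the Storm getting-started project of the previous section into the lesson source folder of storm-starter-master.

4. Right-click the project and choose Run As > Maven package to build it with Maven. Two jar files appear in the target folder.

storm-starter-0.0.1-SNAPSHOT.jar contains only the project code without dependencies, so it is small.

storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar bundles the dependencies, so it is large. This is usually the one you need.

5. Examples provided by the official project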

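Grouping strategies (Stream Grouping)

A stream grouping defines how a stream is distributed among the multiple tasks of a bolt, which in practice means among the bolt's executors (threads, i.e. its parallelism).

1. Storm has 7 built-in stream groupings:

Note: 1), 2), and 5) are the most common; the others are rarely used.

With a single bolt thread, every grouping behaves like All Grouping, because the only task receives every tuple.

1) Shuffle Grouping: random grouping. The tuples of the stream are dispatched randomly so that every bolt task receives roughly the same number of tuples. It is implemented by cycling through the target tasks, which keeps the distribution even.

2) Fields Grouping: grouping by field. For example, when grouping by userid, tuples with the same userid always go to the same bolt task, while tuples with different userids may go to different tasks.

3) All Grouping: broadcast. Every bolt task receives every tuple.

4) Global Grouping: the entire stream goes to a single task of the bolt, specifically the task with the lowest id.

5) None Grouping: no grouping. The stream does not care which task receives its tuples. At present it is documented to behave like Shuffle Grouping, but in the multi-threaded test below the tuples were not distributed evenly.

6) Direct Grouping: a special grouping in which the producer of a tuple decides which task of the consumer receives it. Only streams declared as direct streams can use this grouping, and the tuples must be emitted with emitDirect. A bolt can obtain the task ids of its consumers from the TopologyContext, or from the return value of OutputCollector.emit, which returns the ids of the tasks the tuple was sent to.

7) Local or Shuffle Grouping: if the target bolt has one or more tasks in the same worker process, tuples are shuffled among just those in-process tasks. Otherwise it behaves like a normal Shuffle Grouping.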
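The task-selection rules of four of these groupings can be modelled in plain Java without Storm. This is only a minimal sketch under stated assumptions: the class GroupingSketch and its methods are hypothetical names, shuffle is modelled as a simple round-robin, and fields grouping is modelled as a hash of the key modulo the number of tasks. Storm's real implementation differs in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, Storm-free model of how four groupings pick target tasks (hypothetical helper). */
class GroupingSketch {
    private int next = 0; // round-robin cursor for the shuffle model

    /** Shuffle, modelled as round-robin: spreads tuples evenly over the tasks. */
    int shuffle(int numTasks) {
        int task = next;
        next = (next + 1) % numTasks;
        return task;
    }

    /** Fields: the same key always maps to the same task index. */
    static int fields(Object key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    /** All: every task receives a copy of the tuple. */
    static List<Integer> all(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            targets.add(i);
        }
        return targets;
    }

    /** Global: always the task with the lowest index. */
    static int global(int numTasks) {
        return 0;
    }

    public static void main(String[] args) {
        GroupingSketch g = new GroupingSketch();
        System.out.println(g.shuffle(2) + "," + g.shuffle(2) + "," + g.shuffle(2));
        System.out.println(fields("user-42", 4) == fields("user-42", 4));
        System.out.println(all(3));
        System.out.println(global(3));
    }
}
```

The useful property to notice is that fields() depends only on the key, so a per-key counter kept inside one task sees every tuple for that key.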

2. Tests

1) Shuffle Grouping, round-robin distribution.

Modify Main.java in lesson: set the bolt parallelism to 2, so that the bolt runs in 2 threads.

    // setBolt takes 3 arguments:
    // the bolt id (String), the bolt instance, and the parallelism. Use a larger parallelism for large data volumes.
    // The bolt reads from the spout; the name must match the id passed to setSpout above, otherwise no data arrives.
    // Shuffle grouping is the distribution rule (see the grouping strategies above).
    topoBuilder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");

Modify MyBolt.java in lesson so that the printed line also contains the current thread name.

    if (null != valStr) {
        num++;
        System.out.println(Thread.currentThread().getName() + ", lines : " + num + ", sessionId: " + valStr.split("\t")[1]);
    }

The console output shows 2 bolt threads, each receiving the same number of tuples. Only the sum of the line counts printed by the two threads equals the total number of lines in track.log.

    Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
    Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-50-bolt, lines : 2, sessionId: 5C3FBA728FD7D264B80769B23
    Thread-22-bolt, lines : 3, sessionId: 5D16C1F5191CF9371Y32B58CF
    Thread-50-bolt, lines : 3, sessionId: 5C16BC4MB91B85661FE22F413
    ...
    Thread-22-bolt, lines : 23, sessionId: 5GFBAT3D3100A7A7255027A70
    Thread-50-bolt, lines : 23, sessionId: 5D16C1EB1C7A751AE03201C3F
    Thread-50-bolt, lines : 24, sessionId: 5B16C0F7215109AG43528BA2D
    Thread-22-bolt, lines : 24, sessionId: 5N16C2FE51E5619C2A1244215
    Thread-22-bolt, lines : 25, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-50-bolt, lines : 25, sessionId: 5C3FBA728FD7D264B80769B23

2) Spout parallelism 2, bolt parallelism 1.

    topoBuilder.setSpout("spout", new MySpout(), 2);
    topoBuilder.setBolt("bolt", new MyBolt(), 1).shuffleGrouping("spout");

The console output shows only 1 bolt thread, as expected. However, the bolt receives twice as many lines as track.log contains. The spout has 2 threads, and each of them opens and reads track.log independently, so the file is read twice. When a file is the spout's data source, the spout parallelism must therefore be 1.

    Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
    Thread-23-bolt, lines : 2, sessionId: 5CFBA5BD76BACF436ACA9DCC8
    Thread-23-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-23-bolt, lines : 4, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-23-bolt, lines : 5, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    ...
    Thread-23-bolt, lines : 96, sessionId: 5N16C2FE51E5619C2A1244215
    Thread-23-bolt, lines : 97, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-23-bolt, lines : 98, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-23-bolt, lines : 99, sessionId: 5C3FBA728FD7D264B80769B23
    Thread-23-bolt, lines : 100, sessionId: 5C3FBA728FD7D264B80769B23
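The doubling can be reproduced without Storm. The following is a minimal sketch, assuming that every spout instance opens its own reader in open(), as MySpout does; the class DuplicateReadSketch and the in-memory data are hypothetical stand-ins for the spout and for track.log.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Storm-free model: every spout instance opens its own reader over the same data. */
class DuplicateReadSketch {
    /** Counts the lines emitted when `instances` independent readers each read `data` fully. */
    static int emittedLines(String data, int instances) {
        int emitted = 0;
        for (int i = 0; i < instances; i++) {
            // Mirrors open(): each instance creates a fresh reader that starts at the first line.
            try (BufferedReader br = new BufferedReader(new StringReader(data))) {
                while (br.readLine() != null) {
                    emitted++; // stands in for collector.emit(...)
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String data = "line1\nline2\nline3\n";
        System.out.println(emittedLines(data, 1)); // one spout instance
        System.out.println(emittedLines(data, 2)); // two spout instances
    }
}
```

Nothing coordinates the two readers, so the number of emitted tuples grows linearly with the spout parallelism.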

3) None Grouping

Spout parallelism 1, bolt parallelism 2.

    topoBuilder.setSpout("spout", new MySpout(), 1);
    topoBuilder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");

The console output is shown below. The bolt has 2 threads: the counter of Thread-23-bolt ends at 20 and the counter of Thread-50-bolt ends at 30. In this test, None Grouping therefore did not distribute the tuples evenly.

    Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-23-bolt, lines : 3, sessionId: 5C3FBA728FD7D264B80769B23
    Thread-23-bolt, lines : 4, sessionId: 5D16C1F5191CF9371Y32B58CF
    Thread-50-bolt, lines : 2, sessionId: 5C16BC4MB91B85661FE22F413
    ...
    Thread-23-bolt, lines : 17, sessionId: 5D16C1EB1C7A751AE03201C3F
    Thread-23-bolt, lines : 18, sessionId: 5B16C0F7215109AG43528BA2D
    Thread-50-bolt, lines : 30, sessionId: 5N16C2FE51E5619C2A1244215
    Thread-23-bolt, lines : 19, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-23-bolt, lines : 20, sessionId: 5C3FBA728FD7D264B80769B23

4) Fields Grouping

A recap of what Fields Grouping does:

(1) Filtering: it selects certain fields from the output fields of the source (the spout or the upstream bolt).

(2) Tuples with the same values in those fields are dispatched to the same executor or task.

Typical scenarios: de-duplication and joins. Joins are not widely used in practice, because they need 2 data sources whose data must arrive at about the same time, without too large a gap.

1 spout, 2 bolt threads.

    topoBuilder.setSpout("spout", new MySpout(), 1);
    // fieldsGrouping takes 2 arguments: the spout name and the field name(s)
    topoBuilder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("log"));

The result looks similar to None Grouping. The grouping field here is the entire log line, so only identical lines are guaranteed to reach the same thread, and the split between the two threads is uneven.

    Thread-50-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
    Thread-50-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-50-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-50-bolt, lines : 4, sessionId: 5C3FBA728FD7D264B80769B23
    Thread-50-bolt, lines : 5, sessionId: 5D16C1F5191CF9371Y32B58CF
    ...
    Thread-50-bolt, lines : 26, sessionId: 5B16C0F7215109AG43528BA2D
    Thread-21-bolt, lines : 22, sessionId: 5N16C2FE51E5619C2A1244215
    Thread-21-bolt, lines : 23, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-50-bolt, lines : 27, sessionId: 5C3FBA728FD7D264B80769B23

5) All Grouping

1 spout, 2 bolt threads.

    topoBuilder.setSpout("spout", new MySpout(), 1);
    // All Grouping: broadcast
    topoBuilder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");

The spout sends every tuple to every downstream bolt task, so each bolt thread receives the same number of lines. Broadcasting is not commonly used in development.

    Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
    Thread-49-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
    Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-49-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    ...
    Thread-23-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-49-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-23-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23
    Thread-49-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23

6) Global Grouping

1 spout, 2 bolt threads.

    topoBuilder.setSpout("spout", new MySpout(), 1);
    // Global Grouping
    topoBuilder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");

The console output shows 2 threads, named Thread-22-bolt and Thread-50-bolt, but only the one with the lower task number (task 5) receives data; the other (task 6) receives nothing. Global Grouping sends all data to the task with the lowest id.

    [Thread-22-bolt] INFO backtype.storm.daemon.executor - Prepared bolt bolt:(5)
    [Thread-50-bolt] INFO backtype.storm.daemon.executor - Prepared bolt bolt:(6)
    ...
    Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
    Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    Thread-22-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
    ...
    Thread-22-bolt, lines : 47, sessionId: 5B16C0F7215109AG43528BA2D
    Thread-22-bolt, lines : 48, sessionId: 5N16C2FE51E5619C2A1244215
    Thread-22-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
    Thread-22-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23

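Reflection: the file-reading example

      The examples use a file as Storm's data source, which is rare in production. Storm is a distributed application whose work is spread over all supervisor machines, whereas a local file exists on only one machine.

1) A spout's data source can be a database, a file, or a message queue (for example Kafka).
2) Database as the source: suitable only for reading configuration data stored in the database, not for reading incremental data.
3) File as the source: suitable only for testing and teaching (because the cluster is distributed); it has no other use.
4) Steps for processing the log files a business produces:
  (1) Read the content and write it to the message queue.
  (2) Let Storm process it from there.

What the file-reading example shows:

1) A distributed application cannot rely on a local file, because the file exists on only one machine.

2) A spout cannot read a file in parallel; enabling parallelism makes every instance read the same data again.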

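Parallelism scenario analysis

Scenario analysis:

Single thread: arithmetic aggregation (in fact anything) and operations with any class are possible.

Multiple threads: partial (local) aggregation is possible, but global aggregation is not suitable.

Multiple threads are suitable for:

a. Partial (local) aggregation

b. Processing operations, such as split

c. Persistence, such as writing to a database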

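Official example

1. Word count: WordCountTopology, which is part of the storm-starter-master project.

To make it easier to understand, the official example is rewritten here. In the source folder lesson, create the package WordCount and copy WordCountTopology.java into it.

2. Program analysis:

(1) There is 1 spout and there are 2 bolts.

(2) MyRandomSentenceSpout sends data repeatedly; nextTuple is implemented so that the same data is sent on every call.

(3) MySplit receives the spout's data, which are strings, and splits each string.

(4) WordCount receives the data emitted by split, which are single words (single letters in this example), and counts the occurrences of each.

3. Program code and results

(1) The main program WordCountTopology, which also contains the word-counting bolt WordCount.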

    package WordCount;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.task.ShellBolt;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import storm.starter.spout.RandomSentenceSpout;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * This topology demonstrates Storm's stream groupings and multilang capabilities.
     */
    public class WordCountTopology {
        public static class SplitSentence extends ShellBolt implements IRichBolt {
            public SplitSentence() {
                super("python", "splitsentence.py");
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }

            @Override
            public Map<String, Object> getComponentConfiguration() {
                return null;
            }
        }

        // Counts the occurrences of each word
        public static class WordCount extends BaseBasicBolt {
            Map<String, Integer> counts = new HashMap<String, Integer>();

            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String word = tuple.getString(0);
                Integer count = counts.get(word);
                if (count == null)
                    count = 0;
                count++;
                counts.put(word, count);
                // Print the current thread name, the word, and its count
                System.out.println(Thread.currentThread().getName() + ", word = " + word + ", count = " + count);
                collector.emit(new Values(word, count));
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }

        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new MyRandomSentenceSpout(), 1);
            builder.setBolt("split", new MySplit(" "), 8).shuffleGrouping("spout");
            builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("word-count", conf, builder.createTopology());
                // Thread.sleep(10000);
                // cluster.shutdown();
            }
        }
    }

(2) The spout program, MyRandomSentenceSpout. It defines the string array { "a b c d", "e f g h", "i j k l" }. No letter is repeated, which makes the output easy to follow. The letters in each string are separated by spaces, and the strings are emitted one by one.

    package WordCount;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;

    import java.util.Map;
    import java.util.Random;

    public class MyRandomSentenceSpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;
        SpoutOutputCollector _collector;
        Random _rand;
        // Sentences whose words are separated by spaces
        String[] sentences = new String[]{ "a b c d", "e f g h", "i j k l"};

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        @Override
        public void nextTuple() {
            for (String sentence : sentences) {
                _collector.emit(new Values(sentence));
            }
            // Sleep for 10 seconds
            Utils.sleep(10 * 1000);
        }

        @Override
        public void ack(Object id) {
        }

        @Override
        public void fail(Object id) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

(3) The bolt program, MySplit. It receives a string and splits it into single words.

    package WordCount;

    import java.util.Map;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.FailedException;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    /**
     * Classes based on IBasic* do not need to call ack themselves; ack is called automatically.
     * @description bolt that splits sentences into words
     * @author whiteshark
     * @date 2019-06-30 23:46:02
     */
    public class MySplit implements IBasicBolt {
        private String pattern;

        public MySplit(String pattern) {
            this.pattern = pattern;
        }

        /**
         * Bolts and spouts should be serializable to avoid errors at high parallelism
         */
        private static final long serialVersionUID = 1L;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            // TODO Auto-generated method stub
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String sentence = input.getString(0);
                if (null != sentence) {
                    // The words are separated by spaces; split them into single words
                    for (String word : sentence.split(pattern)) {
                        collector.emit(new Values(word));
                    }
                }
                // IBasic* classes do not need to call ack; it is called automatically on success.
                // Throwing a FailedException reports the failure.
            } catch (FailedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void cleanup() {
            // TODO Auto-generated method stub
        }
    }

(4) Run the main program WordCountTopology. The console prints the bolt thread name, the word, and its count. The spout ran twice, so the data was sent twice. Because the WordCount bolt subscribes with a fields grouping, each thread handles the same letters in both rounds. Although the count bolt is declared with a parallelism of 12, conf.setMaxTaskParallelism(3) caps it at 3 in local mode, which is why only three count threads appear.

This confirms what the Fields Grouping section above states: identical tuples are dispatched to the same executor or task.

    Thread-16-count, word = e, count = 1
    Thread-16-count, word = h, count = 1
    Thread-16-count, word = k, count = 1
    Thread-16-count, word = b, count = 1
    Thread-18-count, word = f, count = 1
    Thread-18-count, word = i, count = 1
    Thread-18-count, word = l, count = 1
    Thread-18-count, word = c, count = 1
    Thread-20-count, word = g, count = 1
    Thread-20-count, word = j, count = 1
    Thread-20-count, word = a, count = 1
    Thread-20-count, word = d, count = 1

    Thread-16-count, word = b, count = 2
    Thread-16-count, word = e, count = 2
    Thread-16-count, word = h, count = 2
    Thread-16-count, word = k, count = 2
    Thread-18-count, word = f, count = 2
    Thread-18-count, word = c, count = 2
    Thread-18-count, word = i, count = 2
    Thread-18-count, word = l, count = 2
    Thread-20-count, word = a, count = 2
    Thread-20-count, word = g, count = 2
    Thread-20-count, word = d, count = 2
    Thread-20-count, word = j, count = 2
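The behaviour in this output can be imitated in plain Java. The sketch below is only a model under stated assumptions: WordCountSketch is a hypothetical class, the fields grouping is approximated by the word's hash modulo the number of count tasks, and each count task is represented by its own map. Which word lands on which task will not match the thread names above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Storm-free model of split -> fields-grouped count. */
class WordCountSketch {
    /** Returns one word->count map per count task after `rounds` passes over the sentences. */
    static List<Map<String, Integer>> run(String[] sentences, int rounds, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (int r = 0; r < rounds; r++) {
            for (String sentence : sentences) {
                for (String word : sentence.split(" ")) {                 // the split step
                    int task = Math.floorMod(word.hashCode(), numTasks);  // modelled fields grouping on "word"
                    tasks.get(task).merge(word, 1, Integer::sum);         // the count step, local to one task
                }
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        String[] sentences = {"a b c d", "e f g h", "i j k l"};
        for (Map<String, Integer> counts : run(sentences, 2, 3)) {
            System.out.println(counts);
        }
    }
}
```

Every word appears in exactly one task's map, so each local count is already the complete count for that word and no cross-task merge is needed.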


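Parallelism

In Storm, a task can simply be understood as one spout or bolt instance running on some node of the cluster. While a topology runs in a cluster, it is made up of four kinds of parts, from lowest to highest: tasks (bolt/spout instances), executors (threads), workers (JVMs), and nodes (servers).

The parts mean the following:

(1) Nodes (servers): the servers configured in a Storm cluster, each of which executes part of the topology's computation. A Storm cluster can contain one or more worker nodes.

(2) Workers (JVMs): independent JVM processes running on a node. Each node can be configured to run one or more workers. A topology is assigned to one or more workers.

(3) Executors (threads): Java threads running inside a worker's JVM process. Several tasks can be assigned to the same executor. Unless told otherwise, Storm assigns one task to each executor.

(4) Tasks (bolt/spout instances): a task is an instance of a spout or bolt whose nextTuple() or execute() method is called by the executor thread.

Parallelism: the user specifies that a component can be executed by several threads; the parallelism equals the number of executor threads.
A task is the object that holds the actual processing logic. An executor thread can run one or more tasks, but by default each executor runs exactly one task, which is why tasks are often mistaken for the executing threads. They are not.
The number of tasks is the maximum parallelism. A component's number of tasks never changes, but its number of executors can change (with the storm rebalance command). The number of tasks is >= the number of executors, and the number of executors is the actual parallelism.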
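The relation between tasks and executors can be illustrated with a small model. This is a sketch under stated assumptions, not Storm's scheduler: TaskAssignmentSketch is a hypothetical class, task indices start at 0, and tasks are split into contiguous, near-equal groups per executor.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model: spread a component's fixed set of tasks over its executors. */
class TaskAssignmentSketch {
    /** Returns, per executor, the list of task indices it runs. Requires tasks >= executors >= 1. */
    static List<List<Integer>> assign(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numTasks < numExecutors) {
            throw new IllegalArgumentException("need tasks >= executors >= 1");
        }
        List<List<Integer>> executors = new ArrayList<>();
        for (int e = 0; e < numExecutors; e++) {
            executors.add(new ArrayList<>());
        }
        for (int t = 0; t < numTasks; t++) {
            // Contiguous blocks of tasks per executor (an assumption of this model).
            executors.get(t * numExecutors / numTasks).add(t);
        }
        return executors;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 2)); // 4 tasks on 2 executors
        System.out.println(assign(4, 4)); // after scaling up to 4 executors: one task each
        System.out.println(assign(1, 1)); // the default: 1 executor, 1 task
    }
}
```

The number of tasks stays at 4 in both of the first two calls; only their grouping into threads changes, which is the sense in which the task count bounds the achievable parallelism.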

[Figure: structure diagram of nodes, workers, executors, and tasks]

The WordCountTopology word-count example contains the following spout and bolts:

(1) Spout, class SentenceSpout: produces sentences.

(2) Bolt, class SplitSentence: splits sentences into words.

(3) Bolt, class WordCount: counts the words.

(4) Bolt, class ReportBolt: reports the word counts.

Set different numbers of threads and tasks and look at the resulting parallelism.

(1) By default, every spout or bolt has a parallelism (executors) of 1 and 1 task.

    builder.setSpout(SENTENCE_SPOUT_ID, spout);
    // SentenceSpout --> SplitSentenceBolt
    builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
    // SplitSentenceBolt --> WordCountBolt
    builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
    // WordCountBolt --> ReportBolt
    builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

[Figure: parallelism diagram for the default settings] The only concurrency is at thread level: each task runs in a different thread of the same JVM. How can the parallelism be increased to make full use of the hardware? By increasing the number of workers and executors assigned to the topology.

(2) Set the parallelism of SentenceSpout to 2 and leave the workers unchanged.

    // The 2 means two executors. The number of tasks is not set explicitly,
    // but an executor has at least 1 task, so with 2 executors the default is 2 tasks.
    builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);

[Figure: parallelism diagram] SentenceSpout has 2 threads, each running 1 task.

(3) Set the number of workers to 2, give SplitSentence 4 tasks and 2 executors, and give WordCount 4 executors.

    Config config = new Config();
    config.setNumWorkers(2);
    builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
    // SplitSentenceBolt --> WordCountBolt
    builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

[Figure: parallelism diagram for 2 workers]

Website page-view and visitor counting

Requirements analysis for the PV-UV example

The two most common website metrics:

PV (page views): count(session_id)

UV (unique visitors): count(distinct session_id)
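As a single-threaded reference for these two definitions, here is a minimal sketch in plain Java. PvUvSketch is a hypothetical class, and the three input lines follow the tab-separated track.log format used in this article (site, sessionId, time).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Single-threaded reference for PV = count(session_id) and UV = count(distinct session_id). */
class PvUvSketch {
    /** Extracts the session id (second tab-separated column), or null if the line is malformed. */
    static String sessionId(String line) {
        String[] cols = line.split("\t");
        return cols.length > 1 ? cols[1] : null;
    }

    static long pv(List<String> lines) {
        long pv = 0;
        for (String line : lines) {
            if (sessionId(line) != null) {
                pv++;
            }
        }
        return pv;
    }

    static long uv(List<String> lines) {
        Set<String> distinct = new HashSet<>();
        for (String line : lines) {
            String id = sessionId(line);
            if (id != null) {
                distinct.add(id);
            }
        }
        return distinct.size();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "www.taobao.com\t5CFBA5BD76BACF436ACA9DCC8\t2019-06-29 11:01:20",
            "www.taobao.com\t5D16C7A886E2P2AE3EA29FC3E\t2019-06-29 08:01:36",
            "www.taobao.com\t5D16C7A886E2P2AE3EA29FC3E\t2019-06-29 10:51:27");
        System.out.println("PV = " + pv(lines) + ", UV = " + uv(lines));
    }
}
```

PV only needs a counter, while UV needs the set of session ids seen so far, which is why UV is the harder of the two to spread over several threads.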

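Under multithreading, pay attention to thread safety.

I. PV counting

Scheme analysis

Would the following work?

1. Define a static long pv and guard the accumulation with synchronized.

No. synchronized and Lock work within a single JVM but have no effect across multiple JVMs, and a topology may run in several worker JVMs.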
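The single-JVM half of that statement can be checked with plain threads. The sketch below uses a hypothetical class SharedCounterSketch with a static counter guarded by synchronized; it shows that the count is exact inside one JVM. It cannot show the multi-JVM case, because every JVM would hold its own copy of the static field.

```java
/** One JVM, several threads: a static counter guarded by synchronized stays exact. */
class SharedCounterSketch {
    private static long pv = 0;

    private static synchronized void increment() {
        pv++;
    }

    private static synchronized long get() {
        return pv;
    }

    private static synchronized void reset() {
        pv = 0;
    }

    /** Starts `threads` threads that each increment the shared counter `perThread` times. */
    static long countWithThreads(int threads, int perThread) {
        reset();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 1000));
    }
}
```

With 4 workers configured for the topology, the bolt threads may live in different processes, so a lock of this kind would protect only a partial count in each process.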

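Two workable schemes:

1. Under shuffleGrouping, multiply the pv of one executor by the executor parallelism. This is simple, but it only works with shuffleGrouping and it produces intermediate values.

2. bolt1 performs partial aggregation with high parallelism, and bolt2 performs the global aggregation in a single thread. This works and is the recommended approach.

Thread safety: if multithreaded processing gives the same result as single-threaded processing, it is thread-safe; otherwise it is not.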
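Scheme 2 can be modelled without Storm. In this minimal sketch, LocalGlobalSumSketch is a hypothetical class: several local counters each send their cumulative count to one summing step, which keeps the latest value per sender and adds them up. Keying by task id rather than thread id is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Storm-free model of scheme 2: parallel local counters feeding one global summing step. */
class LocalGlobalSumSketch {
    // upstream task id -> latest cumulative local pv reported by that task
    private final Map<Integer, Integer> latestPartial = new HashMap<>();

    /** Handles one (taskId, localPv) message and returns the current global PV. */
    long onPartial(int taskId, int localPv) {
        latestPartial.put(taskId, localPv); // overwrite, because localPv is already cumulative
        long sum = 0;
        for (int pv : latestPartial.values()) {
            sum += pv;
        }
        return sum;
    }

    /** Distributes `lines` tuples round-robin over `numTasks` local counters and returns the final global PV. */
    static long simulate(int lines, int numTasks) {
        int[] local = new int[numTasks];
        LocalGlobalSumSketch sumStep = new LocalGlobalSumSketch();
        long global = 0;
        for (int i = 0; i < lines; i++) {
            int task = i % numTasks;  // stands in for shuffle grouping
            local[task]++;            // local aggregation
            global = sumStep.onPartial(task, local[task]); // global aggregation in one place
        }
        return global;
    }

    public static void main(String[] args) {
        System.out.println(simulate(50, 4));
    }
}
```

Because only one object ever touches the map of partial results, no locking is required, and the total is exact however unevenly the tuples are spread over the local counters.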


Example code

Test data: the file track.log has 50 lines. Each line has 3 columns (site, sessionId, time) separated by tabs, as shown below.

    www.taobao.com 5CFBA5BD76BACF436ACA9DCC8 2019-06-29 11:01:20
    www.taobao.com 5D16C7A886E2P2AE3EA29FC3E 2019-06-29 08:01:36
    www.taobao.com 5D16C7A886E2P2AE3EA29FC3E 2019-06-29 10:51:27

(1) The launcher class: 1 spout, 4 bolt1 threads, and 1 summing sumbolt.

    package visits;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;

    /**
     * Launcher class of the program
     * @description
     * @author whiteshark
     * @date 2019-07-06 17:13:42
     */
    public class Main {
        public static void main(String[] args) {
            // 1. Create the topology builder
            TopologyBuilder topoBuilder = new TopologyBuilder();

            // 2. Set the spout and bolts
            // setSpout takes 3 arguments:
            // the spout id (String), the instance, and the parallelism. Use a larger parallelism for large data volumes.
            topoBuilder.setSpout("spout", new MySpout(), 1);
            // Shuffle grouping
            topoBuilder.setBolt("bolt", new PVBolt1(), 4).shuffleGrouping("spout");
            // Global sum
            topoBuilder.setBolt("sumbolt", new PVSumBolt(), 1).shuffleGrouping("bolt");

            // 3. Set the number of workers
            Config conf = new Config();
            conf.put(Config.TOPOLOGY_WORKERS, 4);

            if (args.length > 0) {
                try {
                    // 4. Submit to the cluster
                    // The 3 arguments are: topology name, Storm config, topology instance
                    StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            } else {
                // 5. Submit in local mode
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());
            }
        }
    }

(2) The spout reads track.log line by line and sends each line to the downstream bolt.

    package visits;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    /**
     * @description Spout that reads the data and sends it line by line to the next stage
     * @author whiteshark
     * @date 2019-06-30 09:28:17
     */
    public class MySpout implements IRichSpout {
        private static final long serialVersionUID = 1L;
        private FileInputStream fis;
        private InputStreamReader isr;
        private BufferedReader br;
        private String str = null;
        SpoutOutputCollector collector = null;

        @Override
        public void nextTuple() {
            try {
                while ((str = this.br.readLine()) != null) {
                    /* The data can be processed or filtered here */
                    // Emit the data
                    collector.emit(new Values(str));
                    // Pause so that the printed data can be followed on the console
                    Thread.sleep(500);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // When the job is submitted, all entries of storm.yaml are read into this Map.
        // Changing a value in the map inside open() overrides the value defined in storm.yaml.
        @Override
        public void open(Map conf, TopologyContext content, SpoutOutputCollector collector) {
            this.collector = collector;
            try {
                this.fis = new FileInputStream("track.log");
                this.isr = new InputStreamReader(fis, "UTF-8");
                this.br = new BufferedReader(isr);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare the format of the emitted data
            declarer.declare(new Fields("log"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }

        // Called when the downstream stage acknowledges success; print the message id
        @Override
        public void ack(Object msgId) {
            System.out.println("spout ack: " + msgId.toString());
        }

        @Override
        public void activate() {
            // TODO Auto-generated method stub
        }

        // Release resources
        @Override
        public void close() {
            try {
                br.close();
                isr.close();
                fis.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void deactivate() {
            // TODO Auto-generated method stub
        }

        // Called when the downstream stage reports a failure; print the message id
        @Override
        public void fail(Object msgId) {
            System.out.println("spout fail: " + msgId.toString());
        }
    }

(3) bolt1, which main configures with 4 threads. It receives each line emitted by the spout, splits it into 3 fields, and counts the occurrences of the second field (sessionId). The count and the current thread id are passed together to sumBolt for the final sum.

    package visits;

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class PVBolt1 implements IRichBolt {
        private static final long serialVersionUID = 1L;
        OutputCollector collector;

        // Initialization
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        String logString = null;
        String sessionId = null;
        Integer pv = 0;

        @Override
        public void execute(Tuple input) {
            logString = input.getString(0);
            sessionId = logString.split("\t")[1];
            if (null != sessionId) {
                pv++;
            }
            // The first value is the current thread id, the second is the page-view count
            collector.emit(new Values(Thread.currentThread().getId(), pv));
            System.out.println(Thread.currentThread().getName() + ", pv = " + pv);
        }

        @Override
        public void cleanup() {
            // TODO Auto-generated method stub
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("threadId", "pv"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

(4) PVSumBolt performs the final sum. It receives the thread id and the sessionId count from the upstream bolt, stores or updates them in a map, then iterates over the map and adds up all counts to obtain the total.

    package visits;

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    public class PVSumBolt implements IRichBolt {
        private static final long serialVersionUID = 1L;
        Map<Long, Integer> counts = new HashMap<Long, Integer>();

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        }

        @Override
        public void execute(Tuple input) {
            Long threadId = input.getLong(0);
            Integer pv = input.getInteger(1);
            counts.put(threadId, pv);
            long wordSum = 0;
            // Compute the total by iterating over the values of counts and adding them up
            for (Map.Entry<Long, Integer> count : counts.entrySet()) {
                wordSum += count.getValue();
            }
            System.out.println(Thread.currentThread().getName() + ", wordSum = " + wordSum);
        }

        @Override
        public void cleanup() {
            // TODO Auto-generated method stub
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // TODO Auto-generated method stub
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

(5) Output: there are 4 bolt1 threads, and a single sumbolt produces the total.

    Thread-26-bolt, pv = 1
    Thread-57-sumbolt, wordSum = 1
    Thread-53-bolt, pv = 1
    Thread-57-sumbolt, wordSum = 2
    Thread-23-bolt, pv = 1
    Thread-57-sumbolt, wordSum = 3
    Thread-55-bolt, pv = 1
    Thread-57-sumbolt, wordSum = 4
    ...
    Thread-53-bolt, pv = 11
    Thread-57-sumbolt, wordSum = 41
    Thread-23-bolt, pv = 11
    Thread-57-sumbolt, wordSum = 42
    Thread-55-bolt, pv = 11
    Thread-57-sumbolt, wordSum = 43
    Thread-26-bolt, pv = 11
    Thread-57-sumbolt, wordSum = 44
    Thread-23-bolt, pv = 12
    Thread-57-sumbolt, wordSum = 45
    Thread-55-bolt, pv = 12
    Thread-57-sumbolt, wordSum = 46
    Thread-26-bolt, pv = 12
    Thread-57-sumbolt, wordSum = 47
    Thread-53-bolt, pv = 12
    Thread-57-sumbolt, wordSum = 48
    Thread-26-bolt, pv = 13
    Thread-57-sumbolt, wordSum = 49
    Thread-53-bolt, pv = 13
    Thread-57-sumbolt, wordSum = 50

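PV-UV example optimization: using a ZooKeeper lock to coordinate the threads

Aggregation schemes:

1. Under shuffleGrouping, pv (the result of one thread) * the executor parallelism. An executor runs one task by default; if the number of tasks is set higher, the formula should be pv (the result of one task) * the number of tasks. Tasks in the same executor share the thread ID but have different task IDs.

Advantages: simple, little computation.

Disadvantage: slightly inaccurate, but acceptable in most scenarios.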
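The size of that inaccuracy can be read off the earlier PV run in this article, where the four bolt threads ended with local counts of 13, 13, 12, and 12 for 50 lines. A minimal sketch, with PvEstimateSketch as a hypothetical class name:

```java
/** Scheme 1: extrapolate the global PV from a single task's local PV. */
class PvEstimateSketch {
    static long estimate(long localPv, int numTasks) {
        return localPv * numTasks;
    }

    public static void main(String[] args) {
        int[] localPv = {13, 13, 12, 12}; // final local counts of the 4 bolt threads in the earlier run
        long exact = 0;
        for (int pv : localPv) {
            exact += pv;
        }
        System.out.println("exact = " + exact);
        for (int pv : localPv) {
            System.out.println("local " + pv + " -> estimate " + estimate(pv, localPv.length));
        }
    }
}
```

Depending on which task reports, the estimate is 52 or 48 against an exact total of 50. The estimate is exact only when the number of tuples is a multiple of the number of tasks and the distribution is perfectly even.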

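Optimization:

In the PVBolt example every task would output a total, although only one task needs to. A ZooKeeper lock ensures that only one task outputs the total, once every 5 seconds.

2. bolt1 performs partial aggregation in parallel, and bolt2 performs the global aggregation in a single thread.

Advantages:

(1) The result is exact.

(2) With fieldsGrouping, intermediate values are available, such as the PV of a single user (visit depth, which is also a useful metric).

Disadvantage: slightly more computation, and one extra bolt.

Preparation: first start the ZooKeeper cluster on the virtual machines hadoop-senior and hadoop-senior02. Create the node "/lock" and then the node "/lock/storm", both with empty values.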

Example code:

(1) SourceSpout generates the source data. It produces 100 lines, each in the format: domain + "\t" + sessionId + "\t" + time

    package lock;

    import java.util.Map;
    import java.util.Queue;
    import java.util.Random;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    /**
     * @description Generates the data
     * @author whiteshark
     * @time 2019-06-29 09:45:40
     * @version
     */
    public class SourceSpout implements IRichSpout {
        private static final long serialVersionUID = 1L;
        SpoutOutputCollector collector;
        Queue<String> queue = new ConcurrentLinkedQueue<String>();
        String str = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            try {
                System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");
                this.collector = collector;
                Random random = new Random();
                // The visited site is taobao
                String hosts = "www.taobao.com";
                // Session id of each visit
                String[] sessionId = {
                    "5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8",
                    "5D16C3E0209C16DEAA28L1824", "5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413",
                    "5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E", "5C3FBA728FD7D264B80769B23",
                    "5B16C0F7215109AG43528BA2D", "5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};
                // Visit times
                String[] times = {
                    "2019-06-29 08:01:36", "2019-06-29 08:11:37", "2019-06-29 08:31:38",
                    "2019-06-29 09:23:07", "2019-06-29 10:51:27", "2019-06-29 10:51:56",
                    "2019-06-29 11:01:07", "2019-06-29 11:01:20", "2019-06-29 11:45:30",
                    "2019-06-29 12:31:49", "2019-06-29 12:41:51", "2019-06-29 12:51:37",
                    "2019-06-29 13:11:27", "2019-06-29 13:20:40", "2019-06-29 13:31:38",
                };
                for (int i = 0; i < 100; i++) {
                    queue.add(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]);
                }
                System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void close() {
            // TODO Auto-generated method stub
        }

        @Override
        public void activate() {
            // TODO Auto-generated method stub
        }

        @Override
        public void deactivate() {
            // TODO Auto-generated method stub
        }

        @Override
        public void nextTuple() {
            // System.out.println("SourceSpout nextTuple");
            // Emit only while data is left; poll() returns null on an empty queue.
            // (The original test, queue.size() >= 0, was always true.)
            if (!queue.isEmpty()) {
                this.collector.emit(new Values(queue.poll()));
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        public void ack(Object msgId) {
            // TODO Auto-generated method stub
        }

        @Override
        public void fail(Object msgId) {
            // TODO Auto-generated method stub
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Field name kept from the original; the value is the whole log line
            declarer.declare(new Fields("threadId"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

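(2) PVBolt. In prepare, each task tries to create the ephemeral ZooKeeper node "/lock/storm/pv" with the value IP + ":" + taskId, and only one task can succeed in creating it. In execute, each call reads one line and increments the count when a sessionId is present. The task that created the ZooKeeper node prints the total sessionId count every 5 seconds.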
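The locking idea can be modelled without ZooKeeper. This is only a sketch under stated assumptions: LockSketch is a hypothetical class, and a ConcurrentHashMap with putIfAbsent stands in for an atomic "create node if absent" operation. It does not model sessions, ephemeral-node expiry, or network failures.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Storm- and ZooKeeper-free model: an atomic "create if absent" elects exactly one reporter. */
class LockSketch {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    /** Returns true only for the single caller that manages to create the path. */
    boolean tryAcquire(String path, String lockData) {
        return nodes.putIfAbsent(path, lockData) == null;
    }

    /** Mirrors the check done before printing: is my own data stored at the path? */
    boolean isOwner(String path, String lockData) {
        return lockData.equals(nodes.get(path));
    }

    public static void main(String[] args) {
        LockSketch zk = new LockSketch();
        String path = "/lock/storm/pv";
        for (int taskId = 5; taskId <= 8; taskId++) {
            String lockData = "192.168.43.105:" + taskId;
            zk.tryAcquire(path, lockData);
            System.out.println(lockData + " owner = " + zk.isOwner(path, lockData));
        }
    }
}
```

The important property is that creation and the existence check happen as one atomic step; a separate "exists, then create" sequence leaves a window in which two tasks can both see the node as absent.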

    package lock;

    import java.net.InetAddress;
    import java.util.Map;

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    public class PVBolt implements IRichBolt {
        private static final long serialVersionUID = 1L;
        // Path of the lock node in ZooKeeper
        public static final String zkPath = "/lock/storm/pv";
        ZooKeeper zKeeper = null;
        String lockData = null;
        OutputCollector collector;
        String logString = null;
        String sessionId = null;
        Integer pv = 0;
        long beginTime = System.currentTimeMillis();
        long endTime = 0;

        // Initialization
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // this.collector = collector;
            try {
                System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 1");
                // Create a client for the ZooKeeper cluster with a session timeout of 20000 ms.
                // The IP addresses are those of the 2 virtual machines; ZooKeeper is installed and
                // running on both, and the node "/lock/storm" already exists.
                zKeeper = new ZooKeeper("192.168.178.131:2181,192.168.178.132:2181", 20000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println("event : " + event.getType());
                    }
                });
                System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 2");
                // While not connected, sleep 1 second and let the client retry; wait until connected
                while (zKeeper.getState() != ZooKeeper.States.CONNECTED) {
                    Thread.sleep(1000);
                }
                System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 3");
                // Get the IP of this machine
                InetAddress address = InetAddress.getLocalHost();
                // Value stored at the ZooKeeper path: IP + ":" + taskId
                lockData = address.getHostAddress() + ":" + context.getThisTaskId();
                System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 4");
                // Only check whether the node exists; do not set a watch
                if (null == zKeeper.exists(zkPath, false)) {
                    try {
                        // The node does not exist: create an ephemeral node
                        zKeeper.create(zkPath, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                        System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - create " + zkPath);
                    } catch (KeeperException.NodeExistsException e) {
                        // Another task created the node between exists() and create(); it holds the lock
                    }
                }
                System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 5");
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    if (zKeeper != null) {
                        zKeeper.close();
                    }
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }

        @Override
        public void execute(Tuple input) {
            try {
                logString = input.getString(0);
                if (logString != null) {
                    endTime = System.currentTimeMillis();
                    sessionId = logString.split("\t")[1];
                    if (null != sessionId) {
                        pv++;
                    }
                    // The first value is the current thread id, the second is the page-view count
                    // collector.emit(new Values(Thread.currentThread().getId(), pv));
                    // Print every 5 seconds
                    if (endTime - beginTime >= 5000) {
                        System.err.println(lockData + "=============================");
                        // System.out.println("lockData = " + new String(zKeeper.getData(zkPath, false, null)));
                        if (lockData.equals(new String(zKeeper.getData(zkPath, false, null)))) {
                            // 4 is the bolt parallelism configured in PVTopo
                            System.out.println("pv ====================== " + pv * 4);
                        }
                        beginTime = System.currentTimeMillis();
                    }
                }
            } catch (KeeperException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        public void cleanup() {
            // TODO Auto-generated method stub
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // declarer.declare(new Fields("threadId"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

(3) PVTopo: sets the spout parallelism to 1 and the bolt parallelism to 4, and uses shuffle grouping so that the data is distributed evenly across the bolt tasks.

package lock;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

/**
 * Entry point of the program
 * @description
 * @author whiteshark
 * @date 2019-07-06 17:13:42
 */
public class PVTopo {

    public static void main(String[] args) {
        // 1. Create the topology builder
        TopologyBuilder topoBuilder = new TopologyBuilder();

        // 2. Set the spout and bolt.
        // setSpout takes three arguments: the spout id (a String), the instance,
        // and the parallelism. Use a larger parallelism for large data volumes.
        topoBuilder.setSpout("spout", new SourceSpout(), 1);
        // Shuffle grouping, parallelism 4
        topoBuilder.setBolt("bolt", new PVBolt(), 4).shuffleGrouping("spout");

        // 3. Set the number of workers
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_WORKERS, 4);

        if (args.length > 0) {
            try {
                // 4. Submit to the cluster.
                // The three arguments are: topology name, Storm config, topology instance
                StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // 5. Submit in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());
        }
    }
}

The output is:

192.168.43.105:6=============================
192.168.43.105:8=============================
192.168.43.105:5=============================
pv ====================== 16
192.168.43.105:7=============================
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 44
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 72
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 100
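The `pv * 4` in PVBolt is an estimate: one task multiplies its own count by the number of tasks, which only works because shuffle grouping spreads tuples roughly evenly. A minimal sketch in plain Java (no Storm dependency), assuming a strict round-robin as a stand-in for the real shuffle; the class and method names are hypothetical:

```java
import java.util.Arrays;

// Hypothetical stand-in for shuffle grouping: tuples are dealt to tasks in turn.
final class ShuffleEstimateSketch {

    // Deals `tuples` tuples round-robin over `tasks` tasks and returns the per-task counts.
    static int[] distribute(int tuples, int tasks) {
        int[] perTask = new int[tasks];
        for (int i = 0; i < tuples; i++) {
            perTask[i % tasks]++;
        }
        return perTask;
    }

    // What the lock-holding task would print: its own count times the number of tasks.
    static int estimateTotal(int[] perTask, int lockHolder) {
        return perTask[lockHolder] * perTask.length;
    }

    public static void main(String[] args) {
        int[] perTask = distribute(102, 4);
        System.out.println(Arrays.toString(perTask));
        System.out.println("estimate = " + estimateTotal(perTask, 0));
    }
}
```

When the tuple count is not a multiple of the task count, the estimate is off by a few tuples, which is consistent with every printed pv value above being a multiple of 4.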

While the program is still running, ZooKeeper shows the value (IP + ":" + taskId) stored in the ephemeral node "/lock/storm/pv":

[zk: localhost:2181(CONNECTED) 18] get /lock/storm/pv
192.168.43.105:5
cZxid = 0x2000000050
ctime = Tue Jul 30 00:02:39 CST 2019
mZxid = 0x2000000050
mtime = Tue Jul 30 00:02:39 CST 2019
pZxid = 0x2000000050
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16c3e672803000f
dataLength = 16
numChildren = 0
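The lock works because only the first task to create the node gets its data stored; every task later compares its own lockData with the stored value. The sketch below mimics that with an in-memory map. It is not a ZooKeeper client, and the class and method names are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for the ZooKeeper lock node; for illustration only.
final class LockElectionSketch {

    private final ConcurrentMap<String, String> znodes = new ConcurrentHashMap<>();

    // Mimics "create if absent": the first caller's data is stored, later callers change nothing.
    // Returns true when this caller became the lock holder.
    boolean tryAcquire(String path, String lockData) {
        return znodes.putIfAbsent(path, lockData) == null;
    }

    // Mimics the check in execute(): only the task whose data is stored may report.
    boolean mayReport(String path, String lockData) {
        return lockData.equals(znodes.get(path));
    }

    public static void main(String[] args) {
        LockElectionSketch zk = new LockElectionSketch();
        String path = "/lock/storm/pv";
        for (int taskId = 5; taskId <= 8; taskId++) {
            zk.tryAcquire(path, "192.168.43.105:" + taskId);
        }
        for (int taskId = 5; taskId <= 8; taskId++) {
            String lockData = "192.168.43.105:" + taskId;
            System.out.println(lockData + " may report: " + zk.mayReport(path, lockData));
        }
    }
}
```

With a real ephemeral node, the entry also disappears when the owning session ends, so another task can take over after a restart; the map above does not model that.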


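Packaging and running on the Storm cluster on the virtual machines

The example above ran on the local machine. Next, run it on the Storm cluster.

1. Package the project: right-click the project, choose Run As, then Maven install. When the console shows "BUILD SUCCESS", the build has succeeded.

The target directory contains two jars. The one with "dependencies" in its name bundles the dependencies; this is the one to use.

Use FileZilla to upload the jar to /opt/datas/stormjars on the hadoop-senior virtual machine.

2. Start the Storm cluster on the virtual machines.

First start ZooKeeper on both the hadoop-senior and hadoop-senior02 virtual machines.

On hadoop-senior, start Storm as the master node (nimbus) and start the UI. nohup runs the process in the background, so closing the terminal does not stop the Storm node.

nohup ./storm nimbus &
nohup ./storm ui &

On hadoop-senior02, start Storm as a worker node (supervisor).

nohup ./storm supervisor &

Open http://hadoop-senior:8081 in a browser to see the cluster monitoring UI.

3. Submit the topology

On hadoop-senior, go to the bin directory under the Storm installation directory and submit the topology to the cluster:

./storm jar /opt/datas/stormjars/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar lock.PVTopo PVTopo

The jar is under /opt/datas/stormjars, so its full path is given. Two arguments follow: the first is the main class, written with its package name, and the second is the topology name, which must be unique. Here the first is lock.PVTopo and the second is PVTopo.

After the submit command runs, the console output shows that the submission has completed.

The monitoring UI shows a new topology named PVTopo with status ACTIVE.

Click the topology name to open the topology detail page. It shows 1 spout and 4 bolt tasks. The spout emitted 2060 tuples, and the 4 bolt tasks received 2060 tuples in total.

4. Check the results

Click the bolt to open the bolt detail page. All 4 bolt tasks run on Storm on the hadoop-senior02 virtual machine; the ports shown are the ones used by the workers. The bolt code prints the total number of sessionIds, but only 1 of the 4 tasks prints it. Checking every task's log file one by one would be slow.

The value stored in ZooKeeper at "/lock/storm/pv" is IP + ":" + taskId, so once the taskId is known, the virtual machine and port can be looked up.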
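Splitting the stored value back into its two parts is a one-liner each. A minimal sketch, assuming the value always has the form IP + ":" + taskId as written by PVBolt; the helper class is hypothetical:

```java
// Hypothetical helper that splits the lock node's value into host and task id.
final class LockDataSketch {

    // Everything before the last ':' is the host address.
    static String host(String lockData) {
        return lockData.substring(0, lockData.lastIndexOf(':'));
    }

    // Everything after the last ':' is the task id.
    static int taskId(String lockData) {
        return Integer.parseInt(lockData.substring(lockData.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        String lockData = "192.168.43.105:5";
        System.out.println("host = " + host(lockData) + ", taskId = " + taskId(lockData));
    }
}
```

The task id is then matched against the executor list on the bolt detail page to find the worker port.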

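The screenshot shows that the task printing the sessionId total has taskId 6; its log is in the logs directory on the hadoop-senior02 virtual machine, and the port is 6707.

Because all 4 bolt tasks run on hadoop-senior02, their log files are in the logs directory under the Storm directory on that machine. Listing the directory shows the log files for the 4 tasks; each log file is identified by its port number.

The log that contains the sessionId total is worker-6706.log; viewing the file shows the total. The counts emitted by the spout and received by the bolt shown earlier are much larger than this total because the spout also emits empty values, while the sessionId count only includes non-empty ones.

5. Stop the topology

There are two ways: through the monitoring UI or from the command line.

(1) UI (there is no access control; anyone who can open the page can operate on it, so this is not recommended). First click deactivate to pause the topology. It does not stop immediately; work already in progress is finished first. The status becomes INACTIVE. Then click kill to stop the topology.

From INACTIVE, the next action can be activate or kill. As the names suggest, activate resumes the topology and kill stops it completely.

Click kill to stop the topology completely. The status changes to KILLED.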
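The transitions described above can be written down as a small state machine. This is a simplified model for illustration, not Storm's internal implementation; the enum and method names are hypothetical, and it assumes a killed topology accepts no further actions:

```java
// Simplified, hypothetical model of the topology status transitions described in the text.
final class TopologyStatusSketch {

    enum Status { ACTIVE, INACTIVE, KILLED }

    enum Action { ACTIVATE, DEACTIVATE, KILL }

    // Returns the status after applying an action to the current status.
    static Status apply(Status current, Action action) {
        if (current == Status.KILLED) {
            throw new IllegalStateException("topology is already killed");
        }
        switch (action) {
            case ACTIVATE:
                return Status.ACTIVE;
            case DEACTIVATE:
                return Status.INACTIVE;
            case KILL:
                return Status.KILLED;
            default:
                throw new IllegalArgumentException("unknown action: " + action);
        }
    }

    public static void main(String[] args) {
        Status status = Status.ACTIVE;
        status = apply(status, Action.DEACTIVATE);
        System.out.println(status);
        status = apply(status, Action.KILL);
        System.out.println(status);
    }
}
```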

(2) Command line: in Storm's bin directory, run

./storm kill PVTopo


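Extended example: computing site UV (deduplicated counting)

Approach analysis

1. Put the sessionIds into a set so that they are deduplicated automatically; set.size() gives UV

A workable approach (similar to counting the number of distinct words in wordcount):

bolt1 uses fieldsGrouping to do multi-threaded partial aggregation; the next-level bolt2 runs in a single thread, stores each sessionId and its count in a map, and iterates over the map to obtain:

PV, UV, and visit depth (the number of page views per sessionId)

This is simple and fast but memory-hungry, so the cluster needs plenty of memory.

It suits small and medium-sized companies, not large ones.

It suits small data volumes, such as orders.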
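The first approach boils down to two standard collections: a set for UV and a map for visit depth. A minimal sketch in plain Java with made-up session ids; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of set-based deduplication.
final class UvSetSketch {

    // UV: the number of distinct sessionIds.
    static int uniqueVisitors(List<String> sessionIds) {
        return new HashSet<>(sessionIds).size();
    }

    // Visit depth: the number of page views per sessionId.
    static Map<String, Integer> visitDepth(List<String> sessionIds) {
        Map<String, Integer> depth = new HashMap<>();
        for (String id : sessionIds) {
            depth.merge(id, 1, Integer::sum);
        }
        return depth;
    }

    public static void main(String[] args) {
        List<String> visits = Arrays.asList("a", "b", "a", "c", "a");
        System.out.println("PV = " + visits.size());
        System.out.println("UV = " + uniqueVisitors(visits));
        System.out.println("depth = " + visitDepth(visits));
    }
}
```

Every distinct sessionId stays in memory, which is the memory cost the text mentions.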

2. A NoSQL distributed database such as HBase

Deduplicate through the rowkey; counting the rows gives the number of distinct sessionIds.

It suits large data volumes, such as traffic statistics.


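Limitations of Storm:

Storm fits many scenarios, but the complexity of what it can compute is limited; jobs are usually aggregations.

A typical use is to preprocess the source data and write it to a database.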


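The example below uses the first approach.

Program analysis:

(1) DateFmt, a utility class that converts a long date string into a short one (year, month, and day only);

(2) SourceSpout, the spout, 1 thread, generates the source data. It produces 100 lines, each in the format: domain + "\t" + sessionId + "\t" + time;

(3) FmtLogBolt, a bolt, 4 threads, receives data from the spout, extracts the date and sessionId, and emits them to the next level;

(4) DeepVisitBolt, a bolt, 4 threads, partial aggregation. It receives data from FmtLogBolt and joins the date and sessionId into a string of the form date + "_" + sessionId. It keeps a map with date + "_" + sessionId as the key; each time the same key arrives, the value is incremented by 1. It then emits date + "_" + sessionId and the corresponding value to the next level;

(5) UVSumBolt, a bolt, 1 thread, global aggregation. It receives data from DeepVisitBolt and computes PV, the number of page views (not deduplicated by user), and UV, the number of unique visitors (deduplicated by user);

(6) UVTopo, the topology class, wires up the spout and bolts.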
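The data flow through steps (2) to (5) can be traced in plain Java without Storm. The sketch below is an illustration under stated assumptions: the time field is always "yyyy-MM-dd HH:mm:ss", so its first 10 characters are the date, and the class and method names are hypothetical, not the bolts themselves:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process walk through the spout -> format -> count -> sum steps.
final class UvPipelineSketch {

    // FmtLogBolt + key building: "host\tsessionId\ttime" -> "yyyy-MM-dd_sessionId".
    static String toKey(String line) {
        String[] fields = line.split("\t");
        String date = fields[2].substring(0, 10);
        return date + "_" + fields[1];
    }

    // DeepVisitBolt: count how often each date_sessionId key occurs.
    static Map<String, Integer> countByKey(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            counts.merge(toKey(line), 1, Integer::sum);
        }
        return counts;
    }

    // UVSumBolt: PV is the sum of the counts, UV the number of keys, for one date.
    // Returns {PV, UV}.
    static long[] pvAndUv(Map<String, Integer> counts, String date) {
        long pv = 0;
        long uv = 0;
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getKey().startsWith(date)) {
                uv++;
                pv += entry.getValue();
            }
        }
        return new long[] {pv, uv};
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "www.taobao.com\tS1\t2019-08-04 08:01:36",
                "www.taobao.com\tS2\t2019-08-04 08:11:37",
                "www.taobao.com\tS1\t2019-08-04 09:23:07");
        long[] result = pvAndUv(countByKey(lines), "2019-08-04");
        System.out.println("UV = " + result[1] + ", PV = " + result[0]);
    }
}
```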

Example code:

(1) DateFmt, utility class

package user_visit;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * Utility class
 * @author silva
 */
public class DateFmt {

    public static final String DATE_LONG = "yyyy-MM-dd HH:mm:ss";
    public static final String DATE_SHORT = "yyyy-MM-dd";

    // Note: SimpleDateFormat is not thread-safe; this shared instance is only
    // used by parseDate, which is called from the single-threaded UVSumBolt.
    public static SimpleDateFormat sdf = new SimpleDateFormat(DATE_SHORT);

    // Returns the date as a string in the given pattern.
    // A null date yields the current date.
    public static String getCountDate(String date, String pattern) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        Calendar cal = Calendar.getInstance();
        if (null != date) {
            try {
                cal.setTime(sdf.parse(date));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return sdf.format(cal.getTime());
    }

    // Converts a short date string to a Date
    public static Date parseDate(String dateStr) throws Exception {
        return sdf.parse(dateStr);
    }

    public static void main(String[] args) {
        String dateStr = "2019-06-29 13:11:27";
        System.out.println("date = " + getCountDate(dateStr, DATE_SHORT));
    }
}
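For comparison, the same conversion can be done with java.time, whose formatters are immutable and safe to share between threads. This is an alternative sketch, not the code used in this example; it assumes Java 8 or later, and the class name is hypothetical:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical java.time-based alternative to DateFmt (requires Java 8+).
final class DateFmtSketch {

    private static final DateTimeFormatter LONG_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // "2019-06-29 13:11:27" -> "2019-06-29"
    static String toShortDate(String longDate) {
        return LocalDateTime.parse(longDate, LONG_FORMAT).toLocalDate().toString();
    }

    // True when the first short date is later than the second.
    static boolean isAfter(String shortDate, String otherShortDate) {
        return LocalDate.parse(shortDate).isAfter(LocalDate.parse(otherShortDate));
    }

    public static void main(String[] args) {
        System.out.println("date = " + toShortDate("2019-06-29 13:11:27")); // prints date = 2019-06-29
    }
}
```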

(2) SourceSpout, the spout, 1 thread, generates the source data. The dates in the data must match the date on which the program runs; otherwise the final result is 0, because UVSumBolt only counts entries for the current date.

package user_visit;

import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @description Generates the data
 * @author whiteshark
 * @time 2019-06-29 09:45:40
 * @version
 */
public class SourceSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    SpoutOutputCollector collector;
    Queue<String> queue = new ConcurrentLinkedQueue<String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");
            this.collector = collector;
            Random random = new Random();
            // The site being visited is taobao
            String hosts = "www.taobao.com";
            // Session id of each visit
            String[] sessionId = {
                "5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8",
                "5D16C3E0209C16DEAA28L1824", "5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413",
                "5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E", "5C3FBA728FD7D264B80769B23",
                "5B16C0F7215109AG43528BA2D", "5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};
            // Visit times
            String[] times = {
                "2019-08-04 08:01:36", "2019-08-04 08:11:37", "2019-08-04 08:31:38",
                "2019-08-04 09:23:07", "2019-08-04 10:51:27", "2019-08-04 10:51:56",
                "2019-08-04 11:01:07", "2019-08-04 11:01:20", "2019-08-04 11:45:30",
                "2019-08-04 12:31:49", "2019-08-04 12:41:51", "2019-08-04 12:51:37",
                "2019-08-04 13:11:27", "2019-08-04 13:20:40", "2019-08-04 13:31:38"};
            // Generate 100 lines: host \t sessionId \t time
            for (int i = 0; i < 100; i++) {
                queue.add(hosts + "\t" + sessionId[random.nextInt(sessionId.length)]
                        + "\t" + times[random.nextInt(times.length)]);
            }
            System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
        // System.out.println("SourceSpout nextTuple");
        // poll() returns null once the queue is empty; emit only real lines.
        // (The original check, queue.size() >= 0, was always true and emitted nulls.)
        String line = queue.poll();
        if (line != null) {
            this.collector.emit(new Values(line));
        }
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // One field holding the raw log line (downstream bolts read it by index 0)
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(3) FmtLogBolt, a bolt, 4 threads

package user_visit;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class FmtLogBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "sessionId"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String eachLog = input.getString(0);
        if (null != eachLog && eachLog.length() > 0) {
            // Line format: host \t sessionId \t time
            String[] fields = eachLog.split("\t");
            if (fields.length >= 3) {
                // Emit the short date and the sessionId
                collector.emit(new Values(DateFmt.getCountDate(fields[2], DateFmt.DATE_SHORT), fields[1]));
            }
        }
    }

    @Override
    public void cleanup() {
    }
}

(4) DeepVisitBolt, a bolt, 4 threads

package user_visit;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Partial aggregation
 * @author silva
 */
public class DeepVisitBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    // date_sessionId -> number of page views seen by this task
    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_sessionId", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String dateStr = input.getString(0);
        String sessionId = input.getString(1);
        String key = dateStr + "_" + sessionId;
        Integer count = counts.get(key);
        if (null == count) {
            count = 0;
        }
        count++;
        counts.put(key, count);
        // Emit to the next level for global aggregation
        collector.emit(new Values(key, count));
    }

    @Override
    public void cleanup() {
    }
}

(5) UVSumBolt, a bolt, 1 thread

package user_visit;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Global aggregation
 */
public class UVSumBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    // date_sessionId -> latest count received from DeepVisitBolt
    private Map<String, Integer> counts = new HashMap<String, Integer>();
    private String curDate = null;
    private long beginTime;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            long PV = 0; // total page views
            long UV = 0; // number of distinct visitors
            long endTime = System.currentTimeMillis();
            String dateSessionId = input.getString(0);
            Integer count = input.getInteger(1);

            // If the key does not start with the current date and its date is later,
            // the data has rolled over to the next day:
            // clear the map and set curDate to the new date.
            if (!dateSessionId.startsWith(curDate)
                    && DateFmt.parseDate(dateSessionId.split("_")[0]).after(DateFmt.parseDate(curDate))) {
                curDate = dateSessionId.split("_")[0];
                counts.clear();
            }
            counts.put(dateSessionId, count);

            // beginTime is never reset, so once the first second has passed
            // the totals are printed for every incoming tuple.
            if (endTime - beginTime > 1 * 1000) {
                for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                    // Count only today's data; skip entries for other days
                    if (entry.getKey().startsWith(curDate)) {
                        UV++;                   // one more distinct visitor
                        PV += entry.getValue(); // add this visitor's page views
                    }
                }
                System.out.println("UV = " + UV + ", PV = " + PV);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Initialize curDate to today's date in yyyy-MM-dd format
        curDate = DateFmt.getCountDate(null, DateFmt.DATE_SHORT);
        beginTime = System.currentTimeMillis();
    }
}
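The day-rollover rule in UVSumBolt can be isolated in a few lines. The sketch below is a simplified model: it compares the yyyy-MM-dd dates as strings, which orders them correctly, instead of parsing them. The class name and methods are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of the day-rollover logic.
final class DayRolloverSketch {

    private final Map<String, Integer> counts = new HashMap<>();
    private String curDate;

    DayRolloverSketch(String startDate) {
        this.curDate = startDate;
    }

    // key has the form "yyyy-MM-dd_sessionId"
    void accept(String key, int count) {
        String date = key.substring(0, key.indexOf('_'));
        // A later date means a new day has started: reset the state.
        if (date.compareTo(curDate) > 0) {
            curDate = date;
            counts.clear();
        }
        counts.put(key, count);
    }

    // Number of distinct keys for the current date.
    int uv() {
        int uv = 0;
        for (String key : counts.keySet()) {
            if (key.startsWith(curDate)) {
                uv++;
            }
        }
        return uv;
    }

    String currentDate() {
        return curDate;
    }

    public static void main(String[] args) {
        DayRolloverSketch sum = new DayRolloverSketch("2019-08-04");
        sum.accept("2019-08-04_A", 1);
        sum.accept("2019-08-04_B", 2);
        sum.accept("2019-08-05_A", 1);
        System.out.println(sum.currentDate() + " UV = " + sum.uv());
    }
}
```

Entries for an earlier date are stored but never counted, which is why hard-coded dates that differ from the run date produce a result of 0.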

(6) UVTopo, the topology class, wires up the spout and bolts.

package user_visit;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Entry point of the program
 * @description
 * @author whiteshark
 * @date 2019-08-02 13:03:02
 */
public class UVTopo {

    public static void main(String[] args) {
        // 1. Create the topology builder
        TopologyBuilder topoBuilder = new TopologyBuilder();

        // 2. Set the spout and bolts.
        // setSpout takes three arguments: the spout id (a String), the instance,
        // and the parallelism. Use a larger parallelism for large data volumes.
        topoBuilder.setSpout("spout", new SourceSpout(), 1);
        // Shuffle grouping, parallelism 4
        topoBuilder.setBolt("fmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout");
        // Fields grouping, parallelism 4. Tuples are grouped by date + sessionId,
        // so the same date + sessionId always goes to the same bolt task.
        topoBuilder.setBolt("deepVisitBolt", new DeepVisitBolt(), 4)
                .fieldsGrouping("fmtLogBolt", new Fields("date", "sessionId"));
        // Shuffle grouping, parallelism 1
        topoBuilder.setBolt("sumBolt", new UVSumBolt(), 1).shuffleGrouping("deepVisitBolt");

        // 3. Set the number of workers
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_WORKERS, 4);

        if (args.length > 0) {
            try {
                // 4. Submit to the cluster.
                // The three arguments are: topology name, Storm config, topology instance
                StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // 5. Submit in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());
        }
    }
}
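Fields grouping matters here because DeepVisitBolt keeps its counts per task: a key's count is only complete if every tuple with that key reaches the same task. The sketch below illustrates the idea with a hash of the grouping fields modulo the task count. The exact hash Storm uses is not shown in this article, so the routing rule is an assumption, and the class and method names are hypothetical:

```java
import java.util.Arrays;

// Hypothetical illustration of hash-based routing for fields grouping.
final class FieldsGroupingSketch {

    // Assumed routing rule: hash of the grouping fields modulo the number of tasks.
    static int taskFor(String date, String sessionId, int numTasks) {
        int hash = Arrays.asList(date, sessionId).hashCode();
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        String[] sessions = {"A", "B", "C", "A", "B", "A"};
        for (String sessionId : sessions) {
            // The same sessionId always maps to the same task index.
            System.out.println("sessionId " + sessionId + " -> task "
                    + taskFor("2019-08-04", sessionId, 4));
        }
    }
}
```

With shuffle grouping instead, the views of one sessionId would be split over several tasks and each task would emit only a partial count for that key.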

The program prints the result below. There are 100 page views from 12 distinct visitors, which matches the data emitted by the spout.

...
UV = 12, PV = 97
UV = 12, PV = 98
UV = 12, PV = 99
UV = 12, PV = 100
