大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度
大數(shù)據(jù)技術(shù)之_17_Storm學(xué)習(xí)
- 一 Storm 概述
- 1.1 離線計(jì)算是什么?
- 1.2 流式計(jì)算是什么?
- 1.3 Storm 是什么?
- 1.4 Storm 與 Hadoop 的區(qū)別
- 1.5 Storm 應(yīng)用場(chǎng)景及行業(yè)案例
- 1.5.1 運(yùn)用場(chǎng)景
- 1.5.2 典型案列
- 1.6 Storm 特點(diǎn)
- 二 Storm 基礎(chǔ)知識(shí)
- 2.1 Storm 編程模型
- 2.1.1 元組(Tuple)
- 2.1.2 流(Stream)
- 2.1.3 水龍頭(Spout)
- 2.1.4 轉(zhuǎn)接頭(Bolt)
- 2.1.5 拓?fù)?#xff08;Topology)
- 2.2 Storm 核心組件
- 2.2.1 主控節(jié)點(diǎn)與工作節(jié)點(diǎn)
- 2.2.2 Nimbus 進(jìn)程與 Supervisor 進(jìn)程
- 2.2.3 流分組(Stream Grouping)
- 2.2.4 工作進(jìn)程(Worker)
- 2.2.5 執(zhí)行器(Executor)
- 2.2.6 任務(wù)(Task)
- 2.3 實(shí)時(shí)流計(jì)算常見架構(gòu)圖
- 三 Storm 集群搭建
- 3.1 環(huán)境準(zhǔn)備
- 3.1.1 集群規(guī)劃
- 3.1.2 jar 包下載
- 3.1.3 虛擬機(jī)準(zhǔn)備
- 3.1.4 安裝 jdk
- 3.1.5 安裝 Zookeeper
- 3.2 Storm 集群部署
- 3.2.1 配置集群
- 3.2.2 Storm 日志信息查看
- 3.2.3 Storm 命令行操作
- 四 Storm 常用 API
- 4.1 API 簡(jiǎn)介
- 4.1.1 Component 組件
- 4.1.2 Spout 水龍頭
- 4.1.3 Bolt 轉(zhuǎn)接頭
- 4.1.4 Spout 的 tail 特性
- 4.2 網(wǎng)站日志處理案例
- 4.2.1 實(shí)操環(huán)境準(zhǔn)備
- 4.2.2 需求1:將接收到日志的會(huì)話 id 打印在控制臺(tái)
- 4.2.3 需求2:動(dòng)態(tài)增加日志,查看控制臺(tái)打印信息(tail特性)
- 五 Storm 分組策略和并發(fā)度
- 5.1 讀取文件案例思考
- 5.2 分組策略(Stream Grouping)
- 5.3 并發(fā)度
- 5.3.1 場(chǎng)景分析
- 5.3.2 并發(fā)度
- 5.4 實(shí)操案例
- 5.4.1 實(shí)時(shí)單詞統(tǒng)計(jì)案例
- 5.4.2 實(shí)時(shí)計(jì)算網(wǎng)站 PV 案例
- 5.4.3 實(shí)時(shí)計(jì)算網(wǎng)站 UV 去重案例
一 Storm 概述
1.1 離線計(jì)算是什么?
??離線計(jì)算:批量獲取數(shù)據(jù)、批量傳輸數(shù)據(jù)、周期性批量計(jì)算數(shù)據(jù)、數(shù)據(jù)展示。
??代表技術(shù):Sqoop 批量導(dǎo)入數(shù)據(jù)、HDFS 批量存儲(chǔ)數(shù)據(jù)、MapReduce 批量計(jì)算數(shù)據(jù)、Hive 批量計(jì)算數(shù)據(jù)。
1.2 流式計(jì)算是什么?
??流式計(jì)算:數(shù)據(jù)實(shí)時(shí)產(chǎn)生、數(shù)據(jù)實(shí)時(shí)傳輸、數(shù)據(jù)實(shí)時(shí)計(jì)算、實(shí)時(shí)展示。
??代表技術(shù):Flume 實(shí)時(shí)獲取數(shù)據(jù)、Kafka 實(shí)時(shí)數(shù)據(jù)存儲(chǔ)、Storm(阿帕奇)/JStorm(淘寶) 實(shí)時(shí)數(shù)據(jù)計(jì)算、Redis 實(shí)時(shí)結(jié)果緩存、Mysql 持久化存儲(chǔ)。
??離線計(jì)算與實(shí)時(shí)計(jì)算最大的區(qū)別:實(shí)時(shí)收集、實(shí)時(shí)計(jì)算、實(shí)時(shí)展示。
公司整個(gè)后臺(tái)系統(tǒng)架構(gòu)圖解
1.3 Storm 是什么?
??Storm 是一個(gè)分布式計(jì)算框架,主要使用 Clojure 與 Java 語(yǔ)言編寫,最初是由Nathan Marz 帶領(lǐng) Backtype 公司團(tuán)隊(duì)創(chuàng)建,在 Backtype 公司被 Twitter 公司收購(gòu)后進(jìn)行開源。最初的版本是在 2011 年 9 月 17 日發(fā)行,版本號(hào) 0.5.0。
??2013 年9 月,Apache 基金會(huì)開始接管并孵化 Storm 項(xiàng)目。Apache Storm 是在Eclipse Public License下進(jìn)行開發(fā)的,它提供給大多數(shù)企業(yè)使用。經(jīng)過 1 年多時(shí)間,2014 年 9 月,Storm 項(xiàng)目成為 Apache 的頂級(jí)項(xiàng)目。目前,Storm 的最新版本:Storm 1.2.2 Released (04 Jun 2018)。
??Storm 是一個(gè)免費(fèi)開源的分布式實(shí)時(shí)計(jì)算系統(tǒng)。Storm 能輕松可靠地處理無界的數(shù)據(jù)流,就像 Hadoop 對(duì)數(shù)據(jù)進(jìn)行批處理。
1.4 Storm 與 Hadoop 的區(qū)別
??1)Storm 用于實(shí)時(shí)計(jì)算;Hadoop 用于離線計(jì)算。
??2)Storm 處理的數(shù)據(jù)保存在內(nèi)存中,源源不斷;Hadoop 處理的數(shù)據(jù)保存在文件系統(tǒng)中,一批一批處理。
??3)Storm 的數(shù)據(jù)通過網(wǎng)絡(luò)傳輸進(jìn)來;Hadoop 的數(shù)據(jù)保存在磁盤中。
??4)Storm 與 Hadoop 的編程模型相似。
(1)Hadoop 相關(guān)名稱
??Job:任務(wù)名稱
??JobTracker:項(xiàng)目經(jīng)理(JobTracker 對(duì)應(yīng)于 NameNode;JobTracker 是一個(gè) master 服務(wù),軟件啟動(dòng)之后 JobTracker 接收 Job,負(fù)責(zé)調(diào)度 Job 的每一個(gè)子任務(wù) task 運(yùn)行于 TaskTracker 上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的 task 就重新運(yùn)行它)
??TaskTracker:開發(fā)組長(zhǎng)(TaskTracker 對(duì)應(yīng)于 DataNode;TaskTracker 是運(yùn)行在多個(gè)節(jié)點(diǎn)上的 slaver 服務(wù)。TaskTracker 主動(dòng)與 JobTracker 通信,接收作業(yè),并負(fù)責(zé)直接執(zhí)行每一個(gè)任務(wù))
??Child:負(fù)責(zé)開發(fā)的人員
??Mapper/Reduce:開發(fā)人員中的兩種角色,一種是服務(wù)器開發(fā)、一種是客戶端開發(fā)
(2)Storm 相關(guān)名稱
??Topology(拓?fù)?:任務(wù)名稱
??Nimbus:項(xiàng)目經(jīng)理
??Supervisor:開發(fā)組長(zhǎng)
??Worker:開發(fā)人員
??Spout(水龍頭)/Bolt(轉(zhuǎn)接頭):開發(fā)人員中的兩種角色,一種是服務(wù)器開發(fā)、一種是客戶端開發(fā)
1.5 Storm 應(yīng)用場(chǎng)景及行業(yè)案例
??Storm 用來實(shí)時(shí)計(jì)算源源不斷產(chǎn)生的數(shù)據(jù),如同流水線生產(chǎn)。
1.5.1 運(yùn)用場(chǎng)景
??Storm 能用到很多場(chǎng)景中,包括:實(shí)時(shí)分析、在線機(jī)器學(xué)習(xí)、連續(xù)計(jì)算等。
??1)推薦系統(tǒng):實(shí)時(shí)推薦,根據(jù)下單或加入購(gòu)物車推薦相關(guān)商品。
??2)金融系統(tǒng):實(shí)時(shí)分析股票信息數(shù)據(jù)。
??3)預(yù)警系統(tǒng):根據(jù)實(shí)時(shí)采集數(shù)據(jù),判斷是否到了預(yù)警閾值。
??4)網(wǎng)站統(tǒng)計(jì):實(shí)時(shí)銷量、流量統(tǒng)計(jì),如淘寶雙11效果圖。
1.5.2 典型案列
1)京東-實(shí)時(shí)分析系統(tǒng):實(shí)時(shí)分析用戶的屬性,并反饋給搜索引擎
??最初,用戶屬性分析是通過每天在云上定時(shí)運(yùn)行的 MR job 來完成的。為了滿足實(shí)時(shí)性的要求,希望能夠?qū)崟r(shí)分析用戶的行為日志,將最新的用戶屬性反饋給搜索引擎,能夠?yàn)橛脩粽宫F(xiàn)最貼近其當(dāng)前需求的結(jié)果。
2)攜程-網(wǎng)站性能監(jiān)控:實(shí)時(shí)分析系統(tǒng)監(jiān)控?cái)y程網(wǎng)的網(wǎng)站性能
??利用 HTML5 提供的 performance 標(biāo)準(zhǔn)獲得可用的指標(biāo),并記錄日志。Storm 集群實(shí)時(shí)分析日志和入庫(kù)。使用 DRPC 聚合成報(bào)表,通過歷史數(shù)據(jù)對(duì)比等判斷規(guī)則,觸發(fā)預(yù)警事件。
3)淘寶雙十一:實(shí)時(shí)統(tǒng)計(jì)銷售總額
1.6 Storm 特點(diǎn)
??1)適用場(chǎng)景廣泛:Storm 可以適用實(shí)時(shí)處理消息、更新數(shù)據(jù)庫(kù)、持續(xù)計(jì)算等場(chǎng)景。
??2)可伸縮性高:Storm 的可伸縮性可以讓 Storm 每秒處理的消息量達(dá)到很高。擴(kuò)展一個(gè)實(shí)時(shí)計(jì)算任務(wù),你所需要做的就是加機(jī)器并且提高這個(gè)計(jì)算任務(wù)的并行度。Storm 使用 Zookeeper 來協(xié)調(diào)機(jī)器內(nèi)的各種配置使得 Storm 的集群可以很容易的擴(kuò)展。
??3)保證無數(shù)據(jù)丟失:Storm 保證所有的數(shù)據(jù)都被處理。
??4)異常健壯:Storm 集群非常容易管理,輪流重啟節(jié)點(diǎn)不影響應(yīng)用。
??5)容錯(cuò)性好:在消息處理過程中出現(xiàn)異常,Storm 會(huì)進(jìn)行重試。
二 Storm 基礎(chǔ)知識(shí)
2.1 Storm 編程模型
2.1.1 元組(Tuple)
??元組(Tuple),是消息傳遞的基本單元,是一個(gè)命名的值列表,元組中的字段可以是任何類型的對(duì)象。Storm 使用元組作為其數(shù)據(jù)模型,元組支持所有的基本類型、字符串和字節(jié)數(shù)組作為字段值,只要實(shí)現(xiàn)類型的序列化接口就可以使用該類型的對(duì)象。元組本來應(yīng)該是一個(gè) key-value 的 Map,但是由于各個(gè)組件間傳遞的元組的字段名稱已經(jīng)事先定義好,所以只要按序把元組填入各個(gè) value 即可,所以元組是一個(gè) value 的 List。
2.1.2 流(Stream)
??流是 Storm 的核心抽象,是一個(gè)無界的元組系列。源源不斷傳遞的元組就組成了流,在分布式環(huán)境中并行地進(jìn)行創(chuàng)建和處理。
2.1.3 水龍頭(Spout)
??Spout 是拓?fù)涞牧鞯膩碓?#xff0c;是一個(gè)拓?fù)渲挟a(chǎn)生源數(shù)據(jù)流的組件。通常情況下,Spout 會(huì)從外部數(shù)據(jù)源中讀取數(shù)據(jù),然后轉(zhuǎn)換為拓?fù)鋬?nèi)部的源數(shù)據(jù)。
??Spout 可以是可靠的,也可以是不可靠的。如果 Storm 處理元組失敗,可靠的 Spout 能夠重新發(fā)射,而不可靠的 Spout 就盡快忘記發(fā)出的元組。
??Spout 可以發(fā)出超過一個(gè)流。
??Spout 的主要方法是 nextTuple()。NextTuple() 會(huì)發(fā)出一個(gè)新的 Tuple 到拓?fù)?#xff0c;如果沒有新的元組發(fā)出,則簡(jiǎn)單返回。
??Spout 的其他方法是 ack() 和 fail()。當(dāng) Storm 檢測(cè)到一個(gè)元組從 Spout 發(fā)出時(shí),ack() 和 fail() 會(huì)被調(diào)用,要么成功完成通過拓?fù)?#xff0c;要么未能完成。ack() 和 fail() 僅被可靠的 Spout 調(diào)用。
??IRichSpout 是 Spout 必須實(shí)現(xiàn)的接口。
2.1.4 轉(zhuǎn)接頭(Bolt)
??在拓?fù)渲兴刑幚矶荚?Bolt 中完成,Bolt 是流的處理節(jié)點(diǎn),從一個(gè)拓?fù)浣邮諗?shù)據(jù),然后執(zhí)行進(jìn)行處理的組件。Bolt 可以完成過濾、業(yè)務(wù)處理、連接運(yùn)算、連接與訪問數(shù)據(jù)庫(kù)等任何操作。
??Bolt 是一個(gè)被動(dòng)的角色,其接口中有一個(gè) execute() 方法,在接收到消息后會(huì)調(diào)用此方法,用戶可以在其中執(zhí)行自己希望的操作。
??Bolt 可以完成簡(jiǎn)單的流的轉(zhuǎn)換,而完成復(fù)雜的流的轉(zhuǎn)換通常需要多個(gè)步驟,因此需要多個(gè) Bolt。
2.1.5 拓?fù)?#xff08;Topology)
??拓?fù)?#xff08;Topology)是 Storm 中運(yùn)行的一個(gè)實(shí)時(shí)應(yīng)用程序,因?yàn)楦鱾€(gè)組件間的消息流動(dòng)而形成邏輯上的拓?fù)浣Y(jié)構(gòu)。
??把實(shí)時(shí)應(yīng)用程序的運(yùn)行邏輯打成 jar 包后提交到 Storm 的拓?fù)?#xff08;Topology)。Storm 的拓?fù)漕愃朴?MapReduce 的作業(yè)(Job)。其主要的區(qū)別是,MapReduce 的作業(yè)最終會(huì)完成,而一個(gè)拓?fù)溆肋h(yuǎn)都在運(yùn)行直到它被殺死。一個(gè)拓?fù)涫且粋€(gè)圖的 Spout 和 Bolt 的連接流分組。
2.2 Storm 核心組件
??Nimbus 是整個(gè)集群的控管核心,負(fù)責(zé) Topology 的提交、運(yùn)行狀態(tài)監(jiān)控、任務(wù)重新分配等工作。
??Zookeeper 就是一個(gè)管理者,監(jiān)控者。
??總體描述:Nimbus下命令(分配任務(wù)),Zookeeper 監(jiān)督執(zhí)行(心跳監(jiān)控,Worker、Supurvisor的心跳都?xì)w它管),Supervisor領(lǐng)旨(下載代碼),招募人馬(創(chuàng)建Worker和線程等),Worker、Executor就給我干活!Task 就是具體要干的活。
2.2.1 主控節(jié)點(diǎn)與工作節(jié)點(diǎn)
??Storm 集群中有兩類節(jié)點(diǎn):主控節(jié)點(diǎn)(Master Node)和工作節(jié)點(diǎn)(Worker Node)。其中,主控節(jié)點(diǎn)只有一個(gè),而工作節(jié)點(diǎn)可以有多個(gè)。
2.2.2 Nimbus 進(jìn)程與 Supervisor 進(jìn)程
??主控節(jié)點(diǎn)運(yùn)行一個(gè)稱為 Nimbus 的守護(hù)進(jìn)程類似于 Hadoop 的 JobTracker。Nimbus 負(fù)責(zé)在集群中分發(fā)代碼、對(duì)節(jié)點(diǎn)分配任務(wù)、并監(jiān)視主機(jī)故障。
??每個(gè)工作節(jié)點(diǎn)運(yùn)行一個(gè)稱為 Supervisor 的守護(hù)進(jìn)程。Supervisor 監(jiān)聽其主機(jī)上已經(jīng)分配的主機(jī)的作業(yè)、啟動(dòng)和停止 Nimbus 已經(jīng)分配的工作進(jìn)程。
2.2.3 流分組(Stream Grouping)
??流分組,是拓?fù)涠x中的一部分,為每個(gè) Bolt 指定應(yīng)該接收哪個(gè)流作為輸入。流分組定義了流/元組如何在 Bolt 的任務(wù)之間進(jìn)行分發(fā)。
??Storm 內(nèi)置了 8 種流分組方式。
2.2.4 工作進(jìn)程(Worker)
??Worker 是 Spout/Bolt 中運(yùn)行具體處理邏輯的進(jìn)程。一個(gè) Worker 就是一個(gè)進(jìn)程,進(jìn)程里面包含一個(gè)或多個(gè)線程。
2.2.5 執(zhí)行器(Executor)
??一個(gè)線程就是一個(gè) Executor,一個(gè)線程會(huì)處理一個(gè)或多個(gè)任務(wù)。
2.2.6 任務(wù)(Task)
??一個(gè)任務(wù)就是一個(gè) Task。
2.3 實(shí)時(shí)流計(jì)算常見架構(gòu)圖
??1)Flume 獲取數(shù)據(jù)。
??2)Kafka 臨時(shí)保存數(shù)據(jù)。
??3)Strom 計(jì)算數(shù)據(jù)。
??4)Redis 是個(gè)內(nèi)存數(shù)據(jù)庫(kù),用來保存數(shù)據(jù)。
三 Storm 集群搭建
3.1 環(huán)境準(zhǔn)備
3.1.1 集群規(guī)劃
hadoop102 hadoop103 hadoop104 zk zk zk storm storm storm3.1.2 jar 包下載
(1)官方網(wǎng)址:http://storm.apache.org/
注意:本次學(xué)習(xí)演示,本博主使用版本 Storm 1.1.1 Released (1 Aug 2018)
(2)安裝集群步驟:
??官方文檔地址:http://storm.apache.org/releases/1.1.1/Setting-up-a-Storm-cluster.html
3.1.3 虛擬機(jī)準(zhǔn)備
1)準(zhǔn)備3臺(tái)虛擬機(jī)
2)配置ip地址、配置主機(jī)名稱、3臺(tái)主機(jī)分別關(guān)閉防火墻
參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/10335265.html
參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/10349717.html
3.1.4 安裝 jdk
參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/9931593.html
3.1.5 安裝 Zookeeper
0)集群規(guī)劃
在 hadoop102、hadoop103 和 hadoop104 三個(gè)節(jié)點(diǎn)上部署 Zookeeper。
1)解壓安裝
(1)解壓 zookeeper 安裝包到 /opt/module/ 目錄下
(2)在 /opt/module/zookeeper-3.4.10/ 這個(gè)目錄下創(chuàng)建目錄 zkData
mkdir -p zkData(3)重命名 /opt/module/zookeeper-3.4.10/conf 這個(gè)目錄下的 zoo_sample.cfg 為 zoo.cfg
mv zoo_sample.cfg zoo.cfg2)配置 zoo.cfg 文件
(1)具體配置
(2)配置參數(shù)解讀
server.A=B:C:D。 A 是一個(gè)數(shù)字,表示這個(gè)是第幾號(hào)服務(wù)器; B 是這個(gè)服務(wù)器的ip地址; C 是這個(gè)服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口; D 是萬(wàn)一集群中的 Leader 服務(wù)器掛了,需要一個(gè)端口來重新進(jìn)行選舉,選出一個(gè)新的 Leader,而這個(gè)端口就是用來執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。集群模式下配置一個(gè)文件 myid,這個(gè)文件在 zkData 目錄下,這個(gè)文件里面有一個(gè)數(shù)據(jù)就是 A 的值,Zookeeper 啟動(dòng)時(shí)讀取此文件,拿到里面的數(shù)據(jù)與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個(gè) server。3)集群操作
(1)在 /opt/module/zookeeper-3.4.10/zkData 目錄下創(chuàng)建一個(gè) myid 的文件
添加 myid 文件,注意一定要在 linux 里面創(chuàng)建,在 notepad++ 里面很可能亂碼。
(2)編輯 myid 文件
在文件中添加與 server 對(duì)應(yīng)的編號(hào):如 2
(3)拷貝配置好的 zookeeper 到其他機(jī)器上 或者執(zhí)行配置分發(fā)的腳本
(4)分別啟動(dòng) zookeeper 或者 使用群起腳本啟動(dòng)
[root@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop104 zookeeper-3.4.10]# bin/zkServer.sh start(5)查看狀態(tài) 或者 使用腳本查看狀態(tài)
[root@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower [root@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: leader [root@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower3.2 Storm 集群部署
3.2.1 配置集群
1)拷貝 jar 包到 hadoop102 的 /opt/software/ 目錄下
2)解壓 jar 包到 /opt/module 目錄下
3)修改解壓后的 apache-storm-1.1.1.tar.gz 文件名稱為 storm,為了方便
[atguigu@hadoop102 module]$ mv apache-storm-1.1.1/ storm4)在 /opt/module/storm/ 目錄下創(chuàng)建 data 文件夾
[atguigu@hadoop102 storm]$ mkdir data5)修改配置文件
[atguigu@hadoop102 conf]$ pwd /opt/module/storm/conf [atguigu@hadoop102 conf]$ vim storm.yaml修改內(nèi)容如下:
# 設(shè)置 Zookeeper 的主機(jī)名稱 storm.zookeeper.servers:- "hadoop102"- "hadoop103"- "hadoop104"# 設(shè)置主節(jié)點(diǎn)的主機(jī)名稱 nimbus.seeds: ["hadoop102"]# 設(shè)置 Storm 的數(shù)據(jù)存儲(chǔ)路徑 storm.local.dir: "/opt/module/storm/data"# 設(shè)置 Worker 的端口號(hào) supervisor.slots.ports:- 6700- 6701- 6702- 67036)以 root 用戶,配置環(huán)境變量
[root@hadoop102 storm]# vim /etc/profile#STORM_HOME export STORM_HOME=/opt/module/storm export PATH=$PATH:$STORM_HOME/bin使配置文件生效
[root@hadoop102 storm]# source /etc/profile7)分發(fā)配置好的 storm 安裝包
[atguigu@hadoop102 storm]$ xsync storm/8)啟動(dòng) Storm 集群
(1)后臺(tái)啟動(dòng) Nimbus
(2)后臺(tái)啟動(dòng) Supervisor
[atguigu@hadoop102 storm]$ bin/storm supervisor & [atguigu@hadoop102 storm]$ bin/storm supervisor & [atguigu@hadoop102 storm]$ bin/storm supervisor &拓展:fg 命令 表示將放在后臺(tái)的進(jìn)程放到前臺(tái)。
(3)啟動(dòng) Storm UI
9)通過瀏覽器查看集群狀態(tài)
地址:http://hadoop102:8080/index.html
3.2.2 Storm 日志信息查看
1)查看 Nimbus 的日志信息
在 Nimbus 的服務(wù)器上
2)查看 ui 運(yùn)行日志信息
在 ui 的服務(wù)器上,一般和 Nimbus 在一個(gè)服務(wù)器上
3)查看 Supervisor 運(yùn)行日志信息
在 Supervisor 服務(wù) 上
4)查看 Supervisor 上 Worker 運(yùn)行日志信息
在 supervisor 服務(wù)上
5)logviewer,可以在 web 頁(yè)面點(diǎn)擊相應(yīng)的端口號(hào)即可查看日志
分別在 Supervisor 節(jié)點(diǎn)上執(zhí)行:
瀏覽器截圖如下
3.2.3 Storm 命令行操作
1)Nimbus:啟動(dòng) Nimbus 守護(hù)進(jìn)程。
storm nimbus2)Supervisor:啟動(dòng) Supervisor 守護(hù)進(jìn)程。
storm supervisor3)ui:啟動(dòng)UI守護(hù)進(jìn)程。
storm ui4)list:列出正在運(yùn)行的拓?fù)浼捌錉顟B(tài)。
storm list5)logviewer:Logviewer 提供一個(gè) web 接口查看 Storm 日志文件。
storm logviewer6)jar:
storm jar [jar路徑] [拓?fù)浒?拓?fù)漕惷鸧 [拓?fù)涿Q]7)kill:殺死名為 topology-name 的拓?fù)洹?/p> storm kill topology-name [-w wait-time-secs] -w:等待多久后殺死拓?fù)?
8)active:激活指定的拓?fù)?Spout。
storm activate topology-name9)deactivate:禁用指定的拓?fù)?Spout。
storm deactivate topology-name10)help:打印一條幫助消息或者可用命令的列表。
storm help storm help <command>四 Storm 常用 API
4.1 API 簡(jiǎn)介
4.1.1 Component 組件
1)基本接口
??(1)IComponent 接口
??(2)ISpout 接口
??(3)IRichSpout 接口
??(4)IStateSpout 接口
??(5)IRichStateSpout 接口
??(6)IBolt 接口
??(7)IRichBolt 接口
??(8)IBasicBolt 接口
2)基本抽象類
??(1)BaseComponent 抽象類
??(2)BaseRichSpout 抽象類
??(3)BaseRichBolt 抽象類
??(4)BaseTransactionalBolt 抽象類
??(5)BaseBasicBolt 抽象類
4.1.2 Spout 水龍頭
Spout 的最頂層抽象是 ISpout 接口。
(1)open()
??是初始化方法。
(2)close()
??在該 Spout 關(guān)閉前執(zhí)行,但是并不能得到保證其一定被執(zhí)行,kill -9 時(shí)不執(zhí)行,Storm kill {topoName} 時(shí)執(zhí)行。
(3)activate()
??當(dāng) Spout 已經(jīng)從失效模式中激活時(shí)被調(diào)用。該 Spout 的 nextTuple() 方法很快就會(huì)被調(diào)用。
(4)deactivate ()
??當(dāng) Spout 已經(jīng)失效時(shí)被調(diào)用。在 Spout 失效期間,nextTuple 不會(huì)被調(diào)用。Spout 將來可能會(huì)也可能不會(huì)被重新激活。
(5)nextTuple()
??當(dāng)調(diào)用 nextTuple() 方法時(shí),Storm 要求 Spout 發(fā)射元組到輸出收集器(OutputCollecctor)。
??nextTuple() 方法應(yīng)該是非阻塞的,所以,如果 Spout 沒有元組可以發(fā)射,該方法應(yīng)該返回。
??nextTuple()、ack() 和 fail() 方法都在 Spout 任務(wù)的單一線程內(nèi)緊密循環(huán)被調(diào)用。
??當(dāng)沒有元組可以發(fā)射時(shí),可以讓 nextTuple 去 sleep 很短的時(shí)間,例如1毫秒,這樣就不會(huì)浪費(fèi)太多的 CPU 資源。
(6)ack()
??成功處理 Tuple 回調(diào)方法。
(7)fail()
??處理失敗 Tuple 回調(diào)方法。
??原則:通常情況下(Shell 和事務(wù)型的除外),實(shí)現(xiàn)一個(gè) Spout,可以直接實(shí)現(xiàn)接口 IRichSpout,如果不想寫多余的代碼,可以直接繼承 BaseRichSpout。
4.1.3 Bolt 轉(zhuǎn)接頭
Bolt 的最頂層抽象是 IBolt 接口。
(1)prepare()
??prepare() 方法在集群的工作進(jìn)程內(nèi)被初始化時(shí)被調(diào)用,提供了 Bolt 執(zhí)行所需要的環(huán)境。
(2)execute()
??接受一個(gè) Tuple 進(jìn)行處理,也可 emit 數(shù)據(jù)到下一級(jí)組件。
(3)cleanup()
??cleanup方法當(dāng)一個(gè) IBolt 即將關(guān)閉時(shí)被調(diào)用。不能保證 cleanup() 方法一定會(huì)被調(diào)用,因?yàn)?Supervisor 可以對(duì)集群的工作進(jìn)程使用 kill -9 命令強(qiáng)制殺死進(jìn)程命令。
??如果在本地模式下運(yùn)行 Storm,當(dāng)拓?fù)浔粴⑺赖臅r(shí)候,可以保證 cleanup() 方法一定會(huì)被調(diào)用。
??實(shí)現(xiàn)一個(gè) Bolt,可以實(shí)現(xiàn) IRichBolt 接口或繼承 BaseRichBolt,如果不想自己處理結(jié)果反饋,可以實(shí)現(xiàn) IBasicBolt 接口或繼承 BaseBasicBolt,它實(shí)際上相當(dāng)于自動(dòng)做了 prepare 方法和 collector.emit.ack(inputTuple)。
4.1.4 Spout 的 tail 特性
Storm 可以實(shí)時(shí)監(jiān)測(cè)文件數(shù)據(jù),當(dāng)文件數(shù)據(jù)變化時(shí),Storm 自動(dòng)讀取。
4.2 網(wǎng)站日志處理案例
4.2.1 實(shí)操環(huán)境準(zhǔn)備
??1)打開 eclipse,創(chuàng)建一個(gè) java 工程
??2)在工程目錄中創(chuàng)建 lib 文件夾
??3)解壓 apache-storm-1.1.1,并把解壓后 lib 包下的文件復(fù)制到 java 工程的 lib 文件夾中,然后執(zhí)行 build path。
4.2.2 需求1:將接收到日志的會(huì)話 id 打印在控制臺(tái)
1)需求:
??(1)模擬訪問網(wǎng)站的日志信息,包括:網(wǎng)站名稱、會(huì)話 id、訪問網(wǎng)站時(shí)間等。
??(2)將接收到日志的會(huì)話 id 打印到控制臺(tái)。
2)分析:
??(1)創(chuàng)建網(wǎng)站訪問日志工具類。
??(2)在 spout 中讀取日志文件,并一行一行發(fā)射出去。
??(3)在 bolt 中將獲取到的一行一行數(shù)據(jù)的會(huì)話 id 獲取到,并打印到控制臺(tái)。
??(4)main 方法負(fù)責(zé)拼接 spout 和 bolt 的拓?fù)洹?br />
3)案例實(shí)操:
(1)創(chuàng)建網(wǎng)站訪問日志
示例代碼如下:
(2)創(chuàng)建 spout
示例代碼如下:
(3)創(chuàng)建 bolt
示例代碼如下:
(4)創(chuàng)建main
示例代碼如下:
輸出結(jié)果如下:
Thread-46-weblogbolt-executor[5 5] lines:1 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:2 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:3 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:4 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:5 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:6 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:7 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:8 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:9 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:10 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:11 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:12 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:13 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:14 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:15 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:16 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:17 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:18 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:19 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:20 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:21 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:22 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:23 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:24 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:25 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:26 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:27 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:28 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:29 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:30 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U1234.2.3 需求2:動(dòng)態(tài)增加日志,查看控制臺(tái)打印信息(tail特性)
1)在需求1基礎(chǔ)上,運(yùn)行程序。
2)打開 website.log 日志文件,增加日志調(diào)試并保存。
3)觀察控制臺(tái)打印的信息。
結(jié)論:Storm 可以動(dòng)態(tài)實(shí)時(shí)監(jiān)測(cè)文件的增加信息,并把信息讀取到再處理。
五 Storm 分組策略和并發(fā)度
5.1 讀取文件案例思考
1)spout 數(shù)據(jù)源:數(shù)據(jù)庫(kù)、文件、MQ(比如:Kafka)
2)數(shù)據(jù)源是數(shù)據(jù)庫(kù):只適合讀取數(shù)據(jù)庫(kù)的配置文件
3)數(shù)據(jù)源是文件:只適合測(cè)試、講課用(因?yàn)榧菏欠植际郊?#xff09;
4)企業(yè)產(chǎn)生的 log 文件處理步驟:
??(1)讀出內(nèi)容寫 入MQ
??(2)Storm 再處理
5.2 分組策略(Stream Grouping)
stream grouping 用來定義一個(gè) stream 應(yīng)該如何分配給 Bolts 上面的多個(gè) Executors(多線程、多并發(fā))。
Storm 里面有 7 種類型的 stream grouping,詳情如下:
1)Shuffle Grouping: 隨機(jī)分組,輪詢,平均分配。隨機(jī)派發(fā) stream 里面的 tuple,保證每個(gè) bolt 接收到的 tuple 數(shù)目大致相同。
2)Fields Grouping:按字段分組,比如按 userid 來分組,具有同樣 userid 的 tuple 會(huì)被分到相同的 bolts 里的一個(gè) task,而不同的 userid 則會(huì)被分配到不同的 bolts 里的 task。
3)All Grouping:廣播發(fā)送,對(duì)于每一個(gè) tuple,所有的 bolts 都會(huì)收到。
4)Global Grouping:全局分組,這個(gè) tuple 被分配到 storm 中的一個(gè) bolt 的其中一個(gè) task。再具體一點(diǎn)就是分配給 id 值最低的那個(gè) task。
5)None Grouping:不分組,這個(gè)分組的意思是說 stream 不關(guān)心到底誰(shuí)會(huì)收到它的 tuple。目前這種分組和 Shuffle Grouping 是一樣的效果。在多線程情況下不平均分配。
6)Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個(gè) task 處理這個(gè)消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息 tuple 必須使用 emitDirect 方法來發(fā)射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的 task 的 id (OutputCollector.emit 方法也會(huì)返回 task 的 id)。
7)Local or Shuffle Grouping:如果目標(biāo) bolt 有一個(gè)或者多個(gè) task 在同一個(gè)工作進(jìn)程中,tuple 將會(huì)被隨機(jī)發(fā)送給這些 tasks。否則,和普通的 Shuffle Grouping 行為一致。
8)測(cè)試
??(1)spout 并發(fā)度修改為 2,bolt 并發(fā)度修改為 1,Shuffle Grouping 模式
??(2)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,None Grouping 模式
// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).noneGrouping("weblogspout");每個(gè) bolt 接收到的數(shù)據(jù)不同。??(3)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,Fields Grouping 模式
// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).fieldsGrouping("weblogspout", new Fields("log"));基于 web 案例效果不明顯,后續(xù)案例效果比較明顯。??(4)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,All Grouping 模式
// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).allGrouping("weblogspout");每一個(gè) bolt 獲取到的數(shù)據(jù)都是一樣的。??(5)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,Global Grouping 模式
// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).globalGrouping("weblogspout");task 的 id 最低的 bolt 獲取到了所有數(shù)據(jù)。5.3 并發(fā)度
5.3.1 場(chǎng)景分析
1)單線程下:加減乘除、全局匯總
2)多線程下:局部加減乘除、持久化DB等
??(1)思考:如何計(jì)算:word 總數(shù)和 word 個(gè)數(shù)?并且在高并發(fā)下完成
??前者是統(tǒng)計(jì)總行數(shù),后者是去重 word 個(gè)數(shù)。
??類似企業(yè)場(chǎng)景:計(jì)算網(wǎng)站 PV 和 UV
??(2)網(wǎng)站最常用的兩個(gè)指標(biāo):
??PV(page views):count(session_id) 即頁(yè)面瀏覽量。
??UV(user views):count(distinct session_id) 即獨(dú)立訪客數(shù)。
??a)用 ip 地址分析
??指訪問某個(gè)站點(diǎn)或點(diǎn)擊某個(gè)網(wǎng)頁(yè)的不同 ip 地址的人數(shù)。在同一天內(nèi),UV 只記錄第一次進(jìn)入網(wǎng)站的具有獨(dú)立 IP 的訪問者,在同一天內(nèi)再次訪問該網(wǎng)站則不計(jì)數(shù)。
??b)用 Cookie 分析 UV 值
??當(dāng)客戶端第一次訪問某個(gè)網(wǎng)站服務(wù)器的時(shí)候,網(wǎng)站服務(wù)器會(huì)給這個(gè)客戶端的電腦發(fā)出一個(gè) Cookie,通常放在這個(gè)客戶端電腦的 C 盤當(dāng)中。在這個(gè) Cookie 中會(huì)分配一個(gè)獨(dú)一無二的編號(hào),這其中會(huì)記錄一些訪問服務(wù)器的信息,如訪問時(shí)間、訪問了哪些頁(yè)面等等。當(dāng)你下次再訪問這個(gè)服務(wù)器的時(shí)候,服務(wù)器就可以直接從你的電腦中找到上一次放進(jìn)去的 Cookie 文件,并且對(duì)其進(jìn)行一些更新,但那個(gè)獨(dú)一無二的編號(hào)是不會(huì)變的。
??實(shí)時(shí)處理的業(yè)務(wù)場(chǎng)景主要包括:匯總型(如網(wǎng)站 PV、銷售額、訂單數(shù))、去重型(如網(wǎng)站 UV、顧客數(shù)、銷售商品數(shù))
5.3.2 并發(fā)度
??并發(fā)度:用戶指定一個(gè)任務(wù),可以被多個(gè)線程執(zhí)行,并發(fā)度的數(shù)量等于線程 executor 的數(shù)量。
??task 就是具體的處理邏輯對(duì)象,一個(gè) executor 線程可以執(zhí)行一個(gè)或多個(gè) tasks,但一般默認(rèn)每個(gè) executor 只執(zhí)行一個(gè) task,所以我們往往認(rèn)為 task 就是執(zhí)行線程,其實(shí)不是。
??task 代表最大并發(fā)度,一個(gè) component 的 task 數(shù)是不會(huì)改變的,但是一個(gè) componet 的 executer 數(shù)目是會(huì)發(fā)生變化的(storm rebalance 命令),task 數(shù) >= executor 數(shù),executor 數(shù)代表實(shí)際并發(fā)數(shù)。
5.4 實(shí)操案例
5.4.1 實(shí)時(shí)單詞統(tǒng)計(jì)案例
1)需求
??實(shí)時(shí)統(tǒng)計(jì)發(fā)射到 Storm 框架中單詞的總數(shù)。
2)分析
??設(shè)計(jì)一個(gè) topology,來實(shí)現(xiàn)對(duì)文檔里面的單詞出現(xiàn)的頻率進(jìn)行統(tǒng)計(jì)。
整個(gè) topology 分為三個(gè)部分:
??(1)WordCountSpout:數(shù)據(jù)源,在已知的英文句子中,隨機(jī)發(fā)送一條句子出去。
??(2)WordCountSplitBolt:負(fù)責(zé)將單行文本記錄(句子)切分成單詞。
??(3)WordCountBolt:負(fù)責(zé)對(duì)單詞的頻率進(jìn)行累加。
3)實(shí)操
??(1)創(chuàng)建 spout
??(2)創(chuàng)建切割單詞的 bolt
package com.atgui.storm.wordcount;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class WordCountSplitBolt extends BaseRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector = null;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的一行數(shù)據(jù)// String line = input.getStringByField("love");String line = input.getString(0);// 2、截取數(shù)據(jù)String[] arrWords = line.split(" ");// 3、發(fā)射數(shù)據(jù)(發(fā)送給下一級(jí) Bolt)for (String word : arrWords) {collector.emit(new Values(word, 1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("word", "num"));}}??(3)創(chuàng)建匯總單詞個(gè)數(shù)的 bolt
package com.atgui.storm.wordcount;import java.util.HashMap; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple;public class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 1L;// 定義一個(gè) HashMap 用于存放統(tǒng)計(jì)后的結(jié)果,其中單詞為 key,單詞個(gè)數(shù)為 valueprivate Map<String, Integer> map = new HashMap<>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String word = input.getString(0); // 第一個(gè)數(shù)據(jù)Integer num = input.getInteger(1); // 第二個(gè)數(shù)據(jù)// 2、統(tǒng)計(jì)單詞個(gè)數(shù)if (map.containsKey(word)) {Integer count = map.get(word);count = count + num;map.put(word, count);} else {map.put(word, num);}// 3、控制臺(tái)打印(以紅色的字體 err 方式)System.err.println(Thread.currentThread().getId() + " word:" + word + " num:" + map.get(word));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}}??(4)創(chuàng)建程序的拓?fù)?main
package com.atgui.storm.wordcount;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class WordCountMain {public static void main(String[] args) {// 1、創(chuàng)建拓?fù)鋵?duì)象TopologyBuilder builder = new TopologyBuilder();// 2、設(shè)置 Spout 和 Boltbuilder.setSpout("WordCountSpout", new WordCountSpout(), 1);builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).shuffleGrouping("WordCountSpout");builder.setBolt("WordCountBolt", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word"));// 3、配置 Worker 開啟的個(gè)數(shù)Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("WordCountTopology", conf, builder.createTopology());}} }??(5)測(cè)試
發(fā)現(xiàn) 159 線程只處理單詞 am 和單詞 love,163 進(jìn)程處理單詞 i、ximen、jianlian。這就是分組的好處。
5.4.2 實(shí)時(shí)計(jì)算網(wǎng)站 PV 案例
0)基礎(chǔ)知識(shí)準(zhǔn)備
1)需求
??統(tǒng)計(jì)網(wǎng)站 pv(頁(yè)面瀏覽量)。
2)需求分析
方案一:
??定義 static long pv,Synchronized 控制累計(jì)操作。(不可行)
??原因:Synchronized 和 Lock 在單 JVM 下有效,但在多 JVM 下無效。
方案二:
??ShuffleGrouping 下,pv * Executer 并發(fā)數(shù)
??驅(qū)動(dòng)函數(shù)中配置如下:
??優(yōu)點(diǎn):簡(jiǎn)單、計(jì)算量小。
??缺點(diǎn):稍有誤差,但絕大多數(shù)場(chǎng)景能接受。
方案三:
??PVBolt1 進(jìn)行多并發(fā)局部匯總,PVSumBolt 單線程進(jìn)行全局匯總。
??線程安全:多線程處理的結(jié)果和單線程一致。
??優(yōu)點(diǎn):絕對(duì)準(zhǔn)確;如果用 filedGrouping 可以得到中間值,如單個(gè) user 的訪問 PV(訪問深度等)。
??缺點(diǎn):計(jì)算量稍大,且多一個(gè) Bolt。
3)案例實(shí)操
??(1)創(chuàng)建數(shù)據(jù)輸入源 PVSpout
??(2)創(chuàng)建數(shù)據(jù)處理 PVBolt1
package com.atgui.storm.pv;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class PVBolt1 implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector;private long pv = 0;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String line = input.getString(0);// 2、截取出 session_idString sessionID = line.split("\t")[1];// 3、根據(jù)會(huì)話id不同統(tǒng)計(jì) pv 次數(shù)if (sessionID != null) {pv++;}// 4、提交collector.emit(new Values(Thread.currentThread().getId(), pv));System.err.println("threadID:" + Thread.currentThread().getId() + " pv:" + pv);}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("threadID", "pv"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}??(3)創(chuàng)建 PVSumBolt
package com.atgui.storm.pv;import java.util.HashMap; import java.util.Iterator; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple;public class PVSumBolt implements IRichBolt {private static final long serialVersionUID = 1L;private Map<Long, Long> map = new HashMap<Long, Long>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 獲取數(shù)據(jù)Long threadID = input.getLong(0);Long pv = input.getLong(1);map.put(threadID, pv);long wordSum = 0;Iterator<Long> iterator = map.values().iterator();while(iterator.hasNext()) {wordSum += iterator.next();}System.err.println("pvAll:" + wordSum);}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}??(4)創(chuàng)建程序的拓?fù)?PVMain
package com.atgui.storm.pv;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class PVMain {public static void main(String[] args) {// 1、創(chuàng)建拓?fù)鋵?duì)象TopologyBuilder builder = new TopologyBuilder();// 2、設(shè)置 Spout 和 Boltbuilder.setSpout("PVSpout", new PVSpout(), 1);builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");builder.setBolt("PVSumBolt", new PVSumBolt(), 1).shuffleGrouping("PVBolt1");// 3、配置 Worker 開啟的個(gè)數(shù)Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("PVopology", conf, builder.createTopology());}} }??(5)測(cè)試,執(zhí)行程序輸出如下結(jié)果
threadID:157 pv:1 pvAll:1 threadID:161 pv:1 pvAll:2 threadID:161 pv:2 pvAll:3 threadID:157 pv:2 pvAll:4 threadID:169 pv:1 pvAll:5 threadID:161 pv:3 pvAll:6 threadID:159 pv:1 pvAll:7 threadID:169 pv:2 pvAll:8 threadID:161 pv:4 pvAll:9 threadID:157 pv:3 pvAll:10 threadID:169 pv:3 pvAll:11 threadID:169 pv:4 pvAll:12 threadID:169 pv:5 pvAll:13 threadID:161 pv:5 pvAll:14 threadID:159 pv:2 pvAll:15 threadID:157 pv:4 pvAll:16 threadID:161 pv:6 pvAll:17 threadID:159 pv:3 pvAll:18 threadID:159 pv:4 pvAll:19 threadID:169 pv:6 pvAll:20 threadID:157 pv:5 pvAll:21 threadID:157 pv:6 pvAll:22 threadID:157 pv:7 pvAll:23 threadID:169 pv:7 pvAll:24 threadID:159 pv:5 pvAll:25 threadID:169 pv:8 pvAll:26 threadID:157 pv:8 pvAll:27 threadID:169 pv:9 pvAll:28 threadID:157 pv:9 pvAll:29 threadID:159 pv:6 pvAll:30我們將各個(gè)線程最后一次的輸出進(jìn)行累加 threadID:161 pv:6 threadID:169 pv:9 threadID:157 pv:9 threadID:159 pv:6 結(jié)果是 pvAll:30綜上:代碼測(cè)試完成!5.4.3 實(shí)時(shí)計(jì)算網(wǎng)站 UV 去重案例
1)需求:
??統(tǒng)計(jì)網(wǎng)站 UV(獨(dú)立訪客數(shù))。
2)需求分析
方案一:
??把 ip 放入 Set 實(shí)現(xiàn)自動(dòng)去重,Set.size() 獲得 UV(分布式應(yīng)用中不可行)。
方案二:
??UVBolt1 通過 fieldGrouping 進(jìn)行多線程局部匯總,下一級(jí) UVSumBolt 進(jìn)行單線程全局匯總?cè)ブ亍0?ip 地址統(tǒng)計(jì) UV 數(shù)。
??既然去重,必須持久化數(shù)據(jù):
??(1)內(nèi)存:數(shù)據(jù)結(jié)構(gòu) map
??(2)no-sql 分布式數(shù)據(jù)庫(kù),如 Hbase
3)案例實(shí)操
??(1)創(chuàng)建帶 ip 地址的數(shù)據(jù)源 GenerateData
??(2)創(chuàng)建接收數(shù)據(jù) UVSpout
package com.atgui.storm.uv;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class UVSpout implements IRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector = null;private BufferedReader reader = null;private String str = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;// 讀取文件try {reader = new BufferedReader(new InputStreamReader(new FileInputStream("d:/temp/storm/website.log"), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}@Overridepublic void close() {try {if (reader != null) {reader.close();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void nextTuple() {// 發(fā)射數(shù)據(jù)try {while ((str = reader.readLine()) != null) {// 發(fā)射collector.emit(new Values(str));Thread.sleep(500);}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}??(3)創(chuàng)建 UVBolt1
package com.atgui.storm.uv;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class UVBolt1 implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String line = input.getString(0);// 2、截取出 ipString ip = line.split("\t")[3];// 3、提交collector.emit(new Values(ip, 1));}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("ip", "num"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}??(4)創(chuàng)建 UVSumBolt
package com.atgui.storm.uv;import java.util.HashMap; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple;public class UVSumBolt implements IRichBolt {private static final long serialVersionUID = 1L;private Map<String, Integer> map = new HashMap<String, Integer>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String ip = input.getString(0);Integer num = input.getInteger(1);// 2、累加單詞if (map.containsKey(ip)) {Integer count = map.get(ip);map.put(ip, count + num);} else {map.put(ip, num);}System.err.println(Thread.currentThread().getId() + " ip:" + ip + " num:" + map.get(ip));}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}??(5)創(chuàng)建驅(qū)動(dòng) UVMain
package com.atgui.storm.uv;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class UVMain {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("UVSpout", new UVSpout(), 1);builder.setBolt("UVBolt1", new UVBolt1(), 4).shuffleGrouping("UVSpout");builder.setBolt("UVSumBolt", new UVSumBolt(), 1).shuffleGrouping("UVBolt1");Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("UVtopology", conf, builder.createTopology());}} }??(6)測(cè)試
163 ip:192.168.1.104 num:1 163 ip:192.168.1.105 num:1 163 ip:192.168.1.108 num:1 163 ip:192.168.1.104 num:2 163 ip:192.168.1.106 num:1 163 ip:192.168.1.107 num:1 163 ip:192.168.1.103 num:1 163 ip:192.168.1.101 num:1 163 ip:192.168.1.102 num:1 163 ip:192.168.1.105 num:2 163 ip:192.168.1.107 num:2 163 ip:192.168.1.104 num:3 163 ip:192.168.1.103 num:2 163 ip:192.168.1.107 num:3 163 ip:192.168.1.104 num:4 163 ip:192.168.1.105 num:3 163 ip:192.168.1.108 num:2 163 ip:192.168.1.106 num:2 163 ip:192.168.1.106 num:3 163 ip:192.168.1.108 num:3 163 ip:192.168.1.105 num:4 163 ip:192.168.1.104 num:5 163 ip:192.168.1.107 num:4 163 ip:192.168.1.103 num:3 163 ip:192.168.1.103 num:4 163 ip:192.168.1.103 num:5 163 ip:192.168.1.101 num:2 163 ip:192.168.1.102 num:2 163 ip:192.168.1.105 num:5 163 ip:192.168.1.101 num:3測(cè)試結(jié)果:一共8個(gè)用戶, 101:訪問3次; 102:訪問2次; 103:訪問5次; 104:訪問5次; 105:訪問5次; 106:訪問3次; 107:訪問4次; 108:訪問3次;我的GitHub地址:https://github.com/heizemingjun
我的博客園地址:https://www.cnblogs.com/chenmingjun
我的CSDN地址:https://blog.csdn.net/u012990179
我的螞蟻筆記博客地址:https://blog.leanote.com/chenmingjun
Copyright ?2018~2019 黑澤君
【轉(zhuǎn)載文章務(wù)必保留出處和署名,謝謝!】
總結(jié)
以上是生活随笔為你收集整理的大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 抽屉效果_[Java教程]抽屉
- 下一篇: 手把手教你在centos7安装k8s集群