日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度

發布時間:2023/12/20 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

大數據技術之_17_Storm學習

    • 一 Storm 概述
      • 1.1 離線計算是什么?
      • 1.2 流式計算是什么?
      • 1.3 Storm 是什么?
      • 1.4 Storm 與 Hadoop 的區別
      • 1.5 Storm 應用場景及行業案例
        • 1.5.1 運用場景
        • 1.5.2 典型案列
      • 1.6 Storm 特點
    • 二 Storm 基礎知識
      • 2.1 Storm 編程模型
        • 2.1.1 元組(Tuple)
        • 2.1.2 流(Stream)
        • 2.1.3 水龍頭(Spout)
        • 2.1.4 轉接頭(Bolt)
        • 2.1.5 拓撲(Topology)
      • 2.2 Storm 核心組件
        • 2.2.1 主控節點與工作節點
        • 2.2.2 Nimbus 進程與 Supervisor 進程
        • 2.2.3 流分組(Stream Grouping)
        • 2.2.4 工作進程(Worker)
        • 2.2.5 執行器(Executor)
        • 2.2.6 任務(Task)
      • 2.3 實時流計算常見架構圖
    • 三 Storm 集群搭建
      • 3.1 環境準備
        • 3.1.1 集群規劃
        • 3.1.2 jar 包下載
        • 3.1.3 虛擬機準備
        • 3.1.4 安裝 jdk
        • 3.1.5 安裝 Zookeeper
      • 3.2 Storm 集群部署
        • 3.2.1 配置集群
        • 3.2.2 Storm 日志信息查看
        • 3.2.3 Storm 命令行操作
    • 四 Storm 常用 API
      • 4.1 API 簡介
        • 4.1.1 Component 組件
        • 4.1.2 Spout 水龍頭
        • 4.1.3 Bolt 轉接頭
        • 4.1.4 Spout 的 tail 特性
      • 4.2 網站日志處理案例
        • 4.2.1 實操環境準備
        • 4.2.2 需求1:將接收到日志的會話 id 打印在控制臺
        • 4.2.3 需求2:動態增加日志,查看控制臺打印信息(tail特性)
    • 五 Storm 分組策略和并發度
      • 5.1 讀取文件案例思考
      • 5.2 分組策略(Stream Grouping)
      • 5.3 并發度
        • 5.3.1 場景分析
        • 5.3.2 并發度
      • 5.4 實操案例
        • 5.4.1 實時單詞統計案例
        • 5.4.2 實時計算網站 PV 案例
        • 5.4.3 實時計算網站 UV 去重案例

一 Storm 概述

1.1 離線計算是什么?

??離線計算:批量獲取數據、批量傳輸數據、周期性批量計算數據、數據展示。
??代表技術:Sqoop 批量導入數據、HDFS 批量存儲數據、MapReduce 批量計算數據、Hive 批量計算數據。

1.2 流式計算是什么?

??流式計算:數據實時產生、數據實時傳輸、數據實時計算、實時展示。
??代表技術:Flume 實時獲取數據、Kafka 實時數據存儲、Storm(阿帕奇)/JStorm(淘寶) 實時數據計算、Redis 實時結果緩存、Mysql 持久化存儲。

??離線計算與實時計算最大的區別:實時收集、實時計算、實時展示。

公司整個后臺系統架構圖解

1.3 Storm 是什么?

??Storm 是一個分布式計算框架,主要使用 Clojure 與 Java 語言編寫,最初是由Nathan Marz 帶領 Backtype 公司團隊創建,在 Backtype 公司被 Twitter 公司收購后進行開源。最初的版本是在 2011 年 9 月 17 日發行,版本號 0.5.0。

??2013 年9 月,Apache 基金會開始接管并孵化 Storm 項目。Apache Storm 是在Eclipse Public License下進行開發的,它提供給大多數企業使用。經過 1 年多時間,2014 年 9 月,Storm 項目成為 Apache 的頂級項目。目前,Storm 的最新版本:Storm 1.2.2 Released (04 Jun 2018)。

??Storm 是一個免費開源的分布式實時計算系統。Storm 能輕松可靠地處理無界的數據流,就像 Hadoop 對數據進行批處理。

1.4 Storm 與 Hadoop 的區別

??1)Storm 用于實時計算;Hadoop 用于離線計算。
??2)Storm 處理的數據保存在內存中,源源不斷;Hadoop 處理的數據保存在文件系統中,一批一批處理。
??3)Storm 的數據通過網絡傳輸進來;Hadoop 的數據保存在磁盤中。
??4)Storm 與 Hadoop 的編程模型相似。


(1)Hadoop 相關名稱
??Job:任務名稱
??JobTracker:項目經理(JobTracker 對應于 NameNode;JobTracker 是一個 master 服務,軟件啟動之后 JobTracker 接收 Job,負責調度 Job 的每一個子任務 task 運行于 TaskTracker 上,并監控它們,如果發現有失敗的 task 就重新運行它)
??TaskTracker:開發組長(TaskTracker 對應于 DataNode;TaskTracker 是運行在多個節點上的 slaver 服務。TaskTracker 主動與 JobTracker 通信,接收作業,并負責直接執行每一個任務)
??Child:負責開發的人員
??Mapper/Reduce:開發人員中的兩種角色,一種是服務器開發、一種是客戶端開發
(2)Storm 相關名稱
??Topology(拓撲):任務名稱
??Nimbus:項目經理
??Supervisor:開發組長
??Worker:開發人員
??Spout(水龍頭)/Bolt(轉接頭):開發人員中的兩種角色,一種是服務器開發、一種是客戶端開發

1.5 Storm 應用場景及行業案例

??Storm 用來實時計算源源不斷產生的數據,如同流水線生產。

1.5.1 運用場景

??Storm 能用到很多場景中,包括:實時分析、在線機器學習、連續計算等。
??1)推薦系統:實時推薦,根據下單或加入購物車推薦相關商品。
??2)金融系統:實時分析股票信息數據。
??3)預警系統:根據實時采集數據,判斷是否到了預警閾值。
??4)網站統計:實時銷量、流量統計,如淘寶雙11效果圖。

1.5.2 典型案列

1)京東-實時分析系統:實時分析用戶的屬性,并反饋給搜索引擎
??最初,用戶屬性分析是通過每天在云上定時運行的 MR job 來完成的。為了滿足實時性的要求,希望能夠實時分析用戶的行為日志,將最新的用戶屬性反饋給搜索引擎,能夠為用戶展現最貼近其當前需求的結果。

2)攜程-網站性能監控:實時分析系統監控攜程網的網站性能
??利用 HTML5 提供的 performance 標準獲得可用的指標,并記錄日志。Storm 集群實時分析日志和入庫。使用 DRPC 聚合成報表,通過歷史數據對比等判斷規則,觸發預警事件。

3)淘寶雙十一:實時統計銷售總額

1.6 Storm 特點

??1)適用場景廣泛:Storm 可以適用實時處理消息、更新數據庫、持續計算等場景。
??2)可伸縮性高:Storm 的可伸縮性可以讓 Storm 每秒處理的消息量達到很高。擴展一個實時計算任務,你所需要做的就是加機器并且提高這個計算任務的并行度。Storm 使用 Zookeeper 來協調機器內的各種配置使得 Storm 的集群可以很容易的擴展。
??3)保證無數據丟失:Storm 保證所有的數據都被處理。
??4)異常健壯:Storm 集群非常容易管理,輪流重啟節點不影響應用。
??5)容錯性好:在消息處理過程中出現異常,Storm 會進行重試。

二 Storm 基礎知識

2.1 Storm 編程模型

2.1.1 元組(Tuple)

??元組(Tuple),是消息傳遞的基本單元,是一個命名的值列表,元組中的字段可以是任何類型的對象。Storm 使用元組作為其數據模型,元組支持所有的基本類型、字符串和字節數組作為字段值,只要實現類型的序列化接口就可以使用該類型的對象。元組本來應該是一個 key-value 的 Map,但是由于各個組件間傳遞的元組的字段名稱已經事先定義好,所以只要按序把元組填入各個 value 即可,所以元組是一個 value 的 List。

2.1.2 流(Stream)

??流是 Storm 的核心抽象,是一個無界的元組系列。源源不斷傳遞的元組就組成了流,在分布式環境中并行地進行創建和處理。

2.1.3 水龍頭(Spout)

??Spout 是拓撲的流的來源,是一個拓撲中產生源數據流的組件。通常情況下,Spout 會從外部數據源中讀取數據,然后轉換為拓撲內部的源數據。
??Spout 可以是可靠的,也可以是不可靠的。如果 Storm 處理元組失敗,可靠的 Spout 能夠重新發射,而不可靠的 Spout 就盡快忘記發出的元組。
??Spout 可以發出超過一個流。
??Spout 的主要方法是 nextTuple()。NextTuple() 會發出一個新的 Tuple 到拓撲,如果沒有新的元組發出,則簡單返回。
??Spout 的其他方法是 ack() 和 fail()。當 Storm 檢測到一個元組從 Spout 發出時,ack() 和 fail() 會被調用,要么成功完成通過拓撲,要么未能完成。ack() 和 fail() 僅被可靠的 Spout 調用。
??IRichSpout 是 Spout 必須實現的接口。

2.1.4 轉接頭(Bolt)

??在拓撲中所有處理都在 Bolt 中完成,Bolt 是流的處理節點,從一個拓撲接收數據,然后執行進行處理的組件。Bolt 可以完成過濾、業務處理、連接運算、連接與訪問數據庫等任何操作。
??Bolt 是一個被動的角色,其接口中有一個 execute() 方法,在接收到消息后會調用此方法,用戶可以在其中執行自己希望的操作。
??Bolt 可以完成簡單的流的轉換,而完成復雜的流的轉換通常需要多個步驟,因此需要多個 Bolt。

2.1.5 拓撲(Topology)

??拓撲(Topology)是 Storm 中運行的一個實時應用程序,因為各個組件間的消息流動而形成邏輯上的拓撲結構。
??把實時應用程序的運行邏輯打成 jar 包后提交到 Storm 的拓撲(Topology)。Storm 的拓撲類似于 MapReduce 的作業(Job)。其主要的區別是,MapReduce 的作業最終會完成,而一個拓撲永遠都在運行直到它被殺死。一個拓撲是一個圖的 Spout 和 Bolt 的連接流分組。

2.2 Storm 核心組件

??Nimbus 是整個集群的控管核心,負責 Topology 的提交、運行狀態監控、任務重新分配等工作。
??Zookeeper 就是一個管理者,監控者。
??總體描述:Nimbus下命令(分配任務),Zookeeper 監督執行(心跳監控,Worker、Supurvisor的心跳都歸它管),Supervisor領旨(下載代碼),招募人馬(創建Worker和線程等),Worker、Executor就給我干活!Task 就是具體要干的活。

2.2.1 主控節點與工作節點

??Storm 集群中有兩類節點:主控節點(Master Node)和工作節點(Worker Node)。其中,主控節點只有一個,而工作節點可以有多個。

2.2.2 Nimbus 進程與 Supervisor 進程

??主控節點運行一個稱為 Nimbus 的守護進程類似于 Hadoop 的 JobTracker。Nimbus 負責在集群中分發代碼、對節點分配任務、并監視主機故障。
??每個工作節點運行一個稱為 Supervisor 的守護進程。Supervisor 監聽其主機上已經分配的主機的作業、啟動和停止 Nimbus 已經分配的工作進程。

2.2.3 流分組(Stream Grouping)

??流分組,是拓撲定義中的一部分,為每個 Bolt 指定應該接收哪個流作為輸入。流分組定義了流/元組如何在 Bolt 的任務之間進行分發。
??Storm 內置了 8 種流分組方式。

2.2.4 工作進程(Worker)

??Worker 是 Spout/Bolt 中運行具體處理邏輯的進程。一個 Worker 就是一個進程,進程里面包含一個或多個線程。

2.2.5 執行器(Executor)

??一個線程就是一個 Executor,一個線程會處理一個或多個任務。

2.2.6 任務(Task)

??一個任務就是一個 Task。

2.3 實時流計算常見架構圖


??1)Flume 獲取數據。
??2)Kafka 臨時保存數據。
??3)Strom 計算數據。
??4)Redis 是個內存數據庫,用來保存數據。

三 Storm 集群搭建

3.1 環境準備

3.1.1 集群規劃

hadoop102 hadoop103 hadoop104 zk zk zk storm storm storm

3.1.2 jar 包下載

(1)官方網址:http://storm.apache.org/
注意:本次學習演示,本博主使用版本 Storm 1.1.1 Released (1 Aug 2018)

(2)安裝集群步驟:
??官方文檔地址:http://storm.apache.org/releases/1.1.1/Setting-up-a-Storm-cluster.html

3.1.3 虛擬機準備

1)準備3臺虛擬機
2)配置ip地址、配置主機名稱、3臺主機分別關閉防火墻
參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/10335265.html
參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/10349717.html

3.1.4 安裝 jdk

參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/9931593.html

3.1.5 安裝 Zookeeper

0)集群規劃
在 hadoop102、hadoop103 和 hadoop104 三個節點上部署 Zookeeper。
1)解壓安裝
(1)解壓 zookeeper 安裝包到 /opt/module/ 目錄下

[atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

(2)在 /opt/module/zookeeper-3.4.10/ 這個目錄下創建目錄 zkData

mkdir -p zkData

(3)重命名 /opt/module/zookeeper-3.4.10/conf 這個目錄下的 zoo_sample.cfg 為 zoo.cfg

mv zoo_sample.cfg zoo.cfg

2)配置 zoo.cfg 文件
(1)具體配置

dataDir=/opt/module/zookeeper-3.4.10/zkData增加如下配置 #######################cluster########################## server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888

(2)配置參數解讀

server.A=B:C:D。 A 是一個數字,表示這個是第幾號服務器; B 是這個服務器的ip地址; C 是這個服務器與集群中的 Leader 服務器交換信息的端口; D 是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。集群模式下配置一個文件 myid,這個文件在 zkData 目錄下,這個文件里面有一個數據就是 A 的值,Zookeeper 啟動時讀取此文件,拿到里面的數據與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個 server。

3)集群操作
(1)在 /opt/module/zookeeper-3.4.10/zkData 目錄下創建一個 myid 的文件

touch myid

添加 myid 文件,注意一定要在 linux 里面創建,在 notepad++ 里面很可能亂碼。
(2)編輯 myid 文件

vim myid

在文件中添加與 server 對應的編號:如 2
(3)拷貝配置好的 zookeeper 到其他機器上 或者執行配置分發的腳本

scp -r /opt/module/zookeeper-3.4.10/ root@hadoop103:/opt/module/ scp -r /opt/module/zookeeper-3.4.10/ root@hadoop104:/opt/module/ 并分別修改 myid 文件中內容為 3、4

(4)分別啟動 zookeeper 或者 使用群起腳本啟動

[root@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop104 zookeeper-3.4.10]# bin/zkServer.sh start

(5)查看狀態 或者 使用腳本查看狀態

[root@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower [root@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: leader [root@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower

3.2 Storm 集群部署

3.2.1 配置集群

1)拷貝 jar 包到 hadoop102 的 /opt/software/ 目錄下
2)解壓 jar 包到 /opt/module 目錄下

[atguigu@hadoop102 software]$ tar -zxvf apache-storm-1.1.1.tar.gz -C /opt/module/

3)修改解壓后的 apache-storm-1.1.1.tar.gz 文件名稱為 storm,為了方便

[atguigu@hadoop102 module]$ mv apache-storm-1.1.1/ storm

4)在 /opt/module/storm/ 目錄下創建 data 文件夾

[atguigu@hadoop102 storm]$ mkdir data

5)修改配置文件

[atguigu@hadoop102 conf]$ pwd /opt/module/storm/conf [atguigu@hadoop102 conf]$ vim storm.yaml

修改內容如下:

# 設置 Zookeeper 的主機名稱 storm.zookeeper.servers:- "hadoop102"- "hadoop103"- "hadoop104"# 設置主節點的主機名稱 nimbus.seeds: ["hadoop102"]# 設置 Storm 的數據存儲路徑 storm.local.dir: "/opt/module/storm/data"# 設置 Worker 的端口號 supervisor.slots.ports:- 6700- 6701- 6702- 6703

6)以 root 用戶,配置環境變量

[root@hadoop102 storm]# vim /etc/profile#STORM_HOME export STORM_HOME=/opt/module/storm export PATH=$PATH:$STORM_HOME/bin

使配置文件生效

[root@hadoop102 storm]# source /etc/profile

7)分發配置好的 storm 安裝包

[atguigu@hadoop102 storm]$ xsync storm/

8)啟動 Storm 集群
(1)后臺啟動 Nimbus

[atguigu@hadoop102 storm]$ bin/storm nimbus & [atguigu@hadoop103 storm]$ bin/storm nimbus & [atguigu@hadoop104 storm]$ bin/storm nimbus &

(2)后臺啟動 Supervisor

[atguigu@hadoop102 storm]$ bin/storm supervisor & [atguigu@hadoop102 storm]$ bin/storm supervisor & [atguigu@hadoop102 storm]$ bin/storm supervisor &

拓展:fg 命令 表示將放在后臺的進程放到前臺。
(3)啟動 Storm UI

[atguigu@hadoop102 storm]$ bin/storm ui

9)通過瀏覽器查看集群狀態
地址:http://hadoop102:8080/index.html

3.2.2 Storm 日志信息查看

1)查看 Nimbus 的日志信息
在 Nimbus 的服務器上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/nimbus.log

2)查看 ui 運行日志信息
在 ui 的服務器上,一般和 Nimbus 在一個服務器上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/ui.log

3)查看 Supervisor 運行日志信息
在 Supervisor 服務 上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/supervisor.log

4)查看 Supervisor 上 Worker 運行日志信息
在 supervisor 服務上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/worker-6702.log

5)logviewer,可以在 web 頁面點擊相應的端口號即可查看日志
分別在 Supervisor 節點上執行:

[atguigu@hadoop102 storm]$ bin/storm logviewer & [atguigu@hadoop103 storm]$ bin/storm logviewer & [atguigu@hadoop104 storm]$ bin/storm logviewer &

瀏覽器截圖如下

3.2.3 Storm 命令行操作

1)Nimbus:啟動 Nimbus 守護進程。

storm nimbus

2)Supervisor:啟動 Supervisor 守護進程。

storm supervisor

3)ui:啟動UI守護進程。

storm ui

4)list:列出正在運行的拓撲及其狀態。

storm list

5)logviewer:Logviewer 提供一個 web 接口查看 Storm 日志文件。

storm logviewer

6)jar:

storm jar [jar路徑] [拓撲包名.拓撲類名] [拓撲名稱]

7)kill:殺死名為 topology-name 的拓撲。

storm kill topology-name [-w wait-time-secs] -w:等待多久后殺死拓撲

8)active:激活指定的拓撲 Spout。

storm activate topology-name

9)deactivate:禁用指定的拓撲 Spout。

storm deactivate topology-name

10)help:打印一條幫助消息或者可用命令的列表。

storm help storm help <command>

四 Storm 常用 API

4.1 API 簡介

4.1.1 Component 組件

1)基本接口
??(1)IComponent 接口
??(2)ISpout 接口
??(3)IRichSpout 接口
??(4)IStateSpout 接口
??(5)IRichStateSpout 接口
??(6)IBolt 接口
??(7)IRichBolt 接口
??(8)IBasicBolt 接口
2)基本抽象類
??(1)BaseComponent 抽象類
??(2)BaseRichSpout 抽象類
??(3)BaseRichBolt 抽象類
??(4)BaseTransactionalBolt 抽象類
??(5)BaseBasicBolt 抽象類

4.1.2 Spout 水龍頭

Spout 的最頂層抽象是 ISpout 接口。

(1)open()
??是初始化方法。
(2)close()
??在該 Spout 關閉前執行,但是并不能得到保證其一定被執行,kill -9 時不執行,Storm kill {topoName} 時執行。
(3)activate()
??當 Spout 已經從失效模式中激活時被調用。該 Spout 的 nextTuple() 方法很快就會被調用。
(4)deactivate ()
??當 Spout 已經失效時被調用。在 Spout 失效期間,nextTuple 不會被調用。Spout 將來可能會也可能不會被重新激活。
(5)nextTuple()
??當調用 nextTuple() 方法時,Storm 要求 Spout 發射元組到輸出收集器(OutputCollecctor)。
??nextTuple() 方法應該是非阻塞的,所以,如果 Spout 沒有元組可以發射,該方法應該返回。
??nextTuple()、ack() 和 fail() 方法都在 Spout 任務的單一線程內緊密循環被調用。
??當沒有元組可以發射時,可以讓 nextTuple 去 sleep 很短的時間,例如1毫秒,這樣就不會浪費太多的 CPU 資源。
(6)ack()
??成功處理 Tuple 回調方法。
(7)fail()
??處理失敗 Tuple 回調方法。
??原則:通常情況下(Shell 和事務型的除外),實現一個 Spout,可以直接實現接口 IRichSpout,如果不想寫多余的代碼,可以直接繼承 BaseRichSpout。

4.1.3 Bolt 轉接頭

Bolt 的最頂層抽象是 IBolt 接口。

(1)prepare()
??prepare() 方法在集群的工作進程內被初始化時被調用,提供了 Bolt 執行所需要的環境。
(2)execute()
??接受一個 Tuple 進行處理,也可 emit 數據到下一級組件。
(3)cleanup()
??cleanup方法當一個 IBolt 即將關閉時被調用。不能保證 cleanup() 方法一定會被調用,因為 Supervisor 可以對集群的工作進程使用 kill -9 命令強制殺死進程命令。
??如果在本地模式下運行 Storm,當拓撲被殺死的時候,可以保證 cleanup() 方法一定會被調用。
??實現一個 Bolt,可以實現 IRichBolt 接口或繼承 BaseRichBolt,如果不想自己處理結果反饋,可以實現 IBasicBolt 接口或繼承 BaseBasicBolt,它實際上相當于自動做了 prepare 方法和 collector.emit.ack(inputTuple)。

4.1.4 Spout 的 tail 特性

Storm 可以實時監測文件數據,當文件數據變化時,Storm 自動讀取。

4.2 網站日志處理案例

4.2.1 實操環境準備

??1)打開 eclipse,創建一個 java 工程
??2)在工程目錄中創建 lib 文件夾
??3)解壓 apache-storm-1.1.1,并把解壓后 lib 包下的文件復制到 java 工程的 lib 文件夾中,然后執行 build path。

4.2.2 需求1:將接收到日志的會話 id 打印在控制臺

1)需求:
??(1)模擬訪問網站的日志信息,包括:網站名稱、會話 id、訪問網站時間等。
??(2)將接收到日志的會話 id 打印到控制臺。
2)分析:
??(1)創建網站訪問日志工具類。
??(2)在 spout 中讀取日志文件,并一行一行發射出去。
??(3)在 bolt 中將獲取到的一行一行數據的會話 id 獲取到,并打印到控制臺。
??(4)main 方法負責拼接 spout 和 bolt 的拓撲。

3)案例實操:
(1)創建網站訪問日志
示例代碼如下:

package com.atgui.storm.weblog;import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.util.Random;public class GenerateData {public static void main(String[] args) {// 1、創建文件路徑File logFile = new File("d:/temp/storm/website.log");// 2、準備數據// 2.1 網站名稱String[] hosts = { "www.atguigu.com" };// 2.2 會話idString[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34","BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };// 2.3 訪問網站時間String[] time = { "2017-08-07 08:40:50", "2017-08-07 08:40:51", "2017-08-07 08:40:52", "2017-08-07 08:40:53","2017-08-07 09:40:49", "2017-08-07 10:40:49", "2017-08-07 11:40:49", "2017-08-07 12:40:49" };// 3、拼接數據StringBuffer sb = new StringBuffer();Random random = new Random();for (int i = 0; i < 30; i++) {sb.append(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)] + "\n");}// 判斷log日志是否存在,不存在要創建if (!logFile.exists()) {try {logFile.createNewFile();} catch (IOException e) {System.out.println("Create logFile fail !");}}byte[] b = (sb.toString()).getBytes();// 4、 寫數據到文件FileOutputStream fileOutputStream = null;try {fileOutputStream = new FileOutputStream(logFile);fileOutputStream.write(b);System.out.println("Generate data over !");} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {// 5、關閉資源try {fileOutputStream.close();} catch (IOException e) {e.printStackTrace();}}} }

(2)創建 spout
示例代碼如下:

package com.atgui.storm.weblog;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class WebLogSpout implements IRichSpout {private static final long serialVersionUID = 1L;private BufferedReader bufferedReader = null;private SpoutOutputCollector collector = null;private String str = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;// 打開輸入的文件try {this.bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("d:/temp/storm/website.log"),"UTF-8"));} catch (FileNotFoundException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}@Overridepublic void nextTuple() {// 循環調用的方法try {while ((str = bufferedReader.readLine()) != null) {// 發射數據collector.emit(new Values(str));Thread.sleep(500);}} catch (IOException | InterruptedException e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("log"));}@Overridepublic void close() {}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

(3)創建 bolt
示例代碼如下:

package com.atgui.storm.weblog;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple;public class WebLogBolt implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector = null;private int num = 0;private String valueString = null;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {try {// 1、獲取傳遞過來的數據valueString = input.getStringByField("log");// 2、如果輸入的數據不為空,行數++if (valueString != null) {num++;System.err.println(Thread.currentThread().getName() + " lines:" + num + " session_id:" + valueString.split("\t")[1]);}// 3、應答Spout接收成功collector.ack(input);Thread.sleep(1000);} catch (Exception e) {// 4、應答Spout接收失敗collector.fail(input);e.printStackTrace();}}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段類型declarer.declare(new Fields(""));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

(4)創建main
示例代碼如下:

package com.atgui.storm.weblog;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class WebLogMain {public static void main(String[] args) {// 1、創建拓撲對象TopologyBuilder builder = new TopologyBuilder();// 2、設置 Spout 和 Boltbuilder.setSpout("weblogspout", new WebLogSpout(), 1);builder.setBolt("weblogbolt", new WebLogBolt(), 1).shuffleGrouping("weblogspout");// 3、配置 Worker 開啟個數Config conf = new Config();conf.setNumWorkers(4);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("weblogtopology", conf, builder.createTopology());}} }

輸出結果如下:

Thread-46-weblogbolt-executor[5 5] lines:1 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:2 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:3 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:4 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:5 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:6 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:7 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:8 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:9 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:10 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:11 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:12 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:13 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:14 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:15 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:16 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:17 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:18 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:19 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:20 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:21 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:22 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:23 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:24 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:25 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:26 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:27 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:28 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:29 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:30 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123

4.2.3 需求2:動態增加日志,查看控制臺打印信息(tail特性)

1)在需求1基礎上,運行程序。
2)打開 website.log 日志文件,增加日志調試并保存。
3)觀察控制臺打印的信息。
結論:Storm 可以動態實時監測文件的增加信息,并把信息讀取到再處理。

五 Storm 分組策略和并發度

5.1 讀取文件案例思考

1)spout 數據源:數據庫、文件、MQ(比如:Kafka)
2)數據源是數據庫:只適合讀取數據庫的配置文件
3)數據源是文件:只適合測試、講課用(因為集群是分布式集群)
4)企業產生的 log 文件處理步驟:
??(1)讀出內容寫 入MQ
??(2)Storm 再處理

5.2 分組策略(Stream Grouping)

stream grouping 用來定義一個 stream 應該如何分配給 Bolts 上面的多個 Executors(多線程、多并發)。

Storm 里面有 7 種類型的 stream grouping,詳情如下:
1)Shuffle Grouping: 隨機分組,輪詢,平均分配。隨機派發 stream 里面的 tuple,保證每個 bolt 接收到的 tuple 數目大致相同。

2)Fields Grouping:按字段分組,比如按 userid 來分組,具有同樣 userid 的 tuple 會被分到相同的 bolts 里的一個 task,而不同的 userid 則會被分配到不同的 bolts 里的 task。

3)All Grouping:廣播發送,對于每一個 tuple,所有的 bolts 都會收到。

4)Global Grouping:全局分組,這個 tuple 被分配到 storm 中的一個 bolt 的其中一個 task。再具體一點就是分配給 id 值最低的那個 task。

5)None Grouping:不分組,這個分組的意思是說 stream 不關心到底誰會收到它的 tuple。目前這種分組和 Shuffle Grouping 是一樣的效果。在多線程情況下不平均分配。

6)Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發送者指定由消息接收者的哪個 task 處理這個消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息 tuple 必須使用 emitDirect 方法來發射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的 task 的 id (OutputCollector.emit 方法也會返回 task 的 id)。

7)Local or Shuffle Grouping:如果目標 bolt 有一個或者多個 task 在同一個工作進程中,tuple 將會被隨機發送給這些 tasks。否則,和普通的 Shuffle Grouping 行為一致。

8)測試
??(1)spout 并發度修改為 2,bolt 并發度修改為 1,Shuffle Grouping 模式

// 2、設置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 2); builder.setBolt("weblogbolt", new WebLogBolt(), 1).shuffleGrouping("weblogspout");spout 開兩個線程會對數據讀取兩份,打印出來就是 2 份。如果數據源是消息隊列,就不會出來讀取兩份的數據(統一消費者組,只能有一個消費者)。Thread-53-weblogbolt-executor[5 5] lines:1 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-53-weblogbolt-executor[5 5] lines:2 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-53-weblogbolt-executor[5 5] lines:3 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-53-weblogbolt-executor[5 5] lines:4 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7

??(2)spout 并發度修改為 1,bolt 并發度修改為 2,None Grouping 模式

// 2、設置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).noneGrouping("weblogspout");每個 bolt 接收到的數據不同。

??(3)spout 并發度修改為 1,bolt 并發度修改為 2,Fields Grouping 模式

// 2、設置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).fieldsGrouping("weblogspout", new Fields("log"));基于 web 案例效果不明顯,后續案例效果比較明顯。

??(4)spout 并發度修改為 1,bolt 并發度修改為 2,All Grouping 模式

// 2、設置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).allGrouping("weblogspout");每一個 bolt 獲取到的數據都是一樣的。

??(5)spout 并發度修改為 1,bolt 并發度修改為 2,Global Grouping 模式

// 2、設置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).globalGrouping("weblogspout");task 的 id 最低的 bolt 獲取到了所有數據。

5.3 并發度

5.3.1 場景分析

1)單線程下:加減乘除、全局匯總
2)多線程下:局部加減乘除、持久化DB等
??(1)思考:如何計算:word 總數和 word 個數?并且在高并發下完成
??前者是統計總行數,后者是去重 word 個數。
??類似企業場景:計算網站 PV 和 UV
??(2)網站最常用的兩個指標:
??PV(page views):count(session_id) 即頁面瀏覽量。
??UV(user views):count(distinct session_id) 即獨立訪客數。
??a)用 ip 地址分析
??指訪問某個站點或點擊某個網頁的不同 ip 地址的人數。在同一天內,UV 只記錄第一次進入網站的具有獨立 IP 的訪問者,在同一天內再次訪問該網站則不計數。
??b)用 Cookie 分析 UV 值
??當客戶端第一次訪問某個網站服務器的時候,網站服務器會給這個客戶端的電腦發出一個 Cookie,通常放在這個客戶端電腦的 C 盤當中。在這個 Cookie 中會分配一個獨一無二的編號,這其中會記錄一些訪問服務器的信息,如訪問時間、訪問了哪些頁面等等。當你下次再訪問這個服務器的時候,服務器就可以直接從你的電腦中找到上一次放進去的 Cookie 文件,并且對其進行一些更新,但那個獨一無二的編號是不會變的。
??實時處理的業務場景主要包括:匯總型(如網站 PV、銷售額、訂單數)、去重型(如網站 UV、顧客數、銷售商品數)

5.3.2 并發度

??并發度:用戶指定一個任務,可以被多個線程執行,并發度的數量等于線程 executor 的數量。
??task 就是具體的處理邏輯對象,一個 executor 線程可以執行一個或多個 tasks,但一般默認每個 executor 只執行一個 task,所以我們往往認為 task 就是執行線程,其實不是。
??task 代表最大并發度,一個 component 的 task 數是不會改變的,但是一個 componet 的 executer 數目是會發生變化的(storm rebalance 命令),task 數 >= executor 數,executor 數代表實際并發數。

5.4 實操案例

5.4.1 實時單詞統計案例

1)需求
??實時統計發射到 Storm 框架中單詞的總數。

2)分析
??設計一個 topology,來實現對文檔里面的單詞出現的頻率進行統計。
整個 topology 分為三個部分:
??(1)WordCountSpout:數據源,在已知的英文句子中,隨機發送一條句子出去。
??(2)WordCountSplitBolt:負責將單行文本記錄(句子)切分成單詞。
??(3)WordCountBolt:負責對單詞的頻率進行累加。

3)實操
??(1)創建 spout

package com.atgui.storm.wordcount;import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class WordCountSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}@Overridepublic void nextTuple() {// 發射一條語句collector.emit(new Values("i am ximen love jinlian"));try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("love"));}}

??(2)創建切割單詞的 bolt

package com.atgui.storm.wordcount;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class WordCountSplitBolt extends BaseRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector = null;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的一行數據// String line = input.getStringByField("love");String line = input.getString(0);// 2、截取數據String[] arrWords = line.split(" ");// 3、發射數據(發送給下一級 Bolt)for (String word : arrWords) {collector.emit(new Values(word, 1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("word", "num"));}}

??(3)創建匯總單詞個數的 bolt

package com.atgui.storm.wordcount;import java.util.HashMap; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple;public class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 1L;// 定義一個 HashMap 用于存放統計后的結果,其中單詞為 key,單詞個數為 valueprivate Map<String, Integer> map = new HashMap<>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數據String word = input.getString(0); // 第一個數據Integer num = input.getInteger(1); // 第二個數據// 2、統計單詞個數if (map.containsKey(word)) {Integer count = map.get(word);count = count + num;map.put(word, count);} else {map.put(word, num);}// 3、控制臺打印(以紅色的字體 err 方式)System.err.println(Thread.currentThread().getId() + " word:" + word + " num:" + map.get(word));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}}

??(4)創建程序的拓撲 main

package com.atgui.storm.wordcount;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class WordCountMain {public static void main(String[] args) {// 1、創建拓撲對象TopologyBuilder builder = new TopologyBuilder();// 2、設置 Spout 和 Boltbuilder.setSpout("WordCountSpout", new WordCountSpout(), 1);builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).shuffleGrouping("WordCountSpout");builder.setBolt("WordCountBolt", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word"));// 3、配置 Worker 開啟的個數Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("WordCountTopology", conf, builder.createTopology());}} }

??(5)測試
發現 159 線程只處理單詞 am 和單詞 love,163 進程處理單詞 i、ximen、jianlian。這就是分組的好處。

163 word:i num:1 163 word:ximen num:1 163 word:jinlian num:1 159 word:am num:1 159 word:love num:1 163 word:i num:2 163 word:ximen num:2 163 word:jinlian num:2 159 word:am num:2 159 word:love num:2 163 word:i num:3 163 word:ximen num:3 163 word:jinlian num:3 159 word:am num:3 159 word:love num:3

5.4.2 實時計算網站 PV 案例

0)基礎知識準備
1)需求
??統計網站 pv(頁面瀏覽量)。
2)需求分析
方案一:
??定義 static long pv,Synchronized 控制累計操作。(不可行)
??原因:Synchronized 和 Lock 在單 JVM 下有效,但在多 JVM 下無效。
方案二:
??ShuffleGrouping 下,pv * Executer 并發數
??驅動函數中配置如下:

builder.setSpout("PVSpout", new PVSpout(), 1); builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout"); 在 PVBolt1 中輸出時 System.err.println("threadid:" + Thread.currentThread().getId() + " pv:" + pv * 4); 因為 shuffleGrouping 輪詢分配

??優點:簡單、計算量小。
??缺點:稍有誤差,但絕大多數場景能接受。
方案三:
??PVBolt1 進行多并發局部匯總,PVSumBolt 單線程進行全局匯總。
??線程安全:多線程處理的結果和單線程一致。
??優點:絕對準確;如果用 filedGrouping 可以得到中間值,如單個 user 的訪問 PV(訪問深度等)。
??缺點:計算量稍大,且多一個 Bolt。

3)案例實操
??(1)創建數據輸入源 PVSpout

package com.atgui.storm.pv;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class PVSpout implements IRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector = null;private BufferedReader reader = null;private String str = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;// 讀取文件try {reader = new BufferedReader(new InputStreamReader(new FileInputStream("d:/temp/storm/website.log"), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}@Overridepublic void close() {try {if (reader != null) {reader.close();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void nextTuple() {// 發射數據try {while ((str = reader.readLine()) != null) {// 發射collector.emit(new Values(str));Thread.sleep(500);}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(2)創建數據處理 PVBolt1

package com.atgui.storm.pv;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class PVBolt1 implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector;private long pv = 0;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數據String line = input.getString(0);// 2、截取出 session_idString sessionID = line.split("\t")[1];// 3、根據會話id不同統計 pv 次數if (sessionID != null) {pv++;}// 4、提交collector.emit(new Values(Thread.currentThread().getId(), pv));System.err.println("threadID:" + Thread.currentThread().getId() + " pv:" + pv);}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("threadID", "pv"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(3)創建 PVSumBolt

package com.atgui.storm.pv;import java.util.HashMap; import java.util.Iterator; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple;public class PVSumBolt implements IRichBolt {private static final long serialVersionUID = 1L;private Map<Long, Long> map = new HashMap<Long, Long>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 獲取數據Long threadID = input.getLong(0);Long pv = input.getLong(1);map.put(threadID, pv);long wordSum = 0;Iterator<Long> iterator = map.values().iterator();while(iterator.hasNext()) {wordSum += iterator.next();}System.err.println("pvAll:" + wordSum);}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(4)創建程序的拓撲 PVMain

package com.atgui.storm.pv;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class PVMain {public static void main(String[] args) {// 1、創建拓撲對象TopologyBuilder builder = new TopologyBuilder();// 2、設置 Spout 和 Boltbuilder.setSpout("PVSpout", new PVSpout(), 1);builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");builder.setBolt("PVSumBolt", new PVSumBolt(), 1).shuffleGrouping("PVBolt1");// 3、配置 Worker 開啟的個數Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("PVopology", conf, builder.createTopology());}} }

??(5)測試,執行程序輸出如下結果

threadID:157 pv:1 pvAll:1 threadID:161 pv:1 pvAll:2 threadID:161 pv:2 pvAll:3 threadID:157 pv:2 pvAll:4 threadID:169 pv:1 pvAll:5 threadID:161 pv:3 pvAll:6 threadID:159 pv:1 pvAll:7 threadID:169 pv:2 pvAll:8 threadID:161 pv:4 pvAll:9 threadID:157 pv:3 pvAll:10 threadID:169 pv:3 pvAll:11 threadID:169 pv:4 pvAll:12 threadID:169 pv:5 pvAll:13 threadID:161 pv:5 pvAll:14 threadID:159 pv:2 pvAll:15 threadID:157 pv:4 pvAll:16 threadID:161 pv:6 pvAll:17 threadID:159 pv:3 pvAll:18 threadID:159 pv:4 pvAll:19 threadID:169 pv:6 pvAll:20 threadID:157 pv:5 pvAll:21 threadID:157 pv:6 pvAll:22 threadID:157 pv:7 pvAll:23 threadID:169 pv:7 pvAll:24 threadID:159 pv:5 pvAll:25 threadID:169 pv:8 pvAll:26 threadID:157 pv:8 pvAll:27 threadID:169 pv:9 pvAll:28 threadID:157 pv:9 pvAll:29 threadID:159 pv:6 pvAll:30我們將各個線程最后一次的輸出進行累加 threadID:161 pv:6 threadID:169 pv:9 threadID:157 pv:9 threadID:159 pv:6 結果是 pvAll:30綜上:代碼測試完成!

5.4.3 實時計算網站 UV 去重案例

1)需求:
??統計網站 UV(獨立訪客數)。

2)需求分析
方案一:
??把 ip 放入 Set 實現自動去重,Set.size() 獲得 UV(分布式應用中不可行)。
方案二:
??UVBolt1 通過 fieldGrouping 進行多線程局部匯總,下一級 UVSumBolt 進行單線程全局匯總去重。按 ip 地址統計 UV 數。
??既然去重,必須持久化數據:
??(1)內存:數據結構 map
??(2)no-sql 分布式數據庫,如 Hbase

3)案例實操
??(1)創建帶 ip 地址的數據源 GenerateData

package com.atgui.storm.uv;import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.util.Random;public class GenerateData {public static void main(String[] args) {// 1、創建文件路徑File logFile = new File("d:/temp/storm/website.log");// 2、準備數據// 2.1 網站名稱String[] hosts = { "www.atguigu.com" };// 2.2 會話idString[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34","BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };// 2.3 訪問網站時間String[] time = { "2017-08-07 08:40:50", "2017-08-07 08:40:51", "2017-08-07 08:40:52", "2017-08-07 08:40:53","2017-08-07 09:40:49", "2017-08-07 10:40:49", "2017-08-07 11:40:49", "2017-08-07 12:40:49" };// 2.4 訪問網站ip地址String[] ip = { "192.168.1.101", "192.168.1.102", "192.168.1.103", "192.168.1.104", "192.168.1.105","192.168.1.106", "192.168.1.107", "192.168.1.108" };// 3、拼接數據StringBuffer sb = new StringBuffer();Random random = new Random();for (int i = 0; i < 30; i++) {sb.append(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)] + "\t" + ip[random.nextInt(8)] + "\n");}// 判斷log日志是否存在,不存在要創建if (!logFile.exists()) {try {logFile.createNewFile();} catch (IOException e) {System.out.println("Create logFile fail !");}}byte[] b = (sb.toString()).getBytes();// 4、 寫數據到文件FileOutputStream fileOutputStream = null;try {fileOutputStream = new FileOutputStream(logFile);fileOutputStream.write(b);System.out.println("Generate data over !");} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {// 5、關閉資源try {fileOutputStream.close();} catch (IOException e) {e.printStackTrace();}}} }

??(2)創建接收數據 UVSpout

package com.atgui.storm.uv;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class UVSpout implements IRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector = null;private BufferedReader reader = null;private String str = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;// 讀取文件try {reader = new BufferedReader(new InputStreamReader(new FileInputStream("d:/temp/storm/website.log"), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}@Overridepublic void close() {try {if (reader != null) {reader.close();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void nextTuple() {// 發射數據try {while ((str = reader.readLine()) != null) {// 發射collector.emit(new Values(str));Thread.sleep(500);}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(3)創建 UVBolt1

package com.atgui.storm.uv;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class UVBolt1 implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數據String line = input.getString(0);// 2、截取出 ipString ip = line.split("\t")[3];// 3、提交collector.emit(new Values(ip, 1));}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("ip", "num"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(4)創建 UVSumBolt

package com.atgui.storm.uv;import java.util.HashMap; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple;public class UVSumBolt implements IRichBolt {private static final long serialVersionUID = 1L;private Map<String, Integer> map = new HashMap<String, Integer>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數據String ip = input.getString(0);Integer num = input.getInteger(1);// 2、累加單詞if (map.containsKey(ip)) {Integer count = map.get(ip);map.put(ip, count + num);} else {map.put(ip, num);}System.err.println(Thread.currentThread().getId() + " ip:" + ip + " num:" + map.get(ip));}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(5)創建驅動 UVMain

package com.atgui.storm.uv;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class UVMain {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("UVSpout", new UVSpout(), 1);builder.setBolt("UVBolt1", new UVBolt1(), 4).shuffleGrouping("UVSpout");builder.setBolt("UVSumBolt", new UVSumBolt(), 1).shuffleGrouping("UVBolt1");Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("UVtopology", conf, builder.createTopology());}} }

??(6)測試

163 ip:192.168.1.104 num:1 163 ip:192.168.1.105 num:1 163 ip:192.168.1.108 num:1 163 ip:192.168.1.104 num:2 163 ip:192.168.1.106 num:1 163 ip:192.168.1.107 num:1 163 ip:192.168.1.103 num:1 163 ip:192.168.1.101 num:1 163 ip:192.168.1.102 num:1 163 ip:192.168.1.105 num:2 163 ip:192.168.1.107 num:2 163 ip:192.168.1.104 num:3 163 ip:192.168.1.103 num:2 163 ip:192.168.1.107 num:3 163 ip:192.168.1.104 num:4 163 ip:192.168.1.105 num:3 163 ip:192.168.1.108 num:2 163 ip:192.168.1.106 num:2 163 ip:192.168.1.106 num:3 163 ip:192.168.1.108 num:3 163 ip:192.168.1.105 num:4 163 ip:192.168.1.104 num:5 163 ip:192.168.1.107 num:4 163 ip:192.168.1.103 num:3 163 ip:192.168.1.103 num:4 163 ip:192.168.1.103 num:5 163 ip:192.168.1.101 num:2 163 ip:192.168.1.102 num:2 163 ip:192.168.1.105 num:5 163 ip:192.168.1.101 num:3測試結果:一共8個用戶, 101:訪問3次; 102:訪問2次; 103:訪問5次; 104:訪問5次; 105:訪問5次; 106:訪問3次; 107:訪問4次; 108:訪問3次;

我的GitHub地址:https://github.com/heizemingjun
我的博客園地址:https://www.cnblogs.com/chenmingjun
我的CSDN地址:https://blog.csdn.net/u012990179
我的螞蟻筆記博客地址:https://blog.leanote.com/chenmingjun
Copyright ?2018~2019 黑澤君
【轉載文章務必保留出處和署名,謝謝!】

總結

以上是生活随笔為你收集整理的大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。