日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度

發(fā)布時(shí)間:2023/12/20 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

大數(shù)據(jù)技術(shù)之_17_Storm學(xué)習(xí)

    • 一 Storm 概述
      • 1.1 離線計(jì)算是什么?
      • 1.2 流式計(jì)算是什么?
      • 1.3 Storm 是什么?
      • 1.4 Storm 與 Hadoop 的區(qū)別
      • 1.5 Storm 應(yīng)用場(chǎng)景及行業(yè)案例
        • 1.5.1 運(yùn)用場(chǎng)景
        • 1.5.2 典型案列
      • 1.6 Storm 特點(diǎn)
    • 二 Storm 基礎(chǔ)知識(shí)
      • 2.1 Storm 編程模型
        • 2.1.1 元組(Tuple)
        • 2.1.2 流(Stream)
        • 2.1.3 水龍頭(Spout)
        • 2.1.4 轉(zhuǎn)接頭(Bolt)
        • 2.1.5 拓?fù)?#xff08;Topology)
      • 2.2 Storm 核心組件
        • 2.2.1 主控節(jié)點(diǎn)與工作節(jié)點(diǎn)
        • 2.2.2 Nimbus 進(jìn)程與 Supervisor 進(jìn)程
        • 2.2.3 流分組(Stream Grouping)
        • 2.2.4 工作進(jìn)程(Worker)
        • 2.2.5 執(zhí)行器(Executor)
        • 2.2.6 任務(wù)(Task)
      • 2.3 實(shí)時(shí)流計(jì)算常見架構(gòu)圖
    • 三 Storm 集群搭建
      • 3.1 環(huán)境準(zhǔn)備
        • 3.1.1 集群規(guī)劃
        • 3.1.2 jar 包下載
        • 3.1.3 虛擬機(jī)準(zhǔn)備
        • 3.1.4 安裝 jdk
        • 3.1.5 安裝 Zookeeper
      • 3.2 Storm 集群部署
        • 3.2.1 配置集群
        • 3.2.2 Storm 日志信息查看
        • 3.2.3 Storm 命令行操作
    • 四 Storm 常用 API
      • 4.1 API 簡(jiǎn)介
        • 4.1.1 Component 組件
        • 4.1.2 Spout 水龍頭
        • 4.1.3 Bolt 轉(zhuǎn)接頭
        • 4.1.4 Spout 的 tail 特性
      • 4.2 網(wǎng)站日志處理案例
        • 4.2.1 實(shí)操環(huán)境準(zhǔn)備
        • 4.2.2 需求1:將接收到日志的會(huì)話 id 打印在控制臺(tái)
        • 4.2.3 需求2:動(dòng)態(tài)增加日志,查看控制臺(tái)打印信息(tail特性)
    • 五 Storm 分組策略和并發(fā)度
      • 5.1 讀取文件案例思考
      • 5.2 分組策略(Stream Grouping)
      • 5.3 并發(fā)度
        • 5.3.1 場(chǎng)景分析
        • 5.3.2 并發(fā)度
      • 5.4 實(shí)操案例
        • 5.4.1 實(shí)時(shí)單詞統(tǒng)計(jì)案例
        • 5.4.2 實(shí)時(shí)計(jì)算網(wǎng)站 PV 案例
        • 5.4.3 實(shí)時(shí)計(jì)算網(wǎng)站 UV 去重案例

一 Storm 概述

1.1 離線計(jì)算是什么?

??離線計(jì)算:批量獲取數(shù)據(jù)、批量傳輸數(shù)據(jù)、周期性批量計(jì)算數(shù)據(jù)、數(shù)據(jù)展示。
??代表技術(shù):Sqoop 批量導(dǎo)入數(shù)據(jù)、HDFS 批量存儲(chǔ)數(shù)據(jù)、MapReduce 批量計(jì)算數(shù)據(jù)、Hive 批量計(jì)算數(shù)據(jù)。

1.2 流式計(jì)算是什么?

??流式計(jì)算:數(shù)據(jù)實(shí)時(shí)產(chǎn)生、數(shù)據(jù)實(shí)時(shí)傳輸、數(shù)據(jù)實(shí)時(shí)計(jì)算、實(shí)時(shí)展示。
??代表技術(shù):Flume 實(shí)時(shí)獲取數(shù)據(jù)、Kafka 實(shí)時(shí)數(shù)據(jù)存儲(chǔ)、Storm(阿帕奇)/JStorm(淘寶) 實(shí)時(shí)數(shù)據(jù)計(jì)算、Redis 實(shí)時(shí)結(jié)果緩存、Mysql 持久化存儲(chǔ)。

??離線計(jì)算與實(shí)時(shí)計(jì)算最大的區(qū)別:實(shí)時(shí)收集、實(shí)時(shí)計(jì)算、實(shí)時(shí)展示。

公司整個(gè)后臺(tái)系統(tǒng)架構(gòu)圖解

1.3 Storm 是什么?

??Storm 是一個(gè)分布式計(jì)算框架,主要使用 Clojure 與 Java 語(yǔ)言編寫,最初是由Nathan Marz 帶領(lǐng) Backtype 公司團(tuán)隊(duì)創(chuàng)建,在 Backtype 公司被 Twitter 公司收購(gòu)后進(jìn)行開源。最初的版本是在 2011 年 9 月 17 日發(fā)行,版本號(hào) 0.5.0。

??2013 年9 月,Apache 基金會(huì)開始接管并孵化 Storm 項(xiàng)目。Apache Storm 是在Eclipse Public License下進(jìn)行開發(fā)的,它提供給大多數(shù)企業(yè)使用。經(jīng)過 1 年多時(shí)間,2014 年 9 月,Storm 項(xiàng)目成為 Apache 的頂級(jí)項(xiàng)目。目前,Storm 的最新版本:Storm 1.2.2 Released (04 Jun 2018)。

??Storm 是一個(gè)免費(fèi)開源的分布式實(shí)時(shí)計(jì)算系統(tǒng)。Storm 能輕松可靠地處理無界的數(shù)據(jù)流,就像 Hadoop 對(duì)數(shù)據(jù)進(jìn)行批處理。

1.4 Storm 與 Hadoop 的區(qū)別

??1)Storm 用于實(shí)時(shí)計(jì)算;Hadoop 用于離線計(jì)算。
??2)Storm 處理的數(shù)據(jù)保存在內(nèi)存中,源源不斷;Hadoop 處理的數(shù)據(jù)保存在文件系統(tǒng)中,一批一批處理。
??3)Storm 的數(shù)據(jù)通過網(wǎng)絡(luò)傳輸進(jìn)來;Hadoop 的數(shù)據(jù)保存在磁盤中。
??4)Storm 與 Hadoop 的編程模型相似。


(1)Hadoop 相關(guān)名稱
??Job:任務(wù)名稱
??JobTracker:項(xiàng)目經(jīng)理(JobTracker 對(duì)應(yīng)于 NameNode;JobTracker 是一個(gè) master 服務(wù),軟件啟動(dòng)之后 JobTracker 接收 Job,負(fù)責(zé)調(diào)度 Job 的每一個(gè)子任務(wù) task 運(yùn)行于 TaskTracker 上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的 task 就重新運(yùn)行它)
??TaskTracker:開發(fā)組長(zhǎng)(TaskTracker 對(duì)應(yīng)于 DataNode;TaskTracker 是運(yùn)行在多個(gè)節(jié)點(diǎn)上的 slaver 服務(wù)。TaskTracker 主動(dòng)與 JobTracker 通信,接收作業(yè),并負(fù)責(zé)直接執(zhí)行每一個(gè)任務(wù))
??Child:負(fù)責(zé)開發(fā)的人員
??Mapper/Reduce:開發(fā)人員中的兩種角色,一種是服務(wù)器開發(fā)、一種是客戶端開發(fā)
(2)Storm 相關(guān)名稱
??Topology(拓?fù)?:任務(wù)名稱
??Nimbus:項(xiàng)目經(jīng)理
??Supervisor:開發(fā)組長(zhǎng)
??Worker:開發(fā)人員
??Spout(水龍頭)/Bolt(轉(zhuǎn)接頭):開發(fā)人員中的兩種角色,一種是服務(wù)器開發(fā)、一種是客戶端開發(fā)

1.5 Storm 應(yīng)用場(chǎng)景及行業(yè)案例

??Storm 用來實(shí)時(shí)計(jì)算源源不斷產(chǎn)生的數(shù)據(jù),如同流水線生產(chǎn)。

1.5.1 運(yùn)用場(chǎng)景

??Storm 能用到很多場(chǎng)景中,包括:實(shí)時(shí)分析、在線機(jī)器學(xué)習(xí)、連續(xù)計(jì)算等。
??1)推薦系統(tǒng):實(shí)時(shí)推薦,根據(jù)下單或加入購(gòu)物車推薦相關(guān)商品。
??2)金融系統(tǒng):實(shí)時(shí)分析股票信息數(shù)據(jù)。
??3)預(yù)警系統(tǒng):根據(jù)實(shí)時(shí)采集數(shù)據(jù),判斷是否到了預(yù)警閾值。
??4)網(wǎng)站統(tǒng)計(jì):實(shí)時(shí)銷量、流量統(tǒng)計(jì),如淘寶雙11效果圖。

1.5.2 典型案列

1)京東-實(shí)時(shí)分析系統(tǒng):實(shí)時(shí)分析用戶的屬性,并反饋給搜索引擎
??最初,用戶屬性分析是通過每天在云上定時(shí)運(yùn)行的 MR job 來完成的。為了滿足實(shí)時(shí)性的要求,希望能夠?qū)崟r(shí)分析用戶的行為日志,將最新的用戶屬性反饋給搜索引擎,能夠?yàn)橛脩粽宫F(xiàn)最貼近其當(dāng)前需求的結(jié)果。

2)攜程-網(wǎng)站性能監(jiān)控:實(shí)時(shí)分析系統(tǒng)監(jiān)控?cái)y程網(wǎng)的網(wǎng)站性能
??利用 HTML5 提供的 performance 標(biāo)準(zhǔn)獲得可用的指標(biāo),并記錄日志。Storm 集群實(shí)時(shí)分析日志和入庫(kù)。使用 DRPC 聚合成報(bào)表,通過歷史數(shù)據(jù)對(duì)比等判斷規(guī)則,觸發(fā)預(yù)警事件。

3)淘寶雙十一:實(shí)時(shí)統(tǒng)計(jì)銷售總額

1.6 Storm 特點(diǎn)

??1)適用場(chǎng)景廣泛:Storm 可以適用實(shí)時(shí)處理消息、更新數(shù)據(jù)庫(kù)、持續(xù)計(jì)算等場(chǎng)景。
??2)可伸縮性高:Storm 的可伸縮性可以讓 Storm 每秒處理的消息量達(dá)到很高。擴(kuò)展一個(gè)實(shí)時(shí)計(jì)算任務(wù),你所需要做的就是加機(jī)器并且提高這個(gè)計(jì)算任務(wù)的并行度。Storm 使用 Zookeeper 來協(xié)調(diào)機(jī)器內(nèi)的各種配置使得 Storm 的集群可以很容易的擴(kuò)展。
??3)保證無數(shù)據(jù)丟失:Storm 保證所有的數(shù)據(jù)都被處理。
??4)異常健壯:Storm 集群非常容易管理,輪流重啟節(jié)點(diǎn)不影響應(yīng)用。
??5)容錯(cuò)性好:在消息處理過程中出現(xiàn)異常,Storm 會(huì)進(jìn)行重試。

二 Storm 基礎(chǔ)知識(shí)

2.1 Storm 編程模型

2.1.1 元組(Tuple)

??元組(Tuple),是消息傳遞的基本單元,是一個(gè)命名的值列表,元組中的字段可以是任何類型的對(duì)象。Storm 使用元組作為其數(shù)據(jù)模型,元組支持所有的基本類型、字符串和字節(jié)數(shù)組作為字段值,只要實(shí)現(xiàn)類型的序列化接口就可以使用該類型的對(duì)象。元組本來應(yīng)該是一個(gè) key-value 的 Map,但是由于各個(gè)組件間傳遞的元組的字段名稱已經(jīng)事先定義好,所以只要按序把元組填入各個(gè) value 即可,所以元組是一個(gè) value 的 List。

2.1.2 流(Stream)

??流是 Storm 的核心抽象,是一個(gè)無界的元組系列。源源不斷傳遞的元組就組成了流,在分布式環(huán)境中并行地進(jìn)行創(chuàng)建和處理。

2.1.3 水龍頭(Spout)

??Spout 是拓?fù)涞牧鞯膩碓?#xff0c;是一個(gè)拓?fù)渲挟a(chǎn)生源數(shù)據(jù)流的組件。通常情況下,Spout 會(huì)從外部數(shù)據(jù)源中讀取數(shù)據(jù),然后轉(zhuǎn)換為拓?fù)鋬?nèi)部的源數(shù)據(jù)。
??Spout 可以是可靠的,也可以是不可靠的。如果 Storm 處理元組失敗,可靠的 Spout 能夠重新發(fā)射,而不可靠的 Spout 就盡快忘記發(fā)出的元組。
??Spout 可以發(fā)出超過一個(gè)流。
??Spout 的主要方法是 nextTuple()。NextTuple() 會(huì)發(fā)出一個(gè)新的 Tuple 到拓?fù)?#xff0c;如果沒有新的元組發(fā)出,則簡(jiǎn)單返回。
??Spout 的其他方法是 ack() 和 fail()。當(dāng) Storm 檢測(cè)到一個(gè)元組從 Spout 發(fā)出時(shí),ack() 和 fail() 會(huì)被調(diào)用,要么成功完成通過拓?fù)?#xff0c;要么未能完成。ack() 和 fail() 僅被可靠的 Spout 調(diào)用。
??IRichSpout 是 Spout 必須實(shí)現(xiàn)的接口。

2.1.4 轉(zhuǎn)接頭(Bolt)

??在拓?fù)渲兴刑幚矶荚?Bolt 中完成,Bolt 是流的處理節(jié)點(diǎn),從一個(gè)拓?fù)浣邮諗?shù)據(jù),然后執(zhí)行進(jìn)行處理的組件。Bolt 可以完成過濾、業(yè)務(wù)處理、連接運(yùn)算、連接與訪問數(shù)據(jù)庫(kù)等任何操作。
??Bolt 是一個(gè)被動(dòng)的角色,其接口中有一個(gè) execute() 方法,在接收到消息后會(huì)調(diào)用此方法,用戶可以在其中執(zhí)行自己希望的操作。
??Bolt 可以完成簡(jiǎn)單的流的轉(zhuǎn)換,而完成復(fù)雜的流的轉(zhuǎn)換通常需要多個(gè)步驟,因此需要多個(gè) Bolt。

2.1.5 拓?fù)?#xff08;Topology)

??拓?fù)?#xff08;Topology)是 Storm 中運(yùn)行的一個(gè)實(shí)時(shí)應(yīng)用程序,因?yàn)楦鱾€(gè)組件間的消息流動(dòng)而形成邏輯上的拓?fù)浣Y(jié)構(gòu)。
??把實(shí)時(shí)應(yīng)用程序的運(yùn)行邏輯打成 jar 包后提交到 Storm 的拓?fù)?#xff08;Topology)。Storm 的拓?fù)漕愃朴?MapReduce 的作業(yè)(Job)。其主要的區(qū)別是,MapReduce 的作業(yè)最終會(huì)完成,而一個(gè)拓?fù)溆肋h(yuǎn)都在運(yùn)行直到它被殺死。一個(gè)拓?fù)涫且粋€(gè)圖的 Spout 和 Bolt 的連接流分組。

2.2 Storm 核心組件

??Nimbus 是整個(gè)集群的控管核心,負(fù)責(zé) Topology 的提交、運(yùn)行狀態(tài)監(jiān)控、任務(wù)重新分配等工作。
??Zookeeper 就是一個(gè)管理者,監(jiān)控者。
??總體描述:Nimbus下命令(分配任務(wù)),Zookeeper 監(jiān)督執(zhí)行(心跳監(jiān)控,Worker、Supurvisor的心跳都?xì)w它管),Supervisor領(lǐng)旨(下載代碼),招募人馬(創(chuàng)建Worker和線程等),Worker、Executor就給我干活!Task 就是具體要干的活。

2.2.1 主控節(jié)點(diǎn)與工作節(jié)點(diǎn)

??Storm 集群中有兩類節(jié)點(diǎn):主控節(jié)點(diǎn)(Master Node)和工作節(jié)點(diǎn)(Worker Node)。其中,主控節(jié)點(diǎn)只有一個(gè),而工作節(jié)點(diǎn)可以有多個(gè)。

2.2.2 Nimbus 進(jìn)程與 Supervisor 進(jìn)程

??主控節(jié)點(diǎn)運(yùn)行一個(gè)稱為 Nimbus 的守護(hù)進(jìn)程類似于 Hadoop 的 JobTracker。Nimbus 負(fù)責(zé)在集群中分發(fā)代碼、對(duì)節(jié)點(diǎn)分配任務(wù)、并監(jiān)視主機(jī)故障。
??每個(gè)工作節(jié)點(diǎn)運(yùn)行一個(gè)稱為 Supervisor 的守護(hù)進(jìn)程。Supervisor 監(jiān)聽其主機(jī)上已經(jīng)分配的主機(jī)的作業(yè)、啟動(dòng)和停止 Nimbus 已經(jīng)分配的工作進(jìn)程。

2.2.3 流分組(Stream Grouping)

??流分組,是拓?fù)涠x中的一部分,為每個(gè) Bolt 指定應(yīng)該接收哪個(gè)流作為輸入。流分組定義了流/元組如何在 Bolt 的任務(wù)之間進(jìn)行分發(fā)。
??Storm 內(nèi)置了 8 種流分組方式。

2.2.4 工作進(jìn)程(Worker)

??Worker 是 Spout/Bolt 中運(yùn)行具體處理邏輯的進(jìn)程。一個(gè) Worker 就是一個(gè)進(jìn)程,進(jìn)程里面包含一個(gè)或多個(gè)線程。

2.2.5 執(zhí)行器(Executor)

??一個(gè)線程就是一個(gè) Executor,一個(gè)線程會(huì)處理一個(gè)或多個(gè)任務(wù)。

2.2.6 任務(wù)(Task)

??一個(gè)任務(wù)就是一個(gè) Task。

2.3 實(shí)時(shí)流計(jì)算常見架構(gòu)圖


??1)Flume 獲取數(shù)據(jù)。
??2)Kafka 臨時(shí)保存數(shù)據(jù)。
??3)Strom 計(jì)算數(shù)據(jù)。
??4)Redis 是個(gè)內(nèi)存數(shù)據(jù)庫(kù),用來保存數(shù)據(jù)。

三 Storm 集群搭建

3.1 環(huán)境準(zhǔn)備

3.1.1 集群規(guī)劃

hadoop102 hadoop103 hadoop104 zk zk zk storm storm storm

3.1.2 jar 包下載

(1)官方網(wǎng)址:http://storm.apache.org/
注意:本次學(xué)習(xí)演示,本博主使用版本 Storm 1.1.1 Released (1 Aug 2018)

(2)安裝集群步驟:
??官方文檔地址:http://storm.apache.org/releases/1.1.1/Setting-up-a-Storm-cluster.html

3.1.3 虛擬機(jī)準(zhǔn)備

1)準(zhǔn)備3臺(tái)虛擬機(jī)
2)配置ip地址、配置主機(jī)名稱、3臺(tái)主機(jī)分別關(guān)閉防火墻
參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/10335265.html
參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/10349717.html

3.1.4 安裝 jdk

參考鏈接地址:https://www.cnblogs.com/chenmingjun/p/9931593.html

3.1.5 安裝 Zookeeper

0)集群規(guī)劃
在 hadoop102、hadoop103 和 hadoop104 三個(gè)節(jié)點(diǎn)上部署 Zookeeper。
1)解壓安裝
(1)解壓 zookeeper 安裝包到 /opt/module/ 目錄下

[atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

(2)在 /opt/module/zookeeper-3.4.10/ 這個(gè)目錄下創(chuàng)建目錄 zkData

mkdir -p zkData

(3)重命名 /opt/module/zookeeper-3.4.10/conf 這個(gè)目錄下的 zoo_sample.cfg 為 zoo.cfg

mv zoo_sample.cfg zoo.cfg

2)配置 zoo.cfg 文件
(1)具體配置

dataDir=/opt/module/zookeeper-3.4.10/zkData增加如下配置 #######################cluster########################## server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888

(2)配置參數(shù)解讀

server.A=B:C:D。 A 是一個(gè)數(shù)字,表示這個(gè)是第幾號(hào)服務(wù)器; B 是這個(gè)服務(wù)器的ip地址; C 是這個(gè)服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口; D 是萬(wàn)一集群中的 Leader 服務(wù)器掛了,需要一個(gè)端口來重新進(jìn)行選舉,選出一個(gè)新的 Leader,而這個(gè)端口就是用來執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。集群模式下配置一個(gè)文件 myid,這個(gè)文件在 zkData 目錄下,這個(gè)文件里面有一個(gè)數(shù)據(jù)就是 A 的值,Zookeeper 啟動(dòng)時(shí)讀取此文件,拿到里面的數(shù)據(jù)與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個(gè) server。

3)集群操作
(1)在 /opt/module/zookeeper-3.4.10/zkData 目錄下創(chuàng)建一個(gè) myid 的文件

touch myid

添加 myid 文件,注意一定要在 linux 里面創(chuàng)建,在 notepad++ 里面很可能亂碼。
(2)編輯 myid 文件

vim myid

在文件中添加與 server 對(duì)應(yīng)的編號(hào):如 2
(3)拷貝配置好的 zookeeper 到其他機(jī)器上 或者執(zhí)行配置分發(fā)的腳本

scp -r /opt/module/zookeeper-3.4.10/ root@hadoop103:/opt/module/ scp -r /opt/module/zookeeper-3.4.10/ root@hadoop104:/opt/module/ 并分別修改 myid 文件中內(nèi)容為 3、4

(4)分別啟動(dòng) zookeeper 或者 使用群起腳本啟動(dòng)

[root@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop104 zookeeper-3.4.10]# bin/zkServer.sh start

(5)查看狀態(tài) 或者 使用腳本查看狀態(tài)

[root@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower [root@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: leader [root@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower

3.2 Storm 集群部署

3.2.1 配置集群

1)拷貝 jar 包到 hadoop102 的 /opt/software/ 目錄下
2)解壓 jar 包到 /opt/module 目錄下

[atguigu@hadoop102 software]$ tar -zxvf apache-storm-1.1.1.tar.gz -C /opt/module/

3)修改解壓后的 apache-storm-1.1.1.tar.gz 文件名稱為 storm,為了方便

[atguigu@hadoop102 module]$ mv apache-storm-1.1.1/ storm

4)在 /opt/module/storm/ 目錄下創(chuàng)建 data 文件夾

[atguigu@hadoop102 storm]$ mkdir data

5)修改配置文件

[atguigu@hadoop102 conf]$ pwd /opt/module/storm/conf [atguigu@hadoop102 conf]$ vim storm.yaml

修改內(nèi)容如下:

# 設(shè)置 Zookeeper 的主機(jī)名稱 storm.zookeeper.servers:- "hadoop102"- "hadoop103"- "hadoop104"# 設(shè)置主節(jié)點(diǎn)的主機(jī)名稱 nimbus.seeds: ["hadoop102"]# 設(shè)置 Storm 的數(shù)據(jù)存儲(chǔ)路徑 storm.local.dir: "/opt/module/storm/data"# 設(shè)置 Worker 的端口號(hào) supervisor.slots.ports:- 6700- 6701- 6702- 6703

6)以 root 用戶,配置環(huán)境變量

[root@hadoop102 storm]# vim /etc/profile#STORM_HOME export STORM_HOME=/opt/module/storm export PATH=$PATH:$STORM_HOME/bin

使配置文件生效

[root@hadoop102 storm]# source /etc/profile

7)分發(fā)配置好的 storm 安裝包

[atguigu@hadoop102 storm]$ xsync storm/

8)啟動(dòng) Storm 集群
(1)后臺(tái)啟動(dòng) Nimbus

[atguigu@hadoop102 storm]$ bin/storm nimbus & [atguigu@hadoop103 storm]$ bin/storm nimbus & [atguigu@hadoop104 storm]$ bin/storm nimbus &

(2)后臺(tái)啟動(dòng) Supervisor

[atguigu@hadoop102 storm]$ bin/storm supervisor & [atguigu@hadoop102 storm]$ bin/storm supervisor & [atguigu@hadoop102 storm]$ bin/storm supervisor &

拓展:fg 命令 表示將放在后臺(tái)的進(jìn)程放到前臺(tái)。
(3)啟動(dòng) Storm UI

[atguigu@hadoop102 storm]$ bin/storm ui

9)通過瀏覽器查看集群狀態(tài)
地址:http://hadoop102:8080/index.html

3.2.2 Storm 日志信息查看

1)查看 Nimbus 的日志信息
在 Nimbus 的服務(wù)器上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/nimbus.log

2)查看 ui 運(yùn)行日志信息
在 ui 的服務(wù)器上,一般和 Nimbus 在一個(gè)服務(wù)器上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/ui.log

3)查看 Supervisor 運(yùn)行日志信息
在 Supervisor 服務(wù) 上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/supervisor.log

4)查看 Supervisor 上 Worker 運(yùn)行日志信息
在 supervisor 服務(wù)上

cd /opt/module/storm/logs tail -100f /opt/module/storm/logs/worker-6702.log

5)logviewer,可以在 web 頁(yè)面點(diǎn)擊相應(yīng)的端口號(hào)即可查看日志
分別在 Supervisor 節(jié)點(diǎn)上執(zhí)行:

[atguigu@hadoop102 storm]$ bin/storm logviewer & [atguigu@hadoop103 storm]$ bin/storm logviewer & [atguigu@hadoop104 storm]$ bin/storm logviewer &

瀏覽器截圖如下

3.2.3 Storm 命令行操作

1)Nimbus:啟動(dòng) Nimbus 守護(hù)進(jìn)程。

storm nimbus

2)Supervisor:啟動(dòng) Supervisor 守護(hù)進(jìn)程。

storm supervisor

3)ui:啟動(dòng)UI守護(hù)進(jìn)程。

storm ui

4)list:列出正在運(yùn)行的拓?fù)浼捌錉顟B(tài)。

storm list

5)logviewer:Logviewer 提供一個(gè) web 接口查看 Storm 日志文件。

storm logviewer

6)jar:

storm jar [jar路徑] [拓?fù)浒?拓?fù)漕惷鸧 [拓?fù)涿Q]

7)kill:殺死名為 topology-name 的拓?fù)洹?/p> storm kill topology-name [-w wait-time-secs] -w:等待多久后殺死拓?fù)?

8)active:激活指定的拓?fù)?Spout。

storm activate topology-name

9)deactivate:禁用指定的拓?fù)?Spout。

storm deactivate topology-name

10)help:打印一條幫助消息或者可用命令的列表。

storm help storm help <command>

四 Storm 常用 API

4.1 API 簡(jiǎn)介

4.1.1 Component 組件

1)基本接口
??(1)IComponent 接口
??(2)ISpout 接口
??(3)IRichSpout 接口
??(4)IStateSpout 接口
??(5)IRichStateSpout 接口
??(6)IBolt 接口
??(7)IRichBolt 接口
??(8)IBasicBolt 接口
2)基本抽象類
??(1)BaseComponent 抽象類
??(2)BaseRichSpout 抽象類
??(3)BaseRichBolt 抽象類
??(4)BaseTransactionalBolt 抽象類
??(5)BaseBasicBolt 抽象類

4.1.2 Spout 水龍頭

Spout 的最頂層抽象是 ISpout 接口。

(1)open()
??是初始化方法。
(2)close()
??在該 Spout 關(guān)閉前執(zhí)行,但是并不能得到保證其一定被執(zhí)行,kill -9 時(shí)不執(zhí)行,Storm kill {topoName} 時(shí)執(zhí)行。
(3)activate()
??當(dāng) Spout 已經(jīng)從失效模式中激活時(shí)被調(diào)用。該 Spout 的 nextTuple() 方法很快就會(huì)被調(diào)用。
(4)deactivate ()
??當(dāng) Spout 已經(jīng)失效時(shí)被調(diào)用。在 Spout 失效期間,nextTuple 不會(huì)被調(diào)用。Spout 將來可能會(huì)也可能不會(huì)被重新激活。
(5)nextTuple()
??當(dāng)調(diào)用 nextTuple() 方法時(shí),Storm 要求 Spout 發(fā)射元組到輸出收集器(OutputCollecctor)。
??nextTuple() 方法應(yīng)該是非阻塞的,所以,如果 Spout 沒有元組可以發(fā)射,該方法應(yīng)該返回。
??nextTuple()、ack() 和 fail() 方法都在 Spout 任務(wù)的單一線程內(nèi)緊密循環(huán)被調(diào)用。
??當(dāng)沒有元組可以發(fā)射時(shí),可以讓 nextTuple 去 sleep 很短的時(shí)間,例如1毫秒,這樣就不會(huì)浪費(fèi)太多的 CPU 資源。
(6)ack()
??成功處理 Tuple 回調(diào)方法。
(7)fail()
??處理失敗 Tuple 回調(diào)方法。
??原則:通常情況下(Shell 和事務(wù)型的除外),實(shí)現(xiàn)一個(gè) Spout,可以直接實(shí)現(xiàn)接口 IRichSpout,如果不想寫多余的代碼,可以直接繼承 BaseRichSpout。

4.1.3 Bolt 轉(zhuǎn)接頭

Bolt 的最頂層抽象是 IBolt 接口。

(1)prepare()
??prepare() 方法在集群的工作進(jìn)程內(nèi)被初始化時(shí)被調(diào)用,提供了 Bolt 執(zhí)行所需要的環(huán)境。
(2)execute()
??接受一個(gè) Tuple 進(jìn)行處理,也可 emit 數(shù)據(jù)到下一級(jí)組件。
(3)cleanup()
??cleanup方法當(dāng)一個(gè) IBolt 即將關(guān)閉時(shí)被調(diào)用。不能保證 cleanup() 方法一定會(huì)被調(diào)用,因?yàn)?Supervisor 可以對(duì)集群的工作進(jìn)程使用 kill -9 命令強(qiáng)制殺死進(jìn)程命令。
??如果在本地模式下運(yùn)行 Storm,當(dāng)拓?fù)浔粴⑺赖臅r(shí)候,可以保證 cleanup() 方法一定會(huì)被調(diào)用。
??實(shí)現(xiàn)一個(gè) Bolt,可以實(shí)現(xiàn) IRichBolt 接口或繼承 BaseRichBolt,如果不想自己處理結(jié)果反饋,可以實(shí)現(xiàn) IBasicBolt 接口或繼承 BaseBasicBolt,它實(shí)際上相當(dāng)于自動(dòng)做了 prepare 方法和 collector.emit.ack(inputTuple)。

4.1.4 Spout 的 tail 特性

Storm 可以實(shí)時(shí)監(jiān)測(cè)文件數(shù)據(jù),當(dāng)文件數(shù)據(jù)變化時(shí),Storm 自動(dòng)讀取。

4.2 網(wǎng)站日志處理案例

4.2.1 實(shí)操環(huán)境準(zhǔn)備

??1)打開 eclipse,創(chuàng)建一個(gè) java 工程
??2)在工程目錄中創(chuàng)建 lib 文件夾
??3)解壓 apache-storm-1.1.1,并把解壓后 lib 包下的文件復(fù)制到 java 工程的 lib 文件夾中,然后執(zhí)行 build path。

4.2.2 需求1:將接收到日志的會(huì)話 id 打印在控制臺(tái)

1)需求:
??(1)模擬訪問網(wǎng)站的日志信息,包括:網(wǎng)站名稱、會(huì)話 id、訪問網(wǎng)站時(shí)間等。
??(2)將接收到日志的會(huì)話 id 打印到控制臺(tái)。
2)分析:
??(1)創(chuàng)建網(wǎng)站訪問日志工具類。
??(2)在 spout 中讀取日志文件,并一行一行發(fā)射出去。
??(3)在 bolt 中將獲取到的一行一行數(shù)據(jù)的會(huì)話 id 獲取到,并打印到控制臺(tái)。
??(4)main 方法負(fù)責(zé)拼接 spout 和 bolt 的拓?fù)洹?br />
3)案例實(shí)操:
(1)創(chuàng)建網(wǎng)站訪問日志
示例代碼如下:

package com.atgui.storm.weblog;import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.util.Random;public class GenerateData {public static void main(String[] args) {// 1、創(chuàng)建文件路徑File logFile = new File("d:/temp/storm/website.log");// 2、準(zhǔn)備數(shù)據(jù)// 2.1 網(wǎng)站名稱String[] hosts = { "www.atguigu.com" };// 2.2 會(huì)話idString[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34","BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };// 2.3 訪問網(wǎng)站時(shí)間String[] time = { "2017-08-07 08:40:50", "2017-08-07 08:40:51", "2017-08-07 08:40:52", "2017-08-07 08:40:53","2017-08-07 09:40:49", "2017-08-07 10:40:49", "2017-08-07 11:40:49", "2017-08-07 12:40:49" };// 3、拼接數(shù)據(jù)StringBuffer sb = new StringBuffer();Random random = new Random();for (int i = 0; i < 30; i++) {sb.append(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)] + "\n");}// 判斷l(xiāng)og日志是否存在,不存在要?jiǎng)?chuàng)建if (!logFile.exists()) {try {logFile.createNewFile();} catch (IOException e) {System.out.println("Create logFile fail !");}}byte[] b = (sb.toString()).getBytes();// 4、 寫數(shù)據(jù)到文件FileOutputStream fileOutputStream = null;try {fileOutputStream = new FileOutputStream(logFile);fileOutputStream.write(b);System.out.println("Generate data over !");} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {// 5、關(guān)閉資源try {fileOutputStream.close();} catch (IOException e) {e.printStackTrace();}}} }

(2)創(chuàng)建 spout
示例代碼如下:

package com.atgui.storm.weblog;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class WebLogSpout implements IRichSpout {private static final long serialVersionUID = 1L;private BufferedReader bufferedReader = null;private SpoutOutputCollector collector = null;private String str = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;// 打開輸入的文件try {this.bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("d:/temp/storm/website.log"),"UTF-8"));} catch (FileNotFoundException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}@Overridepublic void nextTuple() {// 循環(huán)調(diào)用的方法try {while ((str = bufferedReader.readLine()) != null) {// 發(fā)射數(shù)據(jù)collector.emit(new Values(str));Thread.sleep(500);}} catch (IOException | InterruptedException e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("log"));}@Overridepublic void close() {}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

(3)創(chuàng)建 bolt
示例代碼如下:

package com.atgui.storm.weblog;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple;public class WebLogBolt implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector = null;private int num = 0;private String valueString = null;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {try {// 1、獲取傳遞過來的數(shù)據(jù)valueString = input.getStringByField("log");// 2、如果輸入的數(shù)據(jù)不為空,行數(shù)++if (valueString != null) {num++;System.err.println(Thread.currentThread().getName() + " lines:" + num + " session_id:" + valueString.split("\t")[1]);}// 3、應(yīng)答Spout接收成功collector.ack(input);Thread.sleep(1000);} catch (Exception e) {// 4、應(yīng)答Spout接收失敗collector.fail(input);e.printStackTrace();}}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段類型declarer.declare(new Fields(""));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

(4)創(chuàng)建main
示例代碼如下:

package com.atgui.storm.weblog;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class WebLogMain {public static void main(String[] args) {// 1、創(chuàng)建拓?fù)鋵?duì)象TopologyBuilder builder = new TopologyBuilder();// 2、設(shè)置 Spout 和 Boltbuilder.setSpout("weblogspout", new WebLogSpout(), 1);builder.setBolt("weblogbolt", new WebLogBolt(), 1).shuffleGrouping("weblogspout");// 3、配置 Worker 開啟個(gè)數(shù)Config conf = new Config();conf.setNumWorkers(4);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("weblogtopology", conf, builder.createTopology());}} }

輸出結(jié)果如下:

Thread-46-weblogbolt-executor[5 5] lines:1 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:2 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:3 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:4 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:5 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:6 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:7 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:8 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:9 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:10 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:11 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:12 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:13 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:14 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:15 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:16 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:17 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:18 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:19 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:20 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:21 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:22 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:23 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-46-weblogbolt-executor[5 5] lines:24 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:25 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-46-weblogbolt-executor[5 5] lines:26 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123 Thread-46-weblogbolt-executor[5 5] lines:27 session_id:CYYH6Y2345GHI899OFG4V9U567 Thread-46-weblogbolt-executor[5 5] lines:28 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:29 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-46-weblogbolt-executor[5 5] lines:30 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123

4.2.3 需求2:動(dòng)態(tài)增加日志,查看控制臺(tái)打印信息(tail特性)

1)在需求1基礎(chǔ)上,運(yùn)行程序。
2)打開 website.log 日志文件,增加日志調(diào)試并保存。
3)觀察控制臺(tái)打印的信息。
結(jié)論:Storm 可以動(dòng)態(tài)實(shí)時(shí)監(jiān)測(cè)文件的增加信息,并把信息讀取到再處理。

五 Storm 分組策略和并發(fā)度

5.1 讀取文件案例思考

1)spout 數(shù)據(jù)源:數(shù)據(jù)庫(kù)、文件、MQ(比如:Kafka)
2)數(shù)據(jù)源是數(shù)據(jù)庫(kù):只適合讀取數(shù)據(jù)庫(kù)的配置文件
3)數(shù)據(jù)源是文件:只適合測(cè)試、講課用(因?yàn)榧菏欠植际郊?#xff09;
4)企業(yè)產(chǎn)生的 log 文件處理步驟:
??(1)讀出內(nèi)容寫 入MQ
??(2)Storm 再處理

5.2 分組策略(Stream Grouping)

stream grouping 用來定義一個(gè) stream 應(yīng)該如何分配給 Bolts 上面的多個(gè) Executors(多線程、多并發(fā))。

Storm 里面有 7 種類型的 stream grouping,詳情如下:
1)Shuffle Grouping: 隨機(jī)分組,輪詢,平均分配。隨機(jī)派發(fā) stream 里面的 tuple,保證每個(gè) bolt 接收到的 tuple 數(shù)目大致相同。

2)Fields Grouping:按字段分組,比如按 userid 來分組,具有同樣 userid 的 tuple 會(huì)被分到相同的 bolts 里的一個(gè) task,而不同的 userid 則會(huì)被分配到不同的 bolts 里的 task。

3)All Grouping:廣播發(fā)送,對(duì)于每一個(gè) tuple,所有的 bolts 都會(huì)收到。

4)Global Grouping:全局分組,這個(gè) tuple 被分配到 storm 中的一個(gè) bolt 的其中一個(gè) task。再具體一點(diǎn)就是分配給 id 值最低的那個(gè) task。

5)None Grouping:不分組,這個(gè)分組的意思是說 stream 不關(guān)心到底誰(shuí)會(huì)收到它的 tuple。目前這種分組和 Shuffle Grouping 是一樣的效果。在多線程情況下不平均分配。

6)Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個(gè) task 處理這個(gè)消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息 tuple 必須使用 emitDirect 方法來發(fā)射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的 task 的 id (OutputCollector.emit 方法也會(huì)返回 task 的 id)。

7)Local or Shuffle Grouping:如果目標(biāo) bolt 有一個(gè)或者多個(gè) task 在同一個(gè)工作進(jìn)程中,tuple 將會(huì)被隨機(jī)發(fā)送給這些 tasks。否則,和普通的 Shuffle Grouping 行為一致。

8)測(cè)試
??(1)spout 并發(fā)度修改為 2,bolt 并發(fā)度修改為 1,Shuffle Grouping 模式

// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 2); builder.setBolt("weblogbolt", new WebLogBolt(), 1).shuffleGrouping("weblogspout");spout 開兩個(gè)線程會(huì)對(duì)數(shù)據(jù)讀取兩份,打印出來就是 2 份。如果數(shù)據(jù)源是消息隊(duì)列,就不會(huì)出來讀取兩份的數(shù)據(jù)(統(tǒng)一消費(fèi)者組,只能有一個(gè)消費(fèi)者)。Thread-53-weblogbolt-executor[5 5] lines:1 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-53-weblogbolt-executor[5 5] lines:2 session_id:XXYH6YCGFJYERTT834R52FDXV9U34 Thread-53-weblogbolt-executor[5 5] lines:3 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7 Thread-53-weblogbolt-executor[5 5] lines:4 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7

??(2)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,None Grouping 模式

// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).noneGrouping("weblogspout");每個(gè) bolt 接收到的數(shù)據(jù)不同。

??(3)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,Fields Grouping 模式

// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).fieldsGrouping("weblogspout", new Fields("log"));基于 web 案例效果不明顯,后續(xù)案例效果比較明顯。

??(4)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,All Grouping 模式

// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).allGrouping("weblogspout");每一個(gè) bolt 獲取到的數(shù)據(jù)都是一樣的。

??(5)spout 并發(fā)度修改為 1,bolt 并發(fā)度修改為 2,Global Grouping 模式

// 2、設(shè)置 Spout 和 Bolt builder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt", new WebLogBolt(), 2).globalGrouping("weblogspout");task 的 id 最低的 bolt 獲取到了所有數(shù)據(jù)。

5.3 并發(fā)度

5.3.1 場(chǎng)景分析

1)單線程下:加減乘除、全局匯總
2)多線程下:局部加減乘除、持久化DB等
??(1)思考:如何計(jì)算:word 總數(shù)和 word 個(gè)數(shù)?并且在高并發(fā)下完成
??前者是統(tǒng)計(jì)總行數(shù),后者是去重 word 個(gè)數(shù)。
??類似企業(yè)場(chǎng)景:計(jì)算網(wǎng)站 PV 和 UV
??(2)網(wǎng)站最常用的兩個(gè)指標(biāo):
??PV(page views):count(session_id) 即頁(yè)面瀏覽量。
??UV(user views):count(distinct session_id) 即獨(dú)立訪客數(shù)。
??a)用 ip 地址分析
??指訪問某個(gè)站點(diǎn)或點(diǎn)擊某個(gè)網(wǎng)頁(yè)的不同 ip 地址的人數(shù)。在同一天內(nèi),UV 只記錄第一次進(jìn)入網(wǎng)站的具有獨(dú)立 IP 的訪問者,在同一天內(nèi)再次訪問該網(wǎng)站則不計(jì)數(shù)。
??b)用 Cookie 分析 UV 值
??當(dāng)客戶端第一次訪問某個(gè)網(wǎng)站服務(wù)器的時(shí)候,網(wǎng)站服務(wù)器會(huì)給這個(gè)客戶端的電腦發(fā)出一個(gè) Cookie,通常放在這個(gè)客戶端電腦的 C 盤當(dāng)中。在這個(gè) Cookie 中會(huì)分配一個(gè)獨(dú)一無二的編號(hào),這其中會(huì)記錄一些訪問服務(wù)器的信息,如訪問時(shí)間、訪問了哪些頁(yè)面等等。當(dāng)你下次再訪問這個(gè)服務(wù)器的時(shí)候,服務(wù)器就可以直接從你的電腦中找到上一次放進(jìn)去的 Cookie 文件,并且對(duì)其進(jìn)行一些更新,但那個(gè)獨(dú)一無二的編號(hào)是不會(huì)變的。
??實(shí)時(shí)處理的業(yè)務(wù)場(chǎng)景主要包括:匯總型(如網(wǎng)站 PV、銷售額、訂單數(shù))、去重型(如網(wǎng)站 UV、顧客數(shù)、銷售商品數(shù))

5.3.2 并發(fā)度

??并發(fā)度:用戶指定一個(gè)任務(wù),可以被多個(gè)線程執(zhí)行,并發(fā)度的數(shù)量等于線程 executor 的數(shù)量。
??task 就是具體的處理邏輯對(duì)象,一個(gè) executor 線程可以執(zhí)行一個(gè)或多個(gè) tasks,但一般默認(rèn)每個(gè) executor 只執(zhí)行一個(gè) task,所以我們往往認(rèn)為 task 就是執(zhí)行線程,其實(shí)不是。
??task 代表最大并發(fā)度,一個(gè) component 的 task 數(shù)是不會(huì)改變的,但是一個(gè) componet 的 executer 數(shù)目是會(huì)發(fā)生變化的(storm rebalance 命令),task 數(shù) >= executor 數(shù),executor 數(shù)代表實(shí)際并發(fā)數(shù)。

5.4 實(shí)操案例

5.4.1 實(shí)時(shí)單詞統(tǒng)計(jì)案例

1)需求
??實(shí)時(shí)統(tǒng)計(jì)發(fā)射到 Storm 框架中單詞的總數(shù)。

2)分析
??設(shè)計(jì)一個(gè) topology,來實(shí)現(xiàn)對(duì)文檔里面的單詞出現(xiàn)的頻率進(jìn)行統(tǒng)計(jì)。
整個(gè) topology 分為三個(gè)部分:
??(1)WordCountSpout:數(shù)據(jù)源,在已知的英文句子中,隨機(jī)發(fā)送一條句子出去。
??(2)WordCountSplitBolt:負(fù)責(zé)將單行文本記錄(句子)切分成單詞。
??(3)WordCountBolt:負(fù)責(zé)對(duì)單詞的頻率進(jìn)行累加。

3)實(shí)操
??(1)創(chuàng)建 spout

package com.atgui.storm.wordcount;import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class WordCountSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}@Overridepublic void nextTuple() {// 發(fā)射一條語(yǔ)句collector.emit(new Values("i am ximen love jinlian"));try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("love"));}}

??(2)創(chuàng)建切割單詞的 bolt

package com.atgui.storm.wordcount;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class WordCountSplitBolt extends BaseRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector = null;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的一行數(shù)據(jù)// String line = input.getStringByField("love");String line = input.getString(0);// 2、截取數(shù)據(jù)String[] arrWords = line.split(" ");// 3、發(fā)射數(shù)據(jù)(發(fā)送給下一級(jí) Bolt)for (String word : arrWords) {collector.emit(new Values(word, 1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("word", "num"));}}

??(3)創(chuàng)建匯總單詞個(gè)數(shù)的 bolt

package com.atgui.storm.wordcount;import java.util.HashMap; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple;public class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 1L;// 定義一個(gè) HashMap 用于存放統(tǒng)計(jì)后的結(jié)果,其中單詞為 key,單詞個(gè)數(shù)為 valueprivate Map<String, Integer> map = new HashMap<>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String word = input.getString(0); // 第一個(gè)數(shù)據(jù)Integer num = input.getInteger(1); // 第二個(gè)數(shù)據(jù)// 2、統(tǒng)計(jì)單詞個(gè)數(shù)if (map.containsKey(word)) {Integer count = map.get(word);count = count + num;map.put(word, count);} else {map.put(word, num);}// 3、控制臺(tái)打印(以紅色的字體 err 方式)System.err.println(Thread.currentThread().getId() + " word:" + word + " num:" + map.get(word));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}}

??(4)創(chuàng)建程序的拓?fù)?main

package com.atgui.storm.wordcount;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class WordCountMain {public static void main(String[] args) {// 1、創(chuàng)建拓?fù)鋵?duì)象TopologyBuilder builder = new TopologyBuilder();// 2、設(shè)置 Spout 和 Boltbuilder.setSpout("WordCountSpout", new WordCountSpout(), 1);builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).shuffleGrouping("WordCountSpout");builder.setBolt("WordCountBolt", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word"));// 3、配置 Worker 開啟的個(gè)數(shù)Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("WordCountTopology", conf, builder.createTopology());}} }

??(5)測(cè)試
發(fā)現(xiàn) 159 線程只處理單詞 am 和單詞 love,163 進(jìn)程處理單詞 i、ximen、jianlian。這就是分組的好處。

163 word:i num:1 163 word:ximen num:1 163 word:jinlian num:1 159 word:am num:1 159 word:love num:1 163 word:i num:2 163 word:ximen num:2 163 word:jinlian num:2 159 word:am num:2 159 word:love num:2 163 word:i num:3 163 word:ximen num:3 163 word:jinlian num:3 159 word:am num:3 159 word:love num:3

5.4.2 實(shí)時(shí)計(jì)算網(wǎng)站 PV 案例

0)基礎(chǔ)知識(shí)準(zhǔn)備
1)需求
??統(tǒng)計(jì)網(wǎng)站 pv(頁(yè)面瀏覽量)。
2)需求分析
方案一:
??定義 static long pv,Synchronized 控制累計(jì)操作。(不可行)
??原因:Synchronized 和 Lock 在單 JVM 下有效,但在多 JVM 下無效。
方案二:
??ShuffleGrouping 下,pv * Executer 并發(fā)數(shù)
??驅(qū)動(dòng)函數(shù)中配置如下:

builder.setSpout("PVSpout", new PVSpout(), 1); builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout"); 在 PVBolt1 中輸出時(shí) System.err.println("threadid:" + Thread.currentThread().getId() + " pv:" + pv * 4); 因?yàn)?shuffleGrouping 輪詢分配

??優(yōu)點(diǎn):簡(jiǎn)單、計(jì)算量小。
??缺點(diǎn):稍有誤差,但絕大多數(shù)場(chǎng)景能接受。
方案三:
??PVBolt1 進(jìn)行多并發(fā)局部匯總,PVSumBolt 單線程進(jìn)行全局匯總。
??線程安全:多線程處理的結(jié)果和單線程一致。
??優(yōu)點(diǎn):絕對(duì)準(zhǔn)確;如果用 filedGrouping 可以得到中間值,如單個(gè) user 的訪問 PV(訪問深度等)。
??缺點(diǎn):計(jì)算量稍大,且多一個(gè) Bolt。

3)案例實(shí)操
??(1)創(chuàng)建數(shù)據(jù)輸入源 PVSpout

package com.atgui.storm.pv;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class PVSpout implements IRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector = null;private BufferedReader reader = null;private String str = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;// 讀取文件try {reader = new BufferedReader(new InputStreamReader(new FileInputStream("d:/temp/storm/website.log"), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}@Overridepublic void close() {try {if (reader != null) {reader.close();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void nextTuple() {// 發(fā)射數(shù)據(jù)try {while ((str = reader.readLine()) != null) {// 發(fā)射collector.emit(new Values(str));Thread.sleep(500);}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(2)創(chuàng)建數(shù)據(jù)處理 PVBolt1

package com.atgui.storm.pv;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class PVBolt1 implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector;private long pv = 0;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String line = input.getString(0);// 2、截取出 session_idString sessionID = line.split("\t")[1];// 3、根據(jù)會(huì)話id不同統(tǒng)計(jì) pv 次數(shù)if (sessionID != null) {pv++;}// 4、提交collector.emit(new Values(Thread.currentThread().getId(), pv));System.err.println("threadID:" + Thread.currentThread().getId() + " pv:" + pv);}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("threadID", "pv"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(3)創(chuàng)建 PVSumBolt

package com.atgui.storm.pv;import java.util.HashMap; import java.util.Iterator; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple;public class PVSumBolt implements IRichBolt {private static final long serialVersionUID = 1L;private Map<Long, Long> map = new HashMap<Long, Long>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 獲取數(shù)據(jù)Long threadID = input.getLong(0);Long pv = input.getLong(1);map.put(threadID, pv);long wordSum = 0;Iterator<Long> iterator = map.values().iterator();while(iterator.hasNext()) {wordSum += iterator.next();}System.err.println("pvAll:" + wordSum);}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(4)創(chuàng)建程序的拓?fù)?PVMain

package com.atgui.storm.pv;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class PVMain {public static void main(String[] args) {// 1、創(chuàng)建拓?fù)鋵?duì)象TopologyBuilder builder = new TopologyBuilder();// 2、設(shè)置 Spout 和 Boltbuilder.setSpout("PVSpout", new PVSpout(), 1);builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");builder.setBolt("PVSumBolt", new PVSumBolt(), 1).shuffleGrouping("PVBolt1");// 3、配置 Worker 開啟的個(gè)數(shù)Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {// 4、分布式提交StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 5、本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("PVopology", conf, builder.createTopology());}} }

??(5)測(cè)試,執(zhí)行程序輸出如下結(jié)果

threadID:157 pv:1 pvAll:1 threadID:161 pv:1 pvAll:2 threadID:161 pv:2 pvAll:3 threadID:157 pv:2 pvAll:4 threadID:169 pv:1 pvAll:5 threadID:161 pv:3 pvAll:6 threadID:159 pv:1 pvAll:7 threadID:169 pv:2 pvAll:8 threadID:161 pv:4 pvAll:9 threadID:157 pv:3 pvAll:10 threadID:169 pv:3 pvAll:11 threadID:169 pv:4 pvAll:12 threadID:169 pv:5 pvAll:13 threadID:161 pv:5 pvAll:14 threadID:159 pv:2 pvAll:15 threadID:157 pv:4 pvAll:16 threadID:161 pv:6 pvAll:17 threadID:159 pv:3 pvAll:18 threadID:159 pv:4 pvAll:19 threadID:169 pv:6 pvAll:20 threadID:157 pv:5 pvAll:21 threadID:157 pv:6 pvAll:22 threadID:157 pv:7 pvAll:23 threadID:169 pv:7 pvAll:24 threadID:159 pv:5 pvAll:25 threadID:169 pv:8 pvAll:26 threadID:157 pv:8 pvAll:27 threadID:169 pv:9 pvAll:28 threadID:157 pv:9 pvAll:29 threadID:159 pv:6 pvAll:30我們將各個(gè)線程最后一次的輸出進(jìn)行累加 threadID:161 pv:6 threadID:169 pv:9 threadID:157 pv:9 threadID:159 pv:6 結(jié)果是 pvAll:30綜上:代碼測(cè)試完成!

5.4.3 實(shí)時(shí)計(jì)算網(wǎng)站 UV 去重案例

1)需求:
??統(tǒng)計(jì)網(wǎng)站 UV(獨(dú)立訪客數(shù))。

2)需求分析
方案一:
??把 ip 放入 Set 實(shí)現(xiàn)自動(dòng)去重,Set.size() 獲得 UV(分布式應(yīng)用中不可行)。
方案二:
??UVBolt1 通過 fieldGrouping 進(jìn)行多線程局部匯總,下一級(jí) UVSumBolt 進(jìn)行單線程全局匯總?cè)ブ亍0?ip 地址統(tǒng)計(jì) UV 數(shù)。
??既然去重,必須持久化數(shù)據(jù):
??(1)內(nèi)存:數(shù)據(jù)結(jié)構(gòu) map
??(2)no-sql 分布式數(shù)據(jù)庫(kù),如 Hbase

3)案例實(shí)操
??(1)創(chuàng)建帶 ip 地址的數(shù)據(jù)源 GenerateData

package com.atgui.storm.uv;import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.util.Random;public class GenerateData {public static void main(String[] args) {// 1、創(chuàng)建文件路徑File logFile = new File("d:/temp/storm/website.log");// 2、準(zhǔn)備數(shù)據(jù)// 2.1 網(wǎng)站名稱String[] hosts = { "www.atguigu.com" };// 2.2 會(huì)話idString[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34","BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };// 2.3 訪問網(wǎng)站時(shí)間String[] time = { "2017-08-07 08:40:50", "2017-08-07 08:40:51", "2017-08-07 08:40:52", "2017-08-07 08:40:53","2017-08-07 09:40:49", "2017-08-07 10:40:49", "2017-08-07 11:40:49", "2017-08-07 12:40:49" };// 2.4 訪問網(wǎng)站ip地址String[] ip = { "192.168.1.101", "192.168.1.102", "192.168.1.103", "192.168.1.104", "192.168.1.105","192.168.1.106", "192.168.1.107", "192.168.1.108" };// 3、拼接數(shù)據(jù)StringBuffer sb = new StringBuffer();Random random = new Random();for (int i = 0; i < 30; i++) {sb.append(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)] + "\t" + ip[random.nextInt(8)] + "\n");}// 判斷l(xiāng)og日志是否存在,不存在要?jiǎng)?chuàng)建if (!logFile.exists()) {try {logFile.createNewFile();} catch (IOException e) {System.out.println("Create logFile fail !");}}byte[] b = (sb.toString()).getBytes();// 4、 寫數(shù)據(jù)到文件FileOutputStream fileOutputStream = null;try {fileOutputStream = new FileOutputStream(logFile);fileOutputStream.write(b);System.out.println("Generate data over !");} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {// 5、關(guān)閉資源try {fileOutputStream.close();} catch (IOException e) {e.printStackTrace();}}} }

??(2)創(chuàng)建接收數(shù)據(jù) UVSpout

package com.atgui.storm.uv;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;public class UVSpout implements IRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector = null;private BufferedReader reader = null;private String str = null;@SuppressWarnings("rawtypes")@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;// 讀取文件try {reader = new BufferedReader(new InputStreamReader(new FileInputStream("d:/temp/storm/website.log"), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}@Overridepublic void close() {try {if (reader != null) {reader.close();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void nextTuple() {// 發(fā)射數(shù)據(jù)try {while ((str = reader.readLine()) != null) {// 發(fā)射collector.emit(new Values(str));Thread.sleep(500);}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(3)創(chuàng)建 UVBolt1

package com.atgui.storm.uv;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class UVBolt1 implements IRichBolt {private static final long serialVersionUID = 1L;private OutputCollector collector;@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String line = input.getString(0);// 2、截取出 ipString ip = line.split("\t")[3];// 3、提交collector.emit(new Values(ip, 1));}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 聲明輸出字段的類型declarer.declare(new Fields("ip", "num"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(4)創(chuàng)建 UVSumBolt

package com.atgui.storm.uv;import java.util.HashMap; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple;public class UVSumBolt implements IRichBolt {private static final long serialVersionUID = 1L;private Map<String, Integer> map = new HashMap<String, Integer>();@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {// 1、獲取傳遞過來的數(shù)據(jù)String ip = input.getString(0);Integer num = input.getInteger(1);// 2、累加單詞if (map.containsKey(ip)) {Integer count = map.get(ip);map.put(ip, count + num);} else {map.put(ip, num);}System.err.println(Thread.currentThread().getId() + " ip:" + ip + " num:" + map.get(ip));}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

??(5)創(chuàng)建驅(qū)動(dòng) UVMain

package com.atgui.storm.uv;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;public class UVMain {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("UVSpout", new UVSpout(), 1);builder.setBolt("UVBolt1", new UVBolt1(), 4).shuffleGrouping("UVSpout");builder.setBolt("UVSumBolt", new UVSumBolt(), 1).shuffleGrouping("UVBolt1");Config conf = new Config();conf.setNumWorkers(2);if (args.length > 0) {try {StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("UVtopology", conf, builder.createTopology());}} }

??(6)測(cè)試

163 ip:192.168.1.104 num:1 163 ip:192.168.1.105 num:1 163 ip:192.168.1.108 num:1 163 ip:192.168.1.104 num:2 163 ip:192.168.1.106 num:1 163 ip:192.168.1.107 num:1 163 ip:192.168.1.103 num:1 163 ip:192.168.1.101 num:1 163 ip:192.168.1.102 num:1 163 ip:192.168.1.105 num:2 163 ip:192.168.1.107 num:2 163 ip:192.168.1.104 num:3 163 ip:192.168.1.103 num:2 163 ip:192.168.1.107 num:3 163 ip:192.168.1.104 num:4 163 ip:192.168.1.105 num:3 163 ip:192.168.1.108 num:2 163 ip:192.168.1.106 num:2 163 ip:192.168.1.106 num:3 163 ip:192.168.1.108 num:3 163 ip:192.168.1.105 num:4 163 ip:192.168.1.104 num:5 163 ip:192.168.1.107 num:4 163 ip:192.168.1.103 num:3 163 ip:192.168.1.103 num:4 163 ip:192.168.1.103 num:5 163 ip:192.168.1.101 num:2 163 ip:192.168.1.102 num:2 163 ip:192.168.1.105 num:5 163 ip:192.168.1.101 num:3測(cè)試結(jié)果:一共8個(gè)用戶, 101:訪問3次; 102:訪問2次; 103:訪問5次; 104:訪問5次; 105:訪問5次; 106:訪問3次; 107:訪問4次; 108:訪問3次;

我的GitHub地址:https://github.com/heizemingjun
我的博客園地址:https://www.cnblogs.com/chenmingjun
我的CSDN地址:https://blog.csdn.net/u012990179
我的螞蟻筆記博客地址:https://blog.leanote.com/chenmingjun
Copyright ?2018~2019 黑澤君
【轉(zhuǎn)載文章務(wù)必保留出處和署名,謝謝!】

總結(jié)

以上是生活随笔為你收集整理的大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

久久久999| 五月婷婷中文 | 欧美日本中文字幕 | 91免费视频网站在线观看 | 久久国产区| 免费在线观看国产精品 | 999久久国精品免费观看网站 | 91视频免费看网站 | 国内精品亚洲 | 亚洲天堂在线观看完整版 | av片免费播放 | 国产精品6| 国产日韩精品一区二区在线观看播放 | 人人插人人舔 | 日韩视频在线不卡 | 99精品国产高清在线观看 | 激情综合啪啪 | 天天干天天操天天做 | 国产一区二区在线观看视频 | 亚洲理论片在线观看 | 亚洲视频免费在线看 | 综合久久精品 | 激情av网 | 精品国产成人在线 | 久久久久久综合 | 国产精品av免费在线观看 | www.色国产 | 国产1区2区3区精品美女 | 成人在线视频一区 | 日韩在线观看网址 | 成人免费一区二区三区在线观看 | 免费观看一区二区三区视频 | 精品亚洲视频在线 | 日韩字幕在线 | 久久久久久久久亚洲精品 | 国产成人精品一区二区三区在线 | 久久精品国产亚洲精品2020 | 中文字幕在线日本 | 日韩欧美观看 | 天天操夜夜逼 | 亚洲视频一 | 久草久视频| 久久精品直播 | 91人人澡人人爽人人精品 | 午夜视频久久久 | 色欧美综合 | 国产高清视频网 | 国产精品乱码久久 | 国产亚洲精品美女久久 | 久久草网站 | 亚洲精品国产综合久久 | 在线中文视频 | 久草视频在线资源 | 国产精品入口传媒 | 日韩午夜av | 午夜久久久久久久 | 东方av在线免费观看 | 欧美午夜一区二区福利视频 | 玖玖视频精品 | 正在播放一区二区 | 国产精品高清免费在线观看 | 亚洲三级视频 | 日韩在线免费看 | 国产精品精品国产色婷婷 | 国产一区二区三区四区在线 | 成人影视免费看 | 美女久久久久久久久久久 | 亚洲精品中文字幕在线观看 | 日日夜夜亚洲 | a天堂最新版中文在线地址 久久99久久精品国产 | 91丨九色丨高潮丰满 | 欧洲精品在线视频 | 中文久久精品 | 黄色免费在线看 | 成人午夜精品 | 青青色影院 | 亚洲精品欧美成人 | 欧美日韩国产免费视频 | 天堂中文在线视频 | 色99之美女主播在线视频 | 亚洲在线视频免费 | 婷婷中文在线 | 美女视频黄免费 | 天天综合操 | 国产电影黄色av | 久久精品视 | av电影免费在线看 | 日韩午夜在线播放 | 999电影免费在线观看2020 | 在线视频麻豆 | 黄色小网站在线观看 | 亚洲精品国产精品国 | 日韩理论电影在线 | 狠狠色伊人亚洲综合成人 | 婷婷六月网 | www.福利视频| 在线观看国产日韩 | 毛片1000部免费看 | 日韩网| 91av在线国产| 天天射天天射 | 久久爽久久爽久久av东京爽 | а中文在线天堂 | 男女靠逼app | 亚洲 欧洲 国产 日本 综合 | 久久久久日本精品一区二区三区 | 9色在线视频 | 婷婷色av | 久久电影国产免费久久电影 | 玖草在线观看 | 色av色av色av | 五月天久久 | 一区二区 精品 | 国产精品第二十页 | 日韩欧美一区二区在线观看 | 免费在线看v | 爱情影院aqdy鲁丝片二区 | 草久热 | 国产亚洲精品久久久久秋 | 伊人小视频 | 中文字幕在线观看网 | 久久久久这里只有精品 | 国产精品ⅴa有声小说 | 操操操日日日干干干 | 欧美韩日在线 | 91精品国产一区二区在线观看 | 中文高清av | 免费无遮挡动漫网站 | 中文字幕视频网站 | 天天操天天综合网 | 欧美a级片免费看 | 国产精品国产三级国产aⅴ入口 | 91精品免费在线观看 | 国产免费一区二区三区最新6 | 久久怡红院| 国产精品成人久久久久久久 | 91久久国产露脸精品国产闺蜜 | 亚洲婷婷丁香 | 在线观看av的网站 | 免费在线黄 | 中文字幕在线观看不卡 | 日韩久久一区二区 | 欧美成人久久 | 亚洲乱码在线观看 | 亚洲国产日韩精品 | 婷婷久久久 | 国产精品入口传媒 | 日韩在线观看视频一区二区三区 | 国产精品你懂的在线观看 | 激情久久小说 | 国产成人久久精品77777 | 天天操天天操天天操天天操天天操 | 国产色女人 | 精品99久久 | 日韩中文在线电影 | 欧美在线a视频 | 国产白浆在线观看 | 亚洲理论影院 | 国产精品 国产精品 | 国产 精品 资源 | 超碰在线免费福利 | 久久 精品一区 | 国产免费叼嘿网站免费 | 伊人天堂网 | 一区二区三区影院 | 婷婷在线免费 | 视频在线播放国产 | 久久久久久黄 | 日韩精品专区在线影院重磅 | 国产福利a | 亚洲高清在线观看视频 | 最近中文字幕完整视频高清1 | 久草久热 | 91成人精品一区在线播放69 | 久久免费精品一区二区三区 | 天天射天天操天天 | 丰满少妇麻豆av | 特级西西人体444是什么意思 | 激情一区二区三区欧美 | 不卡视频一区二区三区 | 国产区精品在线 | 亚洲综合色视频 | 欧美一级在线观看视频 | 亚洲精品一区二区三区在线观看 | 欧美精选一区二区三区 | 亚洲一级免费电影 | 最新国产在线 | 免费亚洲视频 | 波多野结衣在线视频免费观看 | 国产男女爽爽爽免费视频 | 成人久久18免费网站 | 99精品免费久久久久久久久日本 | 激情久久久久久久久久久久久久久久 | 久久精品视频观看 | 午夜视频欧美 | 欧美一二三视频 | 九九在线高清精品视频 | 国产在线一线 | 日韩av在线看| 欧美国产三区 | 中文字幕在线观看第二页 | 久久亚洲精品国产亚洲老地址 | 国产精品麻豆果冻传媒在线播放 | 成人免费观看完整版电影 | 99在线观看精品 | 天堂在线成人 | 成年人视频免费在线播放 | 精品久久久久久国产 | 久草在线免费新视频 | 亚洲精品综合一二三区在线观看 | 国产1区2区3区精品美女 | 狠狠综合| 精品亚洲一区二区三区 | 国产黄视频在线观看 | 国产 在线 高清 精品 | 91av资源网 | 精品免费在线视频 | 在线视频麻豆 | 国产精品不卡视频 | 射射射综合网 | 国产精品毛片久久久久久久久久99999999 | 久草在线资源网 | 免费看国产视频 | 91在线精品秘密一区二区 | 日韩成人高清在线 | 蜜臀aⅴ精品一区二区三区 久久视屏网 | 国产 成人 久久 | 91精品亚洲影视在线观看 | 日韩欧美xx | 精品国产一区二区三区男人吃奶 | 一级欧美一级日韩 | 久久久久久久99 | 国产精品视频99 | 九九在线国产视频 | 国产亚洲成人网 | 在线观看中文字幕 | 99热精品国产 | 91久久久久久国产精品 | 久久久黄视频 | av片中文字幕| 精品亚洲一区二区 | 91精品啪在线观看国产线免费 | 一级黄色免费网站 | 日韩高清网站 | 国产一区二区在线影院 | 国产精品久久久久婷婷 | 激情欧美一区二区三区 | 久久艹在线观看 | 国产精品乱码久久久久久1区2区 | 亚洲一二三在线 | 91麻豆精品国产自产 | 国产黄色特级片 | 在线观看视频色 | 中日韩免费视频 | 国产三级在线播放 | 日本一区二区免费在线观看 | 欧美久久久影院 | 国产三级香港三韩国三级 | 国产精品久久久久久久久婷婷 | 国产精品综合在线观看 | 黄色在线看网站 | 九九九毛片 | 日韩在线一二三区 | 99国产精品 | 欧美日产在线观看 | 久久免费视频在线观看6 | 久久综合九色综合欧美就去吻 | 看污网站 | 69国产盗摄一区二区三区五区 | 久99久中文字幕在线 | 99精品99| 久久夜色精品国产欧美乱极品 | 亚洲经典视频在线观看 | 午夜av在线 | 蜜臀久久99精品久久久久久网站 | 激情久久五月 | 国产精品久久久久久久久久久久 | 视频二区 | 九九视频免费观看视频精品 | 亚洲精品激情 | 黄色天堂在线观看 | 四虎成人免费观看 | 国产成人精品亚洲精品 | 国产97色| 久久中文字幕在线视频 | 成人精品久久 | 国产精品久久久影视 | 国产精品 中文在线 | 在线观看aaa| 夜夜躁狠狠躁日日躁 | 人人插超碰 | 中文字幕精品一区久久久久 | 亚洲乱码久久久 | 午夜91在线 | 三级在线播放视频 | 91精品麻豆| 九色最新网址 | 久久久国产在线视频 | 成人免费xxx在线观看 | a亚洲视频 | 在线观看av的网站 | 在线观看黄色小视频 | 亚洲午夜在线视频 | 西西4444www大胆艺术 | 久99久精品 | 欧美另类网站 | av观看在线观看 | 九九久久久久99精品 | 国产成人一区二区三区在线观看 | 国产一线在线 | 中文字幕亚洲精品日韩 | 久久综合久色欧美综合狠狠 | 日韩av影片在线观看 | 日韩精品一区二区三区高清免费 | 国产精品99久久久久久武松影视 | 96精品高清视频在线观看软件特色 | 色国产视频 | 亚洲在线精品视频 | 亚洲一区二区三区四区精品 | 国产在线免费av | 婷婷激情综合网 | 96国产在线 | 91在线视频观看免费 | 婷婷丁香国产 | 91手机电视 | 九九精品久久 | 成人免费观看电影 | 免费能看的黄色片 | 国产 日韩 欧美 在线 | 久久久久久网站 | 免费一级片视频 | 亚洲欧美日本一区二区三区 | 欧美精品在线观看一区 | 五月亚洲婷婷 | 亚洲成a人片在线www | 片网站| 亚洲一区二区三区四区在线视频 | 国产一级一片免费播放放a 一区二区三区国产欧美 | 黄色大全视频 | 亚洲欧美日韩一区二区三区在线观看 | 国产一区二区在线视频观看 | 三级黄色在线 | 激情www | 国产美女精品视频免费观看 | 久久亚洲福利 | 国产中文字幕在线看 | 久久精品欧美 | 色五月激情五月 | 伊人五月 | 久久新视频 | 欧洲视频一区 | 中文字幕亚洲综合久久五月天色无吗'' | 亚洲一区美女视频在线观看免费 | 免费黄色在线网站 | 国产高清久久 | 色丁香久久 | 国产91精品欧美 | 日本三级人妇 | 天天爽综合网 | 国产小视频免费观看 | 日韩有码在线播放 | 久草视频视频在线播放 | 成人午夜毛片 | 日韩电影在线观看一区二区三区 | 免费又黄又爽视频 | 国产福利av | 国产精品99久久久久久有的能看 | 欧美性黑人 | 蜜桃传媒一区二区 | 国产一级淫片在线观看 | 夜夜操网| 在线看91| 丰满少妇一级 | 黄色影院在线免费观看 | 91av在线看| 操操操天天操 | 色狠狠久久av五月综合 | 日韩欧美一区二区三区在线观看 | 成人午夜精品福利免费 | 成人h在线观看 | 天天激情天天干 | 精品久久国产精品 | 在线观看午夜av | 国产视频一区精品 | 九九九在线观看 | 天天爱天天插 | 婷婷六月天丁香 | 美女网站黄在线观看 | 国产96精品 | 久久影院精品 | 九九免费观看视频 | 日本色小说视频 | 色资源中文字幕 | 三级黄免费看 | 一区二区三区四区在线 | 91福利区一区二区三区 | 在线av资源| 日韩视频免费播放 | 91成人午夜 | 国产精品精品国产 | 免费在线黄网 | 成人在线观看免费视频 | 国产精品二区三区 | 黄色特级毛片 | 91视频在线观看大全 | 99久久精品免费 | 操操操日日日干干干 | 日韩精品在线一区 | 欧美一级视频免费看 | 国产小视频国产精品 | 96av麻豆蜜桃一区二区 | 久久久久女人精品毛片 | 日韩极品在线 | 欧美精品亚洲精品 | www.在线看片.com| 久久免费毛片 | 狠狠干.com | 国产亚洲婷婷免费 | 国产精品久久久久久久久久东京 | av在观看| 九色精品免费永久在线 | 插久久| 最新午夜电影 | 精精国产xxxx视频在线播放 | 亚洲婷婷在线 | 中国一级特黄毛片大片久久 | 一区二区三区免费在线观看视频 | 99在线观看视频 | 一本之道乱码区 | 日韩视频 一区 | 亚洲视频999 | 亚洲不卡av一区二区三区 | 久久久久久综合网天天 | 日日干夜夜干 | 久久久高清一区二区三区 | 99se视频在线观看 | 亚洲午夜久久久久久久久电影网 | 欧美人体xx | 国产做a爱一级久久 | 日本性xxx | 99精品国产一区二区三区不卡 | 欧美伦理电影一区二区 | 久久国产精品免费一区二区三区 | 精品一区二区三区四区在线 | 日本精品一 | 中文字幕在线观看不卡 | 黄色午夜 | 美女免费视频一区 | 国产精品久久久久久a | 午夜电影av | 久久国产精品一区二区 | 人人射人人爽 | 在线看黄网站 | 中文理论片 | 日韩免费福利 | 成年人在线看视频 | 又黄又爽又无遮挡的视频 | 国产亚洲精品久久 | 亚洲天堂在线观看完整版 | 欧美最猛性xxxxx(亚洲精品) | 奇米影视8888| 99久久精品网 | 日韩在线视频网 | 四虎影视成人永久免费观看视频 | 久久国产精品影视 | 久久精品99精品国产香蕉 | 国产第一页福利影院 | www.色国产 | 男女啪啪免费网站 | 99欧美视频 | 尤物一区二区三区 | 久久久99精品免费观看 | 亚洲精品视频在线观看网站 | 久久午夜网 | 久久精品a | 久久免费视频1 | 热热热热热色 | 2017狠狠干| 亚洲欧美成人综合 | 99久久精品国产一区二区三区 | 日韩18p| www夜夜操com| 久久久亚洲国产精品麻豆综合天堂 | 五月天婷亚洲天综合网精品偷 | 国产一区网址 | 天天五月天色 | 亚洲精品国产拍在线 | 成人一区二区在线 | 日韩黄色影院 | 国产又粗又猛又色又黄网站 | 免费 在线 中文 日本 | 91探花国产综合在线精品 | 免费视频一区二区 | 国产精品久久久久9999吃药 | 波多野结衣视频一区 | 欧美久久久久久久久中文字幕 | 久久久久麻豆v国产 | 欧美日韩激情视频8区 | 激情小说 五月 | 日韩免费三级 | 九九精品无码 | 福利视频导航网址 | 亚洲精品国偷拍自产在线观看蜜桃 | 97人人澡人人添人人爽超碰 | 日韩视频免费观看高清完整版在线 | 成全在线视频免费观看 | 亚洲欧洲精品一区二区精品久久久 | 天天综合网~永久入口 | 99情趣网视频 | 亚洲精品在线国产 | 久色婷婷| 欧美男同网站 | 国产成人精品久久二区二区 | av导航福利 | 亚洲另类视频在线 | 国产无区一区二区三麻豆 | 中文字幕一二三区 | 综合网在线视频 | 99久久久久成人国产免费 | 国产日产精品一区二区三区四区的观看方式 | 欧美精品免费一区二区 | 国产第一福利 | 免费观看日韩av | 91香蕉视频污在线 | 69视频国产| 国产高清视频免费观看 | a资源在线 | 国产 视频 久久 | 欧美国产日韩一区二区 | 国产 视频 高清 免费 | 免费观看的黄色 | 成人av免费 | 91女神的呻吟细腰翘臀美女 | 国产伦理一区二区三区 | 国产黄色av | 久久久精品 一区二区三区 国产99视频在线观看 | 亚洲天天摸日日摸天天欢 | 亚洲欧美激情精品一区二区 | 久久精品视频国产 | 日韩中文字幕亚洲一区二区va在线 | 国产日韩欧美在线观看视频 | 粉嫩高清一区二区三区 | 亚洲成人一二三 | 久久精品国产第一区二区三区 | 视频二区| se婷婷| 欧美色综合天天久久综合精品 | 国产精品一区免费在线观看 | 国产日产高清dvd碟片 | 日韩免费播放 | 亚洲开心色 | 在线观看mv的中文字幕网站 | 91福利试看 | 国产精品婷婷午夜在线观看 | 亚洲精品小区久久久久久 | 69av在线播放 | 久射网| 久久1区| 国产在线精品区 | 国产一区二区三区免费视频 | 久久这里只有精品1 | 91成年人在线观看 | 国产精品99久久久精品 | 手机av电影在线观看 | 正在播放 久久 | 美腿丝袜av| 最新国产在线视频 | 成 人 黄 色 免费播放 | 最近2019中文免费高清视频观看www99 | 久久国产经典 | 超碰日韩在线 | 国产色女| 天天操天天射天天添 | 天天操天 | 在线免费观看一区二区三区 | 亚洲视频99| 又色又爽的网站 | 国产精品专区一 | 国产欧美综合在线观看 | 久久综合成人 | 成人欧美在线 | 成年人免费在线观看网站 | 中文字幕在线影院 | 亚洲一级特黄 | 手机在线观看国产精品 | 深爱五月激情五月 | 欧美日韩国产一二三区 | 国产一区二区三区黄 | 一级性视频 | 欧美一级视频免费看 | 亚洲精品高清在线 | 国产99久久久国产精品成人免费 | 国产高清成人在线 | 免费日韩 精品中文字幕视频在线 | 国产在线欧美日韩 | 激情五月播播久久久精品 | 国产精品色婷婷 | 99精品区| www.久久91| 91精选| 亚洲精品一区二区三区新线路 | 精品99免费视频 | 91精品1区 | 欧美亚洲一区二区在线 | 欧美一级乱黄 | 中文字幕丝袜美腿 | 午夜10000 | 伊人色综合久久天天网 | 麻豆视传媒官网免费观看 | 午夜久久福利视频 | 国产成人久久精品亚洲 | 国产一区二区综合 | 久久热亚洲 | 成人91在线观看 | 亚洲最新视频在线播放 | 亚洲国产精品久久久久婷婷884 | 91av蜜桃| 天天爽天天射 | 91九色蝌蚪国产 | 天天天天天天天操 | 欧美性黑人 | 欧美成年人在线视频 | 伊人www22综合色 | 人人射人人爱 | 6080yy午夜一二三区久久 | 网站在线观看日韩 | 五月导航 | 国产精品18久久久久久不卡孕妇 | www国产精品com | 久久久久国产精品一区 | 九九爱免费视频在线观看 | 伊人五月| 91中文在线 | 久久不卡国产精品一区二区 | 国产精品中文在线 | 欧美性生活免费 | 久久久久亚洲天堂 | 久久婷婷综合激情 | 国产福利资源 | 伊人婷婷久久 | 国产视频在线看 | 人人爽人人爽人人爽人人爽 | 97电影院网 | 狠狠狠干| 久久精视频| 久久久久久久影视 | 久久综合久久综合九色 | 国产麻豆剧传媒免费观看 | 韩国三级一区 | 天天操天天添天天吹 | 在线观看视频一区二区三区 | 91高清视频免费 | 91精品国产综合久久福利 | 亚洲第一区精品 | 国产精品久久久久久久久搜平片 | 久草影视在线 | 天天玩天天操天天射 | 久久99精品久久久久蜜臀 | 亚洲黄色免费网站 | 成人免费91| 国产精品久久久久久久久久直播 | 一区二区在线不卡 | 九九视频在线 | 国产午夜一区 | 久久婷婷色 | 免费在线一区二区三区 | 99精品欧美一区二区三区黑人哦 | 色综久久 | 成人aⅴ视频 | 成人午夜影院在线观看 | 亚洲美女视频网 | 日韩免费在线视频观看 | 久久在线观看视频 | 日韩欧美在线高清 | 亚洲精品三级 | 欧美一区二区在线刺激视频 | 97国产超碰| 99热这里有精品 | 欧美性久久久 | 中日韩男男gay无套 日韩精品一区二区三区高清免费 | 久热免费在线观看 | 欧洲一区精品 | 97色在线观看免费视频 | 久久午夜色播影院免费高清 | 人人玩人人弄 | 中文字幕在线一区二区三区 | 国产精品理论片在线观看 | 国产精品久久久久久久久久 | 麻豆视频免费观看 | 日韩专区一区二区 | 日韩成人在线一区二区 | 精品国产片| 久久影院精品 | www五月天婷婷 | 久久综合福利 | 黄色一级片视频 | 免费日韩一区二区三区 | 日韩成人在线免费观看 | 久草在线官网 | 精品女同一区二区三区在线观看 | 日本精品一 | 五月天激情电影 | 成人黄色大片网站 | 在线观看视频免费播放 | 91视频在线播放视频 | 久久久国产99久久国产一 | 在线看av网址 | 国产第一二区 | 99精品视频在线免费观看 | 综合久久久久久 | 精品国产免费看 | 天天草天天干天天射 | 奇米网8888 | 视频福利在线观看 | 久久激情视频 | 免费成人黄色av | 91在线成人| 91毛片在线观看 | 人人澡视频 | 成人免费视频网站在线观看 | 亚洲国产精品久久 | 中文字幕久久精品一区 | 免费一级片在线 | 在线观看亚洲国产 | 久久精品国产亚洲精品 | 中文字幕一区二区三区在线观看 | 中文字幕丝袜美腿 | 日韩免 | 久久综合狠狠综合久久激情 | 国产成人1区 | 亚洲好视频 | 国产成人av一区二区三区在线观看 | 国产中文字幕大全 | 福利视频一区二区 | 亚洲国产成人久久综合 | 日韩三区在线观看 | 五月婷婷六月丁香 | 欧美一级大片在线观看 | 久久精品第一页 | 中文字幕在线观看你懂的 | 精品一区二区电影 | 国产一级91 | 欧美精品久久久久久久久久 | 在线观看一区二区精品 | 91精品国产乱码久久 | 日韩精品一区二区在线观看视频 | 黄色一级大片在线免费看产 | 天天操夜 | 天天干天天天天 | 天天干天天天 | 欧美成人xxxx | 操少妇视频 | 亚洲精品小视频 | 日韩欧美精品在线视频 | 欧美日韩一区二区在线观看 | 中文字幕国产视频 | 久久久久国产一区二区三区四区 | 99热在线免费观看 | 一区二区三区精品在线 | www.xxxx欧美 | 日日夜精品 | 国产 日韩 在线 亚洲 字幕 中文 | 国产免费久久久久 | 一级a毛片高清视频 | 操久| 91视频在线播放视频 | 五月婷婷色 | 国产在线不卡一区 | 一区二区精品国产 | 欧美日韩中文国产一区发布 | www.神马久久 | 中文字幕av在线播放 | 日日干日日操 | 91网址在线看 | 免费一区在线 | 亚洲午夜激情网 | 天堂在线视频中文网 | 日日精品| 午夜av不卡 | 天天想夜夜操 | 黄色的片子 | 少妇搡bbbb搡bbb搡忠贞 | 少妇高潮流白浆在线观看 | 婷婷色中文 | 色婷婷啪啪免费在线电影观看 | a级国产乱理伦片在线播放 久久久久国产精品一区 | 天天操天天弄 | 国产一区二区三区免费在线 | 精品视频资源站 | 免费网站在线观看成人 | 久久a级片 | 色婷婷综合五月 | free. 性欧美.com | 欧美另类交在线观看 | 色网av| 国产精品黄 | 豆豆色资源网xfplay | 91av在线播放视频 | 在线观看视频你懂 | 综合av在线 | 免费亚洲片 | 中文字幕高清在线 | www.99av| 色综合天天狠狠 | 久久综合狠狠综合久久综合88 | www.久草.com | 色中文字幕在线观看 | 精品国产乱码久久久久久浪潮 | 国产亚洲婷婷 | 中文字幕av网站 | 在线观看免费一级片 | 13日本xxxxxⅹxxx20 | 99热都是精品 | 在线播放一区 | 国产精品久久久久久一二三四五 | 亚洲精品国偷拍自产在线观看蜜桃 | 亚洲欧美视频一区二区三区 | 亚洲国产精品成人va在线观看 | 中文久久精品 | 日韩欧美一级二级 | 久久久久久久久久影院 | 亚洲男男gaygay无套 | 一区二区三区av在线 | 免费在线观看午夜视频 | 日韩精品无 | 久久久久免费精品视频 | 国产自产高清不卡 | 1024手机基地在线观看 | 菠萝菠萝蜜在线播放 | 久久综合影院 | 欧美美女视频在线观看 | 一本一本久久a久久精品综合妖精 | 欧美日韩三级 | 国产精品资源 | 久久国产精品久久精品国产演员表 | 日韩一区精品 | 91精品一区二区三区久久久久久 | 久久er99热精品一区二区三区 | av电影不卡在线 | 亚洲视频在线观看网站 | 亚洲婷婷在线 | 日韩精品一区二区三区高清免费 | 人人看人人爱 | 久久久免费视频播放 | 国产黄色成人 | 91超碰免费在线 | 日日夜夜精品免费 | 亚色视频在线观看 | 色综合久 | 91精品成人| 成全免费观看视频 | 国产色资源| 国产中文字幕av | 在线欧美最极品的av | 久久久久久国产精品免费 | 久久久久久高潮国产精品视 | 日韩在线视频观看 | 亚洲精品乱码久久久久久蜜桃91 | 亚洲午夜在线视频 | 亚洲动漫在线观看 | 亚洲国产伊人 | 国产高清小视频 | 日韩视频一区二区 | 亚洲精品欧洲精品 | 国产69精品久久久久99尤 | 91视频国产高清 | 久操视频在线播放 | 成年人免费在线观看 | 久久不卡免费视频 | 99999精品视频 | 手机av观看 | 免费av免费观看 | 色资源中文字幕 | 婷婷福利影院 | 国产一级h | 久久久久久国产精品美女 | 亚洲在线视频网站 | 日韩h在线观看 | 日韩精品视频免费看 | 国产午夜免费视频 | 亚洲成人一区 | 麻豆传媒视频在线 | 色婷婷激情四射 | 久草综合在线 | 久久人人爽人人爽人人片av免费 | 91丨九色丨蝌蚪丨老版 | 成人av在线观 | 成片免费观看视频999 | 中文字幕丝袜一区二区 | 最近中文字幕mv免费高清在线 | av色一区| 久久99精品久久久久久久久久久久 | 久久久国产99久久国产一 | 久久精品人人做人人综合老师 | 一区二区三区精品久久久 | 超碰在线观看av.com | 亚洲 欧美 精品 | 日韩xxxx视频 | 99久久久久久国产精品 | 9色在线视频| 97超碰免费| 在线观看免费国产小视频 | 一区二区三区在线影院 | 婷婷色中文 | 亚洲天天干| 久久99精品国产99久久6尤 | 日日操天天操狠狠操 | 免费大片av| 日韩国产精品一区 | 九九热在线视频免费观看 | 色多视频在线观看 | 四虎在线观看视频 | 久久99视频免费 | 手机版av在线 | 综合激情婷婷 | 二区三区在线 | 日p在线观看 | 色偷偷88888欧美精品久久 | 丝袜美腿亚洲综合 | 99精品系列| 青青河边草免费视频 | 欧洲亚洲女同hd | 日韩一片| 中文成人字幕 | av免费网页 | 成年人网站免费在线观看 | 欧美黄污视频 | 国产亚洲精品久久久久久电影 | 草久在线观看视频 | 免费麻豆网站 | 日韩大片在线看 | 在线观看的a站 | 欧洲精品二区 | 91黄色免费网站 | 久一网站 | 国产精品系列在线 | 日韩av电影中文字幕 | 色综合久久精品 | 国产中文字幕大全 | 97精品欧美91久久久久久 | 五月婷婷婷婷婷 | 精品一区91| 久久精品网站免费观看 | 色在线视频 | 日韩a免费 | 成人18视频| 黄色a视频免费 | 亚洲一级影院 | 91免费日韩| 超碰人人干人人 | 最新动作电影 | 天天综合天天做天天综合 | 国产一级免费观看视频 | 国产精品久久久久9999吃药 | 高清av免费观看 | 日韩欧美一区二区三区视频 | 亚洲国产精品资源 | 丁香婷婷综合色啪 | 午夜三级福利 | 99视频网站 | 97在线观看免费观看 | 欧美激情另类文学 | 久久人人爽人人人人片 | 99久久精品一区二区成人 | 久久国产精品小视频 | 日韩在线观看视频免费 | 狠狠躁18三区二区一区ai明星 | 国产在线高清精品 | 亚洲精品国偷自产在线99热 | 在线亚洲欧美视频 | 亚洲一级影院 | 日韩欧美在线国产 | 美女视频久久久 | 最新日本中文字幕 | 激情婷婷网 | 在线直播av | 欧美精品亚州精品 | 美女视频黄,久久 | 久久艹国产视频 | 黄色av电影在线观看 | 色五月成人| 久久99精品久久久久久三级 | 日韩xxxx视频 | 黄色一级在线免费观看 | 色在线中文字幕 | 日韩精品一区二区三区免费观看视频 | av 一区二区三区四区 | 日韩av一区二区三区在线观看 | 久久综合色天天久久综合图片 | 免费看一级特黄a大片 | 日韩精品三区四区 | 国产在线观看你懂得 | 久久久亚洲麻豆日韩精品一区三区 | 国产精品毛片一区视频播不卡 | 韩国av免费 | 亚洲国产精品va在线看黑人 | 精品v亚洲v欧美v高清v | 成人h电影在线观看 |