日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

使用Storm实现实时大数据分析

發(fā)布時間:2024/6/21 综合教程 30 生活家
生活随笔 收集整理的這篇文章主要介紹了 使用Storm实现实时大数据分析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

摘要:隨著數(shù)據(jù)體積的越來越大,實時處理成為了許多機構需要面對的首要挑戰(zhàn)。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上結合了汽車超速監(jiān)視,為我們演示了使用Storm進行實時大數(shù)據(jù)分析。CSDN在此編譯、整理。

簡單和明了,Storm讓大數(shù)據(jù)分析變得輕松加愉快。

當今世界,公司的日常運營經(jīng)常會生成TB級別的數(shù)據(jù)。數(shù)據(jù)來源囊括了互聯(lián)網(wǎng)裝置可以捕獲的任何類型數(shù)據(jù),網(wǎng)站、社交媒體、交易型商業(yè)數(shù)據(jù)以及其它商業(yè)環(huán)境中創(chuàng)建的數(shù)據(jù)。考慮到數(shù)據(jù)的生成量,實時處理成為了許多機構需要面對的首要挑戰(zhàn)。我們經(jīng)常用的一個非常有效的開源實時計算工具就是Storm—— Twitter開發(fā),通常被比作“實時的Hadoop”。然而Storm遠比Hadoop來的簡單,因為用它處理大數(shù)據(jù)不會帶來新老技術的交替。

Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分別從事技術分析和研發(fā)工作。本文詳述了Storm的使用方法,例子中的項目名稱為“超速報警系統(tǒng)(Speeding Alert System)”。我們想實現(xiàn)的功能是:實時分析過往車輛的數(shù)據(jù),一旦車輛數(shù)據(jù)超過預設的臨界值 —— 便觸發(fā)一個trigger并把相關的數(shù)據(jù)存入數(shù)據(jù)庫。

1. Storm是什么

全量數(shù)據(jù)處理使用的大多是鼎鼎大名的hadoop或者hive,作為一個批處理系統(tǒng),hadoop以其吞吐量大、自動容錯等優(yōu)點,在海量數(shù)據(jù)處理上得到了廣泛的使用。

Hadoop下的Map/Reduce框架對于數(shù)據(jù)的處理流程是:

1、 將要處理的數(shù)據(jù)上傳到Hadoop的文件系統(tǒng)HDFS中。

2、 Map階段

a) Master對Map的預處理:對于大量的數(shù)據(jù)進行切分,劃分為M個16~64M的數(shù)據(jù)分片(可通過參數(shù)自定義分片大小)

b) 調(diào)用Mapper函數(shù):Master為Worker分配Map任務,每個分片都對應一個Worker進行處理。各個Worker讀取并調(diào)用用戶定義的Mapper函數(shù) 處理數(shù)據(jù),并將結果存入HDFS,返回存儲位置給Master。

一個Worker在Map階段完成時,在HDFS中,生成一個排好序的Key-values組成的文件。并將位置信息匯報給Master。

3、 Reduce階段

a) Master對Reduce的預處理:Master為Worker分配Reduce任務,他會將所有Mapper產(chǎn)生的數(shù)據(jù)進行映射,將相同key的任務分配給某個Worker。

b) 調(diào)用Reduce函數(shù):各個Worker將分配到的數(shù)據(jù)集進行排序(使用工具類Merg),并調(diào)用用戶自定義的Reduce函數(shù),并將結果寫入HDFS。

每個Worker的Reduce任務完成后,都會在HDFS中生成一個輸出文件。Hadoop并不將這些文件合并,因為這些文件往往會作為另一個Map/reduce程序的輸入。

以上的流程,粗略概括,就是從HDFS中獲取數(shù)據(jù),將其按照大小分片,進行分布式處理,最終輸出結果。從流程來看,Hadoop框架進行數(shù)據(jù)處理有以下要求:

1、 數(shù)據(jù)已經(jīng)存在在HDFS當中。

2、 數(shù)據(jù)間是少關聯(lián)的。各個任務執(zhí)行器在執(zhí)行負責的數(shù)據(jù)時,無需考慮對其他數(shù)據(jù)的影響,數(shù)據(jù)之間應盡可能是無聯(lián)系、不會影響的。

使用Hadoop,適合大批量的數(shù)據(jù)處理,這是他所擅長的。由于基于Map/Reduce這種單級的數(shù)據(jù)處理模型進行,因此,如果數(shù)據(jù)間的關聯(lián)系較大,需要進行數(shù)據(jù)的多級交互處理(某個階段的處理數(shù)據(jù)依賴于上一個階段),需要進行多次map/reduce。又由于map/reduce每次執(zhí)行都需要遍歷整個數(shù)據(jù)集,對于數(shù)據(jù)的實時計算并不合適,于是有了storm。

對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統(tǒng)。同Hadoop一樣Storm也可以處理大批量的數(shù)據(jù),然而Storm在保證高可靠性的前提下還可以讓處理進行的更加實時;也就是說,所有的信息都會被處理。Storm同樣還具備容錯和分布計算這些特性,這就讓Storm可以擴展到不同的機器上進行大批量的數(shù)據(jù)處理。他同樣還有以下的這些特性:

易于擴展:對于擴展,伴隨著業(yè)務的發(fā)展,我們的數(shù)據(jù)量、計算量可能會越來越大,所以希望這個系統(tǒng)是可擴展的。你只需要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop
Zookeeper進行集群協(xié)調(diào),這樣可以充分的保證大型集群的良好運行。每條信息的處理都可以得到保證。Storm集群管理簡易。Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執(zhí)行中出現(xiàn)錯誤時,也會由Storm重新分配任務。這是分布式系統(tǒng)中通用問題。一個節(jié)點掛了不能影響我的應用。低延遲。都說了是實時計算系統(tǒng)了,延遲是一定要低的。
盡管通常使用Java,Storm中的topology可以用任何語言設計。

在線實時流處理模型

對于處理大批量數(shù)據(jù)的Map/reduce程序,在任務完成之后就停止了,但Storm是用于實時計算的,所以,相應的處理程序會一直執(zhí)行(等待任務,有任務則執(zhí)行)直至手動停止。

對于Storm,他是實時處理模型,與hadoop的不同是,他是針對在線業(yè)務而存在的計算平臺,如統(tǒng)計某用戶的交易量、生成為某個用戶的推薦列表等實時性高的需求。他是一個“流處理”框架。何謂流處理?storm將數(shù)據(jù)以Stream的方式,并按照Topology的順序,依次處理并最終生成結果。

當然為了更好的理解文章,你首先需要安裝和設置Storm。需要通過以下幾個簡單的步驟:

從Storm官方下載Storm安裝文件將bin/directory解壓到你的PATH上,并保證bin/storm腳本是可執(zhí)行的。

盡管 Storm 是使用 Clojure 語言開發(fā)的,您仍然可以在 Storm 中使用幾乎任何語言編寫應用程序。所需的只是一個連接到 Storm 的架構的適配器。已存在針對 Scala、JRuby、Perl 和 PHP 的適配器,但是還有支持流式傳輸?shù)?Storm 拓撲結構中的結構化查詢語言適配器。

2. Storm的組件

Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區(qū)別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。

Storm集群主要由一個主節(jié)點(Nimbus后臺程序)和一群工作節(jié)點(worker
node)Supervisor的節(jié)點組成,通過 Zookeeper進行協(xié)調(diào)。Nimbus類似Hadoop里面的JobTracker。Nimbus負責在集群里面分發(fā)代碼,分配計算任務給機器,
并且監(jiān)控狀態(tài)。

每一個工作節(jié)點上面運行一個叫做Supervisor的節(jié)點。Supervisor會監(jiān)聽分配給它那臺機器的工作,根據(jù)需要啟動/關閉工作進程。每一個工作進程執(zhí)行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。

1、 Nimbus主節(jié)點:

主節(jié)點通常運行一個后臺程序 —— Nimbus,用于響應分布在集群中的節(jié)點,分配任務和監(jiān)測故障。這個很類似于Hadoop中的Job Tracker。

2、Supervisor工作節(jié)點:

工作節(jié)點同樣會運行一個后臺程序 —— Supervisor,用于收聽工作指派并基于要求運行工作進程。每個工作節(jié)點都是topology中一個子集的實現(xiàn)。而Nimbus和Supervisor之間的協(xié)調(diào)則通過Zookeeper系統(tǒng)或者集群。

3、Zookeeper

Zookeeper是完成Supervisor和Nimbus之間協(xié)調(diào)的服務。而應用程序實現(xiàn)實時的邏輯則被封裝進Storm中的“topology”。topology則是一組由Spouts(數(shù)據(jù)源)和Bolts(數(shù)據(jù)操作)通過Stream Groupings進行連接的圖。下面對出現(xiàn)的術語進行更深刻的解析。

4、Worker:

運行具體處理組件邏輯的進程。

5、Task:

worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之后,task不再與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱為executor。

6、Topology(拓撲):

storm中運行的一個實時應用程序,因為各個組件間的消息流動形成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖,
通過stream groupings將圖中的spouts和bolts連接起來,如下圖:

一個topology會一直運行直到你手動kill掉,Storm自動重新分配執(zhí)行失敗的任務, 并且Storm可以保證你不會有數(shù)據(jù)丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。

運行一個topology很簡單。首先,把你所有的代碼以及所依賴的jar打進一個jar包。然后運行類似下面的這個命令:

storm jar all-my-code.jar
backtype.storm.MyTopology arg1 arg2

這個命令會運行主類:backtype.strom.MyTopology, 參數(shù)是arg1,arg2。這個類的main函數(shù)定義這個topology并且把它提交給Nimbus。storm
jar負責連接到Nimbus并且上傳jar包。

Topology的定義是一個Thrift結構,并且Nimbus就是一個Thrift服務, 你可以提交由任何語言創(chuàng)建的topology。上面的方面是用JVM-based語言提交的最簡單的方法。

7、Spout:

消息源spout是Storm里面一個topology里面的消息生產(chǎn)者。簡而言之,Spout從來源處讀取數(shù)據(jù)并放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數(shù)據(jù)項組成的列表)進行重發(fā);而不可靠的Spout不會考慮接收成功與否只發(fā)射一次。

消息源可以發(fā)射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,然后使用SpoutOutputCollector來發(fā)射指定的stream。

而Spout中最主要的方法就是nextTuple(),該方法會發(fā)射一個新的tuple到topology,如果沒有新tuple發(fā)射則會簡單的返回。

要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調(diào)用所有消息源spout的方法。

另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調(diào)用ack,否則調(diào)用fail。storm只對可靠的spout調(diào)用ack和fail。

8、Bolt:

Topology中所有的處理都由Bolt完成。即所有的消息處理邏輯被封裝在bolts里面。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數(shù)據(jù)庫、等等。

Bolt從Spout中接收數(shù)據(jù)并進行處理,如果遇到復雜流的處理也可能將tuple發(fā)送給另一個Bolt進行處理。即需要經(jīng)過很多blots。比如算出一堆圖片里面被轉發(fā)最多的圖片就至少需要兩步:第一步算出每個圖片的轉發(fā)數(shù)量。第二步找出轉發(fā)最多的前10個圖片。(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。

Bolts可以發(fā)射多條消息流,
使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發(fā)射的stream。

而Bolt中最重要的方法是execute(),以新的tuple作為參數(shù)接收。不管是Spout還是Bolt,如果將tuple發(fā)射成多個流,這些流都可以通過declareStream()來聲明。

bolts使用OutputCollector來發(fā)射tuple,bolts必須要為它處理的每一個tuple調(diào)用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發(fā)射者spouts。
一般的流程是: bolts處理一個輸入tuple, 發(fā)射0個或者多個tuple, 然后調(diào)用ack通知storm自己已經(jīng)處理過這個tuple了。storm提供了一個IBasicBolt會自動調(diào)用ack。

9、Tuple:

一次消息傳遞的基本單元。本來應該是一個key-value的map,但是由于各個組件間傳遞的tuple的字段名稱已經(jīng)事先定義好,所以tuple中只要按序填入各個value就行了,所以就是一個value list.

10、Stream:

源源不斷傳遞的tuple就組成了stream。消息流stream是storm里的關鍵抽象。一個消息流是一個沒有邊界的tuple序列,
而這些tuple序列會以一種分布式的方式并行地創(chuàng)建和處理。通過對stream中tuple序列中每個字段命名來定義stream。在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定義類型(只要實現(xiàn)相應的序列化器)。

每個消息流在定義的時候會被分配給一個id,因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為‘default’默認的id 。

Storm提供的最基本的處理stream的原語是spout和bolt。你可以實現(xiàn)spout和bolt提供的接口來處理你的業(yè)務邏輯。

11、Stream Groupings:

Stream Grouping定義了一個流在Bolt任務間該如何被切分。這里有Storm提供的6個Stream Grouping類型:

1). 隨機分組(Shuffle grouping):隨機分發(fā)tuple到Bolt的任務,保證每個任務獲得相等數(shù)量的tuple。

2). 字段分組(Fields grouping):根據(jù)指定字段分割數(shù)據(jù)流,并分組。例如,根據(jù)“user-id”字段,相同“user-id”的元組總是分發(fā)到同一個任務,不同“user-id”的元組可能分發(fā)到不同的任務。

3). 全部分組(All grouping):tuple被復制到bolt的所有任務。這種類型需要謹慎使用。

4). 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。

5). 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效于隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執(zhí)行(如果可能)。

6). 直接分組(Direct grouping):這是一個特別的分組類型。元組生產(chǎn)者決定tuple由哪個元組處理者任務接收。

當然還可以實現(xiàn)CustomStreamGroupimg接口來定制自己需要的分組。

storm 和hadoop的對比來了解storm中的基本概念。

Hadoop Storm
系統(tǒng)角色 JobTracker Nimbus
TaskTracker Supervisor
Child Worker
應用名稱 Job Topology
組件接口 Mapper/Reducer Spout/Bolt


3. Storm應用場景

Storm 與其他大數(shù)據(jù)解決方案的不同之處在于它的處理方式。Hadoop 在本質上是一個批處理系統(tǒng)。數(shù)據(jù)被引入 Hadoop 文件系統(tǒng) (HDFS)
并分發(fā)到各個節(jié)點進行處理。當處理完成時,結果數(shù)據(jù)返回到 HDFS 供始發(fā)者使用。Storm 支持創(chuàng)建拓撲結構來轉換沒有終點的數(shù)據(jù)流。不同于 Hadoop 作業(yè),這些轉換從不停止,它們會持續(xù)處理到達的數(shù)據(jù)。

Twitter列舉了Storm的三大類應用:

1.信息流處理{Streamprocessing}
Storm可用來實時處理新數(shù)據(jù)和更新數(shù)據(jù)庫,兼具容錯性和可擴展性。即Storm可以用來處理源源不斷流進來的消息,處理之后將結果寫入到某個存儲中去。

2.連續(xù)計算{Continuouscomputation}
Storm可進行連續(xù)查詢并把結果即時反饋給客戶端。比如把Twitter上的熱門話題發(fā)送到瀏覽器中。

3.分布式遠程程序調(diào)用{DistributedRPC}
Storm可用來并行處理密集查詢。Storm的拓撲結構是一個等待調(diào)用信息的分布函數(shù),當它收到一條調(diào)用信息后,會對查詢進行計算,并返回查詢結果。舉個例子DistributedRPC可以做并行搜索或者處理大集合的數(shù)據(jù)。

通過配置drpc服務器,將storm的topology發(fā)布為drpc服務。客戶端程序可以調(diào)用drpc服務將數(shù)據(jù)發(fā)送到storm集群中,并接收處理結果的反饋。這種方式需要drpc服務器進行轉發(fā),其中drpc服務器底層通過thrift實現(xiàn)。適合的業(yè)務場景主要是實時計算。并且擴展性良好,可以增加每個節(jié)點的工作worker數(shù)量來動態(tài)擴展。

4. 項目實施,構建Topology

當下情況我們需要給Spout和Bolt設計一種能夠處理大量數(shù)據(jù)(日志文件)的topology,當一個特定數(shù)據(jù)值超過預設的臨界值時促發(fā)警報。使用Storm的topology,逐行讀入日志文件并且監(jiān)視輸入數(shù)據(jù)。在Storm組件方面,Spout負責讀入輸入數(shù)據(jù)。它不僅從現(xiàn)有的文件中讀入數(shù)據(jù),同時還監(jiān)視著新文件。文件一旦被修改Spout會讀入新的版本并且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發(fā)射給Bolt進行臨界分析,這樣就可以發(fā)現(xiàn)所有可能超臨界的記錄。

下一節(jié)將對用例進行詳細介紹。

臨界分析

這一節(jié),將主要聚焦于臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。

瞬間臨界值監(jiān)測:一個字段的值在那個瞬間超過了預設的臨界值,如果條件符合的話則觸發(fā)一個trigger。舉個例子當車輛超越80公里每小時,則觸發(fā)trigger。時間序列臨界監(jiān)測:字段的值在一個給定的時間段內(nèi)超過了預設的臨界值,如果條件符合則觸發(fā)一個觸發(fā)器。比如:在5分鐘類,時速超過80KM兩次及以上的車輛。

Listing One顯示了我們將使用的一個類型日志,其中包含的車輛數(shù)據(jù)信息有:車牌號、車輛行駛的速度以及數(shù)據(jù)獲取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East city
EF 123 90 South city
GH 123 50 West city

這里將創(chuàng)建一個對應的XML文件,這將包含引入數(shù)據(jù)的模式。這個XML將用于日志文件的解析。XML的設計模式和對應的說明請見下表。

XML文件和日志文件都存放在Spout可以隨時監(jiān)測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。

Figure 1:Storm中建立的topology,用以實現(xiàn)數(shù)據(jù)實時處理

如圖所示:FilelistenerSpout接收輸入日志并進行逐行的讀入,接著將數(shù)據(jù)發(fā)射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數(shù)據(jù)將發(fā)送給DBWriterBolt,然后由DBWriterBolt存入給數(shù)據(jù)庫。下面將對這個過程的實現(xiàn)進行詳細的解析。

Spout的實現(xiàn)

Spout以日志文件和XML描述文件作為接收對象。XML文件包含了與日志一致的設計模式。不妨設想一下一個示例日志文件,包含了車輛的車牌號、行駛速度、以及數(shù)據(jù)的捕獲位置。(看下圖)

Figure2:數(shù)據(jù)從日志文件到Spout的流程圖

Listing Two顯示了tuple對應的XML,其中指定了字段、將日志文件切割成字段的定界符以及字段的類型。XML文件以及數(shù)據(jù)都被保存到Spout指定的路徑。

Listing Two:用以描述日志文件的XML文件。

<TUPLEINFO> 
<FIELDLIST> 
<FIELD> 
<COLUMNNAME>vehicle_number</COLUMNNAME> 
<COLUMNTYPE>string</COLUMNTYPE> 
</FIELD> 
 
<FIELD>
<COLUMNNAME>speed</COLUMNNAME> 
<COLUMNTYPE>int</COLUMNTYPE> 
</FIELD> 
 
<FIELD> 
<COLUMNNAME>location</COLUMNNAME> 
<COLUMNTYPE>string</COLUMNTYPE> 
</FIELD> 
</FIELDLIST> 
<DELIMITER>,</DELIMITER> 
</TUPLEINFO>   

通過構造函數(shù)及它的參數(shù)Directory、PathSpout和TupleInfo對象創(chuàng)建Spout對象。TupleInfo儲存了日志文件的字段、定界符、字段的類型這些很必要的信息。這個對象通過XSTream序列化XML時建立。

Spout的實現(xiàn)步驟:

對文件的改變進行分開的監(jiān)聽,并監(jiān)視目錄下有無新日志文件添加。在數(shù)據(jù)得到了字段的說明后,將其轉換成tuple。聲明Spout和Bolt之間的分組,并決定tuple發(fā)送給Bolt的途徑。

Spout的具體編碼在Listing Three中顯示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。

public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )   
{   
           _collector = collector;   
         try   
         {   
         fileReader  =  new BufferedReader(new FileReader(new File(file)));  
         }  
         catch (FileNotFoundException e)  
         {  
         System.exit(1);   
         }  
}                                                          
 
public void nextTuple()  
{  
         protected void ListenFile(File file)  
         {  
         Utils.sleep(2000);  
         RandomAccessFile access = null;  
         String line = null;   
            try   
            {  
                while ((line = access.readLine()) != null)  
                {  
                    if (line !=null)  
                    {   
                         String[] fields=null;  
                          if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());   
                          else   
                          fields = line.split  (tupleInfo.getDelimiter());   
                          if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));  
                    }  
               }  
            }  
            catch (IOException ex){ }  
            }  
}  
 
public void declareOutputFields(OutputFieldsDeclarer declarer)  
{  
      String[] fieldsArr = new String [tupleInfo.getFieldList().size()];  
      for(int i=0; i<tupleInfo.getFieldList().size(); i++)  
      {  
              fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();  
      }  
declarer.declare(new Fields(fieldsArr));  
}      

declareOutputFileds()決定了tuple發(fā)射的格式,這樣的話Bolt就可以用類似的方法將tuple譯碼。Spout持續(xù)對日志文件的數(shù)據(jù)的變更進行監(jiān)聽,一旦有添加Spout就會進行讀入并且發(fā)送給Bolt進行處理。

Bolt的實現(xiàn)

Spout的輸出結果將給予Bolt進行更深一步的處理。經(jīng)過對用例的思考,我們的topology中需要如Figure 3中的兩個Bolt。

Figure 3:Spout到Bolt的數(shù)據(jù)流程。

ThresholdCalculatorBolt

Spout將tuple發(fā)出,由ThresholdCalculatorBolt接收并進行臨界值處理。在這里,它將接收好幾項輸入進行檢查;分別是:

臨界值檢查

臨界值欄數(shù)檢查(拆分成字段的數(shù)目)臨界值數(shù)據(jù)類型(拆分后字段的類型)臨界值出現(xiàn)的頻數(shù)臨界值時間段檢查

Listing Four中的類,定義用來保存這些值。

Listing Four:ThresholdInfo類

public class ThresholdInfo implementsSerializable  
 
{    
        private String action;   
        private String rule;   
        private Object thresholdValue;  
        private int thresholdColNumber;   
        private Integer timeWindow;   
        private int frequencyOfOccurence;   
}   

基于字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執(zhí)行。代碼大部分的功能是解析和接收值的檢測。

Listing Five:臨界值檢測代碼段

public void execute(Tuple tuple, BasicOutputCollector collector)   
{  
    if(tuple!=null)   
    {  
        List<Object> inputTupleList = (List<Object>) tuple.getValues();  
        int thresholdColNum = thresholdInfo.getThresholdColNumber();   
        Object thresholdValue = thresholdInfo.getThresholdValue();   
        String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   
        Integer timeWindow = thresholdInfo.getTimeWindow();  
         int frequency = thresholdInfo.getFrequencyOfOccurence();  
         if(thresholdDataType.equalsIgnoreCase("string"))  
         {  
             String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  
             String frequencyChkOp = thresholdInfo.getAction();  
             if(timeWindow!=null)  
             {  
                 long curTime = System.currentTimeMillis();  
                 long diffInMinutes = (curTime-startTime)/(1000);  
                 if(diffInMinutes>=timeWindow)  
                 {  
                     if(frequencyChkOp.equals("=="))  
                     {  
                          if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                          {  
                              count.incrementAndGet();  
                              if(count.get() > frequency)  
                                  splitAndEmit(inputTupleList,collector);  
                          }  
                     }  
                     else if(frequencyChkOp.equals("!="))  
                     {  
                         if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                         {  
                              count.incrementAndGet();  
                              if(count.get() > frequency)  
                                  splitAndEmit(inputTupleList,collector);  
                          }  
                      }  
                      else                         System.out.println("Operator not supported");   
                  }  
              }  
              else 
              {  
                  if(frequencyChkOp.equals("=="))  
                  {  
                      if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                      {  
                          count.incrementAndGet();  
                          if(count.get() > frequency)  
                              splitAndEmit(inputTupleList,collector);  
                          }  
                  }  
                  else if(frequencyChkOp.equals("!="))  
                  {  
                       if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                       {  
                           count.incrementAndGet();  
                           if(count.get() > frequency)  
                               splitAndEmit(inputTupleList,collector);  
                          }  
                   }  
               }  
            }  
            else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))  
            {  
                String frequencyChkOp = thresholdInfo.getAction();  
                if(timeWindow!=null)  
                {  
                     long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());  
                     long curTime = System.currentTimeMillis();  
                     long diffInMinutes = (curTime-startTime)/(1000);  
                     System.out.println("Difference in minutes="+diffInMinutes);  
                     if(diffInMinutes>=timeWindow)  
                     {  
                          if(frequencyChkOp.equals("<"))  
                          {  
                              if(valueToCheck < Double.parseDouble(thresholdValue.toString()))  
                              {  
                                   count.incrementAndGet();  
                                   if(count.get() > frequency)  
                                       splitAndEmit(inputTupleList,collector);  
                              }  
                          }  
                          else if(frequencyChkOp.equals(">"))  
                          {  
                               if(valueToCheck > Double.parseDouble(thresholdValue.toString()))  
                                {  
                                   count.incrementAndGet();  
                                   if(count.get() > frequency)  
                                       splitAndEmit(inputTupleList,collector);  
                               }  
                           }  
                           else if(frequencyChkOp.equals("=="))  
                           {  
                              if(valueToCheck == Double.parseDouble(thresholdValue.toString()))  
                              {  
                                  count.incrementAndGet();  
                                  if(count.get() > frequency)  
                                      splitAndEmit(inputTupleList,collector);  
                               }  
                           }  
                           else if(frequencyChkOp.equals("!="))  
                           {  
    . . .  
                            }  
                       }  
             }  
      else 
          splitAndEmit(null,collector);  
      }  
      else 
     {  
           System.err.println("Emitting null in bolt");  
           splitAndEmit(null,collector);  
    }  
} 

經(jīng)由Bolt發(fā)送的的tuple將會傳遞到下一個對應的Bolt,在我們的用例中是DBWriterBolt。

DBWriterBolt

經(jīng)過處理的tuple必須被持久化以便于觸發(fā)tigger或者更深層次的使用。DBWiterBolt做了這個持久化的工作并把tuple存入了數(shù)據(jù)庫。表的建立由prepare()函數(shù)完成,這也將是topology調(diào)用的第一個方法。方法的編碼如Listing Six所示。

Listing Six:建表編碼。

public void prepare( Map StormConf, TopologyContext context )   
{         
    try   
    {  
        Class.forName(dbClass);  
    }   
    catch (ClassNotFoundException e)   
    {  
        System.out.println("Driver not found");  
        e.printStackTrace();  
    }  
   
    try   
    {  
       connection driverManager.getConnection(   
           "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  
       connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  
   
       StringBuilder createQuery = new StringBuilder(  
           "CREATE TABLE IF NOT EXISTS "+tableName+"(");  
       for(Field fields : tupleInfo.getFieldList())  
       {  
           if(fields.getColumnType().equalsIgnoreCase("String"))  
               createQuery.append(fields.getColumnName()+" VARCHAR(500),");  
           else 
               createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");  
       }  
       createQuery.append("thresholdTimeStamp timestamp)");  
       connection.prepareStatement(createQuery.toString()).execute();  
   
       // Insert Query  
       StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");  
       String tempCreateQuery = new String();  
       for(Field fields : tupleInfo.getFieldList())  
       {  
            insertQuery.append(fields.getColumnName()+",");  
       }  
       insertQuery.append("thresholdTimeStamp").append(") values (");  
       for(Field fields : tupleInfo.getFieldList())  
       {  
           insertQuery.append("?,");  
       }  
   
       insertQuery.append("?)");  
       prepStatement = connection.prepareStatement(insertQuery.toString());  
    }  
    catch (SQLException e)   
    {         
        e.printStackTrace();  
    }         
}  

數(shù)據(jù)分批次的插入數(shù)據(jù)庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現(xiàn)可能存在不同類型輸入的解析。

Listing Seven:數(shù)據(jù)插入的代碼部分。

public void execute(Tuple tuple, BasicOutputCollector collector)   
{  
    batchExecuted=false;  
    if(tuple!=null)  
    {  
       List<Object> inputTupleList = (List<Object>) tuple.getValues();  
       int dbIndex=0;  
       for(int i=0;i<tupleInfo.getFieldList().size();i++)  
       {  
           Field field = tupleInfo.getFieldList().get(i);  
           try {  
               dbIndex = i+1;  
               if(field.getColumnType().equalsIgnoreCase("String"))               
                   prepStatement.setString(dbIndex, inputTupleList.get(i).toString());  
               else if(field.getColumnType().equalsIgnoreCase("int"))  
                   prepStatement.setInt(dbIndex,  
                       Integer.parseInt(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("long"))  
                   prepStatement.setLong(dbIndex,   
                       Long.parseLong(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("float"))  
                   prepStatement.setFloat(dbIndex,   
                       Float.parseFloat(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("double"))  
                   prepStatement.setDouble(dbIndex,   
                       Double.parseDouble(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("short"))  
                   prepStatement.setShort(dbIndex,   
                       Short.parseShort(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("boolean"))  
                   prepStatement.setBoolean(dbIndex,   
                       Boolean.parseBoolean(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("byte"))  
                   prepStatement.setByte(dbIndex,   
                       Byte.parseByte(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("Date"))  
               {  
                  Date dateToAdd=null;  
                  if (!(inputTupleList.get(i) instanceof Date))    
                  {    
                       DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");  
                       try   
                       {  
                           dateToAdd = df.parse(inputTupleList.get(i).toString());  
                       }  
                       catch (ParseException e)   
                       {  
                           System.err.println("Data type not valid");  
                       }  
                   }    
                   else 
                   {  
            dateToAdd = (Date)inputTupleList.get(i);  
            java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());  
            prepStatement.setDate(dbIndex, sqlDate);  
            }     
            }   
        catch (SQLException e)   
        {  
             e.printStackTrace();  
        }  
    }  
    Date now = new Date();            
    try 
    {  
        prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));  
        prepStatement.addBatch();  
        counter.incrementAndGet();  
        if (counter.get()== batchSize)   
        executeBatch();  
    }   
    catch (SQLException e1)   
    {  
        e1.printStackTrace();  
    }             
   }  
   else 
   {  
        long curTime = System.currentTimeMillis();  
       long diffInSeconds = (curTime-startTime)/(60*1000);  
       if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)  
       {  
            try {  
                executeBatch();  
                startTime = System.currentTimeMillis();  
            }  
            catch (SQLException e) {  
                 e.printStackTrace();  
            }  
       }  
   }  
}  
   
public void executeBatch() throws SQLException  
{  
    batchExecuted=true;  
    prepStatement.executeBatch();  
    counter = new AtomicInteger(0);  
} 

一旦Spout和Bolt準備就緒(等待被執(zhí)行),topology生成器將會建立topology并準備執(zhí)行。下面就來看一下執(zhí)行步驟。

在本地集群上運行和測試topology

通過TopologyBuilder建立topology。使用Storm Submitter,將topology遞交給集群。以topology的名字、配置和topology的對象作為參數(shù)。提交topology。

Listing Eight:建立和執(zhí)行topology。

public class StormMain  
{  
     public static void main(String[] args) throws AlreadyAliveException,   
                                                   InvalidTopologyException,   
                                                   InterruptedException   
     {  
          ParallelFileSpout parallelFileSpout = new ParallelFileSpout();  
          ThresholdBolt thresholdBolt = new ThresholdBolt();  
          DBWriterBolt dbWriterBolt = new DBWriterBolt();  
          TopologyBuilder builder = new TopologyBuilder();  
          builder.setSpout("spout", parallelFileSpout, 1);  
          builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");  
          builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");  
          if(this.argsMain!=null && this.argsMain.length > 0)   
          {  
              conf.setNumWorkers(1);  
              StormSubmitter.submitTopology(   
                   this.argsMain[0], conf, builder.createTopology());  
          }  
          else 
          {      
              Config conf = new Config();  
              conf.setDebug(true);  
              conf.setMaxTaskParallelism(3);  
              LocalCluster cluster = new LocalCluster();  
              cluster.submitTopology(  
              "Threshold_Test", conf, builder.createTopology());  
          }  
     }  
} 

topology被建立后將被提交到本地集群。一旦topology被提交,除非被取締或者集群關閉,它將一直保持運行不需要做任何的修改。這也是Storm的另一大特色之一。

這個簡單的例子體現(xiàn)了當你掌握了topology、spout和bolt的概念,將可以輕松的使用Storm進行實時處理。如果你既想處理大數(shù)據(jù)又不想遍歷Hadoop的話,不難發(fā)現(xiàn)使用Storm將是個很好的選擇。

5. storm常見問題解答

一、我有一個數(shù)據(jù)文件,或者我有一個系統(tǒng)里面有數(shù)據(jù),怎么導入storm做計算?



你需要實現(xiàn)一個Spout,Spout負責將數(shù)據(jù)emit到storm系統(tǒng)里,交給bolts計算。怎么實現(xiàn)spout可以參考官方的kestrel spout實現(xiàn):

https://github.com/nathanmarz/storm-kestrel



如果你的數(shù)據(jù)源不支持事務性消費,那么就無法得到storm提供的可靠處理的保證,也沒必要實現(xiàn)ISpout接口中的ack和fail方法。



二、Storm為了保證tuple的可靠處理,需要保存tuple信息,這會不會導致內(nèi)存OOM?



Storm為了保證tuple的可靠處理,acker會保存該節(jié)點創(chuàng)建的tuple id的xor值,這稱為ack value,那么每ack一次,就將tuple id和ack value做異或(xor)。當所有產(chǎn)生的tuple都被ack的時候, ack value一定為0。這是個很簡單的策略,對于每一個tuple也只要占用約20個字節(jié)的內(nèi)存。對于100萬tuple,也才20M左右。關于可靠處理看這個:

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing



三、Storm計算后的結果保存在哪里?可以保存在外部存儲嗎?



Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數(shù)據(jù)不大,你可以簡單地保存在內(nèi)存里,也可以每次都更新數(shù)據(jù)庫,也可以采用NoSQL存儲。storm并沒有像s4那樣提供一個Persist API,根據(jù)時間或者容量來做存儲輸出。這部分事情完全交給用戶。



數(shù)據(jù)存儲之后的展現(xiàn),也是你需要自己處理的,storm UI只提供對topology的監(jiān)控和統(tǒng)計。



四、Storm怎么處理重復的tuple?



因為Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail并重新發(fā)送該tuple,那么就會有tuple重復計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:

(1)不處理,這也算是種策略。因為實時計算通常并不要求很高的精確度,后續(xù)的批處理計算會更正實時計算的誤差。

(2)使用第三方集中存儲來過濾,比如利用mysql,memcached或者redis根據(jù)邏輯主鍵來去重。

(3)使用bloom filter做過濾,簡單高效。



五、Storm的動態(tài)增刪節(jié)點



我在storm和s4里比較里談到的動態(tài)增刪節(jié)點,是指storm可以動態(tài)地添加和減少supervisor節(jié)點。對于減少節(jié)點來說,被移除的supervisor上的worker會被nimbus重新負載均衡到其他supervisor節(jié)點上。在storm 0.6.1以前的版本,增加supervisor節(jié)點不會影響現(xiàn)有的topology,也就是現(xiàn)有的topology不會重新負載均衡到新的節(jié)點上,在擴展集群的時候很不方便,需要重新提交topology。因此我在storm的郵件列表里提了這個問題,storm的開發(fā)者nathanmarz創(chuàng)建了一個issue
54并在0.6.1提供了rebalance命令來讓正在運行的topology重新負載均衡,具體見:

https://github.com/nathanmarz/storm/issues/54

和0.6.1的變更:

http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246



storm并不提供機制來動態(tài)調(diào)整worker和task數(shù)目。



六、Storm UI里spout統(tǒng)計的complete latency的具體含義是什么?為什么emit的數(shù)目會是acked的兩倍?

這個事實上是storm郵件列表里的一個問題。Storm作者marz的解答:

Thecompletelatencyisthetimefromthespoutemittingatupletothat
tuplebeingackedonthespout
.Soittracksthetimeforthewholetuple
treetobeprocessed.

IfyoudiveintothespoutcomponentintheUI,you'llseethatalotof
theemitted/transferredisonthe__ack*stream.Thisisthespout
communicatingwiththeackerswhichtakecareoftrackingthetupletrees.



簡單地說,complete latency表示了tuple從emit到被acked經(jīng)過的時間,可以認為是tuple以及該tuple的后續(xù)子孫(形成一棵樹)整個處理時間。其次spout的emit和transfered還統(tǒng)計了spout和acker之間內(nèi)部的通信信息,比如對于可靠處理的spout來說,會在emit的時候同時發(fā)送一個_ack_init給acker,記錄tuple
id到task id的映射,以便ack的時候能找到正確的acker task。

6. 其他開源的大數(shù)據(jù)解決方案

自 Google 在 2004 年推出 MapReduce 范式以來,已誕生了多個使用原始 MapReduce 范式(或擁有該范式的質量)的解決方案。Google 對 MapReduce 的最初應用是建立萬維網(wǎng)的索引。盡管此應用程序仍然很流行,但這個簡單模型解決的問題也正在增多。

表 1提供了一個可用開源大數(shù)據(jù)解決方案的列表,包括傳統(tǒng)的批處理和流式處理應用程序。在將 Storm 引入開源之前將近一年的時間里,Yahoo! 的 S4 分布式流計算平臺已向 Apache 開源。S4 于 2010 年 10 月發(fā)布,它提供了一個高性能計算 (HPC) 平臺,向應用程序開發(fā)人員隱藏了并行處理的復雜性。S4
實現(xiàn)了一個可擴展的、分散化的集群架構,并納入了部分容錯功能。



表 1. 開源大數(shù)據(jù)解決方案

解決方案 開發(fā)商 類型 描述
Storm Twitter 流式處理 Twitter 的新流式大數(shù)據(jù)分析解決方案
S4 Yahoo! 流式處理 來自 Yahoo! 的分布式流計算平臺
Hadoop Apache 批處理 MapReduce 范式的第一個開源實現(xiàn)
Spark UC Berkeley AMPLab 批處理 支持內(nèi)存中數(shù)據(jù)集和恢復能力的最新分析平臺
Disco Nokia 批處理 Nokia 的分布式 MapReduce 框架
HPCC LexisNexis 批處理 HPC 大數(shù)據(jù)集群

csdn(編譯/仲浩 王旭東/審校):http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis

原文鏈接:Easy, Real-Time Big Data Analysis Using Storm

Meet so Meet.
C plusplus
I-PLUS....

總結

以上是生活随笔為你收集整理的使用Storm实现实时大数据分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。