storm 机器上日志查询_Storm原理与实践大数据技术栈14
回顧:大數(shù)據(jù)平臺(tái)技術(shù)棧?(ps:可點(diǎn)擊查看),今天就來說說其中的Storm!
來自:有米加瓦
一、Storm簡(jiǎn)介
1. 引例
在介紹Storm之前,我們先看一個(gè)日志統(tǒng)計(jì)的例子:假如我們想要根據(jù)用戶的訪問日志統(tǒng)計(jì)使用斗魚客戶端的用戶的地域分布情況,一般情況下我們會(huì)分這幾步:
取出訪問日志中客戶端的IP
把IP轉(zhuǎn)換成對(duì)應(yīng)地域
按照地域進(jìn)行統(tǒng)計(jì)
Hadoop貌似就可以輕松搞定:
map做ip提取,轉(zhuǎn)換成地域
reduce以地域?yàn)閗ey聚合,計(jì)數(shù)統(tǒng)計(jì)
從HDFS取出結(jié)果
如果有時(shí)效性要求呢?
小時(shí)級(jí):還行,每小時(shí)跑一個(gè)MapReduce Job
10分鐘:還湊合能跑
5分鐘 :夠嗆了,等槽位可能要幾分鐘呢
1分鐘 :算了吧,啟動(dòng)Job就要幾十秒呢
秒級(jí) :… 要滿足秒級(jí)別的數(shù)據(jù)統(tǒng)計(jì)需求,需要
進(jìn)程常駐運(yùn)行;
數(shù)據(jù)在內(nèi)存中
Storm正好適合這種需求。
2. 特性
Storm是一個(gè)分布式實(shí)時(shí)流式計(jì)算平臺(tái)。主要特性如下:
簡(jiǎn)單的編程模型:類似于MapReduce降低了并行批處理復(fù)雜性,Storm降低了實(shí)時(shí)處理的復(fù)雜性,只需實(shí)現(xiàn)幾個(gè)接口即可(Spout實(shí)現(xiàn)ISpout接口,Bolt實(shí)現(xiàn)IBolt接口)。
支持多種語言:你可以在Storm之上使用各種編程語言。默認(rèn)支持Clojure、Java、Ruby和Python。要增加對(duì)其他語言的支持,只需實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Storm通信協(xié)議即可。
容錯(cuò)性:nimbus、supervisor都是無狀態(tài)的, 可以用kill -9來殺死Nimbus和Supervisor進(jìn)程, 然后再重啟它們,任務(wù)照常進(jìn)行; 當(dāng)worker失敗后, supervisor會(huì)嘗試在本機(jī)重啟它。
分布式:計(jì)算是在多個(gè)線程、進(jìn)程和服務(wù)器之間并行進(jìn)行的。
持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失。
可靠的消息處理:Storm保證每個(gè)消息至少能得到一次完整處理。任務(wù)失敗時(shí),它會(huì)負(fù)責(zé)從消息源重試消息(ack機(jī)制)。
快速、實(shí)時(shí):Storm保證每個(gè)消息能能得到快速的處理。
3. 與常用其他大數(shù)據(jù)計(jì)算平臺(tái)對(duì)比
Storm vs. MapReduce Storm的一個(gè)拓?fù)涑qv內(nèi)存運(yùn)行,MR作業(yè)運(yùn)行完了進(jìn)行就被kill了;storm是流式處理,MR是批處理;Storm數(shù)據(jù)在內(nèi)存中不寫磁盤,而MR會(huì)與磁盤進(jìn)行交互;Storm的DAG(有向無環(huán)圖)模型可以組合多個(gè)階段,而MR只可以有MAP和REDUCE兩個(gè)階段。
Storm vs. Spark Streaming Storm處理的是每次傳入的一條數(shù)據(jù),Spark Streaming實(shí)際處理的是微批量數(shù)據(jù)。
二、Storm的架構(gòu)和運(yùn)行時(shí)原理
1. 集群架構(gòu)
如上圖所示,一個(gè)典型的storm集群包含一個(gè)主控節(jié)點(diǎn)Nimbus,負(fù)責(zé)資源分配和任務(wù)調(diào)度;還有若干個(gè)子節(jié)點(diǎn)Supervisor,負(fù)責(zé)接受nimbus分配的任務(wù),啟動(dòng)和停止屬于自己管理的worker進(jìn)程;Nimbus和Supervisor之間的所有協(xié)調(diào)工作都是通過Zookeeper集群完成。
2. Storm的容錯(cuò)(Fault Tolerance)機(jī)制
Nimbus和Supervisor進(jìn)程被設(shè)計(jì)成快速失敗(fail fast)的(當(dāng)遇到異常的情況,進(jìn)程就會(huì)掛掉)并且是無狀態(tài)的(狀態(tài)都保存在Zookeeper或者在磁盤上)。
Nimbus與Supervisor本身也是無狀態(tài)的,狀態(tài)信息是由zookeeper存儲(chǔ)(實(shí)現(xiàn)了高可用,當(dāng)nimbus掛掉,可以找另外一個(gè)節(jié)點(diǎn)啟動(dòng)nimbus進(jìn)程,狀態(tài)信息從zookeeper獲得)。
在Nimbus進(jìn)程失敗后,可以快速重啟恢復(fù)正常工作,不需要很長(zhǎng)的時(shí)間來進(jìn)行初始化和狀態(tài)恢復(fù)。
當(dāng)Nimbus從zookeeper得知有supervisor節(jié)點(diǎn)掛掉,可以將該節(jié)點(diǎn)的任務(wù)重新分配給其他子節(jié)點(diǎn)。
Nimbus在“某種程度”上屬于單點(diǎn)故障的。在實(shí)際中,即使Nimbus進(jìn)程掛掉,也不會(huì)有災(zāi)難性的事情發(fā)生 。
當(dāng)Nimbus掛掉會(huì)怎樣?
已經(jīng)存在的拓?fù)淇梢岳^續(xù)正常運(yùn)行,但是不能提交新拓?fù)?#xff1b;
正在運(yùn)行的worker進(jìn)程仍然可以繼續(xù)工作。而且當(dāng)worker掛掉,Supervisor會(huì)一直重啟worker。
失敗的任務(wù)不會(huì)被分配到其他機(jī)器(是Nimbus的職責(zé))上了
當(dāng)一個(gè)Supervisor(slave節(jié)點(diǎn))掛掉會(huì)怎樣?
分配到這臺(tái)機(jī)器的所有任務(wù)(task)會(huì)超時(shí),Nimbus會(huì)把這些任務(wù)(task)重新分配給其他機(jī)器。當(dāng)一個(gè)worker掛掉會(huì)怎么樣?
當(dāng)一個(gè)worker掛掉,Supervisor會(huì)重啟它。如果啟動(dòng)一直失敗那么此時(shí)worker也就不能和Nimbus保持心跳了,Nimbus會(huì)重新分配worker到其他機(jī)器
3. Storm的編程模型
Strom在運(yùn)行中可分為spout與bolt兩個(gè)組件,其中,數(shù)據(jù)源從spout開始,數(shù)據(jù)以tuple的方式發(fā)送到bolt,多個(gè)bolt可以串連起來,一個(gè)bolt也可以接入多個(gè)spot/bolt。運(yùn)行時(shí)Topology如下圖:
編程模型的一些基本概念:
元組
storm使用tuple(元組)來作為它的數(shù)據(jù)模型。每個(gè)tuple由一堆域(field)組成,每個(gè)域有一個(gè)值,并且每個(gè)值可以是任何類型。
一個(gè)tuple可以看作一個(gè)沒有方法的java對(duì)象。總體來看,storm支持所有的基本類型、字符串以及字節(jié)數(shù)組作為tuple的值類型。
Spout
i. BaseRichSpout是實(shí)現(xiàn) IRichSpout接口的類,對(duì)上述必要的方法有默認(rèn)的實(shí)現(xiàn);
ii. 如果業(yè)務(wù)需要自定義ack()、fail() 等方法,選擇實(shí)現(xiàn) IRichSpout接口;
iii. 如果業(yè)務(wù)沒有自定義需求,選擇繼承BaseRichSpout類,可以不實(shí)現(xiàn)并不一定需要用戶實(shí)現(xiàn)的方法,簡(jiǎn)化開發(fā)。
i. open方法是初始化動(dòng)作。允許你在該spout初始化時(shí)做一些動(dòng)作,傳入了上下文,方便取上下文的一些數(shù)據(jù)。
ii. close方法在該spout關(guān)閉前執(zhí)行。
iii. activate和deactivate :一個(gè)spout可以被暫時(shí)激活和關(guān)閉,這兩個(gè)方法分別在對(duì)應(yīng)的時(shí)刻被調(diào)用。
iv. nextTuple 用來發(fā)射數(shù)據(jù)。Spout中最重要的方法。
v. ack(Object)傳入的Object其實(shí)是一個(gè)id,唯一表示一個(gè)tuple。該方法是這個(gè)id所對(duì)應(yīng)的tuple被成功處理后執(zhí)行。
vi. fail(Object)同ack,只不過是tuple處理失敗時(shí)執(zhí)行。
Spout是在一個(gè)topology中產(chǎn)生源數(shù)據(jù)流的組件。通常情況下spout會(huì)從外部數(shù)據(jù)源中讀取數(shù)據(jù),然后轉(zhuǎn)換為topology內(nèi)部的源數(shù)據(jù)。Spout是一個(gè)主動(dòng)的角色,其接口中有個(gè)nextTuple()函數(shù),storm框架會(huì)不停地調(diào)用此函數(shù),用戶只要在其中生成源數(shù)據(jù)即可。
實(shí)現(xiàn)Spout時(shí),需要實(shí)現(xiàn)最頂層抽象ISpout接口里面的幾個(gè)方法
實(shí)現(xiàn)Spout時(shí),還需要實(shí)現(xiàn)Icomponent接口,來聲明發(fā)射到下游bolt的字段名稱。
通常情況下,實(shí)現(xiàn)一個(gè)Spout,可以直接實(shí)現(xiàn)接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout。
Bolt
prepare方法是初始化動(dòng)作。允許你在該Bolt初始化時(shí)做一些動(dòng)作,傳入了上下文,方便取上下文的一些數(shù)據(jù)。
excute 用來處理數(shù)據(jù)。Bolt中最重要的方法。
cleanup在該Bolt關(guān)閉前執(zhí)行.
在拓?fù)渲兴械挠?jì)算邏輯都是在Bolt中實(shí)現(xiàn)的。一個(gè)Bolt可以處理任意數(shù)量的輸入流,產(chǎn)生任意數(shù)量新的輸出流。Bolt可以做函數(shù)處理,過濾,流的合并,聚合,存儲(chǔ)到數(shù)據(jù)庫(kù)等操作。在Bolt中最主要的函數(shù)是execute函數(shù),它使用一個(gè)新的元組當(dāng)作輸入。Bolt使用OutputCollector對(duì)象來吐出新的元組。
實(shí)現(xiàn)Bolt時(shí),需要實(shí)現(xiàn)IBolt接口,它聲明了Bolt的核心方法,負(fù)責(zé)Topology所有的計(jì)算邏輯:
實(shí)現(xiàn)Bolt時(shí),還需要實(shí)現(xiàn)Icomponent接口,來聲明發(fā)射到下游bolt的字段名稱
通常情況下,實(shí)現(xiàn)一個(gè)Bolt ,可以直接實(shí)現(xiàn)接口IRichBolt/IBasicBolt,也可以直接繼承BaseRichBolt/BaseBasicBolt。IBasicBolt/BaseBasicBolt在emit數(shù)據(jù)的時(shí)候,會(huì)自動(dòng)和輸入的tuple相關(guān)聯(lián),而在execute方法結(jié)束的時(shí)候那個(gè)輸入tuple會(huì)被自動(dòng)ack。使用IRichBolt/BaseRichBolt需要在emit數(shù)據(jù)的時(shí)候,顯示指定該數(shù)據(jù)的源tuple要加上第二個(gè)參數(shù)anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple,newTuple);并且需要在execute執(zhí)行成功后調(diào)用OutputCollector.ack(tuple), 當(dāng)失敗處理時(shí),執(zhí)行OutputCollector.fail(tuple)。
Stream Groupings(流分組)
定義了一個(gè)流在Bolt任務(wù)間該如何被切分。
- 隨機(jī)分組(Shuffle grouping):隨機(jī)分發(fā)tuple到Bolt的任務(wù),保證每個(gè)任務(wù)獲得相等數(shù)量的tuple。
- 字段分組(Fields grouping):根據(jù)指定字段分割數(shù)據(jù)流,并分組。例如,根據(jù)“user-id”字段,相同“user-id”的元組總是分發(fā)到同一個(gè)任務(wù),不同“user-id”的元組可能分發(fā)到不同的任務(wù)。
- 全部分組(All grouping):tuple被復(fù)制到bolt的所有任務(wù)。這種類型需要謹(jǐn)慎使用。
- 全局分組(Global grouping):全部流都分配到bolt的同一個(gè)任務(wù)。明確地說,是分配給ID最小的那個(gè)task。
- 無分組(None grouping):你不需要關(guān)心流是如何分組。目前,無分組等效于隨機(jī)分組。
- 直接分組(Direct grouping):這是一個(gè)特別的分組類型。元組生產(chǎn)者決定tuple由哪個(gè)元組處理者任務(wù)接收。
4. Storm消息處理的可靠性機(jī)制
可靠性機(jī)制(Ack機(jī)制)指的是Storm可以保證從Spout發(fā)出的每個(gè)消息都能被完全處理。一條消息被“完整處理”,指一個(gè)從Spout發(fā)出的元組所觸發(fā)的消息樹中所有的消息都被Storm處理了。如果在指定的超時(shí)時(shí)間里,這個(gè)Spout元組觸發(fā)的消息樹中有任何一個(gè)消息沒有處理完,就認(rèn)為這個(gè)Spout元組處理失敗了。這個(gè)超時(shí)時(shí)間是通過每個(gè)拓?fù)涞腃onfig.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置項(xiàng)來進(jìn)行配置的,默認(rèn)是30秒。
Storm 是這樣實(shí)現(xiàn)可靠性機(jī)制的:
Storm 的拓?fù)溆幸恍┨厥獾姆Q為“acker”的任務(wù),這些任務(wù)負(fù)責(zé)跟蹤每個(gè) Spout 發(fā)出的 tuple 的 DAG。當(dāng)一個(gè) acker 發(fā)現(xiàn)一個(gè) DAG 結(jié)束了,它就會(huì)給創(chuàng)建 spout tuple 的 Spout 任務(wù)發(fā)送一條消息,讓這個(gè)任務(wù)來應(yīng)答這個(gè)消息。你可以使用Config.TOPOLOGY_ACKERS 來配置拓?fù)涞?acker 數(shù)量。Storm 默認(rèn)會(huì)將 acker 的數(shù)量設(shè)置為1,不過如果你有大量消息的處理需求,你可能需要增加這個(gè)數(shù)量。
acker任務(wù)跟蹤一個(gè)元組樹,只占用固定大小的空間(大約20字節(jié))。若采用 Ack機(jī)制,每個(gè)處理的tuple, 必須被ack或者fail。因?yàn)閟torm追蹤每個(gè)tuple要占用內(nèi)存。所以如果不ack/fail每一個(gè)tuple, 那么最終你會(huì)看到OutOfMemory錯(cuò)誤。
編程實(shí)現(xiàn)(必要條件):acker數(shù)設(shè)置大于0;Spout發(fā)送元組時(shí),指定messageId;bolt處理完元組時(shí),一定要調(diào)用ack/fail方法。
5. Storm的并發(fā)機(jī)制
在一個(gè) Storm 集群中,Storm 主要通過以下三個(gè)部件來運(yùn)行拓?fù)?#xff1a;工作進(jìn)程(worker processes)、執(zhí)行器(executors)、任務(wù)(tasks)。三者的關(guān)系如下:
1個(gè)worker進(jìn)程執(zhí)行的是1個(gè)topology的子集(注:不會(huì)出現(xiàn)1個(gè)worker為多個(gè)topology服務(wù))。1個(gè)worker進(jìn)程會(huì)啟動(dòng)1個(gè)或多個(gè)executor線程來執(zhí)行1個(gè)topology的component(spout或bolt)。因此,1個(gè)運(yùn)行中的topology就是由集群中多臺(tái)物理機(jī)上的多個(gè)worker進(jìn)程組成的。
executor是1個(gè)被worker進(jìn)程啟動(dòng)的單獨(dú)線程。每個(gè)executor只會(huì)運(yùn)行1個(gè)topology的1個(gè)component(spout或bolt)的task(注:task可以是1個(gè)或多個(gè),storm默認(rèn)是1個(gè)component只生成1個(gè)task,executor線程里會(huì)在每次循環(huán)里順序調(diào)用所有task實(shí)例)。
task是最終運(yùn)行spout或bolt中代碼的單元(注:1個(gè)task即為spout或bolt的1個(gè)實(shí)例,executor線程在執(zhí)行期間會(huì)調(diào)用該task的nextTuple或execute方法)。topology啟動(dòng)后,1個(gè)component(spout或bolt)的task數(shù)目是固定不變的,但該component使用的executor線程數(shù)可以動(dòng)態(tài)調(diào)整(例如:1個(gè)executor線程可以執(zhí)行該component的1個(gè)或多個(gè)task實(shí)例)。這意味著,對(duì)于1個(gè)component存在這樣的條件:#threads<=#tasks(即:線程數(shù)小于等于task數(shù)目)。默認(rèn)情況下task的數(shù)目等于executor線程數(shù)目,即1個(gè)executor線程只運(yùn)行1個(gè)task。
三、構(gòu)建基于Storm的實(shí)時(shí)數(shù)據(jù)分析平臺(tái)實(shí)戰(zhàn)經(jīng)驗(yàn)
構(gòu)建基于Storm的實(shí)時(shí)數(shù)據(jù)分析平臺(tái),第一步當(dāng)然應(yīng)該是搭建storm集群。這個(gè)網(wǎng)上的教程還有輪子實(shí)在是太多,我就不貼出來了。請(qǐng)大家Google或者Baidu之,然后一步步搭建集群就完了。
1. Storm使用的一些實(shí)戰(zhàn)經(jīng)驗(yàn)
在架構(gòu)上,推薦 “消息中間件 + storm + 外部存儲(chǔ)” 3架馬車式架構(gòu)
Storm從消息中間件中取出數(shù)據(jù),計(jì)算出結(jié)果,存儲(chǔ)到外部存儲(chǔ)上
通常消息中間件推薦使用RocketMQ,Kafka
外部存儲(chǔ)推薦使用HBase,Redis
該架構(gòu),非常方便Storm程序進(jìn)行重啟(如因?yàn)樵黾訕I(yè)務(wù)升級(jí)程序)
職責(zé)清晰化,減少和外部系統(tǒng)的交互,Storm將計(jì)算結(jié)果存儲(chǔ)到外部存儲(chǔ)后,用戶的查詢就無需訪問Storm中服務(wù)進(jìn)程,查詢外部存儲(chǔ)即可。在實(shí)際計(jì)算中,常常發(fā)現(xiàn)需要做數(shù)據(jù)訂正,因此在設(shè)計(jì)整個(gè)項(xiàng)目時(shí),需要考慮重跑功能 。在最終生成的結(jié)果中,數(shù)據(jù)最好帶時(shí)間戳 。
結(jié)合Storm UI查看topology各個(gè)組件的負(fù)載,合理配置各組件的并發(fā)度。
Spout和Bolt的構(gòu)造函數(shù)只會(huì)在submit Topology時(shí)調(diào)一次,然后序列化起來,直接發(fā)給工作節(jié)點(diǎn),工作節(jié)點(diǎn)里實(shí)例化時(shí)不會(huì)被調(diào)用里,所以復(fù)雜的成員變量記得都定義成transient,在open(),prepare()里初始化及連接數(shù)據(jù)庫(kù)等資源。
按照性能來說, 使用ack機(jī)制普通接口 < 關(guān)掉ack機(jī)制的普通接口, 因此,需要根據(jù)業(yè)務(wù)對(duì)數(shù)據(jù)處理的速率需求決定是否采用ack機(jī)制。
當(dāng)使用fieldGrouping方式時(shí),有可能造成有的task任務(wù)重,有的task任務(wù)輕,因此讓整個(gè)數(shù)據(jù)流變慢, 盡量讓task之間壓力均勻。
KafkaSpout的并發(fā)度最好設(shè)置成Kafka的分區(qū)數(shù)。消費(fèi)Kafka時(shí), 一個(gè)分區(qū)只能一個(gè)線程消費(fèi),因此有可能簡(jiǎn)單的增加并發(fā)無法解決問題, 可以嘗試增加Kafka的分區(qū)數(shù)。
如果topology性能有問題, 可以嘗試關(guān)掉ack機(jī)制,查看性能如何,如果性能有大幅提升,則預(yù)示著瓶頸不在spout, 有可能是Acker的并發(fā)少了,或者業(yè)務(wù)處理邏輯慢了。
2. Storm編程實(shí)踐-WordCount
Spout
SpiltSentenceBolt
WordCountBolt
ReportBolt
Topology
Result
長(zhǎng)按訂閱更多精彩▼
如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝
總結(jié)
以上是生活随笔為你收集整理的storm 机器上日志查询_Storm原理与实践大数据技术栈14的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spss如何进行显著性差异分析
- 下一篇: 高内聚低耦合_高渗透环氧树脂灌浆料