日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

storm 机器上日志查询_Storm原理与实践大数据技术栈14

發(fā)布時(shí)間:2023/12/19 编程问答 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 storm 机器上日志查询_Storm原理与实践大数据技术栈14 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

回顧:大數(shù)據(jù)平臺(tái)技術(shù)棧?(ps:可點(diǎn)擊查看),今天就來說說其中的Storm!

來自:有米加瓦

一、Storm簡(jiǎn)介

1. 引例

在介紹Storm之前,我們先看一個(gè)日志統(tǒng)計(jì)的例子:假如我們想要根據(jù)用戶的訪問日志統(tǒng)計(jì)使用斗魚客戶端的用戶的地域分布情況,一般情況下我們會(huì)分這幾步:

  • 取出訪問日志中客戶端的IP

  • 把IP轉(zhuǎn)換成對(duì)應(yīng)地域

  • 按照地域進(jìn)行統(tǒng)計(jì)

Hadoop貌似就可以輕松搞定:

  • map做ip提取,轉(zhuǎn)換成地域

  • reduce以地域?yàn)閗ey聚合,計(jì)數(shù)統(tǒng)計(jì)

  • 從HDFS取出結(jié)果

如果有時(shí)效性要求呢?

  • 小時(shí)級(jí):還行,每小時(shí)跑一個(gè)MapReduce Job

  • 10分鐘:還湊合能跑

  • 5分鐘 :夠嗆了,等槽位可能要幾分鐘呢

  • 1分鐘 :算了吧,啟動(dòng)Job就要幾十秒呢

  • 秒級(jí) :… 要滿足秒級(jí)別的數(shù)據(jù)統(tǒng)計(jì)需求,需要

  • 進(jìn)程常駐運(yùn)行;

  • 數(shù)據(jù)在內(nèi)存中

Storm正好適合這種需求。

2. 特性

Storm是一個(gè)分布式實(shí)時(shí)流式計(jì)算平臺(tái)。主要特性如下:

  • 簡(jiǎn)單的編程模型:類似于MapReduce降低了并行批處理復(fù)雜性,Storm降低了實(shí)時(shí)處理的復(fù)雜性,只需實(shí)現(xiàn)幾個(gè)接口即可(Spout實(shí)現(xiàn)ISpout接口,Bolt實(shí)現(xiàn)IBolt接口)。

  • 支持多種語言:你可以在Storm之上使用各種編程語言。默認(rèn)支持Clojure、Java、Ruby和Python。要增加對(duì)其他語言的支持,只需實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Storm通信協(xié)議即可。

  • 容錯(cuò)性:nimbus、supervisor都是無狀態(tài)的, 可以用kill -9來殺死Nimbus和Supervisor進(jìn)程, 然后再重啟它們,任務(wù)照常進(jìn)行; 當(dāng)worker失敗后, supervisor會(huì)嘗試在本機(jī)重啟它。

  • 分布式:計(jì)算是在多個(gè)線程、進(jìn)程和服務(wù)器之間并行進(jìn)行的。

  • 持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失。

  • 可靠的消息處理:Storm保證每個(gè)消息至少能得到一次完整處理。任務(wù)失敗時(shí),它會(huì)負(fù)責(zé)從消息源重試消息(ack機(jī)制)。

  • 快速、實(shí)時(shí):Storm保證每個(gè)消息能能得到快速的處理。

3. 與常用其他大數(shù)據(jù)計(jì)算平臺(tái)對(duì)比

  • Storm vs. MapReduce Storm的一個(gè)拓?fù)涑qv內(nèi)存運(yùn)行,MR作業(yè)運(yùn)行完了進(jìn)行就被kill了;storm是流式處理,MR是批處理;Storm數(shù)據(jù)在內(nèi)存中不寫磁盤,而MR會(huì)與磁盤進(jìn)行交互;Storm的DAG(有向無環(huán)圖)模型可以組合多個(gè)階段,而MR只可以有MAP和REDUCE兩個(gè)階段。

Storm vs. Spark Streaming Storm處理的是每次傳入的一條數(shù)據(jù),Spark Streaming實(shí)際處理的是微批量數(shù)據(jù)。

二、Storm的架構(gòu)和運(yùn)行時(shí)原理

1. 集群架構(gòu)

如上圖所示,一個(gè)典型的storm集群包含一個(gè)主控節(jié)點(diǎn)Nimbus,負(fù)責(zé)資源分配和任務(wù)調(diào)度;還有若干個(gè)子節(jié)點(diǎn)Supervisor,負(fù)責(zé)接受nimbus分配的任務(wù),啟動(dòng)和停止屬于自己管理的worker進(jìn)程;Nimbus和Supervisor之間的所有協(xié)調(diào)工作都是通過Zookeeper集群完成。

2. Storm的容錯(cuò)(Fault Tolerance)機(jī)制

Nimbus和Supervisor進(jìn)程被設(shè)計(jì)成快速失敗(fail fast)的(當(dāng)遇到異常的情況,進(jìn)程就會(huì)掛掉)并且是無狀態(tài)的(狀態(tài)都保存在Zookeeper或者在磁盤上)。

  • Nimbus與Supervisor本身也是無狀態(tài)的,狀態(tài)信息是由zookeeper存儲(chǔ)(實(shí)現(xiàn)了高可用,當(dāng)nimbus掛掉,可以找另外一個(gè)節(jié)點(diǎn)啟動(dòng)nimbus進(jìn)程,狀態(tài)信息從zookeeper獲得)。

  • 在Nimbus進(jìn)程失敗后,可以快速重啟恢復(fù)正常工作,不需要很長(zhǎng)的時(shí)間來進(jìn)行初始化和狀態(tài)恢復(fù)。

  • 當(dāng)Nimbus從zookeeper得知有supervisor節(jié)點(diǎn)掛掉,可以將該節(jié)點(diǎn)的任務(wù)重新分配給其他子節(jié)點(diǎn)。

  • Nimbus在“某種程度”上屬于單點(diǎn)故障的。在實(shí)際中,即使Nimbus進(jìn)程掛掉,也不會(huì)有災(zāi)難性的事情發(fā)生 。

  • 當(dāng)Nimbus掛掉會(huì)怎樣?

  • 已經(jīng)存在的拓?fù)淇梢岳^續(xù)正常運(yùn)行,但是不能提交新拓?fù)?#xff1b;

  • 正在運(yùn)行的worker進(jìn)程仍然可以繼續(xù)工作。而且當(dāng)worker掛掉,Supervisor會(huì)一直重啟worker。

  • 失敗的任務(wù)不會(huì)被分配到其他機(jī)器(是Nimbus的職責(zé))上了

  • 當(dāng)一個(gè)Supervisor(slave節(jié)點(diǎn))掛掉會(huì)怎樣?

  • 分配到這臺(tái)機(jī)器的所有任務(wù)(task)會(huì)超時(shí),Nimbus會(huì)把這些任務(wù)(task)重新分配給其他機(jī)器。當(dāng)一個(gè)worker掛掉會(huì)怎么樣?

  • 當(dāng)一個(gè)worker掛掉,Supervisor會(huì)重啟它。如果啟動(dòng)一直失敗那么此時(shí)worker也就不能和Nimbus保持心跳了,Nimbus會(huì)重新分配worker到其他機(jī)器

  • 3. Storm的編程模型

    Strom在運(yùn)行中可分為spout與bolt兩個(gè)組件,其中,數(shù)據(jù)源從spout開始,數(shù)據(jù)以tuple的方式發(fā)送到bolt,多個(gè)bolt可以串連起來,一個(gè)bolt也可以接入多個(gè)spot/bolt。運(yùn)行時(shí)Topology如下圖:

    編程模型的一些基本概念:

    • 元組

      • storm使用tuple(元組)來作為它的數(shù)據(jù)模型。每個(gè)tuple由一堆域(field)組成,每個(gè)域有一個(gè)值,并且每個(gè)值可以是任何類型。

      • 一個(gè)tuple可以看作一個(gè)沒有方法的java對(duì)象。總體來看,storm支持所有的基本類型、字符串以及字節(jié)數(shù)組作為tuple的值類型。

    • Spout

      • i. BaseRichSpout是實(shí)現(xiàn) IRichSpout接口的類,對(duì)上述必要的方法有默認(rèn)的實(shí)現(xiàn);

      • ii. 如果業(yè)務(wù)需要自定義ack()、fail() 等方法,選擇實(shí)現(xiàn) IRichSpout接口;

      • iii. 如果業(yè)務(wù)沒有自定義需求,選擇繼承BaseRichSpout類,可以不實(shí)現(xiàn)并不一定需要用戶實(shí)現(xiàn)的方法,簡(jiǎn)化開發(fā)。

      • i. open方法是初始化動(dòng)作。允許你在該spout初始化時(shí)做一些動(dòng)作,傳入了上下文,方便取上下文的一些數(shù)據(jù)。

      • ii. close方法在該spout關(guān)閉前執(zhí)行。

      • iii. activate和deactivate :一個(gè)spout可以被暫時(shí)激活和關(guān)閉,這兩個(gè)方法分別在對(duì)應(yīng)的時(shí)刻被調(diào)用。

      • iv. nextTuple 用來發(fā)射數(shù)據(jù)。Spout中最重要的方法。

      • v. ack(Object)傳入的Object其實(shí)是一個(gè)id,唯一表示一個(gè)tuple。該方法是這個(gè)id所對(duì)應(yīng)的tuple被成功處理后執(zhí)行。

      • vi. fail(Object)同ack,只不過是tuple處理失敗時(shí)執(zhí)行。

      • Spout是在一個(gè)topology中產(chǎn)生源數(shù)據(jù)流的組件。通常情況下spout會(huì)從外部數(shù)據(jù)源中讀取數(shù)據(jù),然后轉(zhuǎn)換為topology內(nèi)部的源數(shù)據(jù)。Spout是一個(gè)主動(dòng)的角色,其接口中有個(gè)nextTuple()函數(shù),storm框架會(huì)不停地調(diào)用此函數(shù),用戶只要在其中生成源數(shù)據(jù)即可。

      • 實(shí)現(xiàn)Spout時(shí),需要實(shí)現(xiàn)最頂層抽象ISpout接口里面的幾個(gè)方法

      • 實(shí)現(xiàn)Spout時(shí),還需要實(shí)現(xiàn)Icomponent接口,來聲明發(fā)射到下游bolt的字段名稱。

      • 通常情況下,實(shí)現(xiàn)一個(gè)Spout,可以直接實(shí)現(xiàn)接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout。

    • Bolt

      • prepare方法是初始化動(dòng)作。允許你在該Bolt初始化時(shí)做一些動(dòng)作,傳入了上下文,方便取上下文的一些數(shù)據(jù)。

      • excute 用來處理數(shù)據(jù)。Bolt中最重要的方法。

      • cleanup在該Bolt關(guān)閉前執(zhí)行.

      • 在拓?fù)渲兴械挠?jì)算邏輯都是在Bolt中實(shí)現(xiàn)的。一個(gè)Bolt可以處理任意數(shù)量的輸入流,產(chǎn)生任意數(shù)量新的輸出流。Bolt可以做函數(shù)處理,過濾,流的合并,聚合,存儲(chǔ)到數(shù)據(jù)庫(kù)等操作。在Bolt中最主要的函數(shù)是execute函數(shù),它使用一個(gè)新的元組當(dāng)作輸入。Bolt使用OutputCollector對(duì)象來吐出新的元組。

      • 實(shí)現(xiàn)Bolt時(shí),需要實(shí)現(xiàn)IBolt接口,它聲明了Bolt的核心方法,負(fù)責(zé)Topology所有的計(jì)算邏輯:

      • 實(shí)現(xiàn)Bolt時(shí),還需要實(shí)現(xiàn)Icomponent接口,來聲明發(fā)射到下游bolt的字段名稱

      • 通常情況下,實(shí)現(xiàn)一個(gè)Bolt ,可以直接實(shí)現(xiàn)接口IRichBolt/IBasicBolt,也可以直接繼承BaseRichBolt/BaseBasicBolt。IBasicBolt/BaseBasicBolt在emit數(shù)據(jù)的時(shí)候,會(huì)自動(dòng)和輸入的tuple相關(guān)聯(lián),而在execute方法結(jié)束的時(shí)候那個(gè)輸入tuple會(huì)被自動(dòng)ack。使用IRichBolt/BaseRichBolt需要在emit數(shù)據(jù)的時(shí)候,顯示指定該數(shù)據(jù)的源tuple要加上第二個(gè)參數(shù)anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple,newTuple);并且需要在execute執(zhí)行成功后調(diào)用OutputCollector.ack(tuple), 當(dāng)失敗處理時(shí),執(zhí)行OutputCollector.fail(tuple)。

    • Stream Groupings(流分組)

      • 定義了一個(gè)流在Bolt任務(wù)間該如何被切分。

    - 隨機(jī)分組(Shuffle grouping):隨機(jī)分發(fā)tuple到Bolt的任務(wù),保證每個(gè)任務(wù)獲得相等數(shù)量的tuple。

    - 字段分組(Fields grouping):根據(jù)指定字段分割數(shù)據(jù)流,并分組。例如,根據(jù)“user-id”字段,相同“user-id”的元組總是分發(fā)到同一個(gè)任務(wù),不同“user-id”的元組可能分發(fā)到不同的任務(wù)。

    - 全部分組(All grouping):tuple被復(fù)制到bolt的所有任務(wù)。這種類型需要謹(jǐn)慎使用。
    - 全局分組(Global grouping):全部流都分配到bolt的同一個(gè)任務(wù)。明確地說,是分配給ID最小的那個(gè)task。
    - 無分組(None grouping):你不需要關(guān)心流是如何分組。目前,無分組等效于隨機(jī)分組。
    - 直接分組(Direct grouping):這是一個(gè)特別的分組類型。元組生產(chǎn)者決定tuple由哪個(gè)元組處理者任務(wù)接收。

    4. Storm消息處理的可靠性機(jī)制

    可靠性機(jī)制(Ack機(jī)制)指的是Storm可以保證從Spout發(fā)出的每個(gè)消息都能被完全處理。一條消息被“完整處理”,指一個(gè)從Spout發(fā)出的元組所觸發(fā)的消息樹中所有的消息都被Storm處理了。如果在指定的超時(shí)時(shí)間里,這個(gè)Spout元組觸發(fā)的消息樹中有任何一個(gè)消息沒有處理完,就認(rèn)為這個(gè)Spout元組處理失敗了。這個(gè)超時(shí)時(shí)間是通過每個(gè)拓?fù)涞腃onfig.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置項(xiàng)來進(jìn)行配置的,默認(rèn)是30秒。

    Storm 是這樣實(shí)現(xiàn)可靠性機(jī)制的:

    • Storm 的拓?fù)溆幸恍┨厥獾姆Q為“acker”的任務(wù),這些任務(wù)負(fù)責(zé)跟蹤每個(gè) Spout 發(fā)出的 tuple 的 DAG。當(dāng)一個(gè) acker 發(fā)現(xiàn)一個(gè) DAG 結(jié)束了,它就會(huì)給創(chuàng)建 spout tuple 的 Spout 任務(wù)發(fā)送一條消息,讓這個(gè)任務(wù)來應(yīng)答這個(gè)消息。你可以使用Config.TOPOLOGY_ACKERS 來配置拓?fù)涞?acker 數(shù)量。Storm 默認(rèn)會(huì)將 acker 的數(shù)量設(shè)置為1,不過如果你有大量消息的處理需求,你可能需要增加這個(gè)數(shù)量。

    • acker任務(wù)跟蹤一個(gè)元組樹,只占用固定大小的空間(大約20字節(jié))。若采用 Ack機(jī)制,每個(gè)處理的tuple, 必須被ack或者fail。因?yàn)閟torm追蹤每個(gè)tuple要占用內(nèi)存。所以如果不ack/fail每一個(gè)tuple, 那么最終你會(huì)看到OutOfMemory錯(cuò)誤。

    • 編程實(shí)現(xiàn)(必要條件):acker數(shù)設(shè)置大于0;Spout發(fā)送元組時(shí),指定messageId;bolt處理完元組時(shí),一定要調(diào)用ack/fail方法。

    5. Storm的并發(fā)機(jī)制

    在一個(gè) Storm 集群中,Storm 主要通過以下三個(gè)部件來運(yùn)行拓?fù)?#xff1a;工作進(jìn)程(worker processes)、執(zhí)行器(executors)、任務(wù)(tasks)。三者的關(guān)系如下:

    • 1個(gè)worker進(jìn)程執(zhí)行的是1個(gè)topology的子集(注:不會(huì)出現(xiàn)1個(gè)worker為多個(gè)topology服務(wù))。1個(gè)worker進(jìn)程會(huì)啟動(dòng)1個(gè)或多個(gè)executor線程來執(zhí)行1個(gè)topology的component(spout或bolt)。因此,1個(gè)運(yùn)行中的topology就是由集群中多臺(tái)物理機(jī)上的多個(gè)worker進(jìn)程組成的。

    • executor是1個(gè)被worker進(jìn)程啟動(dòng)的單獨(dú)線程。每個(gè)executor只會(huì)運(yùn)行1個(gè)topology的1個(gè)component(spout或bolt)的task(注:task可以是1個(gè)或多個(gè),storm默認(rèn)是1個(gè)component只生成1個(gè)task,executor線程里會(huì)在每次循環(huán)里順序調(diào)用所有task實(shí)例)。

    • task是最終運(yùn)行spout或bolt中代碼的單元(注:1個(gè)task即為spout或bolt的1個(gè)實(shí)例,executor線程在執(zhí)行期間會(huì)調(diào)用該task的nextTuple或execute方法)。topology啟動(dòng)后,1個(gè)component(spout或bolt)的task數(shù)目是固定不變的,但該component使用的executor線程數(shù)可以動(dòng)態(tài)調(diào)整(例如:1個(gè)executor線程可以執(zhí)行該component的1個(gè)或多個(gè)task實(shí)例)。這意味著,對(duì)于1個(gè)component存在這樣的條件:#threads<=#tasks(即:線程數(shù)小于等于task數(shù)目)。默認(rèn)情況下task的數(shù)目等于executor線程數(shù)目,即1個(gè)executor線程只運(yùn)行1個(gè)task。

    三、構(gòu)建基于Storm的實(shí)時(shí)數(shù)據(jù)分析平臺(tái)實(shí)戰(zhàn)經(jīng)驗(yàn)

    構(gòu)建基于Storm的實(shí)時(shí)數(shù)據(jù)分析平臺(tái),第一步當(dāng)然應(yīng)該是搭建storm集群。這個(gè)網(wǎng)上的教程還有輪子實(shí)在是太多,我就不貼出來了。請(qǐng)大家Google或者Baidu之,然后一步步搭建集群就完了。

    1. Storm使用的一些實(shí)戰(zhàn)經(jīng)驗(yàn)

    • 在架構(gòu)上,推薦 “消息中間件 + storm + 外部存儲(chǔ)” 3架馬車式架構(gòu)

      • Storm從消息中間件中取出數(shù)據(jù),計(jì)算出結(jié)果,存儲(chǔ)到外部存儲(chǔ)上

      • 通常消息中間件推薦使用RocketMQ,Kafka

      • 外部存儲(chǔ)推薦使用HBase,Redis

      • 該架構(gòu),非常方便Storm程序進(jìn)行重啟(如因?yàn)樵黾訕I(yè)務(wù)升級(jí)程序)

      • 職責(zé)清晰化,減少和外部系統(tǒng)的交互,Storm將計(jì)算結(jié)果存儲(chǔ)到外部存儲(chǔ)后,用戶的查詢就無需訪問Storm中服務(wù)進(jìn)程,查詢外部存儲(chǔ)即可。在實(shí)際計(jì)算中,常常發(fā)現(xiàn)需要做數(shù)據(jù)訂正,因此在設(shè)計(jì)整個(gè)項(xiàng)目時(shí),需要考慮重跑功能 。在最終生成的結(jié)果中,數(shù)據(jù)最好帶時(shí)間戳 。

    • 結(jié)合Storm UI查看topology各個(gè)組件的負(fù)載,合理配置各組件的并發(fā)度。

    • Spout和Bolt的構(gòu)造函數(shù)只會(huì)在submit Topology時(shí)調(diào)一次,然后序列化起來,直接發(fā)給工作節(jié)點(diǎn),工作節(jié)點(diǎn)里實(shí)例化時(shí)不會(huì)被調(diào)用里,所以復(fù)雜的成員變量記得都定義成transient,在open(),prepare()里初始化及連接數(shù)據(jù)庫(kù)等資源。

    • 按照性能來說, 使用ack機(jī)制普通接口 < 關(guān)掉ack機(jī)制的普通接口, 因此,需要根據(jù)業(yè)務(wù)對(duì)數(shù)據(jù)處理的速率需求決定是否采用ack機(jī)制。

    • 當(dāng)使用fieldGrouping方式時(shí),有可能造成有的task任務(wù)重,有的task任務(wù)輕,因此讓整個(gè)數(shù)據(jù)流變慢, 盡量讓task之間壓力均勻。

    • KafkaSpout的并發(fā)度最好設(shè)置成Kafka的分區(qū)數(shù)。消費(fèi)Kafka時(shí), 一個(gè)分區(qū)只能一個(gè)線程消費(fèi),因此有可能簡(jiǎn)單的增加并發(fā)無法解決問題, 可以嘗試增加Kafka的分區(qū)數(shù)。

    • 如果topology性能有問題, 可以嘗試關(guān)掉ack機(jī)制,查看性能如何,如果性能有大幅提升,則預(yù)示著瓶頸不在spout, 有可能是Acker的并發(fā)少了,或者業(yè)務(wù)處理邏輯慢了。

    2. Storm編程實(shí)踐-WordCount

    • Spout

    • SpiltSentenceBolt

    • WordCountBolt

    • ReportBolt

    • Topology

    • Result

    長(zhǎng)按訂閱更多精彩▼

    如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝

    總結(jié)

    以上是生活随笔為你收集整理的storm 机器上日志查询_Storm原理与实践大数据技术栈14的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。