日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

流式计算storm核心组件介绍以及入门案例---跟着就能在本地跑起来的storm项目

發布時間:2024/7/23 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 流式计算storm核心组件介绍以及入门案例---跟着就能在本地跑起来的storm项目 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

關于storm的基礎,參照我這篇文章:流式計算storm
關于并發和并行,參照我這篇文章:并發和并行
關于storm的并行度解釋,參照我這篇文章:storm的并行度解釋
關于storm的流分組策略,參照我這篇文章:storm的流分組策略
關于storm的消息可靠機制,參照我這篇文章:storm的消息可靠機制

storm簡介

Storm是一個免費并開源的分布式實時計算系統。利用Storm可以很容易做到可靠地處理無限的數據流,像Hadoop批量處理大數據一樣,Storm可以實時處理數據。Storm簡單,可以使用任何編程語言。

storm核心組件

1.Nimbus

相當于storm的master,負責資源分配和任務調度,一個普通的storm集群只有一個nimbus(京東是對nimbus做了集群,加入了選舉等概念,防止nimbus突然掛掉)

2.Supervisor

相當于storm的slave,負責接收Nimbus分配的任務,管理和啟動所有的Worker

3.Worker

一個Worker就是一個jvm進程,對應一個Topology程序,可以有多個Executor

4.Executor

一個Executor就是一個線程,默認對應一個task,也可以設置成對應多個task

5.Task

一個Task是一個實例(spot/bolt),有多少個task就會new多少個bolt,task是storm中進行計算的最小的運行單位

6.Topology

拓撲結構,一個計算任務場景對應一個拓撲結構,拓撲結構中對聲明spout和bolt直接的關系

7.Spout

是拓撲結構中的數據來源,可以向多個bolt發送數據,Spout 既可以定義為可靠的數據源,也可以定義為不可靠的數據源

8.Bolt

真正的數據處理部分,一個bolt可以發給多個bolt,多個bolt也可以發給一個bolt

9.Component

Spout和Bolt都是Component,Storm定義了一個名叫IComponent的總接口
全家譜如下:綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關的

spout和bolt的關系:

整體的topology結構:

storm使用zookeeper來協調集群中的多個節點,但并不是用zookeeper來傳遞消息
zookeeper可以看這個
Nimbus和Supervisor都是無狀態的,他們的心跳都由zookeeper協調

storm優點

1.使用簡單,容易上手
2.可擴展,可以調整正在運行的topologies的并行度
3.容錯,可靠,當工作節點宕了,storm會嘗試重啟另一個,而且Nimbus和Supervisors都是無狀態的,死掉重啟都不影響
4.無數據丟失,Storm的抽象組件確保了數據至少處理一次,即使使用消息隊列系統失敗時,也能確保消息被處理
5.支持多種編程語言,Storm用Thrift定義和提交topologies.由于Thrift能被任何一種編程語言使用,因此,topologies也能被任何一種編程語言定義和使用。
6.容易部署和操作
7.高性能,低延遲

storm入門案例 ( 實時統計單次個數 )

首先導入maven依賴

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.0.4</version></dependency>

1.先寫一個Spout,確定數據源,實際應用中一般是接入kafka等消息,入門案例使用隨機字符串代替

/*** 向后端發射tuple數據流* @author soul**/ public class SentenceSpout extends BaseRichSpout {//BaseRichSpout是ISpout接口和IComponent接口的簡單實現,接口對用不到的方法提供了默認的實現private SpoutOutputCollector collector;private String[] sentences = {"my name is soul","im a boy","i have a dog","my dog has fleas","my girl friend is beautiful"};private int index=0;/*** open()方法中是ISpout接口中定義,在Spout組件初始化時被調用。* open()接受三個參數:一個包含Storm配置的Map,一個TopologyContext對象,提供了topology中組件的信息,SpoutOutputCollector對象提供發射tuple的方法。* 在這個例子中,我們不需要執行初始化,只是簡單的存儲在一個SpoutOutputCollector實例變量。*/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;}/*** nextTuple()方法是任何Spout實現的核心。* Storm調用這個方法,向輸出的collector發出tuple。* 在這里,我們只是發出當前索引的句子,并增加該索引準備發射下一個句子。*/@Overridepublic void nextTuple() {//collector.emit(new Values("hello world this is a test"));// TODO Auto-generated method stubthis.collector.emit(new Values(sentences[index]));index++;if (index>=sentences.length) {index=0;}Utils.sleep(1000);}/*** declareOutputFields是在IComponent接口中定義的,所有Storm的組件(spout和bolt)都必須實現這個接口* 用于告訴Storm流組件將會發出那些數據流,每個流的tuple將包含的字段*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("sentence"));//告訴組件發出數據流包含sentence字段}}

2.寫第一個bolt,將Spout傳過來的Tuple拆成一個個的單次,循環發給下一個bolt

/*** 訂閱sentence spout發射的tuple流,實現分割單詞* @author soul**/ public class SplitSentenceBolt extends BaseRichBolt {//BaseRichBolt是IComponent和IBolt接口的實現//繼承這個類,就不用去實現本例不關心的方法private OutputCollector collector;/*** prepare()方法類似于ISpout 的open()方法。* 這個方法在blot初始化時調用,可以用來準備bolt用到的資源,比如數據庫連接。* 本例子和SentenceSpout類一樣,SplitSentenceBolt類不需要太多額外的初始化,* 所以prepare()方法只保存OutputCollector對象的引用。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector=collector;}/*** SplitSentenceBolt核心功能是在類IBolt定義execute()方法,這個方法是IBolt接口中定義。* 每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。* 本例中,收到的元組中查找“sentence”的值,* 并將該值拆分成單個的詞,然后按單詞發出新的tuple。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString sentence = input.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {this.collector.emit(new Values(word));//向下一個bolt發射數據}}/*** plitSentenceBolt類定義一個元組流,每個包含一個字段(“word”)。*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("word"));}}

3.再寫一個bolt,一方面接收上個bolt傳過來的單次,另一方面將相同單次出現的次數記錄下來,并將現在的結果傳給下個bolt

/*** 訂閱 split sentence bolt的輸出流,實現單詞計數,并發送當前計數給下一個bolt* @author soul**/ public class WordCountBolt extends BaseRichBolt {private OutputCollector collector;//存儲單詞和對應的計數private HashMap<String, Long> counts = null;//注:不可序列化對象需在prepare中實例化/*** 大部分實例變量通常是在prepare()中進行實例化,這個設計模式是由topology的部署方式決定的* 因為在部署拓撲時,組件spout和bolt是在網絡上發送的序列化的實例變量。* 如果spout或bolt有任何non-serializable實例變量在序列化之前被實例化(例如,在構造函數中創建)* 會拋出NotSerializableException并且拓撲將無法發布。* 本例中因為HashMap 是可序列化的,所以可以安全地在構造函數中實例化。* 但是,通常情況下最好是在構造函數中對基本數據類型和可序列化的對象進行復制和實例化* 而在prepare()方法中對不可序列化的對象進行實例化。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;this.counts = new HashMap<String, Long>();}/*** 在execute()方法中,我們查找的收到的單詞的計數(如果不存在,初始化為0)* 然后增加計數并存儲,發出一個新的詞和當前計數組成的二元組。* 發射計數作為流允許拓撲的其他bolt訂閱和執行額外的處理。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = this.counts.get(word);if (count == null) {count = 0L;//如果不存在,初始化為0}count++;//增加計數this.counts.put(word, count);//存儲計數this.collector.emit(new Values(word,count));}/****/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//聲明一個輸出流,其中tuple包括了單詞和對應的計數,向后發射//其他bolt可以訂閱這個數據流進一步處理declarer.declare(new Fields("word","count"));}}

4.再寫一個bolt,接收上個bolt傳過來的單次統計結果,在控制臺打印.實際最后一步一般會將數據結果存在非關系型數據庫中,比如存入HBase或者Redis中

/*** 生成一份報告* @author soul**/ public class ReportBolt extends BaseRichBolt {private HashMap<String, Long> counts = null;//保存單詞和對應的計數@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.counts = new HashMap<String, Long>();}@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = input.getLongByField("count");this.counts.put(word, count);//實時輸出System.out.println("結果:"+this.counts);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//這里是末端bolt,不需要發射數據流,這里無需定義}/*** cleanup是IBolt接口中定義* Storm在終止一個bolt之前會調用這個方法* 本例我們利用cleanup()方法在topology關閉時輸出最終的計數結果* 通常情況下,cleanup()方法用來釋放bolt占用的資源,如打開的文件句柄或數據庫連接* 但是當Storm拓撲在一個集群上運行,IBolt.cleanup()方法不能保證執行(這里是開發模式,生產環境不要這樣做)。*/@Overridepublic void cleanup(){System.out.println("---------- FINAL COUNTS -----------");ArrayList<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for(String key : keys){System.out.println(key + " : " + this.counts.get(key));}System.out.println("----------------------------");}}

5.寫拓撲結構,將前面四步的Spout和Bolt組成一個拓撲結構,直接運行主方法就能看到結果,這個是Storm的本地模式,將提交的方法稍作修改,就可以變成集群模式,實際都是集群模式,將這些代碼打成jar包傳到Nimbus上,運行在集群中

/*** 實現單詞計數topology**/ public class App {private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String REPORT_BOLT_ID = "report-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";public static void main( String[] args ) //throws Exception{//System.out.println( "Hello World!" );//實例化spout和boltSentenceSpout spout = new SentenceSpout();SplitSentenceBolt splitBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();TopologyBuilder builder = new TopologyBuilder();//創建了一個TopologyBuilder實例//TopologyBuilder提供流式風格的API來定義topology組件之間的數據流//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注冊一個sentence spout//設置兩個Executeor(線程),默認一個builder.setSpout(SENTENCE_SPOUT_ID, spout,2);// SentenceSpout --> SplitSentenceBolt//注冊一個bolt并訂閱sentence發射出的數據流,shuffleGrouping方法告訴Storm要將SentenceSpout發射的tuple隨機均勻的分發給SplitSentenceBolt的實例//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);//SplitSentenceBolt單詞分割器設置4個Task,2個Executeor(線程)builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);// SplitSentenceBolt --> WordCountBolt//fieldsGrouping將含有特定數據的tuple路由到特殊的bolt實例中//這里fieldsGrouping()方法保證所有“word”字段相同的tuuple會被路由到同一個WordCountBolt實例中//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));//WordCountBolt單詞計數器設置4個Executeor(線程)builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));// WordCountBolt --> ReportBolt//globalGrouping是把WordCountBolt發射的所有tuple路由到唯一的ReportBoltbuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);Config config = new Config();//Config類是一個HashMap<String,Object>的子類,用來配置topology運行時的行為//設置worker數量//config.setNumWorkers(2);LocalCluster cluster = new LocalCluster();//本地提交cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());Utils.sleep(10000);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();} }

storm與其他流式計算框架的對比

1.Spark Streaming
在處理前按時間間隔預先將其切分為一段一段的批處理作業.
Spark針對持續性數據流的抽象稱為DStream(DiscretizedStream),
一個DStream是一個微批處理(micro-batching)的RDD(彈性分布式數據集),
而RDD則是一種分布式數據集,能夠以兩種方式并行運作,分別是任意函數和滑動窗口數據的轉換。

2.Flink
針對流數據和批數據的分布式處理引擎
原生的流處理系統,
其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已
Flink 會把所有任務當成流來處理

3.Storm
原生的流處理系統,可以做到毫秒級處理

總結

以上是生活随笔為你收集整理的流式计算storm核心组件介绍以及入门案例---跟着就能在本地跑起来的storm项目的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产日韩二区 | 成人福利视频导航 | 国产乱人伦精品一区二区 | 欧美一级视频 | 超碰成人在线免费观看 | 国产性猛交xxxx免费看久久 | 特级性生活片 | 尤物视频一区 | 国产精品分类 | 国产福利影院 | 人人干人 | 熟女国产精品一区二区三 | 亚洲一二三av| 欧美一区二区三区在线观看 | 中国免费看的片 | 成人h视频在线 | 国产毛片一区二区三区va在线 | 一区二区在线观看免费 | 特种兵之深入敌后高清全集免费观看 | www.日本高清 | 九九热在线视频 | 国产精品亚洲一区二区无码 | 韩国精品在线观看 | 美女网站在线看 | 好吊视频在线观看 | 影音先锋一区 | 久久综合成人 | 色播久久 | 国产一区二区三区精品在线 | 久久久久久国产精品日本 | 成人欧美性 | 瑟瑟视频免费观看 | 一起艹在线观看 | 成 人片 黄 色 大 片 | 91亚洲精品久久久蜜桃 | 日本激情在线 | 国产伦理片在线观看 | 亚洲精品传媒 | 扒开腿揉捏花蒂h | 尤物在线免费观看 | 国产亚洲欧美在线 | 亚洲欧美激情精品一区二区 | 大桥未久av在线播放 | 日韩av一区二区三区四区 | 成人动漫免费观看 | 亚洲色图.com| 日韩av二区 | 尤物在线免费观看 | 欧美性猛交一区二区三区精品 | av鲁丝一区鲁丝二区鲁丝 | 精品一区二区三区久久久 | 香蕉视频免费在线观看 | 狠狠躁18三区二区一区传媒剧情 | 久久久久久逼 | 超污网站在线观看 | 精品久久久蜜桃 | 2017天天干 | 欧美高清性 | 久久精品国产欧美亚洲人人爽 | 美腿丝袜一区二区三区 | av网站在线观看不卡 | 桃色视频网站 | 日批免费观看 | 欧美最猛性xxxxx(亚洲精品) | 色撸撸av | 精品一区久久 | 日本黄视频网站 | 国产αv| 国产在线123 | 阿v天堂网| 亚洲福利 | 激情综合五月天 | 九一在线视频 | 国产麻豆午夜三级精品 | 黄片毛片视频 | xxxxwwww国产 | 午夜性刺激免费视频 | 99久久久无码国产精品免费 | 91在线 | 69人人 | 国产精品xxx在线 | 亚洲久久久久久久 | 天天爽天天爽天天爽 | 午夜国产在线视频 | 免费看裸体视频网站 | 国产超碰人人爽人人做人人爱 | 色眯眯av | 91视频最新 | 日韩免费视频观看 | 国产伦精品一区二区三区四区视频 | 亚洲短视频| 物业福利视频 | 日韩中文在线一区 | 91福利视频导航 | 国产中文在线观看 | 亚洲男人第一网站 | 久久国产三级 | 日韩黄色短视频 | 精品国偷自产一区二区三区 |