Kafka + Storm + HDFS Integration in Practice


In many Hadoop-based applications we need both offline and real-time analysis of data. Offline analysis is easy to do with Hive, but Hive is not suitable for real-time requirements. For real-time scenarios we can use Storm, a real-time processing system that provides a computation model for real-time applications and is straightforward to program against. To unify offline and real-time computation, we usually want a single data-source pipeline feeding both: the data flows into the real-time system and the offline analysis system, and each processes it separately. One way to do this is to connect the data source (for example, logs collected with Flume) to a message broker such as Kafka. Integrating Flume + Kafka, Flume acts as the message Producer and publishes the collected data (log data, business request data, and so on) to Kafka; a Storm Topology then acts as the Consumer, subscribing to the messages and handling the following two scenarios in the Storm cluster:

  • Use a Storm Topology directly for real-time analysis of the data
  • Integrate Storm + HDFS, writing the processed messages to HDFS for offline analysis

Real-time processing only requires developing a Topology that meets the business need, so it is not discussed further. This article focuses on installing and configuring Kafka and Storm, and then on integrating Kafka + Storm, Storm + HDFS, and finally Kafka + Storm + HDFS, to satisfy the requirements above. The software packages used in this walkthrough are:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

The operating system used is CentOS 5.11.

Kafka Installation and Configuration

We use three machines to build the Kafka cluster:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3

Instead of the ZooKeeper bundled with Kafka, we installed a separate ZooKeeper cluster on the same three machines; make sure that cluster is running properly before installing Kafka.
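For reference, a minimal zoo.cfg for such a three-node ensemble might look like the sketch below (the dataDir is an assumption, not taken from the original setup; each node also needs a matching myid file under that directory):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=h1:2888:3888
server.2=h2:2888:3888
server.3=h3:2888:3888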
First, prepare the Kafka installation files on h1 by running the following commands:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

Edit the configuration file /usr/local/kafka/config/server.properties and change the following entries:

broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

Note that by default Kafka uses ZooKeeper's root path /, which scatters Kafka's ZooKeeper data under the root. If other applications share the same ZooKeeper cluster, browsing the data becomes confusing, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

You also need to create the /kafka path in ZooKeeper manually. Connect to any ZooKeeper server with the following commands:

cd /usr/local/zookeeper
bin/zkCli.sh

In the ZooKeeper shell, create the chroot path with:

create /kafka ''

From now on, every connection string passed to Kafka tools via the --zookeeper option must include this chroot path, as we will see below.
Then sync the configured installation files to the other nodes, h2 and h3:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/

Finally, on h2 and h3, run the following commands:

cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

and adjust broker.id in /usr/local/kafka/config/server.properties on each node:

broker.id=1  # on h2

broker.id=2  # on h3

Kafka requires every Broker id to be unique across the cluster, so this setting must be adjusted per node. (You can also simulate a distributed Kafka cluster on a single machine by running multiple Broker processes; each Broker still needs a unique id, and some port and directory settings must be changed as well, as sketched below.)
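For example, a rough sketch of simulating three Brokers on one machine could use three copies of server.properties with different values (the file names, ports, and log directories below are assumptions for illustration, not part of the cluster configured above):

# config/server-1.properties
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1

# config/server-2.properties
broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2

Each Broker process is then started with its own properties file via bin/kafka-server-start.sh.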
Start Kafka on each of the three nodes h1, h2, and h3 by running:

bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

Check the logs or the process status to confirm that the Kafka cluster started successfully.
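A quick sanity check (assuming the /kafka chroot configured above) is to look at the Broker registrations in ZooKeeper:

bin/zkCli.sh -server h1:2181
ls /kafka/brokers/ids

If all three Brokers started successfully, the command should list the ids [0, 1, 2].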
Next, create a Topic named my-replicated-topic5 with 5 partitions and a replication factor of 3:

bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5

Inspect the newly created Topic with:

bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5

The output looks like this:

Topic:my-replicated-topic5    PartitionCount:5    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic5    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 1    Leader: 0    Replicas: 1,0,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 2    Leader: 2    Replicas: 2,1,0    Isr: 2,0,1
    Topic: my-replicated-topic5    Partition: 3    Leader: 0    Replicas: 0,1,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 4    Leader: 2    Replicas: 1,2,0    Isr: 2,0,1

The meanings of Leader, Replicas, and Isr above are:

Partition: the partition number
Leader   : the node responsible for reads and writes of this partition
Replicas : the list of nodes that replicate this partition's log
Isr      : the "in-sync" replicas, the subset of Replicas that is currently alive and eligible to become Leader

We can use the bundled bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts to demonstrate how to publish and consume messages.
In one terminal, start a Producer that sends messages to the my-replicated-topic5 Topic created above:

bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5

In another terminal, start a Consumer that subscribes to the messages produced to my-replicated-topic5:

bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5

Type a line of text in the Producer terminal and press Enter; the Consumer terminal should display the consumed message.
You can also implement message production and consumption programmatically using Kafka's Producer and Consumer Java APIs.
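As a minimal sketch of the producer side using the 0.8.x Java API (the broker list and Topic name are the ones used in this article; error handling is omitted):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "h1:9092,h2:9092,h3:9092"); // broker list, not the ZooKeeper connect string
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1"); // wait for the partition leader to acknowledge

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my-replicated-topic5", "hello storm"));
        producer.close();
    }
}

The consumer side is analogous, using the high-level consumer API with the ZooKeeper connect string (including the /kafka chroot) and a consumer group id.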

Storm Installation and Configuration

A Storm cluster also depends on a ZooKeeper cluster, so make sure ZooKeeper is running properly. Installing and configuring Storm is fairly simple; we again use the same three machines:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3

First, install on the h1 node by running:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

Then edit the configuration file conf/storm.yaml as follows:

storm.zookeeper.servers:
     - "h1"
     - "h2"
     - "h3"
storm.zookeeper.port: 2181

nimbus.host: "h1"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

storm.local.dir: "/tmp/storm"

Distribute the configured installation to the other nodes:

scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

Finally, on h2 and h3, run:

cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

The master node of a Storm cluster runs Nimbus and the worker nodes run Supervisors. Start the Nimbus service on h1 and the Supervisor service on h2 and h3:

bin/storm nimbus &
bin/storm supervisor &

For easier monitoring you can start the Storm UI, which shows the running state of Storm Topologies in a web page; for example, start it on h2:

bin/storm ui &

You can then check the status of running Topologies at http://h2:8080/.

Integrating Kafka + Storm

Messages enter Kafka in various ways; for example, log data collected with Flume can be routed into and buffered in Kafka, and then analyzed in real time by Storm. In that case we need a Storm Spout that reads messages from Kafka and hands them to the downstream Bolt components for processing. The apache-storm-0.9.2-incubating release already ships with storm-kafka, an external module that integrates with Kafka and can be used directly. The Maven dependencies I used are shown below:

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>0.9.2-incubating</version>
     <scope>provided</scope>
</dependency>
<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-kafka</artifactId>
     <version>0.9.2-incubating</version>
</dependency>
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka_2.9.2</artifactId>
     <version>0.8.1.1</version>
     <exclusions>
          <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
          </exclusion>
          <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
          </exclusion>
     </exclusions>
</dependency>

Below is a simple WordCount example Topology that reads subscribed message lines from Kafka, splits them into words on whitespace, and then counts word frequencies. The Topology code is:

package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for(String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }

    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if(ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while(iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Topic created above has 5 partitions, so use a spout parallelism of 5
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();

        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}

When debugging locally (using LocalCluster), the program needs no arguments; when submitting to a real cluster, pass one argument: the Nimbus host name.
Build a single jar containing the dependencies with Maven (but do not bundle the Storm dependencies), for example storm-examples-0.0.1-SNAPSHOT.jar. Because the Topology uses Kafka, the following dependency jars must be copied into the lib directory of the Storm installation before submitting the Topology:

cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

Then submit the Topology we developed:

bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

You can monitor the Topology through the log files (under the logs/ directory) or the Storm UI. If there are no errors, use the Kafka Producer from earlier to generate messages and you should see the Storm Topology receive and process them in real time.
One key configuration object in the Topology code above is SpoutConfig, with this property:

spoutConf.forceFromStart = false;

This setting controls where the Spout resumes reading in Kafka after the Topology has stopped (for example, due to a failure) and been restarted. With forceFromStart=true, the Spout starts again from the beginning of the subscribed Topic, so Tuples that were already processed are processed again; with forceFromStart=false, it continues from the last recorded position, so the Topic data is not reprocessed. The read position is recorded as state on the data-source side.

Integrating Storm + HDFS

The Storm cluster consumes messages from Kafka; requirements that need real-time handling go through a real-time processing path, while data needed for offline analysis can be written to HDFS. The following Topology implements the HDFS part:

package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

    public static class EventSpout extends BaseRichSpout {

        private static final Log LOG = LogFactory.getLog(EventSpout.class);
        private static final long serialVersionUID = 886149197481637894L;
        private SpoutOutputCollector collector;
        private Random rand;
        private String[] records;

        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            records = new String[] {
                    "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",
                    "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",
                    "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            Date d = new Date(System.currentTimeMillis());
            String minute = df.format(d);
            String record = records[rand.nextInt(records.length)];
            LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
            collector.emit(new Values(minute, record));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("minute", "record"));
        }

    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // use " : " instead of the default "," as the field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(" : ");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files every minute
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log");

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event-spout", new EventSpout(), 3);
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

        Config conf = new Config();

        String name = StormToHDFSTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }

}

In the logic above, the HdfsBolt can be configured in much more detail through FileNameFormat, SyncPolicy, and FileRotationPolicy (which controls when a new log file is rolled, for example after a fixed time interval or once a file reaches a configured size); see the storm-hdfs project for the full set of options.
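For example, assuming the storm-hdfs version used here provides FileSizeRotationPolicy (check the artifact you actually depend on), rotating by file size instead of by time is a small change:

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;

// rotate to a new file once the current one reaches 128 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128.0f, Units.MB);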
When packaging this code, note that the Maven packaging configuration that ships with storm-starter may cause errors when the Topology is deployed and run; use the maven-shade-plugin instead, configured as follows:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
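With the shade plugin configured, the deployable jar is built with the usual Maven packaging step; the jar name below assumes the artifactId and version used earlier in this article:

mvn clean package
# produces target/storm-examples-0.0.1-SNAPSHOT.jar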

Integrating Kafka + Storm + HDFS

The two previous sections integrated Kafka + Storm and Storm + HDFS separately. By replacing the latter's Spout with the Kafka Spout from the former, we can consume messages from Kafka, do some simple processing in Storm, write the data to HDFS, and finally analyze it offline on the Hadoop platform. Below is a simple example that consumes messages from Kafka, processes them in Storm, and writes them to HDFS:

package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

    public static class KafkaWordToUpperCase extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -5207232012035109026L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("RECV[kafka -> splitter] " + line);
            if(!line.isEmpty()) {
                String upperLine = line.toUpperCase();
                LOG.info("EMIT[splitter -> counter] " + upperLine);
                collector.emit(input, new Values(upperLine, upperLine.length()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line", "len"));
        }

    }

    public static class RealtimeBolt extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(RealtimeBolt.class);
        private static final long serialVersionUID = -4115132557403913367L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("REALTIME: " + line);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

        // Configure Kafka
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        // Configure HDFS bolt
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
        SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files every minute
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // configure & build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
        builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

        // submit topology
        Config conf = new Config();
        String name = DistributeWordTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }

}

In the code above, the Bolt named to-upper converts each received line to upper case and then emits a copy of the processed data to both of the downstream Bolts, hdfs-bolt and realtime, which handle it separately according to their needs (offline vs. real-time).
After packaging, deploy and run this Topology on the Storm cluster:

bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

You can watch the Topology through the Storm UI and inspect the data files generated on HDFS.

Reposted from: https://my.oschina.net/u/2603356/blog/687243
