In many application scenarios on the Hadoop platform we need both offline and real-time analysis of our data. Offline statistical analysis is easy to implement with Hive, but Hive is not suitable for real-time requirements. For real-time scenarios we can use Storm, a real-time processing system that provides a computation model for real-time applications and is straightforward to program against. To unify offline and real-time computation, we usually want a single, shared data source feeding both pipelines, with the data then flowing into the real-time system and the offline analysis system separately. To achieve this, we can connect the data source (for example, logs collected with Flume) directly to a message broker such as Kafka: Flume and Kafka are integrated so that Flume acts as the message Producer, publishing the data it produces (log data, business request data, and so on) to Kafka; a Storm Topology then acts as the message Consumer, subscribing to those messages, and the Storm cluster handles the following two scenarios:
- Use a Storm Topology to analyze and process the data in real time
- Integrate Storm with HDFS, writing the processed messages into HDFS for offline analysis
For real-time processing we only need to develop a Topology that meets the business requirements, so that part is not covered in depth. Instead, this article walks through installing and configuring Kafka and Storm, and then integrating Kafka+Storm, Storm+HDFS, and finally Kafka+Storm+HDFS, to satisfy the requirements above. The software packages used are:
- zookeeper-3.4.5.tar.gz
- kafka_2.9.2-0.8.1.1.tgz
- apache-storm-0.9.2-incubating.tar.gz
- hadoop-2.2.0.tar.gz
The operating system used throughout is CentOS 5.11.
Kafka Installation and Configuration
We use three machines, h1, h2 and h3, to build the Kafka cluster.
Rather than the ZooKeeper bundled with Kafka, a standalone ZooKeeper cluster was installed on the same three machines; make sure that ZooKeeper cluster is running normally before installing Kafka.
First, prepare the Kafka installation files on h1 by running:
```shell
cd /usr/local
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
```
Modify the configuration file /usr/local/kafka/config/server.properties, changing the following entries:
```properties
broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
```
Note that by default Kafka uses ZooKeeper's root path /, which scatters Kafka's ZooKeeper data directly under the root. If other applications share the same ZooKeeper cluster, browsing the ZooKeeper data becomes confusing, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:
```properties
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
```
In addition, the path /kafka must be created manually in ZooKeeper: connect to any of the ZooKeeper servers with the ZooKeeper client and create the chroot path there.
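The exact commands were lost from this copy of the article; with the stock ZooKeeper CLI they would presumably look like this (the /usr/local/zookeeper path is an assumption matching the layout used for Kafka and Storm above):

```shell
# connect to any ZooKeeper server in the ensemble
/usr/local/zookeeper/bin/zkCli.sh -server h1:2181
# then, inside the interactive ZooKeeper shell, create the chroot path:
#   create /kafka ''
```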
From then on, every connection to the Kafka cluster (via the --zookeeper option) must use a connection string that includes the chroot path, as we will see later.
Then, sync the configured installation files to the other nodes, h2 and h3:
```shell
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
```
Finally, on h2 and h3, run:
```shell
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
```
and adjust /usr/local/kafka/config/server.properties on each node: a Kafka cluster requires every Broker's id to be unique across the whole cluster, so this setting must be changed per node. (If you simulate a distributed Kafka cluster on a single machine by running multiple Broker processes, the Broker ids must likewise be unique, and the log/data directory settings must be changed as well.)
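For example — assuming h1, h2 and h3 get the ids 0, 1 and 2 respectively; the exact values do not matter as long as they are unique — server.properties on h2 would contain:

```properties
# h1 keeps broker.id=0, h3 would use broker.id=2
broker.id=1
```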
Start Kafka on each of the three nodes h1, h2 and h3 by running:
```shell
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
```
Check the logs, or the process status, to confirm that the Kafka cluster started successfully.
Now create a Topic named my-replicated-topic5 with 5 partitions and a replication factor of 3:
```shell
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
```
To inspect the Topic we just created, run:
```shell
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
```
The output looks like this:
```
Topic:my-replicated-topic5    PartitionCount:5    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic5    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 1    Leader: 0    Replicas: 1,0,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 2    Leader: 2    Replicas: 2,1,0    Isr: 2,0,1
    Topic: my-replicated-topic5    Partition: 3    Leader: 0    Replicas: 0,1,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 4    Leader: 2    Replicas: 1,2,0    Isr: 2,0,1
```
The meanings of Leader, Replicas and Isr above are:

```
Leader   : the node responsible for all reads and writes of the given partition
Replicas : the list of nodes that replicate the log for this partition
Isr      : the "in-sync" replicas, the subset of Replicas that is currently alive and eligible to become Leader
```
We can use the bundled bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts to demonstrate how messages are published and consumed.
In one terminal, start a Producer that publishes messages to the my-replicated-topic5 Topic created above:
```shell
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
```
In another terminal, start a Consumer that subscribes to the messages produced to my-replicated-topic5:
```shell
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
```
Type a line of text in the Producer terminal and press Enter; the line then appears in the Consumer terminal as the consumer receives it.
You can also use Kafka's Java Producer and Consumer APIs to implement the producing and consuming logic in code.
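As an illustration only (this sketch is not from the original article, and needs a running broker plus the Kafka client jar on the classpath), a minimal producer against the 0.8.x Java API might look like this, using the broker list and Topic name of this setup:

```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap metadata from any of the brokers
        props.put("metadata.broker.list", "h1:9092,h2:9092,h3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1"); // wait for the leader's ack

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // publish one message line to the Topic created above
        producer.send(new KeyedMessage<String, String>("my-replicated-topic5", "hello storm"));
        producer.close();
    }
}
```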
Storm Installation and Configuration
A Storm cluster also depends on a ZooKeeper cluster, so make sure ZooKeeper is running normally. Storm itself is fairly simple to install and configure; we again use the same three machines.
First, install Storm on h1 by running:
```shell
cd /usr/local
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
```
Then edit the configuration file conf/storm.yaml so that it contains:
```yaml
storm.zookeeper.servers:
  - "h1"
  - "h2"
  - "h3"
storm.zookeeper.port: 2181
nimbus.host: "h1"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
storm.local.dir: "/tmp/storm"
```
Distribute the configured installation files to the other nodes:
```shell
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
```
Finally, on h2 and h3, run:
```shell
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
```
The master node of a Storm cluster runs the Nimbus service and the worker nodes run Supervisor services: we start Nimbus on h1, and Supervisor on the worker nodes h2 and h3.
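The start commands were dropped from this copy of the article; with the standard Storm launcher scripts they would be roughly:

```shell
# on h1 (master):
bin/storm nimbus >/dev/null 2>&1 &
# on h2 and h3 (workers):
bin/storm supervisor >/dev/null 2>&1 &
```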
For convenient monitoring, you can also start the Storm UI, which shows the state of running Storm Topologies on a web page; for example, start it on h2.
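Again the command itself is missing here; it is presumably the standard:

```shell
# on h2
bin/storm ui >/dev/null 2>&1 &
```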
You can then check the status of the Topologies at http://h2:8080/.
Integrating Kafka and Storm
Messages enter the Kafka broker in various ways — for example, log data collected with Flume can be routed to and buffered in Kafka, and then analyzed in real time by Storm. For this, a Storm Spout must read the messages from Kafka and hand them to the concrete Bolt components for processing. In fact, the apache-storm-0.9.2-incubating release already ships with storm-kafka, an external module that integrates Kafka and can be used directly. The Maven dependencies I used are:
```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
Below is a simple WordCount example: the Topology reads subscribed message lines from Kafka, splits them into words on whitespace, and then computes the word frequencies. The implementation:
```java
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for (String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // parallelism 5, matching the 5 partitions of the Topic
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();

        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
```
When debugging locally (via LocalCluster), the program above takes no arguments; when submitting it to a real cluster, one argument must be passed: the host name of Nimbus.
Build a single jar with Maven that bundles the dependencies (but not Storm's own jars, which the cluster provides), e.g. storm-examples-0.0.1-SNAPSHOT.jar. Because the Topology uses Kafka, the following dependency jars must also be copied into the lib directory of the Storm installation on the cluster before submitting:
```shell
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
```
Then the Topology can be submitted:
```shell
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
```
Watch the log files (under the logs/ directory) or the Storm UI to monitor the Topology. If everything is working, produce some messages with the Kafka console Producer used earlier, and you will see the Storm Topology receive and process them in real time.
In the Topology code above, one setting of the SpoutConfig object is crucial:
```java
spoutConf.forceFromStart = false;
```
This setting determines where the Spout resumes reading its Kafka Topic after the Topology stops (e.g. due to a failure) and is restarted. With forceFromStart=true it starts again from the beginning of the subscribed Topic, so tuples that were already processed are processed a second time; with forceFromStart=false it continues from the last recorded position, guaranteeing that the Topic data in Kafka is not reprocessed — the consumption offset is tracked on the data-source side.
Integrating Storm and HDFS
The Storm real-time computation cluster consumes messages from the Kafka broker: requirements with real-time needs go through the real-time processing path, while requirements that call for offline analysis are served by writing the data into HDFS for later processing. The following Topology implements the HDFS part:
```java
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

    public static class EventSpout extends BaseRichSpout {

        private static final Log LOG = LogFactory.getLog(EventSpout.class);
        private static final long serialVersionUID = 886149197481637894L;
        private SpoutOutputCollector collector;
        private Random rand;
        private String[] records;

        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            records = new String[] {
                    "10001  ef2da82d4c8b49c44199655dc14f39f6  4.2.1  HUAWEI G610-U00  HUAWEI  2  70:72:3c:73:8b:22  2014-10-13 12:36:35",
                    "10001  ffb52739a29348a67952e47c12da54ef  4.3  GT-I9300  samsung  2  50:CC:F8:E4:22:E2  2014-10-13 12:36:02",
                    "10001  ef2da82d4c8b49c44199655dc14f39f6  4.2.1  HUAWEI G610-U00  HUAWEI  2  70:72:3c:73:8b:22  2014-10-13 12:36:35"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            Date d = new Date(System.currentTimeMillis());
            String minute = df.format(d);
            String record = records[rand.nextInt(records.length)];
            LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
            collector.emit(new Values(minute, record));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("minute", "record"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // use " : " as the field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(" : ");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files every minute
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log");

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event-spout", new EventSpout(), 3);
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

        Config conf = new Config();

        String name = StormToHDFSTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
```
The HdfsBolt in the logic above can be configured in more detail through FileNameFormat, SyncPolicy and FileRotationPolicy (the rotation policy decides when a new log file is rolled, e.g. after a fixed time interval, or once the current file reaches a configured size). See the storm-hdfs project for more options.
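For example, rotating by file size instead of by time can be done with storm-hdfs's FileSizeRotationPolicy; a sketch, with an assumed 100 MB threshold:

```java
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;

// roll over to a new HDFS file once the current one reaches ~100 MB,
// then pass this policy to HdfsBolt.withRotationPolicy(...) as above
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(100.0f, Units.MB);
```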
When packaging this code, note that the Maven packaging configuration that ships with storm-starter may cause errors when the Topology is deployed and run; the maven-shade-plugin can be used instead, configured as follows:
```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```
Integrating Kafka, Storm and HDFS
Above we integrated Kafka+Storm and Storm+HDFS separately. By replacing the Spout of the latter with the Kafka Spout of the former, we can consume messages from Kafka, do some simple processing in Storm, write the data into HDFS, and finally analyze it offline on the Hadoop platform. Below is a simple example that consumes messages from Kafka, processes them in Storm, and writes them into HDFS:
```java
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

    public static class KafkaWordToUpperCase extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -5207232012035109026L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("RECV[kafka -> splitter] " + line);
            if (!line.isEmpty()) {
                String upperLine = line.toUpperCase();
                LOG.info("EMIT[splitter -> counter] " + upperLine);
                collector.emit(input, new Values(upperLine, upperLine.length()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line", "len"));
        }
    }

    public static class RealtimeBolt extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(RealtimeBolt.class);
        private static final long serialVersionUID = -4115132557403913367L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("REALTIME: " + line);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // this bolt is a terminal sink and emits nothing
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

        // Configure Kafka
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        // Configure HDFS bolt
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
        SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // configure & build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
        builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

        // submit topology
        Config conf = new Config();
        String name = DistributeWordTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
```
In the code above, the Bolt named to-upper converts each received line to upper case and then sends a copy of the processed data to both the hdfs-bolt and realtime Bolts; each of those two Bolts processes it independently according to its actual needs (offline vs. real-time).
After packaging, deploy and run the Topology on the Storm cluster:
```shell
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
```
You can follow the Topology in the Storm UI and check the data files generated on HDFS.
Reprinted from: https://my.oschina.net/u/2603356/blog/687243