日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【转】storm 开发系列一 第一个程序

發布時間:2025/5/22 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【转】storm 开发系列一 第一个程序 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文:?http://blog.csdn.net/csfreebird/article/details/49104777

-------------------------------------------------------------------------------------------------

本文將在本地開發環境創建一個storm程序,力求簡單。

首先用mvn創建一個簡單的工程hello_storm

?

[plain]?view plaincopy print?
  • mvn?archetype:generate?-DgroupId=org.csfreebird?-DartifactId=hello_storm?-DarchetypeArtifactId=maven-archetype-quickstart?-DinteractiveMode=false??

  • 編輯pom.xml,添加dependency

    ?

    ?

    [html]?view plaincopy print?
  • <project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"??
  • ??xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/maven-v4_0_0.xsd">??
  • ??<modelVersion>4.0.0</modelVersion>??
  • ??<groupId>org.csfreebird</groupId>??
  • ??<artifactId>hello_storm</artifactId>??
  • ??<version>0.9.5</version>??
  • ??<packaging>jar</packaging>??
  • ??<name>hello_storm</name>??
  • ??<url>http://maven.apache.org</url>??
  • ??<dependencies>??
  • ????<dependency>??
  • ??????<groupId>org.apache.storm</groupId>??
  • ??????<artifactId>storm-core</artifactId>??
  • ??????<version>${project.version}</version>??
  • ??????<!--?keep?storm?out?of?the?jar-with-dependencies?-->??
  • ??????<scope>provided</scope>??
  • ????</dependency>??
  • ??</dependencies>??
  • </project>??

  • provided 表示storm-core的jar包只作為編譯和測試時使用,在集群環境下運行時完全依賴集群環境的storm-core的jar包。

    ?

    ?

    然后重命名App.Java為HelloTopology.java文件,開始編碼。模仿之前的Example, 這里將所有的spout/bolt類都作為靜態類定義,就放在HelloTopology.java文件。

    功能如下

    ?

    編寫HelloTopology.java代碼,spout代碼來自于TestWordSpout,去掉了log的代碼,改變了_引導的成員變量命名方法

    ?

    [plain]?view plaincopy print?
  • package?org.csfreebird;??
  • ??
  • import?backtype.storm.Config;??
  • import?backtype.storm.LocalCluster;??
  • import?backtype.storm.StormSubmitter;??
  • import?backtype.storm.task.OutputCollector;??
  • import?backtype.storm.task.TopologyContext;??
  • import?backtype.storm.testing.TestWordSpout;??
  • import?backtype.storm.topology.OutputFieldsDeclarer;??
  • import?backtype.storm.topology.TopologyBuilder;??
  • import?backtype.storm.topology.base.BaseRichBolt;??
  • import?backtype.storm.topology.base.BaseRichSpout;??
  • import?backtype.storm.tuple.Fields;??
  • import?backtype.storm.tuple.Tuple;??
  • import?backtype.storm.tuple.Values;??
  • import?backtype.storm.utils.Utils;??
  • import?backtype.storm.spout.SpoutOutputCollector;??
  • import?java.util.Map;??
  • import?java.util.TreeMap;??
  • import?java.util.Random;??
  • ??
  • public?class?HelloTopology?{??
  • ??????
  • ????public?static?class?HelloSpout?extends?BaseRichSpout?{??
  • ??????
  • ????boolean?isDistributed;??
  • ????SpoutOutputCollector?collector;??
  • ??
  • ????public?HelloSpout()?{??
  • ????????this(true);??
  • ????}??
  • ??
  • ????public?HelloSpout(boolean?isDistributed)?{??
  • ????????this.isDistributed?=?isDistributed;??
  • ????}??
  • ??????????
  • ????public?void?open(Map?conf,?TopologyContext?context,?SpoutOutputCollector?collector)?{??
  • ????????this.collector?=?collector;??
  • ????}??
  • ??????
  • ????public?void?close()?{??
  • ????}??
  • ??????????
  • ????public?void?nextTuple()?{??
  • ????????Utils.sleep(100);??
  • ????????final?String[]?words?=?new?String[]?{"china",?"usa",?"japan",?"russia",?"england"};??
  • ????????final?Random?rand?=?new?Random();??
  • ????????final?String?word?=?words[rand.nextInt(words.length)];??
  • ????????this.collector.emit(new?Values(word));??
  • ????}??
  • ??????
  • ????public?void?ack(Object?msgId)?{??
  • ????}??
  • ??
  • ????public?void?fail(Object?msgId)?{??
  • ????}??
  • ??????
  • ????public?void?declareOutputFields(OutputFieldsDeclarer?declarer)?{??
  • ????????declarer.declare(new?Fields("word"));??
  • ????}??
  • ??
  • ????@Override??
  • ????public?Map<String,?Object>?getComponentConfiguration()?{??
  • ????????if(!this.isDistributed)?{??
  • ????????Map<String,?Object>?ret?=?new?TreeMap<String,?Object>();??
  • ????????ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,?1);??
  • ????????return?ret;??
  • ????????}?else?{??
  • ????????return?null;??
  • ????????}??
  • ????}??
  • ????}??
  • ??
  • ????public?static?class?HelloBolt?extends?BaseRichBolt?{??
  • ????OutputCollector?collector;??
  • ??
  • ????@Override??
  • ????public?void?prepare(Map?conf,?TopologyContext?context,?OutputCollector?collector)?{??
  • ????????this.collector?=?collector;??
  • ????}??
  • ??
  • ????@Override??
  • ????public?void?execute(Tuple?tuple)?{??
  • ????????this.collector.emit(tuple,?new?Values("hello,"?+?tuple.getString(0)));??
  • ????????this.collector.ack(tuple);??
  • ????}??
  • ??
  • ????@Override??
  • ????public?void?declareOutputFields(OutputFieldsDeclarer?declarer)?{??
  • ????????declarer.declare(new?Fields("word"));??
  • ????}??
  • ????}??
  • ??????
  • ????public?static?void?main(String[]?args)?throws?Exception?{??
  • ??????
  • ????TopologyBuilder?builder?=?new?TopologyBuilder();??
  • ????builder.setSpout("a",?new?HelloSpout(),?10);??
  • ????builder.setBolt("b",?new?HelloBolt(),?5).shuffleGrouping("a");??
  • ??
  • ????Config?conf?=?new?Config();??
  • ????conf.setDebug(true);??
  • ??
  • ????if?(args?!=?null?&&?args.length?>?0)?{??
  • ????????conf.setNumWorkers(3);??
  • ????????StormSubmitter.submitTopologyWithProgressBar(args[0],?conf,?builder.createTopology());??
  • ????}?else?{??
  • ????????String?test_id?=?"hello_test";??
  • ????????LocalCluster?cluster?=?new?LocalCluster();??
  • ????????cluster.submitTopology(test_id,?conf,?builder.createTopology());??
  • ????????Utils.sleep(10000);??
  • ????????cluster.killTopology(test_id);??
  • ????????cluster.shutdown();??
  • ????}??
  • ????}?????
  • }??

  • 編譯成功

    ?

    ?

    [plain]?view plaincopy print?
  • mvn?clean?compile??

  • 為了能夠在本地模式運行,需要在pom.xml中添加如下:

    ?

    ?

    [html]?view plaincopy print?
  • <build>??
  • ??<plugins>??
  • ????<plugin>??
  • ??????<groupId>org.codehaus.mojo</groupId>??
  • ??????<artifactId>exec-maven-plugin</artifactId>??
  • ??????<version>1.2.1</version>??
  • ??????<executions>??
  • ????????<execution>??
  • ??????????<goals>??
  • ????????????<goal>exec</goal>??
  • ??????????</goals>??
  • ????????</execution>??
  • ??????</executions>??
  • ??????<configuration>??
  • ????????<executable>java</executable>??
  • ????????<includeProjectDependencies>true</includeProjectDependencies>??
  • ????????<includePluginDependencies>false</includePluginDependencies>??
  • ????????<classpathScope>compile</classpathScope>??
  • ????????<mainClass>${storm.topology}</mainClass>??
  • ??????</configuration>??
  • ????</plugin>??
  • ??</plugins>??
  • </build>??

  • 然后運行命令

    ?

    ?

    [plain]?view plaincopy print?
  • mvn?compile?exec:java?-Dstorm.topology=org.csfreebird.HelloTopology ? ?
  • 總結

    以上是生活随笔為你收集整理的【转】storm 开发系列一 第一个程序的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。