

java spark wordcount_Submitting a job to Spark (WordCount as an example)

Published: 2025/3/12

1. First, set up a working Hadoop + Spark environment and make sure the services are running. This article uses WordCount as the example.

2. Create the input source: a file hello.txt with the following contents:

tom jerry

henry jim

suse lusy

Note: fields are separated by spaces.

3. Then run the following commands:

hadoop fs -mkdir -p /Hadoop/Input    (create the directory in HDFS)

hadoop fs -put hello.txt /Hadoop/Input    (upload hello.txt to HDFS)

hadoop fs -ls /Hadoop/Input    (list the uploaded file)

hadoop fs -text /Hadoop/Input/hello.txt    (view the file contents)

4. First, test the WordCount job in spark-shell.

(1) Start spark-shell. It lives in Spark's bin directory, but here it is on the PATH via an environment variable, so it can be launched directly.

(2) Then enter the Scala statements directly:

val file=sc.textFile("hdfs://hacluster/Hadoop/Input/hello.txt")

val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

rdd.collect()

rdd.foreach(println)

OK, the test passes.
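The result of the shell session above can be sanity-checked without a cluster: the same flatMap → (word, 1) → reduceByKey chain, applied to the three sample lines from hello.txt, boils down to a per-word map merge. Below is a plain-Java sketch of that logic (LocalWordCount is a hypothetical name, not part of the project, and no Spark is involved):

```java
import java.util.HashMap;
import java.util.Map;

public class LocalWordCount {

    // Mirrors flatMap(_.split(" ")) -> map((_, 1)) -> reduceByKey(_ + _)
    // on an in-memory list of lines instead of an RDD.
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {   // the flatMap step
                counts.merge(word, 1, Integer::sum); // (word, 1) pairs reduced by key
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The sample lines from hello.txt, space-delimited as noted above.
        System.out.println(count(new String[] { "tom jerry", "henry jim", "suse lusy" }));
    }
}
```

For this sample input every word occurs exactly once, so rdd.collect() on the cluster should likewise return six pairs, each with count 1.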

5. WordCount in Scala

package com.example.spark

/**
 * User: hadoop
 * Date: 2017/8/17 0010
 * Time: 10:20
 */
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * Count word occurrences
 */
object ScalaWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: ScalaWordCount <file>")
      System.exit(1)
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val line = sc.textFile(args(0))

    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

    sc.stop()
  }
}

6. WordCount in Java

package com.example.spark;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public final class WordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile(args[0], 1);

        // Split each line into words. Note: against Spark 2.x (the 2.1.0 used
        // in the pom below), FlatMapFunction.call returns an Iterator, not an Iterable.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts per word.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        sc.stop();
    }
}

7. Package with IDEA.

(1) File ---> Project Structure

Click OK. Once configured, choose Build -> Build Artifacts... from the menu bar and run Build. When packaging completes, the status bar shows "Compilation completed successfully...", and the jar can be found in the configured output path, as shown below.

Upload wordcount.jar to a cluster node with scp wordcount.jar root@10.57.22.244:/opt/ and enter the VM's root password when prompted.

8. Run the jar.

This article runs the jar in spark on yarn mode.

Run the Java WordCount with: spark-submit --master yarn-client --class com.example.spark.WordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt

Run the Scala WordCount with: spark-submit --master yarn-client --class com.example.spark.ScalaWordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt
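The two commands share one layout: named options first, then the application jar, then the jar's own arguments (here the HDFS input path). The sketch below makes that layout explicit by assembling the command from its parts; SparkSubmitCommand is a hypothetical helper for illustration, not part of the project:

```java
public class SparkSubmitCommand {

    // Assemble a spark-submit command line in the order used above:
    // options, then the application jar, then the application's arguments.
    static String build(String master, String mainClass, String execMem,
                        String totalCores, String jar, String input) {
        return String.join(" ",
                "spark-submit",
                "--master", master,
                "--class", mainClass,
                "--executor-memory", execMem,
                "--total-executor-cores", totalCores,
                jar, input);
    }

    public static void main(String[] args) {
        System.out.println(build("yarn-client", "com.example.spark.WordCount",
                "1G", "2", "/opt/wordcount.jar", "hdfs://hacluster/aa/hello.txt"));
    }
}
```

Swapping the --class value to com.example.spark.ScalaWordCount reproduces the second command; everything after the jar path is passed through to the main class as args.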

This article demonstrates with the Java WordCount, as shown in the figure below:

The above submits the job with spark-submit directly; the following describes submitting it from a Java web application instead.

9. Submitting a job to Spark from a Java web application.

Build the Java web skeleton with Spring Boot; the implementation is as follows:

(1) Create a Maven project named spark-submit.

(2) pom.xml. Note that the Spark dependency jars must match your Scala version; in spark-core_2.11, for example, the trailing 2.11 is the version of the Scala installation.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
    </parent>

    <groupId>wordcount</groupId>
    <artifactId>spark-submit</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <start-class>com.example.spark.SparkSubmitApplication</start-class>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <commons.version>3.4</commons.version>
        <org.apache.spark-version>2.1.0</org.apache.spark-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.eclipse.jetty.websocket</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>apache-jsp</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-solr</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.5</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>maven2</id>
            <url>http://repo1.maven.org/maven2/</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
            </plugin>
            <plugin>
                <artifactId>maven-war-plugin</artifactId>
                <configuration>
                    <warSourceDirectory>src/main/webapp</warSourceDirectory>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>jetty-maven-plugin</artifactId>
                <configuration>
                    <systemProperties>
                        <systemProperty>
                            <name>spring.profiles.active</name>
                            <value>development</value>
                        </systemProperty>
                        <systemProperty>
                            <name>org.eclipse.jetty.server.Request.maxFormContentSize</name>
                            <value>600000</value>
                        </systemProperty>
                    </systemProperties>
                    <useTestScope>true</useTestScope>
                    <webApp>
                        <contextPath>/</contextPath>
                    </webApp>
                    <httpConnector>
                        <port>7080</port>
                    </httpConnector>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

(3) SubmitJobToSpark.java

package com.example.spark;

import org.apache.spark.deploy.SparkSubmit;

/**
 * @author kevin
 */
public class SubmitJobToSpark {

    public static void submitJob() {
        String[] args = new String[] { "--master", "yarn-client", "--name", "test java submit job to spark",
                "--class", "com.example.spark.WordCount", "/opt/wordcount.jar", "hdfs://hacluster/aa/hello.txt" };
        SparkSubmit.main(args);
    }
}

(4) SparkController.java

package com.example.spark.web.controller;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.example.spark.SubmitJobToSpark;

@Controller
@RequestMapping("spark")
public class SparkController {

    private Logger logger = LoggerFactory.getLogger(SparkController.class);

    @RequestMapping(value = "sparkSubmit", method = { RequestMethod.GET, RequestMethod.POST })
    @ResponseBody
    public String sparkSubmit(HttpServletRequest request, HttpServletResponse response) {
        logger.info("start submit spark task...");
        SubmitJobToSpark.submitJob();
        return "hello";
    }
}
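One caveat with the controller above: it calls SparkSubmit.main synchronously on the servlet thread, so the HTTP request will typically not return until the submission (and, in yarn-client mode, the application itself) finishes. A common refinement is to hand the job to a worker thread so the request returns immediately; the sketch below illustrates this, where AsyncSubmitter and the Runnable stand-in for SubmitJobToSpark::submitJob are hypothetical, not part of the original project:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSubmitter {

    // Single daemon worker thread: one submission runs at a time, and the
    // calling (HTTP) thread returns immediately after handing the job off.
    private static final ExecutorService WORKER = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "spark-submit-worker");
        t.setDaemon(true);
        return t;
    });

    // `job` stands in for SubmitJobToSpark::submitJob (hypothetical wiring).
    static void submitAsync(Runnable job) {
        WORKER.submit(job);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        submitAsync(() -> {
            System.out.println("submitting job...");
            done.countDown();
        });
        done.await(); // wait only so the demo prints before the JVM exits
    }
}
```

The controller would then call submitAsync(SubmitJobToSpark::submitJob) and return at once; job status would have to be tracked separately (e.g. via the YARN UI).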

(5) Package the spark-submit project as a war, deploy it to Tomcat on the Master node, and issue the request:

http://10.57.22.244:9090/spark/sparkSubmit

The computed result can be seen in Tomcat's log.
