2021 Big Data Spark (11): Application Development Based on the IDEA Integrated Environment
Table of Contents
Spark Application Development with IDEA
Creating the Project
Running WordCount Locally
Running WordCount on a Cluster
Note
Modified Code
Packaging the jar
Renaming
Uploading the jar
Submitting to YARN
WordCount in Java 8 (optional)
Notes
WordCount Flow Diagram
Spark Application Development with IDEA
In practice, Spark applications are developed in the IDEA integrated development environment. All code in this Spark course is written in Scala, analyzing and processing data with functional programming, which keeps the code clear and concise.
Java is also used in industry to develop Spark programs, though less often; a Java version is demonstrated later in this article.
Creating the Project
Create a Maven project in IDEA.
Add the dependencies to the POM file; the content is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>spark_v8_bak</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository>
        <repository><id>apache</id><url>https://repository.apache.org/content/repositories/snapshots/</url></repository>
        <repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
    </repositories>

    <properties>
        <encoding>UTF-8</encoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.5</hadoop.version>
        <spark.version>2.4.5</spark.version>
    </properties>

    <dependencies>
        <!-- Scala language -->
        <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>
        <!-- Spark Core -->
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency>
        <!-- Spark SQL -->
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency>
        <!-- Spark SQL + Hive -->
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive-thriftserver_2.11</artifactId><version>${spark.version}</version></dependency>
        <!-- Spark Streaming -->
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version></dependency>
        <!-- Spark Streaming + Kafka -->
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>
        <!-- Structured Streaming + Kafka -->
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.5</version></dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- Plugin for compiling Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Plugin for compiling Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Running WordCount Locally
http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html
package cn.itcast.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") //set run parameters
    val sc: SparkContext = new SparkContext(conf) //create sc
    sc.setLogLevel("WARN") //set the log level

    //2. Read the text file
    //RDD: A Resilient Distributed Dataset (RDD)
    //A resilient distributed dataset; think of it as a distributed collection.
    //It is Spark's wrapper around Scala collections, so it can be used almost like a local collection, which makes it very convenient to work with.
    //RDD[one line of text per element]
    val fileRDD: RDD[String] = sc.textFile("data/input/words.txt")

    //3. Process the data: split each line on " ", map each word to 1, then aggregate by word
    //3.1 Split each line on " "
    //RDD[word]
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) //_ stands for each line

    //3.2 Map each word to 1
    //val unit: RDD[(String, Int)] = wordRDD.map(word => (word, 1))
    //(hello,1),(hello,1),(hello,1),(hello,1)
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1)) //_ stands for each word

    //3.3 Aggregate by word
    //reduceByKey is a Spark API; plain Scala has no equivalent, you would have to groupBy first and then operate on the values
    //reduceByKey means reduce (aggregate) by key
    //_+_
    //the 1st _ is the accumulated value so far
    //the 2nd _ is the value of the current element
    //RDD[(hello,4)]....
    val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)

    //4. Collect the result back to the driver as a local collection
    val result: Array[(String, Int)] = resultRDD.collect()

    //5. Print
    //result.foreach(println)
    println(result.toBuffer) //converting the Array to a Buffer lets its contents be printed directly

    //For testing: sleep the thread so the Web UI can be inspected
    Thread.sleep(1000 * 120)

    //6. Shut down
    sc.stop()
  }
}
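The comments above note that reduceByKey is a Spark API with no direct equivalent on plain Scala collections, where you would groupBy first and then reduce the values. As a side-by-side comparison, here is a minimal plain-Scala sketch; the sample words are made up for this example and are not part of the course code:

object ScalaOnlyWordCount {
  def main(args: Array[String]): Unit = {
    // Plain Scala collections have no reduceByKey: groupBy by word first, then sum the values
    val words = Seq("hello", "spark", "hello", "hadoop", "hello")
    val counts: Map[String, Int] = words
      .map((_, 1))                                               // (hello,1), (spark,1), ...
      .groupBy(_._1)                                             // hello -> Seq((hello,1), (hello,1), (hello,1)), ...
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // hello -> 3, ...
    println(counts) // e.g. Map(hello -> 3, spark -> 1, hadoop -> 1)
  }
}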
Running WordCount on a Cluster
Note
If there are permission problems when writing to HDFS, apply the following setting:
hadoop fs -chmod -R 777 /
and add the following line to the code:
System.setProperty("HADOOP_USER_NAME", "root")
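For reference, a minimal sketch of where that line would typically go: set the property before the SparkConf and SparkContext are created, so that all HDFS access runs as that user. The object name below is made up for illustration, and the user name root matches the setting above; adjust it for your own environment.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountAsRoot {
  def main(args: Array[String]): Unit = {
    // Set the HDFS user before the SparkContext is created, so writes are performed as this user
    System.setProperty("HADOOP_USER_NAME", "root")

    val conf = new SparkConf().setAppName("wc")
    val sc   = new SparkContext(conf)

    // ... the WordCount logic from the listing below goes here ...

    sc.stop()
  }
}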
Modified Code
Package the developed and tested WordCount program into a jar and use spark-submit to run it in local mode (LocalMode) and on a Standalone cluster. First modify the code so that the run mode is supplied through the master setting at submit time and the input/output paths are passed as arguments, as follows:
package cn.itcast.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //For robustness, check that the arguments were passed
    if (args.length != 2) {
      println("Usage: SparkSubmit <input> <output>............")
      System.exit(1) //a non-zero code indicates abnormal termination
    }

    //1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("wc") //.setMaster("local[*]") //the run mode is now supplied at submit time
    val sc: SparkContext = new SparkContext(conf) //create sc
    sc.setLogLevel("WARN") //set the log level

    //2. Read the text file
    //RDD[one line of text per element]
    val fileRDD: RDD[String] = sc.textFile(args(0))

    //3. Process the data: split each line on " ", map each word to 1, then aggregate by word
    //3.1 Split each line on " "
    //RDD[word]
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) //_ stands for each line

    //3.2 Map each word to 1
    //(hello,1),(hello,1),(hello,1),(hello,1)
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1)) //_ stands for each word

    //3.3 Aggregate by word
    //RDD[(hello,4)]....
    val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)

    //4. No need to collect the result to the driver this time
    //val result: Array[(String, Int)] = resultRDD.collect()

    //5. Output
    //result.foreach(println)
    //println(result.toBuffer)
    resultRDD.saveAsTextFile(s"${args(1)}-${System.currentTimeMillis()}") //output path, suffixed with a timestamp

    //For testing: sleep the thread so the Web UI can be inspected
    Thread.sleep(1000 * 120)

    //6. Shut down
    sc.stop()
  }
}
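One detail about the output path: saveAsTextFile fails if the target directory already exists, which is why the code above appends a timestamp to args(1). An alternative, shown here only as a sketch using the Hadoop FileSystem API (not part of the course code; the helper object name is made up), is to delete an existing output directory before writing:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

object HdfsOutputUtil {
  // Delete the output directory if it already exists, so saveAsTextFile does not fail on a rerun
  def deleteIfExists(sc: SparkContext, outputPath: String): Unit = {
    val fs  = FileSystem.get(sc.hadoopConfiguration)
    val out = new Path(outputPath)
    if (fs.exists(out)) {
      fs.delete(out, true) // recursive delete
    }
  }
}

// Usage inside main, just before writing:
//   HdfsOutputUtil.deleteIfExists(sc, args(1))
//   resultRDD.saveAsTextFile(args(1))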
Packaging the jar
Renaming
Rename the packaged jar to wc.jar (the name used in the commands below).
Uploading the jar
Upload the jar to the HDFS directory /spark/apps/ so that it can also be read when submitting jobs from other machines.
Create the HDFS directory:
hdfs dfs -mkdir -p /spark/apps/
Upload the jar:
hdfs dfs -put /root/wc.jar /spark/apps/
Submitting to YARN
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--total-executor-cores 2 \
--class cn.itcast.hello.WordCount \
hdfs://node1:8020/spark/apps/wc.jar \
hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output
Monitor the application in the YARN ResourceManager web UI: http://node1:8088/cluster
WordCount in Java 8 (optional)
Notes:
In Scala, a function is essentially an object.
In Java 8, a function can be understood as an anonymous inner class object, so functions in Java 8 are essentially objects as well.
Java 8's functional programming syntax is the lambda expression:
(parameters) -> { body }
Writing rule: omit whatever can be omitted; add back whatever cannot.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class WordCountJava8 {
    public static void main(String[] args) {
        //1. Create the context
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.setLogLevel("WARN");

        //2. Read the file
        JavaRDD<String> fileRDD = jsc.textFile("data/input/words.txt");

        //3. Process the data
        //3.1 Split each line on " "
        //Java 8 lambda syntax: (parameters) -> { body; }  Note: the rule is still "omit whatever can be omitted"
        //public interface FlatMapFunction<T, R> extends Serializable {
        //  Iterator<R> call(T t) throws Exception;
        //}
        //Looking at the source, the function that flatMap expects takes a T (here a String)
        //and returns an Iterator, so the lambda body must return an Iterator
        JavaRDD<String> wordRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        //3.2 Map each word to 1: (word, 1)
        //public interface PairFunction<T, K, V> extends Serializable {
        //  Tuple2<K, V> call(T t) throws Exception;
        //}
        JavaPairRDD<String, Integer> wordAndOneRDD = wordRDD.mapToPair(word -> new Tuple2<>(word, 1));

        //3.3 Aggregate by key
        //public interface Function2<T1, T2, R> extends Serializable {
        //  R call(T1 v1, T2 v2) throws Exception;
        //}
        JavaPairRDD<String, Integer> wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);

        //4. Collect the result and print it
        List<Tuple2<String, Integer>> result = wordAndCountRDD.collect();
        //result.forEach(t -> System.out.println(t));
        result.forEach(System.out::println);
        //The idea of functional programming is behavior parameterization: whatever you want done, pass it as a parameter

        //5. Shut down
        jsc.stop();
    }
}
WordCount Flow Diagram
The main WordCount flow follows the steps in the code above: read the input with textFile, split each line into words with flatMap, map each word to (word, 1), aggregate the pairs with reduceByKey, then collect the result or save it to the output path.