日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

启动Spark Shell,在Spark Shell中编写WordCount程序,在IDEA中编写WordCount的Maven程序,spark-submit使用spark的jar来做单词统计

發布時間:2024/9/27 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 启动Spark Shell,在Spark Shell中编写WordCount程序,在IDEA中编写WordCount的Maven程序,spark-submit使用spark的jar来做单词统计 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.啟動Spark Shell

spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶可以在該命令行下用scala編寫spark程序。要注意的是要啟動Spark-Shell需要先啟動Spark-ha集群,Spark集群安裝和部署參考:http://blog.csdn.net/tototuzuoquan/article/details/74481570

1.2.1、啟動spark shell

啟動方式一:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME [root@hadoop1 spark-2.1.1-bin-hadoop2.7]# pwd /home/tuzq/software/spark-2.1.1-bin-hadoop2.7 [root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077

通過使用–master指定master的地址,連接的是啟動著的那個master

同樣,還可以指定執行的內存數和總的核心數

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME [root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 2g --total-executor-cores 2

參數說明:
–master spark://hadoop:7077 指定Master的地址
–executor-memory 2g 指定每個worker可用內存為2G
–total-executor-cores 2 指定整個集群使用的cup核數為2個

注意:
如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程序,其實是啟動了spark的local模式,該模式僅在本機啟動一個進程,沒有與集群建立聯系。

Spark Shell中已經默認將SparkContext類初始化為對象sc。用戶代碼如果需要用到,則直接應用sc即可

1.2.2、在spark shell中編寫WordCount程序

1.首先啟動hdfs
2.向hdfs上傳一個文件到hdfs(hdfs://mycluster/wordcount/input/2.txt)
效果圖下:

如果通過帶有協議的方式訪問hadoop集群上的文件可以通過下面的方式:

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/ Found 2 items drwx-wx-wx - root supergroup 0 2017-07-06 11:11 hdfs://mycluster/tmp drwxr-xr-x - root supergroup 0 2017-07-06 11:16 hdfs://mycluster/wordcount [root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input Found 9 items -rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/1.txt -rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt -rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/3.txt -rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/4.txt -rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/5.txt -rw-r--r-- 3 root supergroup 27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/a.txt -rw-r--r-- 3 root supergroup 27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/aaa.txt -rw-r--r-- 3 root supergroup 27787264 2017-07-06 11:16 hdfs://mycluster/wordcount/input/b.txt -rw-r--r-- 3 root supergroup 26738688 2017-07-06 11:16 hdfs://mycluster/wordcount/input/c.txt [root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input/2.txt -rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt [root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/input/2.txt Collecting and analysis base data for big data analysis;Maintenance Hadoop platform Development Hadoop framework Cooperate with data scientist, verify and implement data models to realize automatic and accurate fraud detection, in order to improve the risk management level of E-commerce/payment platforms Analyze information acquired and compare solutions and weight them against the actual needs, provide root cause analysis affecting key business problems Play an active role in company's anti-fraud platform strategy Support related data analysis work, and provide valuable business reports[root@hadoop2 hadoop-2.8.0]#

3.在spark shell中用scala語言編寫spark程序

scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://mycluster/wordcount/output")

1.使用hdfs命令查看結果

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/output Found 3 items -rw-r--r-- 3 root supergroup 0 2017-07-06 11:48 hdfs://mycluster/wordcount/output/_SUCCESS -rw-r--r-- 3 root supergroup 400 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00000 -rw-r--r-- 3 root supergroup 346 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00001 [root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/output/part-00000 (role,1) (Play,1) (fraud,1) (level,1) (business,2) (improve,1) (platforms,1) (order,1) (big,1) (with,1) (scientist,,1) (active,1) (valuable,1) (data,5) (information,1) (Cooperate,1) (Collecting,1) (framework,1) (E-commerce/payment,1) (acquired,1) (root,1) (accurate,1) (solutions,1) (analysis;Maintenance,1) (problems,1) (them,1) (Analyze,1) (models,1) (analysis,3) (realize,1) (actual,1) (weight,1) [root@hadoop2 hadoop-2.8.0]#

說明:
sc是SparkContext對象,該對象是提交spark程序的入口
sc.textFile(“hdfs://mycluster/wordcount/input/2.txt”)是從hdfs中讀取數據
flatMap(_.split(” “))先map在壓平
map((_,1))將單詞和1構成元組
reduceByKey(+)按照key進行reduce,并將value累加
saveAsTextFile(“hdfs://mycluster/wordcount/output”)將結果寫入到hdfs中

將wordCound的結果排序,并顯示的代碼:

scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect res2: Array[(String, Int)] = Array((and,6), (data,5), (analysis,3), (business,2), (to,2), (platform,2), (in,2), (provide,2), (the,2), (Hadoop,2), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (company's,1), (needs,,1), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1), (Development,1), (role,1), (Play,1), (fraud,1), (level,1), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (m... scala>

2、idea中創建spark的maven工程

spark shell僅在測試和驗證我們的程序時使用的較多,在生產環境中,通常會在IDE中編制程序,然后打成jar包,然后提交到集群,最常用的是創建一個Maven項目,利用Maven來管理jar包的依賴。

創建Maven工程:




要注意的是,在創建好項目之后,一定要重新制定好Maven倉庫所在的位置,不然可能會導致重新下載jar包:

創建好maven項目后,點擊Enable Auto-Import

配置Maven的pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.spark</groupId><artifactId>wordCount</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.10.6</scala.version><scala.compat.version>2.10</scala.compat.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>1.5.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.5.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.2</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-make:transitive</arg><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.toto.spark.WordCount</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build> </project>

將src/main/java和src/test/java分別修改成src/main/scala和src/test/scala(或者創建scala的Directory),與pom.xml中的配置保持一致

或者通過如下方式:


新建一個scala class,類型為Object

編寫spark程序代碼:

package cn.toto.sparkimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** Created by toto on 2017/7/6.*/ object WordCount {def main(args: Array[String]): Unit = {//創建sparkconfval conf=new SparkConf().setAppName("WordCount")//創建sparkcontextval sc=new SparkContext(conf)//讀取hdfs中的數據val line:RDD[String]=sc.textFile(args(0))//切分單詞val words:RDD[String]=line.flatMap(_.split(" "))//將單詞計算val wordAndOne:RDD[(String,Int)]=words.map((_,1))//分組聚合val result:RDD[(String,Int)]=wordAndOne.reduceByKey((x,y)=>x+y)//排序val finalResult:RDD[(String,Int)]=result.sortBy(_._2,false)//將數據存到HDFS中finalResult.saveAsTextFile(args(1))//釋放資源sc.stop()} }

打包:

進入工程的target目錄下面,獲取jar包

或者直接在IDEA的工程目錄下找到:

將wordCount-1.0-SNAPSHOT.jar上傳到/home/tuzq/software/sparkdata下

使用spark的jar來做單詞統計
要注意的是最后的輸出路徑要不存在,并且運行下面的程序的時候,最好是把spark-shell給關閉了。否則可能會報錯。

bin/spark-submit --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 512m --total-executor-cores 6 --class cn.toto.spark.WordCount /home/tuzq/software/sparkdata/wordCount-1.0-SNAPSHOT.jar hdfs://mycluster/wordcount/input hdfs://mycluster/wordcount/out0001

運行時的狀態:

查看hdfs上的結果:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -ls hdfs://mycluster/wordcount/out0002 Found 10 items -rw-r--r-- 3 root supergroup 0 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/_SUCCESS -rw-r--r-- 3 root supergroup 191 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00000 -rw-r--r-- 3 root supergroup 671 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00001 -rw-r--r-- 3 root supergroup 245 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00002 -rw-r--r-- 3 root supergroup 31 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00003 -rw-r--r-- 3 root supergroup 1096 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00004 -rw-r--r-- 3 root supergroup 11 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00005 -rw-r--r-- 3 root supergroup 936 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00006 -rw-r--r-- 3 root supergroup 588 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00007 -rw-r--r-- 3 root supergroup 609 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00008

查看其中的任何一個:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -cat hdfs://mycluster/wordcount/out0002/part-00000 (and,770752) (is,659375) (I,505440) (a,468642) (to,431857) (in,421230) (the,331176) (of,272080) (FDS,218862) (for,213029) (The,196569) (true,196567) (but,196566) (on,193650) (without,193649) [root@hadoop1 spark-2.1.1-bin-hadoop2.7]#

總結

以上是生活随笔為你收集整理的启动Spark Shell,在Spark Shell中编写WordCount程序,在IDEA中编写WordCount的Maven程序,spark-submit使用spark的jar来做单词统计的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。