Scala 开发 Spark 程序
看spark和scala版本
運行spark-shell
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
?
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
得到spark版本1.6.0,scala版本2.10.5
?
下載安裝特定版本的scala
https://www.scala-sbt.org/download.html
?
?
下載特定版本的spark包
https://archive.apache.org/dist/spark/
spark-1.6.0.tgz
其中examples是示例代碼
?
開發
idea的scala插件安裝參見Scala 寫第一個程序HelloWorld:https://blog.csdn.net/whq12789/article/details/89453424。
idea新建Project,選擇Scala——sbt,輸入名稱目錄,注意版本選擇與spark的scala相同版本。
build.sbt添加依賴,此處版本要求與spark版本相同
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"
src/main/scala右擊新建Package:com.whq.test
右擊包,新建Scala Class選擇Kind為Object,名稱為Hi,代碼如下
package com.whq.testimport org.apache.spark.{SparkConf, SparkContext}object HdfsWordCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: HdfsWordCount <directory>")System.exit(1)}val inputFile=args(0)val outputFile=args(1)val conf = new SparkConf().setAppName("wordCount")// Create a Scala Spark Context.val sc = new SparkContext(conf)// Load our input data.val input = sc.textFile(inputFile)// Split up into words.val words = input.flatMap(line => line.split(" "))// Transform into word and count.val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}// Save the word count back out to a text file, causing evaluation.counts.saveAsTextFile(outputFile)}}統計分組信息。
右側sbt點sbt tasks中的package,打包后的文件拷貝到服務器上準備運行。
?
su - hdfs
vi hello.txt添加內容
you,jumpi,jumpyou,jumpi,jumpjump?
提交到spark目錄下
hdfs dfs -put hello.txt /spark/
清空目標目錄
hdfs dfs -rm -r /spark/out
?
提交運行
spark-submit --master yarn --deploy-mode client --class com.whq.test.HdfsWordCount sparksbt_2.10-0.1.jar /spark/hello.txt /spark/out
?
結果會輸出到hdfs的/spark/out目錄中
查看結果命令
hdfs dfs -cat /spark/out/_SUCCESS
hdfs dfs -cat /spark/out/part-00000
hdfs dfs -cat /spark/out/part-00001
?
?
Spark常用接口
常用接口
Spark主要使用到如下這幾個類:
- SparkContext:是Spark的對外接口,負責向調用該類的java應用提供Spark的各種功能,如連接Spark集群,創建RDD等。
- SparkConf:Spark應用配置類,如設置應用名稱,執行模式,executor內存等。
- RDD(Resilient Distributed Dataset):用于在Spark應用程序中定義RDD的類。
- PairRDDFunctions:為key-value對的RDD數據提供運算操作,如groupByKey。
- Broadcast: 廣播變量類。廣播變量允許保留一個只讀的變量,緩存在每一臺機器上,而非每個任務保存一份拷貝。
- StorageLevel: 數據存儲級別,有內存(MEMORY_ONLY),磁盤(DISK_ONLY),內存+磁盤(MEMORY_AND_DISK)等。
RDD上支持兩種類型的操作: :transformation和action,這兩種類型的常用方法如下。
表?transformation
| 方法 | 說明 |
| map(func) | 對調用map的RDD數據集中的每個element都使用func。 |
| filter(func) | 對RDD中所有元素調用func,返回f為true的元素。 |
| flatMap(func) | 先對RDD所有元素調用func,然后將結果扁平化。 |
| sample(withReplacement,faction,seed) | 抽樣。 |
| union(otherDataset) | 返回一個新的dataset,包含源dataset和給定dataset的元素的集合。 |
| distinct([numTasks]) | 去除重復元素。 |
| groupByKey(numTasks) | 返回(K,Iterable[V]),將key相同的value組成一個集合。 |
| reduceByKey(func,[numTasks]) | 對key相同的value調用func。 |
| sortByKey([ascending],[numTasks]) | 按照key來進行排序,是升序還是降序,ascending是boolean類型。 |
| join(otherDataset,[numTasks]) | 當有兩個KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks為并發的任務數。 |
| cogroup(otherDataset,[numTasks]) | 當有兩個KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks為并發的任務數。 |
| cartesian(otherDataset) | 笛卡爾積。 |
表?action
| API | 說明 |
| reduce(func) | 對RDD中的元素調用func。 |
| collect() | 返回包含RDD中所有元素的一個數組。 |
| count() | 返回的是dataset中的element的個數。 |
| first() | 返回的是dataset中的第一個元素。 |
| take(n) | 返回前n個elements。 |
| takeSample(withReplacement,num,seed) | takeSample(withReplacement,num,seed)對dataset隨機抽樣,返回有num個元素組成的數組。withReplacement表示是否使用replacement。 。 |
| saveAsTextFile(path) | 把dataset寫到一個text file中,或者hdfs,或者hdfs支持的文件系統中,spark把每條記錄都轉換為一行記錄,然后寫到file中。 |
| saveAsSequenceFile(path) | 只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop文件系統。 |
| countByKey() | 對每個key出現的次數做統計。 |
| foreach(func) | 在數據集的每一個元素上,運行函數func。 |
| countByValue() | 對RDD中每個元素出現的次數進行統計。 |
?
?
總結
以上是生活随笔為你收集整理的Scala 开发 Spark 程序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 树状数组的建树 单点修改 单点查询
- 下一篇: 青蓝电影质感LR预设达芬奇/PS/PR/