Spark之spark shell
前言:要學(xué)習(xí)spark程序開(kāi)發(fā),建議先學(xué)習(xí)spark-shell交互式學(xué)習(xí),加深對(duì)spark程序開(kāi)發(fā)的理解。spark-shell提供了一種學(xué)習(xí)API的簡(jiǎn)單方式,以及一個(gè)能夠進(jìn)行交互式分析數(shù)據(jù)的強(qiáng)大工具,可以使用scala編寫(xiě)(scala運(yùn)行與Java虛擬機(jī)可以使用現(xiàn)有的Java庫(kù))或使用Python編寫(xiě)。
1.啟動(dòng)spark-shell
? ? spark-shell的本質(zhì)是在后臺(tái)調(diào)用了spark-submit腳本來(lái)啟動(dòng)應(yīng)用程序的,在spark-shell中已經(jīng)創(chuàng)建了一個(gè)名為sc的SparkContext對(duì)象,在4個(gè)CPU核運(yùn)行spark-shell命令如下:
spark-shell --master local[4]? ? 如果指定Jar包路徑,則命令如下:
spark-shell --master local[4] --jars xxx.jar,yyy,jar? ? --master用來(lái)設(shè)置context將要連接并使用的資源主節(jié)點(diǎn),master的值是standalone模式中spark的集群地址、yarn或mesos集群的URL,或是一個(gè)local地址
? ? --jars可以添加需要用到的jar包,通過(guò)逗號(hào)分隔來(lái)添加多個(gè)包。
2.加載text文件
? ? spark創(chuàng)建sc后,可以加載本地文件創(chuàng)建RDD,這里測(cè)試是加載spark自帶的本地文件README.md,返回一個(gè)MapPartitionsRDD文件。
scala> val textFile = sc.textFile("file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/README.md");
textFile: org.apache.spark.rdd.RDD[String] = file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/README.md MapPartitionsRDD[9] at textFile at <console>:24
? ? 加載HDFS文件和本地文件都是使用textFile,區(qū)別是添加前綴(hdfs://和file://)進(jìn)行標(biāo)識(shí),從本地讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件是先轉(zhuǎn)成HadoopRDD,然后隱試轉(zhuǎn)換成MapPartitionsRDD。想了解MapPartitions可以看這篇MapPartition和Map的區(qū)別。
3.簡(jiǎn)單RDD操作
? ? 對(duì)于RDD可以執(zhí)行Transformation返回新的RDD,也可以執(zhí)行Action得到返回結(jié)果。first命令返回文件第一行,count命令返回文件所有行數(shù)。
scala> textFile.first(); res6: String = # Apache Sparkscala> textFile.count(); res7: Long = 104接下來(lái)進(jìn)行transformation操作,使用filter命令從README.md文件中抽取出一個(gè)子集,返回一個(gè)新的FilteredRDD。
scala> val textFilter = textFile.filter(line=>line.contains("Spark")); textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at filter at <console>:26鏈接多個(gè)Transformation和Action,計(jì)算包括"Spark"字符串的行數(shù)。
scala> textFile.filter(line=>line.contains("Spark")).count(); res10: Long = 204.RDD應(yīng)用的簡(jiǎn)單操作
(1)計(jì)算文本中單詞最多的一行的單詞數(shù)
scala> textFile.map(line =>line.split(" ").size).reduce((a,b) => if (a > b) a else b); res11: Int = 22先將每一行的單詞使用空格進(jìn)行拆分,并統(tǒng)計(jì)每一行的單詞數(shù),創(chuàng)建一個(gè)基于單詞數(shù)的新RDD,然后對(duì)該RDD進(jìn)行Reduce操作返回最大值。
(2)統(tǒng)計(jì)單詞
詞頻統(tǒng)計(jì)WordCount是大數(shù)據(jù)處理最流行的入門(mén)程序之一,Spark可以很容易實(shí)現(xiàn)WordCount操作。
//這個(gè)過(guò)程返回的是一個(gè)(string,int)類型的鍵值對(duì)ShuffledRDD(y執(zhí)行reduceByKey的時(shí)候需要進(jìn)行Shuffle操作,返回的是一個(gè)Shuffle形式的RDD),最后用Collect聚合統(tǒng)計(jì)結(jié)果scala> val wordCount = textFile.flatMap(line =>line.split(" ")).map(x => (x,1)).reduceByKey((a,b) => a+b); wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26 scala> wordCount.collect [Stage 7:> (0 + 0)
[Stage 7:> (0 + 2)
res12: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (ar... //這里使用了占位符_,使表達(dá)式更為簡(jiǎn)潔,是Scala語(yǔ)音的特色,每個(gè)_代表一個(gè)參數(shù)。
scala> val wordCount2 = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_); wordCount2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at <console>:26 scala> wordCount2.collect res14: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (ar... //Spark默認(rèn)不進(jìn)行排序,如有需要排序輸出,排序的時(shí)候?qū)ey和value互換,使用sortByKey方法指定升序(true)和降序(false)
scala> val wordCount3 = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)); wordCount3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at map at <console>:26scala> wordCount3.collect res15: Array[(String, Int)] = Array(("",71), (the,24), (to,17), (Spark,16), (for,12), (##,9), (and,9), (a,8), (can,7), (run,7), (on,7), (is,6), (in,6), (using,5), (of,5), (build,4), (Please,4), (with,4), (also,4), (if,4), (including,4), (an,4), (You,4), (you,4), (general,3), (documentation,3), (example,3), (how,3), (one,3), (For,3), (use,3), (or,3), (see,3), (Hadoop,3), (Python,2), (locally,2), (This,2), (Hive,2), (SparkPi,2), (refer,2), (Interactive,2), (Scala,2), (detailed,2), (return,2), (Shell,2), (class,2), (Python,,2), (set,2), (building,2), (SQL,2), (guidance,2), (cluster,2), (shell:,2), (supports,2), (particular,2), (following,2), (which,2), (should,2), (To,2), (be,2), (do,2), (./bin/run-example,2), (It,2), (1000:,2), (tests,2), (examples,2), (at,2), (`examples`,2), (that,2), (H...
5.RDD緩存使用RDD的cache()方法
?
轉(zhuǎn)載于:https://www.cnblogs.com/schoolbag/p/9635615.html
總結(jié)
以上是生活随笔為你收集整理的Spark之spark shell的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 云栖blog
- 下一篇: VS2010/MFC编程入门之二十三(常