02、体验Spark shell下RDD编程
02、體驗(yàn)Spark shell下RDD編程
1、Spark RDD介紹
RDD是Resilient Distributed Dataset,中文翻譯是彈性分布式數(shù)據(jù)集。該類是Spark是核心類成員之一,是貫穿Spark編程的始終。初期階段,我們可以把RDD看成是Java中的集合就可以了,在后面的章節(jié)中會(huì)詳細(xì)講解RDD的內(nèi)部結(jié)構(gòu)和工作原理。
2、Spark-shell下實(shí)現(xiàn)對(duì)本地文件的單詞統(tǒng)計(jì)
2.1思路
word count是大數(shù)據(jù)學(xué)習(xí)的經(jīng)典案例,很多功能實(shí)現(xiàn)都可以歸結(jié)為是word count的使用。工作過程為使用SparkContext對(duì)象的textFile方法加載文件形成Spark RDD1,RDD1中每個(gè)元素就是文件中的每一行文本,然后對(duì)RDD的每個(gè)元素進(jìn)行壓扁flatMap操作,形成RDD2,RDD2中每個(gè)元素是將RDD1的每行拆分出來產(chǎn)生的單詞,因此RDD2就是單詞的集合,然后再對(duì)RDD2進(jìn)行標(biāo)一成對(duì),形成(單詞,1)的元組的集合RDD3,最后對(duì)RDD3進(jìn)行按照key進(jìn)行聚合操作形成RDD4,最終將RDD4計(jì)算后得到的集合就是每個(gè)單詞的數(shù)量
2.2 處理流程
App->SparkContext: textFile加載文件 SparkContext->RDD1: 創(chuàng)建RDD RDD1-->App: 返回RDD1 App->RDD1: flatMap壓扁操作 RDD1->RDD2: 產(chǎn)生RDD2 RDD2-->App: 返回RDD2 App->RDD2: map標(biāo)一成對(duì) RDD2->RDD3: 產(chǎn)生RDD3 RDD3-->App: 返回RDD3 App->RDD3: reduceByKey聚合 RDD3->RDD4: 產(chǎn)生RDD4 RDD4-->App: 返回RDD4 App->RDD4: collect收集結(jié)果數(shù)據(jù)2.3 分步實(shí)現(xiàn)代碼
// 進(jìn)入spark shell環(huán)境 $>spark-shell// 1.加載文件 scala>val rdd1 = sc.textFile("file:///homec/centos/1.txt")// 2.壓扁每行 scala>val rdd2 = rdd1.flatMap(_.split(" ")) // 3.標(biāo)1成對(duì) scala>val rdd3 = rdd2.map(w=>(w,1))// 4.按照key聚合每個(gè)key下的所有值 scala>val rdd4 = rdd3.reduceByKey(_+_)// 5.顯式數(shù)據(jù) scala>rdd4.collect()2.4 一步實(shí)現(xiàn)代碼
$scala>sc.textFile("file:///home/centos/1.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect3、Spark-shell下實(shí)現(xiàn)對(duì)氣溫?cái)?shù)據(jù)的最大最小聚合
3.1 思路分析
氣溫?cái)?shù)據(jù)數(shù)各年度內(nèi)氣溫列表,將每一行變換成(year,temp)元組后,按照yearn進(jìn)行聚合即可。
3.2 處理流程
App->SparkContext: textFile加載文件 SparkContext->RDD1: 產(chǎn)生RDD1 RDD1-->App: 返回RDD1 App->RDD1: map變換每行為(year,(max,min))元組 RDD1->RDD2: 產(chǎn)生RDD2 RDD2-->App: 返回RDD2 App->RDD2: reduceByKey雙聚合氣溫極值 RDD2->RDD3:產(chǎn)生RDD3 App->RDD3: collect()收集結(jié)果3.3 分步實(shí)現(xiàn)代碼
// 進(jìn)入spark shell環(huán)境 $>spark-shell// 1.加載氣溫?cái)?shù)據(jù)文件 scala>val rdd1 = sc.textFile("/home/centos/temps.dat")// 2.壓扁每行 scala>val rdd2 = rdd1.flatMap(e=>{val arr = e.split(" ")(arr(0).toInt, (arr(1).toInt ,arr(1).toInt)) }) // 3.reduceByKey scala>val rdd3 = rdd2.reduceByKey((a,b)=>{import scala.math(math.max(a(0),b(0)) , math.min(a(1),b(1))) })// 4.收集日志 scala>rdd3.collect()3.4 一步實(shí)現(xiàn)代碼
$scala>sc.textFile("file:///home/centos/temps.dat").map(line=>{val arr = line.split(" ")(arr(0).toInt,(arr(1).toInt , arr(1).toInt))}).reduceByKey((a,b)=>{import scala.math(math.max(a(0) , b(0)) , math.min(a(1) , b(1)))}).collect()轉(zhuǎn)載于:https://www.cnblogs.com/xupccc/p/9543961.html
總結(jié)
以上是生活随笔為你收集整理的02、体验Spark shell下RDD编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于互信息(Mutual Informa
- 下一篇: EF另一个 SqlParameterCo