02、体验Spark shell下RDD编程
生活随笔
收集整理的這篇文章主要介紹了
02、体验Spark shell下RDD编程
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
02、體驗Spark shell下RDD編程
1、Spark RDD介紹
RDD是Resilient Distributed Dataset,中文翻譯是彈性分布式數(shù)據(jù)集。該類是Spark是核心類成員之一,是貫穿Spark編程的始終。初期階段,我們可以把RDD看成是Java中的集合就可以了,在后面的章節(jié)中會詳細講解RDD的內部結構和工作原理。
2、Spark-shell下實現(xiàn)對本地文件的單詞統(tǒng)計
2.1思路
word count是大數(shù)據(jù)學習的經典案例,很多功能實現(xiàn)都可以歸結為是word count的使用。工作過程為使用SparkContext對象的textFile方法加載文件形成Spark RDD1,RDD1中每個元素就是文件中的每一行文本,然后對RDD的每個元素進行壓扁flatMap操作,形成RDD2,RDD2中每個元素是將RDD1的每行拆分出來產生的單詞,因此RDD2就是單詞的集合,然后再對RDD2進行標一成對,形成(單詞,1)的元組的集合RDD3,最后對RDD3進行按照key進行聚合操作形成RDD4,最終將RDD4計算后得到的集合就是每個單詞的數(shù)量
2.2 處理流程
App->SparkContext: textFile加載文件 SparkContext->RDD1: 創(chuàng)建RDD RDD1-->App: 返回RDD1 App->RDD1: flatMap壓扁操作 RDD1->RDD2: 產生RDD2 RDD2-->App: 返回RDD2 App->RDD2: map標一成對 RDD2->RDD3: 產生RDD3 RDD3-->App: 返回RDD3 App->RDD3: reduceByKey聚合 RDD3->RDD4: 產生RDD4 RDD4-->App: 返回RDD4 App->RDD4: collect收集結果數(shù)據(jù)2.3 分步實現(xiàn)代碼
// 進入spark shell環(huán)境 $>spark-shell// 1.加載文件 scala>val rdd1 = sc.textFile("file:///homec/centos/1.txt")// 2.壓扁每行 scala>val rdd2 = rdd1.flatMap(_.split(" ")) // 3.標1成對 scala>val rdd3 = rdd2.map(w=>(w,1))// 4.按照key聚合每個key下的所有值 scala>val rdd4 = rdd3.reduceByKey(_+_)// 5.顯式數(shù)據(jù) scala>rdd4.collect()2.4 一步實現(xiàn)代碼
$scala>sc.textFile("file:///home/centos/1.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect3、Spark-shell下實現(xiàn)對氣溫數(shù)據(jù)的最大最小聚合
3.1 思路分析
氣溫數(shù)據(jù)數(shù)各年度內氣溫列表,將每一行變換成(year,temp)元組后,按照yearn進行聚合即可。
3.2 處理流程
App->SparkContext: textFile加載文件 SparkContext->RDD1: 產生RDD1 RDD1-->App: 返回RDD1 App->RDD1: map變換每行為(year,(max,min))元組 RDD1->RDD2: 產生RDD2 RDD2-->App: 返回RDD2 App->RDD2: reduceByKey雙聚合氣溫極值 RDD2->RDD3:產生RDD3 App->RDD3: collect()收集結果3.3 分步實現(xiàn)代碼
// 進入spark shell環(huán)境 $>spark-shell// 1.加載氣溫數(shù)據(jù)文件 scala>val rdd1 = sc.textFile("/home/centos/temps.dat")// 2.壓扁每行 scala>val rdd2 = rdd1.flatMap(e=>{val arr = e.split(" ")(arr(0).toInt, (arr(1).toInt ,arr(1).toInt)) }) // 3.reduceByKey scala>val rdd3 = rdd2.reduceByKey((a,b)=>{import scala.math(math.max(a(0),b(0)) , math.min(a(1),b(1))) })// 4.收集日志 scala>rdd3.collect()3.4 一步實現(xiàn)代碼
$scala>sc.textFile("file:///home/centos/temps.dat").map(line=>{val arr = line.split(" ")(arr(0).toInt,(arr(1).toInt , arr(1).toInt))}).reduceByKey((a,b)=>{import scala.math(math.max(a(0) , b(0)) , math.min(a(1) , b(1)))}).collect()轉載于:https://www.cnblogs.com/xupccc/p/9543961.html
總結
以上是生活随笔為你收集整理的02、体验Spark shell下RDD编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于互信息(Mutual Informa
- 下一篇: EF另一个 SqlParameterCo