编程实现将rdd转换为dataframe:源文件内容如下(_大数据 什么是RDD?可以干什么?为什么要有RDD?...
什么是RDD
彈性分布式數據集(Resilient Distributed Dataset,RDD)是 Spark 中的核心概念。
RDD在抽象上來講是一種抽象的分布式的數據集。它是被分區的,每個分區分布在集群中的不同的節點上。從而可以讓數據進行并行的計算它主要特點就是彈性和容錯性。彈性:RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤容錯性:RDD可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition,因為節點故障,導致數據丟了,那么RDD會自動通過自己的數據來源重新計算該partition。RDD來源:通常是Hadoop的HDFS,Hive 表等等;也可以通過Linux的本地文件;應用程序中的數組;jdbc(mysql 等);也可以是kafka、flume數據采集工具、中間件等轉化而來的RDD
RDD的創建
進行Spark核心編程時,首先要做的第一件事,就是創建一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程序的輸入源數據。然后在創建了初始的RDD之后,才可以通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。Spark Core提供了三種創建RDD的方式,包括:使用程序中的集合創建RDD;使用本地文件創建RDD;使用HDFS文件創建RDD。
1、使用程序中的集合創建RDD,主要用于進行測試,在實際部署到集群運行之前,自己使用集合構造測試數據,來測試后面的spark應用流程。
2、使用本地文件創建RDD,主要用于臨時性地處理一些存儲了大量數據的文件。
3、使用HDFS文件創建RDD,應該是最常用的生產環境處理方式,主要可以針對HDFS上存儲的大數據,進行離線批處理操作。
RDD特性
Spark RDD五大特性
1.分區列表
Spark RDD是被分區的,每一個分區都會被一個計算任務(Task)處理,分區數決定了并行計算的數量,RDD的并行度默認從父RDD傳給子RDD。默認情況下,一個HDFS上的數據分片就是一個 partiton,RDD分片數決定了并行計算的力度,可以在創建RDD時指定RDD分片個數(分區。
如果不指定分區數量,當RDD從集合創建時,則默認分區數量為該程序所分配到的資源的CPU核數(每個Core可以承載2~4個 partition),如果是從HDFS文件創建,默認為文件的 Block數。
2.每一個分區(分片)都有一個計算函數
每個分區都會有計算函數, Spark的RDD的計算函數是以分片為基本單位的,每個RDD都會實現 compute函數,對具體的分片進行計算,RDD中的分片是并行的,所以是分布式并行計算。
有一點非常重要,就是由于RDD有前后依賴關系,遇到寬依賴關系,如reduce By Key等這些操作時劃分成 Stage, Stage內部的操作都是通過 Pipeline進行的,在具體處理數據時它會通過 Blockmanager來獲取相關的數據,因為具體的 split要從外界讀數據,也要把具體的計算結果寫入外界,所以用了一個管理器,具體的 split都會映射成 BlockManager的Block,而具體的splt會被函數處理,函數處理的具體形式是以任務的形式進行的。
3.依賴于其他RDD的列表( a list of dependencies on other RDDS)
由于RDD每次轉換都會生成新的RDD,所以RDD會形成類似流水線一樣的前后依賴關系,當然寬依賴就不類似于流水線了,寬依賴后面的RDD具體的數據分片會依賴前面所有的RDD的所有數據分片,這個時候數據分片就不進行內存中的 Pipeline,一般都是跨機器的,因為有前后的依賴關系,所以當有分區的數據丟失時, Spark會通過依賴關系進行重新計算,從而計算出丟失的數據,而不是對RDD所有的分區進行重新計算。
RDD之間的依賴有兩種:窄依賴( Narrow Dependency)和寬依賴( Wide Dependency)。
RDD是 Spark的核心數據結構,通過RDD的依賴關系形成調度關系。通過對RDD的操作形成整個 Spark程序。
4.key- value數據類型的RDD分區器( a Partitioner for key- alue RDDS)、控制分區策略和分區數
每個key- value形式的RDD都有 Partitioner屬性,它決定了RDD如何分區。
當然,Partiton的個數還決定了每個Stage的Task個數。
RDD的分片函數可以分區( Partitioner),可傳入相關的參數,如 Hash Partitioner和 Range Partitioner,它本身針對key- value的形式,如果不是key-value的形式它就不會有具體的 Partitioner, Partitioner本身決定了下一步會產生多少并行的分片,同時它本身也決定了當前并行( Parallelize) Shuffle輸出的并行數據,從而使Spark具有能夠控制數據在不同結點上分區的特性,用戶可以自定義分區策略,如Hash分區等。
spark提供了 partition By運算符,能通過集群對RDD進行數據再分配來創建一個新的RDD。
5.每個分區都有一個優先位置列表( a list of preferred locations to compute each split on)
優先位置列表會存儲每個 Partition的優先位置,對于一個HDFS文件來說,就是每個Partition塊的位置。
觀察運行 Spark集群的控制臺就會發現,Spark在具體計算、具體分片以前,它已經清楚地知道任務發生在哪個結點上,也就是說任務本身是計算層面的、代碼層面的,代碼發生運算之前它就已經知道它要運算的數據在什么地方,有具體結點的信息。
這就符合大數據中數據不動代碼動的原則,即“移動數據不如移動計算”的理念。數據不動代碼動的最高境界是數據就在當前結點的內存中。這時候有可能是 Memory級別或 Tachyon級別的,Spark本身在進行任務調度時會盡可能地將任務分配到處理數據的數據塊所在的具體位置。
Scala源代碼函數get Parferredlocations可知,每次計算都符合完美的數據本地性。可在RDD類源代碼文件中找到4個方法和1個屬性,對應上述所闡述的RDD的五大特性,源代碼剪輯如下
RDD有哪些操作
reduce()
接受一個函數,作用在RDD兩個類型相同的元素上,返回新元素。
可以實現,RDD中元素的累加,計數,和其他類型的聚集操作。
scala> val rdd = sc.parallelize(Array(1,2,3,3))rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at :24scala> rdd.collect()res9: Array[Int] = Array(1, 2, 3, 3)scala> rdd.reduce((x,y)=>x+y)res10: Int = 9collect()
遍歷整個RDD,向driver program返回RDD的內容
需要單機內存能夠容納下(因為數據要拷貝到driver,測試使用)
大數據的時候,使用saveAsTextFile() action等。
take(n)
返回RDD的n個元素(同時嘗試訪問最少的partitions)
返回結果是無序的,測試使用。
scala> rdd.take(2)res11: Array[Int] = Array(1, 2)scala> rdd.take(3)res12: Array[Int] = Array(1, 2, 3)top()
排序(根據RDD中數據的比較器)
scala> rdd.top(1)res13: Array[Int] = Array(3)scala> rdd.top(2)res14: Array[Int] = Array(3, 3)scala> rdd.top(3)res15: Array[Int] = Array(3, 3, 2)foreach()
計算RDD中的每個元素,但不返回到本地。
可以配合println()友好的打印出數據。
map
map的輸入變換函數應用于RDD中所有元素,而mapPartitions應用于所有分區。區別于mapPartitions主要在于調用粒度不同。如parallelize(1 to 10, 3),map函數執行10次,而mapPartitions函數執行3次。
每個方框表示一個 RDD 分區,左側的分區經過用戶自定義函數 f:T->U 映射為右側的新 RDD 分區。但是,實際只有等到 Action算子觸發后,這個 f 函數才會和其他函數在一個stage 中對數據進行運算。在圖 1 中的第一個分區,數據記錄 V1 輸入 f,通過 f 轉換輸出為轉換后的分區中的數據記錄 V’1。
filter(function)
過濾操作,滿足filter內function函數為true的RDD內所有元素組成一個新的數據集。如:filter(a == 1)。
flatMap(function)
map是對RDD中元素逐一進行函數操作映射為另外一個RDD,而flatMap操作是將函數應用于RDD之中的每一個元素,將返回的迭代器的所有內容構成新的RDD。而flatMap操作是將函數應用于RDD中每一個元素,將返回的迭代器的所有內容構成RDD。
flatMap與map區別在于map為“映射”,而flatMap“先映射,后扁平化”,map對每一次(func)都產生一個元素,返回一個對象,而flatMap多一步就是將所有對象合并為一個對象。
將原來 RDD 中的每個元素通過函數 f 轉換為新的元素,并將生成的 RDD 的每個集合中的元素合并為一個集合,內部創建 FlatMappedRDD(this,sc.clean(f))。
表 示 RDD 的 一 個 分 區 ,進 行 flatMap函 數 操 作, flatMap 中 傳 入 的 函 數 為 f:T->U, T和 U 可以是任意的數據類型。將分區中的數據通過用戶自定義函數 f 轉換為新的數據。外部大方框可以認為是一個 RDD 分區,小方框代表一個集合。 V1、 V2、 V3 在一個集合作為 RDD 的一個數據項,可能存儲為數組或其他容器,轉換為V’1、 V’2、 V’3 后,將原來的數組或容器結合拆散,拆散的數據形成為 RDD 中的數據項。
mapPartitions(function)
區于foreachPartition(屬于Action,且無返回值),而mapPartitions可獲取返回值。與map的區別前面已經提到過了,但由于單獨運行于RDD的每個分區上(block),所以在一個類型為T的RDD上運行時,(function)必須是Iterator => Iterator類型的方法(入參)。
mapPartitionsWithIndex(function)
與mapPartitions類似,但需要提供一個表示分區索引值的整型值作為參數,因此function必須是(int, Iterator)=>Iterator類型的。
sample(withReplacement, fraction, seed)
采樣操作,用于從樣本中取出部分數據。withReplacement是否放回,fraction采樣比例,seed用于指定的隨機數生成器的種子。(是否放回抽樣分true和false,fraction取樣比例為(0, 1]。seed種子為整型實數。)
union(otherDataSet)
對于源數據集和其他數據集求并集,不去重。
intersection(otherDataSet)
對于源數據集和其他數據集求交集,并去重,且無序返回。
distinct([numTasks])
返回一個在源數據集去重之后的新數據集,即去重,并局部無序而整體有序返回。(詳細介紹見
groupByKey([numTasks])
在一個PairRDD或(k,v)RDD上調用,返回一個(k,Iterable)。主要作用是將相同的所有的鍵值對分組到一個集合序列當中,其順序是不確定的。groupByKey是把所有的鍵值對集合都加載到內存中存儲計算,若一個鍵對應值太多,則易導致內存溢出。
在此,用之前求并集的union方法,將pair1,pair2變為有相同鍵值的pair3,而后進行groupByKey
reduceByKey(function,[numTasks])
與groupByKey類似,卻有不同。如(a,1), (a,2), (b,1), (b,2)。groupByKey產生中間結果為( (a,1), (a,2) ), ( (b,1), (b,2) )。而reduceByKey為(a,3), (b,3)。
reduceByKey主要作用是聚合,groupByKey主要作用是分組。(function對于key值來進行聚合)
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
類似reduceByKey,對pairRDD中想用的key值進行聚合操作,使用初始值(seqOp中使用,而combOpenCL中未使用)對應返回值為pairRDD,而區于aggregate(返回值為非RDD)
sortByKey([ascending], [numTasks])
同樣是基于pairRDD的,根據key值來進行排序。ascending升序,默認為true,即升序;numTasks
join(otherDataSet,[numTasks])
加入一個RDD,在一個(k,v)和(k,w)類型的dataSet上調用,返回一個(k,(v,w))的pair dataSet。
cartesian(otherDataSet)
求笛卡爾乘積。該操作不會執行shuffle操作。
pipe(command,[envVars])
通過一個shell命令來對RDD各分區進行“管道化”。通過pipe變換將一些shell命令用于Spark中生成的新RDD
coalesce(numPartitions)
重新分區,減少RDD中分區的數量到numPartitions。
repartition(numPartitions)
repartition是coalesce接口中shuffle為true的簡易實現,即Reshuffle RDD并隨機分區,使各分區數據量盡可能平衡。若分區之后分區數遠大于原分區數,則需要shuffle。
repartitionAndSortWithinPartitions(partitioner)
該方法根據partitioner對RDD進行分區,并且在每個結果分區中按key進行排序。
Action:
first()
返回數據集的第一個元素(類似于take(1))
takeSample(withReplacement, num, [seed])
對于一個數據集進行隨機抽樣,返回一個包含num個隨機抽樣元素的數組,withReplacement表示是否有放回抽樣,參數seed指定生成隨機數的種子。
該方法僅在預期結果數組很小的情況下使用,因為所有數據都被加載到driver端的內存中。
take(n)
返回一個包含數據集前n個元素的數組(從0下標到n-1下標的元素),不排序。
takeOrdered(n,[ordering])
返回RDD中前n個元素,并按默認順序排序(升序)或者按自定義比較器順序排序。
saveAsTextFile(path)
將dataSet中元素以文本文件的形式寫入本地文件系統或者HDFS等。Spark將對每個元素調用toString方法,將數據元素轉換為文本文件中的一行記錄。
若將文件保存到本地文件系統,那么只會保存在executor所在機器的本地目錄。
saveAsSequenceFile(path)(Java and Scala)
將dataSet中元素以Hadoop SequenceFile的形式寫入本地文件系統或者HDFS等。(對pairRDD操作)
saveAsObjectFile(path)(Java and Scala)
將數據集中元素以ObjectFile形式寫入本地文件系統或者HDFS等。
countByKey()
用于統計RDD[K,V]中每個K的數量,返回具有每個key的計數的(k,int)pairs的hashMap。
寬依賴和窄依賴
窄依賴(narrow dependencies)
子RDD的每個分區依賴于常數個父分區(與數據規模無關)輸入輸出一對一的算子,且結果RDD的分區結構不變。主要是map/flatmap輸入輸出一對一的算子,但結果RDD的分區結構發生了變化,如union/coalesce從輸入中選擇部分元素的算子,如filter、distinct、substract、sample寬依賴(wide dependencies)
子RDD的每個分區依賴于所有的父RDD分區對單個RDD基于key進行重組和reduce,如groupByKey,reduceByKey對兩個RDD基于key進行join和重組,如join經過大量shuffle生成的RDD,建議進行緩存。這樣避免失敗后重新計算帶來的開銷。總結
以上是生活随笔為你收集整理的编程实现将rdd转换为dataframe:源文件内容如下(_大数据 什么是RDD?可以干什么?为什么要有RDD?...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pythonos基础_python基础之
- 下一篇: python grid用法_Python