Spark详解(三):Spark编程模型(RDD概述)
1. RDD概述
RDD是Spark的最基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark最核心的東西,它表示已被分區,不可變的并能夠被并行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到內存中,每次對RDD數據集的操作之后的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤IO操作。這對于迭代運算比較常見的機器學習算法, 交互式數據挖掘來說,效率提升非常大。
RDD 最適合那種在數據集上的所有元素都執行相同操作的批處理式應用。在這種情況下, RDD 只需記錄血統中每個轉換就能還原丟失的數據分區,而無需記錄大量的數據操作日志。所以 RDD 不適合那些需要異步、細粒度更新狀態的應用 ,比如 Web 應用的存儲系統,或增量式的 Web 爬蟲等。對于這些應用,使用具有事務更新日志和數據檢查點的數據庫系統更為高效。
RDD實際上不存儲數據,這里方便理解,暫時理解為存儲數據
1.1 RDD簡介
RDD實現了多種模型計算,這些模型包括以下幾個方面的內容
- 迭代計算
- 交互式SQL查詢
- MapReduceRDDD:通過MapReduce的超集,能夠高效地執行MapReduce程序
- 流式數據處理:Spark提出了離散數據流(DStream)來解決這樣的問題,DStream把流式計算的執行當做一系列短而確定的批量計算的序列,并將狀態保存到RDD中。DStream根據相關RDD的依賴關系圖進行并行化恢復,可以達到快速恢復故障,避免了數據復制。另外,通過推測執行來對Straggler遷移執行,例如,對于慢任務運行經過推測的備份副本。
1.2 RDD的特征
1.3 RDD的五大特征
1.4 RDD的類型
Spark編程中開發者需要編寫一個驅動程序(DriverProgram)來連接到工作進程(Worker)。驅動程序定義一個或者多個RDD以及相關的操作,驅動程序同時記錄RDD的繼承關系,即“血統”。而工作進程(Worker)是一種運行的進程,它經過一系列操作后的RDD分區數據保存在內存中。
Spark的操作大致上可以分為以下四類操作
- Create Operation:用于RDD的創建工作
- Transformation Operation:將RDD通過一定的操作轉化為新的RDD
- Control Operation:進行RDD的持久化操作,可以將RDD按照不同的存儲策略保存在磁盤或者內存中
- Action Operation:能觸發Spark運行的操作,例如,對RDD進行collect就是Action操作。
什么是K,V格式的RDD?
如果RDD里面存儲的數據都是二元組對象,那么這個RDD我們就叫做K,V格式的RDD。
哪里體現RDD的彈性(容錯)?
partition數量,大小沒有限制,體現了RDD的彈性。
RDD之間依賴關系,可以基于上一個RDD重新計算出RDD。
哪里體現RDD的分布式?
RDD是由Partition組成,partition是分布在不同節點上的。RDD提供計算最佳位置,體現了數據本地化。體現了大數據中“計算移動數據不移動”的理念。
2. RDD的實現
2.1 作業調度
對于款依賴的操作,在Spark將中間結果物化到父分區的節點上,這里和MapReduce中的物化Map的輸出類似,可以簡化數據的故障恢復過程。
對于執行失敗的任務,只要它對應調度階段父類信息可以用,該任務就會分散到其他節點上重新執行。如果某些調度階段不可用(例如,因為Shuffle在map節點的輸出丟失了),則重新提交任務,并以并行計算的方式計算丟失的分區。
2.2 解析器集成
Spark提供了一種交互式的Shell(解析器),借助內存數據的低延遲性,可以讓用戶通過解析器對大數據進行交互式查詢。
2.3 內存管理
Spark提供了3種持久化RDD的存儲策略:為序列化Java對象存在內存中、序列化的數據存于內存以及存儲在磁盤中。
- 第一個選項的性能是最優的,因為可以直接訪問在Java虛擬機內存里的RDD對象
- 在空間有限的情況下,第二種方式可以讓用戶采用比Java對象更加有效的內存組織方式,但是代價是降低了性能。
- 第三種策略使用于RDD太大的情形,每次重新計算該RDD都會帶來額外的資源開銷(I/O)
對于內存使用LRU回收算法來間來進行管理,當計算得到一個新的RDD分區,但沒有足夠的空間來存儲的時候,系統會從最近最少未使用的RDD中回收其中一個分區的空間。
2.4 檢查點支持
雖然“血統”可以用于錯誤的RDD的恢復,但是對于很長的“血統”的RDD來說,這樣恢復的耗時很長,因此需要通過檢查點的方式(CheckPoint)保存到外部存儲當中。
通常情況下,對于包含寬依賴的長”血統“的RDD設置檢查點是非常有效的。
2.5 多用戶管理
- 在每個應用程序中,Spark運行多線程同時提交作業,并通過一種等級公平調度器來實現多個作業集群資源的共享,這種調度器和Hadoop Fair Scheduler類似。該算法主要用于創建基于相同內存數據的多用戶應用
- Spark的公平調度也使用延遲調度,通過輪詢每臺機器的數據,在保持公平的情況下給予作業高的本地性。
- 由于任務相對獨立,調度器還支持取消作業來為高優先級作業騰出資源
3. RDD編程接口
3.1 RDD分區(partitions)
一個RDD劃分成很多的分區(partition)分布在集群的節點中,分區的多少涉及對這個RDD進行并行計算的粒度。在RDD操作中用戶可以使用partitions方法獲取RDD劃分的分區數,當然用戶也可以設定分區數目。如果沒有指定將使用默認值,而默認數值是該程序所分配到CPU核數,如果是從HDFS文件創建,默認為文件的block數(有一點我們必須要注意,當我們顯示的設置分區數時,分區數不允許小于HDFS文件的block數)。
// 使用textFile方法獲取指定路徑的文件,未設置分區數 val rdd = sc.textFile("/app/spark/workcount.txt") // 使用partitions方法獲取分區數,假設默認的分區數為2,那么將返回2 val partitionSize = rdd.partitions.size// 顯示地設置RDD為6個分區 rdd = sc.textFile("/app/spark/wordcount.txt", 6) // 獲取分區數,此時返回6 partitionSize = rdd.partitions.size3.2 RDD首選位置(preferredlocations)
在Spark形成任務有向無環圖(DAG)時,會盡可能地把計算分配到靠近數據的位置,減少數據網絡傳輸。當RDD產生的時候存在首選位置,如HadoopRDD分區的首選位置就是HDFS塊所在的節點。當RDD分區被緩存,則計算應該發送到緩存分區所在的節點進行,再不然回溯RDD的血統,一直找到具有首選位置屬性的父RDD,并據此決定子RDD的位置。
3.3 RDD依賴關系(dependencies)
dependencies顧名思義就是依賴的意思,由于RDD是粗粒度的操作數據集,每一個轉換操作都會生成一個新的RDD,所以RDD之間就會形成類似于流水一樣的前后依賴關系,在spark中存在兩種類型的依賴,即窄依賴(Narrow dependencies)和寬依賴(wide dependencies)
- 窄依賴:每一個父RDD的分區最多只被子RDD的一個分區所使用
- 寬依賴:多個子RDD的分區會依賴于同一個父RDD的分區,
一:窄依賴可以在集群的一個節點上如流水線一般的執行,可以計算所有父RDD的分區,相反的,寬依賴需要取得父RDD的所有分區上的數據進行計算,將會執行類似于MapReduce一樣的shuffle操作,二:對于窄依賴來說,節點計算失敗后的恢復會更加有效,只需要重新計算對應的父RDD的分區,而且可以在其他的節點上并行地計算,相反的,在有寬依賴的繼承關系中,一個節點的失敗將會導致其父RDD的多個分區重新計算,這個代價是非常高的
3.4 RDD分區計算
Spark中RDD的計算操作都是以分區為單位的,而且計算函數都是在對迭代器復合,不需要保存每次計算的結果。分區計算一般是使用mapPartitions等操作來進行的,mapPartitions的輸入函數是應用于每個分區,也就是把每個分區的內容作為整體來處理的。
3.5 RDD分區函數(Partitioner)
分區劃分對于Shuffle類操作很關鍵,它決定了該操作的父RDD和子RDD之間的依賴類型。例如Join操作,如果協同劃分的話,兩個父RDD之間、父RDD和子RDD之間形成一致的分區安排,即同一個Key保證被映射到同一個分區,這樣就能形成窄依賴。反正如果不是協同劃分,導致寬依賴,這里所說的協同劃分是指的是指分區劃分器以產生前后一致的分區安排。
在Spark默認提供的兩種劃分器:哈希分區劃分器(HashPartitioner)和范圍分區劃分器(RangePartitioner),其Partitioner只存在于(K,V)類型的RDD。
4. 創建操作
目前有兩種類型的RDD,一種是并行集合(Prallelized Collections),接受到一個已經存在的Scala集合,然后進行并行計算;另外一種是從外部存儲創建RDD,外部存儲可以是文本文件或Hadoop文件系統HDFS,還可以使從Hadoop接口API創建。
4.1 并行化集合創建操作
例如:val rdd = sc.parallelize(Array(1 to 10)) 根據能啟動的executor的數量來進行切分多個slice,每一個slice啟動一個Task來進行處理。
val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的數量
4.2 外部存儲創建RDD
Spark可以將任何Hadoop所支持的存儲資源轉化成RDD,如本地文件(需要網絡文件系統,所有的節點都必須能訪問到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。
(1)使用textFile()方法可以將本地文件或HDFS文件轉換成RDD
支持整個文件目錄讀取,文件可以是文本或者壓縮文件(如gzip等,自動執行解壓縮并加載數據)。如textFile(”file:///dfs/data”)
支持通配符讀取,例如:
val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");val rdd2=rdd1.map(_.split("t")).filter(_.length==6)rdd2.count()......14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903......textFile()可選第二個參數slice,默認情況下為每一個block分配一個slice。用戶也可以通過slice指定更多的分片,但不能使用少于HDFS block的分片數。
(2)使用wholeTextFiles()讀取目錄里面的小文件,返回(用戶名、內容)對
(3)使用sequenceFileK,V方法可以將SequenceFile轉換成RDD。SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。
(4)使用SparkContext.hadoopRDD方法可以將其他任何Hadoop輸入類型轉化成RDD使用方法。一般來說,HadoopRDD中每一個HDFS block都成為一個RDD分區。
此外,通過Transformation可以將HadoopRDD等轉換成FilterRDD(依賴一個父RDD產生)和JoinedRDD(依賴所有父RDD)等。
5. 轉換操作
轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,并不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
| map(func) | 返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成 |
| filter(func) | 返回一個新的數據集,由經過func函數后返回值為true的原元素組成 |
| flatMap(func) | 類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) |
| sample(withReplacement, frac, seed) | 根據給定的隨機種子seed,隨機抽樣出數量為frac的數據 |
| union(otherDataset) | 返回一個新的數據集,由原數據集和參數聯合而成 |
| groupByKey([numTasks]) | 在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:默認情況下,使用8個并行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task |
| reduceByKey(func, [numTasks]) | 在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。 |
| join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集 |
| groupWith(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)類型的數據集上調用,返回一個數據集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup |
| cartesian(otherDataset) | 笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,所有元素交互進行笛卡爾積。 |
| flatMap(func) | 類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) |
6. 行動Action操作
操作(Actions) (如:count, collect, save等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。
| reduce(func) | 通過函數func聚集數據集中的所有元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確??梢员徽_的并發執行 |
| count() | 返回數據集的元素個數 |
| take(n) | 返回一個數組,由數據集的前n個元素組成。注意,這個操作目前并非在多個節點上,并行執行,而是Driver程序所在機器,單機計算所有的元素(Gateway的內存壓力會增大,需要謹慎使用) |
| first() | 返回數據集的第一個元素(類似于take(1) |
| saveAsTextFile(path) | 將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,并將它轉換為文件中的一行文本 |
| saveAsSequenceFile(path) | 將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,并都實現了Hadoop的Writable接口,或隱式可以轉換為Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等) |
| foreach(func) | 在數據集的每一個元素上,運行函數func。這通常用于更新一個累加器變量,或者和外部存儲系統做交互 |
7. 控制操作
Spark可以將RDD持久化到內存或者磁盤文件系統中,把RDD持久化到內存中可以極大地提高迭代計算以及計算模型之間的數據共享,一般情況下執行節點的60%內存用于緩存數據,剩下的40%用于運行任務。
cache():RDD[T] persists():RDD[T] persist(level:StorageLevel):RDD[T]Spark中,RDD類可以使用cache() 和 persist() 方法來緩存。cache()是persist()的特例,將該RDD緩存到內存中。而persist可以指定一個StorageLevel。StorageLevel的列表可以在StorageLevel 伴生單例對象中找到:
object StorageLevel {val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon}// 其中,StorageLevel 類的構造器參數如下:class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOfSpark的不同StorageLevel ,目的滿足內存使用和CPU效率權衡上的不同需求。我們建議通過以下的步驟來進行選擇:
- 如果你的RDDs可以很好的與默認的存儲級別(MEMORY_ONLY)契合,就不需要做任何修改了。這已經是CPU使用效率最高的選項,它使得RDDs的操作盡可能的快;
- 如果不行,試著使用MEMORY_ONLY_SER并且選擇一個快速序列化的庫使得對象在有比較高的空間使用率的情況下,依然可以較快被訪問;
- 盡可能不要存儲到硬盤上,除非計算數據集的函數,計算量特別大,或者它們過濾了大量的數據。否則,重新計算一個分區的速度,和與從硬盤中讀取基本差不多快;
- 如果你想有快速故障恢復能力,使用復制存儲級別(例如:用Spark來響應web應用的請求)。所有的存儲級別都有通過重新計算丟失數據恢復錯誤的容錯機制,但是復制存儲級別可以讓你在RDD上持續的運行任務,而不需要等待丟失的分區被重新計算;
- 如果你想要定義你自己的存儲級別(比如復制因子為3而不是2),可以使用StorageLevel 單例對象的apply()方法;
- 在不使用cached RDD的時候,及時使用unpersist方法來釋放它。
總結
以上是生活随笔為你收集整理的Spark详解(三):Spark编程模型(RDD概述)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark详解(二):Spark完全分布
- 下一篇: Spark详解(四):Spark组件以及