SparkProgrammingRDDs
Introduction to Core Spark Concepts
- driver program:
- executors:
- the place to run the operations
- Spark automatically takes ur function and?ships it to executor nodes.
Programming with RDDs
- RDD: spark's core abstraction for working with data.
- RDD簡單來說就是元素的分布式集合
- 在Spark中所有的工作都可以表示成創建一個新的RDDs,轉換已有的RDDs,或者是在RDDs上運行operations
RDD Basics
- An?immutable?distributed collection of objects.
- 每個RDD被split成多個partitions,每個partition可能在cluster的不同節點上被計算
- RDD的創建:
- loading一個外部數據集
- distributing對象集合(eg: a list or set) RDD操作:(區分這兩種操作的原因是Spark的計算是lazy fashion的
Creating RDDs
- parallelize()
- textFile()
RDD Operations
- Transformation & Action
Transformations
- Compute lazily
- 沒有改變原RDD(immutable),而是生成了新的RDD,spark會保存這一系列依賴關系(lineage)
Actions
- Actually do something with our dataset
Passing Functions to Spark
- Scala: we can pass in functions defined inline, references to methods, or static functions
- Scala: 我們所傳送的函數和其中的數據引用需要被序列化(實現Java的Serializable接口)
- 如果我們pass一個對象中的函數,或者包含了對象中的字段的引用(eg: self.field),spark會把整個對象發送給worker nodes,這會遠大于你所需要的信息。并且如果你的對象不能持久化(pickle in python)的話,會導致是你的程序失敗。舉一個python的例子:
錯誤示范如下:
View Code正確示范:(提取對象中你所需的字段為局部變量,然后傳進去)
View Code- 同樣的,對scala我們也要盡量避免上述情況,而且要注意的是在scala中不需要顯示的self.或者this.,所以這種情況顯得很不明顯,但仍然要注意。舉個栗子
如果在scala中出現了NotSerializableException,那么多半是因為引用了一個不可序列化的類中的變量或字段。所以,傳送一個局部的可序列化的變量或函數才是安全的。
- Any code that is shared by RDD transformations must always be serializable.
Common Transformations and Actions
Basic RDDs
- 我們首先介紹基本的RDD操作,它們可以執行在所有RDDs上而不用管數據
Element-wise transformations
- map() and filter()
- flatMap(): 為每一個輸入元素產生多個輸出元素。返回的是一個迭代器iterator
Psedudo set operations
- 一些簡單的集合操作:(需要RDDs是同一類型的)
- RDD1.distinct() ?--> 十分昂貴的操作,需要shuffle all data over the network
- RDD1.union(RDD2) ?--> 最簡單的集合操作,會保留原RDD中的重復值
- RDD1.intersection(RDD2) --> 需要去重(來識別共同元素),因而也需要shuffle
- RDD1.substract(RDD2) ?--> perform shuffle
- RDD1.cartesian(RDD2) ?-->?returns all possible pairs of (a, b) where a is in the source RDD and b is in the other RDD .十分昂貴
- 為什么叫psedudo即假的集合操作呢,因為這里的集合丟失了一個重要特性:uniqueness即元素的唯一性。因為我們經常有duplicates
Actions
- reduce() &?fold() :都需要返回值和RDD中的元素保持同一類型。
fold()接收與reduce接收的函數簽名相同的函數,另外再加上一個初始值作為第一次調用的結果.
val sum = rdd.reduce((x, y) => x + y)- aggregate(): frees us from the constraint of having the return be the same types as the RDD we are working on.
aggregate的函數原型:
def aggregate [U: ClassTag] (zeroValue: U) (seqOp: (U,T)=>U,combOp: (U,U)=>U):U可以看到,(zeroValue: U)是給定的一個初值,后半部分有兩個函數,seqOp相當于是在各個分區里進行的聚合操作,它支持(U, T) => U,也就是支持不同類型的聚合。comOp是將sepOp后的結果聚合,此時的結果全部是U類,也就是只能進行同構聚合。
一個經典的例子是求平均值。即先用seqOp求出各個分區中的sum和個數,再將seqOp后的結果聚合得到總的sum和總的個數。
View Code- collect(): 返回整個RDD中的內容,常用于單元測試,因為它需要你的整個數據集能夠fit on a single machine.
- take(n): 返回RDD中的n個元素,并且試圖最小化所訪問的partition數,所以它可能會返回一個biased?collection。
- takeSample(withReplacement, num, seed): allows us to take a sample of our data either with or without replacement.
- foreach(): 可以允許我們在每個元素上執行操作or計算,而不需要把元素送回driver
?Converting Between RDD Types
- 一些functions只在某些特定類型RDD上可用。比如mean(), variance()只用于numericRDDs, join()只用于key/value pair RDDs.
- 在scala和Java中,這些方法未在標準RDD類中定義,因此為了訪問這些附加的功能,我們需要確保我們得到了正確的specialized class。
Scala
- 在scala中。RDDs的轉換可以通過使用隱式轉換(using implicit conversions)來自動進行。
- 看一段RDD.scala源碼中的介紹
- 關于scala隱式轉換:?當對象調用類中不存在的方法或成員時,編譯器會自動將對象進行隱式轉換
- 隱式轉換帶來的confusion:當你在RDD上調用mean()這樣的方法時,你會發現在RDDclass 的Scaladocs中找不到mean()方法,但是該方法能成功調用是由于實現了RDD[Double]到DoubleRDDFunctions的隱式轉換。
Persistence(Caching)
- As discussed earlier, Spark RDDs是惰性求值的,如果我們想要多次使用同一個RDD的話,Spark通常會每次都重新計算該RDD和它所有的依賴。這對于迭代算法是十分昂貴的。
- 一個比較直觀的例子如下,每次action的時候都會重新計算:
- 為了避免多次重復計算同一個RDD,我們可以讓Spark來persist數據。這樣的話,計算該RDD的那個節點會保存它們的partition。
- 如果有數據持久化的節點fail掉了,Spark會在需要的時候重新計算丟失的partitons。當然我們也可以通過在多個節點保存副本的方式來避免節點故障時的slowdown。
- Spark有很多levels of persistence供選擇。
-
Level Space Used CPU time In Memory On Disk Comments MEMORY_ONLY?
High Low Y N ? MEMORY_ONLY_SER?
Low High Y N ? MEMORY_AND_DISK?
High Medium Some Some Spils to disk if there is too much data to fit in memory. MEMORY_AND_DISK_SER?
Low High Some Some Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory.?
DISK_ONLY?
Low High N Y ? - 在Java和scala中,缺省的情況下persist()回將未序列化的對象數據保存在JVM的堆中。
- 如果你試圖在內存中cache過多的數據,Spark將會自動驅逐舊的partitions,使用最少最近使用(Least Recently Used, LRU)緩存策略。對于MEMORY_ONLY level,下次訪問的時候會重新計算這些被驅逐的分片。
- 由于Spark'的各種機制,無論使用哪種level,你都可以不用擔心job breaking。但是緩存不必要的數據將會導致有用數據被驅逐,從而增加重計算的時間。
- Spark提供了unpersist()方法可以讓你手工地將RDD移除緩存。
-
Off-heap caching is experimental and uses Tachyon. If you are interested in off-heap caching with Spark, take a look at the Running Spark on Tachyon guide.?
?
轉載于:https://www.cnblogs.com/wttttt/p/6826719.html
總結
以上是生活随笔為你收集整理的SparkProgrammingRDDs的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 体验套餐管理系统
- 下一篇: 字符输出流写文本文件【Writer、Fi