日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(十四):Spark Core的RDD操作

發布時間:2023/11/28 生活经验 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(十四):Spark Core的RDD操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

RDD的操作

函數(算子)分類

Transformation函數

???????Action函數


RDD的操作

有一定開發經驗的讀者應該都使用過多線程,利用多核 CPU 的并行能力來加快運算速率。在開發并行程序時,可以利用類似 Fork/Join 的框架將一個大的任務切分成細小的任務,每個小任務模塊之間是相互獨立的,可以并行執行,然后將所有小任務的結果匯總起來,得到最終的結果。

一個非常好的例子便是歸并排序。對整個序列進行排序時,可以將序列切分成多個子序列進行排序,然后將排好序的子序列歸并起來得到最終的結果。

?

對 Hadoop 有所了解的讀者都知道 map、reduce 操作。對于大量的數據,我們可以通過 map 操作讓不同的集群節點并行計算,之后通過 reduce 操作將結果整合起來得到最終輸出。

?

???????函數(算子)分類

對于 Spark 處理的大量數據而言,會將數據切分后放入RDD作為Spark 的基本數據結構,開發者可以在 RDD 上進行豐富的操作,之后 Spark 會根據操作調度集群資源進行計算。總結起來,RDD 的操作主要可以分為 Transformation 和 Action 兩種

?

官方文檔:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

RDD中操作(函數、算子)分為兩類:

?1)、Transformation轉換操作:返回一個新的RDD

which create a new dataset from an existing one

所有Transformation函數都是Lazy,不會立即執行,需要Action函數觸發

?2)、Action動作操作:返回值不是RDD(無返回值或返回其他的)

which return a value to the driver program after running a computation on the datase

所有Action函數立即執行(Eager),比如count、first、collect、take等

?

?

此外注意RDD中函數細節:

?第一點:RDD不實際存儲真正要計算的數據,而是記錄了數據的位置在哪里,數據的轉換關系(調用了什么方法,傳入什么函數);

?第二點:RDD中的所有轉換都是惰性求值/延遲執行的,也就是說并不會直接計算。只有當發生一個要求返回結果給Driver的Action動作時,這些轉換才會真正運行。之所以使用惰性求值/延遲執行,是因為這樣可以在Action時對RDD操作形成DAG有向無環圖進行Stage的劃分和并行優化,這種設計讓Spark更加有效率地運行

也就是在運行action之前,前面的計劃都列出來了,就可以根據集群的具體情況,優化分區的分布,和網絡的傳輸關系。讓性能最優。

如果沒有懶操作,那么一步步的執行,就沒辦法從整體做規劃,做優化了。

?

???????Transformation函數

在Spark中Transformation操作表示將一個RDD通過一系列操作變為另一個RDD的過程,這個操作可能是簡單的加減操作,也可能是某個函數或某一系列函數。值得注意的是Transformation操作并不會觸發真正的計算,只會建立RDD間的關系圖。

如下圖所示,RDD內部每個方框是一個分區。假設需要采樣50%的數據,通過sample函數,從 V1、V2、U1、U2、U3、U4 采樣出數據 V1、U1 和 U4,形成新的RDD

?

常用Transformation轉換函數:

?

?

?

轉換

含義

map(func)

返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成

filter(func)

返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成

flatMap(func)

類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)

mapPartitions(func)

類似于map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用于指定隨機數生成器種子

union(otherDataset)

對源RDD和參數RDD求并集后返回一個新的RDD

intersection(otherDataset)

對源RDD和參數RDD求交集后返回一個新的RDD

distinct([numTasks]))

對源RDD進行去重后返回一個新的RDD

groupByKey([numTasks]) ??

在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

?

sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

與sortByKey類似,但是更靈活

join(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

cartesian(otherDataset)

笛卡爾積

pipe(command, [envVars])

對rdd進行管道操作

coalesce(numPartitions) ?

減少 RDD 的分區數到指定值。在過濾大量數據之后,可以執行此操作

repartition(numPartitions)

重新給 RDD 分區

?

???????Action函數

不同于Transformation操作,Action操作代表一次計算的結束,不再產生新的 RDD,將結果返回到Driver程序或者輸出到外部。所以Transformation操作只是建立計算關系,而Action 操作才是實際的執行者。每個Action操作都會調用SparkContext的runJob 方法向集群正式提交請求,所以每個Action操作對應一個Job。

常用Action執行函數:

?

動作

含義

reduce(func)

通過func函數聚集RDD中的所有元素,這個功能必須是可交換且可并聯的

collect()

在驅動程序中,以數組的形式返回數據集的所有元素

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(類似于take(1))

take(n)

返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed])

返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用于指定隨機數生成器種子

takeOrdered(n, [ordering])

返回自然順序或者自定義順序的前 n 個元素

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對于每個元素,Spark將會調用toString方法,將它裝換為文件中的文本

saveAsSequenceFile(path)

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。

saveAsObjectFile(path)

將數據集的元素,以 Java 序列化的方式保存到指定的目錄下

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。

foreach(func)

在數據集的每一個元素上,運行函數func進行更新。

foreachPartition(func)

在數據集的每一個分區上,運行函數func

總結

以上是生活随笔為你收集整理的2021年大数据Spark(十四):Spark Core的RDD操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。