日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

reduceByKey和groupByKey区别与用法

發(fā)布時(shí)間:2023/12/9 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 reduceByKey和groupByKey区别与用法 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

在Spar看中,我們知道一切的操作都是基于RDD的。在使用中,RDD有一種非常特殊也是非常實(shí)用的format——pair RDD,即RDD的每一行是(key, value)的格式。這種格式很像Python的字典類(lèi)型,便于針對(duì)key進(jìn)行一些處理。

針對(duì)pair RDD這樣的特殊形式,spark中定義了許多方便的操作,今天主要介紹一下reduceByKey和groupByKey,因?yàn)樵诮酉聛?lái)講解《在spark中如何實(shí)現(xiàn)SQL中的group_concat功能?》時(shí)會(huì)用到這兩個(gè)operations。

首先,看一看spark官網(wǎng)[1]是怎么解釋的:

reduceByKey(func,?numPartitions=None)

Merge the values for each key using an associative reduce function. This will also perform the merging?locally on each mapper?before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with?numPartitions?partitions, or the default parallelism level if?numPartitions?is not specified.

也就是,reduceByKey用于對(duì)每個(gè)key對(duì)應(yīng)的多個(gè)value進(jìn)行merge操作,最重要的是它能夠在本地先進(jìn)行merge操作,并且merge操作可以通過(guò)函數(shù)自定義。

groupByKey(numPartitions=None)

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.?Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using?reduceByKey?or aggregateByKey will provide much better performance.

也就是,groupByKey也是對(duì)每個(gè)key進(jìn)行操作,但只生成一個(gè)sequence。需要特別注意“Note”中的話,它告訴我們:如果需要對(duì)sequence進(jìn)行aggregation操作(注意,groupByKey本身不能自定義操作函數(shù)),那么,選擇reduceByKey/aggregateByKey更好。這是因?yàn)間roupByKey不能自定義函數(shù),我們需要先用groupByKey生成RDD,然后才能對(duì)此RDD通過(guò)map進(jìn)行自定義函數(shù)操作。

為了更好的理解上面這段話,下面我們使用兩種不同的方式去計(jì)算單詞的個(gè)數(shù)[2]:

val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _) val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))

上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一樣的,但是,它們的內(nèi)部運(yùn)算過(guò)程是不同的。

(1)當(dāng)采用reduceByKeyt時(shí),Spark可以在每個(gè)分區(qū)移動(dòng)數(shù)據(jù)之前將待輸出數(shù)據(jù)與一個(gè)共用的key結(jié)合。借助下圖可以理解在reduceByKey里究竟發(fā)生了什么。 注意在數(shù)據(jù)對(duì)被搬移前同一機(jī)器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數(shù))。然后lamdba函數(shù)在每個(gè)區(qū)上被再次調(diào)用來(lái)將所有值reduce成一個(gè)最終結(jié)果。整個(gè)過(guò)程如下:

(2)當(dāng)采用groupByKey時(shí),由于它不接收函數(shù),spark只能先將所有的鍵值對(duì)(key-value pair)都移動(dòng),這樣的后果是集群節(jié)點(diǎn)之間的開(kāi)銷(xiāo)很大,導(dǎo)致傳輸延時(shí)。整個(gè)過(guò)程如下:

因此,在對(duì)大數(shù)據(jù)進(jìn)行復(fù)雜計(jì)算時(shí),reduceByKey優(yōu)于groupByKey。

另外,如果僅僅是group處理,那么以下函數(shù)應(yīng)該優(yōu)先于 groupByKey?:
  (1)、combineByKey?組合數(shù)據(jù),但是組合之后的數(shù)據(jù)類(lèi)型與輸入時(shí)值的類(lèi)型不一樣。
  (2)、foldByKey合并每一個(gè) key 的所有值,在級(jí)聯(lián)函數(shù)和“零值”中使用。

最后,對(duì)reduceByKey中的func做一些介紹:

如果是用python寫(xiě)的spark,那么有一個(gè)庫(kù)非常實(shí)用:operator[3],其中可以用的函數(shù)包括:大小比較函數(shù),邏輯操作函數(shù),數(shù)學(xué)運(yùn)算函數(shù),序列操作函數(shù)等等。這些函數(shù)可以直接通過(guò)“from operator import *”進(jìn)行調(diào)用,直接把函數(shù)名作為參數(shù)傳遞給reduceByKey即可。如下:

<span style="font-size:14px;">from operator import add rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)]</span>

?

轉(zhuǎn)載于:https://my.oschina.net/u/2935389/blog/1359396

總結(jié)

以上是生活随笔為你收集整理的reduceByKey和groupByKey区别与用法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。