

Spark RDD Explained, Part 4: Key-Value Transformation Operators

Published: 2024/1/17

Transformation operators that process Key-Value data fall roughly into three groups: one-to-one mappings between input and output partitions, aggregations, and joins.

One-to-one mapping between input and output partitions

mapValues

mapValues applies a map function to the Value of each (Key, Value) pair, leaving the Key unchanged.

In the figure from the original article (boxes represent RDD partitions), a => a + 2 adds 2 only to the 1 in (V1, 1), producing (V1, 3).

Source:

    /**
     * Pass each value in the key-value pair RDD through a map function without changing the keys;
     * this also retains the original RDD's partitioning.
     */
    def mapValues[U](f: V => U): RDD[(K, U)] = {
      val cleanF = self.context.clean(f)
      new MapPartitionsRDD[(K, U), (K, V)](self,
        (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
        preservesPartitioning = true)
    }
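The Spark source above is Scala; to make the semantics concrete without a Spark cluster, here is a minimal plain-Python sketch (the helper name map_values is hypothetical):

```python
# Plain-Python sketch of the mapValues semantics: transform each value,
# leave the keys (and their grouping into partitions) untouched.
def map_values(pairs, f):
    return [(k, f(v)) for k, v in pairs]

result = map_values([("V1", 1), ("V2", 1), ("V3", 1)], lambda a: a + 2)
# result == [("V1", 3), ("V2", 3), ("V3", 3)]
```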

Aggregation over a single RDD or two RDDs

(1) combineByKey

combineByKey aggregates within a single RDD. For example, it can turn an RDD of (Int, Int) elements into an RDD of (Int, Seq[Int]) elements.
Its parameters are:

• createCombiner: V => C — when no combiner C exists for a key yet, creates one from a value V (e.g. builds a one-element seq).
• mergeValue: (C, V) => C — when a C already exists for the key, merges a value V into it (e.g. appends the item V to the seq, or adds it to a running sum).
• mergeCombiners: (C, C) => C — merges two combiners into one.
• partitioner: Partitioner — the partitioning strategy applied during the shuffle.
• mapSideCombine: Boolean = true — to reduce the amount of data transferred, much of the combining can be done on the map side first; e.g. values for the same key can be summed within each partition before the shuffle.
• serializer: Serializer = null — network transfer requires serialization; users can supply a custom serializer.


In the figure from the original article (boxes represent RDD partitions), combineByKey merges (V1, 2) and (V1, 1) into (V1, Seq(2, 1)).

Source:

    /**
     * Generic function to combine the elements for each key using a custom set of aggregation
     * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
     * Note that V and C can be different -- for example, one might group an RDD of type
     * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
     *
     * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
     * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
     * - `mergeCombiners`, to combine two C's into a single one.
     *
     * In addition, users can control the partitioning of the output RDD, and whether to perform
     * map-side aggregation (if a mapper can produce multiple items with the same key).
     */
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }

    /**
     * Simplified version of combineByKey that hash-partitions the output RDD.
     */
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        numPartitions: Int): RDD[(K, C)] = {
      combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
    }
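The two-phase structure (map-side combine per partition, then merging combiners across partitions) can be sketched in plain Python; the names are hypothetical and Spark's partitioning and serialization concerns are omitted:

```python
# Plain-Python sketch of combineByKey over explicit partitions:
# phase 1 combines within each partition (map-side combine),
# phase 2 merges the per-partition combiners (after the shuffle).
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    per_partition = []
    for part in partitions:
        combined = {}
        for k, v in part:
            if k in combined:
                combined[k] = merge_value(combined[k], v)
            else:
                combined[k] = create_combiner(v)
        per_partition.append(combined)
    result = {}
    for combined in per_partition:
        for k, c in combined.items():
            result[k] = merge_combiners(result[k], c) if k in result else c
    return result

# Turn (key, Int) pairs into (key, list-of-Int), as in the Seq example above.
combined = combine_by_key(
    [[("V1", 2)], [("V1", 1), ("V2", 1)]],   # two partitions
    create_combiner=lambda v: [v],
    merge_value=lambda c, v: c + [v],
    merge_combiners=lambda c1, c2: c1 + c2,
)
# combined == {"V1": [2, 1], "V2": [1]}
```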

(2) reduceByKey

reduceByKey is a simpler special case: two values are merged into a single value of the same type, so createCombiner is trivial (it just returns v), and mergeValue and mergeCombiners share the same logic.

In the figure from the original article (boxes represent RDD partitions), the user-defined function (A, B) => (A + B) adds the values of the same-key records (V1, 2) and (V1, 1), producing (V1, 3).

Source:

    /**
     * Merge the values for each key using an associative reduce function. This will also perform
     * the merging locally on each mapper before sending results to a reducer, similarly to a
     * "combiner" in MapReduce.
     */
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
      combineByKey[V]((v: V) => v, func, func, partitioner)
    }

    /**
     * Merge the values for each key using an associative reduce function. This will also perform
     * the merging locally on each mapper before sending results to a reducer, similarly to a
     * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
     */
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
      reduceByKey(new HashPartitioner(numPartitions), func)
    }

    /**
     * Merge the values for each key using an associative reduce function. This will also perform
     * the merging locally on each mapper before sending results to a reducer, similarly to a
     * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
     * parallelism level.
     */
    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
      reduceByKey(defaultPartitioner(self), func)
    }
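Since createCombiner is trivial here, the semantics collapse to a per-key fold. A minimal plain-Python sketch (helper name hypothetical):

```python
# Plain-Python sketch of reduceByKey: fold all values for a key into one
# value of the same type using an associative function.
def reduce_by_key(pairs, func):
    result = {}
    for k, v in pairs:
        result[k] = func(result[k], v) if k in result else v
    return result

totals = reduce_by_key([("V1", 2), ("V1", 1), ("V2", 1)], lambda a, b: a + b)
# totals == {"V1": 3, "V2": 1}
```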

(3) partitionBy

partitionBy repartitions an RDD. If the RDD's existing partitioner equals the given partitioner, no repartitioning takes place; otherwise a new ShuffledRDD is created using the given partitioner.

In the figure from the original article (boxes represent RDD partitions), the new partitioning strategy gathers the V1 and V2 records that previously lived in different partitions into a single partition.

Source:

    /**
     * Return a copy of the RDD partitioned using the specified partitioner.
     */
    def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
      if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
      if (self.partitioner == Some(partitioner)) {
        self
      } else {
        new ShuffledRDD[K, V, V](self, partitioner)
      }
    }
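The behavior can be sketched in plain Python, assuming a hash partitioner like Spark's HashPartitioner (partition index = hash(key) mod numPartitions); the helper name is hypothetical:

```python
# Plain-Python sketch of hash partitioning: every record with the same key
# is routed to the same partition.
def partition_by(pairs, num_partitions):
    parts = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        parts[hash(k) % num_partitions].append((k, v))
    return parts

parts = partition_by([("V1", 1), ("V2", 1), ("V1", 2)], 2)
# Both ("V1", ...) records land in the same partition, whichever index
# that turns out to be for this run.
```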

(4) cogroup

cogroup co-partitions two RDDs. For the Key-Value elements of the two RDDs, the elements sharing a key in each RDD are gathered into a collection, and the result pairs each key with a tuple of those collections: (K, (Iterable[V], Iterable[W])). In other words, the value for each key is a tuple of iterators over the same-key values drawn from each of the two RDDs.

In the figure from the original article (large boxes represent RDDs, small boxes their partitions), (U1, 1) and (U1, 2) from RDD1 and (U1, 2) from RDD2 are combined into (U1, ((1, 2), (2))).

Source:

    /**
     * For each key k in `this` or `other1` or `other2` or `other3`,
     * return a resulting RDD that contains a tuple with the list of values
     * for that key in `this`, `other1`, `other2` and `other3`.
     */
    def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
        other2: RDD[(K, W2)],
        other3: RDD[(K, W3)],
        partitioner: Partitioner)
        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
      if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
      val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
      cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
        (vs.asInstanceOf[Iterable[V]],
          w1s.asInstanceOf[Iterable[W1]],
          w2s.asInstanceOf[Iterable[W2]],
          w3s.asInstanceOf[Iterable[W3]])
      }
    }

    /**
     * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
     * list of values for that key in `this` as well as `other`.
     */
    def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
        : RDD[(K, (Iterable[V], Iterable[W]))] = {
      if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
      val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
      cg.mapValues { case Array(vs, w1s) =>
        (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
      }
    }

    /**
     * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
     * tuple with the list of values for that key in `this`, `other1` and `other2`.
     */
    def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
      if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
      val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
      cg.mapValues { case Array(vs, w1s, w2s) =>
        (vs.asInstanceOf[Iterable[V]],
          w1s.asInstanceOf[Iterable[W1]],
          w2s.asInstanceOf[Iterable[W2]])
      }
    }

    /**
     * For each key k in `this` or `other1` or `other2` or `other3`,
     * return a resulting RDD that contains a tuple with the list of values
     * for that key in `this`, `other1`, `other2` and `other3`.
     */
    def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
      cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
    }

    /**
     * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
     * list of values for that key in `this` as well as `other`.
     */
    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
      cogroup(other, defaultPartitioner(self, other))
    }

    /**
     * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
     * tuple with the list of values for that key in `this`, `other1` and `other2`.
     */
    def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
      cogroup(other1, other2, defaultPartitioner(self, other1, other2))
    }

    /**
     * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
     * list of values for that key in `this` as well as `other`.
     */
    def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
      cogroup(other, new HashPartitioner(numPartitions))
    }

    /**
     * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
     * tuple with the list of values for that key in `this`, `other1` and `other2`.
     */
    def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
      cogroup(other1, other2, new HashPartitioner(numPartitions))
    }

    /**
     * For each key k in `this` or `other1` or `other2` or `other3`,
     * return a resulting RDD that contains a tuple with the list of values
     * for that key in `this`, `other1`, `other2` and `other3`.
     */
    def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
        other2: RDD[(K, W2)],
        other3: RDD[(K, W3)],
        numPartitions: Int)
        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
      cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
    }
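A plain-Python sketch of the two-RDD cogroup semantics (names hypothetical; Spark returns an iterable per side, the sketch uses lists):

```python
# Plain-Python sketch of cogroup: for every key appearing in either input,
# pair the list of that key's values from each side.
def cogroup(rdd1, rdd2):
    seen = dict.fromkeys([k for k, _ in rdd1] + [k for k, _ in rdd2])
    return {
        k: ([v for kk, v in rdd1 if kk == k],
            [w for kk, w in rdd2 if kk == k])
        for k in seen
    }

grouped = cogroup([("U1", 1), ("U1", 2)], [("U1", 2), ("U2", 3)])
# grouped == {"U1": ([1, 2], [2]), "U2": ([], [3])}
```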

Joins

(1) join

join runs cogroup on the two RDDs to be joined. On the resulting RDD it takes, for each key, the Cartesian product of the two value collections, flattens the result so that every tuple for a key becomes its own record, and returns RDD[(K, (V, W))].
In essence, join co-partitions the data via cogroup and then uses flatMapValues to flatten the merged collections.

In the join diagram from the original article (large boxes represent RDDs, small boxes their partitions), elements sharing the key V1 (values 1 on one side and 1, 2 on the other) join into the records (V1, (1, 1)) and (V1, (1, 2)).

Source:

    /**
     * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
     * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
     * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
     */
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
      this.cogroup(other, partitioner).flatMapValues( pair =>
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
      )
    }
  • ?

(2) leftOuterJoin and rightOuterJoin

leftOuterJoin (left outer join) and rightOuterJoin (right outer join) build on join by first checking whether one side has any elements for a key. If the other RDD's collection for the key is empty, the missing value is filled in with None; otherwise the records are joined as usual and the present value is wrapped in Some.

Source:

    /**
     * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
     * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
     * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
     * partition the output RDD.
     */
    def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
      this.cogroup(other, partitioner).flatMapValues { pair =>
        if (pair._2.isEmpty) {
          pair._1.iterator.map(v => (v, None))
        } else {
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
        }
      }
    }

    /**
     * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
     * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
     * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
     * partition the output RDD.
     */
    def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
        : RDD[(K, (Option[V], W))] = {
      this.cogroup(other, partitioner).flatMapValues { pair =>
        if (pair._1.isEmpty) {
          pair._2.iterator.map(w => (None, w))
        } else {
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
        }
      }
    }
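A plain-Python sketch of the left-outer variant (rightOuterJoin is symmetric); None stands in for Scala's Option, and the helper name is hypothetical:

```python
# Plain-Python sketch of leftOuterJoin: keep every left-side record; when
# the right side has no values for a key, pad with None.
def left_outer_join(rdd1, rdd2):
    groups = {}
    for k, v in rdd1:
        groups.setdefault(k, ([], []))[0].append(v)
    for k, w in rdd2:
        groups.setdefault(k, ([], []))[1].append(w)
    out = []
    for k, (vs, ws) in groups.items():
        for v in vs:
            if ws:
                out.extend((k, (v, w)) for w in ws)
            else:
                out.append((k, (v, None)))
    return out

joined = left_outer_join([("V1", 1), ("V2", 1)], [("V1", 2)])
# joined == [("V1", (1, 2)), ("V2", (1, None))]
```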

  • Original article: http://blog.csdn.net/jasonding1354
