
Spark RDD Usage in Detail 5 -- Action Operators


Essentially, an Action operator triggers execution of the RDD DAG by submitting a job through SparkContext's runJob.
Action operators can be classified by their output target: no output, HDFS, and Scala collections and data types.

No output

foreach

foreach applies the function f to every element of the RDD; it returns neither an RDD nor an Array, but Unit.

In the figure, the foreach operator applies a user-defined function to each data item. In this example the user-defined function is println, which prints every data item to the console.

Source:

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit) {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
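As a quick illustration (not from the original post), here is a minimal spark-shell style sketch; it assumes sc is an already-created SparkContext:

val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))
// println runs on the executors; in local mode the output appears in the driver console,
// in cluster mode it appears in the executor logs.
nums.foreach(x => println(x))   // returns Unit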

HDFS

(1) saveAsTextFile

saveAsTextFile writes the data of the RDD to the specified directory on HDFS. Each element of the RDD is first mapped to (Null, x.toString) and then written to HDFS.

In the figure, the boxes on the left represent RDD partitions and the boxes on the right represent HDFS Blocks; each RDD partition is stored as one Block on HDFS.

Source:

/**
 * Save this RDD as a text file, using string representations of elements.
 */
def saveAsTextFile(path: String) {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/**
 * Save this RDD as a compressed text file, using string representations of elements.
 */
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
  // https://issues.apache.org/jira/browse/SPARK-2075
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
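A minimal usage sketch (not from the original post; it assumes sc is an existing SparkContext, and the output path below is a placeholder that must not already exist):

val words = sc.parallelize(Seq("spark", "rdd", "action"), 2)
// Each of the 2 partitions is written as one part-NNNNN file under the target directory.
words.saveAsTextFile("hdfs:///tmp/words_out")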

(2) saveAsObjectFile

saveAsObjectFile groups every 10 elements of a partition into an Array, serializes each Array, maps it to an element of the form (Null, BytesWritable(Y)), and writes the result to HDFS in SequenceFile format.

In the figure, the boxes on the left represent RDD partitions and the boxes on the right represent HDFS Blocks; each RDD partition is stored as one Block on HDFS.

Source:

/**
 * Save this RDD as a SequenceFile of serialized objects.
 */
def saveAsObjectFile(path: String) {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
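For example (a sketch, assuming sc is an existing SparkContext; the output path is a placeholder and must not already exist):

val data = sc.parallelize(1 to 100)
data.saveAsObjectFile("hdfs:///tmp/objects_out")              // stored as a SequenceFile of serialized Arrays
val restored = sc.objectFile[Int]("hdfs:///tmp/objects_out")  // read the objects back as an RDD[Int]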

Scala collections and data types

(1) collect

collect is equivalent to toArray, but toArray is deprecated and no longer recommended. collect returns the distributed RDD as a single-machine Scala Array, on which Scala's functional operations can then be applied.

In the figure, the boxes on the left represent RDD partitions and the box on the right represents an array in single-machine memory. The results are returned to the node running the Driver program and stored as an array.

Source:

/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
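For example (assuming sc is an existing SparkContext):

val squares = sc.parallelize(1 to 5).map(x => x * x)
val local: Array[Int] = squares.collect()   // Array(1, 4, 9, 16, 25), gathered on the driver
local.filter(_ > 10)                        // ordinary Scala collection operations now apply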

(2) collectAsMap

collectAsMap returns a single-machine HashMap for an RDD of (K, V) pairs. For elements with duplicate keys, later elements overwrite earlier ones.

In the figure, the boxes on the left represent RDD partitions and the box on the right represents a single-machine collection; the computed result is returned to the Driver program via collectAsMap and stored as a HashMap.

Source:

/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 * one value per key is preserved in the map returned)
 */
def collectAsMap(): Map[K, V] = {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
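For example (assuming sc is an existing SparkContext):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val m = pairs.collectAsMap()   // Map(a -> 3, b -> 2): only one value is kept per key, here the later ("a", 3)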

(3) reduceByKeyLocally

reduceByKeyLocally implements reduce followed by collectAsMap: it first performs the reduce operation on the whole RDD, then collects all results and returns them to the driver as a HashMap.

Source:

/**
 * Merge the values for each key using an associative reduce function, but return the results
 * immediately to the master as a Map. This will also perform the merging locally on each mapper
 * before sending results to a reducer, similarly to a "combiner" in MapReduce.
 */
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {

  if (keyClass.isArray) {
    throw new SparkException("reduceByKeyLocally() does not support array keys")
  }

  val reducePartition = (iter: Iterator[(K, V)]) => {
    val map = new JHashMap[K, V]
    iter.foreach { pair =>
      val old = map.get(pair._1)
      map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
    }
    Iterator(map)
  } : Iterator[JHashMap[K, V]]

  val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
    m2.foreach { pair =>
      val old = m1.get(pair._1)
      m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
    }
    m1
  } : JHashMap[K, V]

  self.mapPartitions(reducePartition).reduce(mergeMaps)
}
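For example (assuming sc is an existing SparkContext):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = pairs.reduceByKeyLocally(_ + _)   // Map(a -> 4, b -> 2), returned directly to the driver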

(4) lookup

lookup operates on an RDD of (Key, Value) pairs and returns a Seq of the values associated with the specified key. The optimization here is that if the RDD has a partitioner, only the partition the key maps to is processed and the matching values form the returned Seq; if the RDD has no partitioner, all elements of the RDD must be scanned to find those matching the key.

In the figure, the boxes on the left represent RDD partitions and the box on the right represents the Seq, whose final result is returned to the application on the node running the Driver.

Source:

/**
 * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
 * RDD has a known partitioner by only searching the partition that the key maps to.
 */
def lookup(key: K): Seq[V] = {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}
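For example (assuming sc is an existing SparkContext); partitioning the RDD first lets lookup scan only one partition:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
  .partitionBy(new HashPartitioner(4))
pairs.lookup(1)   // a Seq containing "a" and "c"; only the partition key 1 hashes to is searched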

(5) count

count returns the number of elements in the whole RDD.

In the figure, the number of elements returned is 5; each box represents one RDD partition.

Source:

/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
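For example (assuming sc is an existing SparkContext):

sc.parallelize(Seq("a", "b", "c", "d", "e")).count()   // 5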

(6) top

top returns the largest k elements, as defined by an implicit Ordering[T], as an array.
Related functions (contrasted in the sketch after the source below):

• top returns the largest k elements.
• take returns the first k elements of the RDD.
• takeOrdered returns the smallest k elements and keeps them sorted in the returned array.
• first returns the first element of the RDD (essentially take(1)).

Source:

/**
 * Returns the top k (largest) elements from this RDD as defined by the specified
 * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
 * {{{
 *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
 *   // returns Array(12)
 *
 *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
 *   // returns Array(6, 5)
 * }}}
 *
 * @param num k, the number of top elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
 */
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
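The following sketch contrasts the four related functions (assuming sc is an existing SparkContext):

val nums = sc.parallelize(Seq(10, 4, 2, 12, 3))
nums.top(2)          // Array(12, 10): the two largest elements, in descending order
nums.takeOrdered(2)  // Array(2, 3): the two smallest elements, in ascending order
nums.take(2)         // Array(10, 4): the first two elements of the RDD
nums.first()         // 10: the first element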

(7) reduce

reduce is equivalent to applying reduceLeft to the elements of the RDD: reduceLeft first applies the function to the first two elements, then applies it to that result and the next element from the iterator, and so on until all elements have been processed, yielding the final result.

Source:

/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
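For example (assuming sc is an existing SparkContext):

val nums = sc.parallelize(1 to 10)
// The operator must be commutative and associative, since partition results are combined in no fixed order.
nums.reduce(_ + _)   // 55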

(8) fold

fold works on the same principle as reduce, but unlike reduce, the first element taken from the iterator in each reduce step is zeroValue.

In the figure, the fold computation is performed by a user-defined function; each box in the figure represents one RDD partition.

Source:

/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using a
 * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
 * modify t1 and return it as its result value to avoid object allocation; however, it should not
 * modify t2.
 */
def fold(zeroValue: T)(op: (T, T) => T): T = {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
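For example (assuming sc is an existing SparkContext). Note that zeroValue is applied once per partition and once more when merging the partition results, so it should normally be a neutral element for op:

val nums = sc.parallelize(1 to 4, 2)   // 2 partitions
nums.fold(0)(_ + _)                    // 10: 0 seeds each partition fold and the final merge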

(9) aggregate

aggregate first aggregates all the elements of each partition, and then folds the per-partition results together.
aggregate differs from fold and reduce in that it takes two separate functions: within each partition the elements are combined serially with seqOp, and the per-partition results are then merged with combOp, so the result type U may differ from the element type T. In fold and reduce a single function is used throughout: each partition is processed serially to a partial result, and those partial results are then combined in the same way to produce the final result.

In the figure, the aggregate operation is applied to the RDD with user-defined functions; each box in the figure represents one RDD partition.

Source:

/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using
 * given combine functions and a neutral "zero value". This function can return a different result
 * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
 * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
 * allowed to modify and return their first argument instead of creating a new U to avoid memory
 * allocation.
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
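To illustrate the two functions, here is a sketch (not from the original post; it assumes sc is an existing SparkContext) that computes a sum and a count in one pass:

val nums = sc.parallelize(1 to 10, 2)
val (sum, cnt) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: fold one element into the partial (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge two partial results
)
val mean = sum.toDouble / cnt             // 5.5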

Original post: http://blog.csdn.net/jasonding1354
