日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(十五):Spark Core的RDD常用算子

發布時間:2023/11/28 生活经验 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(十五):Spark Core的RDD常用算子 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

常用算子

基本算子

分區操作函數算子

重分區函數算子

?1)、增加分區函數

?2)、減少分區函數

?3)、調整分區函數

???????聚合函數算子

???????Scala集合中的聚合函數

???????RDD中的聚合函數

?????????????PairRDDFunctions聚合函數

面試題:groupByKey和reduceByKey

關聯函數

排序函數-求TopKey


常用算子

RDD中包含很多函數,主要可以分為兩類:Transformation轉換函數和Action函數

?

主要常見使用函數如下,一一通過演示范例講解。

?

?

基本算子

RDD中map、filter、flatMap及foreach等函數為最基本函數,都是都RDD中每個元素進行操作,將元素傳遞到函數中進行轉換。

  1. ?map?算子:
    1. map(f:T=>U) : RDD[T]=>RDD[U],表示將 RDD 經由某一函數 f 后,轉變為另一個RDD。
  1. ?flatMap?算子:
    1. flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]),表示將 RDD 經由某一函數 f 后,轉變為一個新的 RDD,但是與 map 不同,RDD 中的每一個元素會被映射成新的 0 到多個元素(f 函數返回的是一個序列 Seq)。
  1. ?filter 算子:
    1. filter(f:T=>Bool) : RDD[T]=>RDD[T],表示將 RDD 經由某一函數 f 后,只保留 f 返回為 true 的數據,組成新的 RDD。
  2. ?foreach 算子:
    1. foreach(func),將函數 func 應用在數據集的每一個元素上,通常用于更新一個累加器,或者和外部存儲系統進行交互,例如 Redis。關于 foreach,在后續章節中還會使用,到時會詳細介紹它的使用方法及注意事項。
  1. ?saveAsTextFile 算子:
    1. saveAsTextFile(path:String),數據集內部的元素會調用其 toString 方法,轉換為字符串形式,然后根據傳入的路徑保存成文本文件,既可以是本地文件系統,也可以是HDFS 等。

?

???????分區操作函數算子

每個RDD由多分區組成的,實際開發建議對每個分區數據的進行操作,map函數使用mapPartitions代替、foreache函數使用foreachPartition代替。

?

針對詞頻統計WordCount代碼進行修改,針對分區數據操作,示例代碼如下:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}/*** 分區操作函數:mapPartitions和foreachPartition*/
object SparkIterTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、從文件系統加載數據,創建RDD數據集val inputRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)// 2、處理數據,調用RDD集合中函數(類比于Scala集合類中列表List)/*def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]*/val wordcountsRDD: RDD[(String, Int)] = inputRDD// 將每行數據按照分隔符進行分割,將數據扁平化.flatMap(_.trim.split("\\s+"))// 針對每個分區數據操作.mapPartitions{ iter =>// iter 表示RDD中每個分區中的數據,存儲在迭代器中,相當于列表Listiter.map((_, 1))}// 按照Key聚合統計, 先按照Key分組,再聚合統計(此函數局部聚合,再進行全局聚合).reduceByKey(_+_)// 3、輸出結果RDD到本地文件系統wordcountsRDD.foreachPartition{ iter =>// 獲取各個分區IDval partitionId: Int = TaskContext.getPartitionId()// val xx: Iterator[(String, Int)] = datasiter.foreach{ case (word, count) =>println(s"p-${partitionId}: word = $word, count = $count")}}// 應用程序運行結束,關閉資源sc.stop()}
}

?

為什么要對分區操作,而不是對每個數據操作,好處在哪里呢???

  1. ?應用場景:處理網站日志數據,數據量為10GB,統計各個省份PV和UV
    1. 假設10GB日志數據,從HDFS上讀取的,此時RDD的分區數目:80 分區;
    2. 但是分析PV和UV有多少條數據:34,存儲在80個分區中,實際項目中降低分區數目,比如設置為2個分區。

      ?

?

???????重分區函數算子

如何對RDD中分區數目進行調整(增加分區或減少分區),在RDD函數中主要有如下三個函數。

?1)、增加分區函數

函數名稱:repartition,此函數使用的謹慎,會產生Shuffle。

?

?

??注意: repartition底層調用coalesce(numPartitions, shuffle=true)

?2)、減少分區函數

函數名稱:coalesce,shuffle參數默認為false,不會產生Shuffle,默認只能減少分區

比如RDD的分區數目為10個分區,此時調用rdd.coalesce(12),不會對RDD進行任何操作

?

?

?3)、調整分區函數

在PairRDDFunctions中partitionBy函數:

?

?

import org.apache.spark.Partitioner/*** 自定義分區器,實現RDD分區,在進行Shuffle過程中*/class MyPartitioner extends Partitioner{// 確定分區數目override def numPartitions: Int = 3// 依據Key,確定所屬分區,返回值:0,...,2override def getPartition(key: Any): Int = {// 獲取每個單詞第一個字符val firstChar: Int = key.asInstanceOf[String].charAt(0).toIntif(firstChar >= 97 && firstChar <= 122){0 ?// 小寫字母開頭單詞,在第一個分區}else if(firstChar >= 65 && firstChar <= 90){1 // 大寫字母開頭單詞,在第二個分區}else{2 // 非大小字母開頭單詞,在第三個分區}}}

?

范例演示代碼,適當使用函數調整RDD分區數目:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** RDD中分區函數,調整RDD分區數目,可以增加分區和減少分區*/
object SparkPartitionTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 讀取本地文件系統文本文件數據val datasRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)// 增加RDD分區數val etlRDD: RDD[String] = datasRDD.repartition(3)println(s"EtlRDD 分區數目?= ${etlRDD.getNumPartitions}")// 詞頻統計val resultRDD: RDD[(String, Int)] = etlRDD// 數據分析,考慮過濾臟數據.filter(line => null != line && line.trim.length > 0)// 分割單詞,注意去除左右空格.flatMap(line => line.trim.split("\\s+"))// 轉換為二元組,表示單詞出現一次.mapPartitions{iter =>iter.map((_, 1))}// 分組聚合,按照Key單詞.reduceByKey(_+_)//resultRDD.partitionBy(傳入自定義分區器)// 輸出結果RDDresultRDD// 對結果RDD降低分區數目.coalesce(1).foreachPartition(iter => iter.foreach(println))// 應用程序運行結束,關閉資源sc.stop()}
}

?

在實際開發中,什么時候適當調整RDD的分區數目呢?讓程序性能更好好呢????

?第一點:增加分區數目

當處理的數據很多的時候,可以考慮增加RDD的分區數

?

?

?第二點:減少分區數目

其一:當對RDD數據進行過濾操作(filter函數)后,考慮是否降低RDD分區數目

?

?

其二:當對結果RDD存儲到外部系統

?

?

???????聚合函數算子

在數據分析領域中,對數據聚合操作是最為關鍵的,在Spark框架中各個模塊使用時,主要就是其中聚合函數的使用。

?

???????Scala集合中的聚合函數

回顧列表List中reduce聚合函數核心概念:聚合的時候,往往需要聚合中間臨時變量。查看列表List中聚合函數reduce和fold源碼如下:

?

通過代碼,看看列表List中聚合函數使用:

?

運行截圖如下所示:

?

fold聚合函數,比reduce聚合函數,多提供一個可以初始化聚合中間臨時變量的值參數:

?

聚合操作時,往往聚合過程中需要中間臨時變量(到底時幾個變量,具體業務而定),如下案例:

?

?

???????RDD中的聚合函數

在RDD中提供類似列表List中聚合函數reduce和fold,查看如下:

?

案例演示:求列表List中元素之和,RDD中分區數目為2,核心業務代碼如下:

?

?

運行原理分析:

?

?

使用RDD中fold聚合函數:

?

?

查看RDD中高級聚合函數aggregate,函數聲明如下:

?

seqOp函數的第一個參數是累加器,第一次執行時,會把zeroValue賦給累加器。第一次之后會把返回值賦給累加器,作為下一次運算的第一個參數。

seqOP函數每個分區下的每個key有個累加器,combOp函數全部分區有幾個key就有幾個累加器。如果某個key只存在于一個分區下,不會對他執行combOp函數

?

業務需求:使用aggregate函數實現RDD中最大的兩個數據,分析如下:

?

核心業務代碼如下:

?

運行結果原理剖析示意圖:

?

上述完整范例演示代碼:


package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}import scala.collection.mutable
import scala.collection.mutable.ListBuffer/*** RDD中聚合函數:reduce、aggregate函數*/
object SparkAggTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 模擬數據,1 到?10 的列表,通過并行方式創建RDDval datas = 1 to 10val datasRDD: RDD[Int] = sc.parallelize(datas, numSlices = 2)// 查看每個分區中的數據datasRDD.foreachPartition { iter =>println(s"p-${TaskContext.getPartitionId()}: ${iter.mkString(", ")}")}println("==================使用reduce函數聚合=======================")// 使用reduce函數聚合val result: Int = datasRDD.reduce((tmp, item) => {println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")tmp + item})println(result)println("==================使用fold函數聚合=======================")// 使用fold函數聚合val result2: Int = datasRDD.fold(0)((tmp, item) => {println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")tmp + item})println(result2)println("================使用aggregate函數獲取最大的兩個值=========================")// 使用aggregate函數獲取最大的兩個值val top2: mutable.Seq[Int] = datasRDD.aggregate(new ListBuffer[Int]())(// 分區內聚合函數,每個分區內數據如何聚合??seqOp: (U, T) => U,(u, t) => {println(s"p-${TaskContext.getPartitionId()}: u = $u, t = $t")// 將元素加入到列表中u += t //// 降序排序val top = u.sorted.takeRight(2)// 返回top},// 分區間聚合函數,每個分區聚合的結果如何聚合?combOp: (U, U) => U(u1, u2) => {println(s"p-${TaskContext.getPartitionId()}: u1 = $u1, u2 = $u2")u1 ++= u2 // 將列表的數據合并,到u1中//u1.sorted.takeRight(2)})println(top2)// 應用程序運行結束,關閉資源sc.stop()}
}

?

?

?

?????????????PairRDDFunctions聚合函數

在Spark中有一個object對象PairRDDFunctions,主要針對RDD的數據類型是Key/Value對的數據提供函數,方便數據分析處理。比如使用過的函數:reduceByKey、groupByKey等。*ByKey函數將相同Key的Value進行聚合操作的,省去先分組再聚合

?

?第一類:分組函數groupByKey

?

?

?

?第二類:分組聚合函數reduceByKeyfoldByKey

?

?

但是reduceByKey和foldByKey聚合以后的結果數據類型與RDD中Value的數據類型是一樣的。

?

?第三類:分組聚合函數aggregateByKey

?

?

在企業中如果對數據聚合使用,不能使用reduceByKey完成時,考慮使用aggregateByKey函數,基本上都能完成任意聚合功能。

演示范例代碼如下:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** RDD中聚合函數,針對RDD中數據類型Key/Value對:* ?????groupByKey* ?????reduceByKey/foldByKey* ?????aggregateByKey*/
object SparkAggByKeyTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、并行化集合創建RDD數據集val linesSeq: Seq[String] = Seq("hello me you her","hello you her","hello her","hello")val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)// 2、分割單詞,轉換為二元組val wordsRDD: RDD[(String, Int)] = inputRDD.flatMap(_.split("\\s+")).map((_,1))println("==使用groupByKey函數分組,再使用mapValues函數聚合==")val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey()val resultRDD: RDD[(String, Int)] = wordsGroupRDD.mapValues(_.sum)println(resultRDD.collectAsMap())println("==使用reduceByKey或foldByKey分組聚合==")val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey(_+_)println(resultRDD2.collectAsMap())val resultRDD3 = wordsRDD.foldByKey(0)(_+_)println(resultRDD3.collectAsMap())println("==使用aggregateByKey聚合==")/*def aggregateByKey[U: ClassTag](zeroValue: U) // 聚合中間臨時變量初始值,類似fold函數zeroValue(seqOp: (U, V) => U, // 各個分區內數據聚合操作函數combOp: (U, U) => U // 分區間聚合結果的聚合操作函數): RDD[(K, U)]*/val resultRDD4 = wordsRDD.aggregateByKey(0)((tmp: Int, item: Int) => {tmp + item},(tmp: Int, result: Int) => {tmp + result})println(resultRDD4.collectAsMap())// 應用程序運行結束,關閉資源sc.stop()}}

?

面試題:groupByKey和reduceByKey

RDD中groupByKey和reduceByKey區別???

?groupByKey函數:在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的函數,將相同key的值聚合到一起。

?reduceByKey函數:在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,reduce任務的個數可以通過第二個可選的參數來設置。有預聚合

?

?

?

關聯函數

????當兩個RDD的數據類型為二元組Key/Value對時,可以依據Key進行關聯Join。

?

首先回顧一下SQL JOIN,用Venn圖表示如下:

?

RDD中關聯JOIN函數都在PairRDDFunctions中,具體截圖如下:

?

具體看一下join(等值連接)函數說明:

?

范例演示代碼:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** RDD中關聯函數Join,針對RDD中數據類型為Key/Value對*/
object SparkJoinTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 模擬數據集val empRDD: RDD[(Int, String)] = sc.parallelize(Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")))val deptRDD: RDD[(Int, String)] = sc.parallelize(Seq((1001, "sales"), (1002, "tech")))/*def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]*/val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)println(joinRDD.collectAsMap())/*def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]*/val leftJoinRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)println(leftJoinRDD.collectAsMap())// 應用程序運行結束,關閉資源sc.stop()}
}

?

排序函數-求TopKey

在上述詞頻統計WordCount代碼基礎上,對統計出的每個單詞的詞頻Count,按照降序排序,獲取詞頻次數最多Top3單詞

RDD中關于排序函數有如下三個:

?1)、sortByKey:針對RDD中數據類型key/value對時,按照Key進行排序

2)、sortBy:針對RDD中數據指定排序規則

?

3)、top:按照RDD中數據采用降序方式排序,如果是Key/Value對,按照Key降序排序

?

具體演示代碼如下,注意慎用top函數。

?

package cn.itcast.helloimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 獲取詞頻最高三個單詞*/
object WordCountTopKey {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")val inputRDD: RDD[String] = sc.textFile("data/input/words.txt")val wordsRDD = inputRDD.flatMap(line => line.split("\\s+"))val tuplesRDD: RDD[(String, Int)] = wordsRDD.map((_, 1))val wordCountsRDD: RDD[(String, Int)] = tuplesRDD.reduceByKey(_+_)wordCountsRDD.foreach(println)// 按照詞頻count降序排序獲取前3個單詞, 有三種方式println("======================== sortByKey =========================")// 方式一:按照Key排序sortByKey函數,/*def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.length): RDD[(K, V)]*/wordCountsRDD.map(tuple => tuple.swap) //.map(tuple => (tuple._2, tuple._1)).sortByKey(ascending = false)//逆序.take(3).foreach(println)println("======================== sortBy =========================")// 方式二:sortBy函數, 底層調用sortByKey函數/*def sortBy[K](f: (T) => K, // T 表示RDD集合中數據類型,此處為二元組ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]*/wordCountsRDD.sortBy(tuple => tuple._2, ascending = false).take(3).foreach(println)println("======================== top =========================")// 方式三:top函數,含義獲取最大值,傳遞排序規則, 慎用/*def top(num: Int)(implicit ord: Ordering[T]): Array[T]*/wordCountsRDD.top(3)(Ordering.by(_._2)).foreach(println)sc.stop()}
}

?

總結

以上是生活随笔為你收集整理的2021年大数据Spark(十五):Spark Core的RDD常用算子的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。