日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

發布時間:2023/11/28 生活经验 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

共享變量

廣播變量

累加器

???????案例演示


共享變量

在默認情況下,當Spark在集群的多個不同節點的多個任務上并行運行一個函數時,它會把函數中涉及到的每個變量,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變量,或者在任務(Task)和任務控制節點(Driver Program)之間共享變量。

為了滿足這種需求,Spark提供了兩種類型的變量:

?1)、廣播變量Broadcast Variables

廣播變量用來把變量在所有節點的內存之間進行共享,在每個機器上緩存一個只讀的變量,而不是為機器上的每個任務都生成一個副本;

??2)、累加器Accumulators

累加器支持在所有不同節點之間進行累加計算(比如計數或者求和);

官方文檔:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#shared-variables

?

???????廣播變量

廣播變量允許開發人員在每個節點(Worker or Executor)緩存只讀變量,而不是在Task之間傳遞這些變量。使用廣播變量能夠高效地在集群每個節點創建大數據集的副本。同時Spark還使用高效的廣播算法分發這些變量,從而減少通信的開銷。

?

可以通過調用sc.broadcast(v)創建一個廣播變量,該廣播變量的值封裝在v變量中,可使用獲取該變量value的方法進行訪問。

?

?

???????累加器

Spark提供的Accumulator,主要用于多個節點對一個變量進行共享性的操作。Accumulator只提供了累加的功能,即確提供了多個task對一個變量并行操作的功能。但是task只能對Accumulator進行累加操作,不能讀取Accumulator的值,只有Driver程序可以讀取Accumulator的值。創建的Accumulator變量的值能夠在Spark Web UI上看到,在創建時應該盡量為其命名。

?

Spark內置了三種類型的Accumulator,分別是LongAccumulator用來累加整數型,DoubleAccumulator用來累加浮點型,CollectionAccumulator用來累加集合元素

當內置的Accumulator無法滿足要求時,可以繼承AccumulatorV2實現自定義的累加器。實現自定義累加器的步驟:

?第一步、繼承AccumulatorV2,實現相關方法;

?第二步、創建自定義Accumulator的實例,然后在SparkContext上注冊它;

官方提供實例如下:

?

???????案例演示

?????以詞頻統計WordCount程序為例,假設處理的數據如下所示,包括非單詞符合,統計數據詞頻時過濾非單詞的特殊符號并且統計總的格式。

?

實現功能:

?第一、過濾特殊字符

非單詞符合存儲列表List中

使用廣播變量廣播列表

?

?

?

?

?第二、累計統計非單詞符號出現次數

定義一個LongAccumulator累加器,進行計數

示例代碼:

?

package cn.itcast.coreimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}/*** 基于Spark框架使用Scala語言編程實現詞頻統計WordCount程序,將符號數據過濾,并統計出現的次數* -a. 過濾標點符號數據* 使用廣播變量* -b. 統計出標點符號數據出現次數* 使用累加器*/
object SparkSharedVariableTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 讀取文件數據val datasRDD: RDD[String] = sc.textFile("data/input/words2.txt", minPartitions = 2)// 字典數據,只要有這些單詞就過濾: 特殊字符存儲列表List中val list: List[String] = List(",", ".", "!", "#", "$", "%")// 通過廣播變量 將列表list廣播到各個Executor內存中,便于多個Task使用val listBroadcast: Broadcast[List[String]] = sc.broadcast(list)// 定義累加器,記錄單詞為符號數據的個數val accumulator: LongAccumulator = sc.longAccumulator("mycounter")// 分割單詞,過濾數據val wordsRDD = datasRDD// 1)、過濾數據,去除空行數據.filter(line => line != null && line.trim.length > 0)// 2)、分割單詞.flatMap(_.trim.split("\\s+"))// 3)、過濾字典數據:符號數據.filter(word => {// 獲取符合列表?,從廣播變量中獲取列表list的值val listValue = listBroadcast.value// 判斷單詞是否為符號數據,如果是就過濾掉val isCharacter = listValue.contains(word)if (isCharacter) {// 如果單詞為符號數據,累加器加1accumulator.add(1L)}!isCharacter})val resultRDD: RDD[(String, Int)] = wordsRDD// 轉換為二元組.mapPartitions(iter => {iter.map((_, 1))})// 按照單詞聚合統計.reduceByKey(_+_)resultRDD.foreach(println)println(s"過濾符合數據的個數:${accumulator.value}")// 應用程序運行結束,關閉資源sc.stop()}
}

也可以通過WEB UI查看累加器的值

總結

以上是生活随笔為你收集整理的2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。