日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤)

發布時間:2023/12/10 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

看longAccumulator()方法源碼里是val acc = new LongAccumulator然后用register(acc)在Spark中注冊了累加器,進入ctrl+鼠標左鍵進入LongAccumulator,可以看到繼承了AccumulatorV2[jl.Long, jl.Long],根據LongAccumulator來實現自定義累加器

實現類
//1.繼承父類AccumulatorV2[IN,OUT](IN,OUT是Driver發到Executor的類型與Executor返回給Driver的類型) //2.實現抽象方法 //3.創建累加器 class WordAccumulator extends AccumulatorV2[String,util.ArrayList[String]] {val list = new util.ArrayList[String]()//當前的累加器是不是初始化狀態(這里是判斷創建的集合是不是空)override def isZero: Boolean = {list.isEmpty}//復制累加器對象override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {new WordAccumulator}//重置累加器對象(這里把集合清空即可)override def reset(): Unit = {list.clear()}//向累加器中增加數據override def add(v: String): Unit = {if (v.contains("h")){list.add(v)}}//合并累加器(不同executor返回會有個合并的過程)override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {list.addAll(other.value)}//獲取累加器的結果override def value: util.ArrayList[String] = list }
然后是main函數
def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")//創建上下文對象val sc = new SparkContext(conf)val dataRDD:RDD[String] = sc.makeRDD(List("chun1","chun2","chun3","chun4"),2)// TODO 創建累加器val wordAccumulator = new WordAccumulator()// TODO 注冊累加器sc.register(wordAccumulator)dataRDD.foreach{case word=>{//TODO 執行累加器的累加功能wordAccumulator.add(word)}}// TODO 獲取累加結果println(wordAccumulator.value)} 結果:[chun1, chun2, chun3, chun4]

完整代碼

package date_9_23import java.utilimport org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext}/*** 自定義累加器*/ object Spark4_LongAccumulator {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")//創建上下文對象val sc = new SparkContext(conf)val dataRDD:RDD[String] = sc.makeRDD(List("chun1","chun2","chun3","chun4"),2)// TODO 創建累加器val wordAccumulator = new WordAccumulator()// TODO 注冊累加器sc.register(wordAccumulator)dataRDD.foreach{case word=>{//TODO 執行累加器的累加功能wordAccumulator.add(word)}}// TODO 獲取累加結果println(wordAccumulator.value)} }//聲明累加器 //1.繼承父類AccumulatorV2[IN,OUT](IN,OUT是Driver發到Executor的類型與Executor返回給Driver的類型) //2.實現抽象方法 //3.創建累加器 class WordAccumulator extends AccumulatorV2[String,util.ArrayList[String]] {val list = new util.ArrayList[String]()//當前的累加器是不是初始化狀態(這里是判斷創建的集合是不是空)override def isZero: Boolean = {list.isEmpty}//復制累加器對象override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {new WordAccumulator}//重置累加器對象(這里把集合清空即可)override def reset(): Unit = {list.clear()}//向累加器中增加數據override def add(v: String): Unit = {if (v.contains("h")){list.add(v)}}//合并累加器(不同executor返回會有個合并的過程)override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {list.addAll(other.value)}//獲取累加器的結果override def value: util.ArrayList[String] = list } 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。