日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark中累加器的使用(转)

發布時間:2023/12/20 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark中累加器的使用(转) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

環境:
ubuntu16.04 64
偽分布式
使用的spark是2.3.1
scala 2.11.8
參考連接:
https://blog.csdn.net/android_xue/article/details/79780463#commentsedit

注意,這篇博客是對上述參考鏈接的總結和概括.

一句話講明,累加器干嘛的?
統計slava機中的數據的總數量的.
上代碼,
1.不使用累加器的代碼AccumulatorTest.scala:

import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.execution.datasources.textobject AccumulatorTest {def main(args:Array[String]):Unit={Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR) //這里是用來抑制一大堆log信息的. val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();spark.sparkContext.setLogLevel("ERROR")val sc = spark.sparkContextval linesRDD=sc.textFile("hdfs://master:9000/test/word.txt")var i=0;val result=linesRDD.map(s=>{i+=1})result.collect();println("word lines is:"+i)sc.stop()}}

1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac AccumulatorTest1.scala
4.scala AccumulatorTest

2.使用累加器的代碼AccumulatorTest2.scala:

import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.execution.datasources.textobject AccumulatorTest {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR) //這里是用來抑制一大堆log信息的. val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val sc = spark.sparkContextsc.setLogLevel("ERROR")val linesRDD=sc.textFile("hdfs://master:9000/test/word.txt")val accumulator=sc.accumulator(0); //創建accumulator并初始化為0val result=linesRDD.map(s=> {accumulator.add(1)//有一條數據就增加1s//返回s給result,意思也就是,把word.txt中的內容賦值(可能會打亂順序)給result})result.collect().foreach(println);println("words lines is :"+accumulator.value)sc.stop()} }

1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac AccumulatorTest2.scala
4.scala AccumulatorTest

總結

以上是生活随笔為你收集整理的spark中累加器的使用(转)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。