spark中累加器的使用(转)
生活随笔
收集整理的這篇文章主要介紹了
spark中累加器的使用(转)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
環境:
ubuntu16.04 64
偽分布式
使用的spark是2.3.1
scala 2.11.8
參考連接:
https://blog.csdn.net/android_xue/article/details/79780463#commentsedit
注意,這篇博客是對上述參考鏈接的總結和概括.
一句話講明,累加器干嘛的?
統計slava機中的數據的總數量的.
上代碼,
1.不使用累加器的代碼AccumulatorTest.scala:
1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac AccumulatorTest1.scala
4.scala AccumulatorTest
2.使用累加器的代碼AccumulatorTest2.scala:
import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.execution.datasources.textobject AccumulatorTest {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR) //這里是用來抑制一大堆log信息的. val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val sc = spark.sparkContextsc.setLogLevel("ERROR")val linesRDD=sc.textFile("hdfs://master:9000/test/word.txt")val accumulator=sc.accumulator(0); //創建accumulator并初始化為0val result=linesRDD.map(s=> {accumulator.add(1)//有一條數據就增加1s//返回s給result,意思也就是,把word.txt中的內容賦值(可能會打亂順序)給result})result.collect().foreach(println);println("words lines is :"+accumulator.value)sc.stop()} }1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac AccumulatorTest2.scala
4.scala AccumulatorTest
總結
以上是生活随笔為你收集整理的spark中累加器的使用(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark广播变量的使用(转)
- 下一篇: cairo-clock设置为自动启动后总