spark广播变量的使用(转)
環境:
ubuntu16.04 64
偽分布式
使用的spark是2.3.1
scala 2.11.8
參考連接:
https://blog.csdn.net/android_xue/article/details/79780463#commentsedit
注意,這篇博客是對上述參考鏈接的總結和概括.
一句話講明,廣播變量干嘛的?
就是你代碼里的某個變量在程序運行時太耗內存了,所以丟到各個slave機子中弄一個備份,省得代碼運行時傳來傳去的,就是為了省一下內存以及傳遞的耗時.
下面上代碼,
1.不用廣播變量的完整代碼BroadcastTest1.scala:
運行方法:
1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac BroadcastTest1.scala
4.scala BroadcastTest
當然,這里也可以采用maven打包用spark-submit來運行
2.用廣播變量的完整代碼BroadcastTest2.scala:
import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.execution.datasources.textobject BroadcastTest {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR) //這里是用來抑制一大堆log信息的. val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val sc = spark.sparkContextsc.setLogLevel("ERROR")val list=List("hello java lalala")val broadcast=sc.broadcast(list)val linesRDD= spark.read.textFile("hdfs://master:9000/test/word.txt")linesRDD.filter(line=> {broadcast.value.contains(line)}).collect().foreach(println)spark.stop()//這里也可以使用sc.stop()}}運行方法:
1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac BroadcastTest2.scala
4.scala BroadcastTest
當然,這里也可以采用maven打包用spark-submit來運行
總結
以上是生活随笔為你收集整理的spark广播变量的使用(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: scala中_*的作用
- 下一篇: spark中累加器的使用(转)