日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark广播变量的使用(转)

發布時間:2023/12/20 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark广播变量的使用(转) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

環境:
ubuntu16.04 64
偽分布式
使用的spark是2.3.1
scala 2.11.8
參考連接:
https://blog.csdn.net/android_xue/article/details/79780463#commentsedit

注意,這篇博客是對上述參考鏈接的總結和概括.

一句話講明,廣播變量干嘛的?
就是你代碼里的某個變量在程序運行時太耗內存了,所以丟到各個slave機子中弄一個備份,省得代碼運行時傳來傳去的,就是為了省一下內存以及傳遞的耗時.
下面上代碼,
1.不用廣播變量的完整代碼BroadcastTest1.scala:

import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.execution.datasources.textobject BroadcastTest {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR) //這里是用來抑制一大堆log信息的. val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();spark.sparkContext.setLogLevel("ERROR")val list=List("hello java lalala")val linesRDD= spark.read.textFile("hdfs://master:9000/test/word.txt")linesRDD.filter(line=>{list.contains(line)}).collect().foreach(println)} }

運行方法:
1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac BroadcastTest1.scala
4.scala BroadcastTest
當然,這里也可以采用maven打包用spark-submit來運行

2.用廣播變量的完整代碼BroadcastTest2.scala:

import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.execution.datasources.textobject BroadcastTest {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR) //這里是用來抑制一大堆log信息的. val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val sc = spark.sparkContextsc.setLogLevel("ERROR")val list=List("hello java lalala")val broadcast=sc.broadcast(list)val linesRDD= spark.read.textFile("hdfs://master:9000/test/word.txt")linesRDD.filter(line=> {broadcast.value.contains(line)}).collect().foreach(println)spark.stop()//這里也可以使用sc.stop()}}

運行方法:
1.啟動Hadoop的HDFS系統
2.hdfs dfs -put word.txt hdfs://master:9000/test/
3.scalac BroadcastTest2.scala
4.scala BroadcastTest
當然,這里也可以采用maven打包用spark-submit來運行

總結

以上是生活随笔為你收集整理的spark广播变量的使用(转)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。