日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark基础脚本入门实践2:基础开发

發布時間:2024/4/17 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark基础脚本入门实践2:基础开发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

1、最基本的Map用法

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val result = distData.map(x=>x*x)
println(result.collect().mkString(","))

其中最關鍵的操作就是:從分布式數據集 --轉換--> 并行數據集
from a distributed dataset to Parallelized collections

Spark分布式數據集包含:

  • local file system
  • HDFS
  • Cassandra
  • HBase
  • Amazon S3

Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

  • 比如文件:val distFile = sc.textFile("data.txt")
  • 比如hdfs:hdfs://
  • 比如s3:s3n://


讀取文件時需要注意的是:

  • 如果使用的是本地文件路徑,那么worker節點一定是有訪問權限的.
  • 文本文件的訪問方式: textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
  • hdfs系統會把文件按128MB進行分區

2、從外部文件系統獲取數據

val lines = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

3、flatMap用法

flatMap的做法有點象把迭代器拍扁拍碎,比如以下代碼
val lines = sc.parallelize(List("hi man","ha girl"))
val wordsFlatmap = lines.flatMap(line=>line.split(" "))
val wordsMap = lines.map(line=>line.split(" "))

看看區別:
scala> wordsFlatmap.first
res9: String = hi

scala> wordsMap.first
res10: Array[String] = Array(hi, man)
實際上wordsFlatmap被拆成了4個string,而wordsMap是對輸入的list每個元素進行了split操作,所以說flatMap的做法有點象把迭代器拍扁拍碎。比如說分詞就容易用flatMap

4、笛卡爾積

在推薦系統中,要計算各用戶對多個產品的興趣度,就可以制作一個笛卡爾積,用于比較用戶的的喜愛產品的相似度。
val man = sc.parallelize(List("Tom","Cat"))
val product = sc.parallelize(List("car","iphone","android","surfacePro"))
val result = man.cartesian(product)
result.collect

運行結果:
res0: Array[(String, String)] = Array((Tom,car), (Tom,iphone), (Tom,android), (Tom,surfacePro), (Cat,car), (Cat,iphone), (Cat,android), (Cat,surfacePro))
笛卡兒計算是很恐怖的,它會迅速消耗大量的內存,所以在使用這個函數的時候請小心

5、cache操作

在spark中使用cache是非常重要的,因為行動操作都是惰性求值,每次都會重新計算所有的依賴,如果有大量迭代,代價巨大。
緩存就可以從內容讀取,無需再次計算

scala> var data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] =
  ParallelCollectionRDD[44] at parallelize at <console>:12

scala> data.getStorageLevel
res65: org.apache.spark.storage.StorageLevel =
  StorageLevel(false, false, false, false, 1)

scala> data.cache
res66: org.apache.spark.rdd.RDD[Int] =
  ParallelCollectionRDD[44] at parallelize at <console>:12

scala> data.getStorageLevel
res67: org.apache.spark.storage.StorageLevel =
  StorageLevel(false, true, false, true, 1)

我們先是定義了一個RDD,然后通過getStorageLevel函數得到該RDD的默認存儲級別,這里是NONE。然后我們調用cache函數,將RDD的存儲級別改成了MEMORY_ONLY(看StorageLevel的第二個參數)


6、檢查點

將生成的RDD保存到外部可靠的存儲當中,對于一些數據跨度為多個bactch的有狀態tranformation操作來說,checkpoint非常有必要,因為在這些transformation操作生成的RDD對前一RDD有依賴,隨著時間的增加,依賴鏈可能會非常長,checkpoint機制能夠切斷依賴鏈,將中間的RDD周期性地checkpoint到可靠存儲當中,從而在出錯時可以直接從checkpoint點恢復。
val data = sc.parallelize(1 to 100 , 5)
sc.setCheckpointDir("/myCheckPoint")
data.checkpoint
data.count


7、cogroup組合

將多個RDD中同一個Key對應的Value組合到一起。
scala> val data1 = sc.parallelize(List((1, "www"), (2, "bbs")))
scala> val data2 = sc.parallelize(List((1, "iteblog"), (2, "iteblog"), (3, "very")))
scala> val data3 = sc.parallelize(List((1, "com"), (2, "com"), (3, "good")))
scala> val result = data1.cogroup(data2, data3)
scala> result.collect
res30: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =
Array((1,(CompactBuffer(www),CompactBuffer(iteblog),CompactBuffer(com))),
(2,(CompactBuffer(bbs),CompactBuffer(iteblog),CompactBuffer(com))),
(3,(CompactBuffer(),CompactBuffer(very),CompactBuffer(good))))


8、廣播變量

廣播變量是通過調用sparkcontext從變量v創建。廣播變量是V的包裝器,它的值可以通過調用值方法來訪問。下面的代碼顯示了這一點:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在創建廣播變量之后,應該使用它在集群上運行的任何函數中代替V值,這樣v就不會不止一次地發送到節點。此外,對象v在廣播之后不應該被修改,以確保所有節點獲得相同的廣播變量值(例如,如果變量稍后被運送到新節點)。

9、累加器


累加器一般用來累計和計數
val accum = sc.longAccumulator("My Accumulator")

//計數
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(1))
accum.value
res1: Long = 4

//累加
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value
res2: Long = 10

總結

以上是生活随笔為你收集整理的Spark基础脚本入门实践2:基础开发的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亚洲精品中字 | av第一福利大全导航 | 国产精品久久久久久久蜜臀 | 激情小说亚洲色图 | 中国妇女做爰视频 | 一级片黑人| 中文字幕乱码中文乱码777 | 国产伦理一区二区 | 欧美日韩成人在线 | 乱码av| 男男av网站| 西川结衣在线观看 | 在线观看二区 | 曰批免费视频播放免费 | 欧美精品videos极品 | 免费黄色在线观看 | 午夜久久久久 | 欧洲精品码一区二区三区免费看 | 91青青操 | 爱情岛亚洲首页论坛 | 波多野结衣视频在线播放 | 免费一级suv好看的国产网站 | 日韩精品一区二区三区免费视频 | 三级精品在线观看 | 精品无码一区二区三区在线 | 美女操操操 | 91在线国产观看 | 欧美精品乱码久久久久久 | 精品无码久久久久久久久果冻 | 免费看国产视频 | 中国一及毛片 | 国产精品久久免费 | 国产精品视频成人 | 国产又大又粗又硬 | 成人不卡av | 国产探花一区 | 黑人巨大精品欧美黑白配亚洲 | 香蕉色视频 | www.久久婷婷 | 国产精品高潮呻吟久久久 | 日本四虎影院 | 亚洲成人无码久久 | 久久久久麻豆v国产精华液好用吗 | 亚洲www啪成人一区二区麻豆 | 免费一级网站 | 粗大黑人巨茎大战欧美成人 | 亚洲在线| 成人久久久精品国产乱码一区二区 | 亚洲最新在线视频 | 中文在线观看免费网站 | 99精品久久久久久久 | 欧美日韩h| 欧美国产日韩一区二区三区 | 国产一区二区三区视频免费观看 | 熟女一区二区三区视频 | 亚洲成av人片在线观看无码 | 男女透逼视频 | 日日草草 | 伊人精品一区二区三区 | 激情一级片 | 姑娘第5集高清在线观看 | 四虎影视免费永久大全 | 男操女免费网站 | 自拍偷拍福利视频 | 不卡黄色 | 黄色的网站免费观看 | 免费性网站| 波多野结衣加勒比 | 亚洲人成色777777老人头 | 我的丝袜美腿尤物麻麻 | 91天天色 | 国产亚洲精品久久久久婷婷瑜伽 | 国产一二三在线观看 | 饥渴少妇色诱水电工 | 热久久这里只有精品 | 午夜一区二区三区 | 国产精品桃色 | а√在线中文网新版地址在线 | av资源网站| 一区二区在线视频免费观看 | 国产精品午夜在线 | 久久免费高清视频 | 欧美色图在线播放 | 色在线视频观看 | 国产91视频在线 | 国产精品11 | 直接看av的网站 | 久久橹| 韩国av毛片 | av影片在线看 | 日韩中文字幕视频在线 | 国产精品污 | 国产高清精品一区 | 国产黄色免费 | 久久久一区二区 | 美乳人妻一区二区三区 | 免费男女乱淫真视频免费播放 | 性欧美熟妇videofreesex | 女同vk |