日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储)

發(fā)布時(shí)間:2025/3/21 编程问答 15 豆豆
生活随笔 收集整理的這篇文章主要介紹了 学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 一、創(chuàng)建RDD
    • 1.1、啟動(dòng)Spark shell
    • 1.2、創(chuàng)建RDD
      • 1.2.1、從集合中創(chuàng)建RDD
      • 1.2.2、從外部存儲(chǔ)中創(chuàng)建RDD
    • 任務(wù)1:
  • 二、RDD算子
    • 2.1、map與flatMap算子應(yīng)用
      • 2.1.1、map
      • 2.1.2、flatMap
      • 2.1.3、mapPartitions
    • 2.2、sortBy與filter算子應(yīng)用
      • 2.2.1、sortBy
      • 2.2.2、filter
    • 任務(wù)2:
    • 2.3、交集與并集計(jì)算的算子應(yīng)用
      • 2.3.1、distinct
      • 2.3.2、union
      • 2.3.3、intersection
      • 2.3.4、subtract
      • 2.3.5、cartesian
    • 任務(wù)3:
    • 2.4、 鍵值對(duì)RDD常用算子
      • 2.4.1、 創(chuàng)建鍵值對(duì)RDD
      • 2.4.2、mapValues
      • 2.4.3、groupByKey
      • 2.4.4、reduceByKey
      • 2.4.5、join
    • 2.5、常用Action類型算子
      • 2.5.1、lookup
      • 2.5.2、collect
      • 2.5.3、take
      • 2.5.4、count
    • 任務(wù)4:
  • 三、文件讀取與存儲(chǔ)
    • 3.1、saveAsTextFile
    • 3.2、repartition
    • 3.3、saveAsSequenceFile
    • 3.4、sequenceFile
    • 綜合練習(xí)1

一、創(chuàng)建RDD

1.1、啟動(dòng)Spark shell

進(jìn)入Spark命令行交互界面:spark-shell
退出交互界面::q


查看客戶端:(http://master:8080)


設(shè)置日志級(jí)別

sc.setLogLevel("INFO") sc.setLogLevel("WARN")

1.2、創(chuàng)建RDD

在Spark中創(chuàng)建RDD的創(chuàng)建方式大概可以分為三種:

  • 從集合中創(chuàng)建RDD
  • 從外部存儲(chǔ)創(chuàng)建RDD
  • 從其他RDD創(chuàng)建

1.2.1、從集合中創(chuàng)建RDD

parallelize():通過(guò)parallelize函數(shù)把一般數(shù)據(jù)結(jié)構(gòu)加載為RDD parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]

Parallelize Rdd默認(rèn)分區(qū)數(shù):sc.defaultParallelism,可通過(guò)spark.default.parallelism設(shè)置sc.defaultParallelism的值,沒(méi)有配置spark.default.parallelism時(shí)的默認(rèn)值等于cpu的核數(shù)

例1

例2


1.2.2、從外部存儲(chǔ)中創(chuàng)建RDD

通過(guò)textFile直接加載數(shù)據(jù)文件為RDD

textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

讀取HDFS時(shí)默認(rèn)分區(qū)數(shù):rdd的分區(qū)數(shù) = max(hdfs文件的block數(shù)目, sc.defaultMinPartitions),sc.defaultMinPartitions=min(sc.defaultParallelism,2)

從本地文件讀取:本地file的分片規(guī)則,應(yīng)該按照hdfs的block大小劃分,但實(shí)測(cè)的結(jié)果是固定按照32M來(lái)分片

讀取HDFS上文件:

讀取本地文件:
(注意:讀取本地文件時(shí),要確保每個(gè)集群上都要有文件,否則會(huì)報(bào)錯(cuò))


任務(wù)1:

1、HDFS上有三份文件,分別為student.txt(學(xué)生信息表),result_bigdata.txt(大數(shù)據(jù)基礎(chǔ)成績(jī)表),result_math.txt(數(shù)學(xué)成績(jī)表)
加載student.txt為名稱為student的RDD數(shù)據(jù),result_bigdata.txt為名稱為bigdata的RDD數(shù)據(jù),result_math.txt為名稱為math的RDD數(shù)據(jù)

數(shù)據(jù):

程序:


二、RDD算子

2.1、map與flatMap算子應(yīng)用

2.1.1、map

map(func)
  • Transformation類型算子
  • map: 將原來(lái)RDD的每個(gè)數(shù)據(jù)項(xiàng)通過(guò)map中的用戶自定義函數(shù)f轉(zhuǎn)換成一個(gè)新的RDD,map操作不會(huì)改變RDD的分區(qū)數(shù)目

示例:使用map函數(shù)對(duì)RDD中每個(gè)元素進(jìn)行倍數(shù)操作

2.1.2、flatMap

flatMap(func)
  • Transformation類型算子
  • flatMap:對(duì)集合中的每個(gè)元素進(jìn)行map操作再扁平化

示例:使用flatMap分割單詞


2.1.3、mapPartitions

mapPartitions(func)
  • Transformation類型算子
  • 和map功能類似,但是輸入的元素是整個(gè)分區(qū),即傳入函數(shù)的操作對(duì)象是每個(gè)分區(qū)的Iterator集合,該操作不會(huì)導(dǎo)致Partitions數(shù)量的變化

示例:取出每個(gè)分區(qū)中大于3的值


2.2、sortBy與filter算子應(yīng)用

2.2.1、sortBy

sortBy(f:(T) => K, ascending, numPartitions)
  • Transformation類型算子
  • 是可以對(duì)標(biāo)準(zhǔn)RDD進(jìn)行排序
  • sortBy()可接受三個(gè)參數(shù):
  • f:(T) => K:左邊是要被排序?qū)ο笾械拿恳粋€(gè)元素,右邊返回的值是元素中要進(jìn)行排序的值。
  • ascending:決定排序后RDD中的元素是升序還是降序,默認(rèn)是true,也就是升序,false為降序排序。
  • numPartitions:該參數(shù)決定排序后的RDD的分區(qū)個(gè)數(shù),默認(rèn)排序后的分區(qū)個(gè)數(shù)和排序之前的個(gè)數(shù)相等。

示例:按照每個(gè)元素的第二個(gè)值進(jìn)行降序排序


2.2.2、filter

filter(func)
  • Transformation類型算子
  • 保留通過(guò)函數(shù)func,返回值為true的元素,組成新的RDD
  • 過(guò)濾掉data RDD中元素小于或等于2的元素

示例:


任務(wù)2:

根據(jù)任務(wù)1得到的RDD bigdata及math,取出成績(jī)排名前5的學(xué)生成績(jī)信息



2.3、交集與并集計(jì)算的算子應(yīng)用

2.3.1、distinct

distinct([numPartitions]))
  • Transformation類型算子
  • 針對(duì)RDD中重復(fù)的元素,只保留一個(gè)元素

示例:


2.3.2、union

union(otherDataset)
  • 合并RDD,需要保證兩個(gè)RDD元素類型一致

示例:合并rdd1和rdd2


2.3.3、intersection

intersection(otherDataset)
  • 找出兩個(gè)RDD的共同元素,也就是找出兩個(gè)RDD的交集

示例:找出c_rdd1和c_rdd2中相同的元素


2.3.4、subtract

subtract (otherDataset)
  • 獲取兩個(gè)RDD之間的差集

示例:找出rdd1與rdd2之間的差集


2.3.5、cartesian

cartesian(otherDataset)
  • 笛卡爾積就是將兩個(gè)集合的元素兩兩組合成一組

示例:


任務(wù)3:

1、找出考試成績(jī)得過(guò)100分的學(xué)生ID,最終的結(jié)果需要集合到一個(gè)RDD中。
2、找出兩門成績(jī)都得100分的學(xué)生ID,結(jié)果匯總為一個(gè)RDD。


2.4、 鍵值對(duì)RDD常用算子

雖然大部分Spark的RDD操作都支持所有種類的單值RDD,但是有少部分特殊的操作只能作用于鍵值對(duì)類型的RDD。

顧名思義,鍵值對(duì)RDD由一組組的鍵值對(duì)組成,這些RDD被稱為PairRDD。PairRDD提供了并行操作各個(gè)鍵或跨節(jié)點(diǎn)重新進(jìn)行數(shù)據(jù)分組的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分別規(guī)約每個(gè)鍵對(duì)應(yīng)的數(shù)據(jù),還有join()方法,可以把兩個(gè)RDD中鍵相同的元素組合在一起,合并為一個(gè)RDD。


2.4.1、 創(chuàng)建鍵值對(duì)RDD

將一個(gè)普通的RDD轉(zhuǎn)化為一個(gè)PairRDD時(shí)可以使用map函數(shù)來(lái)進(jìn)行操作,傳遞的函數(shù)需要返回鍵值對(duì)。

做為鍵值對(duì)類型的RDD,包含了鍵跟值兩個(gè)部分。Spark提供了兩個(gè)方法分別獲取鍵值對(duì)RDD的鍵跟值。keys返回一個(gè)僅包含鍵的RDD,values返回一個(gè)僅包含值的RDD。


2.4.2、mapValues

mapValues(func)
  • 類似map,針對(duì)鍵值對(duì)(Key,Value)類型的數(shù)據(jù)中的Value進(jìn)行map操作,而不對(duì)Key進(jìn)行處理

示例:


2.4.3、groupByKey

groupByKey([numPartitions])
  • 按鍵分組,在(K,V)對(duì)組成的RDD上調(diào)用時(shí),返回(K,Iterable)對(duì)組成的新的RDD。

示例:將rdd按鍵進(jìn)行分組


2.4.4、reduceByKey

  • 將鍵值對(duì)RDD按鍵分組后進(jìn)行聚合
  • 當(dāng)在(K,V)類型的鍵值對(duì)組成的RDD上調(diào)用時(shí),返回一個(gè)(K,V)類型鍵值對(duì)組成的新RDD
  • 其中新RDD每個(gè)鍵的值使用給定的reduce函數(shù)func進(jìn)行聚合,該函數(shù)必須是(V,V)=>V類型

示例:統(tǒng)計(jì)每個(gè)鍵出現(xiàn)的次數(shù)


2.4.5、join

  • 把鍵值對(duì)數(shù)據(jù)相同鍵的值整合起來(lái)
  • 其他連接有:leftOuterJoin, rightOuterJoin, and fullOuterJoin

join: 把鍵值對(duì)數(shù)據(jù)相同鍵的值整合起來(lái)


2.5、常用Action類型算子

2.5.1、lookup

lookup(key: K)
  • Action類型算子
  • 作用于(K,V)類型的RDD上,返回指定K的所有V值

示例:


2.5.2、collect

collect()
  • 返回RDD中所有的元素
  • collectAsMap(): Map[K, V]

示例:


2.5.3、take

take(num)
  • 返回RDD前面num條記錄

示例:


2.5.4、count

count()
  • 計(jì)算RDD中所有元素個(gè)數(shù)


任務(wù)4:

1、輸出每位學(xué)生的總成績(jī),要求將兩個(gè)成績(jī)表中學(xué)生ID相同的成績(jī)相加。
2、輸出每位學(xué)生的平均成績(jī),要求將兩個(gè)成績(jī)表中學(xué)生ID相同的成績(jī)相加并計(jì)算出平均分。
3、合并每個(gè)學(xué)生的總成績(jī)和平均成績(jī)。





三、文件讀取與存儲(chǔ)

3.1、saveAsTextFile

saveAsTextFile(path: String)
  • 把RDD保存到HDFS中



3.2、repartition

repartition(numPartitions: Int)
  • 可以增加或減少此RDD中的并行級(jí)別。在內(nèi)部,它使用shuffle重新分發(fā)數(shù)據(jù)。
  • 如果要減少此RDD中的分區(qū)數(shù),請(qǐng)考慮使用coalesce,這樣可以避免執(zhí)行shuffle。
  • coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)



3.3、saveAsSequenceFile

saveAsSequenceFile(path)
  • 保存成序列化文件
  • 將數(shù)據(jù)集的元素作為Hadoop SequenceFile編寫,只支持鍵值對(duì)RDD


3.4、sequenceFile

sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)
  • 讀取序列化文件


綜合練習(xí)1

綜合練習(xí):基于3個(gè)基站的日志數(shù)據(jù),要求計(jì)算某個(gè)手機(jī)號(hào)碼在一天之內(nèi)出現(xiàn)時(shí)間最多的兩個(gè)地點(diǎn)。

模擬了一些簡(jiǎn)單的日志數(shù)據(jù),共4個(gè)字段:手機(jī)號(hào)碼,時(shí)間戳,基站id,連接類型(1表示建立連接,0表示斷開(kāi)連接):
基站A:

基站B:
基站C:

程序:

總結(jié)

以上是生活随笔為你收集整理的学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。