日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

(1)spark核心RDD的概念解析、创建、以及相关操作

發布時間:2023/12/19 综合教程 55 生活家
生活随笔 收集整理的這篇文章主要介紹了 (1)spark核心RDD的概念解析、创建、以及相关操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

spark核心之RDD

什么是RDD

RDD指的是彈性分布式數據集(Resilient Distributed Dataset),它是spark計算的核心。盡管后面我們會使用DataFrame、Dataset進行編程,但是它們的底層依舊是依賴于RDD的。我們來解釋一下RDD(Resilient Distributed Dataset)的這幾個單詞含義。

彈性:在計算上具有容錯性,spark是一個計算框架,如果某一個節點掛了,可以自動進行計算之間血緣關系的跟蹤
分布式:很好理解,hdfs上數據是跨節點的,那么spark的計算也是要跨節點的。
數據集:可以將數組、文件等一系列數據的集合轉換為RDD

RDD是spark的一個最基本的抽象(如果你看一下源碼的話,你會發現RDD在底層是一個抽象類,抽象類顯然不能直接使用,必須要繼承它然后實現它內部的一些方法才可以使用,它代表了不可變的、元素的分區(partition)集合,這些分區可以被并行操作。假設我們有一個300萬元素的數組,那么我們就可以將300萬個元素分成3份,每一個份就是一個分區,每個分區都可以在不同的機器上進行運算,這樣就能提高運算效率。

RDD支持很多操作,比如:map、filter等等,我們后面會慢慢介紹。當然,RDD在scala的底層是一個類,但是我們后面有時候會把RDD和RDD實例對象都叫做RDD,沒有刻意區分,心里面清楚就可以啦。

RDD特性

RDD有如下五大特性:

RDD是一系列分區的集合。我們說了對于大的數據集我們可以切分成多份,每一份就是一個分區,可以每一個分區單獨計算,所以RDD就是這些所有分區的集合。就類似于hdfs中的block,一個大文件也可以切分成多個block
RDD計算會對每一個分區進行計算。假設我們對RDD做一個map操作,顯然是對RDD內部的每一個分區都進行相同的map操作。
RDD會依賴于一系列其它的RDD。假設我們對RDD1進行操作得到了RDD2,然后對RDD2操作得到了RDD3,同理再得到RDD4。而我們說RDD是不可變的,對RDD進行操作會形成新的RDD,所以RDD2依賴于RDD1,RDD3依賴于RDD2,RDD4依賴于RDD3,RDD1 => RDD2 => RDD3 => RDD4,所以RDD在轉換期間就如同流水線一樣,RDD之間是存在依賴關系的。這些依賴關系是非常重要的,假設RDD1有五個分區,那么顯然RDD2、3、4也是有五個分區的,假設在計算RDD3的時候RDD2的第三個分區數據丟失了,那么spark會通過RDD之間血緣關系,知道RDD2依賴于RDD1,那么會通過RDD1重新進行之前的計算得到RDD2第三個分區的數據,注意:這種情況只會計算丟失的分區的數據。所以我們說RDD具有容錯性,如果第n個操作失敗了,那么會從第n-1個操作重新開始。
可選,針對于key-value類型的RDD,會有一個partitioner,來表示這個RDD如何進行分區,比如:基于哈希進行分區。如果不是這種類型的RDD,那么這個partitioner顯然就是空了。
可選,用于計算每一個分區最好位置。怎么理解呢?我們說數據和計算都是分布式的,如果該分區對應的數據在A機器上,那么顯然計算該分區的最好位置就是A機器。如果計算和數據不在同一個機器或者說是節點上,那么我們會把計算移動到相應的節點上,因為在大數據中是有說法的,移動計算優于移動數據。所以RDD第五個特性就是具有計算每一個分區最好位置的集合。

圖解RDD

spark在運行的時候,每一個計算任務就是一個task,另外:對于RDD而言,不是一個RDD計算對應一個task,而是RDD內部的每一個分區計算都會對應一個task。假設這個RDD具有5個分區,那么對這個RDD進行一個map操作,就會生成5個task。另外,分區的數據是可以進行persist(持久化)的,比如:內存、磁盤、內存+磁盤、多副本、序列化。

關于RDD計算,我們畫一下圖

SparkContext和SparkConf

在介紹RDD之前,我們需要了解一下什么SparkContext和SparkConf,因為我們肯定要先連接到spark集群,才可以創建RDD進行編程。

SparkContext是pyspark的編程入口,作業的提交,任務的分發,應用的注冊都會在SparkContext中進行。一個SparkContext實例對象代表了和spark的一個連接,只有建立的連接才可以把作業提交到spark集群當中去。實例化了SparkContext之后才能創建RDD、以及我們后面會介紹的Broadcast廣播變量。

SparkConf是用來設置配置的,然后傳遞給SparkContext。

對于創建一個SparkContext對象,首先我們可以通過pyspark模塊來創建:

from pyspark import SparkContext
from pyspark import SparkConf

# setAppName是設置展示在webUI上的名字,setMaster表示運行模式
# 但是我們目前是硬編碼,官方推薦在提交任務的時候傳遞。當然我們后面說,現在有個印象即可
conf = SparkConf().setAppName("satori").setMaster("local")
# 此時我們就實例化出來一個SparkContext對象了,傳遞SparkConf對象
sc = SparkContext(conf=conf)
# 我們就可以使用sc來創建RDD

# 總之記?。篠parkContext是用來實例化一個對象和spark集群建立連接的
# SparkConf是用來設置一些配置的,傳遞給SparkContext

其次我們通過shell進行操作,我們直接啟動pyspark:

當我們啟動之后,輸入sc,我們看到pyspark shell直接為我們創建了一個默認的SparkContext實例對象,master叫做local[*](*表示使用計算機所以的核),appName叫做PySparkShell。我們在介紹RDD相關操作的時候,會先使用shell的方式進行演示,當然使用py腳本編程的時候也是一樣的。另外,pyspark使用的是原生的Cpython解釋器,所以像numpy、pandas之類的包,原生python可以導入的,在pyspark shell里面也是可以導入的。

我們通過sc.getConf()也能拿到對應的SparkConf實例對象。

那么我們可不可以在創建的時候手動指定master和name呢?答案顯然是可以的。

我們看到我們在創建的時候手動設置的master和name生效了,我們再通過webUI來看一下,pyspark的webUI默認是4040。

創建RDD

我們說RDD是spark的核心,那么如何創建一個RDD呢?答案顯然是通過SparkContext實例對象,因為上面已經說了。你可以通過編寫py文件的方式(我們后面會說)、手動創建一個SparkContext實例對象,也可以通過啟動pyspark shell,直接使用默認為你創建好的,對,就是那個sc。由于SparkContext實例對象操作方式都是一樣的,所以我們目前就先使用pyspark shell來進行編程。后面我們會說如何通過編寫腳本的方式進行spark編程,以及作業如何提交到spark上運行。

通過sc(為了方便,sc就代指了SparkContext實例對象)創建RDD有兩種方式。

將一個已經存在的集合轉成RDD
通過讀取存儲系統里面的文件,轉成RDD。這個存儲系統可以是本地、hdfs、hbase、s3等等,甚至可以是mysql等關系型數據庫。

下面我們就來代碼操作如何創建RDD,注意:現在我們是在pyspark shell中進行操作的。所以sc是創建好的,不要看到了sc覺得納悶,為什么變量沒定義就可以使用;還有由于是交互式環境,我們也不需要print,如果是可打印的,會自動打印。

從已經存在的集合創建

>>> data = range(10)
>>> rdd1 = sc.parallelize(data)  # 調用sc.parallelize方法,可以將已經存在的集合轉為RDD
>>> data
range(0, 10)
>>> rdd1  # 輸出得到的是一個RDD對象
PythonRDD[1] at RDD at PythonRDD.scala:53
>>> rdd1.collect()  # 如果想輸出的話,調用collect方法,這些后面會說。
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]   

>>> # 進行map操作得到rdd2
>>> rdd2 = rdd1.map(lambda x: x + 1)
>>> rdd2.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> # 進行reduce操作
>>> rdd2.reduce(lambda x, y: x + y)
55
>>> # 這些RDD相關的操作函數我們后面會說,但是從python的內置函數map、reduce顯然也能明白是干什么的

我們看一下web界面

上面顯示了三個任務,為什么是三個,我們后面會說。另外我們通過parallelize創建RDD的時候是可以指定分區的。

>>> rdd3 = sc.parallelize(data, 5)
>>> rdd3.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> 

雖然結果沒有變化,但是我們來看一下web界面。

我們看到任務數量變成了5,因為指定了5個分區,至于下面的2,說明默認是兩個分區。因為分區可大可小,如果每一個節點的cpu只執行一個分區可能有點浪費,如果跑的快的、或者分區的數據集比較少的,很快就跑完了,那么容易造成資源浪費,因此spark官方建議每隔CPU對應2到4個分區,這樣可以把資源充分利用起來。至于具體設置多少個,這個就取決于實際項目、以及規定的處理時間、節點對應的機器性能等等,所以如果你根據業務找到了比較好的分區個數,那么就傳遞給parallelize的第二個參數即可。

從存儲系統里面的文件創建

我們還可以讀取存儲系統里面的文件來創建RDD。我們演示一下從本地讀取文件、和從hdfs上讀取文件。

在本地創建一個satori.txt,內容如下,并上傳到hdfs上面。

>>> # 讀取文件使用textFile,接收一個文件路徑,當然同時也可以指定分區
>>> # 我們可以從本地讀取,讀取的格式為"file://文件路徑"
>>> rdd1 = sc.textFile("file:///root/satori.txt")
>>> rdd1.collect()  # 我們看到默認是以
分隔的
['hello golang', 'hello java', 'hello python', 'hello scala']
>>> 
>>> # 從hdfs上讀取,格式為"hdfs://ip:port文件路徑",port就是hdfs集群上的端口,就是你在core-site.xml里面設置的
>>> rdd2 = sc.textFile("hdfs://localhost:9000/satori.txt", 4)
>>> rdd2.collect()
['hello golang', 'hello java', 'hello python', 'hello scala']
>>> 
>>> rdd2.map(lambda x: len(x)).collect()
[12, 10, 12, 11]
>>> rdd2.map(lambda x: len(x)).reduce(lambda x, y: x + y)
45
>>> 

我們看到通過textFile讀取外部文件的方式創建RDD也是沒有問題的,但是需要注意的是:如果你是spark集群,并且還是通過本地文件的方式,那么你要保證所有節點上相同路徑都存在該文件。

我目前都是單節點的,當然對于學習來講單節點和多節點都是差不多的,不可能因為用的多節點,語法就變了,只是多節點在操作的時候要考慮到通信、資源等問題。比如:我們這里讀取的是本地的/root/satori.txt,這就表示訪問本地的/root/satori.txt文件,如果你搭建的是集群,那么你要保證每個節點都存在/root/satori.txt,否則節點根本獲取不到這個數據。因此這種情況需要也別注意了,所以在學習語法的時候我個人不建議搭建spark集群(也就是所謂的standalone模式),公司生產上面也很少使用這種模式,當然不是沒有,只是很少,絕大部分都是跑在yarn上面的。關于spark的運行模式,資源管理以及調度、我們后面也會慢慢聊。

因此解決辦法就是把文件拷貝到每一個節點上面,或者使用網絡共享的文件系統。

另外textFile不光可以讀取文件,還可以讀取目錄:/dir、模糊匹配:/dir/*.txt、以及讀取gz壓縮包都是支持的。

除了textFile,還可以使用wholeTextFiles讀取目錄。

wholeTextFiles:接收一個目錄,會把里面所有的文件內容讀取出來,以[("文件名", "文件內容"), ("文件名", "文件內容")...]的格式返回

>>> sc.wholeTextFiles("hdfs://localhost:9000/").collect()
[('hdfs://localhost:9000/satori.txt', 'hello golang
hello java
hello python
hello scala
')]
>>> # 我這里/目錄下面只有一個文件,把文件內容全部讀取出來了

我們現在知道如何讀取文件轉化為RDD,那么我們如何將RDD保存為文件呢?可以使用saveAsTextFile

>>> data = [1, 2, 3, 4, 5]
>>> rdd1 = rdd.map(lambda x: f"古明地覺{x}號")
>>> # 默認是本地,當然也可以指定file://
>>> rdd1.saveAsTextFile("/root/a.txt")
>>> # 保存到hdfs上面
>>> rdd1.saveAsTextFile("hdfs://localhost:9000/a.txt")

但是我們發現保存的a.txt并不是一個文件,并不是說把整個rdd都保存一個文件,這個是由你的分區決定的。保存的是一個目錄,文件在目錄里面,我們看到有兩部分,因為是兩個分區。

>>> data = [1, 2, 3, 4, 5]
>>> # 這里我們創建rdd的時候,指定5個分區
>>> rdd = sc.parallelize(data, 5)
>>> rdd1 = rdd.map(lambda x: f"古明地覺{x}號")
>>> # 保存為b.txt,顯然這個b.txt是個目錄
>>> rdd1.saveAsTextFile("/root/b.txt")
>>>     

結果跟我們預想的是一樣的,有多少個分區就會有多少個part,因為spark是把每個分區單獨寫入一個文件里面。至于hdfs我們就不用演示了,一樣的,算了還是看看吧。

spark應用程序開發以及運行

我們目前是通過pyspark shell進行操作的,顯然這僅僅是用來做測試使用的,我們真正開發項目肯定是使用ide進行操作的(vim、notepad你也給我當成是ide,Σ(⊙▽⊙"a)。下面我們就來看看如何使用python開發一個spark應用程序,并且運行它。這里我在Windows上使用pycharm開發,注意:但是python解釋器配置的我阿里云上python3,pycharm是支持這個功能的,遠程連接服務器上的python環境,所以我們在Windows上操作的python是linux上的python。

import os
import platform
print(os.name)  # posix
print(platform.system())  # Linux
print(os.listdir("/"))
"""
['home', 'run', 'tmp', 'opt', 'usr', 'lost+found', 'srv', 'lib', '.autorelabel', 
'proc', 'mnt', 'boot', 'lib64', 'dev', 'redis6379.log', 'sbin', 'sys', 'root', 
'bin', 'media', 'etc', 'var', 'data']
"""

還有一種簡便的方法,你在服務器上啟動一個jupyter notebook,然后再Windows上通過瀏覽器打開、輸入token遠程連接也是可以的。當然如果需要編寫的py文件比較多就不推薦了,如果只是學習的話還是可以的。

from pyspark import SparkContext
from pyspark import SparkConf

# 創建SparkConf實例:設置的是spark相關的參數信息
# 我們這里只設置appName,master默認就好,當然名字設置不設置也無所謂啊
conf = SparkConf().setAppName("satori")
# 傳入conf,創建SparkContext對象。另外master、appName也是可以在SparkContext里面單獨設置的
sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
rdd1 = sc.parallelize(data)
# 不在shell里面了,我們需要print才能看到結果
print(rdd1.collect())  # [1, 2, 3, 4, 5]

# 好的習慣,編程結束之后stop掉,表示關閉與spark的連接
# 否則當你再次創建相同的SparkContext實例的時候就會報錯
# 會提示你:Cannot run multiple SparkContexts at once; existing SparkContext(app=satori, master=local[*]
sc.stop()

我們這里是通過pyspark模塊執行是成功的,那么我們也可以編寫一個py文件提交到spark上面運行。

提交方式:pyspark-submit --master xxx --name xxx py文件

from pyspark import SparkContext
from pyspark import SparkConf

# 這里我們不再設置master和appName(name)了,還記得我們之前說過嗎?
# 官方不推薦這種硬編碼的模式,而是通過提交任務的時候指定
conf = SparkConf()
# 既然如此,那么我們就不再需要這個SparkConf了,這里我們寫上但是不傳遞到SparkContext里
sc = SparkContext()
data = [1, 2, 3, 4, 5]
rdd1 = sc.parallelize(data)
print(rdd1.collect())  # [1, 2, 3, 4, 5]
sc.stop()

上面的代碼我們起名為test1.py,然后提交該作業:spark-submit --master local[*] --name 古明地覺 /root/test1.py

我們提交之后,執行是成功了的,但是輸出的東西灰常多,程序的結果就隱藏在中間。

那么問題來了,如果我有很多文件怎么辦?要是標準庫里面的包我們可以導入,但如果是我們自己寫的依賴怎么提交呢?首先多個文件(目錄)里面一定存在一個啟動文件,用來啟動整個程序。假設這個啟動文件叫start.py(當然啟動文件一定在項目的最外層,如果在項目的包里面,那么它就不可能成為啟動文件),那么把除了start.py的其它文件(目錄)打包成一個zip包或者egg,假設叫做dependency.egg,那么執行的時候就可以這么執行:

spark-submit --master xxx --name xxx --py-files dependency.egg start.py

如果我們寫的程序需要從命令行中傳遞參數,那么直接跟在start.py(啟動文件)后面就行。

關于輸出結果,我們只截取了一部分,詳細信息可以自己慢慢查看。以及spark-submit支持的其它參數,也可以通過spark-submit --help來查看,不過很多都用不到,因為spark-submit不僅可以提交python程序,還可以提交java等其它程序,里面的很多參數是為其它語言編寫的程序準備的,python用不到。

RDD相關操作

我們已經知道如何創建一個RDD、以及使用python開發spark程序并提交運行,那么下面我們來看看RDD都能進行哪些操作。我們讀取數據轉成RDD之后肯定是要進行操作的,我們之前看到了map、reduce、collect等操作,但是除了這些,RDD還支持很多其他的操作,我們來看一下。

RDD的操作分為兩種:transformation和action。

transformation:從一個RDD轉換成新的RDD這個過程就是transformation,比如map操作
action:對RDD進行計算得到一個值的過程叫做action,比如collect。

直接看可能不好理解,我們來舉個例子。我們對一個RDD進行map操作得到了新的RDD,但是這個RDD它并不是具體的值。我們對RDD進行collect操作的時候,才會把值返回回來。實際上,所有的transformation都是惰性的,意思是我們進行map操作的時候,RDD只是記錄了這個操作,但是它并沒有具體的計算,當我們進行collect求值的時候才會真正的開始進行計算。

>>> data = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(data)
>>> rdd1 = rdd.map(lambda x: str(x) + "~~~")
>>> rdd2 = rdd1.map(lambda x: "~~~" + x)
>>> 
>>> rdd2.collect()
['~~~1~~~', '~~~2~~~', '~~~3~~~', '~~~4~~~', '~~~5~~~']
>>> 

我們對rdd進行操作得到rdd1,rdd1得到rdd2,像這種對一個RDD操作得到新的RDD的過程我們稱之為transformation,它是惰性的(lazy),這些過程并不會真正的開始計算,只是記錄了相關的操作。當我們對于rdd2進行collect操作、要獲取值的時候,才會真正的開始計算,會從最初的rdd開始計算,這個過程我們稱之為action。

下面我們就來舉例說明RDD的相關操作:

map

map:接收一個函數,會對RDD里面每一個分區的每一個元素都執行相同的操作。話說,能用pyspark的編程的,我估計這些說了都是廢話。因此如果有些函數和python的內置函數比較類似的,我就不說那么詳細了。

>>> rdd1 = sc.parallelize([1, 2, 3, 4, 5])
>>> # 給里面每一個元素都執行加1的操作
>>> rdd1.map(lambda x: x+1).collect()
[2, 3, 4, 5, 6] 

filter

filter:類似Python中的filter,選擇出符合條件的

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8]
>>> rdd = sc.parallelize(numbers)
>>> rdd.filter(lambda x: x > 3).collect()
[4, 5, 6, 7, 8]
>>> 
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4, 6, 8]

flatMap

flatMap:和map不同的是,map是輸出一個值返回一個值,而flatMap是輸入一個值,返回一個序列、然后將這個序列打開,我們舉例說明。

>>> word = ["satori"]
>>> # 函數接收什么,返回什么,所以還是原來的結果
>>> sc.parallelize(word).map(lambda x: x).collect()
['satori']
>>> # 接收一個值,返回一個序列,然后會自動將這個序列打開
>>> sc.parallelize(word).flatMap(lambda x: x).collect()
['s', 'a', 't', 'o', 'r', 'i']
>>> 
>>> # split之后是一個列表,對于map,那么返回的就是列表
>>> words = ["hello mashiro", "hello satori"]
>>> sc.parallelize(words).map(lambda x: x.split(" ")).collect()
[['hello', 'mashiro'], ['hello', 'satori']]
>>> # 但對于flatMap來說,會將這個列表打開
>>> sc.parallelize(words).flatMap(lambda x: x.split(" ")).collect()
['hello', 'mashiro', 'hello', 'satori']
>>> 

所以從名字上看,flatMap相比map多了一個flat,也是很形象的,flat表示平的,操作上就是直接將列表打開,不再嵌套。另外我們看到我們將很多操作都寫在了一行,這是沒有問題的,如果操作比較多,我們鼓勵寫在一行,這叫做鏈式編程。當然如果為了直觀,你也可以分為多行來寫,反正transformation也是懶加載。

groupByKey

groupByKey:這個語言表達有點困難,我們直接看一個例子。

>>> val = [("a", "hello"), ("a", "how are you"), ("b", "who am i"), ("a", 4)]
>>> rdd = sc.parallelize(val)
>>> 
>>> rdd.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe37b8>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3630>)]

>>> rdd.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
[('b', ['who am i']), ('a', ['hello', 'how are you', 4])]
>>> 

我們看到使用groupByKey的rdd,是一個由[(x1, y1), (x2, y2), (x3, y3)...]這樣的序列(當然里面不一定是元組、列表也是可以的)轉化得到的,然后使用groupByKey會將元組里面第一個值相同的聚合到一起,就像我們看到的那樣,只不過得到的是一個可迭代對象,我們需要轉化為list對象。這個功能特別適合word count,也就是詞頻統計,再來看一個例子。

>>> words = ["hello mashiro", "hello world", "hello koishi"]
>>> rdd = sc.parallelize(words)
>>> # 先進行分隔
>>> rdd1 = rdd.flatMap(lambda x: x.split(" "))
>>> rdd1.collect()
['hello', 'mashiro', 'hello', 'world', 'hello', 'koishi']
>>> # 給每個詞都標上一個1,因為它們每個詞都出現了1次
>>> rdd2 = rdd1.map(lambda x: (x, 1))
>>> rdd2.collect()
[('hello', 1), ('mashiro', 1), ('hello', 1), ('world', 1), ('hello', 1), ('koishi', 1)]
>>> 
>>> # 使用groupByKey將值相同的匯聚到一起
>>> rdd3 = rdd2.groupByKey()
>>> rdd3.collect()
[('mashiro', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3828>), ('world', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3128>), ('koishi', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3c50>), ('hello', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3470>)]
>>> # 變成list對象
>>> rdd4 = rdd3.map(lambda x: (x[0], list(x[1])))
>>> rdd4.collect()
[('mashiro', [1]), ('world', [1]), ('koishi', [1]), ('hello', [1, 1, 1])]
>>> # 進行求和,即可得到每個詞出現的次數。當然求和的話可以直接使用sum,沒必要先變成list對象
>>> rdd5 = rdd4.map(lambda x: (x[0], sum(x[1])))
>>> rdd5.collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]
>>> 
>>> 

還記得之前說的鏈式編程嗎?其實這個詞頻統計很簡單,工作上是沒必要寫這么多行的。

>>> words = ["hello mashiro", "hello world", "hello koishi"]
>>> rdd = sc.parallelize(words)
>>> rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]

所以groupByKey非常適合詞頻統計,這里面不接收參數,調用這個方法RDD需要是一個列表或者元組、里面嵌套多個列表或者元組(包含兩個元素),然后把索引為0的值相同的聚合在一起。

reduceByKey

調用reduceByKey方法的rdd對應的數據集和groupByKey是一樣的,我們一旦看到ByKey,就應該想到序列里面的元素要是一個有兩個元素的序列,然后第一個元素相同的分發到一起。但是它和groupByKey不同的是,groupByKey不接收參數,然后直接把第一個元素相同聚合在一起,而reduceByKey會比groupByKey多一步,因為它需要接受一個函數,會自動將分發到一起的值(原來所有序列的第二個元素)進行一個計算。舉例說明:

>>> words = ["hello mashiro", "hello world", "hello koishi"]
>>> rdd = sc.parallelize(words)
>>> rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]
>>> 
>>> rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]

和groupByKey對比的話,還是很清晰的。

sortByKey

sortByKey:從名字能看出來,這個是排序用的,按照索引為0的元素進行排序。

>>> words = [('c', 2), ('a', 1), ('b', 3)]
>>> rdd = sc.parallelize(words)
>>> 
>>> rdd.sortByKey().collect()
[('a', 1), ('b', 3), ('c', 2)]
>>> 
>>> rdd.sortByKey(False).collect()
[('c', 2), ('b', 3), ('a', 1)]
>>> # 把元祖里面的兩個元素想象成字典的key: value,ByKey自然是根據Key來進行操作
>>> # 可顯然我們是想根據value來進行排序,根據出現次數多的進行排序。所以我們可以先交換順序,排完了再交換回來
>>> rdd.map(lambda x: (x[1], x[0])).sortByKey().map(lambda x: (x[1], x[0])).collect()
[('a', 1), ('c', 2), ('b', 3)]
>>> rdd.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0])).collect()
[('b', 3), ('c', 2), ('a', 1)]
>>> # 默認從小到大排,False則表示逆序、從大到小排

union

union:合并兩個RDD

>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([11, 22, 33])
>>> # 很簡單,就是將兩個RDD合并
>>> rdd1.union(rdd2).collect()
[1, 2, 3, 11, 22, 33]
>>> # 甚至和自身做union也是可以的
>>> rdd1.union(rdd1).collect()
[1, 2, 3, 1, 2, 3]

distinct

distinct:去重,我們看到這有點像sql啊。其實spark還支持spark sql、也就是寫sql語句的方式進行編程。我們后面、或者下一篇博客會說。

>>> rdd = sc.parallelize([11, 11, 2, 22, 3, 33, 3]).distinct()
>>> # 不過去重之后貌似沒什么順序了
>>> rdd.collect()
[2, 22, 11, 3, 33]

join

join:熟悉sql的估計肯定不陌生,join有以下幾種:inner join、left join、right join、outer join。這個操作join的RDD和xxxByKey對應的RDD應該具有相同的數據格式,對,就是[(x1, y1), (x2, y2)...]這種格式。

有時候光說不好理解,看例子就能很容易明白。

>>> rdd1 = sc.parallelize([("name", "古明地覺"), ("age", 16), ("gender", "female")])
>>> rdd2 = sc.parallelize([("name", "古明地戀"), ("age", 15), ("place", "東方地靈殿")])
>>> 
>>> # join默認是內連接,還是想象成key: value,把兩個RDD的key相同的匯聚在一起
>>> # 如果不存在相同的key,那么舍棄
>>> rdd1.join(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('age', (16, 15))]
>>> 
>>> # 以左RDD為基準,如果右RDD沒有與之匹配的則為None,比如rdd1的"gender"在rdd2中不存在,所以置為None
>>> rdd1.leftOuterJoin(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('gender', ('female', None)), ('age', (16, 15))]
>>> 
>>> # 同理以右RDD為基準,當然啦,順序還是從左到右的,里面的元素顯示rdd1的元素,再是rdd2的元素
>>> rdd1.rightOuterJoin(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('age', (16, 15)), ('place', (None, '東方地靈殿'))]
>>> 
>>> # 全連接,不用我說了
>>> rdd1.fullOuterJoin(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('gender', ('female', None)), ('age', (16, 15)), ('place', (None, '東方地靈殿'))]

zip

zip:類似于python中的zip,但是要求兩個RDD的元素個數以及分區數必須一樣。

>>> rdd1 = sc.parallelize(['a', 'b', 'c'])
>>> rdd2 = sc.parallelize([1, 2, 3])
>>> 
>>> rdd1.zip(rdd2).collect()
[('a', 1), ('b', 2), ('c', 3)]
>>> 

zipWithIndex

zipWithIndex:對單個RDD操作的,會給每個元素加上一層索引,從0開始自增。

>>> rdd1 = sc.parallelize(['a', 'b', 'c'])
>>> rdd1.zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2)]

以上就是一些常用的transformation操作,我們說RDD轉換得到新的RDD這個過程叫做transformation,它是惰性的,只是記錄了操作,但是并沒有立刻進行計算。當遇到action操作時(計算具體的值,比如collect、reduce、當然還有其它action操作,我們后面會說),才會真正進行計算。那么下面我們再來看看一些不是很常用的transformation操作。

mapPartitions

mapPartitions:這個是對每一個分區進行map

>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> # 函數參數x不再是rdd的每一個元素,而是rdd的每一個分區
>>> # 這個不能寫return,要寫yield,或者返回一個可迭代的對象,會自動獲取里面的所有元素
>>> def f(x): yield sum(x)
... 
>>> # 三個分區,顯然一個分區兩個元素,那么會把每個分區的所有元素進行相加
>>> rdd.mapPartitions(f).collect()
[3, 7, 11] 
>>> # sum(x)不是一個可迭代的,我們需要放在一個列表里面,或者定義函數使用yield也行
>>> # 會自動遍歷返回的可迭代對象,把元素依次放到列表里面
>>> rdd.mapPartitions(lambda x: [sum(x)]).collect()
[3, 7, 11]

mapPartitionsWithIndex

mapPartitionsWithIndex:還是對每一個分區進行map,但是會多出一個索引

>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> rdd.mapPartitionsWithIndex(lambda index, x: (index, sum(x))).collect()
[0, 3, 1, 7, 2, 11]

列表中的0 1 2表示分區索引。

intersection

intersection:union是將兩個RDD合并,其實是取兩者的并集,intersection則是取交集,subtract則是取差集。

>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([1, 22, 3])
>>> rdd1.intersection(rdd2).collect()
[1, 3]
>>> rdd1.subtract(rdd2).collect()
[2]

sortBy

sortBy:我們之前說過sortByKey會默認按照key來排序,sortBy需要我們自己指定,可以按照key也可以按照value

>>> rdd = sc.parallelize([('a', 1), ('c', 2), ('b', 3)])
>>> rdd.sortBy(lambda x: x[0]).collect()
[('a', 1), ('b', 3), ('c', 2)]
>>> rdd.sortBy(lambda x: x[1]).collect()
[('a', 1), ('c', 2), ('b', 3)]
>>> 
>>> rdd.sortBy(lambda x: x[0], False).collect()
[('c', 2), ('b', 3), ('a', 1)]
>>> rdd.sortBy(lambda x: x[1], False).collect()
[('b', 3), ('c', 2), ('a', 1)]
>>> 

coalesce

coalesce:改變RDD的分區數。分區數會影響作業的并行度,因此會視作業的具體情況而定。這個方法第一個參數接收要改變的分區個數,第二個參數是shuffle,默認為False,表示重新分區的時候不進行shuffle操作,此時效率較高;如果指定為True,表示重分區的時候進行shuffle操作,此時效果等價于下面要介紹的repartition,效率較低。關于什么是shuffle操作,我們后面會說。

>>> rdd = sc.parallelize(range(10), 5)
>>> # 使用該函數可以查看分區數
>>> rdd.getNumPartitions()
5
>>> # 改變分區數,變成3
>>> rdd1 = rdd.coalesce(3)
>>> rdd1.getNumPartitions()
3
>>> # 分區數只能變少,不能變多
>>> rdd2 = rdd1.coalesce(4)
>>> rdd2.getNumPartitions()
3
>>> 

repartition

repartition:該方法也是對RDD進行重新分區,其內部使用shuffle算法,并且分區可以變多、也可以變少,如果是減少分區數,那么推薦使用coalesce。

>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd1 = rdd.repartition(4)
>>> rdd1.getNumPartitions()
4
>>> rdd1.repartition(2).getNumPartitions()
2
>>> 

flatMapValues

flatMapValues:和groupByKey相反,我們看個栗子就清楚了。

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("a", 3), ("b", 2)])
>>> rdd1 = rdd.groupByKey().map(lambda x: (x[0], list(x[1])))
>>> rdd1.collect()
[('b', [1, 2]), ('a', [1, 2, 3])]
>>> # 所以它個groupByKey是相反的,這里面一般寫lambda x: x
>>> rdd1.flatMapValues(lambda x: x).collect()
[('b', 1), ('b', 2), ('a', 1), ('a', 2), ('a', 3)]

groupBy

groupBy:之前的groupByKey默認是按照相同的key進行聚合,這里則可以單獨指定,并且里面序列里面的元素可以不再是元組,普通的整型也是可以的。

>>> rdd = sc.parallelize([12, "a", "ab", "1", 23, "xx"])
>>> # 將里面的元素變成str之后,長度大于1的分為一組,小于等于1的分為一組
>>> rdd.groupBy(lambda x: len(str(x))>1).collect()
[(False, <pyspark.resultiterable.ResultIterable object at 0x7f4c4f40f5c0>), (True, <pyspark.resultiterable.ResultIterable object at 0x7f4c4f40f048>)]
>>>
>>> rdd.groupBy(lambda x: len(str(x))>1).map(lambda x: (x[0], list(x[1]))).collect()
[(False, ['a', '1']), (True, [12, 'ab', 23, 'xx'])]

keyBy

keyBy:看例子就能理解,其實很多方法我們完全可以用已經存在的來替代。

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.keyBy(lambda x: f"hello_{x}").collect()
[('hello_1', 1), ('hello_2', 2), ('hello_3', 3)]
>>> 
>>> rdd.map(lambda x: (f"hello_{x}", x)).collect()
[('hello_1', 1), ('hello_2', 2), ('hello_3', 3)]

可以看到keyBy就是將函數返回的元素和原來的元素組合成一個二元tuple,這個我們完全可以使用map來替代,或許keyBy簡單了那么一點點,但是說實話我個人還是習慣用map。其實一些api如果沒有什么不可替代性、或者無法在很大程度上簡化工作量的話,我覺得記太多反而是個負擔。

keys和values

keys:獲取所有的key。values:獲取所有的value。我們這里的key和value都指的是二元tuple里面的兩個元素。其實RDD對應的數據類型無非兩種,一種是對應的列表里面都是整型或者字符串的RDD,另一種是里面都是二元tuple(或者list)的RDD,我們基本上使用這兩種RDD。我們上面出現的所有的key指的都是二元tuple里面的第一個元素,把這個tuple的兩個元素想象成字典的key和value即可。

>>> rdd = sc.parallelize([("a", 1), ("b", "a"), ("c", "c")])
>>> rdd.keys().collect()
['a', 'b', 'c']
>>> rdd.values().collect()
[1, 'a', 'c']

glom

glom:將每一個分區變成一個單獨的列表

>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6]]
>>> 

pipe

pipe:將RDD里面的每一個元素都執行相同的linux命令

>>> rdd = sc.parallelize(["hello", "hello1", "hello2"], 3)
>>> rdd.pipe("cat").collect()
['hello', 'hello1', 'hello2']
>>> # 1 1 6表示1行、1個單詞、6個字符
>>> rdd.pipe("wc").collect()
['      1       1       6', '      1       1       7', '      1       1       7']
>>> 

randomSplit

randomSplit:將RDD里面的元素隨機分隔

>>> rdd = sc.parallelize(range(10))
>>> rdd1 = rdd.randomSplit([1, 4])
>>> rdd1
[PythonRDD[203] at RDD at PythonRDD.scala:53, PythonRDD[204] at RDD at PythonRDD.scala:53]
>>> [_.collect() for _ in rdd1]
[[5, 7, 9], [0, 1, 2, 3, 4, 6, 8]]
>>> 

sample

sample:隨機取樣

>>> rdd = sc.parallelize(range(10))
>>> # 參數一:是否有放回。參數二:抽樣比例。參數三:隨機種子
>>> rdd.sample(True, 0.2, 123).collect()
[0, 9]

foldByKey

foldByKey:針對于key: value形式的RDD,進行聚合

>>> rdd = sc.parallelize([("a", (1, 2, 3, 4)), ("b", (11, 22, 33, 44))])
>>> rdd1 = rdd.flatMapValues(lambda x: x)
>>> rdd1.collect()
[('a', 1), ('a', 2), ('a', 3), ('a', 4), ('b', 11), ('b', 22), ('b', 33), ('b', 44)]
>>> # 參數一:起始值,參數二:操作函數
>>> rdd1.foldByKey(0, lambda x, y: x + y).collect()
[('b', 110), ('a', 10)]
>>> # 起始值指定20,那么會把20也當成一個元素、也就是初始元素,扔到函數里面去
>>> rdd1.foldByKey(20, lambda x, y: x + y).collect()
[('b', 130), ('a', 30)]
>>> # 我們看到0確實在里面
>>> rdd1.foldByKey(0, lambda x, y: f"{x}->{y}").collect()
[('b', '0->11->22->33->44'), ('a', '0->1->2->3->4')]
>>> 

以上就是一些transformation算子,有一些算子比較簡單我就沒介紹,比如mapValues之類的,我們完全可以使用map來替代,也很簡單,沒必要記這么多。如果有一些沒有介紹到的,可以自己通過pycharm查看RDD這個類源碼,看看它都支持哪些方法。源碼是很詳細的,都有大量的注釋。

那么下面我們來看一下action方法,action方法估計我們最一開始就見過了,沒錯就是collect,把RDD里面的內容以列表的形式返回,那么除了collect還有哪些action算子呢?我們來看一下。

reduce

reduce:這個應該也早就見過了,將里面的內容相加。

>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.reduce(lambda x, y: x + y)
10

count

count:計算元素的個數。

>>> rdd = sc.parallelize([1, 2, 3, [4, 5]])
>>> rdd.count()
4

take、first

take、first:獲取指定個數的元素、獲取第一個元素。

>>> rdd = sc.parallelize([1, 2, [3, 4, 5], 6, 7, 8])
>>> # 如果指定的個數超過了元素的總個數也不會報錯,而是返回所有元素,即便RDD為空也可以。
>>> rdd.take(3)
[1, 2, [3, 4, 5]]
>>> # 注意:對于first來說,空的rdd調用的話會報錯
>>> rdd.first()
1

max、min、mean、sum

max、min、mean、sum:獲取元素最大值、最小值、平均值、總和。

>>> rdd = sc.parallelize([11, 22, 33, 22])
>>> rdd.max()
33
>>> rdd.min()
11
>>> rdd.mean()
22.0
>>> rdd.sum()
88

當然還有其它的數學函數,比如:stdev,求標準差、variance,求方差等等。遇到相應的需求,可以去查找。并且對于上面的數學操作,還分別對應另一個函數,比如:count -> countApprox,sum -> sumApprox等等,這些函數的特點是可以傳入一個timeout,單位為毫秒,要是在指定的時間內沒有計算完畢的話,那么就直接返回當前的計算結果??梢宰约簢L試一下。

foreach

foreach:類似于map,對序列里面的每一個元素都執行相同的操作。

>>> rdd = sc.parallelize([11, 22, 33, 22])
>>> # 但是foreach不會有任何的反應,不會跟map一樣返回新的RDD
>>> rdd.foreach(lambda x: x + 1)
>>> # 我們可以執行打印操作
>>> rdd.foreach(lambda x: print(x, x+123))
11 134
22 145
33 156
22 145
>>>

foreachPartition

foreachPartition:會對每一個分區都執行相同的操作。

>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> rdd.foreachPartition(lambda x: print(x))
<itertools.chain object at 0x7f9c90ca0978>
<itertools.chain object at 0x7f9c90ca0978>
<itertools.chain object at 0x7f9c90ca0978>
>>> rdd.foreachPartition(lambda x: print(list(x)))
[1, 2]
[3, 4]
[5, 6]
>>> 

aggregate

aggregate:這個稍微有點復雜,里面接收三個參數。

參數一:起始值,這個起始值是作用在每個分區上的
參數二:每個分區進行的操作
參數三:每個分區操作完之后的這些返回的結果進行的操作

>>> rdd = sc.parallelize([1, 2, 3, 1, 2, 3], 3)
>>> # 指定了三個分區,那么結果每個分區對應的值應該是這樣: [1, 2] [3, 1] [2, 3]
>>> # 每個分區按照第二個參數指定的操作進行計算,別忘記初始值,這個是作用在每個分區上面的
>>> # 結果就是:2 * 1 * 2, 2 * 3 * 1, 2 * 2 * 3 --> 4, 6, 12
>>> # 然后每個分區返回的結果執行第三個參數指定的操作,加在一起,所以是24
>>> rdd.aggregate(2, lambda x, y:x*y, lambda x, y: x+y)
24

aggregateByKey

aggregateByKey:這個是一個transformation方法,不是action,之所以放進來是為了和aggregate進行對比便于理解。這個是把相同的key分成一組,說不好說,直接看例子吧

>>> rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("c", 1), ("c", 2), ("c", 3)], 3)
>>> # 相同的分為一組,但是注意分區,倒數第三個("c", 1)是和("b", 3)在一個分區里面的
>>> # [("a", [1, 2])]    [("b", [3]), ("c", [1])]    [("c", [2, 3])]
>>> # 初始元素和里面元素依次相乘--> [("a", 4)]   [("b", 6), ("c", 2)]   [("c", 12)]
>>> # 然后對分區里面相同key再次進行參數三指定的操作--> [("a", 4)]  [("b", 6)]  [("c", 14)]
>>> # 上面的每一個列表看成是一個分區即可,為了清晰展示,我把每一個分區單獨寫成了一個列表
>>> rdd.aggregateByKey(2, lambda x,y:x*y, lambda x,y:x+y).collect()
[('b', 6), ('a', 4), ('c', 14)]

另外,對于很多的transformation操作,我們都是可以通過參數:numPartitions指定生成的新的RDD的分區的,不過一般情況下我們不指定這個參數,會和初始的RDD的分區數保持一致。當然如果初始的RDD的分區數設置的不合理,那么是可以在transformation操作的時候進行更改的。

fold

fold:類似于aggregateByKey,但它是action方法,而且調用的不是key、value形式的RDD、并且只需要指定一個函數,會對每個分區、以及每個分區返回的結果都執行相同的操作

>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> # [1, 2] [3, 4] [5, 6] -> 2 * 1 * 2,   2 * 3 * 4,   2 * 5 * 6
>>> # 4 * 24 * 60 * 2 = 11520,并且每一個分區計算之后的結果還要乘上指定的初始值,這一點需要注意
>>> rdd.fold(2, lambda x,y: x*y)
11520
>>>

collectAsMap

collectAsMap:對于內部是二元tuple的RDD,我們可以轉化為字典。

>>> rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("c", 1), ("c", 2), ("c", 3)], 3)
>>> # key相同的,value就會被替換掉
>>> rdd.collectAsMap()
{'a': 2, 'b': 3, 'c': 3}
>>> 

id

id:返回RDD的id值,每個RDD的id值是唯一的

>>> rdd1 = sc.parallelize([])
>>> rdd2 = sc.parallelize([])
>>> rdd3 = sc.parallelize([])
>>> 
>>> rdd1.id(), rdd2.id(), rdd3.id()
(326, 327, 328)
>>> 

histogram

histogram:返回一個直方圖數據,看栗子

>>> rdd = sc.parallelize(range(10))
>>> # 返回0-5以及5-8中間的元素個數,當然會連同區間一起返回。注意區間是左閉右開的
>>> rdd.histogram([0, 5, 8])
([0, 5, 8], [5, 4])
>>> # 如果不指定列表,而是指定整型的話
>>> # 會自動為我們將[min, max]等分4個區間,那么第一個列表就有5個元素
>>> rdd = sc.parallelize([0, 11, 33, 22, 44, 55, 66, 33, 100])
>>> rdd.histogram(4)
([0, 25, 50, 75, 100], [3, 3, 2, 1])
>>> 

isEmpty

isEmpty:檢測一個RDD是否為空

>>> rdd1 = sc.parallelize([])
>>> rdd2 = sc.parallelize([1])
>>> 
>>> rdd1.isEmpty(), rdd2.isEmpty()
(True, False)

lookup

lookup:查找指定key對應的value,那么顯然操作的RDD要是key: value形式的

>>> rdd = sc.parallelize([("a", 1), ("a", 2), ("b", "a")])
>>> rdd.lookup("a")
[1, 2]
>>> rdd.lookup("b")
['a']
>>> 

總結

以上就是RDD的一些操作,當然我們這里沒有全部介紹完,但是也介紹挺多了,如果工作中不夠用的話,那么只能看源碼了。當然這么多一次性肯定是無法全部背下來的,需要用的時候再去查即可,當然還是要多動手敲,孰能生巧。

總結

以上是生活随笔為你收集整理的(1)spark核心RDD的概念解析、创建、以及相关操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。