日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark算子大全glom_2小时入门Spark之RDD编程

發布時間:2024/10/8 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark算子大全glom_2小时入门Spark之RDD编程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

公眾號后臺回復關鍵字:pyspark,獲取本項目github地址。

本節將介紹RDD數據結構的常用函數。包括如下內容:

創建RDD

常用Action操作

常用Transformation操作

常用PairRDD的轉換操作

緩存操作

共享變量

分區操作

這些函數中,我最常用的是如下15個函數,需要認真掌握其用法。

map

flatMap

mapPartitions

filter

count

reduce

take

saveAsTextFile

collect

join

union

persist

repartition

reduceByKey

aggregateByKey

import?findspark

#指定spark_home為剛才的解壓路徑,指定python路徑

spark_home?=?"/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"

python_path?=?"/Users/liangyun/anaconda3/bin/python"

findspark.init(spark_home,python_path)

import?pyspark

from?pyspark?import?SparkContext,?SparkConf

conf?=?SparkConf().setAppName("rdd_tutorial").setMaster("local[4]")

sc?=?SparkContext(conf=conf)

print(pyspark.__version__)

3.0.1

一,創建RDD

創建RDD主要有兩種方式,一個是textFile加載本地或者集群文件系統中的數據,

第二個是用parallelize方法將Driver中的數據結構并行化成RDD。

#從本地文件系統中加載數據

file?=?"./data/hello.txt"

rdd?=?sc.textFile(file,3)

rdd.collect()

['hello?world',

'hello?spark',

'spark?love?jupyter',

'spark?love?pandas',

'spark?love?sql']

#從集群文件系統中加載數據

#file?=?"hdfs://localhost:9000/user/hadoop/data.txt"

#也可以省去hdfs://localhost:9000

#rdd?=?sc.textFile(file,3)

#parallelize將Driver中的數據結構生成RDD,第二個參數指定分區數

rdd?=?sc.parallelize(range(1,11),2)

rdd.collect()

[1,?2,?3,?4,?5,?6,?7,?8,?9,?10]

二,常用Action操作

Action操作將觸發基于RDD依賴關系的計算。

collect

rdd?=?sc.parallelize(range(10),5)

#collect操作將數據匯集到Driver,數據過大時有超內存風險

all_data?=?rdd.collect()

all_data

[0,?1,?2,?3,?4,?5,?6,?7,?8,?9]

take

#take操作將前若干個數據匯集到Driver,相比collect安全

rdd?=?sc.parallelize(range(10),5)

part_data?=?rdd.take(4)

part_data

[0,?1,?2,?3]

takeSample

#takeSample可以隨機取若干個到Driver,第一個參數設置是否放回抽樣

rdd?=?sc.parallelize(range(10),5)

sample_data?=?rdd.takeSample(False,10,0)

sample_data

[7,?8,?1,?5,?3,?4,?2,?0,?9,?6]

first

#first取第一個數據

rdd?=?sc.parallelize(range(10),5)

first_data?=?rdd.first()

print(first_data)

0

count

#count查看RDD元素數量

rdd?=?sc.parallelize(range(10),5)

data_count?=?rdd.count()

print(data_count)

10

reduce

#reduce利用二元函數對數據進行規約

rdd?=?sc.parallelize(range(10),5)

rdd.reduce(lambda?x,y:x+y)

45

foreach

#foreach對每一個元素執行某種操作,不生成新的RDD

#累加器用法詳見共享變量

rdd?=?sc.parallelize(range(10),5)

accum?=?sc.accumulator(0)

rdd.foreach(lambda?x:accum.add(x))

print(accum.value)

45

countByKey

#countByKey對Pair?RDD按key統計數量

pairRdd?=?sc.parallelize([(1,1),(1,4),(3,9),(2,16)])

pairRdd.countByKey()

defaultdict(int,?{1:?2,?3:?1,?2:?1})

saveAsTextFile

#saveAsTextFile保存rdd成text文件到本地

text_file?=?"./data/rdd.txt"

rdd?=?sc.parallelize(range(5))

rdd.saveAsTextFile(text_file)

#重新讀入會被解析文本

rdd_loaded?=?sc.textFile(file)

rdd_loaded.collect()

['2',?'3',?'4',?'1',?'0']

三,常用Transformation操作

Transformation轉換操作具有懶惰執行的特性,它只指定新的RDD和其父RDD的依賴關系,只有當Action操作觸發到該依賴的時候,它才被計算。

map

#map操作對每個元素進行一個映射轉換

rdd?=?sc.parallelize(range(10),3)

rdd.collect()

[0,?1,?2,?3,?4,?5,?6,?7,?8,?9]

rdd.map(lambda?x:x**2).collect()

[0,?1,?4,?9,?16,?25,?36,?49,?64,?81]

filter

#filter應用過濾條件過濾掉一些數據

rdd?=?sc.parallelize(range(10),3)

rdd.filter(lambda?x:x>5).collect()

[6,?7,?8,?9]

flatMap

#flatMap操作執行將每個元素生成一個Array后壓平

rdd?=?sc.parallelize(["hello?world","hello?China"])

rdd.map(lambda?x:x.split("?")).collect()

[['hello',?'world'],?['hello',?'China']]

rdd.flatMap(lambda?x:x.split("?")).collect()

['hello',?'world',?'hello',?'China']

sample

#sample對原rdd在每個分區按照比例進行抽樣,第一個參數設置是否可以重復抽樣

rdd?=?sc.parallelize(range(10),1)

rdd.sample(False,0.5,0).collect()

[1,?4,?9]

distinct

#distinct去重

rdd?=?sc.parallelize([1,1,2,2,3,3,4,5])

rdd.distinct().collect()

[4,?1,?5,?2,?3]

subtract

#subtract找到屬于前一個rdd而不屬于后一個rdd的元素

a?=?sc.parallelize(range(10))

b?=?sc.parallelize(range(5,15))

a.subtract(b).collect()

[0,?1,?2,?3,?4]

union

#union合并數據

a?=?sc.parallelize(range(5))

b?=?sc.parallelize(range(3,8))

a.union(b).collect()

[0,?1,?2,?3,?4,?3,?4,?5,?6,?7]

intersection

#intersection求交集

a?=?sc.parallelize(range(1,6))

b?=?sc.parallelize(range(3,9))

a.intersection(b).collect()

[3,?4,?5]

cartesian

#cartesian笛卡爾積

boys?=?sc.parallelize(["LiLei","Tom"])

girls?=?sc.parallelize(["HanMeiMei","Lily"])

boys.cartesian(girls).collect()

[('LiLei',?'HanMeiMei'),

('LiLei',?'Lily'),

('Tom',?'HanMeiMei'),

('Tom',?'Lily')]

sortBy

#按照某種方式進行排序

#指定按照第3個元素大小進行排序

rdd?=?sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])

rdd.sortBy(lambda?x:x[2]).collect()

[(4,?1,?1),?(3,?2,?2),?(1,?2,?3)]

zip

#按照拉鏈方式連接兩個RDD,效果類似python的zip函數

#需要兩個RDD具有相同的分區,每個分區元素數量相同

rdd_name?=?sc.parallelize(["LiLei","Hanmeimei","Lily"])

rdd_age?=?sc.parallelize([19,18,20])

rdd_zip?=?rdd_name.zip(rdd_age)

print(rdd_zip.collect())

[('LiLei',?19),?('Hanmeimei',?18),?('Lily',?20)]

zipWithIndex

#將RDD和一個從0開始的遞增序列按照拉鏈方式連接。

rdd_name?=??sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])

rdd_index?=?rdd_name.zipWithIndex()

print(rdd_index.collect())

[('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]

四,常用PairRDD的轉換操作

PairRDD指的是數據為長度為2的tuple類似(k,v)結構的數據類型的RDD,其每個數據的第一個元素被當做key,第二個元素被當做value.

reduceByKey

#reduceByKey對相同的key對應的values應用二元歸并操作

rdd?=?sc.parallelize([("hello",1),("world",2),

("hello",3),("world",5)])

rdd.reduceByKey(lambda?x,y:x+y).collect()

[('hello',?4),?('world',?7)]

groupByKey

#groupByKey將相同的key對應的values收集成一個Iterator

rdd?=?sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])

rdd.groupByKey().collect()

[('hello',?),

('world',?)]

sortByKey

#sortByKey按照key排序,可以指定是否降序

rdd?=?sc.parallelize([("hello",1),("world",2),

("China",3),("Beijing",5)])

rdd.sortByKey(False).collect()

[('world',?2),?('hello',?1),?('China',?3),?('Beijing',?5)]

join

#join相當于根據key進行內連接

age?=?sc.parallelize([("LiLei",18),

("HanMeiMei",16),("Jim",20)])

gender?=?sc.parallelize([("LiLei","male"),

("HanMeiMei","female"),("Lucy","female")])

age.join(gender).collect()

[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]

leftOuterJoin和rightOuterJoin

#leftOuterJoin相當于關系表的左連接

age?=?sc.parallelize([("LiLei",18),

("HanMeiMei",16)])

gender?=?sc.parallelize([("LiLei","male"),

("HanMeiMei","female"),("Lucy","female")])

age.leftOuterJoin(gender).collect()

[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]

#rightOuterJoin相當于關系表的右連接

age?=?sc.parallelize([("LiLei",18),

("HanMeiMei",16),("Jim",20)])

gender?=?sc.parallelize([("LiLei","male"),

("HanMeiMei","female")])

age.rightOuterJoin(gender).collect()

[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]

cogroup

#cogroup相當于對兩個輸入分別goupByKey然后再對結果進行groupByKey

x?=?sc.parallelize([("a",1),("b",2),("a",3)])

y?=?sc.parallelize([("a",2),("b",3),("b",5)])

result?=?x.cogroup(y).collect()

print(result)

print(list(result[0][1][0]))

[('a',?(,?)),?('b',?(,?))]

[1,?3]

subtractByKey

#subtractByKey去除x中那些key也在y中的元素

x?=?sc.parallelize([("a",1),("b",2),("c",3)])

y?=?sc.parallelize([("a",2),("b",(1,2))])

x.subtractByKey(y).collect()

[('c',?3)]

foldByKey

#foldByKey的操作和reduceByKey類似,但是要提供一個初始值

x?=?sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)

x.foldByKey(1,lambda?x,y:x*y).collect()

[('a',?3),?('b',?10)]

五,緩存操作

如果一個rdd被多個任務用作中間量,那么對其進行cache緩存到內存中對加快計算會非常有幫助。

聲明對一個rdd進行cache后,該rdd不會被立即緩存,而是等到它第一次被計算出來時才進行緩存。

可以使用persist明確指定存儲級別,常用的存儲級別是MEMORY_ONLY和EMORY_AND_DISK。

如果一個RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執行的。

緩存數據不會切斷血緣依賴關系,這是因為緩存數據某些分區所在的節點有可能會有故障,例如內存溢出或者節點損壞。

這時候可以根據血緣關系重新計算這個分區的數據。

#cache緩存到內存中,使用存儲級別 MEMORY_ONLY。

#MEMORY_ONLY意味著如果內存存儲不下,放棄存儲其余部分,需要時重新計算。

a?=?sc.parallelize(range(10000),5)

a.cache()

sum_a?=?a.reduce(lambda?x,y:x+y)

cnt_a?=?a.count()

mean_a?=?sum_a/cnt_a

print(mean_a)

#persist緩存到內存或磁盤中,默認使用存儲級別MEMORY_AND_DISK

#MEMORY_AND_DISK意味著如果內存存儲不下,其余部分存儲到磁盤中。

#persist可以指定其它存儲級別,cache相當于persist(MEMORY_ONLY)

from??pyspark.storagelevel?import?StorageLevel

a?=?sc.parallelize(range(10000),5)

a.persist(StorageLevel.MEMORY_AND_DISK)

sum_a?=?a.reduce(lambda?x,y:x+y)

cnt_a?=?a.count()

mean_a?=?sum_a/cnt_a

a.unpersist()?#立即釋放緩存

print(mean_a)

六,共享變量

當spark集群在許多節點上運行一個函數時,默認情況下會把這個函數涉及到的對象在每個節點生成一個副本。

但是,有時候需要在不同節點或者節點和Driver之間共享變量。

Spark提供兩種類型的共享變量,廣播變量和累加器。

廣播變量是不可變變量,實現在不同節點不同任務之間共享數據。

廣播變量在每個機器上緩存一個只讀的變量,而不是為每個task生成一個副本,可以減少數據的傳輸。

累加器主要是不同節點和Driver之間共享變量,只能實現計數或者累加功能。

累加器的值只有在Driver上是可讀的,在節點上不可見。

#廣播變量?broadcast?不可變,在所有節點可讀

broads?=?sc.broadcast(100)

rdd?=?sc.parallelize(range(10))

print(rdd.map(lambda?x:x+broads.value).collect())

print(broads.value)

[100,?101,?102,?103,?104,?105,?106,?107,?108,?109]

100

#累加器?只能在Driver上可讀,在其它節點只能進行累加

total?=?sc.accumulator(0)

rdd?=?sc.parallelize(range(10),3)

rdd.foreach(lambda?x:total.add(x))

total.value

45

#?計算數據的平均值

rdd?=?sc.parallelize([1.1,2.1,3.1,4.1])

total?=?sc.accumulator(0.1)

count?=?sc.accumulator(0)

def?func(x):

total.add(x)

count.add(1)

rdd.foreach(func)

total.value/count.value

2.625

七,分區操作

分區操作包括改變分區操作,以及針對分區執行的一些轉換操作。

glom:將一個分區內的數據轉換為一個列表作為一行。

coalesce:shuffle可選,默認為False情況下窄依賴,不能增加分區。repartition和partitionBy調用它實現。

repartition:按隨機數進行shuffle,相同key不一定在同一個分區

partitionBy:按key進行shuffle,相同key放入同一個分區

HashPartitioner:默認分區器,根據key的hash值進行分區,相同的key進入同一分區,效率較高,key不可為Array.

RangePartitioner:只在排序相關函數中使用,除相同的key進入同一分區,相鄰的key也會進入同一分區,key必須可排序。

TaskContext: ?獲取當前分區id方法 TaskContext.get.partitionId

mapPartitions:每次處理分區內的一批數據,適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支

mapPartitionsWithIndex:類似mapPartitions,提供了分區索引,輸入參數為(i,Iterator)

foreachPartition:類似foreach,但每次提供一個Partition的一批數據

glom

#glom將一個分區內的數據轉換為一個列表作為一行。

a?=?sc.parallelize(range(10),2)

b?=?a.glom()

b.collect()

[[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]

coalesce

#coalesce?默認shuffle為False,不能增加分區,只能減少分區

#如果要增加分區,要設置shuffle?=?true

#parallelize等許多操作可以指定分區數

a?=?sc.parallelize(range(10),3)

print(a.getNumPartitions())

print(a.glom().collect())

3

[[0,?1,?2],?[3,?4,?5],?[6,?7,?8,?9]]

b?=?a.coalesce(2)

print(b.glom().collect())

[[0,?1,?2],?[3,?4,?5,?6,?7,?8,?9]]

repartition

#repartition按隨機數進行shuffle,相同key不一定在一個分區,可以增加分區

#repartition實際上調用coalesce實現,設置了shuffle?=?True

a?=?sc.parallelize(range(10),3)

c?=?a.repartition(4)

print(c.glom().collect())

[[6,?7,?8,?9],?[3,?4,?5],?[],?[0,?1,?2]]

#repartition按隨機數進行shuffle,相同key不一定在一個分區

a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])

c?=?a.repartition(2)

print(c.glom().collect())

[[('a',?1),?('a',?2),?('c',?3)],?[('a',?1)]]

partitionBy

#partitionBy按key進行shuffle,相同key一定在一個分區

a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])

c?=?a.partitionBy(2)

print(c.glom().collect())

mapPartitions

#mapPartitions可以對每個分區分別執行操作

#每次處理分區內的一批數據,適合需要按批處理數據的情況

#例如將數據寫入數據庫時,可以極大的減少連接次數。

#mapPartitions的輸入分區內數據組成的Iterator,其輸出也需要是一個Iterator

#以下例子查看每個分區內的數據,相當于用mapPartitions實現了glom的功能。

a?=?sc.parallelize(range(10),2)

a.mapPartitions(lambda?it:iter([list(it)])).collect()

[[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]

mapPartitionsWithIndex

#mapPartitionsWithIndex可以獲取兩個參數

#即分區id和每個分區內的數據組成的Iterator

a?=?sc.parallelize(range(11),2)

def?func(pid,it):

s?=?sum(it)

return(iter([str(pid)?+?"|"?+?str(s)]))

[str(pid)?+?"|"?+?str]

b?=?a.mapPartitionsWithIndex(func)

b.collect()

#利用TaskContext可以獲取當前每個元素的分區

from?pyspark.taskcontext?import?TaskContext

a?=?sc.parallelize(range(5),3)

c?=?a.map(lambda?x:(TaskContext.get().partitionId(),x))

c.collect()

[(0,?0),?(1,?1),?(1,?2),?(2,?3),?(2,?4)]

foreachPartitions

#foreachPartition對每個分區分別執行操作

#范例:求每個分區內最大值的和

total?=?sc.accumulator(0.0)

a?=?sc.parallelize(range(1,101),3)

def?func(it):

total.add(max(it))

a.foreachPartition(func)

total.value

199.0

aggregate

#aggregate是一個Action操作

#aggregate比較復雜,先對每個分區執行一個函數,再對每個分區結果執行一個合并函數。

#例子:求元素之和以及元素個數

#三個參數,第一個參數為初始值,第二個為分區執行函數,第三個為結果合并執行函數。

rdd?=?sc.parallelize(range(1,21),3)

def?inner_func(t,x):

return((t[0]+x,t[1]+1))

def?outer_func(p,q):

return((p[0]+q[0],p[1]+q[1]))

rdd.aggregate((0,0),inner_func,outer_func)

(210,?20)

aggregateByKey

#aggregateByKey的操作和aggregate類似,但是會對每個key分別進行操作

#第一個參數為初始值,第二個參數為分區內歸并函數,第三個參數為分區間歸并函數

a?=?sc.parallelize([("a",1),("b",1),("c",2),

("a",2),("b",3)],3)

b?=?a.aggregateByKey(0,lambda?x,y:max(x,y),

lambda?x,y:max(x,y))

b.collect()

[('b',?3),?('a',?2),?('c',?2)]

總結

以上是生活随笔為你收集整理的spark算子大全glom_2小时入门Spark之RDD编程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。