日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Pyspark:DataFrame的转化操作及行动操作

發(fā)布時(shí)間:2024/1/1 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Pyspark:DataFrame的转化操作及行动操作 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Spark版本:V3.2.1
持續(xù)補(bǔ)充

序言

??Spark DataFrame中的創(chuàng)建及常用的列操作可以參考博客:https://blog.csdn.net/yeshang_lady/article/details/89528090

正文

??因?yàn)镾park DataFrame是基于RDD創(chuàng)建的,所以DataFrame的操作也可以分為兩種類型:轉(zhuǎn)化操作和行動(dòng)操作。轉(zhuǎn)化操作可以將Spark DataFrame轉(zhuǎn)化為新的DataFrame,而不改變?cè)袛?shù)據(jù)。轉(zhuǎn)化操作都是惰性的,不會(huì)立即計(jì)算出來。而行動(dòng)操作會(huì)觸發(fā)所有轉(zhuǎn)化操作的實(shí)際求值。

1. 行動(dòng)操作

1.1 show展示數(shù)據(jù)

show方法作可以以表格的形式展示DataFrame中的數(shù)據(jù),該方法主要有以下幾個(gè)參數(shù):

  • n:要展示的數(shù)據(jù)行數(shù);
  • truncate: 是否對(duì)字符串截?cái)?#xff0c;也可以直接使用該字段指定顯示的字符串長度數(shù);
  • vertical: 是否垂直顯示DataFrame中的行;

其用法舉例如下:

from pyspark.sql.types import * from pyspark.sql import SparkSession from pyspark.sql import functions as func import pandas as pd from pyspark.sql.functions import pandas_udf spark=SparkSession.builder.appName("jsonRDD").getOrCreate() data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df.show(2,truncate=3) df.show(vertical=True)

其結(jié)果如下:

1.2 獲取所有數(shù)據(jù)到數(shù)組

show方法只能將DataFrame中的數(shù)據(jù)展示出來,但無法使用變量接收DataFrame。為了獲取數(shù)據(jù),可以使用collect方法將DataFrame中的數(shù)據(jù)保存到List對(duì)象中。具體用法如下:

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df_array=df.collect() print(df_array)

其結(jié)果如下:

需要注意一點(diǎn),collect方法會(huì)將集群中DataFrame的所有數(shù)據(jù)取回到一個(gè)節(jié)點(diǎn)當(dāng)中,所以單臺(tái)節(jié)點(diǎn)的內(nèi)存不足以保存整個(gè)DataFrame中的所有數(shù)據(jù)時(shí)就會(huì)報(bào)內(nèi)存溢出錯(cuò)誤。

1.3 獲取若干行記錄

first、head、take、tail這四個(gè)方法可以獲取DataFrame中的若干行記錄。這四個(gè)方法比較類似,其中:

  • first: 該方法獲取DataFrame的第1行記錄。
  • head: 該方法獲取DataFrame的前nnn行記錄。該方法只適用于DataFrame數(shù)據(jù)量較小,且所有數(shù)據(jù)都保存在內(nèi)存中的情況。
  • take: 獲取DataFrame的前nnn行記錄。
  • tail: 獲取DataFrame的后nnn行記錄。

僅以take為例進(jìn)行用法說明(結(jié)果不再展示):

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) res_1=df.take(2) print(res_1)
1.4 將DataFrame轉(zhuǎn)化pandas.DataFrame

toPandas方法可以將spark DataFrame轉(zhuǎn)化為Pandas DataFrame。用法如下:

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) dt=df.toPandas() print(dt)

其結(jié)果如下:

2. 轉(zhuǎn)化操作

在具體介紹轉(zhuǎn)化操作之前,需要說明以下幾點(diǎn):

  • Spark DataFrame中的轉(zhuǎn)化操作方法中的字段名參數(shù)的數(shù)據(jù)類型一般為:String類型及Column對(duì)象,或者這兩種對(duì)象組成的List對(duì)象。當(dāng)方法能同時(shí)接收多個(gè)字段名時(shí),String類型和Column不能混用。
  • Spark DataFrame的轉(zhuǎn)化操作返回的結(jié)果均為Spark DataFrame類型,所以這些方法可以連續(xù)使用。
2.1 decribe獲取指定字段的統(tǒng)計(jì)信息

describe方法接收一個(gè)或多個(gè)String類型(Column類型不可以)的字段名,返回對(duì)應(yīng)字段的統(tǒng)計(jì)值,包括:Count、Mean、Stddev、Min、Max。該方法也可以不指定字段名,此時(shí)則返回所有字段的統(tǒng)計(jì)信息。具體用法如下:

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df.describe(['Name','age']).show()

其結(jié)果如下:

2.2 where/filter篩選數(shù)據(jù)

where和filter的作用相同,都可以對(duì)DataFrame的數(shù)據(jù)進(jìn)行篩選。這里僅以where方法為例進(jìn)行說明。where(condition)中的condition可以接收兩種參數(shù)類型。具體如下:

  • 當(dāng)接收的參數(shù)為String型時(shí),其寫法參照SQL語言中where子句;
  • 當(dāng)接收的參數(shù)為Column類型時(shí),對(duì)于每一個(gè)字段的篩選要求需要分別描述,然后使用邏輯運(yùn)算組合起來即可(與或非寫法為:&、|、~)。

其用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) #條件: C2不為空且C1大于25 df.where('C2 is not null and C1>25').show() #條件: C2為空 df.where(func.isnull('C2')).show() #條件: C2不為空且C3長度大于3 df.where(~func.isnull('C2')).where(func.length('C3')>3).show() #條件: C2不為空或C1大于25 df.filter((~func.isnull('C2'))|(func.col('C1')>25)).show()

其結(jié)果如下:

2.3 select/selectExpr查詢指定列

select和selectExpr的作用相同,區(qū)別在于這兩個(gè)方法接收的參數(shù)類型不同。具體如下:

  • select:該操作接收Sting類型(列名)、Column或List型的參數(shù)。如果想要查詢所有列,也可以使用?*?;
  • selectExpr:該操作接收SQL表達(dá)式,可以同時(shí)對(duì)特定字段進(jìn)行函數(shù)處理;

其用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) #查詢所有列 df.select('*').show() #多個(gè)Column列組成的List df.select([func.length('C3'),func.lower('C3')]).show() #接收SQL表達(dá)式 df.selectExpr('length(C3)','C1>25').show()

其結(jié)果如下:

2.4 drop刪除指定列

drop方法中既可以接String型的參數(shù),也可以使用Column型參數(shù)。使用前者時(shí),可以同時(shí)刪除多列,使用后者時(shí)一次只能刪除一列。其用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.drop('C1').show() df.drop('C1','C2').show() #注意這里不能寫成List df.drop(df.C1).show()

其結(jié)果如下:

2.5 limit獲取前n行記錄

limit方法獲取指定DataFrame的前n行記錄,其用法如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.limit(2).show()

其結(jié)果如下:

2.6 orderBy/sort/sortWithinPartitions按指定字段排序

orderBy和sort方法使用方法相同,而sortWithinPartitions方法可以對(duì)每個(gè)Parition排序,這里僅以sort為例進(jìn)行說明。sort方法可以接受兩種類型的參數(shù),不同的參數(shù)類型,排序的說明方式不同。具體如下:

  • 在使用Column型參數(shù)時(shí),在Column后面加.desc()表示降序,.asc()表示升序(也可以使用functions包中的asc和desc方法)。
  • 當(dāng)使用String型變量時(shí),使用ascending參數(shù)來指定排序方向。

具體用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.sort(['C1','C2'],ascending=[0,1]).show() df.sort([func.desc('C2'),func.asc('C3')]).show()

其結(jié)果如下:

2.7 groupBy對(duì)字段進(jìn)行分組聚合

groupBy方法可以對(duì)數(shù)據(jù)進(jìn)行分組,其得到的是GroupedData類型對(duì)象。該對(duì)象的API提供了一些聚合操作。具體如下:

  • avg、max、min、mean、count、sum方法
    這里方法只能接String類型的變量名,并且會(huì)自動(dòng)忽略非數(shù)值型的字段。用法如下:
schema=StructType([StructField('State', StringType()),StructField('Color', StringType()),StructField('Count', IntegerType())]) df=spark.read.csv('data/mnm_dataset.csv',schema=schema,header=True) df.groupby('State').min('Count').show(5) df.groupby('State').sum('Count').show(5) df.groupby('State').count().show(5)

其結(jié)果如下:

  • agg:自定義聚合函數(shù)。
    agg方法中的自定義函數(shù)既可以使用avg、max等內(nèi)置的聚合函數(shù),也可以使用pyspark.sql.functions.pandas_udf定義的GROUPED_AGG類型的函數(shù)。舉例如下:
@pandas_udf('int') #統(tǒng)計(jì)不同值的個(gè)數(shù) def agg_func1(x:pd.Series) -> int:return (~x.duplicated()).sum() df.groupby('State').agg({'Color':'count','Count':'sum'}).show(5) df.groupby('State').agg(agg_func1('Color')).show(5)

其結(jié)果如下:

  • apply/applyInPandas:使用Pandas中的函數(shù)
    applyInPandas和apply方法作用相同,但apply在未來的Spark版本中會(huì)被廢棄掉,且相比applyInPandas方法,apply中函數(shù)的定義稍嫌麻煩。這里僅介紹applyInPandas的用法。具體如下:
def new_func(pdf:pd.DataFrame)-> pd.DataFrame:pdf['Count']=(pdf['Count']-pdf['Count'].mean())/pdf['Count'].std()return pdf[['State','Count']]df.groupby('State').applyInPandas(new_func, schema='State string,Count float').show(5)

其結(jié)果如下:

注意applyInPandas方法中的schema參數(shù)中指定的是自定義函數(shù)的返回值的類型信息,這個(gè)參數(shù)可以使用DDL格式的字符串也可以使用pyspark.sql.types.DataType類型對(duì)象。

  • pivot: 透視表
    pivot方法返回的對(duì)象類型仍為GroupedData類型,所以agg、avg等方法仍然可以繼續(xù)使用。舉例如下:
df.groupby('State').pivot('Color').sum('Count').show()

其結(jié)果如下:

2.8 去重操作

Spark DataFrame中提供了兩種去重操作,具體如下:

  • distinct:返回一個(gè)不包含重復(fù)記錄的DataFrame。
  • drop_duplicates:根據(jù)指定字段去重。

具體用法如下:

data=[[1,2],[1,2],[3,4],[1,3]] df=spark.createDataFrame(data,['A','B']) df.distinct().show() df.drop_duplicates(['A']).show()

其結(jié)果如下:

2.9 合并操作

Spark DataFrame中提供的對(duì)兩個(gè)DataFrame進(jìn)行合并的方法有:union、unionAll和unionByName。具體如下:

  • union、unionAll對(duì)兩個(gè)字段數(shù)目一致的DataFrame進(jìn)行合并。該兩個(gè)方法在合并時(shí)并不會(huì)檢查DataFrame的字段類型及字段名,只會(huì)按照字段的位置進(jìn)行合并。該方法與SQL中Union all作用相同。舉例如下:
data1=[[1,2],[1,2],[3,4],[1,3]] df1=spark.createDataFrame(data1,['A','B']) data2=[['a',6],['b',8]] df2=spark.createDataFrame(data2,['C','D']) df1.union(df2).show()

其結(jié)果如下:

  • unionByName:該方法會(huì)按照兩個(gè)DataFrame中同名的字段名進(jìn)行合并,該方法不要求兩個(gè)DataFrame的字段數(shù)目相同。具體如下:
data1=[[1,2],[1,2],[3,4],[1,3]] df1=spark.createDataFrame(data1,['A','B']) data2=[[5,6],[7,8]] df2=spark.createDataFrame(data2,['B','A']) df1.unionByName(df2).show() data3=[[4],[5]] df3=spark.createDataFrame(data3,['C']) df1.unionByName(df3,allowMissingColumns=True).show()

其結(jié)果如下:

2.10 join操作

join的作用與SQL中的join操作作用類似,這里不贅述。用法舉例如下;

data1=[[1,2],[5,4],[7,3]] df1=spark.createDataFrame(data1,['A','B']) data2=[[5,6],[7,8]] df2=spark.createDataFrame(data2,['A','B']) df1.join(df2,df1['A']==df2['A'],how='outer').select(df1.A,df1.B,df2.B.alias('B_1')).show()

其結(jié)果如下:

2.11 stat獲取指定字段統(tǒng)計(jì)信息

stat方法可計(jì)算指定字段或指定字段之間的統(tǒng)計(jì)信息,比如方差、協(xié)方差、頻繁出現(xiàn)的元素集合等。DataFrame.stat下的子調(diào)用接口如下表:

方法作用
approxQuantile計(jì)算數(shù)值列的近似百分位數(shù)( 關(guān)于該函數(shù)還有一些問題沒解決,以后補(bǔ)充)
corr計(jì)算兩個(gè)字段之的相關(guān)性
cov計(jì)算兩個(gè)字段的協(xié)方差
crosstab交叉表
freqItems計(jì)算某一列或某幾列中出現(xiàn)頻繁的值的集合,support參數(shù)規(guī)定頻繁項(xiàng)的最低支持度
sampleBy對(duì)指定列數(shù)據(jù)進(jìn)行采樣,需要指定某列具體數(shù)值的抽樣比例

舉例如下:

df=spark.read.csv('/data/mnm_dataset.csv',schema=schema,header=True)df.stat.freqItems(cols=['State','Color'],support=0.9).show() df.stat.sampleBy('Color',fractions={'Yellow':0.001}).show() df.stat.crosstab('State','Color').show()

其結(jié)果如下:

2.12 集合類操作

Spark DataFramet提供的集合類操作如下:

  • intersect\intersectAll:獲取兩個(gè)DataFrame中共有的記錄;
  • exceptAll:獲取一個(gè)DataFrame中有另一個(gè)DataFrame中沒有的數(shù)據(jù)記錄;

其具體用法如下:

data1=[[1,2],[3,4],[4,5]] df1=spark.createDataFrame(data1,['A','B']) data2=[[1,2]] df2=spark.createDataFrame(data2,['A','B']) data3=[[2,1]] df3=spark.createDataFrame(data3,['B','A'])df1.intersect(df2).show() df1.intersectAll(df3).show() df1.exceptAll(df2).show()

其結(jié)果如下:

這里需要注意,兩個(gè)DataFrame進(jìn)行集合操作時(shí)并不檢查列名,而是依靠列的位置進(jìn)行判斷的,所以df1和df3的交集結(jié)果為空。

2.13 操作字段名

Spark DataFrame常用的操作字段名方法主要包括以下兩種:

  • withColumnRenamed: 對(duì)DataFrame中的某列改名;
  • withColumn: 對(duì)DataFrame中新增一列;

其用法如下:

data1=[[1,2],[3,4],[4,5]] df1=spark.createDataFrame(data1,['A','B']) df1.withColumn('C',(df1['A']>3).alias('C')).show() df1.withColumnRenamed('A','A_1').show()

其結(jié)果如下:

2.14 處理空值列

na方法可以對(duì)具有空值列的行數(shù)據(jù)進(jìn)行處理,其提供了三種處理方法,具體如下:

  • drop: 刪除指定列的空值行,也可以直接使用dropna方法。
  • fill: 使用指定的值替換指定空值列的值。通過傳入指定空值列列名以及該空值列替換值組成的Map對(duì)象傳入fill方法來替換指定空值列的值。該方法與fillna方法作用相同。
  • replace:對(duì)值進(jìn)行替換;

舉例如下:

data=[[1,None,None],[2,4,8],[9,None,11]] df=spark.createDataFrame(data,['A','B','C']) df.show() #刪除數(shù)據(jù)-只有有一個(gè)空值該行數(shù)據(jù)就刪除 df.na.drop(how='any').show() #刪除數(shù)據(jù)-若一行數(shù)據(jù)中的空值數(shù)>=thresh,則該行記錄刪除 df.na.drop(thresh=2).show() #用0進(jìn)行空值填充 df.na.fill(0).show() #不同的列用不同的值填充 df.na.fill({'B':0,'C':1}).show() #數(shù)據(jù)替換-將A列中的2替換成4,9替換成8 df.na.replace([2,9],[4,8],'A').show()

其結(jié)果如下:

總結(jié)

以上是生活随笔為你收集整理的Pyspark:DataFrame的转化操作及行动操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。