日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Pyspark:DataFrame的转化操作及行动操作

發布時間:2024/1/1 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Pyspark:DataFrame的转化操作及行动操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark版本:V3.2.1
持續補充

序言

??Spark DataFrame中的創建及常用的列操作可以參考博客:https://blog.csdn.net/yeshang_lady/article/details/89528090

正文

??因為Spark DataFrame是基于RDD創建的,所以DataFrame的操作也可以分為兩種類型:轉化操作和行動操作。轉化操作可以將Spark DataFrame轉化為新的DataFrame,而不改變原有數據。轉化操作都是惰性的,不會立即計算出來。而行動操作會觸發所有轉化操作的實際求值。

1. 行動操作

1.1 show展示數據

show方法作可以以表格的形式展示DataFrame中的數據,該方法主要有以下幾個參數:

  • n:要展示的數據行數;
  • truncate: 是否對字符串截斷,也可以直接使用該字段指定顯示的字符串長度數;
  • vertical: 是否垂直顯示DataFrame中的行;

其用法舉例如下:

from pyspark.sql.types import * from pyspark.sql import SparkSession from pyspark.sql import functions as func import pandas as pd from pyspark.sql.functions import pandas_udf spark=SparkSession.builder.appName("jsonRDD").getOrCreate() data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df.show(2,truncate=3) df.show(vertical=True)

其結果如下:

1.2 獲取所有數據到數組

show方法只能將DataFrame中的數據展示出來,但無法使用變量接收DataFrame。為了獲取數據,可以使用collect方法將DataFrame中的數據保存到List對象中。具體用法如下:

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df_array=df.collect() print(df_array)

其結果如下:

需要注意一點,collect方法會將集群中DataFrame的所有數據取回到一個節點當中,所以單臺節點的內存不足以保存整個DataFrame中的所有數據時就會報內存溢出錯誤。

1.3 獲取若干行記錄

first、head、take、tail這四個方法可以獲取DataFrame中的若干行記錄。這四個方法比較類似,其中:

  • first: 該方法獲取DataFrame的第1行記錄。
  • head: 該方法獲取DataFrame的前nnn行記錄。該方法只適用于DataFrame數據量較小,且所有數據都保存在內存中的情況。
  • take: 獲取DataFrame的前nnn行記錄。
  • tail: 獲取DataFrame的后nnn行記錄。

僅以take為例進行用法說明(結果不再展示):

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) res_1=df.take(2) print(res_1)
1.4 將DataFrame轉化pandas.DataFrame

toPandas方法可以將spark DataFrame轉化為Pandas DataFrame。用法如下:

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) dt=df.toPandas() print(dt)

其結果如下:

2. 轉化操作

在具體介紹轉化操作之前,需要說明以下幾點:

  • Spark DataFrame中的轉化操作方法中的字段名參數的數據類型一般為:String類型及Column對象,或者這兩種對象組成的List對象。當方法能同時接收多個字段名時,String類型和Column不能混用。
  • Spark DataFrame的轉化操作返回的結果均為Spark DataFrame類型,所以這些方法可以連續使用。
2.1 decribe獲取指定字段的統計信息

describe方法接收一個或多個String類型(Column類型不可以)的字段名,返回對應字段的統計值,包括:Count、Mean、Stddev、Min、Max。該方法也可以不指定字段名,此時則返回所有字段的統計信息。具體用法如下:

data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df.describe(['Name','age']).show()

其結果如下:

2.2 where/filter篩選數據

where和filter的作用相同,都可以對DataFrame的數據進行篩選。這里僅以where方法為例進行說明。where(condition)中的condition可以接收兩種參數類型。具體如下:

  • 當接收的參數為String型時,其寫法參照SQL語言中where子句;
  • 當接收的參數為Column類型時,對于每一個字段的篩選要求需要分別描述,然后使用邏輯運算組合起來即可(與或非寫法為:&、|、~)。

其用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) #條件: C2不為空且C1大于25 df.where('C2 is not null and C1>25').show() #條件: C2為空 df.where(func.isnull('C2')).show() #條件: C2不為空且C3長度大于3 df.where(~func.isnull('C2')).where(func.length('C3')>3).show() #條件: C2不為空或C1大于25 df.filter((~func.isnull('C2'))|(func.col('C1')>25)).show()

其結果如下:

2.3 select/selectExpr查詢指定列

select和selectExpr的作用相同,區別在于這兩個方法接收的參數類型不同。具體如下:

  • select:該操作接收Sting類型(列名)、Column或List型的參數。如果想要查詢所有列,也可以使用?*?;
  • selectExpr:該操作接收SQL表達式,可以同時對特定字段進行函數處理;

其用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) #查詢所有列 df.select('*').show() #多個Column列組成的List df.select([func.length('C3'),func.lower('C3')]).show() #接收SQL表達式 df.selectExpr('length(C3)','C1>25').show()

其結果如下:

2.4 drop刪除指定列

drop方法中既可以接String型的參數,也可以使用Column型參數。使用前者時,可以同時刪除多列,使用后者時一次只能刪除一列。其用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.drop('C1').show() df.drop('C1','C2').show() #注意這里不能寫成List df.drop(df.C1).show()

其結果如下:

2.5 limit獲取前n行記錄

limit方法獲取指定DataFrame的前n行記錄,其用法如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.limit(2).show()

其結果如下:

2.6 orderBy/sort/sortWithinPartitions按指定字段排序

orderBy和sort方法使用方法相同,而sortWithinPartitions方法可以對每個Parition排序,這里僅以sort為例進行說明。sort方法可以接受兩種類型的參數,不同的參數類型,排序的說明方式不同。具體如下:

  • 在使用Column型參數時,在Column后面加.desc()表示降序,.asc()表示升序(也可以使用functions包中的asc和desc方法)。
  • 當使用String型變量時,使用ascending參數來指定排序方向。

具體用法舉例如下:

data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.sort(['C1','C2'],ascending=[0,1]).show() df.sort([func.desc('C2'),func.asc('C3')]).show()

其結果如下:

2.7 groupBy對字段進行分組聚合

groupBy方法可以對數據進行分組,其得到的是GroupedData類型對象。該對象的API提供了一些聚合操作。具體如下:

  • avg、max、min、mean、count、sum方法
    這里方法只能接String類型的變量名,并且會自動忽略非數值型的字段。用法如下:
schema=StructType([StructField('State', StringType()),StructField('Color', StringType()),StructField('Count', IntegerType())]) df=spark.read.csv('data/mnm_dataset.csv',schema=schema,header=True) df.groupby('State').min('Count').show(5) df.groupby('State').sum('Count').show(5) df.groupby('State').count().show(5)

其結果如下:

  • agg:自定義聚合函數。
    agg方法中的自定義函數既可以使用avg、max等內置的聚合函數,也可以使用pyspark.sql.functions.pandas_udf定義的GROUPED_AGG類型的函數。舉例如下:
@pandas_udf('int') #統計不同值的個數 def agg_func1(x:pd.Series) -> int:return (~x.duplicated()).sum() df.groupby('State').agg({'Color':'count','Count':'sum'}).show(5) df.groupby('State').agg(agg_func1('Color')).show(5)

其結果如下:

  • apply/applyInPandas:使用Pandas中的函數
    applyInPandas和apply方法作用相同,但apply在未來的Spark版本中會被廢棄掉,且相比applyInPandas方法,apply中函數的定義稍嫌麻煩。這里僅介紹applyInPandas的用法。具體如下:
def new_func(pdf:pd.DataFrame)-> pd.DataFrame:pdf['Count']=(pdf['Count']-pdf['Count'].mean())/pdf['Count'].std()return pdf[['State','Count']]df.groupby('State').applyInPandas(new_func, schema='State string,Count float').show(5)

其結果如下:

注意applyInPandas方法中的schema參數中指定的是自定義函數的返回值的類型信息,這個參數可以使用DDL格式的字符串也可以使用pyspark.sql.types.DataType類型對象。

  • pivot: 透視表
    pivot方法返回的對象類型仍為GroupedData類型,所以agg、avg等方法仍然可以繼續使用。舉例如下:
df.groupby('State').pivot('Color').sum('Count').show()

其結果如下:

2.8 去重操作

Spark DataFrame中提供了兩種去重操作,具體如下:

  • distinct:返回一個不包含重復記錄的DataFrame。
  • drop_duplicates:根據指定字段去重。

具體用法如下:

data=[[1,2],[1,2],[3,4],[1,3]] df=spark.createDataFrame(data,['A','B']) df.distinct().show() df.drop_duplicates(['A']).show()

其結果如下:

2.9 合并操作

Spark DataFrame中提供的對兩個DataFrame進行合并的方法有:union、unionAll和unionByName。具體如下:

  • union、unionAll對兩個字段數目一致的DataFrame進行合并。該兩個方法在合并時并不會檢查DataFrame的字段類型及字段名,只會按照字段的位置進行合并。該方法與SQL中Union all作用相同。舉例如下:
data1=[[1,2],[1,2],[3,4],[1,3]] df1=spark.createDataFrame(data1,['A','B']) data2=[['a',6],['b',8]] df2=spark.createDataFrame(data2,['C','D']) df1.union(df2).show()

其結果如下:

  • unionByName:該方法會按照兩個DataFrame中同名的字段名進行合并,該方法不要求兩個DataFrame的字段數目相同。具體如下:
data1=[[1,2],[1,2],[3,4],[1,3]] df1=spark.createDataFrame(data1,['A','B']) data2=[[5,6],[7,8]] df2=spark.createDataFrame(data2,['B','A']) df1.unionByName(df2).show() data3=[[4],[5]] df3=spark.createDataFrame(data3,['C']) df1.unionByName(df3,allowMissingColumns=True).show()

其結果如下:

2.10 join操作

join的作用與SQL中的join操作作用類似,這里不贅述。用法舉例如下;

data1=[[1,2],[5,4],[7,3]] df1=spark.createDataFrame(data1,['A','B']) data2=[[5,6],[7,8]] df2=spark.createDataFrame(data2,['A','B']) df1.join(df2,df1['A']==df2['A'],how='outer').select(df1.A,df1.B,df2.B.alias('B_1')).show()

其結果如下:

2.11 stat獲取指定字段統計信息

stat方法可計算指定字段或指定字段之間的統計信息,比如方差、協方差、頻繁出現的元素集合等。DataFrame.stat下的子調用接口如下表:

方法作用
approxQuantile計算數值列的近似百分位數( 關于該函數還有一些問題沒解決,以后補充)
corr計算兩個字段之的相關性
cov計算兩個字段的協方差
crosstab交叉表
freqItems計算某一列或某幾列中出現頻繁的值的集合,support參數規定頻繁項的最低支持度
sampleBy對指定列數據進行采樣,需要指定某列具體數值的抽樣比例

舉例如下:

df=spark.read.csv('/data/mnm_dataset.csv',schema=schema,header=True)df.stat.freqItems(cols=['State','Color'],support=0.9).show() df.stat.sampleBy('Color',fractions={'Yellow':0.001}).show() df.stat.crosstab('State','Color').show()

其結果如下:

2.12 集合類操作

Spark DataFramet提供的集合類操作如下:

  • intersect\intersectAll:獲取兩個DataFrame中共有的記錄;
  • exceptAll:獲取一個DataFrame中有另一個DataFrame中沒有的數據記錄;

其具體用法如下:

data1=[[1,2],[3,4],[4,5]] df1=spark.createDataFrame(data1,['A','B']) data2=[[1,2]] df2=spark.createDataFrame(data2,['A','B']) data3=[[2,1]] df3=spark.createDataFrame(data3,['B','A'])df1.intersect(df2).show() df1.intersectAll(df3).show() df1.exceptAll(df2).show()

其結果如下:

這里需要注意,兩個DataFrame進行集合操作時并不檢查列名,而是依靠列的位置進行判斷的,所以df1和df3的交集結果為空。

2.13 操作字段名

Spark DataFrame常用的操作字段名方法主要包括以下兩種:

  • withColumnRenamed: 對DataFrame中的某列改名;
  • withColumn: 對DataFrame中新增一列;

其用法如下:

data1=[[1,2],[3,4],[4,5]] df1=spark.createDataFrame(data1,['A','B']) df1.withColumn('C',(df1['A']>3).alias('C')).show() df1.withColumnRenamed('A','A_1').show()

其結果如下:

2.14 處理空值列

na方法可以對具有空值列的行數據進行處理,其提供了三種處理方法,具體如下:

  • drop: 刪除指定列的空值行,也可以直接使用dropna方法。
  • fill: 使用指定的值替換指定空值列的值。通過傳入指定空值列列名以及該空值列替換值組成的Map對象傳入fill方法來替換指定空值列的值。該方法與fillna方法作用相同。
  • replace:對值進行替換;

舉例如下:

data=[[1,None,None],[2,4,8],[9,None,11]] df=spark.createDataFrame(data,['A','B','C']) df.show() #刪除數據-只有有一個空值該行數據就刪除 df.na.drop(how='any').show() #刪除數據-若一行數據中的空值數>=thresh,則該行記錄刪除 df.na.drop(thresh=2).show() #用0進行空值填充 df.na.fill(0).show() #不同的列用不同的值填充 df.na.fill({'B':0,'C':1}).show() #數據替換-將A列中的2替換成4,9替換成8 df.na.replace([2,9],[4,8],'A').show()

其結果如下:

總結

以上是生活随笔為你收集整理的Pyspark:DataFrame的转化操作及行动操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产午夜精品一区二区三区 | av中文网 | 老色鬼在线 | 草草久久久无码国产专区 | 日日草草| 亚洲最黄视频 | 男生和女生差差视频 | 欧美特级aaa| 国产精品免费在线播放 | 91中文字幕在线观看 | 婷婷久久丁香 | 国产三级短视频 | 女仆乖h调教跪趴1v1 | 一个人看的www片免费高清中文 | 97视频免费在线 | 日本东京热一区二区 | 性久久久久久久久久 | 日韩二区在线 | 日韩欧美网 | 深夜免费福利视频 | 亚洲毛片在线免费观看 | 午夜激情欧美 | 91精品小视频 | 国产精品无码在线 | 免费在线黄色片 | 国产成人精品一区二区三区网站观看 | 小明成人免费视频 | 午夜视频免费观看 | 日本 欧美 国产 | 小珊的性放荡羞辱日记 | 成人精品免费看 | 欧美一区二区三区免费 | 天天色天天射天天操 | 私密spa按摩按到高潮 | 日韩欧美综合久久 | 精品久久一二三区 | 美女又爽又黄视频 | 天堂av影院 | 久久重口味 | 欧美一级一区 | 日韩精品aaa | 午夜久久影院 | 懂色av蜜臀av粉嫩av分享吧 | 色婷婷av一区二区三区大白胸 | 免费大黄网站 | 亚洲一区二区黄色 | 国产最新精品视频 | 强迫凌虐淫辱の牝奴在线观看 | 国产一区二区精品久久 | 国产地址一| 综合久色 | 亚洲一品道| 国产又粗又深又猛又爽又在线观看 | 动漫美女视频 | 国产男女猛烈无遮挡免费视频 | 日日舔夜夜摸 | 黄色香蕉视频 | 福利网站在线 | 成人福利网站在线观看 | 精品99在线| 国产一区中文字幕 | 亚洲综合一区二区 | 国产小视频免费观看 | 国产又黄又爽 | 玉米地疯狂的吸允她的奶视频 | 久久精品国产亚洲AV无码麻豆 | 欧美精品乱码久久久久久按摩 | 一区二区三区黄色 | 一级免费片| 午夜视频91 | 欧美亚洲高清 | 成年人国产视频 | 99热99热| 丝袜国产一区 | 女女综合网 | 亚洲欧美日韩精品久久亚洲区 | 日韩性网站 | 国产91绿帽单男绿奴 | 五月天激情小说 | 日韩乱码一区二区 | 中文字幕在线天堂 | 女人扒开腿让男人桶爽 | 欧美成人午夜影院 | av在线免| 老司机精品在线 | av激情在线观看 | 国产精品久久久久91 | 欧美三个黑人玩3p | 亚洲国产亚洲 | 四虎图库| 午夜在线一区 | 精品国产乱码久久久久久1区二区 | 538国产精品一区二区 | 亚洲影视精品 | 天堂视频免费 | 久久毛片网 | 深夜福利在线免费观看 | 日韩欧美理论片 | 91精品久久久久久久久中文字幕 |