日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

pyspark学习

發布時間:2024/1/1 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 pyspark学习 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

由于公司的項目需要用pyspark做數據清洗等工作,于是現學現用,也有很多不懂的地方,如果文章里面有什么總結得有問題的,歡迎大家指出。

更詳細的介紹也可以參考PySpark教程:使用Python學習Apache Spark

  • 一. pyspark簡介

1. pyspark是什么

要學習pyspark,肯定首先要知道pyspark是什么。

Apache Spark是用Scala編程語言編寫的。為了讓Spark支持Python,Apache Spark社區發布了一個工具PySpark,從而可以以交互的方式使用Python編寫Spark程序。

然后需要了解RDD是怎么一回事。

2. RDD是什么

這部分內容參考了另一篇文章:pyspark的使用和操作(基礎整理)

pyspark里最核心的模塊是SparkContext(簡稱sc),最重要的數據載體是RDD。RDD就像一個NumPy array或者一個Pandas Series(NumPy和Pandas都是Python的包),可以視作一個有序的item集合。只不過這些item并不存在driver端的內存里,而是被分割成很多個partitions,每個partition的數據存在集群的executor的內存中。

總結一下,在pyspark中創建RDD有兩種方法,一種是并行化一個列表,一種是直接讀取文件

但是所有工作的前提是初始化SparkSession,SparkSession是Spark 2引入的新概念。SparkSession為用戶提供了統一的切入點,來讓用戶學習spark的各項功能。spark2將SparkConf、SparkConText、SQLContext和HiveContext和StreamingContext進行了組合。所以在SQLContext和HiveContext等上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。

我的平臺裝的是spark2.2,可以用spark2的寫法,但spark1的寫法也不會報錯。

#spark2的寫法 from pyspark.sql import SparkSession from pyspark import SparkContext spark = SparkSession.builder.appName("Project").getOrCreate() sc = SparkContext.getOrCreate()#spark1的寫法 #from pyspark import SparkContext as sc #from pyspark import SparkConf #conf = SparkConf().setAppName("Project").setMaster("local[*]")#代碼只在本地運行 ##conf = SparkConf().setAppName("lg").setMaster("spark://192.168.10.10:7077")#代碼在master上運行,設置master的ip和端口 #sc = sc.getOrCreate(conf)##方法一 并行化一個集合 #使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame轉成Spark RDD。 rdd1 = sc.parallelize([('蘋果', 8), ('香蕉', 4), ('葡萄',6), ('梨', 3), ('火龍果', 9)]) rdd1 #ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195##方法二 讀取文件 rdd2 = sc.textFile("c1.py")#讀入一個文件 rdd3 = sc.wholeTextFiles("c1.py")#讀入整個文件夾的所有文件,RDD中的每個item實際上是一個形如(文件名,文件所有內容)的元組。 rdd2 #Output:c1.py MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0#getNumPartitions()方法可以查看list被分成了幾部分 rdd1.getNumPartitions() #Output:8#glom().collect()查看分區狀況 #小數據量情況下,可以直接將分布式的RDD通過轉換函數collect()轉換成一個數組,大數據量如上BT文件讀入會爆掉內存…… rdd1.glom().collect() #Output:[[], [('蘋果', 8)], [], [('香蕉', 4)], [('葡萄', 6)], [], [('梨', 3)], [('火龍果', 9)]] #如果是大數據量的rdd,可以用take(n)來選取n個數據查看 rdd1.take(3) #Output:[('蘋果', 8), ('香蕉', 4), ('葡萄', 6)]#first()方法取讀入的rdd數據第一個item rdd1.first() #Output:('蘋果', 8)

通過以下命令查看CPU個數:

import psutil print(u'cpu個數:',psutil.cpu_count()) #Output:cpu個數: 8

CPU個數為8,用list創建RDD時,就被分成了8個部分,可見分區數是按CPU個數決定的。

更多RDD的內容繼續參考另一篇文章,寫得挺不錯的(嬉笑臉):pyspark的使用和操作(基礎整理)

通過上面的內容應該對pyspark和RDD有所了解了吧,知道是怎么回事之后我們就繼續開始后面的工作。

  • 二. pyspark的安裝

1.準備spark和python的環境

以下內容都是在Linux環境下實現。

pyspark既然是python+spark,那么肯定需要spark和python了,我這里是在這之前已經搭建好了大數據集群,下面是我的集群:

?

如果還沒有spark環境的,網上有很多相關文章,這里就不做過多的說明。

也可以參考我寫的《大數據平臺環境部署手冊》,我做了完整的總結:https://www.jianshu.com/p/bb09da06e045

Linux系統是自帶了python的,不過是python2,現在基本上都用python3了,未來python2會停止維護。

安裝Anaconda,Anaconda是個超級好用的工具,關于Anaconda的安裝,網上有很多相關文章,這里也不做過多的說明。

2.安裝pyspark

在Anaconda中使用pyspark,直接執行如下命令,就可以直接導入pyspark模塊了,可以拿上面RDD的代碼做測試。

conda install pyspark

接下來要開始我重要工作了。

  • 三. RDD轉換sparkDataFrame

本來我的寫到一半了,結果看到了有其他的博主寫了pyspark學習系列的文章:

pyspark學習系列(一)創建RDD

pyspark學習系列(二)讀取CSV文件 為RDD或者DataFrame進行數據處理

……這學習系列還有很多,基本上滿足了我對于pyspark的所有學習需求

看到這些文章,我就又忍不住想偷懶了(@~@),不過我還是繼續簡單的記錄一下自己學到的東西吧,跟緊大佬的步伐。

我們知道DataFrame是Python中Pandas庫中的一種重要的數據結構,操作起數據來相當方便。spark也有DataFrame,概念差不多,RDD是最重要的數據載體。但RDD是無schema的數據結構,DataFrame是有schema的數據結構。

所以RDD想轉換成DataFrame就是在RDD基礎上加上schema,如果沒有提前定義好schema的名稱,轉化過程中默認schema為:_1,_2,_3

RDD轉換成DataFrame:

from pyspark.sql import SQLContext sqlContest = SQLContext(sc)data1 = spark.createDataFrame(rdd1) data2 = sqlContest.createDataFrame(rdd1)

然后toPandas()就可以查看結果:

data1.toPandas()

同樣 dataframe也可以轉換成rdd: rdd.map(lambda x: -----)

  • 四.?讀取CSV文件 為RDD或者DataFrame進行數據處理

1. 本地csv文件讀取:

最簡單的方法:

import pandas as pd file = pd.read_csv(file) df = sqlContest.createDataFrame(file)

?2. hdfs上的csv文件讀取:

1)采用先讀為RDD再轉換的形式

2)采用sqlContext.read.format()

有興趣的可以參考下這個,不過我還沒有實踐過不知道能不能成功:python對hdfs/spark讀寫操作(hdfs/pyspark)

五. 其他

還可以利用SQL進行查詢

#用createOrReplaceTempView方法創建臨時表 df.createOrReplaceTempView("table") #用SQL語句對這個臨時表進行查詢統計 spark.sql("select * from table").show()

?RDD還有很多跟Python的dataframe差不多的方法:

map() 對RDD的每一個item都執行同一個操作 flatMap() 對RDD中的item執行同一個操作以后得到一個list,然后以平鋪的方式把這些list里所有的結果組成新的list filter() 篩選出來滿足條件的item distinct() 對RDD中的item去重 sample() 從RDD中的item中采樣一部分出來,有放回或者無放回 sortBy() 對RDD中的item進行排序

將RDD轉化為dataframe之后,大多數就可以用dataframe的方法進行數據清洗了:去重,刪除空值,統計等

這篇文章暫時就總結到這里,有問題的歡迎指正,也歡迎一起探討一起學習哦。

?

“當你的才華還撐不起你的野心的時候,你就應該靜下心來學習”

以后這句話將會出現在我的每一篇博文中,用于提醒我自己,靜下來好好學習。

總結

以上是生活随笔為你收集整理的pyspark学习的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。