Big Data Series: Data Statistics and Visualization with Spark SQL, DataFrame, and RDD
Spark big-data analysis involves operations on RDDs, DataFrames, and Spark SQL. This article briefly introduces the operators each of the three approaches uses for data statistics.
1. Running Python Spark Programs in IPython Notebook
IPython Notebook provides an interactive interface: you enter Python commands in a web page and see the results immediately. The analysis process, the commands, and their results can be saved as a notebook, which can be reopened later to re-execute those commands. A notebook can contain text, mathematical formulas, program code, results, and figures.
1.1 Installing IPython
1) If gcc is missing, install it first:
[root@tango-spark01 /]# gcc -v
[root@tango-spark01 /]# yum install gcc
2) If pip is missing, install it:
[root@tango-spark01 /]# pip -V
[root@tango-spark01 /]# wget https://bootstrap.pypa.io/get-pip.py --no-check-certificate
[root@tango-spark01 /]# python get-pip.py
3) Install the Python development package:
[root@tango-spark01 /]# yum install python-devel
4) Run the following commands to install IPython and IPython Notebook:
[root@tango-spark01 /]# pip install ipython
[root@tango-spark01 /]# pip install urllib3
[root@tango-spark01 /]# pip install jupyter
5) Type ipython to open the interactive shell.
6) Type jupyter notebook to start the notebook server.
1.2 Configuring IPython
1) Create a password for remote connections:
In [2]: from notebook.auth import passwd
In [3]: passwd()
Enter password:
Verify password:
Out[3]: 'sha1:fea9052f4d92:114eb32c684004bf4a6196faac0b26a28156fe5d'
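The 'type:salt:hashed-password' string that passwd() returns can be reproduced with the standard library alone. The sketch below is an assumption about the scheme used by older notebook releases (SHA-1 over passphrase + hex salt, as in notebook.auth); both function names are ours, not part of any library:

```python
import hashlib
import secrets

def passwd_hash(passphrase, salt=None, algorithm="sha1"):
    """Build an 'algorithm:salt:hash' string in the style of notebook.auth.passwd().

    Assumption: the hash is algorithm(passphrase + salt), which mirrors the
    legacy notebook scheme; check notebook.auth for the authoritative code.
    """
    if salt is None:
        salt = secrets.token_hex(6)  # 12 hex chars, like the Out[3] value above
    h = hashlib.new(algorithm)
    h.update(passphrase.encode("utf-8") + salt.encode("ascii"))
    return "%s:%s:%s" % (algorithm, salt, h.hexdigest())

def passwd_check(hashed, passphrase):
    """Verify a passphrase against a stored 'algorithm:salt:hash' string."""
    algorithm, salt, _ = hashed.split(":")
    return passwd_hash(passphrase, salt, algorithm) == hashed
```

Verifying with passwd_check is how the notebook server later validates the login password against c.NotebookApp.password.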
2) Generate the Jupyter configuration file:
[root@tango-spark01 /]# jupyter notebook --generate-config
Writing default config to: /root/.jupyter/jupyter_notebook_config.py
3) Open the configuration file and set the following options:
## The IP address the notebook server will listen on.
#c.NotebookApp.ip = 'localhost'
c.NotebookApp.ip = '0.0.0.0'
## The directory to use for notebooks and kernels.
#c.NotebookApp.notebook_dir = u''
c.NotebookApp.notebook_dir = u'/usr/local/spark/ipynotebook'
## Hashed password to use for web authentication.
# To generate, type in a python/IPython shell:
# from notebook.auth import passwd; passwd()
# The string should be of the form type:salt:hashed-password.
#c.NotebookApp.password = u''
c.NotebookApp.password = u'sha1:fea9052f4d92:114eb32c684004bf4a6196faac0b26a28156fe5d'
4) Start Jupyter Notebook:
[root@tango-spark01 /]# jupyter notebook --allow-root
[I 14:20:05.618 NotebookApp] Serving notebooks from local directory: /usr/local/spark/ipynotebook
[I 14:20:05.618 NotebookApp] The Jupyter Notebook is running at:
[I 14:20:05.619 NotebookApp] http://(tango-spark01 or 127.0.0.1):8888/
[I 14:20:05.619 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[W 14:20:05.619 NotebookApp] No web browser found: could not locate runnable browser.
[I 14:21:00.346 NotebookApp] 302 GET / (192.168.112.1) 2.50ms
[I 14:21:00.352 NotebookApp] 302 GET /tree? (192.168.112.1) 1.71ms
[I 14:22:16.241 NotebookApp] 302 POST /login?next=%2Ftree%3F (192.168.112.1) 1.58ms
5) Enter the server's address and port in a browser.
Enter the password to log in.
1.3 Using Spark in IPython Notebook
1) Change to the ipynotebook working directory:
[root@tango-spark01 /]# cd /usr/local/spark/ipynotebook
[root@tango-spark01 ipynotebook]#
2) Launch pyspark from the IPython Notebook interface:
[root@tango-spark01 ipynotebook]# PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark
3) Click New and choose Python 2 to create a notebook.
4) The new notebook opens in a new page with the default name Untitled; click the name to change it.
5) Run program code in the notebook.
6) Save the notebook; it can be reopened and reused later.
2. Data Statistics and Visualization with Spark SQL, DataFrame, and RDD
2.1 Comparing RDD, DataFrame, and Spark SQL
RDDs and DataFrames are both distributed resilient datasets on the Spark platform, and both are lazily evaluated: creating or transforming them does not execute anything immediately; the data is traversed and computed only when an action is invoked.
Statistics with the RDD API mainly use map together with reduceByKey, which requires thinking in Map/Reduce terms.
Unlike an RDD, a DataFrame is closer to a traditional database table: besides the data itself, it also records the schema.
Spark SQL is derived from DataFrames: first create a DataFrame, then register it as a Spark SQL temp table; after that the data can be queried directly with SQL statements.
The table below lists the methods RDD, DataFrame, and Spark SQL use for the same statistical computation.
| API | Group-count example |
| --- | --- |
| RDD API | userRDD.map(lambda x:(x[2],1)).reduceByKey(lambda x,y: x+y).collect() |
| DataFrame | user_df.select("gender").groupby("gender").count().show() |
| Spark SQL | sqlContext.sql("""SELECT gender,count(*) counts FROM user_table GROUP BY gender""").show() |
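All three rows of the table compute the same thing: a count per gender. As a plain-Python illustration of the map/reduceByKey pattern (no Spark needed; the sample records below are hypothetical rows in the u.user layout of userid|age|gender|occupation|zipcode):

```python
# Hypothetical sample records in the u.user field order.
user_rows = [
    ["1", "24", "M", "technician", "85711"],
    ["2", "53", "F", "other", "94043"],
    ["3", "23", "M", "writer", "32067"],
]

# map step: emit (gender, 1) pairs, like userRDD.map(lambda x: (x[2], 1))
pairs = [(row[2], 1) for row in user_rows]

# reduceByKey step: merge the values per key with the given function,
# like .reduceByKey(lambda x, y: x + y)
def reduce_by_key(pairs, fn):
    out = {}
    for key, value in pairs:
        out[key] = fn(out[key], value) if key in out else value
    return out

counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)  # {'M': 2, 'F': 1}
```

The DataFrame groupby/count and the SQL GROUP BY produce the same per-key totals; the RDD form simply makes the two phases explicit.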
2.2 Creating RDDs, DataFrames, and Spark SQL Tables
Run IPython Notebook in Hadoop YARN-client mode:
[root@tango-spark01 ipynotebook]# PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.9.0/etc/hadoop pyspark --master yarn --deploy-mode client
Creating an RDD
1) Configure the file read path:
global Path
if sc.master[0:5] == "local":
    Path="file:/usr/local/spark/ipynotebook/"
else:
    Path="hdfs://tango-spark01:9000/input/"
If sc.master[0:5] is "local", the program is running locally and reads local files.
If sc.master[0:5] is not "local", it may be running on YARN client or Spark Standalone, and it must read files from HDFS.
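This branching can be factored into a small pure function and checked without a Spark context (the paths are the ones from the listing; the function name is ours):

```python
def choose_path(master):
    """Pick the data path from the Spark master string, as in the listing above."""
    if master[0:5] == "local":
        return "file:/usr/local/spark/ipynotebook/"
    return "hdfs://tango-spark01:9000/input/"

print(choose_path("local[*]"))  # local run -> read local files
print(choose_path("yarn"))      # YARN client / standalone -> read HDFS
```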
2) Read the text file and check the number of records:
RawUserRDD=sc.textFile(Path+"data/u.user")
RawUserRDD.count()
RawUserRDD.take(5)
3) Extract each field:
userRDD= RawUserRDD.map(lambda line:line.split("|"))
userRDD.take(5)
Creating a DataFrame
1) Create sqlContext: in early Spark versions, SparkContext was the entry point to Spark, SQLContext the entry point to SQL, and HiveContext the entry point to Hive. In Spark 2.0, SparkSession combines the functionality of SparkContext, SQLContext, and HiveContext.
sqlContext=SparkSession.builder.getOrCreate()
2) Define the schema: specify the name and data type of each DataFrame field.
from pyspark.sql import Row
user_Rows = userRDD.map(lambda p:
Row(userid=int(p[0]),age=int(p[1]),gender=p[2],occupation=p[3],zipcode=p[4]))
user_Rows.take(5)
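The Row mapping above converts userid and age to int while keeping the other fields as strings. The same conversion can be exercised without Spark (the sample line follows the u.user layout; the values are illustrative):

```python
def parse_user(line):
    """Split a u.user line and type-convert fields, mirroring the Row(...) call."""
    p = line.split("|")
    return {
        "userid": int(p[0]),
        "age": int(p[1]),
        "gender": p[2],
        "occupation": p[3],
        "zipcode": p[4],
    }

row = parse_user("1|24|M|technician|85711")
print(row["userid"] + 1, row["age"] * 2)  # ints, so arithmetic works
```

Declaring the types here is what later lets the DataFrame sort and aggregate age numerically instead of lexically.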
3) Create the DataFrame with the sqlContext.createDataFrame() method:
user_df=sqlContext.createDataFrame(user_Rows)
user_df.printSchema()
4) View the DataFrame's data:
user_df.show(5)
5) Create an alias for the DataFrame with .alias:
df=user_df.alias("df")
df.show(5)
Using Spark SQL
After creating a DataFrame, register it as a Spark SQL temp table; the data can then be queried with Spark SQL statements.
1) Register the temp table:
user_df.registerTempTable("user_table")
2) Count the records with Spark SQL:
sqlContext.sql("SELECT count(*) counts FROM user_table").show()
3) To spread a Spark SQL statement over multiple lines, wrap the SQL in triple double quotes:
sqlContext.sql("""SELECT count(*) counts
FROM user_table
""").show()
4) View data with Spark SQL, limiting the number of rows returned:
sqlContext.sql("SELECT * FROM user_table").show()
sqlContext.sql("SELECT * FROM user_table").show(5)
sqlContext.sql("SELECT * FROM user_table LIMIT 5").show()
2.3 Statistical Operations
2.3.1 Filtering Data
Filtering with an RDD
An RDD filters each record with the filter method, passing a lambda anonymous function as the predicate:
userRDD.filter(lambda r:r[3]=='technician' and r[2]=='M' and r[1]=='24').take(5)
Filtering with a DataFrame
Filtering with Spark SQL
sqlContext.sql("""
SELECT *
FROM user_table
where occupation='technician' and gender='M' and age=24""").show(5)
2.3.2 Sorting Data by Field
Sorting with an RDD
userRDD.takeOrdered(5,key=lambda x:-1*int(x[1]))    # descending sort
userRDD.takeOrdered(5,key=lambda x:(-int(x[1]),x[2]))    # sort on multiple fields
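takeOrdered(n, key) returns the n smallest elements under the key, which is why negating the age yields a descending sort. The standard library's heapq.nsmallest behaves the same way, so the trick can be shown without Spark (sample rows follow the userid|age|gender layout and are illustrative):

```python
import heapq

rows = [["1", "24", "M"], ["2", "53", "F"], ["3", "23", "M"], ["4", "24", "A"]]

# descending by age: negate the key, like takeOrdered(5, key=lambda x: -1*int(x[1]))
by_age_desc = heapq.nsmallest(3, rows, key=lambda x: -1 * int(x[1]))

# multiple fields: age descending, then gender ascending
by_age_gender = heapq.nsmallest(3, rows, key=lambda x: (-int(x[1]), x[2]))

print(by_age_desc)
print(by_age_gender)
```

Note how the two 24-year-olds keep input order in the first result but are reordered by gender in the second.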
Sorting with a DataFrame
user_df.select("userid","occupation","gender","age").orderBy("age",ascending=0).show(5)
df.orderBy(["age","gender"],ascending=[0,1]).show(5)    # sort on multiple fields
Sorting with Spark SQL
sqlContext.sql("""
SELECT userid,occupation,gender,age FROM user_table
order by age desc,gender""").show(5)
2.3.3 Showing Distinct Data
Distinct values with an RDD
Distinct values with a DataFrame
Distinct values with Spark SQL
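The original post showed these three variants only as screenshots, so no code survives here. Typical calls would be along the lines of userRDD.distinct(), user_df.select("age","gender").distinct().show(), and SELECT DISTINCT in SQL (an assumption about the omitted code, not reproduced from the post). The deduplication itself amounts to this in plain Python:

```python
# Deduplication sketch in plain Python: what distinct() computes.
genders = ["M", "F", "M", "M", "F"]

# dict.fromkeys keeps the first occurrence of each value, in input order
unique_genders = list(dict.fromkeys(genders))
print(unique_genders)  # ['M', 'F']

# multi-field distinct: deduplicate tuples of the selected fields
pairs = [("24", "M"), ("53", "F"), ("24", "M")]
unique_pairs = list(dict.fromkeys(pairs))
print(unique_pairs)  # [('24', 'M'), ('53', 'F')]
```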
2.3.4 Grouped Statistics
1) Grouped counting with an RDD:
userRDD.map(lambda x:(x[2],1)).reduceByKey(lambda x,y:x+y).collect()
2) Grouped counting with DataFrames:
user_df.select("gender").groupby("gender").count().show()
3) Grouped counting with Spark SQL:
sqlContext.sql("""SELECT gender,count(*) counts FROM user_table
group by gender""").show()
2.3.5 Joining Data
Preparing the zipcode data
1) Copy the data into the HDFS directory:
[root@tango-spark01 data]# hadoop fs -copyFromLocal -f /usr/local/spark/ipynotebook/data/free-zipcode-database-Primary.csv /input/data
2) Read and inspect the data:
Path="hdfs://tango-spark01:9000/input/"
rawDataWithHeader=sc.textFile(Path+"data/free-zipcode-database-Primary.csv")
rawDataWithHeader.take(5)
3) Remove the header record:
header = rawDataWithHeader.first()
rawData = rawDataWithHeader.filter(lambda x:x !=header)
4) Remove the quote characters:
rawData.first()
rData=rawData.map(lambda x:x.replace("\"",""))
rData.first()
5) Extract each field:
zipRDD=rData.map(lambda x:x.split(","))
zipRDD.first()
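The three cleaning steps above (drop the header, strip the quotes, split on commas) can be checked as a plain pipeline over a list of lines (the sample lines below are illustrative, not taken from the real file):

```python
raw = [
    '"Zipcode","ZipCodeType","City","State"',
    '"00705","STANDARD","AIBONITO","PR"',
    '"00610","STANDARD","ANASCO","PR"',
]

header = raw[0]                             # first(): the header record
data = [x for x in raw if x != header]      # filter(lambda x: x != header)
data = [x.replace('"', "") for x in data]   # strip the quote characters
zip_rows = [x.split(",") for x in data]     # split each record into fields

print(zip_rows[0])  # ['00705', 'STANDARD', 'AIBONITO', 'PR']
```

Stripping the quotes before splitting matters: with them left in, int(p[0]) in the Row step below would fail on '"00705"'.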
Creating the zipcode table
1) Create the schema for the zipcode Row:
from pyspark.sql import Row
zipcode_data = zipRDD.map(lambda p:
Row(zipcode=int(p[0]),zipCodeType=p[1],city=p[2],state=p[3]))
zipcode_data.take(5)
2) Create a DataFrame from the Row data:
zipcode_df=sqlContext.createDataFrame(zipcode_data)
zipcode_df.printSchema()
3) Register the temp table:
zipcode_df.registerTempTable("zipcode_table")
zipcode_df.show(10)
Joining zipcode_table with Spark SQL
sqlContext.sql("""
select u.*,z.city,z.state from user_table u
left join zipcode_table z ON u.zipcode=z.zipcode
where z.state='NY'
""").show(10)
2.3.6 Plotting with Pandas DataFrames
Count by state and display as a bar chart
1) Convert to a Pandas DataFrame:
import pandas as pd
GroupByState_pandas_df = GroupByState_df.toPandas().set_index('state')
GroupByState_pandas_df
2) Plot the bar chart from the Pandas DataFrame:
import matplotlib.pyplot as plt
%matplotlib inline
ax=GroupByState_pandas_df['count'].plot(kind='bar',title='State',figsize=(12,6),legend=True,fontsize=12)
plt.show()
Count by occupation and display as a pie chart
1) Create Occupation_df:
Occupation_df=sqlContext.sql("""SELECT u.occupation,count(*) counts
FROM user_table u
GROUP BY occupation
""")
Occupation_df.show(5)
2) Create Occupation_pandas_df:
Occupation_pandas_df=Occupation_df.toPandas().set_index('occupation')
Occupation_pandas_df
3) Plot the pie chart from the Pandas DataFrame:
ax=Occupation_pandas_df['counts'].plot(kind='pie',title='occupation',figsize=(8,8),startangle=90,autopct='%1.1f%%')
ax.legend(bbox_to_anchor=(1.05,1),loc=2,borderaxespad=0.)
plt.show()
kind='pie': draw a pie chart
startangle=90: rotation angle at which the first wedge starts
autopct='%1.1f%%': format for the percentage labels on the wedges
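The autopct string is an old-style format specifier: matplotlib substitutes each wedge's share of the total into it, and the trailing %% escapes a literal percent sign. The formatting itself can be checked without drawing anything (the sample counts are illustrative):

```python
# Hypothetical occupation counts summing to 100 for easy reading.
counts = {"technician": 27, "writer": 45, "other": 28}
total = sum(counts.values())

# what autopct='%1.1f%%' renders for each wedge
labels = {k: "%1.1f%%" % (100.0 * v / total) for k, v in counts.items()}
print(labels)  # {'technician': '27.0%', 'writer': '45.0%', 'other': '28.0%'}
```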
References
Python+Spark 2.0+Hadoop: Machine Learning and Big Data Practice, Lin Dagui (林大貴)