日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

PySpark简介、搭建以及使用

發布時間:2024/1/1 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 PySpark简介、搭建以及使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • 一、PySpark簡介
    • 使用場景
    • 結構體系
  • 二、PySpark集成搭建
  • 三、 PySpark的使用
    • PySpark包介紹
    • PySpark處理數據
    • PySpark中使用匿名函數
    • 加載本地文件
    • PySpark中使用SparkSQL
    • Spark與Python第三方庫混用
    • Pandas DF與Spark DF
    • 使用PySpark通過圖形進行數據探索

一、PySpark簡介

使用場景

大數據處理或機器學習時的原型( prototype)開發

  • 驗證算法
  • 執行效率可能不高
  • 要求能夠快速開發

結構體系

二、PySpark集成搭建

準備環境:JDK、Spark需要提前安裝好

下載Anaconbda

  • 地址:點擊這里
  • 選擇:Anaconda3-5.1.0-Linux-x86_64.sh

至于版本最好不要使用過低版本,可能無法使用

安裝bzip2

缺少 bzip2 安裝 Anaconda 會失敗

  • 在Linux下安裝bzip2:yum install -y bzip2

上傳/解壓Anaconbda

  • 將下載好的Anaconbda上傳至Linux中

  • 解壓安裝Anaconbda:bash Anaconda3-5.1.0-Linux-x86_64.sh

  • 回車,開始安裝,然后提示接受協議(輸入yes回車),然后指定安裝到的位置,根路徑必須已存在,(否則默認安裝在/root/anaconbda3下面)

  • 處理完上面的步驟后會提示是否自動添加環境變量,輸入yes即可

  • 然后還會提示是否安裝VSCode,這里linux不需要安裝,輸入no即可

Linux默認自帶python,安裝Anacondd會覆蓋原有的Python,可以通過修改.bashrc使兩個版本pyrhon共存

設置兩個版本的python共存

  • 配置文件:vim /root/.bashrc
#添加以下內容,自行修改自己安裝的路徑 export PATH="/opt/install/anaconda3/bin:$PATH" alias pyana="/opt/install/anaconda3/bin/python" alias python="/bin/python"
  • 保存退出后生效配置文件:source /root/.bashrc

生成 PySpark 配置文件

  • 在當前用戶文件夾下運行以下命令生成配置文件:jupyter notebook --generate-config

  • 查看生成的配置文件:ll /root/.jupyter/

  • 修改配置文件,但在這之前,需要先執行以下操作

  • 使用 pyana,進入交互模式,運行以下代碼

from notebook.auth import passwd passwd() #按照提示設置密碼后會生成與之對應的加密密碼,然后保存這個生成的字符串,后面會賦值給 c.NotebookApp.password 屬性

  • 修改配置文件,允許從外部訪問 Jupyter:vi ./.jupyter/jupyter_notebook_config.py
c.NotebookApp.allow_root = True c.NotebookApp.ip = '*' c.NotebookApp.open_browser = False c.NotebookApp.password = 'sha1:*****************'#將前面生成的值放到這里 c.NotebookApp.port = 7070 #指定外部訪問的端口號
  • 修改環境變量,將Jupyter作為PySpark的編輯運行工具:vim /root/.bashrc
export PYSPARK_PYTHON=/opt/install/anaconda3/bin/python3 #指定/anaconda3/bin/python3 export PYSPARK_DRIVER_PYTHON=/opt/install/anaconda3/bin/jupyter #指定/anaconda3/bin/jupyter export PYSPARK_DRIVER_PYTHON_OPTS="notebook" ipython_opts="notebook -pylab inline"
  • 生效環境變量:source /root/.bashrc

  • 注意關閉防火墻

  • 啟動pyspark:pyspark

  • 使用瀏覽器打開Jupyter:192.168.**.**:7070,并輸入預先設置的密碼

  • 這里安裝就算完成了

三、 PySpark的使用

  • 初次使用建議創建一個文件夾,在這個文件夾保存操作過的代碼
  • 進入到新創建的文件夾下面,new->python3
  • 然后就可以開始操作學習
  • 執行命令
    shift+回車:執行并開啟新的一行
    ctrl+回車:僅執行

PySpark包介紹

PySpark

Core Classes: pyspark.SparkContext pyspark.RDD pyspark.sql.SQLContext pyspark.sql.DataFrame

pyspark.streaming

pyspark.streaming.StreamingContext pyspark.streaming.DStream

pyspark.ml

pyspark.mllib

PySpark處理數據

  • 導包
from pyspark import SparkContext
  • 獲取SparkContext對象
sc=SparkContext.getOrCreate()
  • 創建RDD
#不支持 makeRDD() #支持 parallelize() textFile() wholeTextFiles()
  • 演示

PySpark中使用匿名函數

  • 使用Python的Lambda函數實現匿名函數
  • scala與python對比
#scala val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle")) val b=a.map(x=>(x,1)) b.collect#python a=sc.parallelize(("dog","tiger","lion","cat","panther","eagle")) b=a.map(lambda x:(x,1)) b.collect()
  • 演示

加載本地文件

addFile(path, recursive = False)

  • 接收本地文件
  • 通過SparkFiles.get()方法來獲取文件的絕對路徑

addPyFile( path )

  • 加載已存在的文件并調用其中的方法
  • 現在本地創建一個文件:vi sci.py寫入下面兩個方法人,然后保存退出
#sci.py def sqrt(num):return num * numdef circle_area(r):return 3.14 * sqrt(r)
  • 在pyspark中通過addPyFile加載該文件
#加載預寫入方法的文件 sc.addPyFile("file:///root/sci.py") #導入文件中的方法 from sci import circle_area #創建rdd并使用文件中的方法 sc.parallelize([5, 9, 21]).map(lambda x : circle_area(x)).collect()
  • 演示

PySpark中使用SparkSQL

  • 導包
from pyspark.sql import SparkSession
  • 創建SparkSession對象
ss = SparkSession.builder.getOrCreate()
  • 加載csv文件
ss.read.format("csv").option("header", "true").load("file:///xxx.csv")

演示

  • 測試數據
Afghanistan 48.673000 SAs Albania 76.918000 EuCA Algeria 73.131000 MENA Angola 51.093000 SSA Argentina 75.901000 Amer Armenia 74.241000 EuCA Aruba 75.246000 Amer Australia 81.907000 EAP Austria 80.854000 EuCA Azerbaijan 70.739000 EuCA Bahamas 75.620000 Amer Bahrain 75.057000 MENA Bangladesh 68.944000 SAs Barbados 76.835000 Amer Belarus 70.349000 EuCA Belgium 80.009000 EuCA Belize 76.072000 Amer Benin 56.081000 SSA Bhutan 67.185000 SAs Bolivia 66.618000 Amer Bosnia_and_Herzegovina 75.670000 EuCA Botswana 53.183000 SSA Brazil 73.488000 Amer Brunei 78.005000 EAP Bulgaria 73.371000 EuCA Burkina_Faso 55.439000 SSA Burundi 50.411000 SSA Cambodia 63.125000 EAP Cameroon 51.610000 SSA Canada 81.012000 Amer Cape_Verde 74.156000 SSA Central_African_Rep. 48.398000 SSA Chad 49.553000 SSA Channel_Islands 80.055000 EuCA Chile 79.120000 Amer China 73.456000 EAP Colombia 73.703000 Amer Comoros 61.061000 SSA Congo_Dem._Rep. 48.397000 SSA Congo_Rep. 57.379000 SSA Costa_Rica 79.311000 Amer Cote_d'Ivoire 55.377000 SSA Croatia 76.640000 EuCA Cuba 79.143000 Amer
  • 操作代碼
#導包 from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.types import DoubleType #創建sparkSession對象 ss = SparkSession.builder.getOrCreate()#讀取本地csv文件,并為每列設置名稱 #pyspark中一條語句換行需要加斜杠 df = ss.read.format("csv").option("delimiter", " ").load("file:///root/example/LifeExpentancy.txt") \.withColumn("Country", col("_c0")) \.withColumn("LifeExp", col("_c2").cast(DoubleType())) \.withColumn("Region", col("_c4")) \.select(col("Country"), col("LifeExp"), col("Region")) df.describe("LifeExp").show()
  • 效果展示

Spark與Python第三方庫混用

  • 使用Spark做大數據ETL
  • 處理后的數據使用Python第三方庫分析或展示
1.Pandas做數據分析#Pandas DataFrame 轉 Spark DataFrame spark.createDataFrame(pandas_df)#Spark DataFrame轉Pandas DataFrame spark_df.toPandas() 2.Matplotlib實現數據可視化3.Scikit-learn完成機器學習

Pandas DF與Spark DF

  • PandasDF與SparkDF間的轉換方法
  • 測試數據
  • 操作代碼
# Pandas DataFrame to Spark DataFrame import numpy as np import pandas as pd from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() pandas_df = pd.read_csv("./products.csv", header=None, usecols=[1, 3, 5]) print(pandas_df)# convert to Spark DataFrame spark_df = spark.createDataFrame(pandas_df) spark_df.show() df2 = spark_df.withColumnRenamed("1", "id").withColumnRenamed("3", "name").withColumnRenamed("5", "remark")# convert back to Pandas DataFrame df2.toPandas()
  • 演示


使用PySpark通過圖形進行數據探索

  • 將數據劃分為多個區間,并統計區間中的數據個數
# 獲取上面演示示例中的第一個df對象 rdd = df.select("LifeExp").rdd.map(lambda x: x[0]) #把數據劃為10個區間,并獲得每個區間中的數據個數 (countries, bins) = rdd.histogram(10) print(countries) print(bins)#導入圖形生成包 import matplotlib.pyplot as plt import numpy as np plt.hist(rdd.collect(), 10) # by default the # of bins is 10 plt.title("Life Expectancy Histogram") plt.xlabel("Life Expectancy") plt.ylabel("# of Countries")
  • 演示

總結

以上是生活随笔為你收集整理的PySpark简介、搭建以及使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。