PySpark 数据分析基础:PySpark 原理详解
?一、基礎原理
我們知道 spark 是用 scala 開發的,而 scala 又是基于 Java 語言開發的,那么 spark 的底層架構就是 Java 語言開發的。如果要使用 python 來進行與 java 之間通信轉換,那必然需要通過 JVM 來轉換。我們先看原理構建圖:
從圖中我們發現在 python 環境中我們編寫的程序將以 SparkContext 的形式存在,Pythpn 通過于 Py4j 建立 Socket 通信,通過 Py4j 實現在 Python 中調用 Java 的方法,將我們編寫成 python 的 SpakrContext 對象通過 Py4j,最終在 JVM Driver 中實例化為 Scala 的 SparkContext。
那么我們再從 Spark 集群運行機制來看:
主節點運行 Spark 任務是通過 SparkContext 傳遞任務分發到各個從節點,標橙色的方框就為 JVM。通過 JVM 中間語言與其他從節點的 JVM 進行通信。之后 Executor 通信結束之后下發 Task 進行執行。
此時我們再把 python 在每個主從節點展示出來:
這樣就一目了然了:主節點的 Python 通過 Py4j 通信傳遞 SparkContext,最后在 JVM Driver 上面生成 SparkContxt。主節點 JVM Driver 與其他從節點的 JVM Executor 通信傳輸 SparkContext,JVM Executor 通過分解 SparkContext 為許多 Task,給 pyspark.daemon 調用 pyspark.work 從 socket 中讀取要執行的 python 函數和數據,開始真正的數據處理邏輯。數據處理完成之后將處理結果寫回 socket,jvm 中通過 PythonRDD 的 read 方法讀取,并返回結果。最終 executor 將 PythonRDD 的執行結果上報到 drive 上,返回給用戶。
完整了解 PySpark 在集群上運行的原理之后,再看上圖就很容易理解了。
Executor 端運行的 Task 邏輯是由 Driver 發過來的,那是序列化后的字節碼,雖然里面可能包含有用戶定義的 Python 函數或 Lambda 表達式,Py4j 并不能實現在 Java 里調用 Python 的方法,為了能在 Executor 端運行用戶定義的 Python 函數或 Lambda 表達式,則需要為每個 Task 單獨啟一個 Python 進程,通過 socket 通信方式將 Python 函數或 Lambda 表達式發給 Python 進程執行。
二、程序運行原理
1.主節點 JVM 運行過程
當我們提交 pyspark 的任務時,會先上傳 python 腳本以及依賴并申請資源,申請到資源后會通過 PythonRunner 拉起 JVM。
首先 PythonRunner 開啟 Pyj4 GatewayServer,通過 Java Process 方式運行用戶上傳的 Python 腳本。
? 用戶 Python 腳本起來后,首先會實例化 Python 版的 SparkContext 對象,并且實例化 Py4j GatewayClient,連接 JVM 中的 Py4j GatewayServer,后續在 Python 中調用 Java 的方法都是借助這個 Py4j Gateway。然后通過 Py4j Gateway 在 JVM 中實例化 SparkContext 對象。
? 過上面兩步后,SparkContext 對象初始化完畢,與其他從節點通信。開始申請 Executor 資源,同時開始調度任務。用戶 Python 腳本中定義的一系列處理邏輯最終遇到 action 方法后會觸發 Job 的提交,提交 Job 時是直接通過 Py4j 調用 Java 的 PythonRDD.runJob 方法完成,映射到 JVM 中,會轉給 sparkContext.runJob 方法,Job 運行完成后,JVM 中會開啟一個本地 Socket 等待 Python 進程拉取,對應地,Python 進程在調用 PythonRDD.runJob 后就會通過 Socket 去拉取結果。
2.從節點 JVM 運行過程
當 Driver 得到 Executor 資源時,通過 CoarseGrainedExecutorBackend(其中有 main 方法)通信 JVM,啟動一些必要的服務后等待 Driver 的 Task 下發,在還沒有 Task 下發過來時,Executor 端是沒有 Python 進程的。當收到 Driver 下發過來的 Task 后,Executor 的內部運行過程如下圖所示。
Executor 端收到 Task 后,會通過 launchTask 運行 Task,最后會調用到 PythonRDD 的 compute 方法,來處理一個分區的數據,PythonRDD 的 compute 方法的計算流程大致分三步走:
-
如果不存在 pyspark.deamon 后臺 Python 進程,那么通過 Java Process 的方式啟動 pyspark.deamon 后臺進程,注意每個 Executor 上只會有一個 pyspark.deamon 后臺進程,否則,直接通過 Socket 連接 pyspark.deamon,請求開啟一個 pyspark.worker 進程運行用戶定義的
-
Python 函數或 Lambda 表達式。pyspark.deamon 是一個典型的多進程服務器,來一個 Socket 請求,fork 一個 pyspark.worker 進程處理,一個 Executor 上同時運行多少個 Task,就會有多少個對應的 pyspark.worker 進程。
-
緊接著會單獨開一個線程,給 pyspark.worker 進程輸入數據,pyspark.worker 則會調用用戶定義的 Python 函數或 Lambda 表達式處理計算。在一邊輸入數據的過程中,另一邊則通過 Socket 去拉取 pyspark.worker 的計算結果。
把前面運行時架構圖中 Executor 部分單獨拉出來,如下圖所示,橙色部分為 JVM 進程,白色部分為 Python 進程,每個 Executor 上有一個公共的 pyspark.deamon 進程,負責接收 Task 請求,并 fork pyspark.worker 進程單獨處理每個 Task,實際數據處理過程中,pyspark.worker 進程和 JVM Task 會較頻繁地進行本地 Socket 數據通信。
三、總結
總體而言,PySpark 是借助 Py4j 實現 Python 調用 Java,來驅動 Spark 應用程序,本質上主要還是 JVM runtime,Java 到 Python 的結果返回是通過本地 Socket 完成。雖然這種架構保證了 Spark 核心代碼的獨立性,但是在大數據場景下,JVM 和 Python 進程間頻繁的數據通信導致其性能損耗較多,惡劣時還可能會直接卡死,所以建議對于大規模機器學習或者 Streaming 應用場景還是慎用 PySpark,盡量使用原生的 Scala/Java 編寫應用程序,對于中小規模數據量下的簡單離線任務,可以使用 PySpark 快速部署提交
總結
以上是生活随笔為你收集整理的PySpark 数据分析基础:PySpark 原理详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: bat自动清理(girl的电脑桌面)
- 下一篇: WorkNC刀柄轮廓导入方法