日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[大数据]PySpark原理与基本操作

發布時間:2023/12/15 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [大数据]PySpark原理与基本操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一 PySpark

Spark運行時架構

首先我們先回顧下Spark的基本運行時架構,如下圖所示,其中橙色部分表示為JVM,Spark應用程序運行時主要分為Driver和Executor,Driver負載總體調度及UI展示,Executor負責Task運行,Spark可以部署在多種資源管理系統中,例如Yarn、Mesos等,同時Spark自身也實現了一種簡單的Standalone(獨立部署)資源管理系統,可以不用借助其他資源管理系統即可運行。

更多細節請參考 Spark Scheduler內部原理剖析。

用戶的Spark應用程序運行在Driver上(某種程度上說,用戶的程序就是Spark Driver程序),經過Spark調度封裝成一個個Task,再將這些Task信息發給Executor執行,Task信息包括代碼邏輯以及數據信息,Executor不直接運行用戶的代碼。

?

PySpark運行時架構

為了不破壞Spark已有的運行時架構,Spark在外圍包裝一層Python API,借助Py4j實現Python和Java的交互,進而實現通過Python編寫Spark應用程序,其運行時架構如下圖所示。

其中白色部分是新增的Python進程,在Driver端,通過Py4j實現在Python中調用Java的方法,即將用戶寫的PySpark程序”映射”到JVM中,例如,用戶在PySpark中實例化一個Python的SparkContext對象,最終會在JVM中實例化Scala的SparkContext對象;在Executor端,則不需要借助Py4j,因為Executor端運行的Task邏輯是由Driver發過來的,那是序列化后的字節碼,雖然里面可能包含有用戶定義的Python函數或Lambda表達式,Py4j并不能實現在Java里調用Python的方法,為了能在Executor端運行用戶定義的Python函數或Lambda表達式,則需要為每個Task單獨啟一個Python進程,通過socket通信方式將Python函數或Lambda表達式發給Python進程執行。語言層面的交互總體流程如下圖所示,實線表示方法調用,虛線表示結果返回。

PySpark官方文檔:https://spark.apache.org/docs/latest/api/python/index.html

pyspark編程指南(英文):https://www.datacamp.com/community/tutorials/apache-spark-python#PySpark

二? PySpark 的操作

備忘清單:https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf

?

?

三 spark-submit提交任務模板示例

spark-submit提交方式官網:http://spark.apache.org/docs/latest/submitting-applications.html

spark-submit \--master yarn \--deploy-mode client \--num-executors 10 \--executor-memory 10g \--executor-cores 8 \--driver-memory 10g \--conf spark.pyspark.python=python3 \--conf spark.pyspark.driver.python=python3 \--py-files depend.zip \demo.py 2020-08-23

說明:
1、depend.zip是demo.py的依賴包,注:depend.zip不包含demo.py
2、demo.py中可以直接使用import depend.xx.xx 或 from depend.xx.xx import xx類似語句引入依賴包
3、上述示例中的2020-08-23是傳給demo.py的參數
參考:
pyspark spark-submit 集群提交任務以及引入虛擬環境依賴包攻略:https://www.cnblogs.com/piperck/p/10121097.html

?

四 代碼示例

1 WordCount詞頻統計

# -*- coding: utf-8 -*- import sys import os import datetime from pyspark import SparkConf,SparkContextsc = SparkConf().setAppName("wordcount") spark = SparkContext(conf=sc)text_file = spark.textFile("hdfs://examples/pyspark/words.txt") word_cnt_rdd = text_file.flatMap(lambda line : line.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda x, y: x + y) word_cnt_rdd.saveAsTextFile('hdfs://user/wordcount_result')

?

#spark-cluster-mode ./spark-submit \ --verbose \ --master yarn \ --deploy-mode cluster \ --num-executors 10 \ --executor-cores 1 \ --executor-memory 8G \ --driver-memory 4G \ --conf spark.pyspark.python=python3 \ wordcount.py#spark-client-mode ./spark-submit \ --verbose \ --master yarn \ --deploy-mode client \ --num-executors 10 \ --executor-cores 1 \ --executor-memory 8G \ --driver-memory 4G \ --conf spark.pyspark.python=python3 \ --conf spark.pyspark.driver.python=python3 \ wordcount.py

?

五、RDD操作示例

1 flatMap, map

flatMap有著一對多的表現,輸入一輸出多。并且會將每一個輸入對應的多個輸出整合成一個大的集合,當然不用擔心這個集合會超出內存的范圍,因為spark會自覺地將過多的內容溢寫到磁盤。當然如果對運行的機器的內存有著足夠的信心,也可以將內容存儲到內存中。

用同樣的方法來展示map操作,與flatMap不同的是,map通常是一對一,即輸入一個,對應輸出一個。但是輸出的結果可以是一個元組,一個元組則可能包含多個數據,但是一個元組是一個整體,因此算是一個元素。

2 spark的filter

3 reduce和reduceByKey的區別

educe和reduceByKey是spark中使用地非常頻繁的。那么reduce和reduceBykey的區別在哪呢?

reduce處理數據時有著一對一的特性,而reduceByKey則有著多對一的特性。比如reduce中會把數據集合中每一個元素都處理一次,并且每一個元素都對應著一個輸出。而reduceByKey則不同,它會把所有key相同的值處理并且進行歸并,其中歸并的方法可以自己定義。

在單詞統計中,我們采用的就是reduceByKey,對于每一個單詞我們設置成一個鍵值對(key,value),我們把單詞作為key,即key=word,而value=1,因為遍歷過程中,每個單詞的出現一次,則標注1。那么reduceByKey則會把key相同的進行歸并,然后根據我們定義的歸并方法即對value進行累加處理,最后得到每個單詞出現的次數。而reduce則沒有相同Key歸并的操作,而是將所有值統一歸并,一并處理。

?

參考:

http://sharkdtu.com/posts/pyspark-internal.html

?

?

總結

以上是生活随笔為你收集整理的[大数据]PySpark原理与基本操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。