日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

hadoop组件---spark----全面了解spark以及与hadoop的区别

發布時間:2023/12/14 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop组件---spark----全面了解spark以及与hadoop的区别 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark是什么

Spark (全稱 Apache Spark?) 是一個專門處理大數據量分析任務的通用數據分析引擎。

spark官網

Spark核心代碼是用scala語言開發的,不過支持使用多種語言進行開發調用比如scala,java,python。

spark github

Spark文檔2.4.4

Spark目前有比較完整的數據處理生態組件,可以部署在多種系統環境中,同時支持處理多種數據源。

Spark發展歷史

2009年,Spark誕生于伯克利大學AMPLab,屬于伯克利大學的研究性項目;

2010年,通過BSD 許可協議正式對外開源發布;

2012年,Spark第一篇論文發布,第一個正式版(Spark 0.6.0)發布;

2013年,成為了Aparch基金項目,進入高速發展期。第三方開發者貢獻了大量的代碼,活躍度非常高;發布Spark Streaming、Spark Mllib(機器學習)、Shark(Spark on Hadoop);

2014 年,Spark 成為 Apache 的頂級項目; 5 月底 Spark1.0.0 發布;發布 Spark Graphx(圖計算)、Spark SQL代替Shark;

2015年,推出DataFrame(大數據分析);2015年至今,Spark在國內IT行業變得愈發火爆,大量的公司開始重點部署或者使用Spark來替代MapReduce、Hive、Storm等傳統的大數據計算框架;

2016年,Spark 2.0.0版本發布,推出dataset(更強的數據分析手段);

2017年,structured streaming 發布;

2018年,Spark2.4.0發布,成為全球最大的開源項目。

截至 2020年1月15號 目前最穩定的最后發布版本為 Spark 2.4.4。

還有一個 新值得期待的 預發布版本 Spark 3.0 主要 是增加了 與k8s等云結合使用的特性。

特點

1、速度快,適合實時分析場景

Spark基于內存進行計算(當然也有部分計算基于磁盤,比如shuffle),在運算方面是hadoop運算速度的一百多倍。

2、容易上手開發

Spark的基于RDD的計算模型,比Hadoop的基于Map-Reduce的計算模型要更加易于理解,更加易于上手開發,實現各種復雜功能,比如二次排序、topN等復雜操作時,更加便捷。

3、支持多種語言

Spark提供Java,Scala,Python和R中的高級API .Spark代碼可以用任何這些語言編寫。 它在Scala和Python中提供了一個shell。 可以通過./bin/spark-shell和Python shell通過./bin/pyspark從已安裝的目錄訪問Scala shell。

4、支持多種格式的數據來源

Spark支持多種數據源,如Parquet,JSON,HDFS、Hbase、Hive和Cassandra,Alluxio,CSV和RDBMS表,還包括通常的格式,如文本文件、CSV和RDBMS表,甚至一些云存儲比如S3等。 Data Source API提供了一種可插拔的機制,用于通過Spark SQL獲取結構化數據。

5、超強的通用性

Spark提供了Spark RDD、Spark SQL、Spark Streaming、Spark MLlib、Spark GraphX等技術組件,可以一站式地完成大數據領域的離線批處理、交互式查詢、流式計算、機器學習、圖計算等常見的任務。

6、集成Hadoop

Spark并不是要成為一個大數據領域的“獨裁者”,一個人霸占大數據領域所有的“地盤”,而是與Hadoop進行了高度的集成,兩者可以完美的配合使用。Hadoop的HDFS、Hive、HBase負責存儲,YARN負責資源調度;Spark負責大數據計算。實際上,Hadoop+Spark的組合,是一種“double win”的組合。

7、可以在任何環境下搭建

spark框架可以運行在各種操作系統上。

最初Spark作為hadoop的一個計算框架組件而發布,現在慢慢長大,可以獨立運行了。意味著 我們不搭建Hadoop集群也能 獨立的安裝運行Spark。

除了運行在Hadoop集群中,

目前Spark支持

(一)local本地模式

只需要一臺機器,運行該模式非常簡單,只需要把Spark的安裝包解壓后,默認也不需修改任何配置文件,取默認值。不用啟動Spark的Master、Worker守護進程( 只有集群的Standalone方式時,才需要這兩個角色),也不用啟動Hadoop的各服務(除非你要用到HDFS)。

運行客戶端程序(可以是spark自帶的命令行程序,如spark-shell,也可以是程序員利用spark api編寫的程序),就可以完成相應的運行。相當于這一個客戶端進程,充當了所有的角色。

這種模式,只適合開發階段使用,我們可以在該模式下開發和測試代碼,使的代碼的邏輯沒問題,后面再提交到集群上去運行和測試。

如果是學習或者做測試,為了搭建環境的簡化,可以搭建本地模式。

在實際生產環境,spark會采用集群模式來運行,即分布式式運行,spark可以使用多種集群資源管理器來管理自己的集群。

(二)獨立的Spark集群standalone模式

Standalone模式,即獨立模式,自帶完整的服務,使用spark自帶的集群資源管理功能。可單獨部署到一個集群中,無需依賴任何其他資源管理系統。即每臺機器上只需部署下載的Spark版本即可。

這種模式需要提前啟動spark的master和Worker守護進程,才能運行spark客戶端程序。

因為Standalone模式不需要依賴任何第三方組件,如果數據量比較小,且不需要hadoop(如不需要訪問hdfs服務),則使用Standalone模式是一種可選的簡單方便的方案。

(三)在aws的ec2中安裝

這種模式類似于Standalone模式,不過部署的集群是aws的ec2服務器,需要有一些 權限方面的配置,在GitHub中有專門針對 ec2中部署spark的腳本項目, 可以直接根據其中的步驟進行部署。

(四)使用yarn進行管理

該模式,使用hadoop的YARN作為集群資源管理器。這種模式下因為使用yarn的服務進行資源管理,所以不需要啟動Spark的Master、Worker守護進程。

如果你的應用不僅使用spark,還用到hadoop生態圈的其它服務,從兼容性上考慮,使用Yarn作為統一的資源管理是更好的選擇,這樣選擇這種模式就比較適合。

目前spark on yarn的部署方式 最為常用。

(五)使用mesos進行管理

該模式,使用Mesos作為集群資源管理器。如果你的應用還使用了docker,則選擇此模式更加通用。

(六)使用k8s進行管理

Spark本身的設計更偏向使用靜態的資源管理,雖然Spark也支持了類似Yarn等動態的資源管理器,但是這些資源管理并不是面向動態的云基礎設施而設計的,在速度、成本、效率等領域缺乏解決方案。

隨著Kubernetes的快速發展,數據科學家們開始考慮是否可以用Kubernetes的彈性與面向云原生等特點與Spark進行結合。

在Spark 2.3中,Resource Manager中添加了Kubernetes原生的支持。

意味著 我們可以使用k8s對Spark進行管理了,而且能運用云的特性,很好的進行集群伸縮,降低我們的成本以及當運算資源不足時快速增加節點。

(七) 偽分布集群模式

即在一臺機器上模擬集群下的分布式場景,會啟動多個進程。上述的集群模式都可以啟動偽分布式集群模式,當然要求機器的配置滿足要求。

這種模式主要是開發階段和學習使用。

8、極高的社區活躍度

Spark目前是Apache基金會的頂級項目,全世界有大量的優秀工程師是Spark的committer。并且世界上很多頂級的IT公司都在大規模地使用Spark。

spark的使用場景

物聯網領域: 通過物聯網的設備收集到海量的數據,比如環境監控,海洋監控,地震預測等,需要及時的處理反饋。

大健康領域: 用戶健康生活與遺傳信息基因等數據的分析,反饋健康方面的信息給用戶

醫療保健:醫療保健領域使用實時分析來持續檢查關鍵患者的醫療狀況。尋找血液和器官移植的醫院需要在緊急情況下保持實時聯系。及時就醫是患者生死攸關的問題。

政府:政府機構主要在國家安全領域進行實時分析。各國需要不斷跟蹤警察和安全機構對于威脅的更新。

電信:以電話,視頻聊天和流媒體實時分析等形式圍繞服務的公司,以減少客戶流失并保持領先競爭優勢。他們還提取移動網絡的測量結果。

銀行業務:銀行業務幾乎涉及全球所有資金。確保整個系統的容錯事務變得非常重要。通過銀行業務的實時分析,可以實現欺詐檢測。

股票市場:股票經紀人使用實時分析來預測股票投資組合的變動。公司通過使用實時分析來推銷其品牌的市場需求,從而重新思考其業務模式。

使用spark的公司和項目也非常多,可以參考官網列表

Project and Product names using

hadoop和spark的關系與區別

Spark作為Hadoop生態中重要的一員,其發展速度堪稱恐怖,不過其作為一個完整的技術棧,在技術和環境的雙重刺激下,得到如此多的關注也是有依據的。

Spark核心在于內存計算模型代替Hadoop生態的MapReduce離線計算模型,用更加豐富Transformation和Action算子來替代map,reduce兩種算子。

計算流程的區別

Hadoop這項大數據處理技術大概已有十年歷史,而且被看做是首選的大數據集合處理的解決方案。

MapReduce是單流程的優秀解決方案,不過對于需要多流程計算和算法的用例來說,并非十分高效。

數據處理流程中的每一步都需要一個Map階段和一個Reduce階段,而且如果要利用這一解決方案,需要將所有用例都轉換成MapReduce模式。

在下一步開始之前,上一步的作業輸出數據必須要存儲到分布式文件系統中。因此,復制和磁盤存儲會導致這種方式速度變慢。

另外Hadoop解決方案中通常會包含難以安裝和管理的集群。而且為了處理不同的大數據用例,還需要集成多種不同的工具(如用于機器學習的Mahout和流數據處理的Storm)。

如果想要完成比較復雜的工作,就必須將一系列的MapReduce作業串聯起來然后順序執行這些作業。每一個作業都是高時延的,而且只有在前一個作業完成之后下一個作業才能開始啟動。

而Spark則允許程序開發者使用有向無環圖(DAG)開發復雜的多步數據管道。而且還支持跨有向無環圖的內存數據共享,以便不同的作業可以共同處理同一個數據。

Spark運行在現有的Hadoop分布式文件系統基礎之上(HDFS)提供額外的增強功能。

它支持將Spark應用部署到現存的Hadoop v1集群(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集群甚至是Apache Mesos之中。

我們應該將Spark看作是Hadoop MapReduce的一個替代品而不是Hadoop的替代品。其意圖并非是替代Hadoop,而是為了提供一個管理不同的大數據用例和需求的全面且統一的解決方案。

關鍵區別

hadoop是批處理工具,更擅長處理離線數據,而spark在內存中處理數據,可以是實時處理。

Hadoop基于大數據的批處理。 這意味著數據會在一段時間內先存儲下來,然后使用Hadoop進行處理。

在Spark中,處理可以實時進行。

Spark中的這種實時處理能力幫助我們解決實時分析問題。

除此之外,Spark能夠比Hadoop MapReduce( Hadoop處理框架)快100倍地進行批處理。

因此,目前Apache Spark是業界大數據處理的首選工具。

hadoop和spark發展的歷史故事參考

https://www.zhihu.com/question/23036370?sort=created

組件框架的區別

針對核心關鍵的功能 ,Hadoop和Spark都發展出了相應的組件

HadoopSpark
處理引擎MapreduceSpark RDD(Spark Core)
交互式查詢HiveSpark SQL
實時流計算StormSpark Streaming
機器學習MahoutMLlib
圖計算Hama或者 GiraphGraphX

Spark相關概念

Spark Shell

Spark的shell提供了一種學習API的簡單方法,以及一種以交互方式分析數據的強大工具。

Spark Session

在早期版本的Spark中,Spark Context是Spark的入口點。 對于每個其他API,我們需要使用不同的上下文。 對于流式傳輸,我們需要StreamingContext,SQL sqlContext和hive HiveContext。 為了解決這個問題,SparkSession應運而生。 它本質上是SQLContext,HiveContext和StreamingContext的組合。

數據源

Data Source API提供了一種可插拔的機制,用于通過Spark SQL訪問結構化數據。 Data Source API用于將結構化和半結構化數據讀取并存儲到Spark SQL中。 數據源不僅僅是簡單的管道,可以轉換數據并將其拉入Spark。

RDD

彈性分布式數據集(RDD)是Spark的基本數據結構。 它是一個不可變的分布式對象集合。 RDD中的每個數據集被劃分為邏輯分區,其可以在集群的不同節點上計算。 RDD可以包含任何類型的Python,Java或Scala對象,包括用戶定義的類。

RDD可被分發到集群各個節點上,進行并行操作。RDDs 可以通過 Hadoop InputFormats 創建(如 HDFS),或者從其他 RDDs 轉化而來。

獲得RDD的三種方式:

Parallelize:將一個存在的集合,變成一個RDD,這種方式試用于學習spark和做一些spark的測試

>>>sc.parallelize(['cat','apple','bat’])

MakeRDD:只有scala版本才有此函數,用法與parallelize類似

textFile:從外部存儲中讀取數據來創建 RDD

>>>sc.textFile(“file\\\usr\local\spark\README.md”)

RDD的兩個特性:不可變;分布式。

RDD支持兩種操作;

Transformation(轉化操作:返回值還是RDD)如map(),filter()等。這種操作是lazy(惰性)的,即從一個RDD轉換生成另一個RDD的操作不是馬上執行,只是記錄下來,只有等到有Action操作是才會真正啟動計算,將生成的新RDD寫到內存或hdfs里,不會對原有的RDD的值進行改變;

Action(行動操作:返回值不是RDD)會實際觸發Spark計算,對RDD計算出一個結果,并把結果返回到內存或hdfs中,如count(),first()等。

RDD的緩存策略

Spark最為強大的功能之一便是能夠把數據緩存在集群的內存里。這通過調用RDD的cache函數來實現:rddFromTextFile.cache,

調用一個RDD的cache函數將會告訴Spark將這個RDD緩存在內存中。在RDD首次調用一個執行操作時,這個操作對應的計算會立即執行,數據會從數據源里讀出并保存到內存。因此,首次調用cache函數所需要的時間會部分取決于Spark從輸入源讀取數據所需要的時間。但是,當下一次訪問該數據集的時候,數據可以直接從內存中讀出從而減少低效的I/O操作,加快計算。多數情況下,這會取得數倍的速度提升。

廣播變量

廣播變量(broadcast variable)為只讀變量,它由運行SparkContext的驅動程序創建后發送給會參與計算的節點。對那些需要讓各工作節點高效地訪問相同數據的應用場景,比如機器學習,這非常有用。Spark下創建廣播變量只需在SparkContext上調用一個方法即可:

>>> broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))

累加器Accumulator

在Spark中如果想在Task計算的時候統計某些事件的數量,使用filter/reduce也可以,但是使用累加器是一種更方便的方式,累加器一個比較經典的應用場景是用來在Spark Streaming應用中記錄某些事件的數量。

使用累加器時需要注意只有Driver能夠取到累加器的值,Task端進行的是累加操作。

創建的Accumulator變量的值能夠在Spark Web UI上看到,在創建時應該盡量為其命名

Spark內置了三種類型的Accumulator,分別是LongAccumulator用來累加整數型,DoubleAccumulator用來累加浮點型,CollectionAccumulator用來累加集合元素。

后續我們會記錄累加器的用法。

Dataset

Dataset是分布式數據集合。 數據集可以從JVM對象構造,然后使用功能轉換(map,flatMap,filter等)進行操作。 數據集API在Scala和Java中可用。

DataFrames

DataFrame是命名列組織成數據集。 它在概念上等同于關系數據庫中的表或R / Python中的數據框,但在引擎蓋下具有更豐富的優化。 DataFrame可以從多種來源構建,例如:結構化數據文件,Hive中的表,外部數據庫或現有RDD。

RDD、Dataframe、DataSet區別

spark中 RDD、DataFrame、Dataset的關系及區別 以及相互轉換

Spark 組件

Spark組件使Apache Spark快速可靠。 構建了很多這些Spark組件來解決使用Hadoop MapReduce時出現的問題。 Apache Spark具有以下組件:

Spark Core
Spark Streaming
Spark SQL
GraphX
MLlib (Machine Learning)


用戶使用的SQL、Streaming、MLib、GraphX接口最終都會轉換成Spark Core分布式運行。

Spark Core

Spark Core是大規模并行和分布式數據處理的基礎引擎。 核心是分布式執行引擎,Java,Scala和Python API為分布式ETL應用程序開發提供了一個平臺。 此外,在核心上構建的其他庫允許用于流式傳輸,SQL和機器學習的各種工作負載。 它負責:

內存管理和故障恢復
在群集上調度,分發和監視作業
與存儲系統交互

Spark Streaming

Spark Streaming是Spark的組件,用于處理實時流數據。 因此,它是核心Spark API的補充。 它支持實時數據流的高吞吐量和容錯流處理。 基本流單元是DStream,它基本上是一系列用于處理實時數據的RDD(彈性分布式數據集)。

Spark Streaming是spark中一個非常重要的擴展庫,它是Spark核心API的一個擴展,可以實現高吞吐量的、具備容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括Kafk、Flume、以及TCP socket等,從數據源獲取數據之后,可以使用諸如map、reduce和window等高級函數進行復雜算法的處理。最后還可以將處理結果存儲到文件系統和數據庫等。

但從Spark2.0開始,提出了新的實時流框架 Structured Streaming (2.0和2.1是實驗版本,從Spark2.2開始為穩定版本)來替代Spark streaming,這時Spark streaming就進入維護模式。相比Spark Streaming,Structured Streaming的Api更加好用,功能強大。

Spark SQL

Spark SQL是Spark中的一個新模塊,它使用Spark編程API實現集成關系處理。 它支持通過SQL或Hive查詢查詢數據。 對于那些熟悉RDBMS的人來說,Spark SQL將很容易從之前的工具過渡到可以擴展傳統關系數據處理的邊界。

Spark SQL通過函數編程API集成關系處理。 此外,它為各種數據源提供支持,并且使用代碼轉換編織SQL查詢,從而產生一個非常強大的工具。

以下是Spark SQL的四個庫。

Data Source API
DataFrame API
Interpreter & Optimizer
SQL Service

Spark SQL是Spark用來操作結構化數據的組件。通過Spark SQL,用戶可以使用SQL或者Apache Hive版本的SQL方言(HQL)來查詢數據。Spark SQL支持多種數據源類型,例如Hive表、Parquet以及JSON等。Spark SQL不僅為Spark提供了一個SQL接口,還支持開發者將SQL語句融入到Spark應用程序開發過程中,無論是使用Python、Java還是Scala,用戶可以在單個的應用中同時進行SQL查詢和復雜的數據分析。

GraphX

GraphX是用于圖形和圖形并行計算的Spark API。 因此,它使用彈性分布式屬性圖擴展了Spark RDD。

屬性圖是一個有向多圖,它可以有多個平行邊。 每個邊和頂點都有與之關聯的用戶定義屬性。 這里,平行邊緣允許相同頂點之間的多個關系。 在高層次上,GraphX通過引入彈性分布式屬性圖來擴展Spark RDD抽象:一個定向多圖,其屬性附加到每個頂點和邊。

為了支持圖形計算,GraphX公開了一組基本運算符(例如,subgraph,joinVertices和mapReduceTriplets)以及Pregel API的優化變體。 此外,GraphX包含越來越多的圖算法和構建器,以簡化圖形分析任務。

GraphX是Spark面向圖計算提供的框架與算法庫。GraphX中提出了彈性分布式屬性圖的概念,并在此基礎上實現了圖視圖與表視圖的有機結合與統一;同時針對圖數據處理提供了豐富的操作,例如取子圖操作subgraph、頂點屬性操作mapVertices、邊屬性操作mapEdges等。GraphX還實現了與Pregel的結合,可以直接使用一些常用圖算法,如PageRank、三角形計數等。

MlLib (Machine Learning)

MLlib代表機器學習庫。 Spark MLlib用于在Apache Spark中執行機器學習。

MLlib是Spark提供的一個機器學習算法庫,其中包含了多種經典、常見的機器學習算法,主要有分類、回歸、聚類、協同過濾等。MLlib不僅提供了模型評估、數據導入等額外的功能,還提供了一些更底層的機器學習原語,包括一個通用的梯度下降優化基礎算法。所有這些方法都被設計為可以在集群上輕松伸縮的架構。

如何運行Spark程序

在實際編程中,我們不需關心以上調度細節.只需使用 Spark 提供的指定語言的編程接口調用相應的 API 即可.
  在 Spark API 中, 一個 應用(Application) 對應一個 SparkContext 的實例。一個 應用 可以用于單個 Job,或者分開的多個 Job 的 session,或者響應請求的長時間生存的服務器。與 MapReduce 不同的是,一個 應用 的進程(我們稱之為 Executor),會一直在集群上運行,即使當時沒有 Job 在上面運行。
  而調用一個Spark內部的 Action 會產生一個 Spark job 來完成它。 為了確定這些job實際的內容,Spark 檢查 RDD 的DAG再計算出執行 plan 。這個 plan 以最遠端的 RDD 為起點(最遠端指的是對外沒有依賴的 RDD 或者 數據已經緩存下來的 RDD),產生結果 RDD 的 Action 為結束 。并根據是否發生 shuffle 劃分 DAG 的 stage.

Spark原生架構和運行原理

架構和粗流程描述

一個完整的Spark應用程序,在提交集群運行時,它的處理流程涉及到如下圖所示的架構:

每個Spark應用都由一個驅動器程序(drive program)來發起集群上的各種并行操作。

驅動器程序包含應用的main函數。

驅動器負責創建SparkContext。

SparkContext可以與不同種類的集群資源管理器(Cluster Manager),例如Hadoop YARN,Mesos進行通信。

獲取到集群進行所需的資源后,SparkContext將得到集群中工作節點(Worker Node)上對應的Executor。

不同的Spark程序有不同的Executor,他們之間是相互獨立的進程,Executor為應用程序提供分布式計算以及數據存儲功能。

之后SparkContext將應用程序代碼發送到各Executor,將任務(Task)分配給executors執行。

ClusterManager

在Standalone模式中即為Master節點(主節點),控制整個集群,監控Worker.在YARN中為ResourceManager

Worker

從節點,負責控制計算節點,啟動Executor或Driver。在YARN模式中為NodeManager,負責計算節點的控制。

Driver

運行Application的main()函數并創建SparkContect。

Executor

執行器,在worker node上執行任務的組件、用于啟動線程池運行任務。每個Application擁有獨立的一組Executor。

SparkContext

整個應用的上下文,控制應用的生命周期。

RDD

Spark的計算單元,一組RDD可形成執行的有向無環圖RDD Graph。

DAG Scheduler

根據作業(Job)構建基于Stage的DAG,并提交Stage給TaskScheduler。

TaskScheduler

將任務(Task)分發給Executor。

SparkEnv

線程級別的上下文,存儲運行時的重要組件的引用。

SparkEnv內構建并包含如下一些重要組件的引用。

1)MapOutPutTracker:負責Shuffle元信息的存儲。
2)BroadcastManager:負責廣播變量的控制與元信息的存儲。
3)BlockManager:負責存儲管理、創建和查找快。
4)MetricsSystem:監控運行時性能指標信息。
5)SparkConf:負責存儲配置信息。

詳細流程描述

使用spark-submit提交一個Spark作業之后,這個作業就會啟動一個對應的Driver進程。

根據你使用的部署模式(deploy-mode)不同,Driver進程可能在本地啟動,也可能在集群中某個工作節點上啟動。

而Driver進程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群,比如使用YARN作為資源管理集群)申請運行Spark作業需要使用的資源,這里的資源指的就是Executor進程。

YARN集群管理器會根據我們為Spark作業設置的資源參數,在各個工作節點上,啟動一定數量的Executor進程,每個Executor進程都占有一定數量的內存和CPU core。

在申請到了作業執行所需的資源之后,Driver進程就會開始調度和執行我們編寫的作業代碼了。

Driver進程會將我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,并為每個stage創建一批Task,然后將這些Task分配到各個Executor進程中執行。

Task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個Task處理的數據不同而已。

一個stage的所有Task都執行完畢之后,會在各個節點本地的磁盤文件中寫入計算中間結果,然后Driver就會調度運行下一個stage。

下一個stage的Task的輸入數據就是上一個stage輸出的中間結果。

如此循環往復,直到將我們自己編寫的代碼邏輯全部執行完,并且計算完所有的數據,得到我們想要的結果為止。

Spark是根據shuffle類算子來進行stage的劃分。

如果我們的代碼中執行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,劃分出一個stage界限來。

可以大致理解為,shuffle算子執行之前的代碼會被劃分為一個stage,shuffle算子執行以及之后的代碼會被劃分為下一個stage。

因此一個stage剛開始執行的時候,它的每個Task可能都會從上一個stage的Task所在的節點,去通過網絡傳輸拉取需要自己處理的所有key,然后對拉取到的所有相同的key使用我們自己編寫的算子函數執行聚合操作(比如reduceByKey()算子接收的函數)。這個過程就是shuffle。

當我們在代碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級別的不同,每個Task計算出來的數據也會保存到Executor進程的內存或者所在節點的磁盤文件中。

因此Executor的內存主要分為三塊:

第一塊是讓Task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;

第二塊是讓Task通過shuffle過程拉取了上一個stage的Task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;

第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。

Task的執行速度是跟每個Executor進程的CPU core數量有直接關系的。

一個CPU core同一時間只能執行一個線程。而每個Executor進程上分配到的多個Task,都是以每個Task一條線程的方式,多線程并發運行的。

如果CPU core數量比較充足,而且分配到的Task數量比較合理,那么通常來說,可以比較快速和高效地執行完這些Task線程。

以上就是Spark作業的基本運行原理的說明.

shuffle 和 stage

shuffle 是劃分 DAG 中 stage 的標識,同時影響 Spark 執行速度的關鍵步驟.

RDD 的 Transformation 函數中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.

窄依賴跟寬依賴的區別在于 是否發生 shuffle(洗牌) 操作.

寬依賴會發生 shuffle 操作. 窄依賴是子 RDD的各個分片(partition)不依賴于其他分片,能夠獨立計算得到結果,寬依賴指子 RDD 的各個分片會依賴于父RDD 的多個分片,所以會造成父 RDD 的各個分片在集群中重新分片, 看如下兩個示例:

// Map: "cat" -> c, cat val rdd1 = rdd.Map(x => (x.charAt(0), x)) // groupby same key and count val rdd2 = rdd1.groupBy(x => x._1).Map(x => (x._1, x._2.toList.length))

第一個 Map 操作將 RDD 里的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,可以在集群的各個內存中獨立計算,也就是并行化

第二個 groupby 之后的 Map 操作,為了計算相同 key 下的元素個數,需要把相同 key 的元素聚集到同一個 partition 下,所以造成了數據在內存中的重新分布,即 shuffle 操作.

shuffle 操作是 spark 中最耗時的操作,應盡量避免不必要的 shuffle.

寬依賴主要有兩個過程: shuffle write 和 shuffle fetch.

類似 Hadoop 的 Map 和 Reduce 階段.
shuffle write 將 ShuffleMapTask 任務產生的中間結果緩存到內存中, shuffle fetch 獲得 ShuffleMapTask 緩存的中間結果進行 ShuffleReduceTask 計算,這個過程容易造成OutOfMemory.

shuffle 過程內存分配使用 ShuffleMemoryManager 類管理,會針對每個 Task 分配內存,Task 任務完成后通過 Executor 釋放空間.

這里可以把 Task 理解成不同 key 的數據對應一個 Task.

早期的內存分配機制使用公平分配,即不同 Task 分配的內存是一樣的,但是這樣容易造成內存需求過多的 Task 的 OutOfMemory, 從而造成多余的 磁盤 IO 過程,影響整體的效率.

(例:某一個 key 下的數據明顯偏多,但因為大家內存都一樣,這一個 key 的數據就容易 OutOfMemory).

1.5版以后 Task 共用一個內存池,內存池的大小默認為 JVM 最大運行時內存容量的16%

分配機制如下:

假如有 N 個 Task,ShuffleMemoryManager 保證每個 Task 溢出之前至少可以申請到1/2N 內存,且至多申請到1/N

N 為當前活動的 shuffle Task 數
因為N 是一直變化的,所以 manager 會一直追蹤 Task 數的變化,重新計算隊列中的1/N 和1/2N.

但是這樣仍然容易造成內存需要多的 Task 任務溢出,所以最近有很多相關的研究是針對 shuffle 過程內存優化的.

如下 DAG 流程圖中,分別讀取數據,經過處理后 join 2個 RDD 得到結果


在這個圖中,根據是否發生 shuffle 操作能夠將其分成如下的 stage 類型:

(join 需要針對同一個 key 合并,所以需要 shuffle)

運行到每個 stage 的邊界時,數據在父 stage 中按照 Task 寫到磁盤上,而在子 stage 中通過網絡按照 Task 去讀取數據。這些操作會導致很重的網絡以及磁盤的I/O,所以 stage 的邊界是非常占資源的,在編寫 Spark 程序的時候需要盡量避免的 。父 stage 中 partition 個數與子 stage 的 partition 個數可能不同,所以那些產生 stage 邊界的 Transformation 常常需要接受一個 numPartition 的參數來覺得子 stage 中的數據將被切分為多少個 partition[^demoa]。

PS:shuffle 操作的時候可以用 combiner 壓縮數據,減少 IO 的消耗

Spark原生框架處理數據流程

1、Client提交應用。
2、Master找到一個Worker啟動Driver
3、Driver向Master或者資源管理器申請資源,之后將應用轉化為RDD Graph
4、再由DAGSchedule將RDD Graph轉化為Stage的有向無環圖提交給TaskSchedule。
5、再由TaskSchedule提交任務給Executor執行。
6、其它組件協同工作,確保整個應用順利執行。

Executor執行任務原理

Executor完成一個任務需要做兩部分工具,一部分就是加載數據源,也就是Spark的基礎數據單元RDD。

RDD的數據來源可以是多種多樣的,我們這里以HDFS為例。

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。

transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。

只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然后用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最后的操作,比如遍歷,reduce,保存到文件等,并可以返回結果給Driver程序。

action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

Spark on yarn 框架處理數據流程

1、基于YARN的Spark作業首先由客戶端生成作業信息,提交給ResourceManager。
2、ResourceManager在某一NodeManager匯報時把AppMaster分配給NodeManager。
3、NodeManager啟動SparkAppMaster。
4、SparkAppMastere啟動后初始化然后向ResourceManager申請資源。
5、申請到資源后,SparkAppMaster通過RPC讓NodeManager啟動相應的SparkExecutor。
6、SparkExecutor向SparkAppMaster匯報并完成相應的任務。
7、SparkClient會通過AppMaster獲取作業運行狀態。

如何運行Spark程序

在實際編程中,我們不需要關心調度細節.

只需使用 Spark 提供的指定語言的編程接口調用相應的 API 即可.

在 Spark API 中, 一個 應用(Application) 對應一個 SparkContext 的實例。

一個 應用 可以用于單個 Job,或者分開的多個 Job 的 session,或者響應請求的長時間生存的服務器。

與 MapReduce 不同的是,一個 應用 的進程(我們稱之為 Executor),會一直在集群上運行,即使當時沒有 Job 在上面運行。

而調用一個Spark內部的 Action 會產生一個 Spark job 來完成它。

為了確定這些job實際的內容,Spark 檢查 RDD 的DAG再計算出執行 plan 。

這個 plan 以最遠端的 RDD 為起點(最遠端指的是對外沒有依賴的 RDD 或者 數據已經緩存下來的 RDD),產生結果 RDD 的 Action 為結束 。

并根據是否發生 shuffle 劃分 DAG 的 stage.

參考鏈接:

https://www.aboutyun.com/forum.php?mod=viewthread&tid=24883

https://www.cnblogs.com/cxxjohnson/p/8909578.html

總結

以上是生活随笔為你收集整理的hadoop组件---spark----全面了解spark以及与hadoop的区别的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。