日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

Spark知识体系完整解读

發(fā)布時間:2025/4/14 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark知识体系完整解读 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Spark知識體系完整解讀(內(nèi)有福利)

???   作者:楊思義,2014年6月至今工作于北京亞信智慧數(shù)據(jù)科技有限公司 BDX大數(shù)據(jù)事業(yè)部,從2014年9月開始從事項目spark相關(guān)應(yīng)用開發(fā)。

???   來源:數(shù)盟

  Spark簡介


  Spark是整個BDAS的核心組件,是一個大數(shù)據(jù)分布式編程框架,不僅實現(xiàn)了MapReduce的算子map 函數(shù)和reduce函數(shù)及計算模型,還提供更為豐富的算子,如filter、join、groupByKey等。是一個用來實現(xiàn)快速而同用的集群計算的平臺。

  Spark將分布式數(shù)據(jù)抽象為彈性分布式數(shù)據(jù)集(RDD),實現(xiàn)了應(yīng)用任務(wù)調(diào)度、RPC、序列化和壓縮,并為運行在其上的上層組件提供API。其底層采用Scala這種函數(shù)式語言書寫而成,并且所提供的API深度借鑒Scala函數(shù)式的編程思想,提供與Scala類似的編程接口

  Sparkon Yarn

  

  從用戶提交作業(yè)到作業(yè)運行結(jié)束整個運行期間的過程分析。

  一、客戶端進(jìn)行操作


??? 根據(jù)yarnConf來初始化yarnClient,并啟動yarnClient

??? 創(chuàng)建客戶端Application,并獲取Application的ID,進(jìn)一步判斷集群中的資源是否滿足executor和ApplicationMaster申請的資源,如果不滿足則拋出IllegalArgumentException;

??? 設(shè)置資源、環(huán)境變量:其中包括了設(shè)置Application的Staging目錄、準(zhǔn)備本地資源(jar文件、log4j.properties)、設(shè)置Application其中的環(huán)境變量、創(chuàng)建Container啟動的Context等;

??? 設(shè)置Application提交的Context,包括設(shè)置應(yīng)用的名字、隊列、AM的申請的Container、標(biāo)記該作業(yè)的類型為Spark;

??? 申請Memory,并最終通過yarnClient.submitApplication向ResourceManager提交該Application。

  當(dāng)作業(yè)提交到Y(jié)ARN上之后,客戶端就沒事了,甚至在終端關(guān)掉那個進(jìn)程也沒事,因為整個作業(yè)運行在YARN集群上進(jìn)行,運行的結(jié)果將會保存到HDFS或者日志中。

  二、提交到Y(jié)ARN集群,YARN操作


??? 運行ApplicationMaster的run方法;

??? 設(shè)置好相關(guān)的環(huán)境變量。

??? 創(chuàng)建amClient,并啟動;

??? 在Spark UI啟動之前設(shè)置Spark UI的AmIpFilter;

??? 在startUserClass函數(shù)專門啟動了一個線程(名稱為Driver的線程)來啟動用戶提交的Application,也就是啟動了Driver。在Driver中將會初始化SparkContext;

??? 等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(shù)(默認(rèn)為10),如果等待了的次數(shù)超過了配置的,程序?qū)顺?#xff1b;否則用SparkContext初始化yarnAllocator;

??? 當(dāng)SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager注冊ApplicationMaster

??? 分配并啟動Executeors。在啟動Executeors之前,先要通過yarnAllocator獲取到numExecutors個Container,然后在Container中啟動Executeors。

???   那么這個Application將失敗,將Application Status標(biāo)明為FAILED,并將關(guān)閉SparkContext。其實,啟動Executeors是通過ExecutorRunnable實現(xiàn)的,而ExecutorRunnable內(nèi)部是啟動CoarseGrainedExecutorBackend的。

??? 最后,Task將在CoarseGrainedExecutorBackend里面運行,然后運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業(yè)運行完成。

  Spark節(jié)點的概念


  一、Spark驅(qū)動器是執(zhí)行程序中的main()方法的進(jìn)程。它執(zhí)行用戶編寫的用來創(chuàng)建SparkContext(初始化)、創(chuàng)建RDD,以及運行RDD的轉(zhuǎn)化操作和行動操作的代碼。

  驅(qū)動器節(jié)點driver的職責(zé):

??? 把用戶程序轉(zhuǎn)為任務(wù)task(driver)

???   Spark驅(qū)動器程序負(fù)責(zé)把用戶程序轉(zhuǎn)化為多個物理執(zhí)行單元,這些單元也被稱之為任務(wù)task(詳解見備注)

??? 為執(zhí)行器節(jié)點調(diào)度任務(wù)(executor)

???   有了物理計劃之后,Spark驅(qū)動器在各個執(zhí)行器節(jié)點進(jìn)程間協(xié)調(diào)任務(wù)的調(diào)度。Spark驅(qū)動器程序會根據(jù)當(dāng)前的執(zhí)行器節(jié)點,把所有任務(wù)基于數(shù)據(jù)所在位置分配給合適的執(zhí)行器進(jìn)程。當(dāng)執(zhí)行任務(wù)時,執(zhí)行器進(jìn)程會把緩存的數(shù)據(jù)存儲起來,而驅(qū)動器進(jìn)程同樣會跟蹤這些緩存數(shù)據(jù)的位置,并利用這些位置信息來調(diào)度以后的任務(wù),以盡量減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸。(就是所謂的移動計算,而不移動數(shù)據(jù))。

  二、執(zhí)行器節(jié)點

  作用:

??? 負(fù)責(zé)運行組成Spark應(yīng)用的任務(wù),并將結(jié)果返回給驅(qū)動器進(jìn)程;

??? 通過自身的塊管理器(blockManager)為用戶程序中要求緩存的RDD提供內(nèi)存式存儲。RDD是直接緩存在執(zhí)行器進(jìn)程內(nèi)的,因此任務(wù)可以在運行時充分利用緩存數(shù)據(jù)加快運算。

  驅(qū)動器的職責(zé):

  所有的Spark程序都遵循同樣的結(jié)構(gòu):程序從輸入數(shù)據(jù)創(chuàng)建一系列RDD,再使用轉(zhuǎn)化操作派生成新的RDD,最后使用行動操作手機(jī)或存儲結(jié)果RDD,Spark程序其實是隱式地創(chuàng)建出了一個由操作組成的邏輯上的有向無環(huán)圖DAG。當(dāng)驅(qū)動器程序執(zhí)行時,它會把這個邏輯圖轉(zhuǎn)為物理執(zhí)行計劃。

  這樣 Spark就把邏輯計劃轉(zhuǎn)為一系列步驟(stage),而每個步驟又由多個任務(wù)組成。這些任務(wù)會被打包送到集群中。

  Spark初始化


??? 每個Spark應(yīng)用都由一個驅(qū)動器程序來發(fā)起集群上的各種并行操作。驅(qū)動器程序包含應(yīng)用的main函數(shù),并且定義了集群上的分布式數(shù)據(jù)集,以及對該分布式數(shù)據(jù)集應(yīng)用了相關(guān)操作。

??? 驅(qū)動器程序通過一個SparkContext對象來訪問spark,這個對象代表對計算集群的一個連接。(比如在sparkshell啟動時已經(jīng)自動創(chuàng)建了一個SparkContext對象,是一個叫做SC的變量。(下圖,查看變量sc)

???   

??? 一旦創(chuàng)建了sparkContext,就可以用它來創(chuàng)建RDD。比如調(diào)用sc.textFile()來創(chuàng)建一個代表文本中各行文本的RDD。(比如vallinesRDD = sc.textFile(“yangsy.text”),val spark = linesRDD.filter(line=>line.contains(“spark”),spark.count())

???   執(zhí)行這些操作,驅(qū)動器程序一般要管理多個執(zhí)行器,就是我們所說的executor節(jié)點。

??? 在初始化SparkContext的同時,加載sparkConf對象來加載集群的配置,從而創(chuàng)建sparkContext對象。

???   從源碼中可以看到,在啟動thriftserver時,調(diào)用了spark- daemon.sh文件,該文件源碼如左圖,加載spark_home下的conf中的文件。

???   

???   (在執(zhí)行后臺代碼時,需要首先創(chuàng)建conf對象,加載相應(yīng)參數(shù), val sparkConf = newSparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory","1g"), val sc: SparkContext = new SparkContext(sparkConf))

  RDD工作原理:


  RDD(Resilient DistributedDatasets)[1] ,彈性分布式數(shù)據(jù)集,是分布式內(nèi)存的一個抽象概念,RDD提供了一種高度受限的共享內(nèi)存模型,即RDD是只讀的記錄分區(qū)的集合,只能通過在其他RDD執(zhí)行確定的轉(zhuǎn)換操作(如map、join和group by)而創(chuàng)建,然而這些限制使得實現(xiàn)容錯的開銷很低。對開發(fā)者而言,RDD可以看作是Spark的一個對象,它本身運行于內(nèi)存中,如讀文件是一個RDD,對文件計算是一個RDD,結(jié)果集也是一個RDD ,不同的分片、數(shù)據(jù)之間的依賴、key-value類型的map數(shù)據(jù)都可以看做RDD。

  主要分為三部分:創(chuàng)建RDD對象,DAG調(diào)度器創(chuàng)建執(zhí)行計劃,Task調(diào)度器分配任務(wù)并調(diào)度Worker開始運行。

  SparkContext(RDD相關(guān)操作)→通過(提交作業(yè))→(遍歷RDD拆分stage→生成作業(yè))DAGScheduler→通過(提交任務(wù)集)→任務(wù)調(diào)度管理(TaskScheduler)→通過(按照資源獲取任務(wù))→任務(wù)調(diào)度管理(TaskSetManager)

  Transformation返回值還是一個RDD。它使用了鏈?zhǔn)秸{(diào)用的設(shè)計模式,對一個RDD進(jìn)行計算后,變換成另外一個RDD,然后這個RDD又可以進(jìn)行另外一次轉(zhuǎn)換。這個過程是分布式的。

  Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統(tǒng)中

  轉(zhuǎn)換(Transformations)(如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉(zhuǎn)換生成另一個RDD的操作不是馬上執(zhí)行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,并不會去執(zhí)行,需要等到有Actions操作的時候才會真正啟動計算過程進(jìn)行計算。

  操作(Actions)(如:count, collect, save等),Actions操作會返回結(jié)果或把RDD數(shù)據(jù)寫到存儲系統(tǒng)中。Actions是觸發(fā)Spark啟動計算的動因。

  它們本質(zhì)區(qū)別是:Transformation返回值還是一個RDD。它使用了鏈?zhǔn)秸{(diào)用的設(shè)計模式,對一個RDD進(jìn)行計算后,變換成另外一個RDD,然后這個RDD又可以進(jìn)行另外一次轉(zhuǎn)換。這個過程是分布式的。Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統(tǒng)中。關(guān)于這兩個動作,在Spark開發(fā)指南中會有就進(jìn)一步的詳細(xì)介紹,它們是基于Spark開發(fā)的核心。

  RDD基礎(chǔ)

??? Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區(qū),這些分區(qū)運行在集群的不同節(jié)點上。創(chuàng)建RDD的方法有兩種:一種是讀取一個外部數(shù)據(jù)集;一種是在群東程序里分發(fā)驅(qū)動器程序中的對象集合,不如剛才的示例,讀取文本文件作為一個字符串的RDD的示例。

??? 創(chuàng)建出來后,RDD支持兩種類型的操作:轉(zhuǎn)化操作和行動操作

???   轉(zhuǎn)化操作會由一個RDD生成一個新的RDD。(比如剛才的根據(jù)謂詞篩選)

???   行動操作會對RDD計算出一個結(jié)果,并把結(jié)果返回到驅(qū)動器程序中,或把結(jié)果存儲到外部存儲系統(tǒng)(比如HDFS)中。比如first()操作就是一個行動操作,會返回RDD的第一個元素。

???   注:轉(zhuǎn)化操作與行動操作的區(qū)別在于Spark計算RDD的方式不同。雖然你可以在任何時候定義一個新的RDD,但Spark只會惰性計算這些RDD。它們只有第一個在一個行動操作中用到時,才會真正的計算。之所以這樣設(shè)計,是因為比如剛才調(diào)用sc.textFile(...)時就把文件中的所有行都讀取并存儲起來,就會消耗很多存儲空間,而我們馬上又要篩選掉其中的很多數(shù)據(jù)。

???   這里還需要注意的一點是,spark會在你每次對它們進(jìn)行行動操作時重新計算。如果想在多個行動操作中重用同一個RDD,那么可以使用RDD.persist()或RDD.collect()讓Spark把這個RDD緩存下來。(可以是內(nèi)存,也可以是磁盤)

??? Spark會使用譜系圖來記錄這些不同RDD之間的依賴關(guān)系,Spark需要用這些信息來按需計算每個RDD,也可以依靠譜系圖在持久化的RDD丟失部分?jǐn)?shù)據(jù)時用來恢復(fù)所丟失的數(shù)據(jù)。(如下圖,過濾errorsRDD與warningsRDD,最終調(diào)用union()函數(shù))

???   

  RDD計算方式

  

  RDD的寬窄依賴

  

  窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每個分區(qū)都只被子 RDD 的一個分區(qū)所使用 。相應(yīng)的,那么寬依賴就是指父 RDD 的分區(qū)被多個子 RDD 的分區(qū)所依賴。例如, map 就是一種窄依賴,而 join 則會導(dǎo)致寬依賴

  這種劃分有兩個用處。首先,窄依賴支持在一個結(jié)點上管道化執(zhí)行。例如基于一對一的關(guān)系,可以在 filter 之后執(zhí)行 map 。其次,窄依賴支持更高效的故障還原。因為對于窄依賴,只有丟失的父 RDD 的分區(qū)需要重新計算。而對于寬依賴,一個結(jié)點的故障可能導(dǎo)致來自所有父 RDD 的分區(qū)丟失,因此就需要完全重新執(zhí)行。因此對于寬依賴,Spark 會在持有各個父分區(qū)的結(jié)點上,將中間數(shù)據(jù)持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出一樣。

  SparkExample

  

  步驟 1 :創(chuàng)建 RDD 。上面的例子除去最后一個 collect 是個動作,不會創(chuàng)建 RDD 之外,前面四個轉(zhuǎn)換都會創(chuàng)建出新的 RDD 。因此第一步就是創(chuàng)建好所有 RDD( 內(nèi)部的五項信息 ) 。

  步驟 2 :創(chuàng)建執(zhí)行計劃。Spark 會盡可能地管道化,并基于是否要重新組織數(shù)據(jù)來劃分 階段 (stage) ,例如本例中的 groupBy() 轉(zhuǎn)換就會將整個執(zhí)行計劃劃分成兩階段執(zhí)行。最終會產(chǎn)生一個 DAG(directedacyclic graph ,有向無環(huán)圖 ) 作為邏輯執(zhí)行計劃。

  步驟 3 :調(diào)度任務(wù)。 將各階段劃分成不同的 任務(wù) (task) ,每個任務(wù)都是數(shù)據(jù)和計算的合體。在進(jìn)行下一階段前,當(dāng)前階段的所有任務(wù)都要執(zhí)行完成。因為下一階段的第一個轉(zhuǎn)換一定是重新組織數(shù)據(jù)的,所以必須等當(dāng)前階段所有結(jié)果數(shù)據(jù)都計算出來了才能繼續(xù)。

  假設(shè)本例中的 hdfs://names 下有四個文件塊,那么 HadoopRDD 中 partitions 就會有四個分區(qū)對應(yīng)這四個塊數(shù)據(jù),同時 preferedLocations 會指明這四個塊的最佳位置。現(xiàn)在,就可以創(chuàng)建出四個任務(wù),并調(diào)度到合適的集群結(jié)點上。

  Spark數(shù)據(jù)分區(qū)


??? Spark的特性是對數(shù)據(jù)集在節(jié)點間的分區(qū)進(jìn)行控制。在分布式系統(tǒng)中,通訊的代價是巨大的,控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡(luò)傳輸可以極大地提升整體性能。Spark程序可以通過控制RDD分區(qū)方式來減少通訊的開銷。

??? Spark中所有的鍵值對RDD都可以進(jìn)行分區(qū)。確保同一組的鍵出現(xiàn)在同一個節(jié)點上。比如,使用哈希分區(qū)將一個RDD分成了100個分區(qū),此時鍵的哈希值對100取模的結(jié)果相同的記錄會被放在一個節(jié)點上。

???   (可使用partitionBy(newHashPartitioner(100)).persist()來構(gòu)造100個分區(qū))

??? Spark中的許多操作都引入了將數(shù)據(jù)根據(jù)鍵跨界點進(jìn)行混洗的過程。(比如:join(),leftOuterJoin(),groupByKey(),reducebyKey()等)對于像reduceByKey()這樣只作用于單個RDD的操作,運行在未分區(qū)的RDD上的時候會導(dǎo)致每個鍵的所有對應(yīng)值都在每臺機(jī)器上進(jìn)行本地計算。

  SparkSQL的shuffle過程


  

  Spark SQL的核心是把已有的RDD,帶上Schema信息,然后注冊成類似sql里的”Table”,對其進(jìn)行sql查詢。這里面主要分兩部分,一是生成SchemaRD,二是執(zhí)行查詢。

  如果是spark-hive項目,那么讀取metadata信息作為Schema、讀取hdfs上數(shù)據(jù)的過程交給Hive完成,然后根據(jù)這倆部分生成SchemaRDD,在HiveContext下進(jìn)行hql()查詢。

  SparkSQL結(jié)構(gòu)化數(shù)據(jù)

??? 首先說一下ApacheHive,Hive可以在HDFS內(nèi)或者在其他存儲系統(tǒng)上存儲多種格式的表。SparkSQL可以讀取Hive支持的任何表。要把Spark SQL連接已有的hive上,需要提供Hive的配置文件。hive-site.xml文件復(fù)制到spark的conf文件夾下。再創(chuàng)建出HiveContext對象(sparksql的入口),然后就可以使用HQL來對表進(jìn)行查詢,并以由行足證的RDD的形式拿到返回的數(shù)據(jù)。

??? 創(chuàng)建Hivecontext并查詢數(shù)據(jù)

???   importorg.apache.spark.sql.hive.HiveContext

???   valhiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)

???   valrows = hiveCtx.sql(“SELECT name,age FROM users”)

???   valfitstRow – rows.first()

???   println(fitstRow.getSgtring(0)) //字段0是name字段

??? 通過jdbc連接外部數(shù)據(jù)源更新與加載

???   Class.forName("com.mysql.jdbc.Driver")

???   val conn =DriverManager.getConnection(mySQLUrl)

???   val stat1 =conn.createStatement()

???   stat1.execute("UPDATE CI_LABEL_INFO set DATA_STATUS_ID = 2 , DATA_DATE ='" + dataDate +"' where LABEL_ID in ("+allCreatedLabels.mkString(",")+")")

???   stat1.close()

???   //加載外部數(shù)據(jù)源數(shù)據(jù)到內(nèi)存

???   valDIM_COC_INDEX_MODEL_TABLE_CONF =sqlContext.jdbc(mySQLUrl,"DIM_COC_INDEX_MODEL_TABLE_CONF").cache()

???   val targets =DIM_COC_INDEX_MODEL_TABLE_CONF.filter("TABLE_DATA_CYCLE ="+TABLE_DATA_CYCLE).collect

  SparkSQL解析

  

  首先說下傳統(tǒng)數(shù)據(jù)庫的解析,傳統(tǒng)數(shù)據(jù)庫的解析過程是按Rusult、Data Source、Operation的次序來解析的。傳統(tǒng)數(shù)據(jù)庫先將讀入的SQL語句進(jìn)行解析,分辨出SQL語句中哪些詞是關(guān)鍵字(如select,from,where),哪些是表達(dá)式,哪些是Projection,哪些是Data Source等等。進(jìn)一步判斷SQL語句是否規(guī)范,不規(guī)范就報錯,規(guī)范則按照下一步過程綁定(Bind)。過程綁定是將SQL語句和數(shù)據(jù)庫的數(shù)據(jù)字典(列,表,視圖等)進(jìn)行綁定,如果相關(guān)的Projection、Data Source等都存在,就表示這個SQL語句是可以執(zhí)行的。在執(zhí)行過程中,有時候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運行剛運行過的SQL語句,直接從數(shù)據(jù)庫的緩沖池中獲取返回結(jié)果。在數(shù)據(jù)庫解析的過程中SQL語句時,將會把SQL語句轉(zhuǎn)化成一個樹形結(jié)構(gòu)來進(jìn)行處理,會形成一個或含有多個節(jié)點(TreeNode)的Tree,然后再后續(xù)的處理政對該Tree進(jìn)行一系列的操作。

  Spark SQL對SQL語句的處理和關(guān)系數(shù)據(jù)庫對SQL語句的解析采用了類似的方法,首先會將SQL語句進(jìn)行解析,然后形成一個Tree,后續(xù)如綁定、優(yōu)化等處理過程都是對Tree的操作,而操作方法是采用Rule,通過模式匹配,對不同類型的節(jié)點采用不同的操作。SparkSQL有兩個分支,sqlContext和hiveContext。sqlContext現(xiàn)在只支持SQL語法解析器(Catalyst),hiveContext支持SQL語法和HiveContext語法解析器。 《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的Spark知识体系完整解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。