[Kafka与Spark集成系列三] Spark编程模型
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-and-spark-integration-3-spark-code-model/
在Spark中,我們通過對分布式數(shù)據(jù)集的操作來表達(dá)我們的計算意圖,這些計算會自動地在集群上并行進(jìn)行。這樣的數(shù)據(jù)集被稱為彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset),簡稱RDD。RDD是Spark對分布式數(shù)據(jù)和計算的基本抽象。在Spark中,對數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD、轉(zhuǎn)換已有RDD以及調(diào)用RDD操作進(jìn)行求值。在《Spark的安裝及簡單應(yīng)用》的單詞統(tǒng)計示例中,rdd和wordmap都是MapPartitionsRDD類型的RDD,而wordreduce是ShuffledRDD類型的RDD。
RDD支持2種類型的操作:轉(zhuǎn)換操作(Transformation Operation)和行動操作(Action Operation)。有些資料還會細(xì)分為創(chuàng)建操作、轉(zhuǎn)換操作、控制操作以及行動操作等4種類型。轉(zhuǎn)換操作會由一個RDD生成一個新的RDD。行動操作會對RDD計算出一個結(jié)果,并把結(jié)果返回到驅(qū)動器程序中,或者把結(jié)果存儲到外部存儲系統(tǒng)中。轉(zhuǎn)換操作和行動操作的區(qū)別在于Spark計算RDD的方式不同。雖然你可以在任何時候定義新的RDD,但Spark只會惰性計算這些RDD。它們只有第一次在一個行動操作中用到時,才會真正計算。表中給出了轉(zhuǎn)換操作和行動操作之間對比的更多細(xì)節(jié)。
| 轉(zhuǎn)換操作 | map、filter、groupBy、join、union、reduce、sort、partitionBy等 | 返回值還是RDD,不會立馬提交給Spark集群運(yùn)行 |
| 行動操作 | count、collect、take、save、show等 | 返回值不是RDD,會形成DAG圖,提交給Spark集群運(yùn)行并立即返回結(jié)果 |
通過轉(zhuǎn)換操作,從已有的RDD中派生出新的RDD,Spark會使用譜系圖(lineage graph,很多資料也會翻譯為“血統(tǒng)”)來記錄這些不同RDD之間的依賴關(guān)系。Spark需要用這些信息來按需計算每個RDD,也可以依賴譜系圖在持久化的RDD丟失部分?jǐn)?shù)據(jù)時恢復(fù)所丟失的數(shù)據(jù)。行動操作會把最終求得的結(jié)果返回到驅(qū)動器程序,或者寫入外部存儲系統(tǒng)中。由于行動操作需要生產(chǎn)實際的輸出,它們會強(qiáng)制執(zhí)行那些求值必須用到的RDD的轉(zhuǎn)換操作。
Spark中RDD計算是以分區(qū)(Partition)為單位的,將RDD劃分為很多個分區(qū)分布到集群的節(jié)點(diǎn)中,分區(qū)的多少涉及對這個RDD進(jìn)行并行計算的粒度。如下圖所示,實線方框A、B、C、D、E、F、G都表示的是RDD,陰影背景的矩形則表示分區(qū)。A、B、C、D、E、F、G之間的依賴關(guān)系構(gòu)成整個應(yīng)用的譜系圖。
依賴關(guān)系還可以分為窄依賴和寬依賴。窄依賴(Narrow Dependencies)是指每個父RDD的分區(qū)都至多被一個子RDD的分區(qū)使用,而寬依賴(Wide Dependencies)是指多個子RDD的分區(qū)依賴一個父RDD的分區(qū)。圖中,C和D之間是窄依賴,而A和B之間是寬依賴。RDD中行動操作的執(zhí)行將會以寬依賴為分界來構(gòu)建各個調(diào)度階段,各個調(diào)度階段內(nèi)部的窄依賴則前后鏈接構(gòu)成流水線。圖中的3個虛線方框分別代表了3個不同的調(diào)度階段。
對于執(zhí)行失敗的任務(wù),只要它對應(yīng)的調(diào)度階段的父類信息仍然可用,該任務(wù)就會分散到其它節(jié)點(diǎn)重新執(zhí)行。如果某些調(diào)度階段不可用,則重新提交相應(yīng)的任務(wù),并以并行方式計算丟失的地方。在整個作業(yè)中如果某個任務(wù)執(zhí)行緩慢,系統(tǒng)則會在其他節(jié)點(diǎn)上執(zhí)行該任務(wù)的副本,并最終取最先得到的結(jié)果作為最終的結(jié)果。
下面就以與《Spark的安裝及簡單應(yīng)用》中相同的單詞統(tǒng)計程序來分析一下Spark的編程模型,與《Spark的安裝及簡單應(yīng)用》中所不同的是,這里的是一個完整的Scala程序,程序所對應(yīng)的Maven依賴如下:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.1</version> </dependency>具體代碼示例如下:
package scala.spark.demo import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit ={val conf = new SparkConf().setAppName("WordCount").setMaster("local")①val sc = new SparkContext(conf)②val rdd = sc.textFile("/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-shell")③val wordcount = rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)④val wordsort = wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))⑤wordsort.saveAsTextFile("/tmp/spark")⑥sc.stop()⑦} }main()方法主體中第①和第②行中首先創(chuàng)建一個SparkConf對象來配置應(yīng)用程序,然后基于這個SparkConf創(chuàng)建了一個SparkContext對象。一旦有了SparkContext,就可以用它來創(chuàng)建RDD,第③行代碼中調(diào)用了sc.textFile()來創(chuàng)建一個代表文件中各行文本的RDD。第④行中rdd.flatMap(_.split(" ")).map(x=>(x,1))這一段內(nèi)容的依賴關(guān)系是窄依賴,而reduceByKey(_+_)操作對單詞進(jìn)行計數(shù)時屬于寬依賴。第⑥行中將排序后的結(jié)果存儲起來。最后第⑦行中使用stop()方法來關(guān)閉應(yīng)用。
在$SPARK_HOME/bin目錄中還有一個spark-submit腳本,用于將應(yīng)用快速部署到Spark集群中。比如這里的WordCount程序,當(dāng)我們希望通過spark-submit部署時,只需要將應(yīng)用打包成jar包(即下面示例中的wordcount.jar)并上傳到Spark集群中,然后通過spark-submit進(jìn)行部署,示例如下:
[root@node1 spark]# bin/spark-submit --class scala.spark.demo.WordCount wordcount.jar --executor-memory 1G --master spark://localhost:7077 2018-08-06 15:39:54 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2018-08-06 15:39:55 INFO SparkContext:54 - Running Spark version 2.3.1 2018-08-06 15:39:55 INFO SparkContext:54 - Submitted application: WordCount 2018-08-06 15:39:55 INFO SecurityManager:54 - Changing view acls to: root 2018-08-06 15:39:55 INFO SecurityManager:54 - Changing modify acls to: root (....省略若干) 2018-08-07 12:25:47 INFO AbstractConnector:318 - Stopped Spark@6299e2c1{HTTP/1.1,[http/1.1]}{0.0.0.0:4040} 2018-08-07 12:25:47 INFO SparkUI:54 - Stopped Spark web UI at http://10.199.172.111:4040 2018-08-07 12:25:47 INFO MapOutputTrackerMasterEndpoint:54 – MapOutputTrackerMasterEndpoint stopped! 2018-08-07 12:25:47 INFO MemoryStore:54 - MemoryStore cleared 2018-08-07 12:25:47 INFO BlockManager:54 - BlockManager stopped 2018-08-07 12:25:47 INFO BlockManagerMaster:54 - BlockManagerMaster stopped 2018-08-07 12:25:47 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 – OutputCommitCoordinator stopped! 2018-08-06 15:46:57 INFO SparkContext:54 - Successfully stopped SparkContext 2018-08-06 15:46:57 INFO ShutdownHookManager:54 - Shutdown hook called 2018-08-06 15:46:57 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-fa955139-270c-4899-82b7-4959983a1cb0 2018-08-06 15:46:57 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-3f359966-2167-4bb9-863a-2d8a8d5e8fbe示例中的–class用來指定應(yīng)用程序的主類,這里為scala.spark.demo.WordCount;–executor-memory用來指定執(zhí)行器節(jié)點(diǎn)的內(nèi)容,這里設(shè)置為1G。最后得到的輸出結(jié)果如下所示:
[root@node1 spark]# ls /tmp/spark part-00000 _SUCCESS [root@node1 spark]# cat /tmp/spark/part-00000 (,91) (#,37) (the,19) (in,7) (to,7) (for,6) (if,5) (then,5) (under,4) (stty,4) (not,4)歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-and-spark-integration-3-spark-code-model/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的[Kafka与Spark集成系列三] Spark编程模型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [Kafka与Spark集成系列二]
- 下一篇: [Kafka与Spark集成系列四]