新手福利:Apache Spark 入门攻略
本文聚焦 Apache Spark 入門,了解其在大數(shù)據(jù)領(lǐng)域的地位,覆蓋 Apache Spark 的安裝及應(yīng)用程序的建立,并解釋一些常見的行為和操作。
一、 為什么要使用 Apache Spark
時(shí)下,我們正處在一個(gè)“大數(shù)據(jù)”的時(shí)代,每時(shí)每刻,都有各種類型的數(shù)據(jù)被生產(chǎn)。而在此紫外,數(shù)據(jù)增幅的速度也在顯著增加。從廣義上看,這些數(shù)據(jù)包含交易數(shù)據(jù)、社交媒體內(nèi)容(比如文本、圖像和視頻)以及傳感器數(shù)據(jù)。那么,為什么要在這些內(nèi)容上投入如此多精力,其原因無(wú)非就是從海量數(shù)據(jù)中提取洞見可以對(duì)生活和生產(chǎn)實(shí)踐進(jìn)行很好的指導(dǎo)。
在幾年前,只有少部分公司擁有足夠的技術(shù)力量和資金去儲(chǔ)存和挖掘大量數(shù)據(jù),并對(duì)其挖掘從而獲得洞見。然而,被雅虎 2009 年開源的 Apache Hadoop 對(duì)這一狀況產(chǎn)生了顛覆性的沖擊——通過(guò)使用商用服務(wù)器組成的集群大幅度地降低了海量數(shù)據(jù)處理的門檻。因此,許多行業(yè)(比如 Health care、Infrastructure、Finance、Insurance、Telematics、Consumer、Retail、Marketing、E-commerce、Media、 Manufacturing 和 Entertainment)開始了 Hadoop 的征程,走上了海量數(shù)據(jù)提取價(jià)值的道路。著眼 Hadoop ,其主要提供了兩個(gè)方面的功能:
- 通過(guò)水平擴(kuò)展商用主機(jī),HDFS提供了一個(gè)廉價(jià)的方式對(duì)海量數(shù)據(jù)進(jìn)行容錯(cuò)存儲(chǔ)。
- MapReduce 計(jì)算范例,提供了一個(gè)簡(jiǎn)單的編程模型來(lái)挖掘數(shù)據(jù)并獲得洞見。
下圖展示了 MapReduce 的數(shù)據(jù)處理流程,其中一個(gè) Map-Reduce step 的輸出將作為下一個(gè)典型 Hadoop job 的輸入結(jié)果。
在整個(gè)過(guò)程中,中間結(jié)果會(huì)借助磁盤傳遞,因此對(duì)比計(jì)算,大量的 Map-Reduced 作業(yè)都受限于 IO 。然而對(duì)于 ETL 、數(shù)據(jù)整合和清理這樣的用例來(lái)說(shuō),IO 約束并不會(huì)產(chǎn)生很大的影響,因?yàn)檫@些場(chǎng)景對(duì)數(shù)據(jù)處理時(shí)間往往不會(huì)有較高的需求。然而,在現(xiàn)實(shí)世界中,同樣存在許多對(duì)延時(shí)要求較為苛刻的用例,比如:
毫無(wú)疑問(wèn),歷經(jīng)數(shù)年發(fā)展,Hadoop 生態(tài)圈中的豐富工具已深受用戶喜愛,然而這里仍然存在眾多問(wèn)題給使用帶來(lái)了挑戰(zhàn):
每個(gè)用例都需要多個(gè)不同的技術(shù)堆棧來(lái)支撐,在不同使用場(chǎng)景下,大量的解決方案往往捉襟見肘。
在生產(chǎn)環(huán)境中機(jī)構(gòu)往往需要精通數(shù)門技術(shù)。
許多技術(shù)存在版本兼容性問(wèn)題。
無(wú)法在并行 job 中更快地共享數(shù)據(jù)。
而通過(guò) Apache Spark,上述問(wèn)題迎刃而解!Apache Spark 是一個(gè)輕量級(jí)的內(nèi)存集群計(jì)算平臺(tái),通過(guò)不同的組件來(lái)支撐批、流和交互式用例,如下圖。
二、 關(guān)于 Apache Spark
Apache Spark 是個(gè)開源和兼容 Hadoop 的集群計(jì)算平臺(tái)。由加州大學(xué)伯克利分校的 AMPLabs 開發(fā),作為 Berkeley Data Analytics Stack(BDAS) 的一部分,當(dāng)下由大數(shù)據(jù)公司 Databricks 保駕護(hù)航,更是 Apache 旗下的頂級(jí)項(xiàng)目,下圖顯示了 Apache Spark 堆棧中的不同組件。
Apache Spark 的5大優(yōu)勢(shì):
更高的性能,因?yàn)閿?shù)據(jù)被加載到集群主機(jī)的分布式內(nèi)存中。數(shù)據(jù)可以被快速的轉(zhuǎn)換迭代,并緩存用以后續(xù)的頻繁訪問(wèn)需求。很多對(duì) Spark 感興趣的朋友可能也會(huì)聽過(guò)這樣一句話——在數(shù)據(jù)全部加載到內(nèi)存的情況下, Spark 可以比 Hadoop 快 100 倍,在內(nèi)存不夠存放所有數(shù)據(jù)的情況下快 Hadoop 10 倍。
通過(guò)建立在 Java、Scala、Python、SQL (應(yīng)對(duì)交互式查詢)的標(biāo)準(zhǔn) API 以方便各行各業(yè)使用,同時(shí)還含有大量開箱即用的機(jī)器學(xué)習(xí)庫(kù)。
與現(xiàn)有 Hadoop v1 ( SIMR ) 和 2.x (YARN) 生態(tài)兼容,因此機(jī)構(gòu)可以進(jìn)行無(wú)縫遷移。
方便下載和安裝。方便的 shell(REPL: Read-Eval-Print-Loop)可以對(duì) API 進(jìn)行交互式的學(xué)習(xí)。
借助高等級(jí)的架構(gòu)提高生產(chǎn)力,從而可以講精力放到計(jì)算上。
同時(shí), Apache Spark 由 Scala 實(shí)現(xiàn),代碼非常簡(jiǎn)潔。
三、安裝Apache Spark
下表列出了一些重要鏈接和先決條件:[+]查看原圖
如上圖所示,Apache Spark 的部署方式包括 standalone、Hadoop V1 SIMR、Hadoop 2 YARN/Mesos 。Apache Spark 需求一定的 Java、Scala 或 Python 知識(shí)。這里,我們將專注 standalone 配置下的安裝和運(yùn)行。
安裝 JDK 1.6+、Scala 2.10+、Python [2.6,3] 和 sbt
下載 Apache Spark 1.0.1 Release
在指定目錄下 Untar 和 Unzip spark-1.0.1.tgz
akuntamukkala@localhost~/Downloads$ pwd /Users/akuntamukkala/Downloads akuntamukkala@localhost~/Downloads$ tar -zxvf spark- 1.0.1.tgz -C /Users/akuntamukkala/spark運(yùn)行 sbt 建立 Apache Spark
akuntamukkala@localhost~/spark/spark-1.0.1$ pwd /Users/akuntamukkala/spark/spark-1.0.1 akuntamukkala@localhost~/spark/spark-1.0.1$ sbt/sbt assembly發(fā)布 Scala 的 Apache Spark standalone REPL
/Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell如果是 Python
/Users/akuntamukkala/spark/spark-1.0.1/bin/ pyspark查看 SparkUI @ http://localhost:4040
四、Apache Spark 的工作模式
Spark 引擎提供了在集群中所有主機(jī)上進(jìn)行分布式內(nèi)存數(shù)據(jù)處理的能力,下圖顯示了一個(gè)典型 Spark job 的處理流程。
下圖顯示了 Apache Spark 如何在集群中執(zhí)行一個(gè)作業(yè)。
Master 控制數(shù)據(jù)如何被分割,利用了數(shù)據(jù)本地性,并在 Slaves 上跟蹤所有分布式計(jì)算。在某個(gè)Slave不可用時(shí),其存儲(chǔ)的數(shù)據(jù)會(huì)分配給其他可用的 Slaves 。雖然當(dāng)下( 1.0.1 版本) Master 還存在單點(diǎn)故障,但后期必然會(huì)被修復(fù)。
五、彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset,RDD)
彈性分布式數(shù)據(jù)集(RDD,從 Spark 1.3 版本開始已被 DataFrame 替代)是 Apache Spark 的核心理念。它是由數(shù)據(jù)組成的不可變分布式集合,其主要進(jìn)行兩個(gè)操作:transformation 和 action 。Transformation 是類似在 RDD 上做 filter()、map() 或 union() 以生成另一個(gè) RDD 的操作,而 action 則是 count()、first()、take(n)、collect() 等促發(fā)一個(gè)計(jì)算并返回值到 Master 或者穩(wěn)定存儲(chǔ)系統(tǒng)的操作。Transformations 一般都是 lazy 的,直到 action 執(zhí)行后才會(huì)被執(zhí)行。Spark Master/Driver 會(huì)保存 RDD 上的 Transformations 。這樣一來(lái),如果某個(gè) RDD 丟失(也就是 salves 宕掉),它可以快速和便捷地轉(zhuǎn)換到集群中存活的主機(jī)上。這也就是 RDD 的彈性所在。
下圖展示了 Transformation 的 lazy :
我們可以通過(guò)下面示例來(lái)理解這個(gè)概念:從文本中發(fā)現(xiàn) 5 個(gè)最常用的 word 。下圖顯示了一個(gè)可能的解決方案。
在上面命令中,我們對(duì)文本進(jìn)行讀取并且建立字符串的 RDD 。每個(gè)條目代表了文本中的 1 行。
scala> val hamlet = sc.textFile(“/Users/akuntamukkala/temp/gutenburg.txt”) hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12scala> val topWordCount = hamlet.flatMap(str=>str.split(“ “)). filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case (word, count) => (count, word)}.sortByKey(false) topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14
通過(guò)上述命令我們可以發(fā)現(xiàn)這個(gè)操作非常簡(jiǎn)單——通過(guò)簡(jiǎn)單的 Scala API 來(lái)連接 transformations 和 actions 。
可能存在某些 words 被 1 個(gè)以上空格分隔的情況,導(dǎo)致有些 words 是空字符串,因此需要使用 filter(!_.isEmpty) 將它們過(guò)濾掉。
每個(gè) word 都被映射成一個(gè)鍵值對(duì):map(word=>(word,1))。
為了合計(jì)所有計(jì)數(shù),這里需要調(diào)用一個(gè) reduce 步驟—— reduceByKey(+) 。 + 可以非常便捷地為每個(gè) key 賦值。
我們得到了 words 以及各自的 counts,下一步需要做的是根據(jù) counts 排序。在 Apache Spark ,用戶只能根據(jù) key 排序,而不是值。因此,這里需要使用 map{case (word, count) => (count, word)} 將 (word, count) 流轉(zhuǎn)到 (count, word)。
需要計(jì)算最常用的 5 個(gè) words ,因此需要使用 sortByKey(false) 做一個(gè)計(jì)數(shù)的遞減排序。
上述命令包含了一個(gè) .take(5) (an action operation, which triggers computation) 和在 /Users/akuntamukkala/temp/gutenburg.txt 文本中輸出 10 個(gè)最常用的 words 。在 Python shell 中用戶可以實(shí)現(xiàn)同樣的功能。
RDD lineage 可以通過(guò) toDebugString (一個(gè)值得記住的操作)來(lái)跟蹤。
scala> topWordCount.take(5).foreach(x=>println(x)) (1044,the) (730,and) (679,of) (648,to) (511,I)
常用的 Transformations:
[+]查看原圖[+]查看原圖
常見集合操作[+]查看原圖[+]查看原圖
更多 transformations 信息,請(qǐng)查看 http://spark.apache.org/docs/latest/programming-guide.html#transformations
常用的 actions[+]查看原圖
更多 actions 參見 http://spark.apache.org/docs/latest/programming-guide.html#actions
六、RDD持久性
Apache Spark 中一個(gè)主要的能力就是在集群內(nèi)存中持久化/緩存 RDD 。這將顯著地提升交互速度。下表顯示了 Spark 中各種選項(xiàng)。
[+]查看原圖
上面的存儲(chǔ)等級(jí)可以通過(guò) RDD. cache() 操作上的 persist () 操作訪問(wèn),可以方便地指定 MEMORY_ONLY 選項(xiàng)。關(guān)于持久化等級(jí)的更多信息,可以訪問(wèn)這里 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。
Spark 使用 Least Recently Used (LRU) 算法來(lái)移除緩存中舊的、不常用的 RDD ,從而釋放出更多可用內(nèi)存。同樣還提供了一個(gè) unpersist() 操作來(lái)強(qiáng)制移除緩存/持久化的 RDD 。
七、變量共享
Accumulators。Spark 提供了一個(gè)非常便捷地途徑來(lái)避免可變的計(jì)數(shù)器和計(jì)數(shù)器同步問(wèn)題—— Accumulators 。Accumulators 在一個(gè) Spark context 中通過(guò)默認(rèn)值初始化,這些計(jì)數(shù)器在 Slaves 節(jié)點(diǎn)上可用,但是 Slaves 節(jié)點(diǎn)不能對(duì)其進(jìn)行讀取。它們的作用就是來(lái)獲取原子更新,并將其轉(zhuǎn)發(fā)到 Master 。 Master 是唯一可以讀取和計(jì)算所有更新合集的節(jié)點(diǎn)。舉個(gè)例子:
akuntamukkala@localhost~/temp$ cat output.log error warning info trace error info info scala> val nErrors=sc.accumulator(0.0) scala> val logs = sc.textFile(“/Users/akuntamukkala/temp/output.log”) scala> logs.filter(_.contains(“error”)).foreach(x=>nErrors+=1) scala> nErrors.value Result:Int = 2
Broadcast Variables。實(shí)際生產(chǎn)中,通過(guò)指定 key 在 RDDs 上對(duì)數(shù)據(jù)進(jìn)行合并的場(chǎng)景非常常見。在這種情況下,很可能會(huì)出現(xiàn)給 slave nodes 發(fā)送大體積數(shù)據(jù)集的情況,讓其負(fù)責(zé)托管需要做 join 的數(shù)據(jù)。因此,這里很可能存在巨大的性能瓶頸,因?yàn)榫W(wǎng)絡(luò) IO 比內(nèi)存訪問(wèn)速度慢 100 倍。為了解決這個(gè)問(wèn)題,Spark 提供了 Broadcast Variables,如其名稱一樣,它會(huì)向 slave nodes 進(jìn)行廣播。因此,節(jié)點(diǎn)上的 RDD 操作可以快速訪問(wèn) Broadcast Variables 值。舉個(gè)例子,期望計(jì)算一個(gè)文件中所有路線項(xiàng)的運(yùn)輸成本。通過(guò)一個(gè) look-up table指定每種運(yùn)輸類型的成本,這個(gè)look-up table 就可以作為 Broadcast Variables 。
akuntamukkala@localhost~/temp$ cat packagesToShip.txt ground express media priority priority ground express media scala> val map = sc.parallelize(Seq((“ground”,1),(“med”,2), (“priority”,5),(“express”,10))).collect().toMap map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10) scala> val bcMailRates = sc.broadcast(map)
上述命令中,我們建立了一個(gè) broadcast variable,基于服務(wù)類別成本的 map 。
scala> val pts = sc.textFile(“/Users/akuntamukkala/temp/packagesToShip.txt”)
在上述命令中,我們通過(guò) broadcast variable 的 mailing rates 來(lái)計(jì)算運(yùn)輸成本。
scala> pts.map(shipType=>(shipType,1)).reduceByKey(+). map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates. value(shipType))}.collect()
通過(guò)上述命令,我們使用 accumulator 來(lái)累加所有運(yùn)輸?shù)某杀?。詳?xì)信息可通過(guò)下面的 PDF 查看 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf。
八、Spark SQL
通過(guò) Spark Engine,Spark SQL 提供了一個(gè)便捷的途徑來(lái)進(jìn)行交互式分析,使用一個(gè)被稱為 SchemaRDD 類型的 RDD 。SchemaRDD 可以通過(guò)已有 RDDs 建立,或者其他外部數(shù)據(jù)格式,比如 Parquet files、JSON 數(shù)據(jù),或者在 Hive 上運(yùn)行 HQL。SchemaRDD 非常類似于 RDBMS 中的表格。一旦數(shù)據(jù)被導(dǎo)入 SchemaRDD,Spark 引擎就可以對(duì)它進(jìn)行批或流處理。Spark SQL 提供了兩種類型的 Contexts——SQLContext 和 HiveContext,擴(kuò)展了 SparkContext 的功能。
SparkContext 提供了到簡(jiǎn)單 SQL parser 的訪問(wèn),而 HiveContext 則提供了到 HiveQL parser 的訪問(wèn)。HiveContext 允許企業(yè)利用已有的 Hive 基礎(chǔ)設(shè)施。
這里看一個(gè)簡(jiǎn)單的 SQLContext 示例。
下面文本中的用戶數(shù)據(jù)通過(guò) “ | ” 來(lái)分割。
John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854
定義 Scala case class 來(lái)表示每一行:
case class Customer(name:String,age:Int,gender:String,address: String)下面的代碼片段體現(xiàn)了如何使用 SparkContext 來(lái)建立 SQLContext ,讀取輸入文件,將每一行都轉(zhuǎn)換成 SparkContext 中的一條記錄,并通過(guò)簡(jiǎn)單的 SQL 語(yǔ)句來(lái)查詢 30 歲以下的男性用戶。
val sparkConf = new SparkConf().setAppName(“Customers”) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val r = sc.textFile(“/Users/akuntamukkala/temp/customers.txt”) val records = r.map(_.split(‘|’)) val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable(“customers”) sqlContext.sql(“select * from customers where gender=’M’ and age <30”).collect().foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris,TX,75461]更多使用 SQL 和 HiveQL 的示例請(qǐng)?jiān)L問(wèn)下面鏈接 https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html。
九、Spark Streaming
Spark Streaming 提供了一個(gè)可擴(kuò)展、容錯(cuò)、高效的途徑來(lái)處理流數(shù)據(jù),同時(shí)還利用了 Spark 的簡(jiǎn)易編程模型。從真正意義上講,Spark Streaming 會(huì)將流數(shù)據(jù)轉(zhuǎn)換成 micro batches,從而將 Spark 批處理編程模型應(yīng)用到流用例中。這種統(tǒng)一的編程模型讓 Spark 可以很好地整合批量處理和交互式流分析。下圖顯示了 Spark Streaming 可以從不同數(shù)據(jù)源中讀取數(shù)據(jù)進(jìn)行分析。
Spark Streaming 中的核心抽象是 Discretized Stream(DStream)。DStream 由一組 RDD 組成,每個(gè) RDD 都包含了規(guī)定時(shí)間(可配置)流入的數(shù)據(jù)。上圖很好地展示了 Spark Streaming 如何通過(guò)將流入數(shù)據(jù)轉(zhuǎn)換成一系列的 RDDs,再轉(zhuǎn)換成 DStream 。每個(gè) RDD 都包含兩秒(設(shè)定的區(qū)間長(zhǎng)度)的數(shù)據(jù)。在 Spark Streaming 中,最小長(zhǎng)度可以設(shè)置為 0.5 秒,因此處理延時(shí)可以達(dá)到 1 秒以下。
Spark Streaming 同樣提供了 window operators ,它有助于更有效率在一組 RDD ( a rolling window of time)上進(jìn)行計(jì)算。同時(shí),DStream 還提供了一個(gè) API ,其操作符(transformations 和 output operators)可以幫助用戶直接操作 RDD 。下面不妨看向包含在 Spark Streaming 下載中的一個(gè)簡(jiǎn)單示例。示例是在 Twitter 流中找出趨勢(shì) hashtags ,詳見下面代碼。
spark- 1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala val sparkConf = new SparkConf().setAppName(“TwitterPopularTags”) val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters)上述代碼用于建立 Spark Streaming Context 。Spark Streaming 將在 DStream 中建立一個(gè) RDD ,包含了每 2 秒流入的 tweets 。
val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))上述代碼片段將 Tweet 轉(zhuǎn)換成一組 words ,并過(guò)濾出所有以 a# 開頭的。
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))上述代碼展示了如何整合計(jì)算 60 秒內(nèi)一個(gè) hashtag 流入的總次數(shù)。
topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println(“\nPopular topics in last 60 seconds (%s total):”.format(rdd.count())) topList.foreach{case (count, tag) => println(“%s (%s tweets)”.format(tag, count))} })上面代碼將找出 top 10 趨勢(shì) tweets ,然后將其打印。
ssc.start()上述代碼讓 Spark Streaming Context 開始檢索 tweets 。一起聚焦一些常用操作,假設(shè)我們正在從一個(gè) socket 中讀入流文本。
al lines = ssc.socketTextStream(“localhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)更多 operators 請(qǐng)?jiān)L問(wèn) http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations
Spark Streaming 擁有大量強(qiáng)大的 output operators ,比如上文提到的 foreachRDD(),了解更多可訪問(wèn) http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations。
十、附加學(xué)習(xí)資源
- Wikipedia article (good): http://en.wikipedia.org/wiki/Apache_Spark
- Launching a Spark cluster on EC2: http://ampcamp.berkeley.edu/exercises-strata-conf-2013/launching-a-cluster.html
- Quick start: https://spark.apache.org/docs/1.0.1/quick-start.html
- The Spark platform provides MLLib(machine learning) and GraphX(graph algorithms). The following links provide more information:https://spark.apache.org/docs/latest/mllib-guide.html、https://spark.apache.org/docs/1.0.1/graphx-programming-guide.html、https://dzone.com/refcardz/apache-spark
原文鏈接:Apache Spark:An Engine for Large-Scale Data Processing
本文系 OneAPM 工程師編譯整理。
總結(jié)
以上是生活随笔為你收集整理的新手福利:Apache Spark 入门攻略的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 涉足计算机视觉领域要知道的
- 下一篇: 数字图像处理:第六章 几何运算