【Spark ML】第 2 章: Spark和Spark简介
??🔎大家好,我是Sonhhxg_柒,希望你看完之后,能對你有所幫助,不足請指正!共同學(xué)習(xí)交流🔎
📝個人主頁-Sonhhxg_柒的博客_CSDN博客?📃
🎁歡迎各位→點贊👍 + 收藏?? + 留言📝?
📣系列專欄 - 機器學(xué)習(xí)【ML】?自然語言處理【NLP】? 深度學(xué)習(xí)【DL】
?
?🖍foreword
?說明?本人講解主要包括Python、機器學(xué)習(xí)(ML)、深度學(xué)習(xí)(DL)、自然語言處理(NLP)等內(nèi)容。
如果你對這個系列感興趣的話,可以關(guān)注訂閱喲👋
文章目錄
概述
Cluster Managers(集群管理器)
Architecture(建筑)
執(zhí)行Spark?應(yīng)用程序
集群模式
客戶端模式
spark-shell簡介
SparkSession
Creating an RDD
并行化
textFile
Transformations(轉(zhuǎn)換)
Map
FlatMap
Filter(濾波器)
Distinct
ReduceByKey
Keys
Values
Inner Join
RightOuterJoin and LeftOuterJoin
Union
Subtract(減去)
Coalesce(合并)
Repartition(重新分區(qū))
Actions(行動)
Collect(收集)
Count(計數(shù))
Take(拿)
Foreach(前期)
Lazy Evaluation
Caching(緩存)
Accumulator(蓄電池)
Broadcast Variable(廣播變量)
Spark SQL、Dataset 和 DataFrames API
Spark?數(shù)據(jù)源
.CSV
.XML
JSON
關(guān)系數(shù)據(jù)庫和 MPP 數(shù)據(jù)庫
Parquet
HBase
Amazon S3
Solr
Microsoft Excel
Secure FTP(安全 FTP)
簡介Spark MLlib
Spark MLlib算法
ML Pipelines
Pipeline
Transformer
Estimator
ParamGridBuilder
交叉驗證器
Evaluator
特征提取、轉(zhuǎn)換和選擇
字符串索引器
Tokenizer(分詞器)
VectorAssembler(矢量裝配器)
StandardScaler(標(biāo)準(zhǔn)標(biāo)定器)
StopWordsRemover(停止字切換)
n-gram
OneHotEncoderEstimator
SQLTransformer
主成分分析(PCA)
ChiSqSelector(奇思克選機)
Correlation(相關(guān))
評估指標(biāo)
Area Under the Receiver Operating Characteristic (AUROC)
F1 度量值
均方根誤差(RMSE)
Model Persistence(模型持久性)
Spark MLlib示例
圖形處理
超越Spark MLlib:第三方機器學(xué)習(xí)集成
使用Alluxio優(yōu)化Spark和SparkMLlib
Architecture(建筑)
為什么使用Alluxio?
顯著提高大數(shù)據(jù)處理性能和可擴展性
多個框架和應(yīng)用程序可以以內(nèi)存速度共享數(shù)據(jù)
降低硬件要求
Apache Spark 和?Alluxio
總結(jié)
Spark是一個統(tǒng)一的大數(shù)據(jù)處理框架,用于處理和分析大型數(shù)據(jù)集。Spark 在Scala, Python, Java, 和?R?中提供了高級 API,其中包含功能強大的庫,包括用于機器學(xué)習(xí)的 MLlib、用于 SQL 支持的 Spark SQL、用于實時流的 Spark 流式處理和用于圖形處理的 GraphX。第二Spark由Matei Zaharia在加州大學(xué)伯克利分校的AMPLab創(chuàng)立,后來捐贈給Apache軟件基金會,于2014年2月24日成為頂級項目。第三第一個版本于2017年5月30日發(fā)布。
概述
《Spark》的開發(fā)是為了解決哈多普的原始數(shù)據(jù)處理框架MapReduce的局限性。Matei Zaharia看到了MapReduce在加州大學(xué)伯克利分校和Facebook(他在那里實習(xí))的局限性,并試圖創(chuàng)建一個更快,更通用,多用途的數(shù)據(jù)處理框架,可以處理迭代和交互式應(yīng)用程序。在它提供了一個統(tǒng)一的平臺(圖 2-1),支持多種類型的工作負(fù)載,如流式處理、交互式、圖形處理、機器學(xué)習(xí)和批處理。我們Spark 作業(yè)的運行速度比等效的 MapReduce 作業(yè)快許多倍,因為它具有快速的內(nèi)存中功能和高級 DAG(有向無環(huán)圖)執(zhí)行引擎。Spark是用斯卡拉語編寫的,因此它是Spark事實上的編程接口。我們將在本書中通篇使用 Scala。我們將在第7章中使用PySpark,即用于火花的Python API,用于分布式深度學(xué)習(xí)。
?圖 2-1Apache Spark?ecosystem
Cluster Managers(集群管理器)
集群管理器管理和分配集群資源。Spark 支持隨附于 Spark(獨立調(diào)度程序)、YARN、Mesos 和 Kubernetes 附帶的獨立集群管理器。
Architecture(建筑)
在較高級別,Spark 將 Spark 應(yīng)用程序任務(wù)的執(zhí)行分布在群集節(jié)點上(圖 2-2)。每個 Spark 應(yīng)用程序在其驅(qū)動程序中都有一個 SparkContext 對象。SparkContext 表示與集群管理器的連接,集群管理器為 Spark 應(yīng)用程序提供計算資源。連接到群集后,Spark 會獲取工作線程節(jié)點上的執(zhí)行程序。然后,Spark 將應(yīng)用程序代碼發(fā)送給執(zhí)行程序。應(yīng)用程序通常會運行一個或多個作業(yè)以響應(yīng) Spark 操作。然后,每個作業(yè)由 Spark 劃分為階段或任務(wù)的較小有向無環(huán)圖 (DAG)。然后,每個任務(wù)將分發(fā)并發(fā)送到工作線程節(jié)點上的執(zhí)行程序進(jìn)行執(zhí)行。
圖 2-2Apache Spark體系結(jié)構(gòu)
每個 Spark 應(yīng)用程序都有自己的一組執(zhí)行程序。由于來自不同應(yīng)用程序的任務(wù)在不同的 JVM 中運行,因此 Spark 應(yīng)用程序不會干擾另一個 Spark 應(yīng)用程序。這也意味著 Spark 應(yīng)用程序很難在不使用外部數(shù)據(jù)源(如 HDFS 或 S3)的情況下共享數(shù)據(jù)。使用堆外內(nèi)存存儲(如Tachyon(又名Alluxio)可以使數(shù)據(jù)共享更快,更輕松。我將在本章后面更詳細(xì)地討論阿盧克西奧。
執(zhí)行Spark?應(yīng)用程序
您可以使用交互式外殼程序(火花外殼程序或 pyspark)或提交應(yīng)用程序(火花提交)來執(zhí)行 Spark 應(yīng)用程序。有些人更喜歡使用基于Web的交互式筆記本,如阿帕奇齊柏林飛艇和朱皮特與Spark進(jìn)行交互。數(shù)據(jù)磚和云端等商業(yè)供應(yīng)商也提供自己的交互式筆記本環(huán)境。我將在整個章節(jié)中使用火花殼。有兩種部署模式可用于在具有集群管理器(如 YARN)的環(huán)境中啟動 Spark 應(yīng)用程序。
集群模式
在群集模式下,驅(qū)動程序在由 YARN 管理的應(yīng)用程序主服務(wù)器內(nèi)運行??蛻舳丝梢栽诓挥绊憫?yīng)用程序執(zhí)行的情況下退出。要在集群模式下啟動應(yīng)用程序或火花外殼:
spark-shell --master yarn --deploy-mode cluster spark-submit --class mypath.myClass --master yarn --deploy-mode cluster
客戶端模式
在客戶端模式下,驅(qū)動程序在客戶端中運行。應(yīng)用程序主服務(wù)器僅用于從 YARN 請求資源。要在客戶端模式下啟動應(yīng)用程序或火花外殼,
spark-shell --master yarn --deploy-mode client spark-submit --class mypath.myClass --master yarn --deploy-mode client
spark-shell簡介
通常使用交互式 shell 進(jìn)行即席數(shù)據(jù)分析或瀏覽。它也是學(xué)習(xí)火花 API 的好工具?;鸹ǖ慕换ナ酵鈿ぴ?Spark或Python中可用。在以下示例中,我們將創(chuàng)建城市的 RDD,并將它們?nèi)哭D(zhuǎn)換為大寫。當(dāng)您啟動火花外殼時,將自動創(chuàng)建一個名為“spark”的 SparkSession,如清單 2-1 所示。
spark-shell Spark context Web UI available at http://10.0.2.15:4041 Spark context available as 'sc' (master = local[*], app id = local-1574144576837). Spark session available as 'spark'. Welcome to____ __/ __/__ ___ _____/ /___\ \/ _ \/ _ `/ __/ '_//___/ .__/\_,_/_/ /_/\_\ version 2.4.4/_/ Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212) Type in expressions to have them evaluated. Type :help for more information. scala>val myCities = sc.parallelize(List("tokyo","new york","sydney","san francisco")) scala>val uCities = myCities.map {x =>x.toUpperCase} scala>uCities.collect.foreach(println) TOKYO NEW YORK SYDNEY SAN FRANCISCOListing 2-1Introduction to spark-shell
SparkSession
如圖 2-2 所示,SparkContext 支持訪問所有 Spark 特性和功能。該驅(qū)動程序使用 SparkContext 訪問其他上下文,如流式處理上下文、SQL 上下文和配置項上下文。從 Spark 2.0 開始,火花會話提供了一個與 Spark 交互的單一入口點。通過 Spark1.x 中的 Spark 上下文、SQL 上下文、配置上下文和流式處理上下文提供的所有功能現(xiàn)在都可以通過 SparkSession 進(jìn)行訪問。七您可能仍然會遇到在 Spark 1.x 中編寫的代碼。在 Spark 1.x 中,你會寫出這樣的東西。
在 Spark 2.x 中,您不必顯式創(chuàng)建火花組件、SparkContext 或 SQLContext,因為它們的所有功能都已包含在 SparkSession 中。
val spark = SparkSession. builder(). appName("MyApp"). config("spark.executor.cores", "4"). getOrCreate()
彈性分布式數(shù)據(jù)集 (RDD)
RDD 是跨群集中的一個或多個節(jié)點分區(qū)的對象的彈性不可變分布式集合。RDD 可以通過兩種類型的操作并行處理和操作:轉(zhuǎn)換和操作。
注意RDD 是 Spark 1.x 中 Spark 的主要編程接口,數(shù)據(jù)集已取代 RDD 成為從 Spark 2.0 開始的主 API。建議用戶從RDD切換到數(shù)據(jù)集/數(shù)據(jù)幀,因為編程接口更豐富,性能更好。我將在本章的后面部分討論數(shù)據(jù)集和數(shù)據(jù)幀。
Creating an RDD
創(chuàng)建 RDD 非常簡單。您可以從現(xiàn)有的 Scala 集合或通過讀取存儲在 HDFS 或 S3 中的外部文件來創(chuàng)建 RDD。
并行化
并行化從Scala集合創(chuàng)建 RDD。
val data = (1 to 5).toList val rdd = sc.parallelize(data) val cities = sc.parallelize(List("tokyo","new york","sydney","san francisco"))textFile
文本文件從存儲在 HDFS 或 S3 中的文本文件創(chuàng)建 RDD。
val rdd = sc.textFile("hdfs://master01:9000/files/mydirectory") val rdd = sc.textFile("s3a://mybucket/files/mydata.csv")請注意,RDD 是不可變的。數(shù)據(jù)轉(zhuǎn)換會生成另一個 RDD,而不是修改當(dāng)前的 RDD。RDD操作可分為兩類:轉(zhuǎn)換和操作。
Transformations(轉(zhuǎn)換)
轉(zhuǎn)換是創(chuàng)建新的 RDD 的操作。我描述了一些最常見的轉(zhuǎn)換。有關(guān)完整列表,請參閱聯(lián)機 Spark 文檔。
Map
映射對 RDD 中的每個元素執(zhí)行一個函數(shù)。它將創(chuàng)建并返回結(jié)果的新 RDD。地圖的返回類型不一定必須與原始 RDD 的類型相同。
val cities = sc.parallelize(List("tokyo","new york","paris","san francisco")) val upperCaseCities = myCities.map {x =>x.toUpperCase} upperCaseCities.collect.foreach(println) TOKYO NEW YORK PARIS SAN FRANCISCO讓我們展示另一個地圖示例。
val lines = sc.parallelize(List("Michael Jordan", "iPhone")) val words = lines.map(line =>line.split(" ")) words.collectres2: Array[Array[String]] = Array(Array(Michael, Jordan), Array(iPhone))?
FlatMap
平面映射對 RDD 中的每個元素執(zhí)行一個函數(shù),然后拼合結(jié)果。
val lines = sc.parallelize(List("Michael Jordan", "iPhone")) val words = lines.flatMap(line =>line.split(" ")) words.collectres3: Array[String] = Array(Michael, Jordan, iPhone)?
Filter(濾波器)
篩選器返回僅包含與指定條件匹配的元素的 RDD。
val lines = sc.parallelize(List("Michael Jordan", "iPhone","Michael Corleone")) val words = lines.map(line =>line.split(" ")) val results = words.filter(w =>w.contains("Michael")) results.collectres9: Array[Array[String]] = Array(Array(Michael, Jordan), Array(Michael, Corleone))?
Distinct
非重復(fù)值僅返回非重復(fù)值。
val cities1 = sc.parallelize(List("tokyo","tokyo","paris","sydney")) val cities2 = sc.parallelize(List("perth","tokyo","canberra","sydney")) val cities3 = cities1.union(cities2) cities3.distinct.collect.foreach(println)sydney
perth
canberra
tokyo
paris?
ReduceByKey
使用指定的化簡函數(shù)將值與同一鍵組合在一起。
val pairRDD = sc.parallelize(List(("a", 1), ("b",2), ("c",3), ("a", 30), ("b",25), ("a",20))) val sumRDD = pairRDD.reduceByKey((x,y) =>x+y) sumRDD.collectres15: Array[(String, Int)] = Array((b,27), (a,51), (c,3))?
Keys
密鑰返回僅包含密鑰的 RDD。
val rdd = sc.parallelize(List(("a", "Larry"), ("b", "Curly"), ("c", "Moe"))) val keys = rdd.keys keys.collect.foreach(println)a
b
c?
Values
值返回僅包含值的 RDD。
val rdd = sc.parallelize(List(("a", "Larry"), ("b", "Curly"), ("c", "Moe"))) val value = rdd.values value.collect.foreach(println)Larry
Curly
Moe
Inner Join
內(nèi)部連接根據(jù)連接謂詞返回來自兩個 RDD 的所有元素的 RDD。
val data = Array((100,"Jim Hernandez"), (101,"Shane King")) val employees = sc.parallelize(data) val data2 = Array((100,"Glendale"), (101,"Burbank")) val cities = sc.parallelize(data2) val data3 = Array((100,"CA"), (101,"CA"), (102,"NY")) val states = sc.parallelize(data3) val record = employees.join(cities).join(states) record.collect.foreach(println)(100,((Jim Hernandez,Glendale),CA))
(101,((Shane King,Burbank),CA))
RightOuterJoin and LeftOuterJoin
右外聯(lián)從右側(cè) RDD 返回元素的 RDD,即使左側(cè) RDD 上沒有匹配的行也是如此。左外連接等效于右外聯(lián),列的順序不同。
val record = employees.join(cities).rightOuterJoin(states) record.collect.foreach(println)(100,(Some((Jim Hernandez,Glendale)),CA))
(102,(None,NY))
(101,(Some((Shane King,Burbank)),CA))?
Union
Union返回一個 RDD,其中包含兩個或多個 RDD 的組合。
val data = Array((103,"Mark Choi","Torrance","CA"), (104,"Janet Reyes","RollingHills","CA")) val employees = sc.parallelize(data) val data = Array((105,"Lester Cruz","VanNuys","CA"), (106,"John White","Inglewood","CA")) val employees2 = sc.parallelize(data) val rdd = sc.union([employees, employees2]) rdd.collect.foreach(println)(103,MarkChoi,Torrance,CA)
(104,JanetReyes,RollingHills,CA)
(105,LesterCruz,VanNuys,CA)
(106,JohnWhite,Inglewood,CA)?
Subtract(減去)
減去返回一個 RDD,該 RDD 僅包含第一個 RDD 中的元素。
val data = Array((103,"Mark Choi","Torrance","CA"), (104,"Janet Reyes","Rolling Hills","CA"),(105,"Lester Cruz","Van Nuys","CA")) val rdd = sc.parallelize(data) val data2 = Array((103,"Mark Choi","Torrance","CA")) val rdd2 = sc.parallelize(data2) val employees = rdd.subtract(rdd2) employees.collect.foreach(println)(105,LesterCruz,Van Nuys,CA)
(104,JanetReyes,Rolling Hills,CA)?
Coalesce(合并)
合并可減少 RDD 中的分區(qū)數(shù)。您可能希望在對大型 RDD 執(zhí)行篩選后使用合并。雖然過濾減少了新RDD消耗的數(shù)據(jù)量,但它繼承了原始RDD的分區(qū)數(shù)。如果新的RDD明顯小于原始RDD,則它可能具有數(shù)百或數(shù)千個小分區(qū),這可能會導(dǎo)致性能問題。
當(dāng)您希望在寫入HDFS時減少Spark生成的文件數(shù)量時,合并也很有用,可以防止可怕的“小文件”問題。每個分區(qū)都作為單獨的文件寫入 HDFS。請注意,使用合并時可能會遇到性能問題,因為您在寫入 HDFS 時有效地降低了并行度。如果發(fā)生這種情況,請嘗試增加分區(qū)數(shù)。在以下示例中,我們只將一個 Parquet 文件寫入 HDFS。
df.coalesce(1).write.mode("append").parquet("/user/hive/warehouse/Mytable")Repartition(重新分區(qū))
重新分區(qū)可以減少和增加RDD中的分區(qū)數(shù)。在減少分區(qū)時,通常會使用合并,因為它比重新分區(qū)更有效。增加分區(qū)數(shù)對于在寫入 HDFS 時提高并行度非常有用。在以下示例中,我們將六個 Parquet 文件寫入 HDFS。
df.repartition(6).write.mode("append").parquet("/user/hive/warehouse/Mytable")注意:合并通常比重新分區(qū)快。重新分區(qū)將執(zhí)行完全隨機播放,創(chuàng)建新分區(qū)并在工作線程節(jié)點之間平均分布數(shù)據(jù)。合并可最大限度地減少數(shù)據(jù)移動,并通過使用現(xiàn)有分區(qū)避免完全隨機播放。
Actions(行動)
操作是向驅(qū)動程序返回值的 RDD 操作。我列出了一些最常見的操作。有關(guān)操作的完整列表,請參閱聯(lián)機 Spark 文檔。
Collect(收集)
Collect 將整個數(shù)據(jù)集作為數(shù)組返回到驅(qū)動程序。
val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.collectres2: Array[String] = Array(tokyo, new york, paris, san francisco)?
Count(計數(shù))
Count 返回數(shù)據(jù)集中元素數(shù)的計數(shù)。
val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.countres3: Long = 4
Take(拿)
Take 以數(shù)組的形式返回數(shù)據(jù)集的前 n 個元素。
val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.take(2)res4: Array[String] = Array(tokyo, new york)?
Foreach(前期)
Foreach 對數(shù)據(jù)集的每個元素執(zhí)行一個函數(shù)。
val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.collect.foreach(println)tokyo
newyork
paris
sanFrancisco?
Lazy Evaluation
Spark 支持惰性求值,這對于大數(shù)據(jù)處理至關(guān)重要。Spark 中的所有轉(zhuǎn)換都進(jìn)行了懶惰的評估。Spark 不會立即執(zhí)行轉(zhuǎn)換。您可以繼續(xù)定義更多轉(zhuǎn)換。當(dāng)您最終想要最終結(jié)果時,您將執(zhí)行一個操作,這將導(dǎo)致執(zhí)行轉(zhuǎn)換。
Caching(緩存)
默認(rèn)情況下,每次運行操作時都會重新執(zhí)行每個轉(zhuǎn)換。您可以使用緩存或持久化方法將 RDD 緩存在內(nèi)存中,以避免多次重新執(zhí)行轉(zhuǎn)換。
Accumulator(蓄電池)
累加器是僅“添加”到的變量。它們通常用于實現(xiàn)計數(shù)器。在此示例中,我使用累加器將數(shù)組的元素相加:
val accum = sc.longAccumulator("Accumulator 01") sc.parallelize(Array(10, 20, 30, 40)).foreach(x =>accum.add(x)) accum.valueres2: Long = 100
Broadcast Variable(廣播變量)
廣播變量是存儲在每個節(jié)點內(nèi)存中的只讀變量。Spark 使用高速廣播算法來減少復(fù)制廣播變量的網(wǎng)絡(luò)延遲。使用廣播變量不是將數(shù)據(jù)存儲在慢速存儲引擎(如 HDFS 或 S3)中,而是在每個節(jié)點上存儲數(shù)據(jù)集副本的更快方法。
val broadcastVar = sc.broadcast(Array(10, 20, 30)) broadcastVar.valueres0: Array[Int] = Array(10, 20, 30)?
Spark SQL、Dataset 和 DataFrames API
開發(fā) Spark SQL 是為了簡化處理和分析結(jié)構(gòu)化數(shù)據(jù)的過程。數(shù)據(jù)集類似于RDD,因為它支持強類型,但在引擎蓋下,數(shù)據(jù)集有一個更高效的引擎。從 Spark 2.0 開始,數(shù)據(jù)集 API 現(xiàn)在是主要的編程接口。數(shù)據(jù)幀只是一個具有命名列的數(shù)據(jù)集,類似于關(guān)系表。Spark SQL 和數(shù)據(jù)幀共同為處理和分析結(jié)構(gòu)化數(shù)據(jù)提供了強大的編程接口。下面是有關(guān)如何使用數(shù)據(jù)幀 API 的快速示例。
說明數(shù)據(jù)幀和數(shù)據(jù)集 API 已在 Spark 2.0 中統(tǒng)一。數(shù)據(jù)幀現(xiàn)在只是行數(shù)據(jù)集的類型別名,其中行是通用的非類型化對象。相反,數(shù)據(jù)集是強類型對象的集合 數(shù)據(jù)集[T].斯卡拉支持強類型和無類型化API,而在Java中,數(shù)據(jù)集[T]是主要的抽象。數(shù)據(jù)幀是 R 和 Python 的主要編程接口,因為它缺乏對編譯時類型安全性的支持。
Spark?數(shù)據(jù)源
讀取和寫入不同的文件格式和數(shù)據(jù)源是最常見的數(shù)據(jù)處理任務(wù)之一。我們將在示例中同時使用 RDD 和數(shù)據(jù)幀 API。
.CSV
Spark 為您提供了從 CSV 文件中讀取數(shù)據(jù)的不同方法。您可以先將數(shù)據(jù)讀入RDD,然后將其轉(zhuǎn)換為數(shù)據(jù)幀。
val dataRDD = sc.textFile("/sparkdata/customerdata.csv") val parsedRDD = dataRDD.map{_.split(",")} case class CustomerData(customerid: Int, name: String, city: String, state: String, zip: String) val dataDF = parsedRDD.map{ a =>CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString) }.toDF Starting in Spark 2.0, the CSV connector is already built-in. val dataDF = spark.read.format("csv").option("header", "true").load("/sparkdata/customerdata.csv")
.XML
數(shù)據(jù)磚有一個火花XML包,可以很容易地讀取XML數(shù)據(jù)。
cat users.xml <userid>100</userid><name>Wendell Ryan</name><city>San Diego</city><state>CA</state><zip>92102</zip> <userid>101</userid><name>Alicia Thompson</name><city>Berkeley</city><state>CA</state><zip>94705</zip> <userid>102</userid><name>Felipe Drummond</name><city>Palo Alto</city><state>CA</state><zip>94301</zip> <userid>103</userid><name>Teresa Levine</name><city>Walnut Creek</city><state>CA</state><zip>94507</zip> hadoop fs -mkdir /xmldata hadoop fs -put users.xml /xmldata spark-shell --packages??com.databricks:spark-xml_2.10:0.4.1 Create a DataFrame using Spark XML. In this example, we specify the row tag and the path in HDFS where the XML file is located. import com.databricks.spark.xml._ val xmlDF = spark.read.option("rowTag", "user").xml("/xmldata/users.xml"); xmlDF: org.apache.spark.sql.DataFrame = [city: string, name: string, state: string, userid: bigint, zip: bigint] Let’s also take a look at the data. xmlDF.show +------------+---------------+-----+------+-----+ |????????city|???????????name|state|userid|??zip| +------------+---------------+-----+------+-----+ |???San Diego|???Wendell Ryan|???CA|???100|92102| |????Berkeley|Alicia Thompson|???CA|???101|94705| |???Palo Alto|Felipe Drummond|???CA|???102|94301| |Walnut Creek|??Teresa Levine|???CA|???103|94507| +------------+---------------+-----+------+-----+JSON
我們將創(chuàng)建一個 JSON 文件作為此示例的示例數(shù)據(jù)。確保該文件位于 HDFS 中名為 /json 數(shù)據(jù)的文件夾中。
cat users.json {"userid": 200, "name": "Jonathan West", "city":"Frisco", "state":"TX", "zip": "75034", "age":35} {"userid": 201, "name": "Andrea Foreman", "city":"Dallas", "state":"TX", "zip": "75001", "age":28} {"userid": 202, "name": "Kirsten Jung", "city":"Plano", "state":"TX", "zip": "75025", "age":69} {"userid": 203, "name": "Jessica Nguyen", "city":"Allen", "state":"TX", "zip": "75002", "age":52} Create a DataFrame from the JSON file. val jsonDF = spark.read.json("/jsondata/users.json") jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, city: string, name: string, state: string, userid: bigint, zip: string] Check the data. jsonDF.show +---+------+--------------+-----+------+-----+ |age|??city|??????????name|state|userid|??zip| +---+------+--------------+-----+------+-----+ | 35|Frisco| Jonathan West|???TX|???200|75034| | 28|Dallas|Andrea Foreman|???TX|???201|75001| | 69| Plano|??Kirsten Jung|???TX|???202|75025| | 52| Allen|Jessica Nguyen|???TX|???203|75002| +---+------+--------------+-----+------+-----+
關(guān)系數(shù)據(jù)庫和 MPP 數(shù)據(jù)庫
在此示例中,我們使用 MySQL,但也支持其他關(guān)系數(shù)據(jù)庫和 MPP 引擎,如甲骨文、雪花、紅移、黑斑羚、普雷斯托和 Azure DW。通常,只要關(guān)系數(shù)據(jù)庫具有 JDBC 驅(qū)動程序,就應(yīng)該可以從 Spark 訪問它。性能取決于 JDBC 驅(qū)動程序?qū)ε幚聿僮鞯闹С?。有關(guān)更多詳細(xì)信息,請查看 JDBC 驅(qū)動程序的文檔。
mysql -u root -pmypassword create databases salesdb; use salesdb; create table customers ( customerid INT, name VARCHAR(100), city VARCHAR(100), state CHAR(3), zip??CHAR(5)); spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar啟動 spark-shell.
將 CSV 文件讀入 RDD 并將其轉(zhuǎn)換為數(shù)據(jù)幀。
val dataRDD = sc.textFile("/home/hadoop/test.csv") val parsedRDD = dataRDD.map{_.split(",")} case class CustomerData(customerid: Int, name: String, city: String, state: String, zip: String) val dataDF = parsedRDD.map{ a =>CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString) }.toDF將數(shù)據(jù)框注冊為臨時表,以便我們可以對其運行 SQL 查詢。
dataDF.createOrReplaceTempView("dataDF")讓我們設(shè)置連接屬性。
val jdbcUsername = "myuser" val jdbcPassword = "mypass" val jdbcHostname = "10.0.1.112" val jdbcPort = 3306 val jdbcDatabase ="salesdb" val jdbcrewriteBatchedStatements = "true" val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}&rewriteBatchedStatements=${jdbcrewriteBatchedStatements}" val connectionProperties = new java.util.Properties()
這將使我們能夠指定正確的保存模式 - 追加,覆蓋等。
將 SELECT 語句返回的數(shù)據(jù)插入到存儲在 MySQL salesdb 數(shù)據(jù)庫中的客戶表中。
讓我們使用 JDBC 讀取一個表。讓我們確保用一些測試數(shù)據(jù)填充MySQL中的用戶表。
mysql -u root -pmypassword use salesdb; describe users; +--------+--------------+------+-----+---------+-------+ | Field??| Type?????????| Null | Key | Default | Extra | +--------+--------------+------+-----+---------+-------+ | userid | bigint(20)???| YES??|?????| NULL????|???????| | name???| varchar(100) | YES??|?????| NULL????|???????| | city???| varchar(100) | YES??|?????| NULL????|???????| | state??| char(3)??????| YES??|?????| NULL????|???????| | zip????| char(5)??????| YES??|?????| NULL????|???????| | age????| tinyint(4)???| YES??|?????| NULL????|???????| +--------+--------------+------+-----+---------+-------+ select * from users; Empty set (0.00 sec) insert into users values (300,'Fred Stevens','Torrance','CA',90503,23); insert into users values (301,'Nancy Gibbs','Valencia','CA',91354,49); insert into users values (302,'Randy Park','Manhattan Beach','CA',90267,21); insert into users values (303,'Victoria Loma','Rolling Hills','CA',90274,75); select * from users; +--------+---------------+-----------------+-------+-------+------+ | userid | name??????????| city????????????| state | zip???| age??| +--------+---------------+-----------------+-------+-------+------+ |????300 | Fred Stevens??| Torrance????????| CA????| 90503 |???23 | |????301 | Nancy Gibbs???| Valencia????????| CA????| 91354 |???49 | |????302 | Randy Park????| Manhattan Beach | CA????| 90267 |???21 | |????303 | Victoria Loma | Rolling Hills???| CA????| 90274 |???75 | +--------+---------------+-----------------+-------+-------+------+ spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar
讓我們設(shè)置 jdbcurl 和連接屬性。
我們可以從整個表創(chuàng)建數(shù)據(jù)幀。
Parquet
閱讀和寫入Parquet很簡單。
val df = spark.read.load("/sparkdata/employees.parquet") df.select("id","firstname","lastname","salary").write.format("parquet").save("/sparkdata/myData.parquet") You can run SELECT statements on Parquet files directly. val df = spark.sql("SELECT * FROM parquet.`/sparkdata/myData.parquet`")HBase
有多種方法可以從星火訪問 HBase。例如,可以使用“存儲”數(shù)據(jù)集將數(shù)據(jù)寫入 HBase。啟動 HBase 外殼。
創(chuàng)建一個 HBase 表并用測試數(shù)據(jù)填充它。
hbase shell create 'users', 'cf1'
啟動spark-shell.
您還可以使用來自 Spark 的 HBase 客戶端 API 將數(shù)據(jù)讀取和寫入 HBase。如前所述,斯卡拉可以訪問所有 Java 庫。
啟動 HBase 外殼。創(chuàng)建另一個 HBase 表,并用測試數(shù)據(jù)填充它。
hbase shell create 'employees', 'cf1' put 'employees','400','cf1:name', 'Patrick Montalban' put 'employees','400','cf1:city', 'Los Angeles' put 'employees','400','cf1:state', 'CA' put 'employees','400','cf1:zip', '90010' put 'employees','400','cf1:age', '71' put 'employees','401','cf1:name', 'Jillian Collins' put 'employees','401','cf1:city', 'Santa Monica' put 'employees','401','cf1:state', 'CA' put 'employees','401','cf1:zip', '90402' put 'employees','401','cf1:age', '45' put 'employees','402','cf1:name', 'Robert Sarkisian' put 'employees','402','cf1:city', 'Glendale' put 'employees','402','cf1:state', 'CA' put 'employees','402','cf1:zip', '91204' put 'employees','402','cf1:age', '29' put 'employees','403','cf1:name', 'Warren Porcaro' put 'employees','403','cf1:city', 'Burbank' put 'employees','403','cf1:state', 'CA' put 'employees','403','cf1:zip', '91523' put 'employees','403','cf1:age', '62'
讓我們驗證數(shù)據(jù)是否已成功插入到 HBase 表中。
啟動spark-shell.
指定 HBase 表和行鍵。
val table = new HTable(configuration, "employees"); val g = new Get(Bytes.toBytes("401")) val result = table.get(g);從表中提取值。
val val2 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("name")); val val3 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("city")); val val4 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("state")); val val5 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("zip")); val val6 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("age"));
將值轉(zhuǎn)換為適當(dāng)?shù)臄?shù)據(jù)類型。
打印值。
println(" employee id: " + id + " name: " + name + " city: " + city + " state: " + state + " zip: " + zip + " age: " + age); employee id: 401 name: Jillian Collins city: Santa Monica state: CA zip: 90402 age: 13365讓我們使用 HBase API 寫入 HBase。
val configuration = HBaseConfiguration.create() val table = new HTable(configuration, "employees");指定新的行鍵。
val p = new Put(new String("404").getBytes());用新值填充單元格。
p.add("cf1".getBytes(), "name".getBytes(), new String("Denise Shulman").getBytes()); p.add("cf1".getBytes(), "city".getBytes(), new String("La Jolla").getBytes()); p.add("cf1".getBytes(), "state".getBytes(), new String("CA").getBytes()); p.add("cf1".getBytes(), "zip".getBytes(), new String("92093").getBytes()); p.add("cf1".getBytes(), "age".getBytes(), new String("56").getBytes());寫入 H 庫表。
table.put(p); table.close();
確認(rèn)這些值已成功插入到 HBase 表中。
雖然通常較慢,但您也可以通過SQL查詢引擎(如黑斑羚或普雷斯托)訪問HBase。
Amazon S3
Amazon S3 是一種常用的對象存儲,經(jīng)常用作瞬態(tài)集群的數(shù)據(jù)存儲。它也是用于備份和冷數(shù)據(jù)的經(jīng)濟高效的存儲。從 S3 讀取數(shù)據(jù)就像從 HDFS 或任何其他文件系統(tǒng)讀取數(shù)據(jù)一樣。
從亞馬遜 S3 讀取 CSV 文件。確保您已配置 S3 憑證。
val myCSV = sc.textFile("s3a://mydata/customers.csv")將 CSV 數(shù)據(jù)映射到 RDD。
import org.apache.spark.sql.Row val myRDD = myCSV.map(_.split(',')).map(e ? Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))創(chuàng)建架構(gòu)。
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}; val mySchema = StructType(Array( StructField("customerid",IntegerType,false), StructField("customername",StringType,false), StructField("age",IntegerType,false), StructField("city",StringType,false))) val myDF = spark.createDataFrame(myRDD, mySchema)Solr
您可以使用SolrJ從火花與索爾進(jìn)行交互。
import java.net.MalformedURLException; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocumentList; val solr = new HttpSolrServer("http://master02:8983/solr/mycollection"); val query = new SolrQuery(); query.setQuery("*:*"); query.addFilterQuery("userid:3"); query.setFields("userid","name","age","city"); query.setStart(0); query.set("defType", "edismax"); val response = solr.query(query); val results = response.getResults(); println(results);從 Spark 訪問 Solr 集合的一種更簡單的方法是通過火花 solr 包。清醒工場啟動了火花索拉項目,以提供火花索爾集成。九與SolrJ相比,使用火花索拉要容易得多,功能也強大得多,允許您從Solr集合創(chuàng)建數(shù)據(jù)幀。
首先從?spark-shell導(dǎo)入 JAR 文件。
spark-shell --jars spark-solr-3.0.1-shaded.jar指定集合和連接信息。
val options = Map( "collection" -> "mycollection","zkhost" -> "{ master02:8983/solr}")創(chuàng)建數(shù)據(jù)幀。
val solrDF = spark.read.format("solr").options(options).loadMicrosoft Excel
雖然從Spark訪問Excel電子表格是我通常不推薦的,但某些用例需要該功能。一家名為 Crealytics 的公司開發(fā)了一個用于與 Excel 交互的 Spark 插件。該庫需要火花 2.x??梢允褂?--包命令行選項添加包。
spark-shell --packages com.crealytics:spark-excel_2.11:0.9.12從 Excel 工作表創(chuàng)建數(shù)據(jù)幀。
val ExcelDF = spark.read.format("com.crealytics.spark.excel").option("sheetName", "sheet1").option("useHeader", "true").option("inferSchema", "true").option("treatEmptyValuesAsNulls", "true").load("budget.xlsx")
Write a DataFrame to an Excel worksheet.
You can find more details from their GitHub page: github.com/crealytics.
Secure FTP(安全 FTP)
從 SFTP 服務(wù)器下載文件并將其寫入數(shù)據(jù)幀也是一個常見的請求。彈簧ML提供了一個火花SFTP連接器庫。該庫需要 Spark 2.x,并利用 jsch,這是 SSH2 的 Java 實現(xiàn)。讀取和寫入 SFTP 服務(wù)器將作為單個進(jìn)程執(zhí)行。
從 SFTP 服務(wù)器中的文件創(chuàng)建數(shù)據(jù)幀。
val sftpDF = spark.read.format("com.springml.spark.sftp").option("host", "sftpserver.com").option("username", "myusername").option("password", "mypassword").option("inferSchema", "true").option("fileType", "csv").option("delimiter", ",").load("/myftp/myfile.csv")將數(shù)據(jù)幀作為 CSV 文件寫入 FTP 服務(wù)器。
sftpDF2.write.format("com.springml.spark.sftp").option("host", "sftpserver.com").option("username", "myusername").option("password", "mypassword").option("fileType", "csv").option("delimiter", ",").save("/myftp/myfile.csv")您可以從他們的GitHub頁面找到更多詳細(xì)信息:github.com/springml/spark-sftp。
簡介Spark MLlib
機器學(xué)習(xí)是 Spark 的主要應(yīng)用之一。Spark MLlib 包括用于回歸、分類、聚類、協(xié)同過濾和頻繁模式挖掘的常用機器學(xué)習(xí)算法。它還提供了一組廣泛的功能,用于構(gòu)建管道、模型選擇和調(diào)整以及功能選擇、提取和轉(zhuǎn)換。
Spark MLlib算法
Spark MLlib包含大量用于各種任務(wù)的機器學(xué)習(xí)算法。我們將在后續(xù)章節(jié)中介紹其中的大多數(shù)內(nèi)容。
分類
- 邏輯回歸(二項式和多項式)
- 決策樹
- 隨機森林
- 梯度提升樹
- 多層感知器
- 線性支持向量機
- 樸素貝葉斯
- One-vs.-Rest
回歸
- 線性回歸
- 決策樹
- 隨機森林
- 梯度提升樹
- Survival 回歸
- Isotonic 回歸
聚類
- K-Means
- Bisecting K-Means(對 K 均值一分為二)
- 高斯混合模型(GMM)
- Latent Dirichlet Allocation (LDA)
協(xié)同過濾
- Alternating Least Square (ALS)交替最小二乘法
Frequent Pattern Mining(頻繁的模式挖掘)
- FP-Growth
- PrefixSpan
ML Pipelines
Spark MLlib 的早期版本僅包含基于 RDD 的 API。基于數(shù)據(jù)幀的 API 現(xiàn)在是 Spark 的主要 API。一旦基于數(shù)據(jù)幀的 API 達(dá)到功能奇偶校驗,基于 RDD 的 API 將在 Spark 2.3 中被棄用。x基于 RDD 的 API 將在 Spark 3.0 中刪除。基于 DataFrames 的 API 通過提供用于表示類似于關(guān)系數(shù)據(jù)庫表的表格數(shù)據(jù)的更高級抽象,使轉(zhuǎn)換功能變得容易,使其成為實現(xiàn)管道的自然選擇。
Spark MLlib API 引入了幾個用于創(chuàng)建機器學(xué)習(xí)管道的概念。圖 2-3 顯示了用于處理文本數(shù)據(jù)的簡單 Spark MLlib 管道。分詞器將文本分解為一個單詞袋,將單詞附加到輸出數(shù)據(jù)幀。術(shù)語頻率-反向文檔頻率 (TF–IDF) 將數(shù)據(jù)幀作為輸入,將單詞包轉(zhuǎn)換為特征向量,并將其添加到第三個數(shù)據(jù)幀中。
?圖 2-3一個簡單的火花 MLlib 流水線
Pipeline
管道是用于創(chuàng)建機器學(xué)習(xí)工作流的一系列連接階段。級可以是變壓器或估計器。
Transformer
轉(zhuǎn)換器將數(shù)據(jù)幀作為輸入,并輸出一個新的數(shù)據(jù)幀,并將附加列追加到新數(shù)據(jù)幀。新的數(shù)據(jù)幀包括輸入數(shù)據(jù)幀中的列和其他列。
Estimator
估計器是一種機器學(xué)習(xí)算法,用于根據(jù)訓(xùn)練數(shù)據(jù)擬合模型。估計器接受訓(xùn)練數(shù)據(jù)并生成機器學(xué)習(xí)模型。
ParamGridBuilder
參數(shù)網(wǎng)格生成器用于構(gòu)建參數(shù)網(wǎng)格。交叉驗證器執(zhí)行網(wǎng)格搜索,并使用參數(shù)網(wǎng)格中用戶指定的超參數(shù)組合來訓(xùn)練模型。
交叉驗證器
交叉驗證器交叉評估擬合的機器學(xué)習(xí)模型,并通過嘗試使用用戶指定的超參數(shù)組合擬合基礎(chǔ)估計器來輸出最佳模型。使用交叉驗證器或訓(xùn)練驗證拆分估計器執(zhí)行模型選擇。
Evaluator
評估器計算機器學(xué)習(xí)模型的性能。它輸出精度和召回率等指標(biāo),以衡量擬合模型的性能。賦值器的示例包括分別用于二元分類和多類分類任務(wù)的二元分類計算器和多分類計算器,以及用于回歸任務(wù)的回歸計算器。
特征提取、轉(zhuǎn)換和選擇
大多數(shù)情況下,在使用原始數(shù)據(jù)擬合模型之前,需要進(jìn)行額外的預(yù)處理。例如,基于距離的算法要求對要素進(jìn)行標(biāo)準(zhǔn)化。當(dāng)分類數(shù)據(jù)是單熱編碼時,某些算法的性能更好。文本數(shù)據(jù)通常需要標(biāo)記化和特征矢量化。對于非常大的數(shù)據(jù)集,可能需要降維。Spark MLlib 包括用于這些類型任務(wù)的大量變壓器和估計器。我將討論 Spark MLlib 中一些最常用的變壓器和估計器。
字符串索引器
大多數(shù)機器學(xué)習(xí)算法不能直接處理字符串,并要求數(shù)據(jù)采用數(shù)字格式。字符串索引器是將標(biāo)簽的字符串列轉(zhuǎn)換為索引的估計器。它支持四種不同的方法來生成索引:字母數(shù)據(jù)庫、字母表索引、頻率精確度和頻率輔助碼。默認(rèn)值設(shè)置為頻率Desc,最頻繁的標(biāo)簽設(shè)置為 0,結(jié)果按標(biāo)簽頻率的降序排序。
import org.apache.spark.ml.feature.StringIndexer val df = spark.createDataFrame(Seq((0, "car"), (1, "car"), (2, "truck"), (3, "van"), (4, "van"), (5, "van")) ).toDF("id", "class") df.show +---+-----+ | id|class| +---+-----+ |??0|??car| |??1|??car| |??2|truck| |??3|??van| |??4|??van| |??5|??van| +---+-----+ val model = new StringIndexer().setInputCol("class").setOutputCol("classIndex") val indexer = model.fit(df) val indexed = indexer.transform(df) indexed.show() +---+-----+----------+ | id|class|classIndex| +---+-----+----------+ |??0|??car|???????1.0| |??1|??car|???????1.0| |??2|truck|???????2.0| |??3|??van|???????0.0| |??4|??van|???????0.0| |??5|??van|???????0.0| +---+-----+----------+Tokenizer(分詞器)
在分析文本數(shù)據(jù)時,通常必須將句子拆分為單獨的術(shù)語或單詞。分詞器正是這樣做的。您可以使用正則表達(dá)式執(zhí)行更高級的標(biāo)記化,使用正則表達(dá)式。標(biāo)記化通常是機器學(xué)習(xí) NLP 管道中的第一步。我將在第 4 章中更詳細(xì)地討論自然語言處理 (NLP)。
import org.apache.spark.ml.feature.Tokenizer val df = spark.createDataFrame(Seq((0, "Mark gave a speech last night in Laguna Beach"),(1, "Oranges are full of nutrients and low in calories"),(2, "Eddie Van Halen is amazing") )).toDF("id", "sentence")df.show(false) +---+-------------------------------------------------+ |id |sentence?????????????????????????????????????????| +---+-------------------------------------------------+ |0??|Mark gave a speech last night in Laguna Beach????| |1??|Oranges are full of nutrients and low in calories| |2??|Eddie Van Halen is amazing???????????????????????| +---+-------------------------------------------------+ val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") val tokenized = tokenizer.transform(df) tokenized.show(false) +---+-------------------------------------------------+ |id |sentence?????????????????????????????????????????| +---+-------------------------------------------------+ |0??|Mark gave a speech last night in Laguna Beach????| |1??|Oranges are full of nutrients and low in calories| |2??|Eddie Van Halen is amazing???????????????????????| +---+-------------------------------------------------+ +-----------------------------------------------------------+ |words??????????????????????????????????????????????????????| +-----------------------------------------------------------+ |[mark, gave, a, speech, last, night, in, laguna, beach]????| |[oranges, are, full, of, nutrients, and, low, in, calories]| |[eddie, van, halen, is, amazing]???????????????????????????| +-----------------------------------------------------------+VectorAssembler(矢量裝配器)
Spark MLlib 算法要求將要素存儲在單個向量列中。通常,訓(xùn)練數(shù)據(jù)將以表格格式提供,其中數(shù)據(jù)存儲在單獨的列中。矢量組件是將一組列合并為單個矢量列的轉(zhuǎn)換器。
import org.apache.spark.ml.feature.VectorAssembler val df = spark.createDataFrame(Seq((0, 50000, 7, 1)) ).toDF("id", "income", "employment_length", "marital_status") val assembler = new VectorAssembler() .setInputCols(Array("income", "employment_length", "marital_status")) .setOutputCol("features") val df2 = assembler.transform(df) df2.show(false) +---+------+-----------------+--------------+-----------------+ |id |income|employment_length|marital_status|features?????????| +---+------+-----------------+--------------+-----------------+ |0??|50000 |7????????????????|1?????????????|[50000.0,7.0,1.0]| +---+------+-----------------+--------------+-----------------+StandardScaler(標(biāo)準(zhǔn)標(biāo)定器)
如第1章所述,一些機器學(xué)習(xí)算法需要對特征進(jìn)行規(guī)范化才能正常工作。標(biāo)準(zhǔn)標(biāo)度器是一種估計器,用于將要素歸一化為具有單位標(biāo)準(zhǔn)差和/或零均值。它接受兩個參數(shù):使用Std和使用Mean.使用Std將特征縮放為單位標(biāo)準(zhǔn)差。默認(rèn)情況下,此參數(shù)設(shè)置為 true。在縮放之前,將 Mean 設(shè)置為以平均值為真中心。默認(rèn)情況下,此參數(shù)設(shè)置為 false。
import org.apache.spark.ml.feature.StandardScaler import org.apache.spark.ml.feature.VectorAssembler val df = spark.createDataFrame(Seq((0, 186, 200, 56),(1, 170, 198, 42)) ).toDF("id", "height", "weight", "age") val assembler = new VectorAssembler() .setInputCols(Array("height", "weight", "age")) .setOutputCol("features") val df2 = assembler.transform(df) df2.show(false) +---+------+------+---+------------------+ |id |height|weight|age|features??????????| +---+------+------+---+------------------+ |0??|186???|200???|56 |[186.0,200.0,56.0]| |1??|170???|198???|42 |[170.0,198.0,42.0]| +---+------+------+---+------------------+ val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false) val model = scaler.fit(df2) val scaledData = model.transform(df2) scaledData.select("features","scaledFeatures").show(false) +------------------+------------------------------------------------------+ |features??????????|scaledFeatures????????????????????????????????????????| +------------------+------------------------------------------------------+ |[186.0,200.0,56.0]|[16.440232662587228,141.42135623730948,5.656854249492]| |[170.0,198.0,42.0]|[15.026019100214134,140.0071426749364,4.2426406871192]| +------------------+------------------------------------------------------+用于重新縮放數(shù)據(jù)的附加變壓器包括歸一化器、最小值標(biāo)定器和最大Abs標(biāo)定器。有關(guān)更多詳細(xì)信息,請查看 Apache Spark 在線文檔。
StopWordsRemover(停止字切換)
通常在文本分析中使用,停止字切換從字符串序列中刪除非索引字。停用詞(如 I、the 和 a)對文檔的含義沒有多大貢獻(xiàn)。
import org.apache.spark.ml.feature.StopWordsRemover val remover = new StopWordsRemover().setInputCol("data").setOutputCol("output") val dataSet = spark.createDataFrame(Seq((0, Seq("She", "is", "a", "cute", "baby")),(1, Seq("Bob", "never", "went", "to", "Seattle")) )).toDF("id", "data") val df = remover.transform(dataSet) df.show(false) +---+-------------------------------+---------------------------+ |id |data???????????????????????????|output?????????????????????| +---+-------------------------------+---------------------------+ |0??|[She, is, a, cute, baby]???????|[cute, baby]???????????????| |1??|[Bob, never, went, to, Seattle]|[Bob, never, went, Seattle]| +---+-------------------------------+---------------------------+
n-gram
When performing text analysis, it is sometimes advantageous to combine terms into n-grams, a combination of terms in a document. Creating n-grams helps extract more meaningful information from a document. For example, the words “San” and “Diego” individually don’t mean much, but combining them into a bigram “San Diego” provides more context. We use n-gram later in Chapter 4.
import org.apache.spark.ml.feature.NGram val df = spark.createDataFrame(Seq((0, Array("Los", "Angeles", "Lobos", "San", "Francisco")),(1, Array("Stand", "Book", "Case", "Phone", "Mobile", "Magazine")),(2, Array("Deep", "Learning", "Machine", "Algorithm", "Pizza")) )).toDF("id", "words") val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams") val df2 = ngram.transform(df) df2.select("ngrams").show(false) +---------------------------------------------------------------------+ |ngrams???????????????????????????????????????????????????????????????| +---------------------------------------------------------------------+ |[Los Angeles, Angeles Lobos, Lobos San, San Francisco]???????????????| |[Stand Book, Book Case, Case Phone, Phone Mobile, Mobile Magazine]???| |[Deep Learning, Learning Machine, Machine Algorithm, Algorithm Pizza]| +---------------------------------------------------------------------+OneHotEncoderEstimator
One-hot 編碼將分類特征轉(zhuǎn)換為二進(jìn)制向量,其中最多只有一個值,表示所有特征集中存在特定特征值。西單熱編碼分類變量是許多機器學(xué)習(xí)算法(如邏輯回歸和支持向量機)的要求。OneHot編碼器測試器可以隱藏多個列,為每個輸入列生成一個熱編碼的向量列。
import org.apache.spark.ml.feature.StringIndexer val df = spark.createDataFrame(Seq((0, "Male"), (1, "Male"), (2, "Female"), (3, "Female"), (4, "Female"), (5, "Male")) ).toDF("id", "gender") df.show() +---+------+ | id|gender| +---+------+ |??0|??Male| |??1|??Male| |??2|Female| |??3|Female| |??4|Female| |??5|??Male| +---+------+ val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex") val indexed = indexer.fit(df).transform(df) indexed.show() +---+------+-----------+ | id|gender|genderIndex| +---+------+-----------+ |??0|??Male|????????1.0| |??1|??Male|????????1.0| |??2|Female|????????0.0| |??3|Female|????????0.0| |??4|Female|????????0.0| |??5|??Male|????????1.0| +---+------+-----------+ import org.apache.spark.ml.feature.OneHotEncoderEstimator val encoder = new OneHotEncoderEstimator().setInputCols(Array("genderIndex")).setOutputCols(Array("genderEnc")) val encoded = encoder.fit(indexed).transform(indexed) encoded.show() +---+------+-----------+-------------+ | id|gender|genderIndex|????genderEnc| +---+------+-----------+-------------+ |??0|??Male|????????1.0|????(1,[],[])| |??1|??Male|????????1.0|????(1,[],[])| |??2|Female|????????0.0|(1,[0],[1.0])| |??3|Female|????????0.0|(1,[0],[1.0])| |??4|Female|????????0.0|(1,[0],[1.0])| |??5|??Male|????????1.0|????(1,[],[])| +---+------+-----------+-------------+
SQLTransformer
SQL轉(zhuǎn)換器允許您使用SQL執(zhí)行數(shù)據(jù)轉(zhuǎn)換。虛擬表“__THIS__”對應(yīng)于輸入數(shù)據(jù)集。
Term Frequency–Inverse Document Frequency (TF–IDF)
TF–IDF or term frequency–inverse document frequency is a feature vectorization method commonly used in text analysis. It is frequently used to indicate the importance of a term or word to a document in the corpus. A transformer, HashingTF, uses feature hashing to convert terms into feature vectors. An estimator, IDF, scales the vectors generated by the HashingTF (or CountVectorizer). I discuss TF–IDF in greater detail in Chapter 4.
主成分分析(PCA)
主成分分析 (PCA) 是一種降維技術(shù),它將相關(guān)特征組合成一組較小的線性不相關(guān)特征(稱為主成分)。PCA在圖像識別和異常檢測等多個領(lǐng)域都有應(yīng)用。我將在第 4 章中更詳細(xì)地討論 PCA。
ChiSqSelector(奇思克選機)
ChiSq選擇性器使用卡方獨立性測試進(jìn)行特征選擇??ǚ綑z驗是一種檢驗兩個類別變量關(guān)系的方法。數(shù)字頂部特征是默認(rèn)的選擇方法。它返回基于卡方檢驗的一組特征,或具有最大預(yù)測影響的特征。其他選擇方法包括百分位數(shù)、fpr、fdr 和少數(shù)幾種。
import org.apache.spark.ml.feature.ChiSqSelector import org.apache.spark.ml.linalg.Vectors val data = Seq((0, Vectors.dense(5.1, 2.9, 5.6, 4.8), 0.0),(1, Vectors.dense(7.3, 8.1, 45.2, 7.6), 1.0),(2, Vectors.dense(8.2, 12.6, 19.5, 9.21), 1.0) ) val df = spark.createDataset(data).toDF("id", "features", "class") val selector = new ChiSqSelector().setNumTopFeatures(1).setFeaturesCol("features").setLabelCol("class").setOutputCol("selectedFeatures") val df2 = selector.fit(df).transform(df) df2.show() +---+--------------------+-----+----------------+ | id|????????????features|class|selectedFeatures| +---+--------------------+-----+----------------+ |??0|???[5.1,2.9,5.6,4.8]|??0.0|???????????[5.1]| |??1|??[7.3,8.1,45.2,7.6]|??1.0|???????????[7.3]| |??2|[8.2,12.6,19.5,9.21]|??1.0|???????????[8.2]| +---+--------------------+-----+----------------+
Correlation(相關(guān))
相關(guān)性評估兩個變量之間線性關(guān)系的強度。對于線性問題,您可以使用相關(guān)性來選擇相關(guān)要素(要素類相關(guān)性)并識別冗余要素(要素內(nèi)相關(guān)性)。斯帕克·姆利布支持皮爾遜和斯皮爾曼的相關(guān)性。在以下示例中,相關(guān)性計算輸入向量的相關(guān)矩陣。
import org.apache.spark.ml.linalg.{Matrix, Vectors} import org.apache.spark.ml.stat.Correlation import org.apache.spark.sql.Row val data = Seq(Vectors.dense(5.1, 7.0, 9.0, 6.0),Vectors.dense(3.2, 1.1, 6.0, 9.0),Vectors.dense(3.5, 4.2, 9.1, 3.0),Vectors.dense(9.1, 2.6, 7.2, 1.8) ) val df = data.map(Tuple1.apply).toDF("features") +-----------------+ |?????????features| +-----------------+ |[5.1,7.0,9.0,6.0]| |[3.2,1.1,6.0,9.0]| |[3.5,4.2,9.1,3.0]| |[9.1,2.6,7.2,1.8]| +-----------------+ val Row(c1: Matrix) = Correlation.corr(df, "features").head c1: org.apache.spark.ml.linalg.Matrix = 1.0??????????????????-0.01325851107237613??-0.08794286922175912?-0.6536434849076798 -0.01325851107237613??1.0???????????????????0.8773748081826724??-0.1872850762579899 -0.08794286922175912??0.8773748081826724????1.0?????????????????-0.46050932066780714 -0.6536434849076798??-0.1872850762579899???-0.46050932066780714??1.0 val Row(c2: Matrix) = Correlation.corr(df, "features", "spearman").head c2: org.apache.spark.ml.linalg.Matrix = 1.0???????????????????0.399999999999999????0.19999999999999898??-0.8000000000000014 0.399999999999999?????1.0??????????????????0.8000000000000035???-0.19999999999999743 0.19999999999999898???0.8000000000000035???1.0??????????????????-0.39999999999999486 -0.8000000000000014??-0.19999999999999743??-0.39999999999999486??1.0
您還可以計算存儲在數(shù)據(jù)幀列中的值的相關(guān)性,如下所示。
評估指標(biāo)
如第 1 章所述,精度、召回率和準(zhǔn)確性是評估模型性能的重要評估指標(biāo)。但是,它們可能并不總是某些問題的最佳指標(biāo)。
Area Under the Receiver Operating Characteristic (AUROC)
接收器工作特性 (AUROC) 下方的區(qū)域是用于評估二元分類器的常見性能指標(biāo)。接收器工作特性 (ROC) 是一個圖形,用于繪制真陽性率與假陽性率。曲線下的面積 (AUC) 是 ROC 曲線下方的面積。AUC 可以解釋為模型將隨機正示例排名高于隨機負(fù)示例的概率。十二曲線下方的面積越大(AUROC 越接近 1.0),模型的性能越好。AUROC 為 0.5 的模型是無用的,因為它的預(yù)測準(zhǔn)確性與隨機猜測一樣好。
F1 度量值
F1 度量值或 F1 分?jǐn)?shù)是精度和召回率的諧波平均值或加權(quán)平均值。它是用于評估多類分類器的常見性能指標(biāo)。當(dāng)類分布不均勻時,這也是一個很好的衡量標(biāo)準(zhǔn)。最好的F1分?jǐn)?shù)是1,而最差的分?jǐn)?shù)是0。良好的 F1 度量值意味著您的漏報率低,誤報率低。F1 度量值的公式為:F1-度量值 = 2 ?(精度?召回率)/(精度 + 召回率)。
?
均方根誤差(RMSE)
均方根誤差 (RMSE) 是回歸任務(wù)的最常見指標(biāo)。RMSE 只是均方誤差 (MSE) 的平方根。MSE 指示回歸線與一組數(shù)據(jù)點的接近程度,方法是獲取從點到回歸線的距離或“誤差”,并將它們平方。十三MSE 越小,適合度越高。但是,MSE 與原始數(shù)據(jù)的單位不匹配,因為該值是平方的。RMSE 具有與輸出相同的單位。
我在后續(xù)章節(jié)中介紹了其他評估指標(biāo),例如平方誤差集合和內(nèi) (WSSSE) 和輪廓系數(shù)。有關(guān) Spark MLlib 支持的所有評估指標(biāo)的完整列表,請參閱 Spark 的在線文檔。
Model Persistence(模型持久性)
Spark MLlib 允許您保存模型并在以后加載它們。如果要將模型與第三方應(yīng)用程序集成或與團(tuán)隊的其他成員共享,這將特別有用。
保存單個隨機森林模型
rf = RandomForestClassifier(numBin=10,numTrees=30) model = rf.fit(training) model.save("modelpath")加載單個隨機森林模型
val model2 = RandomForestClassificationModel.load(“modelpath”)保存完整管道
val pipeline = new Pipeline().setStages(Array(labelIndexer,vectorAssembler, rf)) val cv = new CrossValidator().setEstimator(pipeline) val model = cv.fit(training) model.save("modelpath")加載完整管道
val model2 = CrossValidatorModel.load("modelpath")Spark MLlib示例
讓我們來看一個示例。我們將使用心臟病數(shù)據(jù)集十四從UCI機器學(xué)習(xí)存儲庫中預(yù)測心臟病的存在。這些數(shù)據(jù)是由羅伯特·德特拉諾,醫(yī)學(xué)博士,博士及其在VA醫(yī)療中心,長灘和克利夫蘭診所基金會的團(tuán)隊收集的。從歷史上看,克利夫蘭數(shù)據(jù)集一直是許多研究的主題,因此我們將使用該數(shù)據(jù)集。原始數(shù)據(jù)集有76個屬性,但其中只有14個用于ML研究(表2-1)。我們將進(jìn)行二項式分類并確定患者是否患有心臟病(清單2-2)。
Table 2-1克利夫蘭心臟病數(shù)據(jù)集屬性信息
| 屬性 | 描述: __________ |
| age | 年齡 |
| sex | 性別 |
| cp | 胸痛類型 |
| trestbps | 靜息血壓 |
| chol | 血清膽固醇(毫克/分升) |
| fbs | 空腹血糖>120毫克/分升 |
| restecg | 靜息心電圖結(jié)果 |
| thalach | 達(dá)到的最大心率 |
| exang | 運動性心絞痛 |
| oldpeak | 相對于休息的運動引起的 ST 段性抑郁 |
| slope | 峰值運動ST段的斜率 |
| ca | 用氟橡膠著色的主要容器(0-3)的數(shù)量 |
| thal | 鉈應(yīng)力測試結(jié)果 |
| num | 預(yù)測屬性 –?心臟病的診斷 |
讓我們開始吧。下載文件并將其復(fù)制到 HDFS。
wget http://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/cleveland.data head -n 10 processed.cleveland.data 63.0,1.0,1.0,145.0,233.0,1.0,2.0,150.0,0.0,2.3,3.0,0.0,6.0,0 67.0,1.0,4.0,160.0,286.0,0.0,2.0,108.0,1.0,1.5,2.0,3.0,3.0,2 67.0,1.0,4.0,120.0,229.0,0.0,2.0,129.0,1.0,2.6,2.0,2.0,7.0,1 37.0,1.0,3.0,130.0,250.0,0.0,0.0,187.0,0.0,3.5,3.0,0.0,3.0,0 41.0,0.0,2.0,130.0,204.0,0.0,2.0,172.0,0.0,1.4,1.0,0.0,3.0,0 56.0,1.0,2.0,120.0,236.0,0.0,0.0,178.0,0.0,0.8,1.0,0.0,3.0,0 62.0,0.0,4.0,140.0,268.0,0.0,2.0,160.0,0.0,3.6,3.0,2.0,3.0,3 57.0,0.0,4.0,120.0,354.0,0.0,0.0,163.0,1.0,0.6,1.0,0.0,3.0,0 63.0,1.0,4.0,130.0,254.0,0.0,2.0,147.0,0.0,1.4,2.0,1.0,7.0,2 53.0,1.0,4.0,140.0,203.0,1.0,2.0,155.0,1.0,3.1,3.0,0.0,7.0,1 hadoop fs -put processed.cleveland.data /tmp/data
我們使用spark-shell來交互式訓(xùn)練我們的模型。
清單 2-2使用隨機森林執(zhí)行二元分類
現(xiàn)在,我們可以擬合模型了。
對測試數(shù)據(jù)執(zhí)行預(yù)測。
val prediction = model.transform(testData)讓我們評估模型。
import org.apache.spark.ml.param.ParamMap val pm = ParamMap(evaluator.metricName -> "areaUnderROC") val aucTestData = evaluator.evaluate(prediction, pm)圖形處理
Spark包括一個名為 GraphX 的圖形處理框架。有一個單獨的包稱為圖形幀,它基于數(shù)據(jù)幀。圖形幀目前不是核心阿帕奇火花的一部分。在撰寫本文時,GraphX和圖形框架仍在積極開發(fā)中。十五我在第6章中介紹了GraphX。
超越Spark MLlib:第三方機器學(xué)習(xí)集成
Spark可以訪問豐富的第三方框架和庫生態(tài)系統(tǒng),這要歸功于無數(shù)的開源貢獻(xiàn)者以及微軟和谷歌等公司。雖然我介紹了核心的火花MLlib算法,但本書的重點是更強大的下一代算法和框架,如XGBoost,LightGBM,隔離森林,火花NLP和分布式深度學(xué)習(xí)。我將在以后的章節(jié)中介紹它們。
使用Alluxio優(yōu)化Spark和SparkMLlib
Alluxio,前身為Tachyon,是加州大學(xué)伯克利分校AMPLab的一個開源項目。Alluxio是一個以內(nèi)存為中心的分布式存儲系統(tǒng),最初由李浩元于2012年開發(fā),當(dāng)時他是一名博士生,也是AMPLab的Apache Spark提交者的創(chuàng)始者。十六該項目是伯克利數(shù)據(jù)分析堆棧(BDAS)的存儲層。2015年,Alluxio, Inc.由李創(chuàng)立,旨在將Alluxio商業(yè)化,從安德森·霍洛維茨獲得了750萬美元的現(xiàn)金注入。如今,Alluxio擁有來自全球50個組織的200多名貢獻(xiàn)者,如英特爾,IBM,雅虎和紅帽。目前,一些知名公司正在生產(chǎn)中使用Alluxio,如百度,阿里巴巴,Rackspace和巴克萊銀行。十七
Alluxio可用于優(yōu)化Spark機器學(xué)習(xí)和深度學(xué)習(xí)工作負(fù)載,為超大數(shù)據(jù)集提供超快的大數(shù)據(jù)存儲。Alluxio進(jìn)行的深度學(xué)習(xí)基準(zhǔn)測試顯示,從Alluxio而不是S3讀取數(shù)據(jù)時,性能顯著提高。十八
Architecture(建筑)
Alluxio是一個以內(nèi)存為中心的分布式存儲系統(tǒng),旨在成為大數(shù)據(jù)事實上的存儲統(tǒng)一層。它提供了一個虛擬化層,統(tǒng)一訪問不同的存儲引擎(如本地FS,HDFS,S3和NFS)和計算框架(如火花,地圖還原,蜂巢和普雷斯托)。圖 2-4 概述了 Alluxio 的架構(gòu)。
?圖2-4Alluxio?架構(gòu)概述
Alluxio是協(xié)調(diào)數(shù)據(jù)共享和指導(dǎo)數(shù)據(jù)訪問的中間層,同時為計算框架和大數(shù)據(jù)應(yīng)用程序提供高性能的低延遲內(nèi)存速度。阿盧克西奧與火花和Hadoop無縫集成,只需要很小的配置更改。通過利用Alluxio的統(tǒng)一命名空間功能,應(yīng)用程序只需連接到Alluxio即可訪問存儲在任何受支持的存儲引擎中的數(shù)據(jù)。阿盧克西奧有自己的原生API以及一個與Hadoop兼容的文件系統(tǒng)接口。方便類使用戶能夠執(zhí)行最初為 Hadoop 編寫的代碼,而無需更改任何代碼。REST API 提供對其他語言的訪問。我們將在本章的后面部分探討這些 API。
Alluxio的統(tǒng)一命名空間功能不支持關(guān)系數(shù)據(jù)庫和MPP引擎(如紅移或雪花)或文檔數(shù)據(jù)庫(如MongoDB)。當(dāng)然,支持與Alluxio和提到的存儲引擎之間的寫入。開發(fā)人員可以使用 Spark 等計算框架從 Redshift 表創(chuàng)建數(shù)據(jù)幀,并將其以 Parquet 或 CSV 格式存儲在 Alluxio 文件系統(tǒng)中,反之亦然(圖 2-5)。
?圖2-5Alluxio?技術(shù)架構(gòu)
為什么使用Alluxio?
顯著提高大數(shù)據(jù)處理性能和可擴展性
多年來,內(nèi)存變得越來越便宜,而其性能也越來越快。與此同時,硬盤驅(qū)動器的性能僅略有好轉(zhuǎn)。毫無疑問,內(nèi)存中的數(shù)據(jù)處理比處理磁盤上的數(shù)據(jù)快一個數(shù)量級。在幾乎所有的編程范例中,我們都建議在內(nèi)存中緩存數(shù)據(jù)以提高性能。與MapReduce相比,阿帕奇火花的主要優(yōu)勢之一是它能夠緩存數(shù)據(jù)。Alluxio將其提升到一個新的水平,提供的大數(shù)據(jù)應(yīng)用程序不僅僅是一個緩存層,而是一個成熟的分布式高性能以內(nèi)存為中心的存儲系統(tǒng)。
百度運營著世界上最大的Alluxio集群之一,擁有1000個工作節(jié)點,處理超過2PB的數(shù)據(jù)。借助Alluxio,百度在查詢和處理時間方面平均提高了10倍和30倍的性能,從而顯著提高了百度做出重要業(yè)務(wù)決策的能力。十九巴克萊發(fā)表了一篇文章,描述了他們在Alluxio的經(jīng)歷。巴克萊數(shù)據(jù)科學(xué)家吉安馬里奧·斯帕卡尼亞和高級分析主管哈里·鮑威爾能夠使用Alluxio將他們的Spark工作從幾小時調(diào)整到幾秒鐘。斷續(xù)器Qunar.com 是中國最大的旅游搜索引擎之一,使用Alluxio將性能提高了15倍至300倍。
多個框架和應(yīng)用程序可以以內(nèi)存速度共享數(shù)據(jù)
典型的大數(shù)據(jù)集群具有多個會話,這些會話運行不同的計算框架,例如Spark和MapReduce。在Spark的情況下,每個應(yīng)用程序都有自己的執(zhí)行器進(jìn)程,執(zhí)行器中的每個任務(wù)都在自己的JVM上運行,從而將Spark應(yīng)用程序彼此隔離。這意味著 Spark(和 MapReduce)應(yīng)用程序除了寫入 HDFS 或 S3 等存儲系統(tǒng)外,無法共享數(shù)據(jù)。如圖 2-6 所示,Spark 作業(yè)和 MapReduce 作業(yè)使用的是 HDFS 或 S3 中存儲的相同數(shù)據(jù)。在圖 2-7 中,多個 Spark 作業(yè)使用相同的數(shù)據(jù),每個作業(yè)在自己的堆空間中存儲自己的數(shù)據(jù)版本。不僅數(shù)據(jù)重復(fù),而且通過 HDFS 或 S3 共享數(shù)據(jù)可能會很慢,尤其是在共享大量數(shù)據(jù)的情況下。
?圖2-6通過HDFS或S3共享數(shù)據(jù)的不同框架
?圖2-7通過HDFS或S3共享數(shù)據(jù)的不同作業(yè)
通過將 Alluxio 用作堆外存儲(圖 2-8),多個框架和作業(yè)可以以內(nèi)存速度共享數(shù)據(jù),從而減少數(shù)據(jù)重復(fù)、提高吞吐量并減少延遲。
?圖 2-8以內(nèi)存速度共享數(shù)據(jù)的不同作業(yè)和框架
在應(yīng)用程序終止或發(fā)生故障時提供高可用性和持久性
在 Spark 中,執(zhí)行器進(jìn)程和執(zhí)行程序內(nèi)存駐留在同一 JVM 中,所有緩存數(shù)據(jù)存儲在 JVM 堆空間中(圖 2-9)。
?圖 2-9具有自己的堆內(nèi)存的 Spark 作業(yè)
當(dāng)作業(yè)完成或由于某種原因 JVM 由于運行時異常而崩潰時,緩存在堆空間中的所有數(shù)據(jù)都將丟失,如圖 2-10 和 2-11 所示。
?圖 2-10Spark 作業(yè)崩潰或完成
?圖 2-11Spark 作業(yè)崩潰或完成。堆空間丟失
解決方案是將 Alluxio 用作堆外存儲(圖 2-12)。
?圖 2-12使用 Alluxio 作為堆外存儲的 Spark
在這種情況下,即使 Spark JVM 崩潰,數(shù)據(jù)在 Alluxio 中仍然可用(圖 2-13 和圖 2-14)。
?圖 2-13Spark 作業(yè)崩潰或完成
?圖 2-14Spark 作業(yè)崩潰或完成。堆空間丟失。堆外內(nèi)存仍然可用
優(yōu)化整體內(nèi)存使用情況并最大限度地減少垃圾回收
通過使用 Alluxio,內(nèi)存使用效率大大提高,因為數(shù)據(jù)在作業(yè)和框架之間共享,并且由于數(shù)據(jù)存儲在堆外,垃圾回收也最小化,從而進(jìn)一步提高了作業(yè)和應(yīng)用程序的性能(圖 2-15)。
?圖 2-15多個 Spark 和 MapReduce 作業(yè)可以訪問存儲在 Alluxio 中的相同數(shù)據(jù)
降低硬件要求
使用Alluxio進(jìn)行大數(shù)據(jù)處理比使用HDFS和S3要快得多。IBM的測試顯示,在寫入IO方面,Alluxio的表現(xiàn)比HDFS高出110倍。二十三有了這種性能,對額外硬件的需求就會減少,從而節(jié)省了基礎(chǔ)設(shè)施和許可成本。
Apache Spark 和?Alluxio
您可以在Alluxio中訪問數(shù)據(jù),類似于從Spark訪問存儲在HDFS和S3中的數(shù)據(jù)。
val dataRDD = sc.textFile("alluxio://localhost:19998/test01.csv") val parsedRDD = dataRDD.map{_.split(",")} case class CustomerData(userid: Long, city: String, state: String, age: Short) val dataDF = parsedRDD.map{ a =>CustomerData(a(0).toLong, a(1).toString, a(2).toString, a(3).toShort) }.toDF dataDF.show() +------+---------------+-----+---+ |userid|???????????city|state|age| +------+---------------+-----+---+ |???300|???????Torrance|???CA| 23| |???302|Manhattan Beach|???CA| 21| +------+---------------+-----+---+總結(jié)
本章簡要介紹了 Spark 和 Spark MLlib,足以為您提供執(zhí)行常見數(shù)據(jù)處理和機器學(xué)習(xí)任務(wù)所需的技能。我的目標(biāo)是讓你盡快上手。
總結(jié)
以上是生活随笔為你收集整理的【Spark ML】第 2 章: Spark和Spark简介的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 转 《图说区块链》读书笔记(完整版)
- 下一篇: 洗料系列-杂谈篇-麻将自动化---第一章