日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【Spark ML】第 2 章: Spark和Spark简介

發(fā)布時間:2023/12/15 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【Spark ML】第 2 章: Spark和Spark简介 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

??🔎大家好,我是Sonhhxg_柒,希望你看完之后,能對你有所幫助,不足請指正!共同學(xué)習(xí)交流🔎

📝個人主頁-Sonhhxg_柒的博客_CSDN博客?📃

🎁歡迎各位→點贊👍 + 收藏?? + 留言📝?

📣系列專欄 - 機器學(xué)習(xí)【ML】?自然語言處理【NLP】? 深度學(xué)習(xí)【DL】

?

?🖍foreword

?說明?本人講解主要包括Python、機器學(xué)習(xí)(ML)、深度學(xué)習(xí)(DL)、自然語言處理(NLP)等內(nèi)容。

如果你對這個系列感興趣的話,可以關(guān)注訂閱喲👋

文章目錄

概述

Cluster Managers(集群管理器)

Architecture(建筑)

執(zhí)行Spark?應(yīng)用程序

集群模式

客戶端模式

spark-shell簡介

SparkSession

Creating an RDD

并行化

textFile

Transformations(轉(zhuǎn)換)

Map

FlatMap

Filter(濾波器)

Distinct

ReduceByKey

Keys

Values

Inner Join

RightOuterJoin and LeftOuterJoin

Union

Subtract(減去)

Coalesce(合并)

Repartition(重新分區(qū))

Actions(行動)

Collect(收集)

Count(計數(shù))

Take(拿)

Foreach(前期)

Lazy Evaluation

Caching(緩存)

Accumulator(蓄電池)

Broadcast Variable(廣播變量)

Spark SQL、Dataset 和 DataFrames API

Spark?數(shù)據(jù)源

.CSV

.XML

JSON

關(guān)系數(shù)據(jù)庫和 MPP 數(shù)據(jù)庫

Parquet

HBase

Amazon S3

Solr

Microsoft Excel

Secure FTP(安全 FTP)

簡介Spark MLlib

Spark MLlib算法

ML Pipelines

Pipeline

Transformer

Estimator

ParamGridBuilder

交叉驗證器

Evaluator

特征提取、轉(zhuǎn)換和選擇

字符串索引器

Tokenizer(分詞器)

VectorAssembler(矢量裝配器)

StandardScaler(標(biāo)準(zhǔn)標(biāo)定器)

StopWordsRemover(停止字切換)

n-gram

OneHotEncoderEstimator

SQLTransformer

主成分分析(PCA)

ChiSqSelector(奇思克選機)

Correlation(相關(guān))

評估指標(biāo)

Area Under the Receiver Operating Characteristic (AUROC)

F1 度量值

均方根誤差(RMSE)

Model Persistence(模型持久性)

Spark MLlib示例

圖形處理

超越Spark MLlib:第三方機器學(xué)習(xí)集成

使用Alluxio優(yōu)化Spark和SparkMLlib

Architecture(建筑)

為什么使用Alluxio?

顯著提高大數(shù)據(jù)處理性能和可擴展性

多個框架和應(yīng)用程序可以以內(nèi)存速度共享數(shù)據(jù)

降低硬件要求

Apache Spark 和?Alluxio

總結(jié)


Spark是一個統(tǒng)一的大數(shù)據(jù)處理框架,用于處理和分析大型數(shù)據(jù)集。Spark 在Scala, Python, Java, 和?R?中提供了高級 API,其中包含功能強大的庫,包括用于機器學(xué)習(xí)的 MLlib、用于 SQL 支持的 Spark SQL、用于實時流的 Spark 流式處理和用于圖形處理的 GraphX。第二Spark由Matei Zaharia在加州大學(xué)伯克利分校的AMPLab創(chuàng)立,后來捐贈給Apache軟件基金會,于2014年2月24日成為頂級項目。第三第一個版本于2017年5月30日發(fā)布。

概述


《Spark》的開發(fā)是為了解決哈多普的原始數(shù)據(jù)處理框架MapReduce的局限性。Matei Zaharia看到了MapReduce在加州大學(xué)伯克利分校和Facebook(他在那里實習(xí))的局限性,并試圖創(chuàng)建一個更快,更通用,多用途的數(shù)據(jù)處理框架,可以處理迭代和交互式應(yīng)用程序。在它提供了一個統(tǒng)一的平臺(圖 2-1),支持多種類型的工作負(fù)載,如流式處理、交互式、圖形處理、機器學(xué)習(xí)和批處理。我們Spark 作業(yè)的運行速度比等效的 MapReduce 作業(yè)快許多倍,因為它具有快速的內(nèi)存中功能和高級 DAG(有向無環(huán)圖)執(zhí)行引擎。Spark是用斯卡拉語編寫的,因此它是Spark事實上的編程接口。我們將在本書中通篇使用 Scala。我們將在第7章中使用PySpark,即用于火花的Python API,用于分布式深度學(xué)習(xí)。

?圖 2-1Apache Spark?ecosystem


Cluster Managers(集群管理器)

集群管理器管理和分配集群資源。Spark 支持隨附于 Spark(獨立調(diào)度程序)、YARN、Mesos 和 Kubernetes 附帶的獨立集群管理器。

Architecture(建筑)

在較高級別,Spark 將 Spark 應(yīng)用程序任務(wù)的執(zhí)行分布在群集節(jié)點上(圖 2-2)。每個 Spark 應(yīng)用程序在其驅(qū)動程序中都有一個 SparkContext 對象。SparkContext 表示與集群管理器的連接,集群管理器為 Spark 應(yīng)用程序提供計算資源。連接到群集后,Spark 會獲取工作線程節(jié)點上的執(zhí)行程序。然后,Spark 將應(yīng)用程序代碼發(fā)送給執(zhí)行程序。應(yīng)用程序通常會運行一個或多個作業(yè)以響應(yīng) Spark 操作。然后,每個作業(yè)由 Spark 劃分為階段或任務(wù)的較小有向無環(huán)圖 (DAG)。然后,每個任務(wù)將分發(fā)并發(fā)送到工作線程節(jié)點上的執(zhí)行程序進(jìn)行執(zhí)行。

圖 2-2Apache Spark體系結(jié)構(gòu)
每個 Spark 應(yīng)用程序都有自己的一組執(zhí)行程序。由于來自不同應(yīng)用程序的任務(wù)在不同的 JVM 中運行,因此 Spark 應(yīng)用程序不會干擾另一個 Spark 應(yīng)用程序。這也意味著 Spark 應(yīng)用程序很難在不使用外部數(shù)據(jù)源(如 HDFS 或 S3)的情況下共享數(shù)據(jù)。使用堆外內(nèi)存存儲(如Tachyon(又名Alluxio)可以使數(shù)據(jù)共享更快,更輕松。我將在本章后面更詳細(xì)地討論阿盧克西奧。

執(zhí)行Spark?應(yīng)用程序

您可以使用交互式外殼程序(火花外殼程序或 pyspark)或提交應(yīng)用程序(火花提交)來執(zhí)行 Spark 應(yīng)用程序。有些人更喜歡使用基于Web的交互式筆記本,如阿帕奇齊柏林飛艇和朱皮特與Spark進(jìn)行交互。數(shù)據(jù)磚和云端等商業(yè)供應(yīng)商也提供自己的交互式筆記本環(huán)境。我將在整個章節(jié)中使用火花殼。有兩種部署模式可用于在具有集群管理器(如 YARN)的環(huán)境中啟動 Spark 應(yīng)用程序。

集群模式

在群集模式下,驅(qū)動程序在由 YARN 管理的應(yīng)用程序主服務(wù)器內(nèi)運行??蛻舳丝梢栽诓挥绊憫?yīng)用程序執(zhí)行的情況下退出。要在集群模式下啟動應(yīng)用程序或火花外殼:

spark-shell --master yarn --deploy-mode cluster spark-submit --class mypath.myClass --master yarn --deploy-mode cluster


客戶端模式

在客戶端模式下,驅(qū)動程序在客戶端中運行。應(yīng)用程序主服務(wù)器僅用于從 YARN 請求資源。要在客戶端模式下啟動應(yīng)用程序或火花外殼,

spark-shell --master yarn --deploy-mode client spark-submit --class mypath.myClass --master yarn --deploy-mode client


spark-shell簡介

通常使用交互式 shell 進(jìn)行即席數(shù)據(jù)分析或瀏覽。它也是學(xué)習(xí)火花 API 的好工具?;鸹ǖ慕换ナ酵鈿ぴ?Spark或Python中可用。在以下示例中,我們將創(chuàng)建城市的 RDD,并將它們?nèi)哭D(zhuǎn)換為大寫。當(dāng)您啟動火花外殼時,將自動創(chuàng)建一個名為“spark”的 SparkSession,如清單 2-1 所示。

spark-shell Spark context Web UI available at http://10.0.2.15:4041 Spark context available as 'sc' (master = local[*], app id = local-1574144576837). Spark session available as 'spark'. Welcome to____ __/ __/__ ___ _____/ /___\ \/ _ \/ _ `/ __/ '_//___/ .__/\_,_/_/ /_/\_\ version 2.4.4/_/ Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212) Type in expressions to have them evaluated. Type :help for more information. scala>val myCities = sc.parallelize(List("tokyo","new york","sydney","san francisco")) scala>val uCities = myCities.map {x =>x.toUpperCase} scala>uCities.collect.foreach(println) TOKYO NEW YORK SYDNEY SAN FRANCISCO

Listing 2-1Introduction to spark-shell


SparkSession


如圖 2-2 所示,SparkContext 支持訪問所有 Spark 特性和功能。該驅(qū)動程序使用 SparkContext 訪問其他上下文,如流式處理上下文、SQL 上下文和配置項上下文。從 Spark 2.0 開始,火花會話提供了一個與 Spark 交互的單一入口點。通過 Spark1.x 中的 Spark 上下文、SQL 上下文、配置上下文和流式處理上下文提供的所有功能現(xiàn)在都可以通過 SparkSession 進(jìn)行訪問。七您可能仍然會遇到在 Spark 1.x 中編寫的代碼。在 Spark 1.x 中,你會寫出這樣的東西。

val sparkConf = new SparkConf().setAppName("MyApp").setMaster("local") val sc = new SparkContext(sparkConf).set("spark.executor.cores", "4") val sqlContext = new org.apache.spark.sql.SQLContext(sc)

在 Spark 2.x 中,您不必顯式創(chuàng)建火花組件、SparkContext 或 SQLContext,因為它們的所有功能都已包含在 SparkSession 中。

val spark = SparkSession. builder(). appName("MyApp"). config("spark.executor.cores", "4"). getOrCreate()


彈性分布式數(shù)據(jù)集 (RDD)


RDD 是跨群集中的一個或多個節(jié)點分區(qū)的對象的彈性不可變分布式集合。RDD 可以通過兩種類型的操作并行處理和操作:轉(zhuǎn)換和操作。

注意RDD 是 Spark 1.x 中 Spark 的主要編程接口,數(shù)據(jù)集已取代 RDD 成為從 Spark 2.0 開始的主 API。建議用戶從RDD切換到數(shù)據(jù)集/數(shù)據(jù)幀,因為編程接口更豐富,性能更好。我將在本章的后面部分討論數(shù)據(jù)集和數(shù)據(jù)幀。

Creating an RDD

創(chuàng)建 RDD 非常簡單。您可以從現(xiàn)有的 Scala 集合或通過讀取存儲在 HDFS 或 S3 中的外部文件來創(chuàng)建 RDD。

并行化

并行化從Scala集合創(chuàng)建 RDD。

val data = (1 to 5).toList val rdd = sc.parallelize(data) val cities = sc.parallelize(List("tokyo","new york","sydney","san francisco"))

textFile

文本文件從存儲在 HDFS 或 S3 中的文本文件創(chuàng)建 RDD。

val rdd = sc.textFile("hdfs://master01:9000/files/mydirectory") val rdd = sc.textFile("s3a://mybucket/files/mydata.csv")

請注意,RDD 是不可變的。數(shù)據(jù)轉(zhuǎn)換會生成另一個 RDD,而不是修改當(dāng)前的 RDD。RDD操作可分為兩類:轉(zhuǎn)換和操作。

Transformations(轉(zhuǎn)換)

轉(zhuǎn)換是創(chuàng)建新的 RDD 的操作。我描述了一些最常見的轉(zhuǎn)換。有關(guān)完整列表,請參閱聯(lián)機 Spark 文檔。

Map

映射對 RDD 中的每個元素執(zhí)行一個函數(shù)。它將創(chuàng)建并返回結(jié)果的新 RDD。地圖的返回類型不一定必須與原始 RDD 的類型相同。

val cities = sc.parallelize(List("tokyo","new york","paris","san francisco")) val upperCaseCities = myCities.map {x =>x.toUpperCase} upperCaseCities.collect.foreach(println) TOKYO NEW YORK PARIS SAN FRANCISCO

讓我們展示另一個地圖示例。

val lines = sc.parallelize(List("Michael Jordan", "iPhone")) val words = lines.map(line =>line.split(" ")) words.collect

res2: Array[Array[String]] = Array(Array(Michael, Jordan), Array(iPhone))?

FlatMap

平面映射對 RDD 中的每個元素執(zhí)行一個函數(shù),然后拼合結(jié)果。

val lines = sc.parallelize(List("Michael Jordan", "iPhone")) val words = lines.flatMap(line =>line.split(" ")) words.collect

res3: Array[String] = Array(Michael, Jordan, iPhone)?

Filter(濾波器)

篩選器返回僅包含與指定條件匹配的元素的 RDD。

val lines = sc.parallelize(List("Michael Jordan", "iPhone","Michael Corleone")) val words = lines.map(line =>line.split(" ")) val results = words.filter(w =>w.contains("Michael")) results.collect

res9: Array[Array[String]] = Array(Array(Michael, Jordan), Array(Michael, Corleone))?

Distinct

非重復(fù)值僅返回非重復(fù)值。

val cities1 = sc.parallelize(List("tokyo","tokyo","paris","sydney")) val cities2 = sc.parallelize(List("perth","tokyo","canberra","sydney")) val cities3 = cities1.union(cities2) cities3.distinct.collect.foreach(println)

sydney
perth
canberra
tokyo
paris?

ReduceByKey

使用指定的化簡函數(shù)將值與同一鍵組合在一起。

val pairRDD = sc.parallelize(List(("a", 1), ("b",2), ("c",3), ("a", 30), ("b",25), ("a",20))) val sumRDD = pairRDD.reduceByKey((x,y) =>x+y) sumRDD.collect

res15: Array[(String, Int)] = Array((b,27), (a,51), (c,3))?

Keys

密鑰返回僅包含密鑰的 RDD。

val rdd = sc.parallelize(List(("a", "Larry"), ("b", "Curly"), ("c", "Moe"))) val keys = rdd.keys keys.collect.foreach(println)

a
b
c?

Values

值返回僅包含值的 RDD。

val rdd = sc.parallelize(List(("a", "Larry"), ("b", "Curly"), ("c", "Moe"))) val value = rdd.values value.collect.foreach(println)

Larry
Curly
Moe


Inner Join

內(nèi)部連接根據(jù)連接謂詞返回來自兩個 RDD 的所有元素的 RDD。

val data = Array((100,"Jim Hernandez"), (101,"Shane King")) val employees = sc.parallelize(data) val data2 = Array((100,"Glendale"), (101,"Burbank")) val cities = sc.parallelize(data2) val data3 = Array((100,"CA"), (101,"CA"), (102,"NY")) val states = sc.parallelize(data3) val record = employees.join(cities).join(states) record.collect.foreach(println)

(100,((Jim Hernandez,Glendale),CA))
(101,((Shane King,Burbank),CA))


RightOuterJoin and LeftOuterJoin

右外聯(lián)從右側(cè) RDD 返回元素的 RDD,即使左側(cè) RDD 上沒有匹配的行也是如此。左外連接等效于右外聯(lián),列的順序不同。

val record = employees.join(cities).rightOuterJoin(states) record.collect.foreach(println)

(100,(Some((Jim Hernandez,Glendale)),CA))
(102,(None,NY))
(101,(Some((Shane King,Burbank)),CA))?

Union

Union返回一個 RDD,其中包含兩個或多個 RDD 的組合。

val data = Array((103,"Mark Choi","Torrance","CA"), (104,"Janet Reyes","RollingHills","CA")) val employees = sc.parallelize(data) val data = Array((105,"Lester Cruz","VanNuys","CA"), (106,"John White","Inglewood","CA")) val employees2 = sc.parallelize(data) val rdd = sc.union([employees, employees2]) rdd.collect.foreach(println)

(103,MarkChoi,Torrance,CA)
(104,JanetReyes,RollingHills,CA)
(105,LesterCruz,VanNuys,CA)
(106,JohnWhite,Inglewood,CA)?

Subtract(減去)

減去返回一個 RDD,該 RDD 僅包含第一個 RDD 中的元素。

val data = Array((103,"Mark Choi","Torrance","CA"), (104,"Janet Reyes","Rolling Hills","CA"),(105,"Lester Cruz","Van Nuys","CA")) val rdd = sc.parallelize(data) val data2 = Array((103,"Mark Choi","Torrance","CA")) val rdd2 = sc.parallelize(data2) val employees = rdd.subtract(rdd2) employees.collect.foreach(println)

(105,LesterCruz,Van Nuys,CA)
(104,JanetReyes,Rolling Hills,CA)?

Coalesce(合并)

合并可減少 RDD 中的分區(qū)數(shù)。您可能希望在對大型 RDD 執(zhí)行篩選后使用合并。雖然過濾減少了新RDD消耗的數(shù)據(jù)量,但它繼承了原始RDD的分區(qū)數(shù)。如果新的RDD明顯小于原始RDD,則它可能具有數(shù)百或數(shù)千個小分區(qū),這可能會導(dǎo)致性能問題。

當(dāng)您希望在寫入HDFS時減少Spark生成的文件數(shù)量時,合并也很有用,可以防止可怕的“小文件”問題。每個分區(qū)都作為單獨的文件寫入 HDFS。請注意,使用合并時可能會遇到性能問題,因為您在寫入 HDFS 時有效地降低了并行度。如果發(fā)生這種情況,請嘗試增加分區(qū)數(shù)。在以下示例中,我們只將一個 Parquet 文件寫入 HDFS。

df.coalesce(1).write.mode("append").parquet("/user/hive/warehouse/Mytable")

Repartition(重新分區(qū))

重新分區(qū)可以減少和增加RDD中的分區(qū)數(shù)。在減少分區(qū)時,通常會使用合并,因為它比重新分區(qū)更有效。增加分區(qū)數(shù)對于在寫入 HDFS 時提高并行度非常有用。在以下示例中,我們將六個 Parquet 文件寫入 HDFS。

df.repartition(6).write.mode("append").parquet("/user/hive/warehouse/Mytable")

注意:合并通常比重新分區(qū)快。重新分區(qū)將執(zhí)行完全隨機播放,創(chuàng)建新分區(qū)并在工作線程節(jié)點之間平均分布數(shù)據(jù)。合并可最大限度地減少數(shù)據(jù)移動,并通過使用現(xiàn)有分區(qū)避免完全隨機播放。

Actions(行動)

操作是向驅(qū)動程序返回值的 RDD 操作。我列出了一些最常見的操作。有關(guān)操作的完整列表,請參閱聯(lián)機 Spark 文檔。

Collect(收集)

Collect 將整個數(shù)據(jù)集作為數(shù)組返回到驅(qū)動程序。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.collect

res2: Array[String] = Array(tokyo, new york, paris, san francisco)?

Count(計數(shù))

Count 返回數(shù)據(jù)集中元素數(shù)的計數(shù)。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.count

res3: Long = 4

Take(拿)

Take 以數(shù)組的形式返回數(shù)據(jù)集的前 n 個元素。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.take(2)

res4: Array[String] = Array(tokyo, new york)?


Foreach(前期)

Foreach 對數(shù)據(jù)集的每個元素執(zhí)行一個函數(shù)。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco")) myCities.collect.foreach(println)

tokyo
newyork
paris
sanFrancisco?

Lazy Evaluation

Spark 支持惰性求值,這對于大數(shù)據(jù)處理至關(guān)重要。Spark 中的所有轉(zhuǎn)換都進(jìn)行了懶惰的評估。Spark 不會立即執(zhí)行轉(zhuǎn)換。您可以繼續(xù)定義更多轉(zhuǎn)換。當(dāng)您最終想要最終結(jié)果時,您將執(zhí)行一個操作,這將導(dǎo)致執(zhí)行轉(zhuǎn)換。

Caching(緩存)

默認(rèn)情況下,每次運行操作時都會重新執(zhí)行每個轉(zhuǎn)換。您可以使用緩存或持久化方法將 RDD 緩存在內(nèi)存中,以避免多次重新執(zhí)行轉(zhuǎn)換。

Accumulator(蓄電池)

累加器是僅“添加”到的變量。它們通常用于實現(xiàn)計數(shù)器。在此示例中,我使用累加器將數(shù)組的元素相加:

val accum = sc.longAccumulator("Accumulator 01") sc.parallelize(Array(10, 20, 30, 40)).foreach(x =>accum.add(x)) accum.value

res2: Long = 100

Broadcast Variable(廣播變量)

廣播變量是存儲在每個節(jié)點內(nèi)存中的只讀變量。Spark 使用高速廣播算法來減少復(fù)制廣播變量的網(wǎng)絡(luò)延遲。使用廣播變量不是將數(shù)據(jù)存儲在慢速存儲引擎(如 HDFS 或 S3)中,而是在每個節(jié)點上存儲數(shù)據(jù)集副本的更快方法。

val broadcastVar = sc.broadcast(Array(10, 20, 30)) broadcastVar.value

res0: Array[Int] = Array(10, 20, 30)?

Spark SQL、Dataset 和 DataFrames API


開發(fā) Spark SQL 是為了簡化處理和分析結(jié)構(gòu)化數(shù)據(jù)的過程。數(shù)據(jù)集類似于RDD,因為它支持強類型,但在引擎蓋下,數(shù)據(jù)集有一個更高效的引擎。從 Spark 2.0 開始,數(shù)據(jù)集 API 現(xiàn)在是主要的編程接口。數(shù)據(jù)幀只是一個具有命名列的數(shù)據(jù)集,類似于關(guān)系表。Spark SQL 和數(shù)據(jù)幀共同為處理和分析結(jié)構(gòu)化數(shù)據(jù)提供了強大的編程接口。下面是有關(guān)如何使用數(shù)據(jù)幀 API 的快速示例。

val jsonDF = spark.read.json("/jsondata/customers.json") jsonDF.show +---+------+--------------+-----+------+-----+ |age| city| name|state|userid| zip| +---+------+--------------+-----+------+-----+ | 35|Frisco| Jonathan West| TX| 200|75034| | 28|Dallas|Andrea Foreman| TX| 201|75001| | 69| Plano| Kirsten Jung| TX| 202|75025| | 52| Allen|Jessica Nguyen| TX| 203|75002| +---+------+--------------+-----+------+-----+ jsonDF.select ("age","city").show +---+------+ |age| city| +---+------+ | 35|Frisco| | 28|Dallas| | 69| Plano| | 52| Allen| +---+------+ jsonDF.filter($"userid" < 202).show() +---+------+--------------+-----+------+-----+ |age| city| name|state|userid| zip| +---+------+--------------+-----+------+-----+ | 35|Frisco| Jonathan West| TX| 200|75034| | 28|Dallas|Andrea Foreman| TX| 201|75001| +---+------+--------------+-----+------+-----+ jsonDF.createOrReplaceTempView("jsonDF") val df = spark.sql("SELECT userid, zip FROM jsonDF") df.show +------+-----+ |userid| zip| +------+-----+ | 200|75034| | 201|75001| | 202|75025| | 203|75002| +------+-----+

說明數(shù)據(jù)幀和數(shù)據(jù)集 API 已在 Spark 2.0 中統(tǒng)一。數(shù)據(jù)幀現(xiàn)在只是行數(shù)據(jù)集的類型別名,其中行是通用的非類型化對象。相反,數(shù)據(jù)集是強類型對象的集合 數(shù)據(jù)集[T].斯卡拉支持強類型和無類型化API,而在Java中,數(shù)據(jù)集[T]是主要的抽象。數(shù)據(jù)幀是 R 和 Python 的主要編程接口,因為它缺乏對編譯時類型安全性的支持。


Spark?數(shù)據(jù)源


讀取和寫入不同的文件格式和數(shù)據(jù)源是最常見的數(shù)據(jù)處理任務(wù)之一。我們將在示例中同時使用 RDD 和數(shù)據(jù)幀 API。

.CSV

Spark 為您提供了從 CSV 文件中讀取數(shù)據(jù)的不同方法。您可以先將數(shù)據(jù)讀入RDD,然后將其轉(zhuǎn)換為數(shù)據(jù)幀。

val dataRDD = sc.textFile("/sparkdata/customerdata.csv") val parsedRDD = dataRDD.map{_.split(",")} case class CustomerData(customerid: Int, name: String, city: String, state: String, zip: String) val dataDF = parsedRDD.map{ a =>CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString) }.toDF Starting in Spark 2.0, the CSV connector is already built-in. val dataDF = spark.read.format("csv").option("header", "true").load("/sparkdata/customerdata.csv")


.XML

數(shù)據(jù)磚有一個火花XML包,可以很容易地讀取XML數(shù)據(jù)。

cat users.xml <userid>100</userid><name>Wendell Ryan</name><city>San Diego</city><state>CA</state><zip>92102</zip> <userid>101</userid><name>Alicia Thompson</name><city>Berkeley</city><state>CA</state><zip>94705</zip> <userid>102</userid><name>Felipe Drummond</name><city>Palo Alto</city><state>CA</state><zip>94301</zip> <userid>103</userid><name>Teresa Levine</name><city>Walnut Creek</city><state>CA</state><zip>94507</zip> hadoop fs -mkdir /xmldata hadoop fs -put users.xml /xmldata spark-shell --packages??com.databricks:spark-xml_2.10:0.4.1 Create a DataFrame using Spark XML. In this example, we specify the row tag and the path in HDFS where the XML file is located. import com.databricks.spark.xml._ val xmlDF = spark.read.option("rowTag", "user").xml("/xmldata/users.xml"); xmlDF: org.apache.spark.sql.DataFrame = [city: string, name: string, state: string, userid: bigint, zip: bigint] Let’s also take a look at the data. xmlDF.show +------------+---------------+-----+------+-----+ |????????city|???????????name|state|userid|??zip| +------------+---------------+-----+------+-----+ |???San Diego|???Wendell Ryan|???CA|???100|92102| |????Berkeley|Alicia Thompson|???CA|???101|94705| |???Palo Alto|Felipe Drummond|???CA|???102|94301| |Walnut Creek|??Teresa Levine|???CA|???103|94507| +------------+---------------+-----+------+-----+

JSON

我們將創(chuàng)建一個 JSON 文件作為此示例的示例數(shù)據(jù)。確保該文件位于 HDFS 中名為 /json 數(shù)據(jù)的文件夾中。

cat users.json {"userid": 200, "name": "Jonathan West", "city":"Frisco", "state":"TX", "zip": "75034", "age":35} {"userid": 201, "name": "Andrea Foreman", "city":"Dallas", "state":"TX", "zip": "75001", "age":28} {"userid": 202, "name": "Kirsten Jung", "city":"Plano", "state":"TX", "zip": "75025", "age":69} {"userid": 203, "name": "Jessica Nguyen", "city":"Allen", "state":"TX", "zip": "75002", "age":52} Create a DataFrame from the JSON file. val jsonDF = spark.read.json("/jsondata/users.json") jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, city: string, name: string, state: string, userid: bigint, zip: string] Check the data. jsonDF.show +---+------+--------------+-----+------+-----+ |age|??city|??????????name|state|userid|??zip| +---+------+--------------+-----+------+-----+ | 35|Frisco| Jonathan West|???TX|???200|75034| | 28|Dallas|Andrea Foreman|???TX|???201|75001| | 69| Plano|??Kirsten Jung|???TX|???202|75025| | 52| Allen|Jessica Nguyen|???TX|???203|75002| +---+------+--------------+-----+------+-----+


關(guān)系數(shù)據(jù)庫和 MPP 數(shù)據(jù)庫

在此示例中,我們使用 MySQL,但也支持其他關(guān)系數(shù)據(jù)庫和 MPP 引擎,如甲骨文、雪花、紅移、黑斑羚、普雷斯托和 Azure DW。通常,只要關(guān)系數(shù)據(jù)庫具有 JDBC 驅(qū)動程序,就應(yīng)該可以從 Spark 訪問它。性能取決于 JDBC 驅(qū)動程序?qū)ε幚聿僮鞯闹С?。有關(guān)更多詳細(xì)信息,請查看 JDBC 驅(qū)動程序的文檔。

mysql -u root -pmypassword create databases salesdb; use salesdb; create table customers ( customerid INT, name VARCHAR(100), city VARCHAR(100), state CHAR(3), zip??CHAR(5)); spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar

啟動 spark-shell.

將 CSV 文件讀入 RDD 并將其轉(zhuǎn)換為數(shù)據(jù)幀。

val dataRDD = sc.textFile("/home/hadoop/test.csv") val parsedRDD = dataRDD.map{_.split(",")} case class CustomerData(customerid: Int, name: String, city: String, state: String, zip: String) val dataDF = parsedRDD.map{ a =>CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString) }.toDF

將數(shù)據(jù)框注冊為臨時表,以便我們可以對其運行 SQL 查詢。

dataDF.createOrReplaceTempView("dataDF")

讓我們設(shè)置連接屬性。

val jdbcUsername = "myuser" val jdbcPassword = "mypass" val jdbcHostname = "10.0.1.112" val jdbcPort = 3306 val jdbcDatabase ="salesdb" val jdbcrewriteBatchedStatements = "true" val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}&rewriteBatchedStatements=${jdbcrewriteBatchedStatements}" val connectionProperties = new java.util.Properties()


這將使我們能夠指定正確的保存模式 - 追加,覆蓋等。

import org.apache.spark.sql.SaveMode


將 SELECT 語句返回的數(shù)據(jù)插入到存儲在 MySQL salesdb 數(shù)據(jù)庫中的客戶表中。

spark.sql("select * from dataDF").write.mode(SaveMode.Append).jdbc(jdbcUrl, "customers", connectionProperties)

讓我們使用 JDBC 讀取一個表。讓我們確保用一些測試數(shù)據(jù)填充MySQL中的用戶表。

mysql -u root -pmypassword use salesdb; describe users; +--------+--------------+------+-----+---------+-------+ | Field??| Type?????????| Null | Key | Default | Extra | +--------+--------------+------+-----+---------+-------+ | userid | bigint(20)???| YES??|?????| NULL????|???????| | name???| varchar(100) | YES??|?????| NULL????|???????| | city???| varchar(100) | YES??|?????| NULL????|???????| | state??| char(3)??????| YES??|?????| NULL????|???????| | zip????| char(5)??????| YES??|?????| NULL????|???????| | age????| tinyint(4)???| YES??|?????| NULL????|???????| +--------+--------------+------+-----+---------+-------+ select * from users; Empty set (0.00 sec) insert into users values (300,'Fred Stevens','Torrance','CA',90503,23); insert into users values (301,'Nancy Gibbs','Valencia','CA',91354,49); insert into users values (302,'Randy Park','Manhattan Beach','CA',90267,21); insert into users values (303,'Victoria Loma','Rolling Hills','CA',90274,75); select * from users; +--------+---------------+-----------------+-------+-------+------+ | userid | name??????????| city????????????| state | zip???| age??| +--------+---------------+-----------------+-------+-------+------+ |????300 | Fred Stevens??| Torrance????????| CA????| 90503 |???23 | |????301 | Nancy Gibbs???| Valencia????????| CA????| 91354 |???49 | |????302 | Randy Park????| Manhattan Beach | CA????| 90267 |???21 | |????303 | Victoria Loma | Rolling Hills???| CA????| 90274 |???75 | +--------+---------------+-----------------+-------+-------+------+ spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar


讓我們設(shè)置 jdbcurl 和連接屬性。

val jdbcURL = s"jdbc:mysql://10.0.1.101:3306/salesdb?user=myuser&password=mypass" val connectionProperties = new java.util.Properties()


我們可以從整個表創(chuàng)建數(shù)據(jù)幀。

val df = spark.read.jdbc(jdbcURL, "users", connectionProperties) df.show +------+-------------+---------------+-----+-----+---+ |userid|?????????name|???????????city|state|??zip|age| +------+-------------+---------------+-----+-----+---+ |???300| Fred Stevens|???????Torrance|???CA|90503| 23| |???301|??Nancy Gibbs|???????Valencia|???CA|91354| 49| |???302|???Randy Park|Manhattan Beach|???CA|90267| 21| |???303|Victoria Loma|??Rolling Hills|???CA|90274| 75| +------+-------------+---------------+-----+-----+---+

Parquet

閱讀和寫入Parquet很簡單。

val df = spark.read.load("/sparkdata/employees.parquet") df.select("id","firstname","lastname","salary").write.format("parquet").save("/sparkdata/myData.parquet") You can run SELECT statements on Parquet files directly. val df = spark.sql("SELECT * FROM parquet.`/sparkdata/myData.parquet`")

HBase

有多種方法可以從星火訪問 HBase。例如,可以使用“存儲”數(shù)據(jù)集將數(shù)據(jù)寫入 HBase。啟動 HBase 外殼。

創(chuàng)建一個 HBase 表并用測試數(shù)據(jù)填充它。

hbase shell create 'users', 'cf1'


啟動spark-shell.

spark-shell val hconf = HBaseConfiguration.create() val jobConf = new JobConf(hconf, this.getClass) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE,"users") val num = sc.parallelize(List(1,2,3,4,5,6)) val theRDD = num.filter.map(x=>{val rowkey = "row" + x val put = new Put(Bytes.toBytes(rowkey))put.add(Bytes.toBytes("cf1"), Bytes.toBytes("fname"), Bytes.toBytes("my fname" + x))(newImmutableBytesWritable, put) }) theRDD.saveAsHadoopDataset(jobConf)


您還可以使用來自 Spark 的 HBase 客戶端 API 將數(shù)據(jù)讀取和寫入 HBase。如前所述,斯卡拉可以訪問所有 Java 庫。

啟動 HBase 外殼。創(chuàng)建另一個 HBase 表,并用測試數(shù)據(jù)填充它。

hbase shell create 'employees', 'cf1' put 'employees','400','cf1:name', 'Patrick Montalban' put 'employees','400','cf1:city', 'Los Angeles' put 'employees','400','cf1:state', 'CA' put 'employees','400','cf1:zip', '90010' put 'employees','400','cf1:age', '71' put 'employees','401','cf1:name', 'Jillian Collins' put 'employees','401','cf1:city', 'Santa Monica' put 'employees','401','cf1:state', 'CA' put 'employees','401','cf1:zip', '90402' put 'employees','401','cf1:age', '45' put 'employees','402','cf1:name', 'Robert Sarkisian' put 'employees','402','cf1:city', 'Glendale' put 'employees','402','cf1:state', 'CA' put 'employees','402','cf1:zip', '91204' put 'employees','402','cf1:age', '29' put 'employees','403','cf1:name', 'Warren Porcaro' put 'employees','403','cf1:city', 'Burbank' put 'employees','403','cf1:state', 'CA' put 'employees','403','cf1:zip', '91523' put 'employees','403','cf1:age', '62'


讓我們驗證數(shù)據(jù)是否已成功插入到 HBase 表中。

scan 'employees' ROW???????COLUMN+CELL400??????column=cf1:age, timestamp=1493105325812, value=71400??????column=cf1:city, timestamp=1493105325691, value=Los Angeles400??????column=cf1:name, timestamp=1493105325644, value=Patrick Montalban400??????column=cf1:state, timestamp=1493105325738, value=CA400??????column=cf1:zip, timestamp=1493105325789, value=90010401??????column=cf1:age, timestamp=1493105334417, value=45401??????column=cf1:city, timestamp=1493105333126, value=Santa Monica401??????column=cf1:name, timestamp=1493105333050, value=Jillian Collins401??????column=cf1:state, timestamp=1493105333145, value=CA401??????column=cf1:zip, timestamp=1493105333165, value=90402402??????column=cf1:age, timestamp=1493105346254, value=29402??????column=cf1:city, timestamp=1493105345053, value=Glendale402??????column=cf1:name, timestamp=1493105344979, value=Robert Sarkisian402??????column=cf1:state, timestamp=1493105345074, value=CA402??????column=cf1:zip, timestamp=1493105345093, value=91204403??????column=cf1:age, timestamp=1493105353650, value=62403??????column=cf1:city, timestamp=1493105352467, value=Burbank403??????column=cf1:name, timestamp=1493105352445, value=Warren Porcaro403??????column=cf1:state, timestamp=1493105352513, value=CA403??????column=cf1:zip, timestamp=1493105352549, value=91523


啟動spark-shell.

spark-shell import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; val configuration = HBaseConfiguration.create()

指定 HBase 表和行鍵。

val table = new HTable(configuration, "employees"); val g = new Get(Bytes.toBytes("401")) val result = table.get(g);

從表中提取值。

val val2 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("name")); val val3 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("city")); val val4 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("state")); val val5 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("zip")); val val6 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("age"));


將值轉(zhuǎn)換為適當(dāng)?shù)臄?shù)據(jù)類型。

val id = Bytes.toString(result.getRow()) val name = Bytes.toString(val2); val city = Bytes.toString(val3); val state = Bytes.toString(val4); val zip = Bytes.toString(val5); val age = Bytes.toShort(val6);

打印值。

println(" employee id: " + id + " name: " + name + " city: " + city + " state: " + state + " zip: " + zip + " age: " + age); employee id: 401 name: Jillian Collins city: Santa Monica state: CA zip: 90402 age: 13365

讓我們使用 HBase API 寫入 HBase。

val configuration = HBaseConfiguration.create() val table = new HTable(configuration, "employees");

指定新的行鍵。

val p = new Put(new String("404").getBytes());

用新值填充單元格。

p.add("cf1".getBytes(), "name".getBytes(), new String("Denise Shulman").getBytes()); p.add("cf1".getBytes(), "city".getBytes(), new String("La Jolla").getBytes()); p.add("cf1".getBytes(), "state".getBytes(), new String("CA").getBytes()); p.add("cf1".getBytes(), "zip".getBytes(), new String("92093").getBytes()); p.add("cf1".getBytes(), "age".getBytes(), new String("56").getBytes());

寫入 H 庫表。

table.put(p); table.close();


確認(rèn)這些值已成功插入到 HBase 表中。

hbase shell scan 'employees' ROW???????COLUMN+CELL400??????column=cf1:age, timestamp=1493105325812, value=71400??????column=cf1:city, timestamp=1493105325691, value=Los Angeles400??????column=cf1:name, timestamp=1493105325644, value=Patrick Montalban400??????column=cf1:state, timestamp=1493105325738, value=CA400??????column=cf1:zip, timestamp=1493105325789, value=90010401??????column=cf1:age, timestamp=1493105334417, value=45401??????column=cf1:city, timestamp=1493105333126, value=Santa Monica401??????column=cf1:name, timestamp=1493105333050, value=Jillian Collins401??????column=cf1:state, timestamp=1493105333145, value=CA401??????column=cf1:zip, timestamp=1493105333165, value=90402402??????column=cf1:age, timestamp=1493105346254, value=29402??????column=cf1:city, timestamp=1493105345053, value=Glendale402??????column=cf1:name, timestamp=1493105344979, value=Robert Sarkisian402??????column=cf1:state, timestamp=1493105345074, value=CA402??????column=cf1:zip, timestamp=1493105345093, value=91204403??????column=cf1:age, timestamp=1493105353650, value=62403??????column=cf1:city, timestamp=1493105352467, value=Burbank403??????column=cf1:name, timestamp=1493105352445, value=Warren Porcaro403??????column=cf1:state, timestamp=1493105352513, value=CA403??????column=cf1:zip, timestamp=1493105352549, value=91523404??????column=cf1:age, timestamp=1493123890714, value=56404??????column=cf1:city, timestamp=1493123890714, value=La Jolla404??????column=cf1:name, timestamp=1493123890714, value=Denise Shulman404??????column=cf1:state, timestamp=1493123890714, value=CA404??????column=cf1:zip, timestamp=1493123890714, value=92093

雖然通常較慢,但您也可以通過SQL查詢引擎(如黑斑羚或普雷斯托)訪問HBase。

Amazon S3

Amazon S3 是一種常用的對象存儲,經(jīng)常用作瞬態(tài)集群的數(shù)據(jù)存儲。它也是用于備份和冷數(shù)據(jù)的經(jīng)濟高效的存儲。從 S3 讀取數(shù)據(jù)就像從 HDFS 或任何其他文件系統(tǒng)讀取數(shù)據(jù)一樣。

從亞馬遜 S3 讀取 CSV 文件。確保您已配置 S3 憑證。

val myCSV = sc.textFile("s3a://mydata/customers.csv")

將 CSV 數(shù)據(jù)映射到 RDD。

import org.apache.spark.sql.Row val myRDD = myCSV.map(_.split(',')).map(e ? Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))

創(chuàng)建架構(gòu)。

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}; val mySchema = StructType(Array( StructField("customerid",IntegerType,false), StructField("customername",StringType,false), StructField("age",IntegerType,false), StructField("city",StringType,false))) val myDF = spark.createDataFrame(myRDD, mySchema)

Solr

您可以使用SolrJ從火花與索爾進(jìn)行交互。

import java.net.MalformedURLException; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocumentList; val solr = new HttpSolrServer("http://master02:8983/solr/mycollection"); val query = new SolrQuery(); query.setQuery("*:*"); query.addFilterQuery("userid:3"); query.setFields("userid","name","age","city"); query.setStart(0); query.set("defType", "edismax"); val response = solr.query(query); val results = response.getResults(); println(results);

從 Spark 訪問 Solr 集合的一種更簡單的方法是通過火花 solr 包。清醒工場啟動了火花索拉項目,以提供火花索爾集成。九與SolrJ相比,使用火花索拉要容易得多,功能也強大得多,允許您從Solr集合創(chuàng)建數(shù)據(jù)幀。

首先從?spark-shell導(dǎo)入 JAR 文件。

spark-shell --jars spark-solr-3.0.1-shaded.jar

指定集合和連接信息。

val options = Map( "collection" -> "mycollection","zkhost" -> "{ master02:8983/solr}")

創(chuàng)建數(shù)據(jù)幀。

val solrDF = spark.read.format("solr").options(options).load

Microsoft Excel

雖然從Spark訪問Excel電子表格是我通常不推薦的,但某些用例需要該功能。一家名為 Crealytics 的公司開發(fā)了一個用于與 Excel 交互的 Spark 插件。該庫需要火花 2.x??梢允褂?--包命令行選項添加包。

spark-shell --packages com.crealytics:spark-excel_2.11:0.9.12

從 Excel 工作表創(chuàng)建數(shù)據(jù)幀。

val ExcelDF = spark.read.format("com.crealytics.spark.excel").option("sheetName", "sheet1").option("useHeader", "true").option("inferSchema", "true").option("treatEmptyValuesAsNulls", "true").load("budget.xlsx")


Write a DataFrame to an Excel worksheet.

ExcelDF2.write.format("com.crealytics.spark.excel").option("sheetName", "sheet1").option("useHeader", "true").mode("overwrite").save("budget2.xlsx")


You can find more details from their GitHub page: github.com/crealytics.

Secure FTP(安全 FTP)


從 SFTP 服務(wù)器下載文件并將其寫入數(shù)據(jù)幀也是一個常見的請求。彈簧ML提供了一個火花SFTP連接器庫。該庫需要 Spark 2.x,并利用 jsch,這是 SSH2 的 Java 實現(xiàn)。讀取和寫入 SFTP 服務(wù)器將作為單個進(jìn)程執(zhí)行。

spark-shell --packages com.springml:spark-sftp_2.11:1.1.

從 SFTP 服務(wù)器中的文件創(chuàng)建數(shù)據(jù)幀。

val sftpDF = spark.read.format("com.springml.spark.sftp").option("host", "sftpserver.com").option("username", "myusername").option("password", "mypassword").option("inferSchema", "true").option("fileType", "csv").option("delimiter", ",").load("/myftp/myfile.csv")

將數(shù)據(jù)幀作為 CSV 文件寫入 FTP 服務(wù)器。

sftpDF2.write.format("com.springml.spark.sftp").option("host", "sftpserver.com").option("username", "myusername").option("password", "mypassword").option("fileType", "csv").option("delimiter", ",").save("/myftp/myfile.csv")

您可以從他們的GitHub頁面找到更多詳細(xì)信息:github.com/springml/spark-sftp。

簡介Spark MLlib

機器學(xué)習(xí)是 Spark 的主要應(yīng)用之一。Spark MLlib 包括用于回歸、分類、聚類、協(xié)同過濾和頻繁模式挖掘的常用機器學(xué)習(xí)算法。它還提供了一組廣泛的功能,用于構(gòu)建管道、模型選擇和調(diào)整以及功能選擇、提取和轉(zhuǎn)換。

Spark MLlib算法

Spark MLlib包含大量用于各種任務(wù)的機器學(xué)習(xí)算法。我們將在后續(xù)章節(jié)中介紹其中的大多數(shù)內(nèi)容。

分類

  • 邏輯回歸(二項式和多項式)
  • 決策樹
  • 隨機森林
  • 梯度提升樹
  • 多層感知器
  • 線性支持向量機
  • 樸素貝葉斯
  • One-vs.-Rest

回歸

  • 線性回歸
  • 決策樹
  • 隨機森林
  • 梯度提升樹
  • Survival 回歸
  • Isotonic 回歸

聚類

  • K-Means
  • Bisecting K-Means(對 K 均值一分為二)
  • 高斯混合模型(GMM)
  • Latent Dirichlet Allocation (LDA)

協(xié)同過濾

  • Alternating Least Square (ALS)交替最小二乘法

Frequent Pattern Mining(頻繁的模式挖掘)

  • FP-Growth
  • PrefixSpan

ML Pipelines

Spark MLlib 的早期版本僅包含基于 RDD 的 API。基于數(shù)據(jù)幀的 API 現(xiàn)在是 Spark 的主要 API。一旦基于數(shù)據(jù)幀的 API 達(dá)到功能奇偶校驗,基于 RDD 的 API 將在 Spark 2.3 中被棄用。x基于 RDD 的 API 將在 Spark 3.0 中刪除。基于 DataFrames 的 API 通過提供用于表示類似于關(guān)系數(shù)據(jù)庫表的表格數(shù)據(jù)的更高級抽象,使轉(zhuǎn)換功能變得容易,使其成為實現(xiàn)管道的自然選擇。

Spark MLlib API 引入了幾個用于創(chuàng)建機器學(xué)習(xí)管道的概念。圖 2-3 顯示了用于處理文本數(shù)據(jù)的簡單 Spark MLlib 管道。分詞器將文本分解為一個單詞袋,將單詞附加到輸出數(shù)據(jù)幀。術(shù)語頻率-反向文檔頻率 (TF–IDF) 將數(shù)據(jù)幀作為輸入,將單詞包轉(zhuǎn)換為特征向量,并將其添加到第三個數(shù)據(jù)幀中。

?圖 2-3一個簡單的火花 MLlib 流水線


Pipeline


管道是用于創(chuàng)建機器學(xué)習(xí)工作流的一系列連接階段。級可以是變壓器或估計器。

Transformer


轉(zhuǎn)換器將數(shù)據(jù)幀作為輸入,并輸出一個新的數(shù)據(jù)幀,并將附加列追加到新數(shù)據(jù)幀。新的數(shù)據(jù)幀包括輸入數(shù)據(jù)幀中的列和其他列。

Estimator

估計器是一種機器學(xué)習(xí)算法,用于根據(jù)訓(xùn)練數(shù)據(jù)擬合模型。估計器接受訓(xùn)練數(shù)據(jù)并生成機器學(xué)習(xí)模型。

ParamGridBuilder

參數(shù)網(wǎng)格生成器用于構(gòu)建參數(shù)網(wǎng)格。交叉驗證器執(zhí)行網(wǎng)格搜索,并使用參數(shù)網(wǎng)格中用戶指定的超參數(shù)組合來訓(xùn)練模型。

交叉驗證器

交叉驗證器交叉評估擬合的機器學(xué)習(xí)模型,并通過嘗試使用用戶指定的超參數(shù)組合擬合基礎(chǔ)估計器來輸出最佳模型。使用交叉驗證器或訓(xùn)練驗證拆分估計器執(zhí)行模型選擇。

Evaluator

評估器計算機器學(xué)習(xí)模型的性能。它輸出精度和召回率等指標(biāo),以衡量擬合模型的性能。賦值器的示例包括分別用于二元分類和多類分類任務(wù)的二元分類計算器和多分類計算器,以及用于回歸任務(wù)的回歸計算器。

特征提取、轉(zhuǎn)換和選擇

大多數(shù)情況下,在使用原始數(shù)據(jù)擬合模型之前,需要進(jìn)行額外的預(yù)處理。例如,基于距離的算法要求對要素進(jìn)行標(biāo)準(zhǔn)化。當(dāng)分類數(shù)據(jù)是單熱編碼時,某些算法的性能更好。文本數(shù)據(jù)通常需要標(biāo)記化和特征矢量化。對于非常大的數(shù)據(jù)集,可能需要降維。Spark MLlib 包括用于這些類型任務(wù)的大量變壓器和估計器。我將討論 Spark MLlib 中一些最常用的變壓器和估計器。

字符串索引器

大多數(shù)機器學(xué)習(xí)算法不能直接處理字符串,并要求數(shù)據(jù)采用數(shù)字格式。字符串索引器是將標(biāo)簽的字符串列轉(zhuǎn)換為索引的估計器。它支持四種不同的方法來生成索引:字母數(shù)據(jù)庫、字母表索引、頻率精確度和頻率輔助碼。默認(rèn)值設(shè)置為頻率Desc,最頻繁的標(biāo)簽設(shè)置為 0,結(jié)果按標(biāo)簽頻率的降序排序。

import org.apache.spark.ml.feature.StringIndexer val df = spark.createDataFrame(Seq((0, "car"), (1, "car"), (2, "truck"), (3, "van"), (4, "van"), (5, "van")) ).toDF("id", "class") df.show +---+-----+ | id|class| +---+-----+ |??0|??car| |??1|??car| |??2|truck| |??3|??van| |??4|??van| |??5|??van| +---+-----+ val model = new StringIndexer().setInputCol("class").setOutputCol("classIndex") val indexer = model.fit(df) val indexed = indexer.transform(df) indexed.show() +---+-----+----------+ | id|class|classIndex| +---+-----+----------+ |??0|??car|???????1.0| |??1|??car|???????1.0| |??2|truck|???????2.0| |??3|??van|???????0.0| |??4|??van|???????0.0| |??5|??van|???????0.0| +---+-----+----------+

Tokenizer(分詞器)

在分析文本數(shù)據(jù)時,通常必須將句子拆分為單獨的術(shù)語或單詞。分詞器正是這樣做的。您可以使用正則表達(dá)式執(zhí)行更高級的標(biāo)記化,使用正則表達(dá)式。標(biāo)記化通常是機器學(xué)習(xí) NLP 管道中的第一步。我將在第 4 章中更詳細(xì)地討論自然語言處理 (NLP)。

import org.apache.spark.ml.feature.Tokenizer val df = spark.createDataFrame(Seq((0, "Mark gave a speech last night in Laguna Beach"),(1, "Oranges are full of nutrients and low in calories"),(2, "Eddie Van Halen is amazing") )).toDF("id", "sentence")df.show(false) +---+-------------------------------------------------+ |id |sentence?????????????????????????????????????????| +---+-------------------------------------------------+ |0??|Mark gave a speech last night in Laguna Beach????| |1??|Oranges are full of nutrients and low in calories| |2??|Eddie Van Halen is amazing???????????????????????| +---+-------------------------------------------------+ val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") val tokenized = tokenizer.transform(df) tokenized.show(false) +---+-------------------------------------------------+ |id |sentence?????????????????????????????????????????| +---+-------------------------------------------------+ |0??|Mark gave a speech last night in Laguna Beach????| |1??|Oranges are full of nutrients and low in calories| |2??|Eddie Van Halen is amazing???????????????????????| +---+-------------------------------------------------+ +-----------------------------------------------------------+ |words??????????????????????????????????????????????????????| +-----------------------------------------------------------+ |[mark, gave, a, speech, last, night, in, laguna, beach]????| |[oranges, are, full, of, nutrients, and, low, in, calories]| |[eddie, van, halen, is, amazing]???????????????????????????| +-----------------------------------------------------------+

VectorAssembler(矢量裝配器)

Spark MLlib 算法要求將要素存儲在單個向量列中。通常,訓(xùn)練數(shù)據(jù)將以表格格式提供,其中數(shù)據(jù)存儲在單獨的列中。矢量組件是將一組列合并為單個矢量列的轉(zhuǎn)換器。

import org.apache.spark.ml.feature.VectorAssembler val df = spark.createDataFrame(Seq((0, 50000, 7, 1)) ).toDF("id", "income", "employment_length", "marital_status") val assembler = new VectorAssembler() .setInputCols(Array("income", "employment_length", "marital_status")) .setOutputCol("features") val df2 = assembler.transform(df) df2.show(false) +---+------+-----------------+--------------+-----------------+ |id |income|employment_length|marital_status|features?????????| +---+------+-----------------+--------------+-----------------+ |0??|50000 |7????????????????|1?????????????|[50000.0,7.0,1.0]| +---+------+-----------------+--------------+-----------------+

StandardScaler(標(biāo)準(zhǔn)標(biāo)定器)

如第1章所述,一些機器學(xué)習(xí)算法需要對特征進(jìn)行規(guī)范化才能正常工作。標(biāo)準(zhǔn)標(biāo)度器是一種估計器,用于將要素歸一化為具有單位標(biāo)準(zhǔn)差和/或零均值。它接受兩個參數(shù):使用Std和使用Mean.使用Std將特征縮放為單位標(biāo)準(zhǔn)差。默認(rèn)情況下,此參數(shù)設(shè)置為 true。在縮放之前,將 Mean 設(shè)置為以平均值為真中心。默認(rèn)情況下,此參數(shù)設(shè)置為 false。

import org.apache.spark.ml.feature.StandardScaler import org.apache.spark.ml.feature.VectorAssembler val df = spark.createDataFrame(Seq((0, 186, 200, 56),(1, 170, 198, 42)) ).toDF("id", "height", "weight", "age") val assembler = new VectorAssembler() .setInputCols(Array("height", "weight", "age")) .setOutputCol("features") val df2 = assembler.transform(df) df2.show(false) +---+------+------+---+------------------+ |id |height|weight|age|features??????????| +---+------+------+---+------------------+ |0??|186???|200???|56 |[186.0,200.0,56.0]| |1??|170???|198???|42 |[170.0,198.0,42.0]| +---+------+------+---+------------------+ val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false) val model = scaler.fit(df2) val scaledData = model.transform(df2) scaledData.select("features","scaledFeatures").show(false) +------------------+------------------------------------------------------+ |features??????????|scaledFeatures????????????????????????????????????????| +------------------+------------------------------------------------------+ |[186.0,200.0,56.0]|[16.440232662587228,141.42135623730948,5.656854249492]| |[170.0,198.0,42.0]|[15.026019100214134,140.0071426749364,4.2426406871192]| +------------------+------------------------------------------------------+

用于重新縮放數(shù)據(jù)的附加變壓器包括歸一化器、最小值標(biāo)定器和最大Abs標(biāo)定器。有關(guān)更多詳細(xì)信息,請查看 Apache Spark 在線文檔。

StopWordsRemover(停止字切換)

通常在文本分析中使用,停止字切換從字符串序列中刪除非索引字。停用詞(如 I、the 和 a)對文檔的含義沒有多大貢獻(xiàn)。

import org.apache.spark.ml.feature.StopWordsRemover val remover = new StopWordsRemover().setInputCol("data").setOutputCol("output") val dataSet = spark.createDataFrame(Seq((0, Seq("She", "is", "a", "cute", "baby")),(1, Seq("Bob", "never", "went", "to", "Seattle")) )).toDF("id", "data") val df = remover.transform(dataSet) df.show(false) +---+-------------------------------+---------------------------+ |id |data???????????????????????????|output?????????????????????| +---+-------------------------------+---------------------------+ |0??|[She, is, a, cute, baby]???????|[cute, baby]???????????????| |1??|[Bob, never, went, to, Seattle]|[Bob, never, went, Seattle]| +---+-------------------------------+---------------------------+


n-gram

When performing text analysis, it is sometimes advantageous to combine terms into n-grams, a combination of terms in a document. Creating n-grams helps extract more meaningful information from a document. For example, the words “San” and “Diego” individually don’t mean much, but combining them into a bigram “San Diego” provides more context. We use n-gram later in Chapter 4.

import org.apache.spark.ml.feature.NGram val df = spark.createDataFrame(Seq((0, Array("Los", "Angeles", "Lobos", "San", "Francisco")),(1, Array("Stand", "Book", "Case", "Phone", "Mobile", "Magazine")),(2, Array("Deep", "Learning", "Machine", "Algorithm", "Pizza")) )).toDF("id", "words") val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams") val df2 = ngram.transform(df) df2.select("ngrams").show(false) +---------------------------------------------------------------------+ |ngrams???????????????????????????????????????????????????????????????| +---------------------------------------------------------------------+ |[Los Angeles, Angeles Lobos, Lobos San, San Francisco]???????????????| |[Stand Book, Book Case, Case Phone, Phone Mobile, Mobile Magazine]???| |[Deep Learning, Learning Machine, Machine Algorithm, Algorithm Pizza]| +---------------------------------------------------------------------+

OneHotEncoderEstimator

One-hot 編碼將分類特征轉(zhuǎn)換為二進(jìn)制向量,其中最多只有一個值,表示所有特征集中存在特定特征值。西單熱編碼分類變量是許多機器學(xué)習(xí)算法(如邏輯回歸和支持向量機)的要求。OneHot編碼器測試器可以隱藏多個列,為每個輸入列生成一個熱編碼的向量列。

import org.apache.spark.ml.feature.StringIndexer val df = spark.createDataFrame(Seq((0, "Male"), (1, "Male"), (2, "Female"), (3, "Female"), (4, "Female"), (5, "Male")) ).toDF("id", "gender") df.show() +---+------+ | id|gender| +---+------+ |??0|??Male| |??1|??Male| |??2|Female| |??3|Female| |??4|Female| |??5|??Male| +---+------+ val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex") val indexed = indexer.fit(df).transform(df) indexed.show() +---+------+-----------+ | id|gender|genderIndex| +---+------+-----------+ |??0|??Male|????????1.0| |??1|??Male|????????1.0| |??2|Female|????????0.0| |??3|Female|????????0.0| |??4|Female|????????0.0| |??5|??Male|????????1.0| +---+------+-----------+ import org.apache.spark.ml.feature.OneHotEncoderEstimator val encoder = new OneHotEncoderEstimator().setInputCols(Array("genderIndex")).setOutputCols(Array("genderEnc")) val encoded = encoder.fit(indexed).transform(indexed) encoded.show() +---+------+-----------+-------------+ | id|gender|genderIndex|????genderEnc| +---+------+-----------+-------------+ |??0|??Male|????????1.0|????(1,[],[])| |??1|??Male|????????1.0|????(1,[],[])| |??2|Female|????????0.0|(1,[0],[1.0])| |??3|Female|????????0.0|(1,[0],[1.0])| |??4|Female|????????0.0|(1,[0],[1.0])| |??5|??Male|????????1.0|????(1,[],[])| +---+------+-----------+-------------+


SQLTransformer


SQL轉(zhuǎn)換器允許您使用SQL執(zhí)行數(shù)據(jù)轉(zhuǎn)換。虛擬表“__THIS__”對應(yīng)于輸入數(shù)據(jù)集。

import org.apache.spark.ml.feature.SQLTransformer val df = spark.createDataFrame(Seq((0, 5.2, 6.7), (2, 25.5, 8.9))).toDF("id", "col1", "col2") val transformer = new SQLTransformer().setStatement("SELECT ABS(col1 - col2) as c1, MOD(col1, col2) as c2 FROM __THIS__") val df2 = transformer.transform(df) df2.show() +----+-----------------+ |??c1|???????????????c2| +----+-----------------+ | 1.5|??????????????5.2| |16.6|7.699999999999999| +----+-----------------+


Term Frequency–Inverse Document Frequency (TF–IDF)
TF–IDF or term frequency–inverse document frequency is a feature vectorization method commonly used in text analysis. It is frequently used to indicate the importance of a term or word to a document in the corpus. A transformer, HashingTF, uses feature hashing to convert terms into feature vectors. An estimator, IDF, scales the vectors generated by the HashingTF (or CountVectorizer). I discuss TF–IDF in greater detail in Chapter 4.

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer} val df = spark.createDataFrame(Seq((0, "Kawhi Leonard is the league MVP"),(1, "Caravaggio pioneered the Baroque technique"),(2, "Using Apache Spark is cool") )).toDF("label", "sentence") df.show(false) +-----+------------------------------------------+ |label|sentence??????????????????????????????????| +-----+------------------------------------------+ |0????|Kawhi Leonard is the league MVP???????????| |1????|Caravaggio pioneered the Baroque technique| |2????|Using Apache Spark is cool????????????????| +-----+------------------------------------------+ val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") val df2 = tokenizer.transform(df) df2.select("label","words").show(false) +-----+------------------------------------------------+ |label|words???????????????????????????????????????????| +-----+------------------------------------------------+ |0????|[kawhi, leonard, is, the, league, mvp]??????????| |1????|[caravaggio, pioneered, the, baroque, technique]| |2????|[using, apache, spark, is, cool]????????????????| +-----+------------------------------------------------+ val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(20) val df3 = hashingTF.transform(df2) df3.select("label","features").show(false) +-----+-----------------------------------------------+ |label|features???????????????????????????????????????| +-----+-----------------------------------------------+ |0????|(20,[1,4,6,10,11,18],[1.0,1.0,1.0,1.0,1.0,1.0])| |1????|(20,[1,5,10,12],[1.0,1.0,2.0,1.0])?????????????| |2????|(20,[1,4,5,15],[1.0,1.0,1.0,2.0])??????????????| +-----+-----------------------------------------------+ val idf = new IDF().setInputCol("features").setOutputCol("scaledFeatures") val idfModel = idf.fit(df3) val df4 = idfModel.transform(df3) df4.select("label", "scaledFeatures").show(3,50) +-----+--------------------------------------------------+ |label|????????????????????????????????????scaledFeatures| +-----+--------------------------------------------------+ |????0|(20,[1,4,6,10,11,18],[0.0,0.28768207245178085,0...| |????1|(20,[1,5,10,12],[0.0,0.28768207245178085,0.5753...| |????2|(20,[1,4,5,15],[0.0,0.28768207245178085,0.28768...| +-----+--------------------------------------------------+


主成分分析(PCA)


主成分分析 (PCA) 是一種降維技術(shù),它將相關(guān)特征組合成一組較小的線性不相關(guān)特征(稱為主成分)。PCA在圖像識別和異常檢測等多個領(lǐng)域都有應(yīng)用。我將在第 4 章中更詳細(xì)地討論 PCA。

import org.apache.spark.ml.feature.PCA import org.apache.spark.ml.linalg.Vectors val data = Array(Vectors.dense(4.2, 5.4, 8.9, 6.7, 9.1),Vectors.dense(3.3, 8.2, 7.0, 9.0, 7.2),Vectors.dense(6.1, 1.4, 2.2, 4.3, 2.9) ) val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(2).fit(df) val result = pca.transform(df).select("pcaFeatures") result.show(false) +---------------------------------------+ |pcaFeatures????????????????????????????| +---------------------------------------+ |[13.62324332562565,3.1399510055159445] | |[14.130156836243236,-1.432033103462711]| |[3.4900743524527704,0.6866090886347056]| +---------------------------------------+


ChiSqSelector(奇思克選機)

ChiSq選擇性器使用卡方獨立性測試進(jìn)行特征選擇??ǚ綑z驗是一種檢驗兩個類別變量關(guān)系的方法。數(shù)字頂部特征是默認(rèn)的選擇方法。它返回基于卡方檢驗的一組特征,或具有最大預(yù)測影響的特征。其他選擇方法包括百分位數(shù)、fpr、fdr 和少數(shù)幾種。

import org.apache.spark.ml.feature.ChiSqSelector import org.apache.spark.ml.linalg.Vectors val data = Seq((0, Vectors.dense(5.1, 2.9, 5.6, 4.8), 0.0),(1, Vectors.dense(7.3, 8.1, 45.2, 7.6), 1.0),(2, Vectors.dense(8.2, 12.6, 19.5, 9.21), 1.0) ) val df = spark.createDataset(data).toDF("id", "features", "class") val selector = new ChiSqSelector().setNumTopFeatures(1).setFeaturesCol("features").setLabelCol("class").setOutputCol("selectedFeatures") val df2 = selector.fit(df).transform(df) df2.show() +---+--------------------+-----+----------------+ | id|????????????features|class|selectedFeatures| +---+--------------------+-----+----------------+ |??0|???[5.1,2.9,5.6,4.8]|??0.0|???????????[5.1]| |??1|??[7.3,8.1,45.2,7.6]|??1.0|???????????[7.3]| |??2|[8.2,12.6,19.5,9.21]|??1.0|???????????[8.2]| +---+--------------------+-----+----------------+


Correlation(相關(guān))

相關(guān)性評估兩個變量之間線性關(guān)系的強度。對于線性問題,您可以使用相關(guān)性來選擇相關(guān)要素(要素類相關(guān)性)并識別冗余要素(要素內(nèi)相關(guān)性)。斯帕克·姆利布支持皮爾遜和斯皮爾曼的相關(guān)性。在以下示例中,相關(guān)性計算輸入向量的相關(guān)矩陣。

import org.apache.spark.ml.linalg.{Matrix, Vectors} import org.apache.spark.ml.stat.Correlation import org.apache.spark.sql.Row val data = Seq(Vectors.dense(5.1, 7.0, 9.0, 6.0),Vectors.dense(3.2, 1.1, 6.0, 9.0),Vectors.dense(3.5, 4.2, 9.1, 3.0),Vectors.dense(9.1, 2.6, 7.2, 1.8) ) val df = data.map(Tuple1.apply).toDF("features") +-----------------+ |?????????features| +-----------------+ |[5.1,7.0,9.0,6.0]| |[3.2,1.1,6.0,9.0]| |[3.5,4.2,9.1,3.0]| |[9.1,2.6,7.2,1.8]| +-----------------+ val Row(c1: Matrix) = Correlation.corr(df, "features").head c1: org.apache.spark.ml.linalg.Matrix = 1.0??????????????????-0.01325851107237613??-0.08794286922175912?-0.6536434849076798 -0.01325851107237613??1.0???????????????????0.8773748081826724??-0.1872850762579899 -0.08794286922175912??0.8773748081826724????1.0?????????????????-0.46050932066780714 -0.6536434849076798??-0.1872850762579899???-0.46050932066780714??1.0 val Row(c2: Matrix) = Correlation.corr(df, "features", "spearman").head c2: org.apache.spark.ml.linalg.Matrix = 1.0???????????????????0.399999999999999????0.19999999999999898??-0.8000000000000014 0.399999999999999?????1.0??????????????????0.8000000000000035???-0.19999999999999743 0.19999999999999898???0.8000000000000035???1.0??????????????????-0.39999999999999486 -0.8000000000000014??-0.19999999999999743??-0.39999999999999486??1.0


您還可以計算存儲在數(shù)據(jù)幀列中的值的相關(guān)性,如下所示。

dataDF.show +------------+-----------+------------+-----------+-----------+-----+ |sepal_length|sepal_width|petal_length|petal_width|??????class|label| +------------+-----------+------------+-----------+-----------+-----+ |?????????5.1|????????3.5|?????????1.4|????????0.2|Iris-setosa|??0.0| |?????????4.9|????????3.0|?????????1.4|????????0.2|Iris-setosa|??0.0| |?????????4.7|????????3.2|?????????1.3|????????0.2|Iris-setosa|??0.0| |?????????4.6|????????3.1|?????????1.5|????????0.2|Iris-setosa|??0.0| |?????????5.0|????????3.6|?????????1.4|????????0.2|Iris-setosa|??0.0| |?????????5.4|????????3.9|?????????1.7|????????0.4|Iris-setosa|??0.0| |?????????4.6|????????3.4|?????????1.4|????????0.3|Iris-setosa|??0.0| |?????????5.0|????????3.4|?????????1.5|????????0.2|Iris-setosa|??0.0| |?????????4.4|????????2.9|?????????1.4|????????0.2|Iris-setosa|??0.0| |?????????4.9|????????3.1|?????????1.5|????????0.1|Iris-setosa|??0.0| |?????????5.4|????????3.7|?????????1.5|????????0.2|Iris-setosa|??0.0| |?????????4.8|????????3.4|?????????1.6|????????0.2|Iris-setosa|??0.0| |?????????4.8|????????3.0|?????????1.4|????????0.1|Iris-setosa|??0.0| |?????????4.3|????????3.0|?????????1.1|????????0.1|Iris-setosa|??0.0| |?????????5.8|????????4.0|?????????1.2|????????0.2|Iris-setosa|??0.0| |?????????5.7|????????4.4|?????????1.5|????????0.4|Iris-setosa|??0.0| |?????????5.4|????????3.9|?????????1.3|????????0.4|Iris-setosa|??0.0| |?????????5.1|????????3.5|?????????1.4|????????0.3|Iris-setosa|??0.0| |?????????5.7|????????3.8|?????????1.7|????????0.3|Iris-setosa|??0.0| |?????????5.1|????????3.8|?????????1.5|????????0.3|Iris-setosa|??0.0| +------------+-----------+------------+-----------+-----------+-----+ dataDF.stat.corr("petal_length","label") res48: Double = 0.9490425448523336 dataDF.stat.corr("petal_width","label") res49: Double = 0.9564638238016178 dataDF.stat.corr("sepal_length","label") res50: Double = 0.7825612318100821 dataDF.stat.corr("sepal_width","label") res51: Double = -0.41944620026002677


評估指標(biāo)

如第 1 章所述,精度、召回率和準(zhǔn)確性是評估模型性能的重要評估指標(biāo)。但是,它們可能并不總是某些問題的最佳指標(biāo)。

Area Under the Receiver Operating Characteristic (AUROC)


接收器工作特性 (AUROC) 下方的區(qū)域是用于評估二元分類器的常見性能指標(biāo)。接收器工作特性 (ROC) 是一個圖形,用于繪制真陽性率與假陽性率。曲線下的面積 (AUC) 是 ROC 曲線下方的面積。AUC 可以解釋為模型將隨機正示例排名高于隨機負(fù)示例的概率。十二曲線下方的面積越大(AUROC 越接近 1.0),模型的性能越好。AUROC 為 0.5 的模型是無用的,因為它的預(yù)測準(zhǔn)確性與隨機猜測一樣好。

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC").setRawPredictionCol("rawPrediction").setLabelCol("label")


F1 度量值

F1 度量值或 F1 分?jǐn)?shù)是精度和召回率的諧波平均值或加權(quán)平均值。它是用于評估多類分類器的常見性能指標(biāo)。當(dāng)類分布不均勻時,這也是一個很好的衡量標(biāo)準(zhǔn)。最好的F1分?jǐn)?shù)是1,而最差的分?jǐn)?shù)是0。良好的 F1 度量值意味著您的漏報率低,誤報率低。F1 度量值的公式為:F1-度量值 = 2 ?(精度?召回率)/(精度 + 召回率)。
?

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator val evaluator = new MulticlassClassificationEvaluator().setMetricName("f1").setLabelCol("label").setPredictionCol("prediction")


均方根誤差(RMSE)


均方根誤差 (RMSE) 是回歸任務(wù)的最常見指標(biāo)。RMSE 只是均方誤差 (MSE) 的平方根。MSE 指示回歸線與一組數(shù)據(jù)點的接近程度,方法是獲取從點到回歸線的距離或“誤差”,并將它們平方。十三MSE 越小,適合度越高。但是,MSE 與原始數(shù)據(jù)的單位不匹配,因為該值是平方的。RMSE 具有與輸出相同的單位。

import org.apache.spark.ml.evaluation.RegressionEvaluator val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")


我在后續(xù)章節(jié)中介紹了其他評估指標(biāo),例如平方誤差集合和內(nèi) (WSSSE) 和輪廓系數(shù)。有關(guān) Spark MLlib 支持的所有評估指標(biāo)的完整列表,請參閱 Spark 的在線文檔。

Model Persistence(模型持久性)

Spark MLlib 允許您保存模型并在以后加載它們。如果要將模型與第三方應(yīng)用程序集成或與團(tuán)隊的其他成員共享,這將特別有用。

保存單個隨機森林模型

rf = RandomForestClassifier(numBin=10,numTrees=30) model = rf.fit(training) model.save("modelpath")

加載單個隨機森林模型

val model2 = RandomForestClassificationModel.load(“modelpath”)

保存完整管道

val pipeline = new Pipeline().setStages(Array(labelIndexer,vectorAssembler, rf)) val cv = new CrossValidator().setEstimator(pipeline) val model = cv.fit(training) model.save("modelpath")

加載完整管道

val model2 = CrossValidatorModel.load("modelpath")

Spark MLlib示例

讓我們來看一個示例。我們將使用心臟病數(shù)據(jù)集十四從UCI機器學(xué)習(xí)存儲庫中預(yù)測心臟病的存在。這些數(shù)據(jù)是由羅伯特·德特拉諾,醫(yī)學(xué)博士,博士及其在VA醫(yī)療中心,長灘和克利夫蘭診所基金會的團(tuán)隊收集的。從歷史上看,克利夫蘭數(shù)據(jù)集一直是許多研究的主題,因此我們將使用該數(shù)據(jù)集。原始數(shù)據(jù)集有76個屬性,但其中只有14個用于ML研究(表2-1)。我們將進(jìn)行二項式分類并確定患者是否患有心臟病(清單2-2)。

Table 2-1克利夫蘭心臟病數(shù)據(jù)集屬性信息

屬性描述: __________
age年齡
sex性別
cp胸痛類型
trestbps靜息血壓
chol血清膽固醇(毫克/分升)
fbs空腹血糖>120毫克/分升
restecg靜息心電圖結(jié)果
thalach達(dá)到的最大心率
exang運動性心絞痛
oldpeak相對于休息的運動引起的 ST 段性抑郁
slope峰值運動ST段的斜率
ca用氟橡膠著色的主要容器(0-3)的數(shù)量
thal鉈應(yīng)力測試結(jié)果
num預(yù)測屬性 –?心臟病的診斷

讓我們開始吧。下載文件并將其復(fù)制到 HDFS。

wget http://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/cleveland.data head -n 10 processed.cleveland.data 63.0,1.0,1.0,145.0,233.0,1.0,2.0,150.0,0.0,2.3,3.0,0.0,6.0,0 67.0,1.0,4.0,160.0,286.0,0.0,2.0,108.0,1.0,1.5,2.0,3.0,3.0,2 67.0,1.0,4.0,120.0,229.0,0.0,2.0,129.0,1.0,2.6,2.0,2.0,7.0,1 37.0,1.0,3.0,130.0,250.0,0.0,0.0,187.0,0.0,3.5,3.0,0.0,3.0,0 41.0,0.0,2.0,130.0,204.0,0.0,2.0,172.0,0.0,1.4,1.0,0.0,3.0,0 56.0,1.0,2.0,120.0,236.0,0.0,0.0,178.0,0.0,0.8,1.0,0.0,3.0,0 62.0,0.0,4.0,140.0,268.0,0.0,2.0,160.0,0.0,3.6,3.0,2.0,3.0,3 57.0,0.0,4.0,120.0,354.0,0.0,0.0,163.0,1.0,0.6,1.0,0.0,3.0,0 63.0,1.0,4.0,130.0,254.0,0.0,2.0,147.0,0.0,1.4,2.0,1.0,7.0,2 53.0,1.0,4.0,140.0,203.0,1.0,2.0,155.0,1.0,3.1,3.0,0.0,7.0,1 hadoop fs -put processed.cleveland.data /tmp/data


我們使用spark-shell來交互式訓(xùn)練我們的模型。

spark-shell val dataDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(d("/tmp/data/processed.cleveland.data").toDF("id","age","sex","cp","trestbps","chol","fbs","restecg","thalach","exang","oldpeak","slope","ca","thal","num") dataDF.printSchema root|-- id: string (nullable = false)|-- age: float (nullable = true)|-- sex: float (nullable = true)|-- cp: float (nullable = true)|-- trestbps: float (nullable = true)|-- chol: float (nullable = true)|-- fbs: float (nullable = true)|-- restecg: float (nullable = true)|-- thalach: float (nullable = true)|-- exang: float (nullable = true)|-- oldpeak: float (nullable = true)|-- slope: float (nullable = true)|-- ca: float (nullable = true)|-- thal: float (nullable = true)|-- num: float (nullable = true) val myFeatures = Array("age", "sex", "cp", "trestbps", "chol", "fbs","restecg", "thalach", "exang", "oldpeak", "slope","ca", "thal", "num") import org.apache.spark.ml.feature.VectorAssembler val assembler = new VectorAssembler().setInputCols(myFeatures).setOutputCol("features") val dataDF2 = assembler.transform(dataDF) import org.apache.spark.ml.feature.StringIndexer val labelIndexer = new StringIndexer().setInputCol("num").setOutputCol("label") val dataDF3 = labelIndexer.fit(dataDF2).transform(dataDF2) val dataDF4 = dataDF3.where(dataDF3("ca").isNotNull).where(dataDF3("thal").isNotNull).where(dataDF3("num").isNotNull) val Array(trainingData, testData) = dataDF4.randomSplit(Array(0.8, 0.2), 101) import org.apache.spark.ml.classification.RandomForestClassifier val rf = new RandomForestClassifier().setFeatureSubsetStrategy("auto").setSeed(101) import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator val evaluator = new BinaryClassificationEvaluator().setLabelCol("label") import org.apache.spark.ml.tuning.ParamGridBuilder val pgrid = new ParamGridBuilder().addGrid(rf.maxBins, Array(10, 20, 30)).addGrid(rf.maxDepth, Array(5, 10, 15)).addGrid(rf.numTrees, Array(20, 30, 40)).addGrid(rf.impurity, Array("gini", "entropy")).build() import org.apache.spark.ml.Pipeline val pipeline = new Pipeline().setStages(Array(rf)) import org.apache.spark.ml.tuning.CrossValidator val cv = new CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(pgrid).setNumFolds(3)

清單 2-2使用隨機森林執(zhí)行二元分類
現(xiàn)在,我們可以擬合模型了。

val model = cv.fit(trainingData)

對測試數(shù)據(jù)執(zhí)行預(yù)測。

val prediction = model.transform(testData)

讓我們評估模型。

import org.apache.spark.ml.param.ParamMap val pm = ParamMap(evaluator.metricName -> "areaUnderROC") val aucTestData = evaluator.evaluate(prediction, pm)

圖形處理

Spark包括一個名為 GraphX 的圖形處理框架。有一個單獨的包稱為圖形幀,它基于數(shù)據(jù)幀。圖形幀目前不是核心阿帕奇火花的一部分。在撰寫本文時,GraphX和圖形框架仍在積極開發(fā)中。十五我在第6章中介紹了GraphX。

超越Spark MLlib:第三方機器學(xué)習(xí)集成

Spark可以訪問豐富的第三方框架和庫生態(tài)系統(tǒng),這要歸功于無數(shù)的開源貢獻(xiàn)者以及微軟和谷歌等公司。雖然我介紹了核心的火花MLlib算法,但本書的重點是更強大的下一代算法和框架,如XGBoost,LightGBM,隔離森林,火花NLP和分布式深度學(xué)習(xí)。我將在以后的章節(jié)中介紹它們。

使用Alluxio優(yōu)化Spark和SparkMLlib

Alluxio,前身為Tachyon,是加州大學(xué)伯克利分校AMPLab的一個開源項目。Alluxio是一個以內(nèi)存為中心的分布式存儲系統(tǒng),最初由李浩元于2012年開發(fā),當(dāng)時他是一名博士生,也是AMPLab的Apache Spark提交者的創(chuàng)始者。十六該項目是伯克利數(shù)據(jù)分析堆棧(BDAS)的存儲層。2015年,Alluxio, Inc.由李創(chuàng)立,旨在將Alluxio商業(yè)化,從安德森·霍洛維茨獲得了750萬美元的現(xiàn)金注入。如今,Alluxio擁有來自全球50個組織的200多名貢獻(xiàn)者,如英特爾,IBM,雅虎和紅帽。目前,一些知名公司正在生產(chǎn)中使用Alluxio,如百度,阿里巴巴,Rackspace和巴克萊銀行。十七

Alluxio可用于優(yōu)化Spark機器學(xué)習(xí)和深度學(xué)習(xí)工作負(fù)載,為超大數(shù)據(jù)集提供超快的大數(shù)據(jù)存儲。Alluxio進(jìn)行的深度學(xué)習(xí)基準(zhǔn)測試顯示,從Alluxio而不是S3讀取數(shù)據(jù)時,性能顯著提高。十八

Architecture(建筑)

Alluxio是一個以內(nèi)存為中心的分布式存儲系統(tǒng),旨在成為大數(shù)據(jù)事實上的存儲統(tǒng)一層。它提供了一個虛擬化層,統(tǒng)一訪問不同的存儲引擎(如本地FS,HDFS,S3和NFS)和計算框架(如火花,地圖還原,蜂巢和普雷斯托)。圖 2-4 概述了 Alluxio 的架構(gòu)。

?圖2-4Alluxio?架構(gòu)概述
Alluxio是協(xié)調(diào)數(shù)據(jù)共享和指導(dǎo)數(shù)據(jù)訪問的中間層,同時為計算框架和大數(shù)據(jù)應(yīng)用程序提供高性能的低延遲內(nèi)存速度。阿盧克西奧與火花和Hadoop無縫集成,只需要很小的配置更改。通過利用Alluxio的統(tǒng)一命名空間功能,應(yīng)用程序只需連接到Alluxio即可訪問存儲在任何受支持的存儲引擎中的數(shù)據(jù)。阿盧克西奧有自己的原生API以及一個與Hadoop兼容的文件系統(tǒng)接口。方便類使用戶能夠執(zhí)行最初為 Hadoop 編寫的代碼,而無需更改任何代碼。REST API 提供對其他語言的訪問。我們將在本章的后面部分探討這些 API。

Alluxio的統(tǒng)一命名空間功能不支持關(guān)系數(shù)據(jù)庫和MPP引擎(如紅移或雪花)或文檔數(shù)據(jù)庫(如MongoDB)。當(dāng)然,支持與Alluxio和提到的存儲引擎之間的寫入。開發(fā)人員可以使用 Spark 等計算框架從 Redshift 表創(chuàng)建數(shù)據(jù)幀,并將其以 Parquet 或 CSV 格式存儲在 Alluxio 文件系統(tǒng)中,反之亦然(圖 2-5)。

?圖2-5Alluxio?技術(shù)架構(gòu)


為什么使用Alluxio?

顯著提高大數(shù)據(jù)處理性能和可擴展性

多年來,內(nèi)存變得越來越便宜,而其性能也越來越快。與此同時,硬盤驅(qū)動器的性能僅略有好轉(zhuǎn)。毫無疑問,內(nèi)存中的數(shù)據(jù)處理比處理磁盤上的數(shù)據(jù)快一個數(shù)量級。在幾乎所有的編程范例中,我們都建議在內(nèi)存中緩存數(shù)據(jù)以提高性能。與MapReduce相比,阿帕奇火花的主要優(yōu)勢之一是它能夠緩存數(shù)據(jù)。Alluxio將其提升到一個新的水平,提供的大數(shù)據(jù)應(yīng)用程序不僅僅是一個緩存層,而是一個成熟的分布式高性能以內(nèi)存為中心的存儲系統(tǒng)。

百度運營著世界上最大的Alluxio集群之一,擁有1000個工作節(jié)點,處理超過2PB的數(shù)據(jù)。借助Alluxio,百度在查詢和處理時間方面平均提高了10倍和30倍的性能,從而顯著提高了百度做出重要業(yè)務(wù)決策的能力。十九巴克萊發(fā)表了一篇文章,描述了他們在Alluxio的經(jīng)歷。巴克萊數(shù)據(jù)科學(xué)家吉安馬里奧·斯帕卡尼亞和高級分析主管哈里·鮑威爾能夠使用Alluxio將他們的Spark工作從幾小時調(diào)整到幾秒鐘。斷續(xù)器Qunar.com 是中國最大的旅游搜索引擎之一,使用Alluxio將性能提高了15倍至300倍。

多個框架和應(yīng)用程序可以以內(nèi)存速度共享數(shù)據(jù)

典型的大數(shù)據(jù)集群具有多個會話,這些會話運行不同的計算框架,例如Spark和MapReduce。在Spark的情況下,每個應(yīng)用程序都有自己的執(zhí)行器進(jìn)程,執(zhí)行器中的每個任務(wù)都在自己的JVM上運行,從而將Spark應(yīng)用程序彼此隔離。這意味著 Spark(和 MapReduce)應(yīng)用程序除了寫入 HDFS 或 S3 等存儲系統(tǒng)外,無法共享數(shù)據(jù)。如圖 2-6 所示,Spark 作業(yè)和 MapReduce 作業(yè)使用的是 HDFS 或 S3 中存儲的相同數(shù)據(jù)。在圖 2-7 中,多個 Spark 作業(yè)使用相同的數(shù)據(jù),每個作業(yè)在自己的堆空間中存儲自己的數(shù)據(jù)版本。不僅數(shù)據(jù)重復(fù),而且通過 HDFS 或 S3 共享數(shù)據(jù)可能會很慢,尤其是在共享大量數(shù)據(jù)的情況下。

?圖2-6通過HDFS或S3共享數(shù)據(jù)的不同框架

?圖2-7通過HDFS或S3共享數(shù)據(jù)的不同作業(yè)
通過將 Alluxio 用作堆外存儲(圖 2-8),多個框架和作業(yè)可以以內(nèi)存速度共享數(shù)據(jù),從而減少數(shù)據(jù)重復(fù)、提高吞吐量并減少延遲。

?圖 2-8以內(nèi)存速度共享數(shù)據(jù)的不同作業(yè)和框架
在應(yīng)用程序終止或發(fā)生故障時提供高可用性和持久性
在 Spark 中,執(zhí)行器進(jìn)程和執(zhí)行程序內(nèi)存駐留在同一 JVM 中,所有緩存數(shù)據(jù)存儲在 JVM 堆空間中(圖 2-9)。

?圖 2-9具有自己的堆內(nèi)存的 Spark 作業(yè)
當(dāng)作業(yè)完成或由于某種原因 JVM 由于運行時異常而崩潰時,緩存在堆空間中的所有數(shù)據(jù)都將丟失,如圖 2-10 和 2-11 所示。

?圖 2-10Spark 作業(yè)崩潰或完成

?圖 2-11Spark 作業(yè)崩潰或完成。堆空間丟失
解決方案是將 Alluxio 用作堆外存儲(圖 2-12)。

?圖 2-12使用 Alluxio 作為堆外存儲的 Spark
在這種情況下,即使 Spark JVM 崩潰,數(shù)據(jù)在 Alluxio 中仍然可用(圖 2-13 和圖 2-14)。

?圖 2-13Spark 作業(yè)崩潰或完成

?圖 2-14Spark 作業(yè)崩潰或完成。堆空間丟失。堆外內(nèi)存仍然可用
優(yōu)化整體內(nèi)存使用情況并最大限度地減少垃圾回收
通過使用 Alluxio,內(nèi)存使用效率大大提高,因為數(shù)據(jù)在作業(yè)和框架之間共享,并且由于數(shù)據(jù)存儲在堆外,垃圾回收也最小化,從而進(jìn)一步提高了作業(yè)和應(yīng)用程序的性能(圖 2-15)。

?圖 2-15多個 Spark 和 MapReduce 作業(yè)可以訪問存儲在 Alluxio 中的相同數(shù)據(jù)

降低硬件要求

使用Alluxio進(jìn)行大數(shù)據(jù)處理比使用HDFS和S3要快得多。IBM的測試顯示,在寫入IO方面,Alluxio的表現(xiàn)比HDFS高出110倍。二十三有了這種性能,對額外硬件的需求就會減少,從而節(jié)省了基礎(chǔ)設(shè)施和許可成本。

Apache Spark 和?Alluxio

您可以在Alluxio中訪問數(shù)據(jù),類似于從Spark訪問存儲在HDFS和S3中的數(shù)據(jù)。

val dataRDD = sc.textFile("alluxio://localhost:19998/test01.csv") val parsedRDD = dataRDD.map{_.split(",")} case class CustomerData(userid: Long, city: String, state: String, age: Short) val dataDF = parsedRDD.map{ a =>CustomerData(a(0).toLong, a(1).toString, a(2).toString, a(3).toShort) }.toDF dataDF.show() +------+---------------+-----+---+ |userid|???????????city|state|age| +------+---------------+-----+---+ |???300|???????Torrance|???CA| 23| |???302|Manhattan Beach|???CA| 21| +------+---------------+-----+---+

總結(jié)

本章簡要介紹了 Spark 和 Spark MLlib,足以為您提供執(zhí)行常見數(shù)據(jù)處理和機器學(xué)習(xí)任務(wù)所需的技能。我的目標(biāo)是讓你盡快上手。

總結(jié)

以上是生活随笔為你收集整理的【Spark ML】第 2 章: Spark和Spark简介的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

亚洲天天在线 | 日本性xxx| 草樱av | 国产精品v欧美精品v日韩 | av一级二级 | 在线观看日本韩国电影 | 久久久久久久久久久影视 | 激情久久伊人 | 草久久av| 国产va精品免费观看 | 亚洲日本va午夜在线影院 | 免费在线观看中文字幕 | 日韩欧美在线观看一区二区三区 | 毛片888 | 国产成在线观看免费视频 | 欧美国产日韩在线观看 | 波多野结衣视频一区 | 久久专区| 成年人免费看的视频 | 久草五月 | 日韩在线视频网 | 99久久精品免费看国产麻豆 | 久久免费中文视频 | 国色天香第二季 | 手机av电影在线观看 | 91精品视频网站 | 天天综合网国产 | 日韩欧美高清视频在线观看 | 黄色软件网站在线观看 | 日本黄色免费看 | 四虎亚洲精品 | 亚洲精品久久视频 | 国产亚洲欧洲 | 日本高清xxxx | 最新日韩在线 | 中文字幕之中文字幕 | 日精品| 色婷婷97 | 国产97视频在线 | 91久久久久久久 | 亚洲精品乱码久久久久久写真 | 亚洲视频99 | 91漂亮少妇露脸在线播放 | 男女全黄一级一级高潮免费看 | 91视频在线观看免费 | 日韩av中文 | 91亚洲精品在线观看 | 九色91福利| 欧美精品中文 | 色精品视频 | 97视频在线播放 | 美女网站久久 | 日韩a级免费视频 | 亚洲第二色 | 波多野结依在线观看 | 日韩欧美xx| 欧美日韩国产亚洲乱码字幕 | 国产成人精品久久久久蜜臀 | 亚洲国产精品视频 | 午夜精选视频 | 狠狠躁夜夜躁人人爽视频 | 国产成人精品一区二三区 | 久草视频免费 | 久草在线资源网 | 中文有码在线 | 国产一区在线免费观看 | 国产va精品免费观看 | 丁香婷婷久久久综合精品国产 | 国产精品1000 | 日本中文字幕在线 | 高清av网站 | 日韩三级一区 | 日韩一区二区三区免费电影 | 91亚洲国产| 91爱看片 | 久久激情五月婷婷 | 亚洲综合在线观看视频 | 99精品在线 | av线上看| 欧美另类性 | 亚洲国产综合在线 | 97精品超碰一区二区三区 | 色中射| 永久免费毛片 | 欧美激情综合五月 | 人人揉人人揉人人揉人人揉97 | 日韩中文字幕在线观看 | 国产精品一区二区三区电影 | 一级黄色免费网站 | 美女国内精品自产拍在线播放 | 97超碰免费 | 日本特黄一级 | 99热最新精品 | 97精品视频在线播放 | 成人一级电影在线观看 | 国产又粗又猛又爽 | 99久久精品国产观看 | 色综合五月天 | 国产精品18毛片一区二区 | 在线国产中文字幕 | 13日本xxxxxⅹxxx20 | 东方av免费在线观看 | 91精品一区二区在线观看 | 日韩一区二区三区高清在线观看 | 91香蕉嫩草 | 天天干夜夜夜操天 | 91自拍视频在线观看 | 91网站免费观看 | 久久亚洲精品电影 | 国产精品免费成人 | 久久香蕉影视 | 国产在线精品区 | 人人玩人人弄 | 欧美日本啪啪无遮挡网站 | 香蕉一区 | 国产精品中文字幕在线观看 | 一区二区精 | 97在线观看免费视频 | 天天干,天天干 | 日本99热| 日本中文字幕系列 | 亚洲高清视频在线播放 | 99精品欧美一区二区三区 | 中文字幕在线看视频国产 | 91视频a| 成人a v视频| 国产精品久久久久久久久久久久午 | 中文网丁香综合网 | www色网站| 国产亚洲一区二区三区 | 精品一区二区在线看 | 色99之美女主播在线视频 | 亚洲精品色| 国产精品手机在线播放 | 国产成人综合精品 | 337p西西人体大胆瓣开下部 | 最近中文字幕在线 | 天堂网av 在线 | 久久久av电影 | av成人免费 | 成人手机在线视频 | 91麻豆精品国产91久久久无限制版 | 亚洲精品国偷拍自产在线观看 | 久久九九精品 | 久久99久久99精品免费看小说 | 少妇bbb搡bbbb搡bbbb′ | 精品在线观看一区二区 | 国产高清视频免费 | 欧美污网站 | 日日夜夜天天久久 | 欧美日韩一二三四区 | 久久精品中文字幕免费mv | 中文字幕丰满人伦在线 | 亚洲免费av在线播放 | 涩涩色亚洲一区 | 亚洲片在线| 亚洲精品ww | 国产色婷婷精品综合在线手机播放 | 碰超在线观看 | 日韩精品一区二区三区免费观看视频 | 黄网站app在线观看免费视频 | 韩日电影在线观看 | 欧美日韩久 | 欧美国产日韩在线观看 | 青青五月天| 久久中文字幕视频 | 啪啪免费观看网站 | 午夜国产一区 | 99国产精品免费网站 | 天天操天天草 | 国产成人精品一区二区三区网站观看 | 91精品国自产拍天天拍 | 精品一区二区6 | 日日夜色| 国产一二区在线观看 | 成人免费一级片 | 天天操天天操天天操天天 | 日韩精品91偷拍在线观看 | 黄色性av | 国产精品九九热 | 99久久精品国产毛片 | 欧美日韩国产三级 | av在线最新 | 成人毛片一区 | 福利视频网址 | 国产色就色 | 免费在线观看午夜视频 | 99精品在线看 | 国产专区第一页 | 美女性爽视频国产免费app | 最近日本韩国中文字幕 | 国产精品自拍在线 | 怡红院久久 | 91综合久久一区二区 | 国产精品成人一区二区 | 91精品国产自产91精品 | av线上看| 91在线视频播放 | 美女露久久 | 成年美女黄网站色大片免费看 | 日韩一区视频在线 | 久久er99热精品一区二区三区 | 涩涩网站在线观看 | 久久亚洲电影 | 成年人国产视频 | 国产一级一级国产 | 最新国产在线 | 伊人五月天.com | 综合在线色 | 天天伊人狠狠 | av在线中文 | 国产精品嫩草影院123 | 中文字幕在线视频国产 | 黄网站色视频免费观看 | 丁香婷婷色综合亚洲电影 | 超碰在线资源 | 国产精品免费观看国产网曝瓜 | 国产在线成人 | 综合网婷婷 | 五月婷色| 成人动漫一区二区三区 | 国产黄a三级 | 色婷婷激情电影 | 月丁香婷婷 | 久草在线视频网站 | 国产啊v在线 | www.xxx.性狂虐 | 国产成人亚洲在线观看 | 一级一级一片免费 | 欧美一二区在线 | 亚洲一级理论片 | 婷婷激情影院 | 黄色三级在线 | 免费视频久久久 | 99热免费在线 | 亚洲欧美色婷婷 | 久久这里只有精品1 | 欧美吞精| 91在线免费视频观看 | 99re亚洲国产精品 | 激情av在线资源 | 精品免费视频. | 日韩一区二区三区在线看 | 成年人免费在线 | 欧美精品二 | 久久艹在线 | 久久精品视频在线播放 | 国产v欧美| 91av久久| 成人在线视频论坛 | 欧美日韩国产亚洲乱码字幕 | 免费一级片在线观看 | 97av影院 | 久久综合狠狠综合 | 日韩影视大全 | 久久久久久久久网站 | 免费中午字幕无吗 | 在线免费观看涩涩 | 国产一级免费视频 | 女人魂免费观看 | 黄色三级网站 | 免费男女羞羞的视频网站中文字幕 | 精品在线亚洲视频 | 欧美成天堂网地址 | 精品日本视频 | 欧美男女爱爱视频 | 啪嗒啪嗒免费观看完整版 | 免费观看国产视频 | 久久久久久国产精品亚洲78 | 免费人成在线观看网站 | 天天综合网 天天综合色 | 中文字幕视频 | 久久亚洲国产精品 | 97偷拍在线视频 | 亚洲成人av片在线观看 | 久黄色| 成人中心免费视频 | 色免费在线 | 在线色亚洲 | 国产精品永久免费观看 | 色香蕉在线 | 日本精品视频在线 | 区一区二在线 | 99精品欧美一区二区蜜桃免费 | 国产中文字幕视频在线观看 | 欧美日韩高清 | 一区 在线观看 | 黄色成人在线观看 | 日韩美av在线 | 日本精品一区二区三区在线播放视频 | 久久夜色精品国产亚洲aⅴ 91chinesexxx | 亚洲四虎 | 日韩一级成人av | 三级动图| 爱爱av网站| 久久久久亚洲精品国产 | 亚洲精品在线免费播放 | 黄色一级免费电影 | 国产精品视频免费看 | 一级黄色片在线免费观看 | 精品国产aⅴ一区二区三区 在线直播av | 欧美激情第八页 | 久久99久久99精品免观看软件 | 成年人视频免费在线播放 | 欧美视频www | 天堂av最新网址 | 91av视频在线播放 | 久久久久久亚洲精品 | www.久热| 久久精品亚洲一区二区三区观看模式 | 亚洲网久久 | 国产资源免费在线观看 | 在线小视频你懂的 | 日韩在线一二三区 | 国产69精品久久app免费版 | 欧美日韩在线精品一区二区 | 激情欧美一区二区三区免费看 | 国产免费又爽又刺激在线观看 | 国产精品欧美久久久久三级 | 国产一区电影在线观看 | 香蕉视频在线播放 | 亚洲精品乱码久久久久久写真 | 青青久视频 | 深夜精品福利 | 国产午夜三级 | 91精品久久久久久粉嫩 | 在线观看你懂的网站 | 色小说av | 国产黄a三级三级三级三级三级 | 日本xxxx裸体xxxx17 | 精品国产一区二区久久 | 国产麻豆精品一区二区 | 国产在线欧美 | 天天操天| 免费高清在线观看电视网站 | 国产韩国日本高清视频 | 久久超| 久久久久久久久久久久久久电影 | 波多野结衣综合网 | 深爱婷婷 | 日韩女同av| 韩国精品视频在线观看 | 日本论理电影 | 黄色一级大片免费看 | 99人久久精品视频最新地址 | 国产成在线观看免费视频 | 成人午夜性影院 | 在线观看免费av网 | 亚洲dvd| 免费黄a大片 | 亚洲精品xxxx | 午夜精品久久久久久久99水蜜桃 | 少妇资源站 | 欧美在一区 | 久久国产视频网 | 精品一区二区免费视频 | 黄色成人av网址 | 在线观看亚洲精品视频 | 99久精品 | 在线a视频免费观看 | 亚洲精品视频在线观看网站 | 国偷自产视频一区二区久 | 中文字幕av全部资源www中文字幕在线观看 | 91精品成人久久 | 久久黄色美女 | 国产精品久久久久久999 | 看国产黄色片 | 国产成人一区二区三区免费看 | 日韩高清免费无专码区 | 国产伦精品一区二区三区高清 | 久久久久成人精品免费播放动漫 | 婷婷六月综合亚洲 | 精品视频免费观看 | 国产精品观看视频 | 天天插天天狠 | 中文字幕视频观看 | 丁香在线视频 | 九草视频在线观看 | 亚洲精品在线免费观看视频 | 99 视频 高清 | 色偷偷888欧美精品久久久 | 日本激情中文字幕 | 中文字幕一区二区三区四区久久 | 久久久高清 | 精品中文字幕在线播放 | 深爱五月网 | 国产精品久久一区二区三区, | 狠狠色丁香婷婷综合橹88 | 国产黄色精品在线 | 日韩免费观看一区二区三区 | 欧美日韩视频在线观看一区二区 | 在线电影 一区 | 亚洲一区久久 | 日韩精品在线看 | 香蕉视频在线网站 | 91探花在线视频 | 狠狠干狠狠色 | 国产色综合天天综合网 | 亚洲成人免费观看 | 视频直播国产精品 | 国产成人av综合色 | 亚洲国产成人在线观看 | 狠狠操91| 日本久久久久久久久久 | 成人av一区二区三区 | 亚洲精品国产精品国自产观看 | 亚洲人人av | 色五月成人 | 久久久一本精品99久久精品66 | 亚洲精品乱码久久久久久蜜桃动漫 | 成年人免费看av | 免费精品国产va自在自线 | wwwwwww色| 欧美精品在线视频 | 青草草在线视频 | 天天操天天怕 | 96超碰在线 | 99精品国产免费久久久久久下载 | 中文字幕国产一区二区 | 福利一区在线 | 国产精选视频 | 97成人资源站 | 婷婷综合av | 天堂在线视频免费观看 | 国产特级毛片aaaaaaa高清 | 91激情视频在线播放 | av丝袜制服 | 久久国产精品区 | 国产成人一区二区啪在线观看 | 欧美成年黄网站色视频 | 亚洲精品av在线 | 2019av在线视频 | 亚洲精选视频在线 | 久久久久久久毛片 | 九九九视频在线 | 91激情视频在线 | 欧美一级视频免费看 | 99久久一区 | 成人啊 v | 日韩av资源在线观看 | 成人资源在线 | 在线观看av片| 天天草天天干 | 91资源在线视频 | 久久综合中文字幕 | 久久久久国产一区二区三区四区 | 婷婷激情av | 青春草视频在线播放 | av在线一| 欧美在线观看视频 | 看片的网址 | 日韩av成人在线观看 | 免费观看一级特黄欧美大片 | bbbb操bbbb | www.黄色在线| 天天操夜夜拍 | 中国一级片在线 | 国产美女久久久 | 久久国产精品一区二区 | 99亚洲精品 | 成人资源在线观看 | 精品视频久久久 | 国产视频亚洲 | 国产在线精品一区二区 | 久草在线观看资源 | 亚洲激情校园春色 | 亚洲经典在线 | 免费高清在线视频一区· | 精品专区一区二区 | 在线精品国产 | 国产中文字幕在线播放 | 福利二区视频 | 成人理论电影 | 国产精品久久久久高潮 | 亚洲视频免费在线观看 | 日韩一区二区三免费高清在线观看 | 国产爽视频 | 狠狠狠狠狠狠天天爱 | 日韩高清一区在线 | 又黄又爽的免费高潮视频 | 999久久久久久久久久久 | 91麻豆精品国产91久久久更新时间 | 国产在线不卡一区 | 狠狠色丁香久久婷婷综合丁香 | 最新色站 | 最近中文字幕mv免费高清在线 | 国产精品国产自产拍高清av | 亚洲欧洲国产精品 | 天天操人人要 | 成人四虎影院 | 天天爱天天操天天干 | 亚州精品成人 | 美女黄视频免费 | 97超碰在线免费 | 欧美午夜性 | 久久免费试看 | 精品国产一区二区三区久久久蜜月 | 美女视频黄在线观看 | 久久久久久久久久久久av | 免费观看一区二区 | 69亚洲乱 | 夜夜视频资源 | 国产精品久久久久久一二三四五 | 最近日本韩国中文字幕 | av亚洲产国偷v产偷v自拍小说 | 国产超碰在线观看 | 亚洲精品在线免费观看视频 | 国产精品激情偷乱一区二区∴ | 日本论理电影 | 亚洲国产99 | 欧美精品亚州精品 | 成人免费在线网 | www国产精品com | 美女福利视频网 | 伊人久久婷婷 | 久久久精品国产免费观看一区二区 | 亚洲精品人人 | 狠狠色伊人亚洲综合成人 | 黄色av网站在线观看 | 欧美日韩高清一区二区 国产亚洲免费看 | 日本一区二区免费在线观看 | 极品国产91在线网站 | 色婷婷九月 | 国产精品99蜜臀久久不卡二区 | 国产精成人品免费观看 | 成人黄色在线电影 | 在线看片成人 | 久久视频免费在线观看 | 欧美日韩高清一区二区 国产亚洲免费看 | 亚洲视频电影在线 | 在线看片一区 | 亚洲精品成人 | 国产精品粉嫩 | 亚洲精品在线视频观看 | 精品国产一区二区三区久久 | 夜夜夜夜猛噜噜噜噜噜初音未来 | 国产精品免费在线播放 | 国产精品久久久久一区二区三区共 | 综合久久久久久 | 激情五月播播久久久精品 | 国产色在线观看 | 色五丁香 | 麻豆成人精品视频 | 狠狠色狠狠综合久久 | 97色在线视频| 日本美女xx | 9幺看片| 国产精品二区在线 | 射射射av| 天天舔天天搞 | 国产成人精品午夜在线播放 | 欧美 高跟鞋交 xxxxhd | 久草视频在 | 视频高清| 在线欧美小视频 | 免费看十八岁美女 | www.黄色片网站 | 在线观看亚洲成人 | 亚洲在线资源 | 成人免费看电影 | 日韩久久精品一区二区 | 99精品在线 | 国产精品久久亚洲 | 国产精品孕妇 | 天天色天天操综合网 | 久久99热这里只有精品 | www激情com| 麻豆播放 | 国产一级片免费观看 | 毛片基地黄久久久久久天堂 | 天天干夜夜擦 | 日韩在线视频一区二区三区 | 日韩有码专区 | 亚洲成a人片在线www | 亚洲精品国产精品国产 | 欧美国产日韩一区二区三区 | 91av99| 久久天堂精品视频 | 亚洲在线黄色 | 日韩免费在线视频 | 日本精品免费看 | 在线观看小视频 | 欧美激情第一区 | 色狠狠干| www.com久久 | 欧美一区成人 | 欧美伦理电影一区二区 | 三级黄在线 | 久久久在线| 日日夜夜亚洲 | 午夜精品影院 | 一本一道波多野毛片中文在线 | 天天做日日爱夜夜爽 | 中文字幕在线有码 | 中文字幕永久 | 婷婷色网视频在线播放 | 五月天视频网站 | 欧美一级片播放 | 国产精品女人网站 | 久草香蕉在线 | 狠狠狠狠狠狠狠干 | 超碰在线天天 | 精品一区二区三区香蕉蜜桃 | 中文字幕在线免费播放 | 国产综合片 | 一级黄色在线视频 | 色av色av色av | 国产精品第 | 午夜国产一区二区 | 久久久久久久久福利 | 天天操天天插 | 在线精品观看国产 | 五月婷婷av在线 | 国产区高清在线 | 国产一级片直播 | 91片在线观看 | 欧美精品黑人性xxxx | 欧美精品久久久久久久 | 99精品免费视频 | 色噜噜狠狠色综合中国 | 欧美老少交 | 黄色国产高清 | 中文字幕一区二区三 | 天天色天天爱天天射综合 | wwwav视频| 日韩成人精品一区二区三区 | 久久草草影视免费网 | 欧美一区二区三区在线观看 | 狠狠久久综合 | 日本久久久久久久久久久 | 热re99久久精品国产66热 | 99久久久久成人国产免费 | 免费国产在线精品 | 欧美色综合天天久久综合精品 | 五月视频 | 久久天堂精品视频 | 99热这里只有精品久久 | 欧美日韩精品区 | 九九欧美 | 天天玩天天干 | 日韩中字在线 | 麻豆国产视频下载 | 亚洲欧洲av | 欧美在线观看视频一区二区三区 | 国产生活一级片 | 成人综合免费 | 人人插人人做 | www.狠狠插.com| 欧美精品乱码久久久久久按摩 | 欧美日韩国产亚洲乱码字幕 | 亚洲视频综合 | 中文字幕在线观看网址 | 国产剧情一区二区 | 中文字幕在线视频免费播放 | 婷婷综合久久 | 精品国产伦一区二区三区观看方式 | 成人午夜黄色 | 日日夜av| 成人黄色片免费看 | 色福利网 | 日韩av一区在线观看 | 国产日韩一区在线 | 97偷拍视频 | 91久久精品日日躁夜夜躁国产 | 在线av资源 | 天天综合91| 人人插人人射 | 国产欧美日韩精品一区二区免费 | 丁香六月婷| 国产在线视频一区 | 九九免费观看视频 | 欧美激情va永久在线播放 | 97看片网 | 91成人午夜 | 国产精品嫩草影院99网站 | 久久免费成人网 | 日日夜操 | 日日操操操 | 欧美一区二区日韩一区二区 | 久久免费看a级毛毛片 | 国产精品视频资源 | 免费看的黄网站软件 | 久久黄色美女 | 狠狠色狠狠色综合日日小说 | 日本不卡一区二区 | 欧美成亚洲 | 91 在线视频 | 大荫蒂欧美视频另类xxxx | 人人爽人人爽人人爽学生一级 | 久久只有精品 | 国产色黄网站 | 狠狠色丁香婷婷综合橹88 | 日韩三级不卡 | 激情婷婷在线 | 欧美a级免费视频 | 久久综合给合久久狠狠色 | 香蕉日日 | 亚洲六月丁香色婷婷综合久久 | 欧美一级大片在线观看 | 色婷婷a| 香蕉网在线播放 | 午夜精品久久久久久久99 | 丁香综合av| 免费看一级片 | 91大神精品视频 | 丁香激情综合久久伊人久久 | 国产精品免费一区二区 | 麻豆影视在线观看 | 91九色精品女同系列 | 国产成人久久精品77777 | 五月开心六月伊人色婷婷 | 欧美一级黄色网 | 香蕉视频免费在线播放 | 日韩r级在线 | 韩国一区视频 | 欧洲在线免费视频 | 成人毛片在线观看 | 96国产在线 | 国外成人在线视频网站 | 九九欧美| 9999精品 | 人人看黄色 | 国产一区国产精品 | 国内少妇自拍视频一区 | 在线观看视频免费大全 | 天天看天天干天天操 | 欧美成人999| 欧美性高跟鞋xxxxhd | 91av在线免费播放 | av在线播放免费 | 久久视讯 | 911久久| 久久高清国产视频 | 色免费在线 | 成年人在线观看网站 | 激情综合五月天 | 日韩一级成人av | 日b黄色片 | 久久精品中文字幕少妇 | 制服丝袜亚洲 | 久久午夜免费观看 | 日日夜夜人人天天 | 国产精品无 | 午夜影视av | 夜夜操天天| www91在线观看 | 在线成人国产 | 99热国内精品 | 久草视频在线新免费 | 精品一区二区日韩 | 日韩电影一区二区三区 | 日日干夜夜干 | 亚洲午夜久久久影院 | 久久国产精品一二三区 | 久久福利综合 | 在线观看网站黄 | 丝袜美女视频网站 | 国产精品二区三区 | 亚洲精品高清在线 | 九九国产视频 | a黄色片在线观看 | 天天曰天天射 | 手机av永久免费 | 婷婷电影网| 国产精品黄色在线观看 | 日日狠狠| 精品视频在线视频 | 成人小电影在线看 | 国产成人av网址 | 五月婷婷中文 | www免费在线观看 | 在线视频国产区 | 丝袜美腿亚洲 | 日本在线视频网址 | 精品久久国产精品 | av三级在线免费观看 | 亚洲精品美女久久久久 | 天天爽综合网 | 国产成人在线看 | 精品久久片 | 国产一区成人 | 五月婷婷欧美 | 国产一二三四在线观看视频 | 久久人人精品 | 美女久久久久久久久久久 | 日韩av图片 | 午夜av在线播放 | 福利视频第一页 | 日韩久久一区二区 | 欧美aaa级片 | 天天狠狠操 | 色综合久久久久久久久五月 | 亚洲精品一区二区三区在线观看 | 国产一级视频在线免费观看 | 国产a网站 | 在线午夜 | 久久免费看视频 | 国产午夜激情视频 | 日本公妇色中文字幕 | 久久99九九99精品 | 成人国产精品入口 | 丁香六月综合网 | 欧美日韩视频免费 | 久久精品99久久 | 成年人黄色免费看 | avwww在线 | 国产第一页福利影院 | 国产一级在线看 | 国产一区在线免费观看视频 | 香蕉影视在线观看 | 国产成人精品久久久 | 亚洲九九| 国产精品18p | 在线视频一区二区 | 亚洲做受高潮欧美裸体 | 国产精品尤物 | 激情开心色 | 国内精品视频一区二区三区八戒 | 成人91免费视频 | 黄色精品久久久 | 精品久久综合 | 国产精品久久一区二区无卡 | 久久国产一区二区 | 久久夜色精品国产欧美一区麻豆 | 欧美男男激情videos | 午夜精品视频一区二区三区在线看 | 久久久激情网 | 久久久毛片 | 99这里只有久久精品视频 | 亚洲 成人 一区 | 天天曰| 91久久精品一区二区三区 | 一区二区三区电影大全 | 91精品中文字幕 | 九九视频这里只有精品 | 亚洲久久视频 | 深爱激情站 | 精品久久久久久久久久久久久久久久久久 | 久久精品国产99 | 亚洲成av | 99在线观看精品 | 日本精品视频在线 | a成人v | 久久久久久久久亚洲精品 | 亚洲九九影院 | 国产亚洲精品久久久久久网站 | 又色又爽又激情的59视频 | 欧美一级电影免费观看 | 五月婷婷在线视频观看 | 香蕉在线影院 | 亚州欧美精品 | 一级黄色视屏 | 99精彩视频在线观看免费 | 黄色毛片视频免费观看中文 | 亚洲欧美日本A∨在线观看 青青河边草观看完整版高清 | 美女精品在线 | 人成午夜视频 | 欧美日本一二三 | 国产欧美日韩视频 | 久草| av免费看在线 | 少妇自拍av | 不卡的av在线播放 | 久久成人人人人精品欧 | av 一区二区三区 | 在线看日韩av | 丝袜制服天堂 | 成年人在线免费看片 | 婷婷五月在线视频 | 国产91亚洲精品 | 成人亚洲欧美 | 欧美成人在线免费观看 | 国产视频一区在线播放 | 亚洲成人第一区 | 午夜精品久久久 | 国产精品免费大片视频 | 正在播放国产精品 | 婷婷播播网 | 日日干网 | 免费看污黄网站 | 免费在线国产视频 | 国产精品美女久久久 | 又长又大又黑又粗欧美 | 天天操人人干 | 91精品视频免费看 | 婷婷色吧 | 蜜臀av夜夜澡人人爽人人桃色 | 国产精品男女啪啪 | 伊人狠狠色丁香婷婷综合 | 中文字幕在线看人 | 成人久久久久久久久 | 超碰在线观看97 | 国产麻豆成人传媒免费观看 | www视频免费在线观看 | 久久伊人五月天 | 免费久久99精品国产婷婷六月 | 国产69精品久久久久久 | 精品久久久久久久久久久院品网 | 中文字幕4 | www.av中文字幕.com | av电影免费看 | 成人影视免费看 | 久久99久久99精品免观看粉嫩 | 日韩欧美电影在线 | 中文在线a天堂 | 欧美日一级片 | www.香蕉| 美女免费网站 | 国产在线欧美 | av电影一区二区三区 | 日韩在线电影观看 | 亚洲91在线 | 国产精品免费久久久久影院仙踪林 | 亚洲国产69 | 久久精品站 | 国产尤物在线 | 久久久久久久久久久高潮一区二区 | 天堂v中文 | www.五月婷婷.com| 日韩在线观看视频一区二区三区 | 欧美性超爽 | 国产精品久久久久久久久免费看 | a v在线观看 | 国产免费成人 | 在线免费视频你懂的 | 91精品中文字幕 | 久久99精品久久久久久清纯直播 | 国产黄色大片免费看 | 不卡中文字幕在线 | 精品久久精品 | 国产成人久久精品 | 99久久精品网| 2021国产在线视频 | 免费在线观看中文字幕 | a黄在线观看 | 手机成人在线电影 | 97人人看| 99国内精品久久久久久久 | 麻豆91精品视频 | 激情网第四色 | 国产黄在线观看 | 国产高清视频 | 在线免费观看的av网站 | 在线观看福利网站 | 超碰在线1| 日本福利视频在线 | 五月激情六月丁香 | 一区二区三区在线看 | 四川bbb搡bbb爽爽视频 | 99精品视频观看 | 麻豆视频免费在线观看 | 看片网站黄色 | 性日韩欧美在线视频 | 黄网站免费大全入口 | 亚洲欧美日本一区二区三区 | 久久精品一区二区国产 | 久久草av| 日韩两性视频 | 欧美性色综合网 | 国产欧美最新羞羞视频在线观看 | 亚洲激情网站免费观看 | 深爱婷婷 | 91在线国产观看 | 国产最顶级的黄色片在线免费观看 | 国产精品ssss在线亚洲 | www五月天婷婷 | 久久成人国产精品免费软件 | 亚州欧美精品 | 久久免费毛片 | 色夜影院 | 天天干 天天摸 天天操 | 黄色aaa级片 | 粉嫩高清一区二区三区 | 9色在线视频 | 日本中文字幕在线免费观看 | 狠狠躁天天躁 | 在线中文字母电影观看 | 黄色小网站在线观看 | 免费中午字幕无吗 | 91麻豆高清视频 | 免费黄色激情视频 | 亚洲三级黄色 | a一片一级 | 99久热在线精品 | 日韩久久久久久 | 亚洲国产精品传媒在线观看 | 深爱激情站 | 九七在线视频 | 99热免费在线 | www.天天成人国产电影 | 亚洲综合激情网 | www免费看片com | 在线观看91视频 | 蜜臀91丨九色丨蝌蚪老版 | 黄色小视频在线观看免费 | 国产最顶级的黄色片在线免费观看 | 亚洲另类在线视频 | 亚洲精品成人 |