1. spark ML概述
ML組件
ML的標準API使用管道(pipeline)這樣的方式,可以將多個算法或者數據處理過程整合到一個管道或者一個流程里運行,其中包含下面幾個部分:
1. dataFrame:用于ML的dataset,保存數據
2. transformer:將一個dataFrame按照某種計算轉換成另外一個dataFrame,例如把一個包含特征的dataFrame通過模型預測,生成一個包含特征和預測的dataFrame
3. estimator:根據訓練樣本進行模型訓練(fit),并且得到一個對應的transformer
4. pipeline:將多個transformer和estimator串成一個ML的工作流
5. parameter:transformer和estimator共用一套API來確定參數
spark的機器學習包有兩個,一個是ML,一個是MLlib,前者是基于dataFrame的API實現的,后者是基于RDD的API實現的,官網推薦用前者,使用比較方便。
transformer
一個transformer包含特征轉換和已學習得到的數據模型,它實現了一個方法transform()
例如:一個特征transformer可能將一個dataFrame的某些列映射成新的列,然后輸出處理后的新的dataFrame;一個學習得到的模型將讀取一個包含特征的dataFrame,對每個樣本進行預測,并且把預測結果附加到這個dataFrame,得到一個新的dataFrame
Estimators
主要用于訓練模型,實現了一個方法fit(),接受一個包含特征的dataFrame,然后訓練得到一個模型,那個模型就是一個transformer
例如:一個LogisticRegression是一個estimator,然后通過調用fit(),得到一個LogisticRegressionModel,這是一個transformer。
每個transformer和estimator都有一個唯一ID,用于保存對應的參數
pipeline
例如一個文本挖掘包含以下三個步驟:
1. 將文本切分成詞
2. 將詞轉換成特征向量
3. 訓練得到一個模型,然后用于預測
spark ML將這樣一個工作流定義為pipeline,一個pipeline包含多個PipelineStages (transformer和estimator),通過dataFrame在各個stage中進行傳遞。
這是一個訓練模型的例子,包含了三個步驟,藍色的是指transformer,紅色是estimator
這是一個使用已訓練模型預測樣本的例子,
Parameters
一個Paramap包含多個(parameter, value)的鍵值對
有兩種方法將參數傳給算法:
1. 將參數設置到算法的一個實例,例如lr是LogisticRegression的一個實例,則他可以調用lr.setMaxIter(10)來設置訓練循環次數
2. 將paramap作為輸入參數,給fit()或者transform(),這些參數會都會覆蓋掉原來set的值
我們可以將paramap傳給不同實例,例如lr1和lr2是LogisticRegression的兩個實例,我們可以建立ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)的參數列表,即將兩個實例的參數都放在paramMap中
spark1.6的版本可以使用import/export導出模型或者pipeline到磁盤上
范例1
import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.param.ParamMap import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.sql.Rowval training = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(0.0, 1.1, 0.1)),(0.0, Vectors.dense(2.0, 1.0, -1.0)),(0.0, Vectors.dense(2.0, 1.3, 1.0)),(1.0, Vectors.dense(0.0, 1.2, -0.5)) )).toDF("label", "features")//創建一個LogisticRegression實例,這是一個Estimator. val lr = new LogisticRegression() //打印參數 println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")//調用實例的set方法設置參數 lr.setMaxIter(10).setRegParam(0.01)// 學習LogisticRegression模型,model1是一個transformer val model1 = lr.fit(training)println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)// 通過paramap來設置參數 val paramMap = ParamMap(lr.maxIter -> 20).put(lr.maxIter, 30) .put(lr.regParam -> 0.1, lr.threshold -> 0.55)// 兩個ParamMap之間可以相加合并. val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name val paramMapCombined = paramMap ++ paramMap2val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)//測試數據 val test = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(-1.0, 1.5, 1.3)),(0.0, Vectors.dense(3.0, 2.0, -0.1)),(1.0, Vectors.dense(0.0, 2.2, -1.5)) )).toDF("label", "features")//model2的transform()會只選擇features的數據,不會把label數據包含進去 model2.transform(test).select("features", "label", "myProbability", "prediction").collect().foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>println(s"($features, $label) -> prob=$prob, prediction=$prediction")} import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.sql.Rowval training = sqlContext.createDataFrame(Seq((0L, "a b c d e spark", 1.0),(1L, "b d", 0.0),(2L, "spark f g h", 1.0),(3L, "hadoop mapreduce", 0.0) )).toDF("id", "text", "label")/* 初始化一個pipeline,包含三個步驟: tokenizer, hashingTF, and lr. tokenizer 負責切詞,hashingTF負責按詞進行特征排列,lr負責模型訓練 */ val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01) val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))val model = pipeline.fit(training)// 將訓練后得到的模型保存到磁盤 model.save("/tmp/spark-logistic-regression-model")// 把未訓練的pipeline保存到磁盤 pipeline.save("/tmp/unfit-lr-model")// 從磁盤讀取模型 val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")// 測試數據 val test = sqlContext.createDataFrame(Seq((4L, "spark i j k"),(5L, "l m n"),(6L, "mapreduce spark"),(7L, "apache hadoop") )).toDF("id", "text")model.transform(test).select("id", "text", "probability", "prediction").collect().foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>println(s"($id, $text) --> prob=$prob, prediction=$prediction")}model selection
- ML里面用CrossValidator類來做交叉驗證,這個類包含一個estimator、一堆paramMap、和一個evaluator。
evaluator有三個子類,包括regressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator。
- ML中除了cv以外,還有一種指定樣本劃分的驗證方式,TrainValidationSplit 類,默認是0.75,即3/4用于做訓練,1/4用于做測試。其他跟cv一樣
總結
以上是生活随笔為你收集整理的1. spark ML概述的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vue实现幻灯片切换效果
- 下一篇: 乘法逆元模板