用户画像之Spark ML实现
? ? ? ? ? ? ? ? ? ? ? ? ? 用戶畫像之Spark ML實(shí)現(xiàn)
1?Spark ML簡單介紹
Spark ML是面向DataFrame編程的。Spark的核心開發(fā)是基于RDD(彈性分布式數(shù)據(jù)集),但是RDD,但是RDD的處理并不是非常靈活,如果要做一些結(jié)構(gòu)化的處理,將RDD轉(zhuǎn)換成DataFrame,DataFrame實(shí)際上就是行對象的RDD+schema,類似于原本的文本數(shù)據(jù),加上schema,做一下結(jié)構(gòu)的轉(zhuǎn)換就變成數(shù)據(jù)庫里面的表,表是有元數(shù)據(jù)的,有字段有類型。所以DataFrame處理起來更加靈活。
要進(jìn)行機(jī)器學(xué)習(xí)是有一系列的流程,通常離線的處理現(xiàn)有一組數(shù)據(jù)集,然后進(jìn)行預(yù)處理特征工程,完成之后分成訓(xùn)練集合測試集,基于訓(xùn)練集訓(xùn)練模型,然后選擇算法,進(jìn)行評估..這是可以形成一個管道的,整體是一個DAG有向無環(huán)圖。
其實(shí)整個進(jìn)行模型算法訓(xùn)練的過程就是一個管道,管道中就會有各種各樣的組件,這些組件總體來說可以分成兩類,①第一個是Transformers:transform()用于轉(zhuǎn)換,把一個DataFrame轉(zhuǎn)換為另一個DataFrame,如把原本的數(shù)據(jù)集拆分成測試集,那就是DataFrame的轉(zhuǎn)換,像分詞,抽樣,模型的測試都是非常常見的轉(zhuǎn)換操作,②第二種類型就是Estimators:fit()應(yīng)用在DF上生成一個轉(zhuǎn)換器算法,Estimators評估器,用到的函數(shù)是fit(),Estimators是為了生成一個轉(zhuǎn)換器,在機(jī)器學(xué)習(xí)中會用到一些算法,需要去建模,根據(jù)訓(xùn)練集得到模型,模型本質(zhì)上就是轉(zhuǎn)換器,進(jìn)行預(yù)測是用的這個模型進(jìn)行預(yù)測,所以轉(zhuǎn)換是基于這個模型進(jìn)行預(yù)測,所以轉(zhuǎn)換就是基于這個模型的轉(zhuǎn)換器轉(zhuǎn)換時他的實(shí)例來進(jìn)行轉(zhuǎn)換。
2 Spark ML的工作流程
首先進(jìn)行預(yù)處理,包括模型訓(xùn)練的整個過程是一個管道pipline,這個pipline的目的是為了得到一個Estimator,即得到一個模型,假如說用邏輯回歸,輸入的數(shù)據(jù)是普通的文本,首先進(jìn)行Toknizer分詞,分完次后計(jì)算他的詞頻,這兩個本質(zhì)上否是transform的操作,接下來就要創(chuàng)建一個邏輯回歸的實(shí)例,本質(zhì)上就是一個Estimator,得到一個轉(zhuǎn)換器。
模型有了接下來就要做預(yù)測,不管是訓(xùn)練集還是測試集,都是要進(jìn)行分詞,計(jì)算詞頻的,這個piplineModel整個都是transform操作,這個模型邏輯回歸就是上一步通過訓(xùn)練的到的模型。
參數(shù)是所有轉(zhuǎn)換器和評估器共享的一個公共api,參數(shù)名Param是一個參數(shù),可以通過setter單獨(dú)定義;也可以通過ParamMap定義一個參數(shù)的集合(parameter,value),傳遞參數(shù)的兩種方式:①通過setter為實(shí)例設(shè)置參數(shù)②傳遞ParamMap給fit或者transform方法
3 Estimator,Transformer,Param使用案例
(1)準(zhǔn)備帶標(biāo)簽和特征的數(shù)據(jù)
(2)創(chuàng)建邏輯回歸的評估器
(3)使用setter方法設(shè)置參數(shù)
(4)使用存儲在lr中的參數(shù)來訓(xùn)練一個模型
(5)使用ParamMap選擇指定的參數(shù)
(6)準(zhǔn)備測試數(shù)據(jù)
(7)預(yù)測結(jié)果
代碼具體實(shí)現(xiàn)
(1)準(zhǔn)備帶標(biāo)簽和特征的數(shù)據(jù)
任何應(yīng)用首先要把需要的類通過import引入,性別預(yù)測是分類問題,選擇邏輯回歸
import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.linalg.{Vector,Vectors} import org.apache.spark.sql.Row定義一個初始的DataFrame,通過sqlContext創(chuàng)建,用Seq序列的方式創(chuàng)建一個集合,第一個參數(shù)是標(biāo)簽即目標(biāo)值,后面的為特征,
val sqlContext=new org.apache.spark.sql.SQLContext(sc) val training = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(1.0,2.1,1.1)),(0.0, Vectors.dense(3.0,2.0,-2.0)),(0.0, Vectors.dense(3.0,0.3,1.0)),(1.0, Vectors.dense(1.0,1.2,-1.5)) )).toDF("label","features")(2)創(chuàng)建邏輯回歸的評估器,設(shè)置參數(shù)
val lr = new LogisticRegression() //評估器會帶一些默認(rèn)的參數(shù),通過explainParams()查看 println(lr.explainParams()) //通過set方式修改迭代次數(shù)和正則化參數(shù) lr.setMaxIter(10).setRegParam(0.01)//定義模型, val model1 = lr.fit(training) //查看模型的參數(shù) model1.parent.extractParamMap//通過ParamMap設(shè)置參數(shù) val paramMap = ParamMap(lr.maxIter -> 20). put(lr.maxIter,30). put(lr.regParam -> 0.1, lr.threshold -> 0.55)val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") //將兩個ParamMap對象合并 val paramMapCombined = paramMap ++ paramMap2//根據(jù)ParamMap設(shè)置的參數(shù)定義模型, val model2 = lr.fit(training, paramMapCombined) model2.parent.extractParamMap(3)準(zhǔn)備測試數(shù)據(jù)
val test = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(-1.2,1.8,1.3)),(0.0, Vectors.dense(4.0,1.8,-0.1)),(1.0, Vectors.dense(0.0,1.9,-1.5)) )).toDF("label","features")(4)預(yù)測結(jié)果
//調(diào)用模型1 model1.transform(test).select("label","features","probability","prediction").collect().foreach{case Row(label: Double, features: Vector, probability: Vector, prediction: Double) => println(s"($features, $label) -> probability=$probability, prediction=$prediction")}4 構(gòu)建Pipline和保存Pipline
步驟:
(1)準(zhǔn)備訓(xùn)練的文檔
(2)配置ML管道,包含三個stage:Tokenizer,HashingTF和LR
(3)安裝管道到數(shù)據(jù)上
(4)保存管道到磁盤,包括安裝好的和未安裝好的
(5)加載管道
(6)準(zhǔn)備測試文檔
(7)預(yù)測結(jié)果
代碼實(shí)現(xiàn):
(1)引入需要的類
//用的數(shù)邏輯回歸 import org.apache.spark.ml.classification.LogisticRegression //因?yàn)樘卣鞴こ烫幚淼氖翘卣飨蛄?#xff0c;所以需要Vector,輸入輸出會用到 import org.apache.spark.ml.linalg.Vector //行對象,為了輸出美化 import org.apache.spark.sql.Row //需要分詞需要Tokenizer,需要轉(zhuǎn)換計(jì)算詞頻需要HashingTF import org.apache.spark.ml.feature.{Tokenizer,HashingTF} //需要Pipeline將多個Transformers和Estimators連接起來以確定一個ML工作流程 import org.apache.spark.ml.{Pipeline,PipelineModel}(2)準(zhǔn)備數(shù)據(jù)集
//含Sprak的為一類 val training = sqlContext.createDataFrame(Seq((0L, "Spark Write applications quickly in Java, Scala, Python, R, and SQL.", 1.0),(1L, "Live and learn", 0.0),(2L, "Spark Run workloads 100x faster.", 1.0),(3L, "study hard and make progress every day", 0.0) )).toDF("id","text","label")(3)定義管道中的Tokenizer,HashingTF,LR這三個組件
//創(chuàng)建tokenizer分詞器 //setInputCol指明輸入DataFrame中的哪一列是被處理的,輸入?yún)?shù)是Dataframe中存在的列名 //setOutputCol設(shè)置新增加列的名字,及對輸入的列變換后會產(chǎn)生一個新列,該方法設(shè)置增加新列的列名 val tokenizer = new Tokenizer(). setInputCol("text"). setOutputCol("words")//創(chuàng)建hashingTF詞頻統(tǒng)計(jì),他的inputcolumn是tokenizerget出來的 //setNumFeatures設(shè)置特征值的數(shù)量 val hashingTF = new HashingTF(). setNumFeatures(1000). setInputCol(tokenizer.getOutputCol). setOutputCol("features")//創(chuàng)建邏輯回歸對象,setMaxIter設(shè)置邏輯回歸的迭代次數(shù),setRegParam設(shè)置正則化 val lr = new LogisticRegression(). setMaxIter(10).setRegParam(0.01)(4)定義管道
//創(chuàng)建管道,setStages將各個計(jì)算階段按照tokenizer,hashingTF,lr順序,pipeline是沒有安裝好的管道 val pipeline = new Pipeline(). setStages(Array(tokenizer,hashingTF,lr))//使用pipeline構(gòu)建模型,model是安裝好的管道 val model = pipeline.fit(training)(5)保存管道到磁盤
pipeline.save("/portrait/sparkML-LRpipeline") model.save("/portrait/sparkML-LRmodel")(6)加載模型
//加載保存到磁盤中模型 val model2 = PipelineModel.load("/portrait/sparkML-LRmodel")(7)準(zhǔn)備測試文檔,通過回歸預(yù)測,沒有測試集
val test = sqlContext.createDataFrame(Seq((4L, "learn Spark"),(5L, "hadoop hive"),(6L, "bigdata hdfs a"),(7L, "apache Spark") )).toDF("id","text")(8)預(yù)測結(jié)果
model.transform(test).select("id","text","probability","prediction").collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) => println(s"($id, $text) -> probability=$probability, prediction=$prediction")}5 通過網(wǎng)格參數(shù)和交叉驗(yàn)證進(jìn)行模型調(diào)優(yōu)
所謂的調(diào)優(yōu)就是怎樣根據(jù)數(shù)據(jù)選擇好的模型,或者為整個模型整個管道選擇好的參數(shù),這里是關(guān)注參數(shù)的調(diào)優(yōu),模型就選擇邏輯回歸。參數(shù)調(diào)優(yōu)就是給一組參數(shù)而不是一個參數(shù),讓模型自己選擇。調(diào)優(yōu)是基于管道整體進(jìn)行調(diào)優(yōu)。
(1)準(zhǔn)備訓(xùn)練的文檔
(2)配置ML管道,包含三個stage:Tokenizer,HashingTF和LR
(3)使用ParamGridBuilder構(gòu)建一個參數(shù)網(wǎng)格
(4)使用CrossValidator來選擇模型和參數(shù),CrossValidator需要一個estimator,一個評估器參數(shù)集合,和一個evaluator
(5)運(yùn)行交叉驗(yàn)證,選擇最好的參數(shù)集
(6)準(zhǔn)備測試數(shù)據(jù)
(7)預(yù)測結(jié)果
代碼實(shí)現(xiàn)過程:
(1)引入需要的包
//用的數(shù)邏輯回歸 import org.apache.spark.ml.classification.LogisticRegression //因?yàn)樘卣鞴こ烫幚淼氖翘卣飨蛄?#xff0c;所以需要Vector,輸入輸出會用到 import org.apache.spark.ml.linalg.Vector //行對象,為了輸出美化 import org.apache.spark.sql.Row //需要分詞需要Tokenizer,需要轉(zhuǎn)換計(jì)算詞頻需要HashingTF import org.apache.spark.ml.feature.{Tokenizer,HashingTF} //需要Pipeline將多個Transformers和Estimators連接起來以確定一個ML工作流程 import org.apache.spark.ml.{Pipeline,PipelineModel} //因?yàn)槭嵌?#xff0c;所以用BinaryClassificationEvaluator評估器 import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator //使用交叉校驗(yàn)CrossValidator,把所有參數(shù)排列組合,交叉進(jìn)行校驗(yàn)。ParamGridBuilder參數(shù)網(wǎng)格 import org.apache.spark.ml.tuning.{CrossValidator,ParamGridBuilder} //需要引入SQLContext import org.apache.spark.sql.SQLContext(2)準(zhǔn)備數(shù)據(jù)
val sqlContext=new SQLContext(sc) val training = sqlContext.createDataFrame(Seq((0L, "Spark Write applications quickly in Java, Scala, Python, R, and SQL.", 1.0),(1L, "Live and learn", 0.0),(2L, "Spark Run workloads 100x faster.", 1.0),(3L, "study hard and make progress every day", 0.0),(4L, "Rdd Spark who", 1.0),(5L, "good good study", 0.0),(6L, "Spark faster", 1.0),(7L, "day day up", 0.0),(8L, "Spark program", 1.0),(9L, "hello world", 0.0),(10L, "hello Spark", 1.0),(11L, "hi how are you", 0.0) )).toDF("id","text","label")(3)構(gòu)建管道
//創(chuàng)建tokenizer分詞器 //setInputCol指明輸入DataFrame中的哪一列是被處理的,輸入?yún)?shù)是Dataframe中存在的列名 //setOutputCol設(shè)置新增加列的名字,及對輸入的列變換后會產(chǎn)生一個新列,該方法設(shè)置增加新列的列名 val tokenizer = new Tokenizer(). setInputCol("text"). setOutputCol("words") //創(chuàng)建hashingTF詞頻統(tǒng)計(jì),他的inputcolumn是tokenizerget出來的 //特征值的數(shù)量網(wǎng)格調(diào)優(yōu) val hashingTF = new HashingTF(). setInputCol(tokenizer.getOutputCol). setOutputCol("features") //創(chuàng)建邏輯回歸對象,setMaxIter設(shè)置,正則化參數(shù)網(wǎng)格調(diào)優(yōu) val lr = new LogisticRegression(). setMaxIter(10) //創(chuàng)建管道,setStages將各個計(jì)算階段按照tokenizer,hashingTF,lr順序,pipeline是沒有安裝好的管道 val pipeline = new Pipeline(). setStages(Array(tokenizer,hashingTF,lr))(4)構(gòu)建網(wǎng)格參數(shù)
//構(gòu)建網(wǎng)格參數(shù),addGrid添加網(wǎng)格,hashingTF.numFeatures設(shè)置hashingTF特征數(shù)量, val paramGrid = new ParamGridBuilder(). addGrid(hashingTF.numFeatures, Array(10,100,1000)). addGrid(lr.regParam, Array(0.1,0.01)). build()(5)創(chuàng)建交叉驗(yàn)證CrossValidator對象
//創(chuàng)建CrossValidator交叉驗(yàn)證對象,setEstimator設(shè)置評估器,setEstimatorParamMaps設(shè)置參數(shù)集,setEvaluator設(shè)置評估器,setNumFolds創(chuàng)建交叉驗(yàn)證器,他會把訓(xùn)練集分成NumFolds份,實(shí)際生產(chǎn)要比2大 val cv = new CrossValidator(). setEstimator(pipeline). setEstimatorParamMaps(paramGrid). setEvaluator(new BinaryClassificationEvaluator()). setNumFolds(2)(6)根據(jù)最優(yōu)參數(shù)構(gòu)建模型
//構(gòu)借助參數(shù)網(wǎng)格,交叉驗(yàn)證,選擇最優(yōu)的參數(shù)構(gòu)建模型 val cvModel = cv.fit(training)(7)添加測試數(shù)據(jù)
//添加測試集 val test = sqlContext.createDataFrame(Seq((12L, "learn Spark"),(13L, "hadoop hive"),(14L, "bigdata hdfs a"),(15L, "apache Spark") )).toDF("id","text")(8)預(yù)測結(jié)果
cvModel.transform(test).select("id","text","probability","prediction").collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) => println(s"($id, $text) -> probability=$probability, prediction=$prediction")}?
6 通過訓(xùn)練校驗(yàn)分類來調(diào)優(yōu)模型
前面交叉驗(yàn)證是把數(shù)據(jù)分成多份,每一份把所有參數(shù)組合計(jì)算一次。而校驗(yàn)分類只需要把每一組參數(shù)計(jì)算一次,把數(shù)據(jù)自動分成訓(xùn)練集合校驗(yàn)集,這種方式依賴于比較大的數(shù)據(jù)量,如果數(shù)量不夠生成的結(jié)果是不可信任的。不像校驗(yàn)驗(yàn)證數(shù)據(jù)集小沒關(guān)系會交叉驗(yàn)證多次,所以即使數(shù)據(jù)量少但是計(jì)算多次,多次的結(jié)果足夠評估選出最優(yōu)的參數(shù)。所以TrainValidationSplit需要的數(shù)據(jù)量就要大,只會計(jì)算一次。這個例子采用線性回歸。
與CrossValidator不同,TrainValidationSplit創(chuàng)建一個(訓(xùn)練,測試)數(shù)據(jù)集對。 它使用trainRatio參數(shù)將數(shù)據(jù)集分成這兩個部分。 例如,trainRatio = 0.75,TrainValidationSplit將生成訓(xùn)練和測試數(shù)據(jù)集對,其中75%的數(shù)據(jù)用于訓(xùn)練,25%用于驗(yàn)證。
步驟:
(1)準(zhǔn)備訓(xùn)練和測試數(shù)據(jù)
(2)使用ParamGridBuilder構(gòu)建一個參數(shù)網(wǎng)格
(3)使用TrainValidationSplit來選擇模型和參數(shù),CrossValidator需要一個estimator,一個評估器參數(shù)集合,和一個evaluator
(4)運(yùn)行校驗(yàn)分類選擇最好的參數(shù)
(5)在測試數(shù)據(jù)上做測試,模型是參數(shù)組合中執(zhí)行最好的一個
//使用線性回歸求解 import org.apache.spark.ml.regression.LinearRegression 因?yàn)槭腔貧w問題,所以用RegressionEvaluator回歸評估器 import org.apache.spark.ml.evaluation.RegressionEvaluator //使用ParamGridBuilder參數(shù)網(wǎng)格和,TrainValidationSplit import org.apache.spark.ml.tuning.{TrainValidationSplit,ParamGridBuilder} //需要引入SQLContext import org.apache.spark.sql.SQLContextval = sqlContext = new SQLContext(sc) val data = sqlContext.read.format("libsvm").load("file:/data/sample_linear_regression_data.txt")//randomSplits隨機(jī)拆分,seed隨機(jī)種子 val Array(training, test) = data.randomSplit(Array(0.75, 0.25), seed=12345)//創(chuàng)建線性回歸 val lr = new LinearRegression()//elasticNetParam是Elastic net (回歸)參數(shù),取值介于0和1之間。 //fitIntercept是否允許階段,默認(rèn)是true。regParam參數(shù)定義規(guī)范化項(xiàng)的權(quán)重 val paramGrid = new ParamGridBuilder(). addGrid(lr.elasticNetParam, Array(0.0,0.5,1.0)). addGrid(lr.fitIntercept). addGrid(lr.regParam, Array(0.1,0.01)). build()//訓(xùn)練校驗(yàn)的比例setTrainRatio val trainValidationSplit = new TrainValidationSplit(). setEstimator(lr). setEstimatorParamMaps(paramGrid). setEvaluator(new RegressionEvaluator). setTrainRatio(0.8)val model = trainValidationSplit.fit(training)model.transform(test).select("features","label","prediction").show()?
總結(jié)
以上是生活随笔為你收集整理的用户画像之Spark ML实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: leetcode第 46 场双周赛
- 下一篇: R语言第四讲 之R语言数据类型