spark.mllib:回归算法
Spark實(shí)現(xiàn)了三類線性回歸方法:
1、LinearRegression:普通線性回歸模型
2、LassoRegression:加L1正則化的線性回歸
3、RidgeRegression:加L2正則化的線性回歸
Spark采用了模型和訓(xùn)練分離定義的方式,模型和模型的迭代計(jì)算都很清晰:
如LinearRegressionModel和LinearRegressionWithSGD,LassoModel和LassoWithSGD,RidgeRegressionModel和RidgeRegressionWithSGD。其中Model繼承自GeneralizedLinearModel和RegressionModel,為了便于模型的保存和輸出,還繼承了Saveable、Loader和PMMLExportable類,XXXWithSGD繼承自GeneralizedLinearAlgorithm,并實(shí)現(xiàn)來模型訓(xùn)練的train方法其通過調(diào)用父類GeneralizedLinearAlgorithm的run方法來實(shí)現(xiàn)模型參數(shù)求解的邏輯。
LinearRegression(普通線性回歸模型)
三類線性回歸模型的實(shí)現(xiàn)都大同小異,在此以普通的線性回歸LinearRegressionModel和LinearRegressionWithSGD為例來說明。LinearRegressionModel繼承了大量的類,但本身實(shí)現(xiàn)比較簡(jiǎn)單,即覆寫來父類的predictPoint、save和load方法。代碼簡(jiǎn)單,在此不述。
RidgeRegressionWithSGD繼承了GeneralizedLinearAlgorithm類,其主要實(shí)現(xiàn)了一個(gè)方法train,并定義來自己的Gradient類型和Updater類型為模型訓(xùn)練做準(zhǔn)備,另外train實(shí)現(xiàn)了重載:
def train(
? ? input: RDD[LabeledPoint],
? ? numIterations: Int,
? ? stepSize: Double,
? ? regParam: Double,
? ? miniBatchFraction: Double,
? ? initialWeights: Vector): RidgeRegressionModel = {
? new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(
? ? input, initialWeights)
}
def train(
? ? input: RDD[LabeledPoint],
? ? numIterations: Int,
? ? stepSize: Double,
? ? regParam: Double,
? ? miniBatchFraction: Double): RidgeRegressionModel = {
? new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
}
def train(
? ? input: RDD[LabeledPoint],
? ? numIterations: Int,
? ? stepSize: Double,
? ? regParam: Double): RidgeRegressionModel = {
? train(input, numIterations, stepSize, regParam, 1.0)
}
def train(
? ? input: RDD[LabeledPoint],
? ? numIterations: Int): RidgeRegressionModel = {
? train(input, numIterations, 1.0, 0.01, 1.0)
}
不同train方法的區(qū)別主要是初始化參數(shù)值,從這里也可以看到Spark使用來哪些默認(rèn)的參數(shù)值進(jìn)行模型的初始化。train方法內(nèi)部調(diào)用了父類的run方法。
我們?cè)賮砜纯锤割怗eneralizedLinearAlgorithm的run方法干來啥?
run方法首先進(jìn)行了特征值的 Scaling,這里對(duì)特征值的方差進(jìn)行來歸一化:
//run方法的特征值Scaling過程
val scaler = if (useFeatureScaling) {
? new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
} else {
? null
}
?
// Prepend an extra variable consisting of all 1.0's for the intercept.
// TODO: Apply feature scaling to the weight vector instead of input data.
val data =
? if (addIntercept) {
? ? if (useFeatureScaling) {
? ? ? input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
? ? } else {
? ? ? input.map(lp => (lp.label, appendBias(lp.features))).cache()
? ? }
? } else {
? ? if (useFeatureScaling) {
? ? ? input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
? ? } else {
? ? ? input.map(lp => (lp.label, lp.features))
? ? }
? }
特征值的 Scaling過程是由用戶決定是否需要Scaling,一般來說,用戶可以在數(shù)據(jù)預(yù)處理的步驟中進(jìn)行特征值的Scaling,也可以交給Spark在這里進(jìn)行。關(guān)于
為什么要做特征值的Scaling,在知乎上看到的一個(gè)圖片能很好的說明問題:
沒有進(jìn)過歸一化,尋找最優(yōu)解的過程
經(jīng)過歸一化,把各個(gè)特征的尺度控制在相同的范圍內(nèi):
另外,本人也有3遍介紹歸一化的博文:數(shù)據(jù)預(yù)處理之歸一化、機(jī)器學(xué)習(xí)中的歸一化方法、時(shí)間序列的歸一化方法、也可以看看來自知乎的問答,結(jié)合具體的機(jī)器學(xué)習(xí)算法,還有很多特定的特征Scaling方法。
說完特征值的Scaling,再回過頭來看run方法。run方法除了特征值的Scaling外,還做來一些訓(xùn)練數(shù)據(jù)的整理、模型參數(shù)初始化的過程,之后調(diào)用了Optimizer類實(shí)例來求解模型參數(shù)并在最后調(diào)用createModel方法返回一個(gè)RegressionModel:
val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
//val intercept = 這里省略了一些代碼
//var weights =
createModel(weights, intercept)
總結(jié),Spark模型和訓(xùn)練算法模塊分離,對(duì)模型應(yīng)用還是訓(xùn)練來說,都是算法思路清晰、模塊算法低耦合的特點(diǎn),同時(shí),對(duì)算法開發(fā)人員也比較友好,可以單獨(dú)實(shí)現(xiàn)自己的優(yōu)化算法或者單獨(dú)實(shí)現(xiàn)上層的模型。
————————————————
版權(quán)聲明:本文為CSDN博主「大愚若智_」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/zbc1090549839/article/details/65437345
總結(jié)
以上是生活随笔為你收集整理的spark.mllib:回归算法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 胆囊炎能不能吃鹅蛋
- 下一篇: spark.mllib:NaiveBay