Spark - Isotonic Regression 理论与实战
??一.引言
保序回歸又稱為單調回歸,算法中 Isotonic 意為等滲,其來源于希臘詞根, 'iso' 意為相等、'tonos' 意為擴展,表達其在常規(guī)線性回歸的基礎上做了拓展即增加了保序的限制。本文將基于 Spark?Isotonic Regression 的原理和實踐進行簡單的理論分析和代碼實戰(zhàn)。
二.Isotonic Regression 理論
1.函數形式
保序回歸屬于回歸算法家族,形式上與常規(guī)回歸類似,給定一組觀測值 y1、y2、... 和原始值 x1、x2、...,找到一個映射函數 f(x) 使得下述損失函數達到最小:
其中要求保序:
這里映射函數 f(x) 就是我們最終生成的回歸函數。
2.PAV 算法簡介
Spark.Mlib 通過 Pool Adjacent Violators Algorithm PAV 算法去近似實現保序回歸。首先有一個前提條件,即像其他線性模型一樣假定回歸模型中方差是相同的。假定每個 yi 滿足正態(tài)分布:
且滿足:
為了滿足單調性約束,我們需要將違反單調性約束的點與其相鄰的點構成一個單調序列。在序列范圍內保證其滿足同一分布,即如果存在亂序情況:
則令當前的 x 序列同屬于新的分布:
此時 f(x) 給定 x[i] 和 x[i+1] 都會返回 (y[i] + yi+1) / 2 作為預測,即 u =?(y[i] + y[i+1]) / 2。繼續(xù)向 x[i+2] 點前進,如果? y[i] + y[i+1]) / 2 > y[i+2] ,則代表當前序列依然存在亂序情況,此時吸納 y[i+2] 生成新的分布:
如果往復循環(huán)吸納序列,如果?y[i] + y[i+1]) / 2 <?y[i+2] ,則此時 x[i]、x[i+1] 同屬一個分布,?x[i+2] 構造自己的 μ = y[i+2] 的分布,并重復上述序列擴展過程,直到所有 x 的分布 y 滿足單調性。簡單來說就是將不滿足單調性的點與周圍點綁定為一個整體合并為一個分布,此時對應區(qū)間預測值也會成為一條直線。這里我們都是假定了權重 w 相同且都為1的情況,如果權重不同,則分布均值的構造過程將會變成加權平均而不是直接平均。
Tips:
在回歸任務中我們的 f(x) 給定同一個 x 返回的是同一個 y,而原始數據中可能存在同一個 x 不同 y 的情況,針對這樣的 x 我們首先需要將所有 x 的 y 值拿到,然后去 mean(y) 作為 x 的預測值進入下面的步驟。這里取平均也很好理解,因為我們采用均方差的損失函數,給定 y1、y2、...、yn,取 mean(y) 時損失函數最小。
3.算法圖例
下圖展示了 PAV?算法的序列構成與吸納成同分布的方法。
通過回歸損失函數與單調性要求,獲得的 f(x) 最終為分段函數形式。
三.Isotonic Regression 實戰(zhàn)?
1.模擬亂序數據
此處我們模擬 y = 10 * x 的回歸問題,并在隨機范圍內生成亂序數據。
// 構造亂序數據val dataBuffer = new ArrayBuffer[(Double, Double)]()val random = new Random()(0 to 10000).foreach(num => {val x = random.nextDouble()val y = if (random.nextDouble() < 0.1) {x * 10 - random.nextDouble() * 2} else {x * 10}dataBuffer.append((y, x))})// 劃分訓練、預測集val splits = sc.parallelize(dataBuffer).map(data => {(data._1, data._2, 1.0)}).randomSplit(Array(0.6, 0.4))val training = splits(0)val test = splits(1)2.模型訓練
劃分訓練集預測集并訓練,setIsotonic 為 True 即要求模型保序。
// 劃分訓練、預測集val splits = sc.parallelize(dataBuffer).map(data => {(data._1, data._2, 1.0)}).randomSplit(Array(0.6, 0.4))val training = splits(0)val test = splits(1)// 模型訓練val model = new IsotonicRegression().setIsotonic(true).run(training)3.模型評估
使用原始 label 與預測值計算 MSE。
// 預測值與真實值val predictionAndLabel = test.map { point =>val predictedLabel = model.predict(point._2)(predictedLabel, point._1)}// 計算 MSEval meanSquaredError = predictionAndLabel.map { case (p, l) => math.pow((p - l), 2) }.mean()println(s"Mean Squared Error = $meanSquaredError")4.模型保存與加載
通過 .save 方法保存模型,并通過 .load 方法加載模型。
// Save and load model model.save(sc, "target/tmp/myIsotonicRegressionModel") val sameModel = IsotonicRegressionModel.load(sc, "target/tmp/myIsotonicRegressionModel")Tips:?完整代碼
val conf = (new SparkConf).setAppName("IsotonicLR").setMaster("local[*]")val spark = SparkSession.builder.config(conf).getOrCreate()val sc = spark.sparkContext// 構造亂序數據val dataBuffer = new ArrayBuffer[(Double, Double)]()val random = new Random()(0 to 10000).foreach(num => {val x = random.nextDouble()val y = if (random.nextDouble() < 0.1) {x * 10 - random.nextDouble() * 2} else {x * 10}dataBuffer.append((y, x))})// 劃分訓練、預測集val splits = sc.parallelize(dataBuffer).map(data => {(data._1, data._2, 1.0)}).randomSplit(Array(0.6, 0.4))val training = splits(0)val test = splits(1)// 模型訓練val model = new IsotonicRegression().setIsotonic(true).run(training)// 預測值與真實值val predictionAndLabel = test.map { point =>val predictedLabel = model.predict(point._2)(predictedLabel, point._1)}// 計算 MSEval meanSquaredError = predictionAndLabel.map { case (p, l) => math.pow((p - l), 2) }.mean()println(s"Mean Squared Error = $meanSquaredError")四.Isotonic Regression 應用?
1.自定義加載模型
實際場景下,Spark 訓練得到的?Isotonic Regression 模型加載需要傳入 SC 即 SparkContext:
但如果我們是在 Flink 環(huán)境下使用,就不太好初始化 SparkContext 了,這里我們主要到 sc 主要負責讀取源數據,而保序回歸的源數據其實很簡單,兩個 Array[Double] 一個為 Boundaries 邊界一個是 Predictions 預測 以及標識 isotonic 的 Boolean,所以我們可以通過 Redis 保存這兩個數組,在自己的任務中直接 new IsotonicRegressionModel 即可:
val selfModel = new IsotonicRegressionModel(boundaries, predictions, isotonic)這樣的好處是避免了 sc 的限制,且改模型支持序列化,所以廣播也沒有問題。
2.自定義代碼測試
val boundaries = model.boundariesval predictions = model.predictionsprintln(boundaries.mkString(","))println(predictions.mkString(","))val isotonic = trueval selfModel = new IsotonicRegressionModel(boundaries, predictions, isotonic)val predictionAndLabelSelf = test.map { point =>val predictedLabel = selfModel.predict(point._2)(predictedLabel, point._1)}// Calculate mean squared error between predicted and real labels.val meanSquaredErrorSelf = predictionAndLabelSelf.map { case (p, l) => math.pow((p - l), 2) }.mean()println(s"Mean Squared Error Self = $meanSquaredError")這里使用剛才訓練模型的?Boundaries 和?Predictions 直接構造保序回歸模型,計算得到的預測結果和 MES 與 load 得到的模型是相同的。這里實戰(zhàn)的話把數組存在 Redis 等存儲介質即可。
3.自定義模型預測
下面是官方 predict 的源碼,思路也很清晰,首先使用二分法判斷當前測試數據即 x 是否在 boundaries 中,隨后根據數據分四種情況計算:
- 小于所有邊界:取 Predictions.head?
- 大于所有邊界: 取 Predictions.last
- 找不到索引: 執(zhí)行?linearInterpolation 線性插值
- 找到索引: 直接去 Predictions[Index]
有興趣的同學可以把源碼 copy 下來自己調用看看~
五.總結
保序回歸經典的應用案例就是藥物使用量試驗上,假設藥物用量與病人反應成正單調,但如過按照藥物反應排序時藥物用量亂序則不好評估用藥量。在這種情況下,使用保序回歸,即不改變 X 的排列順序,又求的 Y 的平均值狀況。例如模型預測后用藥量在 20-30 時預測值相同,則我們從經濟以及病人抗藥性等因素的考慮,可以認為 20 的量是理想的。
除此之外,保序回歸也可以應用于點擊率預估的矯正,因為我們的先驗認知是模型的預測分和其對應的真實點擊率應該成正比,基于這個先驗認知我們也可以通過保序回歸矯正 CTR。
參考:
Spark Isotonic regression DOCS
Predicting Good Probabilities With Supervised Learning
Isotonic Regression Research And Process
總結
以上是生活随笔為你收集整理的Spark - Isotonic Regression 理论与实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【WZOI】2019愚人节比赛题目分析
- 下一篇: 关于拼板邮票孔制作规范