[机器学习] XGBoost on Spark 分布式使用完全手册
一 XGBoost分布式概述
在XGBoost設計之初,就考慮了分布式的實現。樹模型最重要的一個問題即是分割點的確定,XGBoost在單機的環境中,數據全部load進內存,feature已經按照值的大小排好序,采用一個叫做“exact greedy algorithm”算法,經過線性掃描,就可以快速的找到最佳的分割點;但是在分布式環境中,數據分布在各個節點,這種情況下,要找到最佳的分割點是很不容易的,XGBoost提供了一個近似算法來解決這個問題,近似算法的核心在于根據特征的分布來提供一組候選的分割點,至于如何保證候選分割集的有效性和理論上的完備性,不在本文的討論范圍,有興趣的讀者可以參考文獻[2]。
分布式環境中,多個節點共同工作,結果采用Allreduce的機制來同步,xgboost依賴dmlc/rabit來完成這個工作
rabit:可容錯的Allreduce庫
Allreduce是MPI提供的一個主要功能,但是MPI一般不是特別受到廣泛歡迎,原因之一就是它本身不容錯,但如果砍掉MPI的多余接口,只保留Allreduce和Broadcast,支持容錯則簡單很多。原因是Allreduce有一個很好的性質,每一個節點最后拿到的是一樣的結果,這意味著我們可以讓一些節點記住結果。當有節點掛掉重啟的時候,可以直接向還活著的節點索要結果就可以了。
?
容錯過程:
1、Node1在第二個checkpoint之后的第一次和第二次allreduce中間掛了
2、當Node1重啟,它會調用LoadCheckPoint,這樣可以從其他節點得到最近的一次CheckPoint
3、Node1從當前的CheckPoint開始跑,并進行第一次Allreduce,這時其他節點已經知道了結果并把結果告訴Node1
4、當Node1執行到第二個Allreduce,這時大家就已經完全同步上了
有了Allreduce容錯機制和近似算法確定分割點,XGBoost算法可以運行在很多已知的集群上,如MPI,Yarn...
?
二 XGBoost on Spark
由于spark等基于JVM平臺的大數據處理系統應用越來越廣泛,2016.4月 XGBoost推出了基于spark/Flink的XGBoost4J(XGBoost for JVM Platform) XGBoost4J Code Examples??
XGBoost4J的核心與XGBoost一樣,分布式實現仍然采用rabit Allreduce,但是抽象了一套Java/Scala接口,供spark平臺使用。
?
XGBoost-spark 特征處理和參數選擇與單機版本基本一致,在分布式環境中需要注意的幾個問題:
1、numWorker參數應該與executor數量設置一致,executor-cores設置為1(筆者認為的最優化的配置)
2、在train的過程中,每個partition占用的內存最好限制在executor-memory的1/3以內,因為除了本來訓練樣本需要駐留的內存外,xgboost為了速度的提升,為每個線程申請了額外的內存,并且這些內存是JVM所管理不到的
3、對于需要在集群機器上共享的資源,比如字典/庫文件等,spark提供了一套類似于hadoop distribute cached的機制來滿足需求
?
三 XGBoost-spark 實際使用
1 添加依賴
scala:2.11.8 ,? jdk:1.8, xgboost:0.81, spark:2.3
<dependencies><dependency><groupId>ml.dmlc</groupId><artifactId>xgboost4j</artifactId><version>0.81</version></dependency><dependency><groupId>ml.dmlc</groupId><artifactId>xgboost4j-example</artifactId><version>0.81</version></dependency><dependency><groupId>ml.dmlc</groupId><artifactId>xgboost4j-spark</artifactId><version>0.81</version></dependency></dependencies>?
2. xgboost scala 代碼
package xgboostimport ml.dmlc.xgboost4j.scala.spark.XGBoostClassifierimport org.apache.log4j.{Level, Logger} import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler} import scala.collection.mutable.ArrayBufferimport org.apache.spark.sql.SparkSession import org.apache.spark.sql.DataFrameobject SparkTraining {def processData(df_data: DataFrame): DataFrame = {var columns = df_data.columns.clone()var feature_columns = new ArrayBuffer[String]()for (i <- 1 until columns.length) {feature_columns += columns(i)}val stringIndexer = new StringIndexer().setInputCol("_c0").setOutputCol("class").fit(df_data)val labelTransformed = stringIndexer.transform(df_data).drop("_c0")val train_vectorAssembler = new VectorAssembler().setInputCols(feature_columns.toArray).setOutputCol("features")val xgbData = train_vectorAssembler.transform(labelTransformed).select("features", "class")return xgbData}def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)val spark = SparkSession.builder()//.master("local").appName("xgboost_spark_demo")// .config("spark.memory.fraction", 0.3)// .config("spark.shuffle.memoryFraction", 0.5).getOrCreate()//step 1: 讀取 CSV 數據val df_train = spark.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", true.toString).load("/Users/pboc_train.csv")//step 2: 處理CSV 數據var xgbTrain = processData(df_train)val df_test = spark.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", true.toString).load("/Users/pboc_test.csv")var xgbTest = processData(df_test)val xgbParam = Map("eta" -> 0.1f,"objective" -> "binary:logistic","num_round" -> 100,"num_workers" -> 4)val xgbClassifier = new XGBoostClassifier(xgbParam).setEvalMetric("auc").setMaxDepth(5).setFeaturesCol("features").setLabelCol("class")println("Start Trainning ......")val xgbClassificationModel = xgbClassifier.fit(xgbTrain)println("End Trainning ......")println("Predicting ...")val results = xgbClassificationModel.transform(xgbTest)results.show()} }?
3. 打包并提交spark任務
使用maven 打包成 xgboostDemo.jar
spark-submit --master yarn --deploy-mode client --num-executors 3 --executor-cores 10 --executor-memory 20G --driver-memory 5G --queue demo_dm --class xgboost.SparkTraining xgboostDemo.jar?
四 XGBoost與LR的結合
XGBoost+LR結合的思想源于facebook的研究論文[5],使用樹模型來做特征選擇,最后用LR來輸出CTR分數,充分利用了兩種模型的優點,實踐證明,XGBoost+LR離線評估和線上AB都優于單獨XGBoost。除了論文中提供的方案帶來的收益外,我們還將這種stacking的想法做了發揮,工程上單獨抽取出LR層,這樣做有如下優點:
1、對于一些類似于ID類的非連續特征,可以單獨使用LR層來承載
2、事實上很多feature extraction 強大的模型稍作處理都可以作為LR層的輸入,處理得當的話,LR還是很強大的
3、通過在LR層組合不同的特征來源,方便的做到“刻畫”和“泛化”的結合,類似于deep and wide這樣的思想
4、LR本身的優勢,適合大規模并行,online learning算法成熟等等。。。
?
參考文獻
[1]: XGBoost: A Scalable Tree Boosting System
[2]: XGBoost: A Scalable Tree Boosting System Supplementary Material
[3]: 分享一個spark xgboost可運行的實例
[4]:? XGBoost與spark在廣告排序中的應用
[5]:?關于spark的mllib學習總結(Java版)
[6]: 利用xgboost4j下的xgboost分類模型案例
[7]: xgboost之spark上運行-scala接口
[8]: Using Spark, Scala and XGBoost On The Titanic Dataset from Kaggle
[9]: Evaluation Metrics - RDD-based API
?
總結
以上是生活随笔為你收集整理的[机器学习] XGBoost on Spark 分布式使用完全手册的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: csgo开箱网站都有哪些_csgo官方承
- 下一篇: 配对t检验的应用条件是什么_配对t检验在