日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[机器学习] XGBoost on Spark 分布式使用完全手册

發布時間:2023/12/15 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [机器学习] XGBoost on Spark 分布式使用完全手册 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一 XGBoost分布式概述

在XGBoost設計之初,就考慮了分布式的實現。樹模型最重要的一個問題即是分割點的確定,XGBoost在單機的環境中,數據全部load進內存,feature已經按照值的大小排好序,采用一個叫做“exact greedy algorithm”算法,經過線性掃描,就可以快速的找到最佳的分割點;但是在分布式環境中,數據分布在各個節點,這種情況下,要找到最佳的分割點是很不容易的,XGBoost提供了一個近似算法來解決這個問題,近似算法的核心在于根據特征的分布來提供一組候選的分割點,至于如何保證候選分割集的有效性和理論上的完備性,不在本文的討論范圍,有興趣的讀者可以參考文獻[2]。

分布式環境中,多個節點共同工作,結果采用Allreduce的機制來同步,xgboost依賴dmlc/rabit來完成這個工作

rabit:可容錯的Allreduce庫

Allreduce是MPI提供的一個主要功能,但是MPI一般不是特別受到廣泛歡迎,原因之一就是它本身不容錯,但如果砍掉MPI的多余接口,只保留Allreduce和Broadcast,支持容錯則簡單很多。原因是Allreduce有一個很好的性質,每一個節點最后拿到的是一樣的結果,這意味著我們可以讓一些節點記住結果。當有節點掛掉重啟的時候,可以直接向還活著的節點索要結果就可以了。

?

容錯過程:

1、Node1在第二個checkpoint之后的第一次和第二次allreduce中間掛了

2、當Node1重啟,它會調用LoadCheckPoint,這樣可以從其他節點得到最近的一次CheckPoint

3、Node1從當前的CheckPoint開始跑,并進行第一次Allreduce,這時其他節點已經知道了結果并把結果告訴Node1

4、當Node1執行到第二個Allreduce,這時大家就已經完全同步上了

有了Allreduce容錯機制和近似算法確定分割點,XGBoost算法可以運行在很多已知的集群上,如MPI,Yarn...

?

二 XGBoost on Spark

由于spark等基于JVM平臺的大數據處理系統應用越來越廣泛,2016.4月 XGBoost推出了基于spark/Flink的XGBoost4J(XGBoost for JVM Platform) XGBoost4J Code Examples??

XGBoost4J的核心與XGBoost一樣,分布式實現仍然采用rabit Allreduce,但是抽象了一套Java/Scala接口,供spark平臺使用。
?

XGBoost-spark 特征處理和參數選擇與單機版本基本一致,在分布式環境中需要注意的幾個問題:

1、numWorker參數應該與executor數量設置一致,executor-cores設置為1(筆者認為的最優化的配置)

2、在train的過程中,每個partition占用的內存最好限制在executor-memory的1/3以內,因為除了本來訓練樣本需要駐留的內存外,xgboost為了速度的提升,為每個線程申請了額外的內存,并且這些內存是JVM所管理不到的

3、對于需要在集群機器上共享的資源,比如字典/庫文件等,spark提供了一套類似于hadoop distribute cached的機制來滿足需求

?

三 XGBoost-spark 實際使用

1 添加依賴

scala:2.11.8 ,? jdk:1.8, xgboost:0.81, spark:2.3

<dependencies><dependency><groupId>ml.dmlc</groupId><artifactId>xgboost4j</artifactId><version>0.81</version></dependency><dependency><groupId>ml.dmlc</groupId><artifactId>xgboost4j-example</artifactId><version>0.81</version></dependency><dependency><groupId>ml.dmlc</groupId><artifactId>xgboost4j-spark</artifactId><version>0.81</version></dependency></dependencies>

?

2. xgboost scala 代碼

package xgboostimport ml.dmlc.xgboost4j.scala.spark.XGBoostClassifierimport org.apache.log4j.{Level, Logger} import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler} import scala.collection.mutable.ArrayBufferimport org.apache.spark.sql.SparkSession import org.apache.spark.sql.DataFrameobject SparkTraining {def processData(df_data: DataFrame): DataFrame = {var columns = df_data.columns.clone()var feature_columns = new ArrayBuffer[String]()for (i <- 1 until columns.length) {feature_columns += columns(i)}val stringIndexer = new StringIndexer().setInputCol("_c0").setOutputCol("class").fit(df_data)val labelTransformed = stringIndexer.transform(df_data).drop("_c0")val train_vectorAssembler = new VectorAssembler().setInputCols(feature_columns.toArray).setOutputCol("features")val xgbData = train_vectorAssembler.transform(labelTransformed).select("features", "class")return xgbData}def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)val spark = SparkSession.builder()//.master("local").appName("xgboost_spark_demo")// .config("spark.memory.fraction", 0.3)// .config("spark.shuffle.memoryFraction", 0.5).getOrCreate()//step 1: 讀取 CSV 數據val df_train = spark.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", true.toString).load("/Users/pboc_train.csv")//step 2: 處理CSV 數據var xgbTrain = processData(df_train)val df_test = spark.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", true.toString).load("/Users/pboc_test.csv")var xgbTest = processData(df_test)val xgbParam = Map("eta" -> 0.1f,"objective" -> "binary:logistic","num_round" -> 100,"num_workers" -> 4)val xgbClassifier = new XGBoostClassifier(xgbParam).setEvalMetric("auc").setMaxDepth(5).setFeaturesCol("features").setLabelCol("class")println("Start Trainning ......")val xgbClassificationModel = xgbClassifier.fit(xgbTrain)println("End Trainning ......")println("Predicting ...")val results = xgbClassificationModel.transform(xgbTest)results.show()} }

?

3. 打包并提交spark任務

使用maven 打包成 xgboostDemo.jar

spark-submit --master yarn --deploy-mode client --num-executors 3 --executor-cores 10 --executor-memory 20G --driver-memory 5G --queue demo_dm --class xgboost.SparkTraining xgboostDemo.jar

?

四 XGBoost與LR的結合

XGBoost+LR結合的思想源于facebook的研究論文[5],使用樹模型來做特征選擇,最后用LR來輸出CTR分數,充分利用了兩種模型的優點,實踐證明,XGBoost+LR離線評估和線上AB都優于單獨XGBoost。除了論文中提供的方案帶來的收益外,我們還將這種stacking的想法做了發揮,工程上單獨抽取出LR層,這樣做有如下優點:

1、對于一些類似于ID類的非連續特征,可以單獨使用LR層來承載

2、事實上很多feature extraction 強大的模型稍作處理都可以作為LR層的輸入,處理得當的話,LR還是很強大的

3、通過在LR層組合不同的特征來源,方便的做到“刻畫”和“泛化”的結合,類似于deep and wide這樣的思想

4、LR本身的優勢,適合大規模并行,online learning算法成熟等等。。。

?

參考文獻

[1]: XGBoost: A Scalable Tree Boosting System

[2]: XGBoost: A Scalable Tree Boosting System Supplementary Material

[3]: 分享一個spark xgboost可運行的實例

[4]:? XGBoost與spark在廣告排序中的應用

[5]:?關于spark的mllib學習總結(Java版)

[6]: 利用xgboost4j下的xgboost分類模型案例

[7]: xgboost之spark上運行-scala接口

[8]: Using Spark, Scala and XGBoost On The Titanic Dataset from Kaggle

[9]: Evaluation Metrics - RDD-based API

?

總結

以上是生活随笔為你收集整理的[机器学习] XGBoost on Spark 分布式使用完全手册的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。