深入理解XGBoost:分布式实现
? ? ??
本文將重點(diǎn)介紹XGBoost基于Spark平臺(tái)Scala版本的實(shí)現(xiàn),帶領(lǐng)大家逐步完成特征提取、變換和選擇、XGBoost模型訓(xùn)練、Pipelines、模型選擇。
XGBoost回顧
? ? ? ?
? ? ? ?XGBoost(Extreme Gradient Boosting)由華盛頓大學(xué)的陳天奇博士提出,最開始作為分布式(深度)機(jī)器學(xué)習(xí)研究社區(qū)(DMLC)小組的研究項(xiàng)目之一。后因在希格斯(Higgs)機(jī)器學(xué)習(xí)挑戰(zhàn)賽中大放異彩,被業(yè)界所熟知,在數(shù)據(jù)科學(xué)應(yīng)用中廣泛應(yīng)用。目前,一些主流的互聯(lián)網(wǎng)公司如騰訊、阿里巴巴等都已將XGBoost應(yīng)用到其業(yè)務(wù)中,在各種數(shù)據(jù)科學(xué)競(jìng)賽中XGBoost也成為競(jìng)賽者們奪冠的利器。XGBoost在推薦、搜索排序、用戶行為預(yù)測(cè)、點(diǎn)擊率預(yù)測(cè)、產(chǎn)品分類等問題上取得了良好的效果。雖然這些年神經(jīng)網(wǎng)絡(luò)(尤其是深度神經(jīng)網(wǎng)絡(luò))變得越來越流行,但XGBoost仍舊在訓(xùn)練樣本有限、訓(xùn)練時(shí)間短、、調(diào)參知識(shí)缺乏的場(chǎng)景下具有獨(dú)特的優(yōu)勢(shì)。相比深度神經(jīng)網(wǎng)絡(luò),XGBoost能夠更好地處理表格數(shù)據(jù),并具有更強(qiáng)的可解釋性,另外具有易于調(diào)參、輸入數(shù)據(jù)不變性等優(yōu)勢(shì)。
? ? ? ?XGBoost是Gradient Boosting的實(shí)現(xiàn),相比其他實(shí)現(xiàn)方法,XGBoost做了很多優(yōu)化,在模型訓(xùn)練速度和精度上都有明顯提升,其優(yōu)良特性如下。
1)將正則項(xiàng)加入目標(biāo)函數(shù)中,控制模型的復(fù)雜度,防止過擬合。
2)對(duì)目標(biāo)函數(shù)進(jìn)行二階泰勒展開,同時(shí)用到了一階導(dǎo)數(shù)和二階導(dǎo)數(shù)。
3)實(shí)現(xiàn)了可并行的近似直方圖算法。
4)實(shí)現(xiàn)了縮減和列采樣(借鑒了GBDT和隨機(jī)森林)。
5)實(shí)現(xiàn)了快速直方圖算法,引入了基于loss-guide的樹構(gòu)建方法(借鑒了LightGBM)。
6)實(shí)現(xiàn)了求解帶權(quán)值的分位數(shù)近似算法(weighted quantile sketch)。
7)可根據(jù)樣本自動(dòng)學(xué)習(xí)缺失值的分裂方向,進(jìn)行缺失值處理。
8)數(shù)據(jù)預(yù)先排序,并以塊(block)的形式保存,有利于并行計(jì)算。
9)采用緩存感知訪問、外存塊計(jì)算等方式提高數(shù)據(jù)訪問和計(jì)算效率。
10)基于Rabit實(shí)現(xiàn)分布式計(jì)算,并集成于主流大數(shù)據(jù)平臺(tái)中。
11)除CART作為基分類器外,XGBoost還支持線性分類器及LambdaMART排序模型等算法。
12)實(shí)現(xiàn)了DART,引入Dropout技術(shù)。
? ? ? ?目前已經(jīng)有越來越多的開發(fā)人員為XGBoost開源社區(qū)做出了貢獻(xiàn)。XGBoost實(shí)現(xiàn)了多種語言的包,如Python、Scala、Java等。Python用戶可將XGBoost與scikit-learn集成,實(shí)現(xiàn)更為高效的機(jī)器學(xué)習(xí)應(yīng)用。另外,XGBoost集成到了Spark、Flink等主流大數(shù)據(jù)平臺(tái)中。
分布式XGBoost
? ? ? ?也許在競(jìng)賽中我們很少或者從不使用分布式XGBoost版本,可是在工業(yè)界爆炸式增長(zhǎng)的數(shù)據(jù)規(guī)模,單機(jī)模式是很難滿足用戶需求,XGBoost也相應(yīng)推出了分布式版本,這也是XGBoost如此流行的重要原因。本文將重點(diǎn)介紹XGBoost基于Spark平臺(tái)的實(shí)現(xiàn),帶領(lǐng)大家逐步完成Spark版本的特征提取、變換和選擇,以及XGBoost模型訓(xùn)練、Pipelines、模型選擇。
1.基于Spark平臺(tái)實(shí)現(xiàn)
? ? ? Spark是一個(gè)通用且高效的大數(shù)據(jù)處理引擎,它是基于內(nèi)存的大數(shù)據(jù)并行計(jì)算框架。因?yàn)镾park計(jì)算基于內(nèi)存,因此能夠保證大數(shù)據(jù)計(jì)算的實(shí)時(shí)性,相比傳統(tǒng)的Hadoop MapReduce效率提升很多。Spark擁有一個(gè)豐富的生態(tài)環(huán)境,以Spark為核心,涵蓋支持:結(jié)構(gòu)化數(shù)據(jù)查詢與分析的Spark SQL、分布式機(jī)器學(xué)習(xí)庫(kù)MLlib、并行圖計(jì)算框架GraphX、可容錯(cuò)流計(jì)算框架Spark Streaming等。由于Spark在工業(yè)界廣泛應(yīng)用,用戶群體龐大,因此XGBoost推出了XGBoost4J-Spark以支持Spark平臺(tái)。
1.1?Spark架構(gòu)
如圖1所示,Spark主要由如下組件構(gòu)成。
Client:提交Spark job的客戶端。
Driver:接受Spark job請(qǐng)求,啟動(dòng)SparkContext。
SparkContext:整個(gè)應(yīng)用的上下文,可以控制應(yīng)用的生命周期。
ClusterManager:集群管理器,為Application分配資源,包括多種類型,如Spark自帶的Standalone、Meso或者YARN等。
Worker:集群中任意可執(zhí)行Application代碼的節(jié)點(diǎn),運(yùn)行一個(gè)或者多個(gè)Executor。
Executor:在Worker節(jié)點(diǎn)中提交Application的進(jìn)程,啟動(dòng)并運(yùn)行任務(wù),負(fù)責(zé)將數(shù)據(jù)存于內(nèi)存或者硬盤中。每個(gè)Application均有各自的Executor執(zhí)行任務(wù)。
? ? ? ?由圖1可知,Spark作業(yè)提交流程如下:首先Client提交應(yīng)用,Driver接收到請(qǐng)求后,啟動(dòng)SparkContext。SparkContext連接ClusterManager,ClusterManager負(fù)責(zé)為應(yīng)用分配資源。Spark將在集群節(jié)點(diǎn)中獲取到執(zhí)行任務(wù)的Executor,這些Executor負(fù)責(zé)執(zhí)行計(jì)算和存儲(chǔ)數(shù)據(jù)。Spark將應(yīng)用程序的代碼發(fā)送給Executor,最后SparkContext將任務(wù)分配給Executor去執(zhí)行。
圖1 Spark結(jié)構(gòu)
? ? ? ?在Spark應(yīng)用中,整個(gè)執(zhí)行流程在邏輯上會(huì)轉(zhuǎn)化為RDD(Resilient Distributed Dataset,彈性分布式數(shù)據(jù)集)的DAG(Directed Acyclic Graph,有向無環(huán)圖)。RDD是Spark的基本運(yùn)算單元,后續(xù)會(huì)詳細(xì)介紹。Spark將任務(wù)轉(zhuǎn)化為DAG形式的工作流進(jìn)行調(diào)度,并進(jìn)行分布式分發(fā)。圖2通過示例展示了Spark執(zhí)行DAG的整個(gè)流程。
圖2?Spark執(zhí)行DAG的整個(gè)流程
? ? ? ?在圖2中,Transformations是RDD的一類操作,包括map、flatMap、filter等,該類操作是延遲執(zhí)行的,即從一個(gè)RDD轉(zhuǎn)化為另一個(gè)RDD不立即執(zhí)行,而只是將操作記錄下來,直到遇到Actions類的操作才會(huì)真正啟動(dòng)計(jì)算過程進(jìn)行計(jì)算。Actions類操作會(huì)返回結(jié)果或?qū)DD數(shù)據(jù)寫入存儲(chǔ)系統(tǒng),是觸發(fā)Spark啟動(dòng)計(jì)算的動(dòng)因。Action算子觸發(fā)后,將所有記錄的算子生成一個(gè)RDD,Spark根據(jù)RDD之間的依賴關(guān)系將任務(wù)切分為不同的階段(stage),然后由調(diào)度器調(diào)度RDD中的任務(wù)進(jìn)行計(jì)算。圖2中的A~E分別代表不同的RDD,RDD中的方塊代表不同的分區(qū)。Spark首先通過HDFS將數(shù)據(jù)讀入內(nèi)存,形成RDD A和RDD C。RDD A轉(zhuǎn)化為RDD B,RDD C執(zhí)行map操作轉(zhuǎn)化為RDD D,RDD B和RDD E執(zhí)行join操作轉(zhuǎn)化為RDD F。RDD B和RDD E連接轉(zhuǎn)化為RDD F的過程中會(huì)執(zhí)行Shuffle操作,最后RDD F通過函數(shù)saveAsSequenceFile輸出并保存到HDFS上。
1.2?RDD
? ? ??
? ? ? ?Spark引入了RDD概念,RDD是分布式內(nèi)存數(shù)據(jù)的抽象,是一個(gè)容錯(cuò)的、并行的數(shù)據(jù)結(jié)構(gòu),是Spark中基本的數(shù)據(jù)結(jié)構(gòu),所有計(jì)算均基于該結(jié)構(gòu)進(jìn)行,Spark通過RDD和RDD操作設(shè)計(jì)上層算法。
? ? ? RDD作為數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是一個(gè)只讀的分區(qū)記錄的集合,邏輯上可以把它想象成一個(gè)分布式數(shù)組,數(shù)組中的元素可以為任意的數(shù)據(jù)結(jié)構(gòu)。一個(gè)RDD可以包含多個(gè)分區(qū),每個(gè)分區(qū)都是數(shù)據(jù)集的一個(gè)子集。RDD可以相互依賴,通過依賴關(guān)系形成Spark的調(diào)度順序,通過RDD的操作形成整個(gè)Spark程序。
RDD有兩種操作算子:轉(zhuǎn)換(transformation)與行動(dòng)(actions)。
1. 轉(zhuǎn)換
轉(zhuǎn)換操作是延遲執(zhí)行的,即從一個(gè)RDD轉(zhuǎn)化為另一個(gè)RDD,且不立即執(zhí)行,而只是將操作記錄下來,直到遇到Actions類的操作才會(huì)真正啟動(dòng)計(jì)算過程。轉(zhuǎn)換操作包括map、flatMap、mapPartitions等多種操作,下面對(duì)常用的轉(zhuǎn)換操作進(jìn)行介紹。
map:對(duì)原始RDD中的每個(gè)元素執(zhí)行一個(gè)用戶自定義函數(shù)生成一個(gè)新的RDD。任何原始RDD中的元素在新的RDD中有且只有一個(gè)元素與之對(duì)應(yīng)。
flatMap:與map類似,原始RDD中的元素通過函數(shù)生成新的元素,并將生成的RDD的每個(gè)集合中的元素合并為一個(gè)集合。
mapPartitions:獲取每個(gè)分區(qū)的迭代器,在函數(shù)中對(duì)整個(gè)迭代器的元素(即整個(gè)分區(qū)的元素)進(jìn)行操作。
union:將兩個(gè)RDD合并,合并后不進(jìn)行去重操作,保留所有元素。使用該操作的前提是需要保證RDD元素的數(shù)據(jù)類型相同。
filter:對(duì)元素進(jìn)行過濾,對(duì)每個(gè)元素應(yīng)用函數(shù),返回值為True的元素被保留。
sample:對(duì)RDD中的元素進(jìn)行采樣,獲取所有元素的子集。
cache:將RDD元素從磁盤緩存到內(nèi)存,相當(dāng)于persist(MEMORY_ONLY)。
persist:對(duì)RDD數(shù)據(jù)進(jìn)行緩存,由參數(shù)StorageLevel決定數(shù)據(jù)緩存到哪里,如DISK_ONLY表示僅磁盤緩存、MEMORY_AND_DISK表示內(nèi)存和磁盤均緩存等。
groupBy:將RDD中元素通過函數(shù)生成相應(yīng)的key,然后通過key對(duì)元素進(jìn)行分組。
reduceByKey:將數(shù)據(jù)中每個(gè)key對(duì)應(yīng)的多個(gè)value進(jìn)行用戶自定義的規(guī)約操作。
join:相當(dāng)于SQL中的內(nèi)連接,返回兩個(gè)RDD以key作為連接條件的內(nèi)連接。
2. 行動(dòng)
? ? ? ?行動(dòng)操作會(huì)返回結(jié)果或?qū)DD數(shù)據(jù)寫入存儲(chǔ)系統(tǒng),是觸發(fā)Spark啟動(dòng)計(jì)算的動(dòng)因。行動(dòng)操作包括foreach、collect等。下面對(duì)常用的行動(dòng)操作進(jìn)行介紹。
foreach:對(duì)RDD中每個(gè)元素都調(diào)用用戶自定義函數(shù)操作,返回Unit。
collect:對(duì)于分布式RDD,返回一個(gè)scala中的Array數(shù)組。
count:返回RDD中元素的個(gè)數(shù)。
saveAsTextFile:將數(shù)據(jù)以文本的形式存儲(chǔ)到HDFS的指定目錄。
? ? ? ?DataSet是分布式的數(shù)據(jù)集合,它是在Spark 1.6之后新增的一個(gè)接口,其不但具有RDD的優(yōu)點(diǎn),而且同時(shí)具有Spark SQL優(yōu)化執(zhí)行引擎的優(yōu)勢(shì)。DataFrame是一個(gè)具有列名的分布式數(shù)據(jù)集,可以近似看作關(guān)系數(shù)據(jù)庫(kù)中的表,但DataFrame可以從多種數(shù)據(jù)源進(jìn)行構(gòu)建,如結(jié)構(gòu)化數(shù)據(jù)文件、Hive中的表、RDD等。DataFrame API可以在Scala、Java、Python和R中使用。下面只介紹幾個(gè)常用的API(更多API可以參考相關(guān)資料[插圖])。
select(cols:Column*):選取滿足表達(dá)式的列,返回一個(gè)新的DataFrame。其中,cols為列名或表達(dá)式的列表。
filter(condition:Column):通過給定條件過濾行。
count():返回DataFrame行數(shù)。
describe(cols:String*):計(jì)算數(shù)值型列的統(tǒng)計(jì)信息,包括數(shù)量、均值、標(biāo)準(zhǔn)差、最小值、最大值。
groupBy(cols:Column*):通過指定列進(jìn)行分組,分組后可通過聚合函數(shù)對(duì)數(shù)據(jù)進(jìn)行聚合。
join(right:Dataset[_]):和另一個(gè)DataFrame進(jìn)行join操作。
withColumn(colName:String,col:Column):添加列或者替換具有相同名字的列,返回新的DataFrame。
1.3?XGBoost4J-Spark
? ? ?
? ? ? 隨著Spark在工業(yè)界的廣泛應(yīng)用,積累了大量的用戶,越來越多的企業(yè)以Spark為核心構(gòu)建自己的數(shù)據(jù)平臺(tái)來支持挖掘分析類計(jì)算、交互式實(shí)時(shí)查詢計(jì)算,于是XGBoost4J-Spark應(yīng)運(yùn)而生。本節(jié)將介紹如何通過Spark實(shí)現(xiàn)機(jī)器學(xué)習(xí),如何將XGBoost4J-Spark很好地應(yīng)用于Spark機(jī)器學(xué)習(xí)處理的流水線中。
? ? ? XGBoost4J-Spark在jvm-package中實(shí)現(xiàn),因此在工程中調(diào)用XGBoost4J時(shí),只需在pom.xml文件中加入如下依賴即可:
<dependency><groupId>ml.dmlc</groupId><artifactId>xgboost4j-spark</artifactId><version>0.7</version> </dependency>? ? ? ?圖3展示了如何將XGBoost4J-Spark應(yīng)用于Spark機(jī)器學(xué)習(xí)處理的流水線框架中。首先通過Spark將數(shù)據(jù)加載為RDD、DataFrame或DataSet。如果加載類型為DataFrame/DataSet,則可通過Spark SQL對(duì)其進(jìn)行進(jìn)一步處理,如去掉某些指定的列等。由Spark MLlib庫(kù)完成特征工程,其提供了多種特征工程的方法供用戶選擇,此步驟是機(jī)器學(xué)習(xí)過程中非常重要的一步,因?yàn)楹玫奶卣骺梢詻Q定機(jī)器學(xué)習(xí)的上限。特征工程完成后,便可將生成的訓(xùn)練數(shù)據(jù)送入XGBoost4J-Spark中進(jìn)行訓(xùn)練,在此過程中可通過Spark MLlib進(jìn)行參數(shù)調(diào)優(yōu),得到最優(yōu)模型。得到訓(xùn)練模型后對(duì)預(yù)測(cè)集進(jìn)行預(yù)測(cè),最終得到預(yù)測(cè)結(jié)果。為了避免每次重復(fù)的訓(xùn)練模型,可將訓(xùn)練好的模型保存下來,在使用時(shí)直接加載即可。另外,訓(xùn)練完成后,XGBoost4J-Spark可對(duì)特征重要程度進(jìn)行排名。最后,形成數(shù)據(jù)產(chǎn)品應(yīng)用于相關(guān)業(yè)務(wù)。
圖3?XGBoost4J-Spark模型訓(xùn)練流程圖
? ? ? ?0.70版本及以上版本的XGBoost4J-Spark支持用戶在Spark中使用低級(jí)和高級(jí)內(nèi)存抽象,即RDD和DataFrame/DataSet,而低版本(0.60版本)的僅支持RDD方式。DataFrame/DataSet可以近似看作數(shù)據(jù)庫(kù)的一張表,不但包含數(shù)據(jù),而且包含表結(jié)構(gòu),是結(jié)構(gòu)化的數(shù)據(jù)。用戶可以方便地利用Spark提供的DataFrame/DataSet API對(duì)其操作,也可以通過用戶自定義函數(shù)(UDF)進(jìn)行處理,例如,通過select函數(shù)可以很方便地選取需要的特征形成一個(gè)新的DataFrame/DataSet。以下示例將結(jié)構(gòu)化數(shù)據(jù)保存在JSON文件中,并通過Spark的API解析為DataFrame,并以兩行Scala代碼來訓(xùn)練XGBoost模型。
1.val df = spark.read.json("data.json") 2.//調(diào)用 XGBoost API 訓(xùn)練DataFrame類型的訓(xùn)練集 3.val xgboostModel = XGBoost.trainWithDataFrame( 4. df, paramMap, numRound, nWorkers, useExternalMemory)上述代碼是XGBoost4J-Spark 0.7x版本的實(shí)現(xiàn)代碼,XGBoost4J-Spark 0.8x及以上版本中的部分API有所改動(dòng)。訓(xùn)練代碼如下:
1.val xgbClassifier = new XGBoostClassifier(paramMap). 2. setFeaturesCol("features"). 3. setLabelCol("label") 4.val xgbClassificationModel = xgbClassifier.fit(df)下面通過示例簡(jiǎn)單介紹XGBoost4J-Spark中的一些常用API,其他可參考官方文檔。首先,加載數(shù)據(jù)集,可通過Spark進(jìn)行讀取,例如外部文件加載、Spark SQL等。然后,設(shè)置模型參數(shù),可根據(jù)具體問題及數(shù)據(jù)分布調(diào)整模型參數(shù):
1.val paramMap = Map( 2. "eta" -> 0.1f, 3. "num_class" -> 3, 4. "max_depth" -> 3, 5. "objective" -> "multi:softmax")模型訓(xùn)練調(diào)用方式這里不再贅述,下面介紹訓(xùn)練函數(shù)中各參數(shù)的含義
trainingData:訓(xùn)練集RDD。
params:模型訓(xùn)練參數(shù)。
round:模型迭代輪數(shù)。
nWorkers:XGBoost訓(xùn)練節(jié)點(diǎn)個(gè)數(shù),如果設(shè)為0,則XGBoost會(huì)將訓(xùn)練集RDD的分區(qū)數(shù)作為nWorkers的數(shù)量。
obj:用戶定義的目標(biāo)函數(shù),默認(rèn)為Null。
eval:用戶定義的評(píng)價(jià)函數(shù),默認(rèn)為Null。
useExternalMemory:是否利用外存緩存,如果設(shè)置為True,則可以節(jié)省運(yùn)行XGBoost的RAM成本。
missing:數(shù)據(jù)集中指定為缺省值的值(注意,此處為XGBoost會(huì)將 missing值作為缺省值,在訓(xùn)練之前會(huì)將missing值置為空)。
? ? ? 模型訓(xùn)練完成之后,可將模型文件進(jìn)行保存以供預(yù)測(cè)時(shí)使用。模型被保存為Hadoop文件,存儲(chǔ)于HDFS上。0.7版本通過saveModelAsHadoopFile可實(shí)現(xiàn)該功能,調(diào)用示例如下:
xgboostModel.saveModelAsHadoopFile("/tmp/bst.model")0.8及以上版本直接可通過save函數(shù)實(shí)現(xiàn),如下:
xgboostModel.write.overwrite().save("/tmp/bst.model")XGBoost可以將之前訓(xùn)練好的模型文件直接加載,以供使用,0.7x版本代碼如下:
val model = XGBoost.loadModelFromHadoopFile("/tmp/bst.model")0.8及以上版本,如下:
val model = XGBoostClassificationModel.load("/tmp/bst.model")此處為分類模型,若為回歸模型則為:
val model = XGBoostRegressionModel.load("/tmp/bst.model")將預(yù)測(cè)集傳入訓(xùn)練好的模型即可進(jìn)行預(yù)測(cè),0.7x版本對(duì)RDD類型數(shù)據(jù)預(yù)測(cè)代碼,如下:
val predicts = model.predict(test)0.8及以上版本則直接對(duì)DataSet類型數(shù)據(jù)進(jìn)行預(yù)測(cè),如下:
val predicts = model.transform(test)? ? ? ?Spark訓(xùn)練好的模型也可以下載到本地,通過本地的XGBoost(Python、Java或Scala)加載并進(jìn)行預(yù)測(cè)。這樣既可以實(shí)現(xiàn)模型通過分布式訓(xùn)練海量樣本,提高模型的準(zhǔn)確度,又可以通過單機(jī)調(diào)用分布式訓(xùn)練的模型進(jìn)行預(yù)測(cè),提高模型預(yù)測(cè)速度。
? ? ? ?用戶不僅可以通過DataFrame/DataSet API對(duì)數(shù)據(jù)集進(jìn)行操作,而且可以通過Spark提供的MLlib機(jī)器學(xué)習(xí)包對(duì)特征進(jìn)行處理。MLlib是構(gòu)建于Spark之上的機(jī)器學(xué)習(xí)庫(kù),由通用的學(xué)習(xí)算法和工具類組成。通過MLlib可以方便地對(duì)特征進(jìn)行提取和轉(zhuǎn)化。MLlib還提供了非常豐富的算法,包括分類、回歸、聚類、協(xié)同過濾、降維等,用戶可以根據(jù)應(yīng)用場(chǎng)景將這些算法和XGBoost結(jié)合使用。另外,MLlib還提供了模型選擇工具,用戶可以通過API定義的自動(dòng)參數(shù)搜索過程來選擇最佳模型。
特征提取、變換和選擇
? ? ? 在將訓(xùn)練集送入XGBoost4J-Spark訓(xùn)練之前,可以首先通過MLlib對(duì)特征進(jìn)行處理,包括特征提取、變換和選擇。這是在進(jìn)行模型訓(xùn)練前十分重要的一步,但不是必需的,用戶可以根據(jù)應(yīng)用場(chǎng)景進(jìn)行選擇。
在MLlib中,特征提取方法主要有如下3種。
TF-IDF:詞頻率-逆文檔頻率,是常見的文本預(yù)處理步驟。字詞的重要性隨著它在文件中出現(xiàn)的次數(shù)呈正比增加,但也會(huì)隨著它在語料庫(kù)中出現(xiàn)的頻率呈反比下降。
Word2Vec:其將文檔中的每個(gè)單詞都映射為一個(gè)唯一且固定長(zhǎng)度的向量。
CountVectorizer:用向量表示文檔中每個(gè)詞出現(xiàn)的次數(shù)。
? ? ? 特征變換在Spark機(jī)器學(xué)習(xí)流水線中占有重要地位,廣泛應(yīng)用在各種機(jī)器學(xué)習(xí)場(chǎng)景中。MLlib提供了多種特征變換的方法,此處只選擇常用的方法進(jìn)行介紹。
(1)StringIndexer
StringIndexer將標(biāo)簽的字符串列編碼為標(biāo)簽索引列。索引取值為[0,numLabels],按標(biāo)簽頻率排序。如下表所示,category列為原數(shù)據(jù)列,categoryIndex列為通過StringIndexer編碼后的列。a出現(xiàn)最頻繁(編碼為0.0),依次為c(編碼為1.0)、b(編碼為2.0)。
調(diào)用代碼非常簡(jiǎn)單,只需如下兩行即可實(shí)現(xiàn):
1.val indexer = new StringIndexer() 2. .setInputCol("category") 3. .setOutputCol("categoryIndex") 4. 5.val indexed = indexer.fit(df).transform(df)(2)OneHotEncoder
OneHotEncoder將一列標(biāo)簽索引映射到一列二進(jìn)制向量,最多只有一個(gè)單值,可以將前面StringIndexer生成的索引列轉(zhuǎn)化為向量。OneHotEncoder主要應(yīng)用于類別特征上,如性別、國(guó)籍等。類別特征不能直接應(yīng)用于機(jī)器學(xué)習(xí)模型中,因?yàn)榧词雇ㄟ^StringIndexer將字符串轉(zhuǎn)為數(shù)值型特征后,模型往往默認(rèn)數(shù)據(jù)是連續(xù)的,并且是有序的;但是,類別特征數(shù)字并不是有序的,只是每個(gè)數(shù)字代表一個(gè)類別。
OneHotEncoder可以結(jié)合StringIndexer使用,代碼如下:
1.val indexer = new StringIndexer() 2. .setInputCol("category") 3. .setOutputCol("categoryIndex") 4. .fit(df) 5.val indexed = indexer.transform(df) 6. 7.val encoder = new OneHotEncoder() 8. .setInputCol("categoryIndex") 9. .setOutputCol("categoryVec") 10. 11.val encoded = encoder.transform(indexed)(3)Normalizer
Normalizer可以將多行向量輸入轉(zhuǎn)化為統(tǒng)一的形式。參數(shù)p(默認(rèn)為2)用來指定正則化操作中使用的p-norm。正則化操作可以使輸入數(shù)據(jù)標(biāo)準(zhǔn)化并提高后期模型的效果。
1.val normalizer = new Normalizer() 2. .setInputCol("features") 3. .setOutputCol("normFeatures") 4. .setP(1.0) 5. 6.val l1NormData = normalizer.transform(dataFrame)(4)StandardScaler
StandardScaler處理Vector數(shù)據(jù),標(biāo)準(zhǔn)化每個(gè)特征使得其有統(tǒng)一的標(biāo)準(zhǔn)差及(或者)均值為零。它有如下參數(shù):
1)withStd:默認(rèn)值為真,使用統(tǒng)一標(biāo)準(zhǔn)差方式。
2)withMean:默認(rèn)為假。這種方法將產(chǎn)生一個(gè)稠密輸出,所以不適用于稀疏輸入。
1.val scaler = new StandardScaler() 2. .setInputCol("features") 3. .setOutputCol("scaledFeatures") 4. .setWithStd(true) 5. .setWithMean(false) 6. 7.// 通過擬合StandardScaler計(jì)算匯總統(tǒng)計(jì)信息 8.val scalerModel = scaler.fit(dataFrame) 9. 10.// 標(biāo)準(zhǔn)化特征 11.val scaledData = scalerModel.transform(dataFrame)(5)MinMaxScaler
MinMaxScaler通過重新調(diào)節(jié)大小將Vector形式的列轉(zhuǎn)換到指定的范圍內(nèi),通常為[0,1]。它的參數(shù)有以下2個(gè)。
1)min:默認(rèn)為0.0,為轉(zhuǎn)換后所有特征的上邊界。
2)max:默認(rèn)為1.0,為轉(zhuǎn)換后所有特征的下邊界。
1.val scaler = new MinMaxScaler() 2. .setInputCol("features") 3. .setOutputCol("scaledFeatures") 4. 5.// 計(jì)算統(tǒng)計(jì)信息,生成MinMaxScalerModel 6.val scalerModel = scaler.fit(dataFrame) 7. 8.// 重新縮放每個(gè)特征至[min, max]范圍 9.val scaledData = scalerModel.transform(dataFrame)(6)SQLTransformer
SQLTransformer實(shí)現(xiàn)了基于SQL語句定義的特征轉(zhuǎn)換,如“SELECT...FROM__THIS__...”,其中“__THIS__”表示輸入數(shù)據(jù)集的基礎(chǔ)表。
1.val df = spark.createDataFrame( 2. Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2") 3. 4.val sqlTrans = new SQLTransformer().setStatement( 5. "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__") 6. 7.sqlTrans.transform(df)(7)VectorAssembler
VectorAssembler將給定的列列表組合到單個(gè)向量列中。它可以將原始特征和一系列通過其他轉(zhuǎn)換器得到的特征合并為單一的特征向量,以訓(xùn)練如邏輯回歸和決策樹等機(jī)器學(xué)習(xí)算法。
1.val assembler = new VectorAssembler() 2. .setInputCols(Array("hour", "mobile", "userFeatures")) 3. .setOutputCol("features") 4. 5.val output = assembler.transform(dataset)除了以上介紹的幾種方法之外,MLlib還提供了其他特征變換方法,如用于特征分桶的Bucketizer、用于降維的PCA等,此處不再一一介紹,讀者如感興趣可查閱相關(guān)資料[插圖],基于應(yīng)用場(chǎng)景合理選擇相應(yīng)的特征轉(zhuǎn)變換方法。
特征選擇是指通過剔除不相關(guān)或冗余的特征,從而達(dá)到減少特征個(gè)數(shù)、提高模型精確度、減少運(yùn)行時(shí)間的目的。MLlib提供了如下幾種特征選擇的方法
VectorSlicer:從特征向量中輸出一個(gè)新特征向量,該新特征向量為原特征向量的子集,在向量列中提取特征時(shí)很有用。
RFormula:選擇由R模型公式指定的列。
ChiSqSelector:Chi-Squared特征選擇,應(yīng)用于類別特征數(shù)據(jù)。
XGBoost模型訓(xùn)練
? ? ? ?
? ? ? 在進(jìn)行XGBoost模型訓(xùn)練前,通過MLlib對(duì)數(shù)據(jù)集進(jìn)行特征提取、變換、選擇,能夠使數(shù)據(jù)集的特征更具有代表性,減少模型受到的噪聲干擾,提高模型精度。另外,選取出真正相關(guān)的特征簡(jiǎn)化模型,協(xié)助理解數(shù)據(jù)產(chǎn)生的過程。下面通過示例介紹如何將MLlib的特征提取、變換、選擇與XGBoost結(jié)合起來,此處采用iris數(shù)據(jù)集。下面給出來0.8x版本的具體實(shí)現(xiàn):
1.import ml.dmlc.xgboost4j.scala.spark.{TrackerConf, XGBoostClassificationModel, XGBoostClassifier, XGBoostRegressionModel, XGBoostRegressor} 2.import org.apache.spark.ml.feature.StringIndexer 3.import org.apache.spark.ml.feature.VectorAssembler 4.import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} 5. 6.// 讀取數(shù)據(jù)集,生成DataFrame 7.val schema = new StructType(Array( 8. StructField("sepal length", DoubleType, true), 9. StructField("sepal width", DoubleType, true), 10. StructField("petal length", DoubleType, true), 11. StructField("petal width", DoubleType, true), 12. StructField("class", StringType, true))) 13.val df = spark.read.schema(schema).csv("{HDFS PATH}/iris.txt") 14. 15.// 定義StringIndexer,將字符串類型列class轉(zhuǎn)為數(shù)值型列l(wèi)abel 16.val indexer = new StringIndexer() 17. .setInputCol("class") 18. .setOutputCol("label") 19. 20.// 對(duì)前述定義的列進(jìn)行轉(zhuǎn)換,并去掉原來的classz字段 21.val labelTransformed = indexer.fit(df).transform(df).drop("class") 22. 23.// 對(duì)特征進(jìn)行vectorAssembler,生成features列 24.val vectorAssembler = new VectorAssembler(). 25. setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")). 26. setOutputCol("features") 27.val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "label") 28. 29.// 定義訓(xùn)練參數(shù) 30.val paramMap = Map( 31. "eta" -> 0.1f, 32. "num_class" -> 3, 33. "max_depth" -> 3, 34. "objective" -> "multi:softmax", 35. "num_round" -> 10, 36. "num_workers" -> 1) 37. 38.// 訓(xùn)練模型 39.val xgbClassifier = new XGBoostClassifier(paramMap).setFeaturesCol("features").setLabelCol("label") 40.val xgbClassificationModel = xgbClassifier.fit(xgbInput)Piplelines
? ? ? ?MLlib中的Pipeline主要受scikit-learn項(xiàng)目的啟發(fā),旨在更容易地將多個(gè)算法組合成單個(gè)管道或工作流,向用戶提供基于DataFrame的更高層次的API庫(kù),以更方便地構(gòu)建復(fù)雜的機(jī)器學(xué)習(xí)工作流式應(yīng)用。一個(gè)Pipeline可以集成多個(gè)任務(wù),如特征變換、模型訓(xùn)練、參數(shù)設(shè)置等。下面介紹幾個(gè)重要的概念。
DataFrame:相比于RDD,DataFrame還包含schema信息,可以將其近似看作數(shù)據(jù)庫(kù)中的表。
Transformer:Transformer可以看作將一個(gè)DataFrame轉(zhuǎn)換成另一個(gè)DataFrame的算法。例如,模型即可看作一個(gè)Transformer,它將預(yù)測(cè)集的DataFrame轉(zhuǎn)換成了預(yù)測(cè)結(jié)果的DataFrame。
Estimator:一種可以適應(yīng)DataFrame來生成Transformer的算法,操作于DataFrame數(shù)據(jù)并生成一個(gè)Transformer。
Pipeline:可以連接多個(gè)Transformer和Estimator形成機(jī)器學(xué)習(xí)的工作流。
Parameter:設(shè)置Transformer和Estimator的參數(shù)。
? ? ? ?Pipeline是多個(gè)階段形成的一個(gè)序列,每個(gè)階段都是一個(gè)Transformer或者Estimator。這些階段按順序執(zhí)行,當(dāng)數(shù)據(jù)通過DataFrame輸入Pipeline中時(shí),數(shù)據(jù)在每個(gè)階段按相應(yīng)規(guī)則進(jìn)行轉(zhuǎn)換。在Transformer階段,對(duì)DataFrame調(diào)用transform()方法。在Estimator階段,對(duì)DataFrame調(diào)用fit()方法產(chǎn)生一個(gè)Transformer,然后調(diào)用該Transformer的transform()。
? ? ? ?MLlib允許用戶將特征提取/變換/選擇、模型訓(xùn)練、數(shù)據(jù)預(yù)測(cè)等構(gòu)成一個(gè)完整的Pipeline。XGBoost也可以作為Pipeline集成到Spark的機(jī)器學(xué)習(xí)工作流中。下面通過示例介紹如何將特征處理的Transformer和XGBoost結(jié)合起來構(gòu)成Spark的Pipeline。0.8.x版本的實(shí)現(xiàn)代碼如下:
1.import ml.dmlc.xgboost4j.scala.spark.{TrackerConf, XGBoostClassificationModel, XGBoostClassifier, XGBoostRegressionModel, XGBoostRegressor} 2.import ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator 3.import org.apache.spark.ml.feature.StringIndexer 4.import org.apache.spark.ml.feature.VectorAssembler 5.import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} 6.import org.apache.spark.ml.Pipeline 7. 8.// 讀取數(shù)據(jù)集,生成DataFrame 9.val schema = new StructType(Array( 10. StructField("sepal length", DoubleType, true), 11. StructField("sepal width", DoubleType, true), 12. StructField("petal length", DoubleType, true), 13. StructField("petal width", DoubleType, true), 14. StructField("class", StringType, true))) 15.val df = spark.read.schema(schema).csv("{HDFS PATH}/iris.txt") 16. 17.// 定義StringIndexer,將字符串類型列class轉(zhuǎn)為數(shù)值型列l(wèi)abel 18.val indexer = new StringIndexer(). 19. setInputCol("class"). 20. setOutputCol("label") 21. 22.// 對(duì)特征進(jìn)行vectorAssembler,生成features列 23.val vectorAssembler = new VectorAssembler(). 24. setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")). 25. setOutputCol("features") 26. 27.// 定義訓(xùn)練參數(shù) 28.val paramMap = Map( 29. "eta" -> 0.1f, 30. "num_class" -> 3, 31. "max_depth" -> 3, 32. "objective" -> "multi:softmax", 33. "num_round" -> 10, 34. "num_workers" -> 1) 35. 36.// 定義模型 37.val xgbClassifier = new XGBoostClassifier(paramMap).setFeaturesCol("features").setLabelCol("label") 38. 39.// 構(gòu)建pipeline 40.val pipeline = new Pipeline().setStages(Array(indexer, vectorAssembler, xgbClassifier)) 41.val model = pipeline.fit(df) 42. 43.// 預(yù)測(cè) 44.val predict = model.transform(df)模型選擇
? ? ??
? ? ? ?模型選擇是機(jī)器學(xué)習(xí)中非常重要的任務(wù),即通過數(shù)據(jù)找到具體問題的最佳模型和參數(shù),也稱超參數(shù)調(diào)整。模型選擇可以在單獨(dú)的Estimator(如邏輯回歸)中完成,也可以在包含多個(gè)算法或者其他步驟的Pipeline中完成。用戶可以一次調(diào)整整個(gè)Pipeline中的參數(shù),而不是單獨(dú)調(diào)整Pipeline中的每一個(gè)元素。MLlib支持CrossValidator和TrainValidationSplit兩個(gè)模型選擇工具。
(1)CrossValidator
? ? ? ?即交叉驗(yàn)證,將數(shù)據(jù)集劃分為若干份子集分別進(jìn)行訓(xùn)練和測(cè)試。例如,設(shè)置k值為3,CrossValidator將產(chǎn)生3組數(shù)據(jù),每組數(shù)據(jù)中的2/3作為訓(xùn)練集進(jìn)行訓(xùn)練,1/3作為測(cè)試集進(jìn)行測(cè)試。CrossValidator計(jì)算3組數(shù)據(jù)訓(xùn)練模型的評(píng)估準(zhǔn)則的平均值。確定了最佳參數(shù)之后,CrossValidator使用最佳參數(shù)重新對(duì)整個(gè)數(shù)據(jù)集進(jìn)行擬合得到最終模型。
(2)Train-Validation Split
? ? ? ?除了CrossValidator之外,MLlib還提供了Train-Validation Split用以超參數(shù)調(diào)整。和CrossValidator不同的是,Train-Validation Split只驗(yàn)證1次,而非k次。Train-Validation Split的計(jì)算代價(jià)相較于CrossValidator更低,但是當(dāng)訓(xùn)練數(shù)據(jù)集不夠大時(shí),結(jié)果可靠性不高。Train-Validation Split通過trainRatio參數(shù)將數(shù)據(jù)集分成兩個(gè)部分。例如,設(shè)置trainRatio=0.75,TrainValidation Split則將75%的數(shù)據(jù)用于訓(xùn)練,25%的數(shù)據(jù)用于測(cè)試。
? ? ? ?模型選擇確定最佳參數(shù)是最大限度提高XGBoost模型的關(guān)鍵步驟之一。通過手工調(diào)整參數(shù)是一項(xiàng)費(fèi)時(shí)又乏味的過程。最新版本的XGBoost4J-Spark可以通過MLlib的模型選擇工具進(jìn)行參數(shù)調(diào)優(yōu),極大地提高了機(jī)器學(xué)習(xí)過程中參數(shù)調(diào)優(yōu)的效率。下面通過一個(gè)示例來說明如何利用MLlib模型選擇工具對(duì)XGBoost進(jìn)行參數(shù)調(diào)優(yōu)。0.8x版本的實(shí)現(xiàn)代碼如下:
1.import org.apache.spark.ml.tuning.ParamGridBuilder 2.import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator 3.import org.apache.spark.ml.tuning.TrainValidationSplit 4. 5.// 創(chuàng)建xgbClassifier 6.val xgbClassifier = new XGBoostClassifier(paramMap).setFeaturesCol("features").setLabelCol("label") 7. 8.// 設(shè)定參數(shù)調(diào)優(yōu)時(shí)參數(shù)的范圍 9.val paramGrid = new ParamGridBuilder(). 10. addGrid(xgbEstimator.maxDepth, Array(5, 6)). 11. addGrid(xgbEstimator.eta, Array(0.1, 0.4)). 12. build() 13. 14.// 構(gòu)建TrainValidationSplit,設(shè)置trainRatio=0.8,即80%的數(shù)據(jù)用于訓(xùn)練,20%的數(shù)據(jù)用于測(cè)試 15.val tv = new TrainValidationSplit(). 16. setEstimator(xgbEstimator). 17. setEvaluator(new MulticlassClassificationEvaluator().setLabelCol("label")). 18. setEstimatorParamMaps(paramGrid). 19. setTrainRatio(0.8) 20.val model = tv.fit(xgbInput)? ? ? ?上述示例利用MLlib中的Train-Validation Split和RegressionEvaluator對(duì)XGBoost的eta和maxDepth兩個(gè)參數(shù)進(jìn)行調(diào)整,選擇RegressionEvaluator定義的最小成本函數(shù)值的模型作為最佳模型。
? ? ? ?通過XGBoost4J-Spark,用戶可以構(gòu)建一個(gè)基于Spark的更高效的數(shù)據(jù)處理流水線。該流水線可以很好地利用DataFrame/DataSet API對(duì)結(jié)構(gòu)化數(shù)據(jù)進(jìn)行處理,并且同時(shí)擁有強(qiáng)大的XGBoost作為機(jī)器學(xué)習(xí)模型。另外,XGBoost4J-Spark使得XGBoost和Spark MLlib無縫連接,使得特征提取/變換/選擇和參數(shù)調(diào)優(yōu)工作比以前更容易。
以上內(nèi)容摘自《 深入理解XGBoost 》一書并進(jìn)行整理,經(jīng)出版方授權(quán)發(fā)布。
往期精彩回顧適合初學(xué)者入門人工智能的路線及資料下載機(jī)器學(xué)習(xí)在線手冊(cè)深度學(xué)習(xí)在線手冊(cè)AI基礎(chǔ)下載(pdf更新到25集)本站qq群1003271085,加入微信群請(qǐng)回復(fù)“加群”獲取一折本站知識(shí)星球優(yōu)惠券,請(qǐng)回復(fù)“知識(shí)星球”喜歡文章,點(diǎn)個(gè)在看
總結(jié)
以上是生活随笔為你收集整理的深入理解XGBoost:分布式实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2020年应届生找工作难?Python爬
- 下一篇: 【实战】使用pyecharts绘制词云图