日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

spark mllib源码分析之随机森林(Random Forest)

發(fā)布時(shí)間:2024/1/17 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark mllib源码分析之随机森林(Random Forest) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Spark在mllib中實(shí)現(xiàn)了tree相關(guān)的算法,決策樹(shù)DT(DecisionTree),隨機(jī)森林RF(RandomForest),GBDT(Gradient Boosting Decision Tree),其基礎(chǔ)都是RF,DT是RF一棵樹(shù)時(shí)的情況,而GBDT則是循環(huán)構(gòu)建DT,GBDT與DT的代碼是非常簡(jiǎn)單明了的,本文將分成五部分分別對(duì)RF的源碼進(jìn)行分析,介紹spark在實(shí)現(xiàn)過(guò)程中使用的一些技巧。

1. 決策樹(shù)與隨機(jī)森林
首先對(duì)決策樹(shù)和隨機(jī)森林進(jìn)行簡(jiǎn)單的回顧。

1.1. 決策樹(shù)


在決策樹(shù)的訓(xùn)練中,如上圖所示,就是從根節(jié)點(diǎn)開(kāi)始,不斷的分裂,直到觸發(fā)截止條件,在節(jié)點(diǎn)的分裂過(guò)程中要解決的問(wèn)題其實(shí)就2個(gè)

分裂點(diǎn):一般就是遍歷所有特征的所有特征值,選取impurity最大的分成左右孩子節(jié)點(diǎn),impurity的選取有信息熵(分類),最小均方差(回歸)等方法
預(yù)測(cè)值:一般取當(dāng)前最多的class(分類)或者取均值(回歸)
1.2. 隨機(jī)森林
隨機(jī)森林就是構(gòu)建多棵決策樹(shù)投票,在構(gòu)建多棵樹(shù)過(guò)程中,引入隨機(jī)性,一般體現(xiàn)在兩個(gè)方面,一是每棵樹(shù)使用的樣本進(jìn)行隨機(jī)抽樣,分為有放回和無(wú)放回抽樣。二是對(duì)每棵樹(shù)使用的特征集進(jìn)行抽樣,使用部分特征訓(xùn)練。?
在訓(xùn)練過(guò)程中,如果單機(jī)內(nèi)存能放下所有樣本,可以用多線程同時(shí)訓(xùn)練多棵樹(shù),樹(shù)之間的訓(xùn)練互不影響。

2. spark RF優(yōu)化策略
spark在實(shí)現(xiàn)RF時(shí),使用了一些優(yōu)化技巧,提高訓(xùn)練效率。

2.1. 逐層訓(xùn)練
當(dāng)樣本量過(guò)大,單機(jī)無(wú)法容納時(shí),只能采用分布式的訓(xùn)練方法,數(shù)據(jù)是在集群中的多臺(tái)機(jī)器存放,如果按照單機(jī)的方法,每棵樹(shù)完全獨(dú)立訪問(wèn)樣本數(shù)據(jù),則樣本數(shù)據(jù)的訪問(wèn)次數(shù)為數(shù)的個(gè)數(shù)k*每棵樹(shù)的節(jié)點(diǎn)數(shù)N,相當(dāng)于深度遍歷。在spark的實(shí)現(xiàn)中,因?yàn)閿?shù)據(jù)存放在不同的機(jī)器上,頻繁的訪問(wèn)數(shù)據(jù)效率非常低,因此采用廣度遍歷的方法,每次構(gòu)造所有樹(shù)的一層,例如如果要訓(xùn)練10棵樹(shù),第一次構(gòu)造所有樹(shù)的第一層根節(jié)點(diǎn),第二次構(gòu)造所有深度為2的節(jié)點(diǎn),以此類推,這樣訪問(wèn)數(shù)據(jù)的次數(shù)降為樹(shù)的最大深度,大大減少了機(jī)器之間的通信,提高訓(xùn)練效率。

2.2. 樣本抽樣
當(dāng)樣本存在連續(xù)特征時(shí),其可能的取值可能是無(wú)限的,存儲(chǔ)其可能出現(xiàn)的值占用較大空間,因此spark對(duì)樣本進(jìn)行了抽樣,抽樣數(shù)量

val requiredSamples = math.max(metadata.maxBins * metadata.maxBins, 10000)

最少抽樣1萬(wàn)條,當(dāng)然這樣會(huì)降低模型精度。

2.3. 特征裝箱
其實(shí)沒(méi)什么神秘的,每個(gè)離散特征值(對(duì)于連續(xù)特征,先離散化)稱為一個(gè)Split,上下限[lowSplit, highSplit]組成一個(gè)bin,也就是特征裝箱,默認(rèn)的maxBins是32。對(duì)于連續(xù)特征,離散化時(shí)的bin的個(gè)數(shù)就是maxBins,采用等頻離散化;對(duì)于有序的離散特征,bin的個(gè)數(shù)是特征值個(gè)數(shù)+1;對(duì)于無(wú)序離散特征,bin的個(gè)數(shù)是2^(M-1)-1,M是特征值個(gè)數(shù)

3. 源碼分析
我們從官方給出的分類demo開(kāi)始,逐層分析其實(shí)現(xiàn)

3.1. 訓(xùn)練數(shù)據(jù)的解析
主要是LabelPoint的構(gòu)造,官方demo中要求訓(xùn)練數(shù)據(jù)是LibSVM格式的

parsed.map { case (label, indices, values) =>
? ? ? LabeledPoint(label, Vectors.sparse(d, indices, values))
? ? }

可以看到LabelPoint有兩個(gè)成員,第一個(gè)是樣本label,第二個(gè)是稀疏向量SparseVector,d是其size,在這里其實(shí)是特征數(shù),indices是實(shí)際非0特征的index,values里面是實(shí)際的特征值,這里需要注意的是,SVN格式的特征index是從0開(kāi)始的,這里進(jìn)行了-1,變成從0開(kāi)始了。

3.2. demo中訓(xùn)練參數(shù)說(shuō)明
官方demo中只設(shè)置了部分參數(shù)

val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
? ? ? numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

categoricalFeaturesInfo:Map[Int, Int],key是特征的index,value為特征值的個(gè)數(shù)(或者說(shuō)幾種),這里值得注意的是,因?yàn)長(zhǎng)abelPoint中進(jìn)行了index-1的變換,這個(gè)里面的key也需要-1(參見(jiàn)后面metadata的numBins的計(jì)算)。例如性別這個(gè)特征在樣本中的index為1,特征值男/女兩種,則0->2
featureSubsetStrategy:特征子集的抽取方法,支持”auto”, “all”, “sqrt”, “l(fā)og2”, “onethird”
impurity:不純度,其實(shí)就是節(jié)點(diǎn)分裂時(shí)的衡量準(zhǔn)則,例如信息熵,均方差等,這里支持三種,gini(基尼指數(shù)),entripy(信息熵),variance(均方差)
maxDepth:樹(shù)的最大深度
maxBins:最大裝箱數(shù),或者說(shuō)是特征的最大可能切分?jǐn)?shù)+1。這個(gè)值必須大于等于最大的離散特征值數(shù)
3.3. 參數(shù)封裝
spark根據(jù)用戶提供的參數(shù)值,進(jìn)行實(shí)際訓(xùn)練參數(shù)的計(jì)算,并且將這些參數(shù)封裝成類,方便傳遞。

3.3.1. Strategy
class Strategy @Since("1.3.0") (
? ? @Since("1.0.0") @BeanProperty var algo: Algo,
? ? @Since("1.0.0") @BeanProperty var impurity: Impurity,
? ? @Since("1.0.0") @BeanProperty var maxDepth: Int,
? ? @Since("1.2.0") @BeanProperty var numClasses: Int = 2,
? ? @Since("1.0.0") @BeanProperty var maxBins: Int = 32,
? ? @Since("1.0.0") @BeanProperty var quantileCalculationStrategy: QuantileStrategy = Sort,
? ? @Since("1.0.0") @BeanProperty var categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int](),
? ? @Since("1.2.0") @BeanProperty var minInstancesPerNode: Int = 1,
? ? @Since("1.2.0") @BeanProperty var minInfoGain: Double = 0.0,
? ? @Since("1.0.0") @BeanProperty var maxMemoryInMB: Int = 256,
? ? @Since("1.2.0") @BeanProperty var subsamplingRate: Double = 1,
? ? @Since("1.2.0") @BeanProperty var useNodeIdCache: Boolean = false,
? ? @Since("1.2.0") @BeanProperty var checkpointInterval: Int = 10)

algo:classification/regression
quantileCalculationStrategy:分位點(diǎn)(Split)策略,目前只支持Sort,對(duì)于連續(xù)型特征值,先把特征值進(jìn)行排序,然后按次序取分位點(diǎn)。從代碼中可以看到原來(lái)可能打算實(shí)現(xiàn)的MinMax和ApproxHist目前沒(méi)有實(shí)現(xiàn)。
minInstancesPerNode:每個(gè)樹(shù)節(jié)點(diǎn)中最小的樣本數(shù),低于將不再對(duì)節(jié)點(diǎn)進(jìn)行分裂,默認(rèn)為1,可作為提前截止條件
minInfoGain:最小增益,節(jié)點(diǎn)分裂后的增益如果小于它,將不再進(jìn)行分裂,可作為提前截止條件
subsamplingRate:樣本抽樣率,默認(rèn)為1,每棵樹(shù)都使用全部樣本
isMulticlassClassification:是否是多分類,判斷條件為Classification 并且類別>2
isMulticlassWithCategoricalFeatures:是否是帶類別特征的多分類,判斷條件再上面的基礎(chǔ)上加categoricalFeaturesInfo的size大于0
3.3.2. metadata
在buildMetadata中根據(jù)strategy計(jì)算得到DecisionTreeMetadata的參數(shù)。

class DecisionTreeMetadata(
? ? val numFeatures: Int,
? ? val numExamples: Long,
? ? val numClasses: Int,
? ? val maxBins: Int,
? ? val featureArity: Map[Int, Int],
? ? val unorderedFeatures: Set[Int],
? ? val numBins: Array[Int],
? ? val impurity: Impurity,
? ? val quantileStrategy: QuantileStrategy,
? ? val maxDepth: Int,
? ? val minInstancesPerNode: Int,
? ? val minInfoGain: Double,
? ? val numTrees: Int,
? ? val numFeaturesPerNode: Int)

部分參數(shù)同Strategy,對(duì)額外參數(shù)和區(qū)別說(shuō)明

numClasses:如為Regression,設(shè)為0
maxPossibleBins:取maxBins和樣本數(shù)量中較小的;必須大于categoricalFeaturesInfo中的最大的離散特征值數(shù)
numBins:所有特征及其特征值數(shù),Int數(shù)組,維數(shù)是特征數(shù),默認(rèn)大小是maxPossibleBins。對(duì)于連續(xù)特征,其值就是默認(rèn)值maxPossibleBins。對(duì)于離散特征,如為二分類或回歸,此處將categoricalFeaturesInfo中的key特征index作為數(shù)組index,value特征個(gè)數(shù)寫入數(shù)組中(這里有疑問(wèn),SVM格式的index是從1開(kāi)始的,因此對(duì)numBins的index應(yīng)該是categoricalFeaturesInfo的key-1,這里沒(méi)有-1,當(dāng)最大值等于maxBins的時(shí)候訪問(wèn)數(shù)組會(huì)拋異常);如果是多分類,先計(jì)算其當(dāng)做當(dāng)UnorderedFeature(無(wú)序的離散特征)的bin,如果個(gè)數(shù)小于等于maxPossibleBins,會(huì)被當(dāng)成UnorderedFeature,否則被當(dāng)成orderedFeatures(為了防止計(jì)算指數(shù)溢出,實(shí)際是把maxPossibleBins取log與特征數(shù)比較),因?yàn)閁norderedFeature的bin是比較大,這里限制了其特征值不能太多,這里僅僅根據(jù)特征值的特殊決定是否是ordered,不太好。每個(gè)split要將所有特征值分成2部分,bin的數(shù)量也就是2*split,因此bin的個(gè)數(shù)是2*(2^(M-1)-1)
numFeaturesPerNode:由featureSubsetStrategy決定,如果為“auto”,且為單棵樹(shù),則使用全部特征;如為多棵樹(shù),分類則是sqrt,回歸為1/3;也可以自己指定,支持”all”, “sqrt”, “l(fā)og2”, “onethird”。?
如果僅對(duì)RF的使用感興趣,了解上述訓(xùn)練參數(shù)也就可以了,后面的文章將對(duì)其訓(xùn)練代碼進(jìn)行分析。
?

4. 特征處理
這部分主要在DecisionTree.scala的findSplitsBins函數(shù),將所有特征封裝成Split,然后裝箱Bin。首先對(duì)split和bin的結(jié)構(gòu)進(jìn)行說(shuō)明

4.1. 數(shù)據(jù)結(jié)構(gòu)
4.1.1. Split
class Split(
? ? @Since("1.0.0") feature: Int,
? ? @Since("1.0.0") threshold: Double,
? ? @Since("1.0.0") featureType: FeatureType,
? ? @Since("1.0.0") categories: List[Double])

feature:特征id
threshold:閾值
featureType:連續(xù)特征(Continuous)/離散特征(Categorical)
categories:離散特征值數(shù)組,離散特征使用。放著此split中所有特征值
4.1.2. Bin
class Bin(
? ? lowSplit: Split,?
? ? highSplit: Split,?
? ? featureType: FeatureType,?
? ? category: Double)

lowSplit/highSplit:上下界
featureType:連續(xù)特征(Continuous)/離散特征(Categorical)
category:離散特征的特征值
4.2. 連續(xù)特征處理
4.2.1. 抽樣
val continuousFeatures = Range(0, numFeatures).filter(metadata.isContinuous)
val sampledInput = if (continuousFeatures.nonEmpty) {
? ? ? // Calculate the number of samples for approximate quantile calculation.
? ? ? val requiredSamples = math.max(metadata.maxBins * metadata.maxBins, 10000)
? ? ? val fraction = if (requiredSamples < metadata.numExamples) {
? ? ? ? requiredSamples.toDouble / metadata.numExamples
? ? ? } else {
? ? ? ? 1.0
? ? ? }
? ? ? logDebug("fraction of data used for calculating quantiles = " + fraction)
? ? ? input.sample(withReplacement = false, fraction, new XORShiftRandom().nextInt())
? ? } else {
? ? ? input.sparkContext.emptyRDD[LabeledPoint]
? ? }

首先篩選出連續(xù)特征集,然后計(jì)算抽樣數(shù)量,抽樣比例,然后無(wú)放回樣本抽樣;如果沒(méi)有連續(xù)特征,則為空RDD

4.2.2. 計(jì)算Split
metadata.quantileStrategy match {
? ? ? case Sort =>
? ? ? ? findSplitsBinsBySorting(sampledInput, metadata, continuousFeatures)
? ? ? case MinMax =>
? ? ? ? throw new UnsupportedOperationException("minmax not supported yet.")
? ? ? case ApproxHist =>
? ? ? ? throw new UnsupportedOperationException("approximate histogram not supported yet.")
? ? }

分位點(diǎn)策略,這里只實(shí)現(xiàn)了Sort這一種,前文有說(shuō)明,下面的計(jì)算在findSplitsBinsBySorting函數(shù)中,入?yún)⑹浅闃訕颖炯?#xff0c;metadata和連續(xù)特征集(里面是特征id,從0開(kāi)始,見(jiàn)LabelPoint的構(gòu)造)

val continuousSplits = {
? ? // reduce the parallelism for split computations when there are less
? ? // continuous features than input partitions. this prevents tasks from
? ? // being spun up that will definitely do no work.
? ? val numPartitions = math.min(continuousFeatures.length,input.partitions.length)
? ? input.flatMap(point => continuousFeatures.map(idx => ?(idx,point.features(idx))))
? ? ? ? ?.groupByKey(numPartitions)
? ? ? ? ?.map { case (k, v) => findSplits(k, v) }
? ? ? ? ?.collectAsMap()
? ? }

特征id為key,value是樣本對(duì)應(yīng)的該特征下的所有特征值,傳給findSplits函數(shù),其中又調(diào)用了findSplitsForContinuousFeature函數(shù)獲得連續(xù)特征的Split,入?yún)闃颖?#xff0c;metadata和特征id

def findSplitsForContinuousFeature(
? ? ? featureSamples: Array[Double],?
? ? ? metadata: DecisionTreeMetadata,
? ? ? featureIndex: Int): Array[Double] = {
? ? require(metadata.isContinuous(featureIndex),
? ? ? "findSplitsForContinuousFeature can only be used to find splits for a continuous feature.")

? ? val splits = {
? ? //連續(xù)特征的split是numBins-1
? ? ? val numSplits = metadata.numSplits(featureIndex)
? ? //統(tǒng)計(jì)所有特征值其出現(xiàn)的次數(shù)
? ? ? // get count for each distinct value
? ? ? val valueCountMap = featureSamples.foldLeft(Map.empty[Double, Int]) { (m, x) =>
? ? ? ? m + ((x, m.getOrElse(x, 0) + 1))
? ? ? }
? ? ? //按特征值排序
? ? ? // sort distinct values
? ? ? val valueCounts = valueCountMap.toSeq.sortBy(_._1).toArray

? ? ? // if possible splits is not enough or just enough, just return all possible splits
? ? ? val possibleSplits = valueCounts.length
? ? ? if (possibleSplits <= numSplits) {
? ? ? ? valueCounts.map(_._1)
? ? ? } else {
? ? ? //等頻離散化
? ? ? ? // stride between splits
? ? ? ? val stride: Double = featureSamples.length.toDouble / (numSplits + 1)
? ? ? ? logDebug("stride = " + stride)

? ? ? ? // iterate `valueCount` to find splits
? ? ? ? val splitsBuilder = Array.newBuilder[Double]
? ? ? ? var index = 1
? ? ? ? // currentCount: sum of counts of values that have been visited
? ? ? ? var currentCount = valueCounts(0)._2
? ? ? ? // targetCount: target value for `currentCount`.
? ? ? ? // If `currentCount` is closest value to `targetCount`,
? ? ? ? // then current value is a split threshold.
? ? ? ? // After finding a split threshold, `targetCount` is added by stride.
? ? ? ? var targetCount = stride
? ? ? ? while (index < valueCounts.length) {
? ? ? ? ? val previousCount = currentCount
? ? ? ? ? currentCount += valueCounts(index)._2
? ? ? ? ? val previousGap = math.abs(previousCount - targetCount)
? ? ? ? ? val currentGap = math.abs(currentCount - targetCount)
? ? ? ? ? // If adding count of current value to currentCount
? ? ? ? ? // makes the gap between currentCount and targetCount smaller,
? ? ? ? ? // previous value is a split threshold.
? ? ? ? ? //每次步進(jìn)targetCount個(gè)樣本,取上一個(gè)特征值與下一個(gè)特征值gap較小的
? ? ? ? ? if (previousGap < currentGap) {
? ? ? ? ? ? splitsBuilder += valueCounts(index - 1)._1
? ? ? ? ? ? targetCount += stride
? ? ? ? ? }
? ? ? ? ? index += 1
? ? ? ? }

? ? ? ? splitsBuilder.result()
? ? ? }
? ? }

? ? // TODO: Do not fail; just ignore the useless feature.
? ? assert(splits.length > 0,
? ? ? s"DecisionTree could not handle feature $featureIndex since it had only 1 unique value." +
? ? ? ? " ?Please remove this feature and then try again.")

? ? // the split metadata must be updated on the driver

? ? splits
? }

在構(gòu)造split的過(guò)程中,如果統(tǒng)計(jì)到的值的個(gè)數(shù)possibleSplits 還不如你設(shè)置的numSplits多,那么所有的值都作為分割點(diǎn);否則,用等頻分隔法,首先計(jì)算分隔步長(zhǎng)stride,然后再循環(huán)中每次累加到targetCount中,作為理想分割點(diǎn),但是理想分割點(diǎn)可能會(huì)包含的特征值過(guò)多,想取一個(gè)里理想分割點(diǎn)盡量近的特征值,例如,理想分割點(diǎn)是100,落在特征值fcfc里,但是當(dāng)前特征值里面有30個(gè)樣本,而前一個(gè)特征值fpfp只有5個(gè)樣本,因此我們?nèi)绻cfc作為split,則當(dāng)前區(qū)間實(shí)際多25個(gè)樣本,如果取fpfp,則少5個(gè)樣本,顯然取fpfp更為合理。?
具體到代碼實(shí)現(xiàn),在if判斷里步進(jìn)stride個(gè)樣本,累加在targetCount中。while循環(huán)逐次把每個(gè)特征值的個(gè)數(shù)加到currentCount里,計(jì)算前一次previousCount和這次currentCount到targetCount的距離,有3種情況,一種是pre和cur都在target左邊,肯定是cur小,繼續(xù)循環(huán),進(jìn)入第二種情況;第二種一左一右,如果pre小,肯定是pre是最好的分割點(diǎn),如果cur還是小,繼續(xù)循環(huán)步進(jìn),進(jìn)入第三種情況;第三種就是都在右邊,顯然是pre小。因此if的判斷條件pre<curpre<cur,只要滿足肯定就是split。整體下來(lái)的效果就能找到離target最近的一個(gè)特征值。?
findSplits函數(shù)使用本函數(shù)得到的離散化點(diǎn)作為threshold,構(gòu)造Split

val splits = {
? ? val featureSplits = findSplitsForContinuousFeature(
? ? ? ? ? featureSamples.toArray,
? ? ? ? ? metadata,
? ? ? ? ? featureIndex)
? ? logDebug(s"featureIndex = $featureIndex, numSplits = ${featureSplits.length}")

? ? featureSplits.map(threshold => new Split(featureIndex, threshold, Continuous, Nil))
}

這樣就得到了連續(xù)特征所有的Split?
4.2.3. 計(jì)算bin?
得到splits后,即可類似滑窗得到bin的上下界,構(gòu)造bins

val bins = {
? ? val lowSplit = new DummyLowSplit(featureIndex, Continuous)
? ? val highSplit = new DummyHighSplit(featureIndex, Continuous)

? ? // tack the dummy splits on either side of the computed splits
? ? val allSplits = lowSplit +: splits.toSeq :+ highSplit

? ? // slide across the split points pairwise to allocate the bins
? ? allSplits.sliding(2).map {
? ? ? ? ?case Seq(left, right) => new Bin(left, right, Continuous, Double.MinValue)
? ? }.toArray
}

在計(jì)算splits的時(shí)候,個(gè)數(shù)是bin的個(gè)數(shù)減1,這里加上第一個(gè)DummyLowSplit(threshold是Double.MinValue),和最后一個(gè)DummyHighSplit(threshold是Double.MaxValue)構(gòu)造的bin,恰好個(gè)數(shù)是numBins中的個(gè)數(shù)

4.3. 離散特征
bin的主要作用其實(shí)就是用來(lái)做連續(xù)特征離散化,離散特征是用不著的。?
對(duì)有序離散特征而言,其split直接用特征值表征,因此這里的splits和bins都是空的Array。?
對(duì)于無(wú)序離散特征而言,其split是特征值的組合,不是簡(jiǎn)單的上下界比較關(guān)系,bin是空Array,而split需要計(jì)算。

4.3.1. split
// Unordered features
// 2^(maxFeatureValue - 1) - 1 combinations
val featureArity = metadata.featureArity(i)
val split = Range(0, metadata.numSplits(i)).map { splitIndex =>
? ? val categories = extractMultiClassCategories(splitIndex + 1, featureArity)
? ? new Split(i, Double.MinValue, Categorical, categories)
}

featureArity來(lái)自參數(shù)categoricalFeaturesInfo中設(shè)置的離散特征的特征值數(shù)。?
metadata.numSplits是吧numBins中的數(shù)量/2,相當(dāng)于返回了2^(M-1)-1,M是特征值數(shù)。?
調(diào)用extractMultiClassCategories函數(shù),入?yún)⑹?到2^(M-1)和特征數(shù)M。

/**
? ?* Nested method to extract list of eligible categories given an index. It extracts the
? ?* position of ones in a binary representation of the input. If binary
? ?* representation of an number is 01101 (13), the output list should (3.0, 2.0,
? ?* 0.0). The maxFeatureValue depict the number of rightmost digits that will be tested for ones.
? ?*/
def extractMultiClassCategories(
? ? ?input: Int,
? ? ?maxFeatureValue: Int): List[Double] = {
? ? var categories = List[Double]()
? ? var j = 0
? ? var bitShiftedInput = input
? ? while (j < maxFeatureValue) {
? ? ? if (bitShiftedInput % 2 != 0) {
? ? ? ? // updating the list of categories.
? ? ? ? categories = j.toDouble :: categories
? ? ? }
? ? ? // Right shift by one
? ? ? bitShiftedInput = bitShiftedInput >> 1
? ? ? j += 1
? ? }
? ? categories
}

如注釋所述,這個(gè)函數(shù)返回給定的input的二進(jìn)制表示中1的index,這里實(shí)際返回的是特征的組合,之前文章介紹過(guò)的《組合數(shù)》。

5. 樣本處理
將輸入樣本LabelPoint與上述特征進(jìn)一步封裝,方便后面進(jìn)行分區(qū)統(tǒng)計(jì)。

5.1. TreePoint
構(gòu)造TreePoint的過(guò)程,是一系列函數(shù)的調(diào)用鏈,我們逐層分析。

val treeInput = TreePoint.convertToTreeRDD(retaggedInput, bins, metadata)

RandomForest.scala中將輸入轉(zhuǎn)化成TreePoint的rdd,調(diào)用convertToTreeRDD函數(shù)

def convertToTreeRDD(
? ? input: RDD[LabeledPoint],
? ? bins: Array[Array[Bin]],
? ? metadata: DecisionTreeMetadata): RDD[TreePoint] = {
? ? // Construct arrays for featureArity for efficiency in the inner loop.
? ? val featureArity: Array[Int] = new Array[Int](metadata.numFeatures)
? ? var featureIndex = 0
? ? while (featureIndex < metadata.numFeatures) {
? ? ? featureArity(featureIndex) = metadata.featureArity.getOrElse(featureIndex, 0)
? ? ? featureIndex += 1
? ? }
? ? input.map { x =>
? ? ? TreePoint.labeledPointToTreePoint(x, bins, featureArity)
? ? }
? }

convertToTreeRDD函數(shù)的入?yún)nput是所有樣本,bins是二維數(shù)組,第一維是特征,第二維是特征的Bin數(shù)組。函數(shù)首先計(jì)算每個(gè)特征的特征數(shù)量,放在featureArity中,如果是連續(xù)特征,設(shè)為0。對(duì)每個(gè)樣本調(diào)用labeledPointToTreePoint函數(shù),構(gòu)造TreePoint。

private def labeledPointToTreePoint(
? ? ? labeledPoint: LabeledPoint,
? ? ? bins: Array[Array[Bin]],
? ? ? featureArity: Array[Int]): TreePoint = {
? ? val numFeatures = labeledPoint.features.size
? ? val arr = new Array[Int](numFeatures)
? ? var featureIndex = 0
? ? while (featureIndex < numFeatures) {
? ? ? arr(featureIndex) = findBin(featureIndex, labeledPoint, featureArity(featureIndex),
? ? ? ? bins)
? ? ? featureIndex += 1
? ? }
? ? new TreePoint(labeledPoint.label, arr)
? }

labeledPointToTreePoint計(jì)算每個(gè)樣本的所有特征對(duì)應(yīng)的特征值屬于哪個(gè)bin,放在在arr數(shù)組中;如果是連續(xù)特征,存放的實(shí)際是binIndex,或者說(shuō)是第幾個(gè)bin;如果是離散特征,直接featureValue.toInt,這其實(shí)暗示著,對(duì)有序離散值,其編碼只能是[0,featureArity - 1],閉區(qū)間,其后的部分邏輯也依賴于這個(gè)假設(shè)。這部分是在findBin函數(shù)中完成的,這里不再贅述。?
我們?cè)谶@里把TreePoint的成員再羅列一下,方便查閱

class TreePoint(val label: Double, val binnedFeatures: Array[Int])

這里是把每個(gè)樣本從LabelPoint轉(zhuǎn)換成TreePoint,label就是樣本label,binnedFeatures就是上述的arr數(shù)組。

5.2. BaggedPoint
同理構(gòu)造BaggedPoint的過(guò)程,也是一系列函數(shù)的調(diào)用鏈,我們逐層分析。

val withReplacement = if (numTrees > 1) true else false
val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput,
? ? ? ? ? strategy.subsamplingRate, numTrees,
? ? ? ? ? withReplacement, seed).persist(StorageLevel.MEMORY_AND_DISK)

這里同時(shí)對(duì)樣本進(jìn)行了抽樣,如果樹(shù)個(gè)數(shù)大于1,就有放回抽樣,否則無(wú)放回抽樣,調(diào)用convertToTreeRDD函數(shù)將TreePoint轉(zhuǎn)化成BaggedPoint的rdd

/**
? ?* Convert an input dataset into its BaggedPoint representation,
? ?* choosing subsamplingRate counts for each instance.
? ?* Each subsamplingRate has the same number of instances as the original dataset,
? ?* and is created by subsampling without replacement.
? ?* @param input Input dataset.
? ?* @param subsamplingRate Fraction of the training data used for learning decision tree.
? ?* @param numSubsamples Number of subsamples of this RDD to take.
? ?* @param withReplacement Sampling with/without replacement.
? ?* @param seed Random seed.
? ?* @return BaggedPoint dataset representation.
? ?*/
? def convertToBaggedRDD[Datum] (
? ? ? input: RDD[Datum],
? ? ? subsamplingRate: Double,
? ? ? numSubsamples: Int,
? ? ? withReplacement: Boolean,
? ? ? seed: Long = Utils.random.nextLong()): RDD[BaggedPoint[Datum]] = {
? ? if (withReplacement) {
? ? ? convertToBaggedRDDSamplingWithReplacement(input, subsamplingRate, numSubsamples, seed)
? ? } else {
? ? ? if (numSubsamples == 1 && subsamplingRate == 1.0) {
? ? ? ? convertToBaggedRDDWithoutSampling(input)
? ? ? } else {
? ? ? ? convertToBaggedRDDSamplingWithoutReplacement(input, subsamplingRate, numSubsamples, seed)
? ? ? }
? ? }
? }

根據(jù)有放回還是無(wú)放回,或者不抽樣分別調(diào)用相應(yīng)函數(shù)。無(wú)放回抽樣

def convertToBaggedRDDSamplingWithoutReplacement[Datum] (
? ? ? input: RDD[Datum],
? ? ? subsamplingRate: Double,
? ? ? numSubsamples: Int,
? ? ? seed: Long): RDD[BaggedPoint[Datum]] = {
? ? //對(duì)每個(gè)partition獨(dú)立抽樣
? ? input.mapPartitionsWithIndex { (partitionIndex, instances) =>
? ? ? // Use random seed = seed + partitionIndex + 1 to make generation reproducible.
? ? ? val rng = new XORShiftRandom
? ? ? rng.setSeed(seed + partitionIndex + 1)
? ? ? instances.map { instance =>
? ? ? //對(duì)每條樣本進(jìn)行numSubsamples(實(shí)際是樹(shù)的個(gè)數(shù))次抽樣,
? ? ? //一次將本條樣本在所有樹(shù)中是否會(huì)被抽取都獲得,犧牲空間減少訪問(wèn)數(shù)據(jù)次數(shù)
? ? ? ? val subsampleWeights = new Array[Double](numSubsamples)
? ? ? ? var subsampleIndex = 0
? ? ? ? while (subsampleIndex < numSubsamples) {
? ? ? ? ? val x = rng.nextDouble()
? ? ? ? ? //無(wú)放回抽樣,只需要決定本樣本是否被抽取,被抽取就是1,沒(méi)有就是0
? ? ? ? ? subsampleWeights(subsampleIndex) = {
? ? ? ? ? ? if (x < subsamplingRate) 1.0 else 0.0
? ? ? ? ? }
? ? ? ? ? subsampleIndex += 1
? ? ? ? }
? ? ? ? new BaggedPoint(instance, subsampleWeights)
? ? ? }
? ? }
? }

有放回抽樣

def convertToBaggedRDDSamplingWithReplacement[Datum] (
? ? ? input: RDD[Datum],
? ? ? subsample: Double,
? ? ? numSubsamples: Int,
? ? ? seed: Long): RDD[BaggedPoint[Datum]] = {
? ? input.mapPartitionsWithIndex { (partitionIndex, instances) =>
? ? ? // Use random seed = seed + partitionIndex + 1 to make generation reproducible.
? ? ? val poisson = new PoissonDistribution(subsample)
? ? ? poisson.reseedRandomGenerator(seed + partitionIndex + 1)
? ? ? instances.map { instance =>
? ? ? ? val subsampleWeights = new Array[Double](numSubsamples)
? ? ? ? var subsampleIndex = 0
? ? ? ? while (subsampleIndex < numSubsamples) {
? ? ? ? //與無(wú)放回抽樣對(duì)比,這里用泊松抽樣返回的是樣本被抽取的次數(shù),
? ? ? ? //可能大于1,而無(wú)放回是0/1,也可認(rèn)為是被抽取的次數(shù)
? ? ? ? ? subsampleWeights(subsampleIndex) = poisson.sample()
? ? ? ? ? subsampleIndex += 1
? ? ? ? }
? ? ? ? new BaggedPoint(instance, subsampleWeights)
? ? ? }
? ? }
? }

不抽樣,或者說(shuō)抽樣率為1

def convertToBaggedRDDWithoutSampling[Datum] (
? ? ? input: RDD[Datum]): RDD[BaggedPoint[Datum]] = {
? ? input.map(datum => new BaggedPoint(datum, Array(1.0)))
? }

這里再啰嗦的羅列下BaggedPoint

class BaggedPoint[Datum](
? ? val datum: Datum,?
? ? val subsampleWeights: Array[Double])

datum是TreePoint,subsampleWeights是數(shù)組,維數(shù)等于numberTrees,每個(gè)值是樣本在每棵樹(shù)中被抽取的次數(shù)

至此,Random Forest的初始化工作已經(jīng)完成

timer.stop("init")
?

6. 隨機(jī)森林訓(xùn)練
6.1. 數(shù)據(jù)結(jié)構(gòu)
6.1.1. Node
樹(shù)中的每個(gè)節(jié)點(diǎn)是一個(gè)Node結(jié)構(gòu)

class Node @Since("1.2.0") (
? ? @Since("1.0.0") val id: Int,
? ? @Since("1.0.0") var predict: Predict,
? ? @Since("1.2.0") var impurity: Double,
? ? @Since("1.0.0") var isLeaf: Boolean,
? ? @Since("1.0.0") var split: Option[Split],
? ? @Since("1.0.0") var leftNode: Option[Node],
? ? @Since("1.0.0") var rightNode: Option[Node],
? ? @Since("1.0.0") var stats: Option[InformationGainStats])

emptyNode,只初始化nodeIndex,其他都是默認(rèn)值

def emptyNode(nodeIndex: Int): Node =?
? ? new Node(nodeIndex, new Predict(Double.MinValue),
? ? -1.0, false, None, None, None, None)

根據(jù)node的id,計(jì)算孩子節(jié)點(diǎn)的id

? ?* Return the index of the left child of this node.
? ?*/
? def leftChildIndex(nodeIndex: Int): Int = nodeIndex << 1

? /**
? ?* Return the index of the right child of this node.
? ?*/
? def rightChildIndex(nodeIndex: Int): Int = (nodeIndex << 1) + 1

左孩子節(jié)點(diǎn)就是當(dāng)前id * 2,右孩子是id * 2+1。

6.1.2. Entropy
6.1.2.1. Entropy
Entropy是個(gè)Object,里面最重要的是calculate函數(shù)

/**
? ?* :: DeveloperApi ::
? ?* information calculation for multiclass classification
? ?* @param counts Array[Double] with counts for each label
? ?* @param totalCount sum of counts for all labels
? ?* @return information value, or 0 if totalCount = 0
? ?*/
? @Since("1.1.0")
? @DeveloperApi
? override def calculate(counts: Array[Double], totalCount: Double): Double = {
? ? if (totalCount == 0) {
? ? ? return 0
? ? }
? ? val numClasses = counts.length
? ? var impurity = 0.0
? ? var classIndex = 0
? ? while (classIndex < numClasses) {
? ? ? val classCount = counts(classIndex)
? ? ? if (classCount != 0) {
? ? ? ? val freq = classCount / totalCount
? ? ? ? impurity -= freq * log2(freq)
? ? ? }
? ? ? classIndex += 1
? ? }
? ? impurity
? }

熵的計(jì)算公式?
H=E[?logpi]=?∑i=1n?pilogpi
H=E[?logpi]=?∑i=1n?pilogpi

因此這里的入?yún)ount是各class的出現(xiàn)的次數(shù),先計(jì)算出現(xiàn)概率,然后取log累加。
6.1.2.2. EntropyAggregator
class EntropyAggregator(numClasses: Int)
? extends ImpurityAggregator(numClasses)

只有一個(gè)成員變量class的個(gè)數(shù),關(guān)鍵是update函數(shù)

/**
? ?* Update stats for one (node, feature, bin) with the given label.
? ?* @param allStats ?Flat stats array, with stats for this (node, feature, bin) contiguous.
? ?* @param offset ? ?Start index of stats for this (node, feature, bin).
? ?*/
? def update(allStats: Array[Double], offset: Int, label: Double, instanceWeight: Double): Unit = {
? ? if (label >= statsSize) {
? ? ? throw new IllegalArgumentException(s"EntropyAggregator given label $label" +
? ? ? ? s" but requires label < numClasses (= $statsSize).")
? ? }
? ? if (label < 0) {
? ? ? throw new IllegalArgumentException(s"EntropyAggregator given label $label" +
? ? ? ? s"but requires label is non-negative.")
? ? }
? ? allStats(offset + label.toInt) += instanceWeight
? }

offset是特征值偏移,加上label就是該class在allStats里的位置,累加出現(xiàn)的次數(shù)

/**
? ?* Get an [[ImpurityCalculator]] for a (node, feature, bin).
? ?* @param allStats ?Flat stats array, with stats for this (node, feature, bin) contiguous.
? ?* @param offset ? ?Start index of stats for this (node, feature, bin).
? ?*/
? def getCalculator(allStats: Array[Double], offset: Int): EntropyCalculator = {
? ? new EntropyCalculator(allStats.view(offset, offset + statsSize).toArray)
? }

截取allStats中屬于該特征的split的部分?jǐn)?shù)組,長(zhǎng)度是statSize,也就是class數(shù)

6.1.2.3. EntropyCalculator
/**
? ?* Calculate the impurity from the stored sufficient statistics.
? ?*/
? def calculate(): Double = Entropy.calculate(stats, stats.sum)

結(jié)合上面的函數(shù)可以看到,計(jì)算entropy的路徑是調(diào)用Entropy的getCalculator函數(shù),里面截取allStats中屬于該split的部分,然后實(shí)際調(diào)用Entropy的calculate函數(shù)計(jì)算熵。?
這里還重載了prob函數(shù),主要是返回label的概率,例如0的統(tǒng)計(jì)有3個(gè),1的統(tǒng)計(jì)7個(gè),則label 0的概率就是0.3.

6.1.3. DTStatsAggregator
這里啰嗦下node分裂時(shí)需要怎樣統(tǒng)計(jì),這與DTStatsAggregator的設(shè)計(jì)是相關(guān)的。以使用信息熵為例,node分裂時(shí),迭代每個(gè)特征的每個(gè)split,這個(gè)split會(huì)把樣本集分成兩部分,要計(jì)算entropy,需要分別統(tǒng)計(jì)左/右部分class的分布情況,然后計(jì)算概率,進(jìn)而計(jì)算entropy,因此aggregator中statsSize等于numberclasses,同時(shí)allStats里記錄了所有的統(tǒng)計(jì)值,實(shí)際這個(gè)統(tǒng)計(jì)值就是class的分布情況

class DTStatsAggregator(
? ? val metadata: DecisionTreeMetadata,
? ? featureSubset: Option[Array[Int]]) extends Serializable {

? /**
? ?* [[ImpurityAggregator]] instance specifying the impurity type.
? ?*/
? val impurityAggregator: ImpurityAggregator = metadata.impurity match {
? ? case Gini => new GiniAggregator(metadata.numClasses)
? ? case Entropy => new EntropyAggregator(metadata.numClasses)
? ? case Variance => new VarianceAggregator()
? ? case _ => throw new IllegalArgumentException(s"Bad impurity parameter: ${metadata.impurity}")
? }

? /**
? ?* Number of elements (Double values) used for the sufficient statistics of each bin.
? ?*/
? private val statsSize: Int = impurityAggregator.statsSize

? /**
? ?* Number of bins for each feature. ?This is indexed by the feature index.
? ?*/
? private val numBins: Array[Int] = {
? ? if (featureSubset.isDefined) {
? ? ? featureSubset.get.map(metadata.numBins(_))
? ? } else {
? ? ? metadata.numBins
? ? }
? }

? /**
? ?* Offset for each feature for calculating indices into the [[allStats]] array.
? ?*/
? private val featureOffsets: Array[Int] = {
? ? numBins.scanLeft(0)((total, nBins) => total + statsSize * nBins)
? }

? /**
? ?* Total number of elements stored in this aggregator
? ?*/
? private val allStatsSize: Int = featureOffsets.last

? /**
? ?* Flat array of elements.
? ?* Index for start of stats for a (feature, bin) is:
? ?* ? index = featureOffsets(featureIndex) + binIndex * statsSize
? ?* Note: For unordered features,
? ?* ? ? ? the left child stats have binIndex in [0, numBins(featureIndex) / 2))
? ?* ? ? ? and the right child stats in [numBins(featureIndex) / 2), numBins(featureIndex))
? ?*/
? private val allStats: Array[Double] = new Array[Double](allStatsSize)

每個(gè)node有一個(gè)DTStatsAggregator,構(gòu)造函數(shù)接受2個(gè)參數(shù),metadata和node使用的特征子集。其他的類成員?
- impurityAggregator:目前支持Gini,Entropy和Variance,后面我們以Entropy為例,其他類似?
- statsSize:每個(gè)bin需要的統(tǒng)計(jì)數(shù),分類時(shí)等于numClasses,因?yàn)橛诿總€(gè)class都需要單獨(dú)統(tǒng)計(jì);回歸等于3,分別存著特征值個(gè)數(shù),特征值sum,特征值平方和,為計(jì)算variance?
- numBins:node所用特征對(duì)應(yīng)的numBins數(shù)組元素?
- featureOffsets:計(jì)算特征在allStats中的index,與每個(gè)特征的bin個(gè)數(shù)和statsSize有關(guān),例如我們有3個(gè)特征,其bins分別為3,2,2,statsSize為2,則第一個(gè)特征需要的bin的個(gè)數(shù)是3 * 2=6,2 * 2=4,2 * 2=4,則featureOffsets為0,6,10,14,是從左到右的累計(jì)值?
- allStatsSize:需要的桶的個(gè)數(shù)?
- allStats:存儲(chǔ)統(tǒng)計(jì)值的桶?

f0,f1,f2是3個(gè)特征,f0有3個(gè)特征值(其實(shí)是binIndex)0/1/2,f1有2個(gè)0/1,f2有2個(gè)0/1,每個(gè)特征值都有statsSize個(gè)狀態(tài)桶,因此共14個(gè),個(gè)數(shù)allStatsSize=14, 比如我們想在f1的v1的c1的index,就是從featureOffsets中取得f1的特征偏移量featureOffsets(1)=6,v1的binIndex相當(dāng)于是1,statsSize是2,其label是1,則桶的index=6+1*2+1=9,恰好是圖中f1v1的c1的桶的index

我們對(duì)其中的關(guān)鍵函數(shù)進(jìn)行說(shuō)明

/**
? ?* Update the stats for a given (feature, bin) for ordered features, using the given label.
? ?*/
? def update(featureIndex: Int, binIndex: Int, label: Double, instanceWeight: Double): Unit = {
? //第一部分是特征偏移
? //binIndex相當(dāng)于特征內(nèi)特征值的偏移,每個(gè)特征有statsSize個(gè)桶,因此兩者相加就是這個(gè)特征值對(duì)應(yīng)的桶
? //例如Entropy的update函數(shù),里面再加上label.toInt就是這個(gè)label的桶
? //從這里特征偏移的計(jì)算可以看出ordered特征其特征值最好是連續(xù)的,中間無(wú)間斷,并且必須從0開(kāi)始
? //當(dāng)然如果有間斷,這里相當(dāng)于浪費(fèi)部分空間
? ? val i = featureOffsets(featureIndex) + binIndex * statsSize
? ? impurityAggregator.update(allStats, i, label, instanceWeight)
? }
? /**
? ?* Get an [[ImpurityCalculator]] for a given (node, feature, bin).
? ?* @param featureOffset ?For ordered features, this is a pre-computed (node, feature) offset
? ?* ? ? ? ? ? ? ? ? ? ? ? ? ? from [[getFeatureOffset]].
? ?* ? ? ? ? ? ? ? ? ? ? ? ? ? For unordered features, this is a pre-computed
? ?* ? ? ? ? ? ? ? ? ? ? ? ? ? (node, feature, left/right child) offset from
? ?* ? ? ? ? ? ? ? ? ? ? ? ? ? [[getLeftRightFeatureOffsets]].
? ?*/
? def getImpurityCalculator(featureOffset: Int, binIndex: Int): ImpurityCalculator = {
? //偏移的計(jì)算同上,不過(guò)這里特征偏移是入?yún)⒔o出的,不需要再計(jì)算
? ? impurityAggregator.getCalculator(allStats, featureOffset + binIndex * statsSize)
? }

6.2. 訓(xùn)練初始化
// FIFO queue of nodes to train: (treeIndex, node)
val nodeQueue = new mutable.Queue[(Int, Node)]()

val topNodes: Array[Node] = Array.fill[Node](numTrees)(Node.emptyNode(nodeIndex = 1))
? ? Range(0, numTrees).foreach(treeIndex => nodeQueue.enqueue((treeIndex, topNodes(treeIndex))))

構(gòu)造了numTrees個(gè)Node,賦默認(rèn)值emptyNode,這些node將作為每棵樹(shù)的root node,參與后面的訓(xùn)練。將這些node與treeIndex封裝加入到隊(duì)列nodeQueue中,后面會(huì)將所有待split的node都加入到這個(gè)隊(duì)列中,依次split,直到所有node觸發(fā)截止條件,也就是后面的while循環(huán)中隊(duì)列為空了。

6.3. 選擇待分裂node
這部分邏輯在selectNodesToSplit中,主要是從nodeQueue中取出本輪需要分裂的node,并計(jì)算node的參數(shù)。

/**
? ?* Pull nodes off of the queue, and collect a group of nodes to be split on this iteration.
? ?* This tracks the memory usage for aggregates and stops adding nodes when too much memory
? ?* will be needed; this allows an adaptive number of nodes since different nodes may require
? ?* different amounts of memory (if featureSubsetStrategy is not "all").
? ?*
? ?* @param nodeQueue ?Queue of nodes to split.
? ?* @param maxMemoryUsage ?Bound on size of aggregate statistics.
? ?* @return ?(nodesForGroup, treeToNodeToIndexInfo).
? ?* ? ? ? ? ?nodesForGroup holds the nodes to split: treeIndex --> nodes in tree.
? ?*
? ?* ? ? ? ? ?treeToNodeToIndexInfo holds indices selected features for each node:
? ?* ? ? ? ? ? ?treeIndex --> (global) node index --> (node index in group, feature indices).
? ?* ? ? ? ? ?The (global) node index is the index in the tree; the node index in group is the
? ?* ? ? ? ? ? index in [0, numNodesInGroup) of the node in this group.
? ?* ? ? ? ? ?The feature indices are None if not subsampling features.
? ?*/
? private[tree] def selectNodesToSplit(
? ? ? nodeQueue: mutable.Queue[(Int, Node)],
? ? ? maxMemoryUsage: Long,
? ? ? metadata: DecisionTreeMetadata,
? ? ? rng: scala.util.Random): (Map[Int, Array[Node]], Map[Int, Map[Int, NodeIndexInfo]]) = {
? ? // Collect some nodes to split:
? ? // ?nodesForGroup(treeIndex) = nodes to split
? ? val mutableNodesForGroup = new mutable.HashMap[Int, mutable.ArrayBuffer[Node]]()
? ? val mutableTreeToNodeToIndexInfo =
? ? ? new mutable.HashMap[Int, mutable.HashMap[Int, NodeIndexInfo]]()
? ? var memUsage: Long = 0L
? ? var numNodesInGroup = 0
? ? while (nodeQueue.nonEmpty && memUsage < maxMemoryUsage) {
? ? ? val (treeIndex, node) = nodeQueue.head
? ? ? //用蓄水池抽樣(之前的文章有介紹)對(duì)node使用的特征集抽樣
? ? ? // Choose subset of features for node (if subsampling).
? ? ? val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) {
? ? ? ? Some(SamplingUtils.reservoirSampleAndCount(Range(0,
? ? ? ? ? metadata.numFeatures).iterator, metadata.numFeaturesPerNode, rng.nextLong)._1)
? ? ? } else {
? ? ? ? None
? ? ? }
? ? ? // Check if enough memory remains to add this node to the group.
? ? ? val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L
? ? ? if (memUsage + nodeMemUsage <= maxMemoryUsage) {
? ? ? ? nodeQueue.dequeue()
? ? ? ? mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[Node]()) += node
? ? ? ? mutableTreeToNodeToIndexInfo
? ? ? ? ? .getOrElseUpdate(treeIndex, new mutable.HashMap[Int, NodeIndexInfo]())(node.id)
? ? ? ? ? = new NodeIndexInfo(numNodesInGroup, featureSubset)
? ? ? }
? ? ? numNodesInGroup += 1
? ? ? memUsage += nodeMemUsage
? ? }
? ? // Convert mutable maps to immutable ones.
? ? val nodesForGroup: Map[Int, Array[Node]] = mutableNodesForGroup.mapValues(_.toArray).toMap
? ? val treeToNodeToIndexInfo = mutableTreeToNodeToIndexInfo.mapValues(_.toMap).toMap
? ? (nodesForGroup, treeToNodeToIndexInfo)
? }

代碼比較簡(jiǎn)單明確,受限于內(nèi)存,將本次能夠處理的node從nodeQueue中取出,放入nodesForGroup和treeToNodeToIndexInfo中。?
是否對(duì)特征集進(jìn)行抽樣的條件是metadata的 numFeatures是否等于numFeaturesPerNode,這兩個(gè)參數(shù)是metadata的入?yún)?#xff0c;在buildMetadata時(shí),根據(jù)featureSubsetStrateg確定,參見(jiàn)前文。?
nodesForGroup是Map[Int, Array[Node]],其key是treeIndex,value是Node數(shù)組,其中放著該tree本次要分裂的node。?
treeToNodeToIndexInfo的類型是Map[Int, Map[Int, NodeIndexInfo]],key為treeIndex,value中Map的key是node.id,這個(gè)id來(lái)自Node初始化時(shí)的第一個(gè)參數(shù),第一輪時(shí)node的id都是1。其value為NodeIndexInfo結(jié)構(gòu),

class NodeIndexInfo(
? ? ? val nodeIndexInGroup: Int,
? ? ? val featureSubset: Option[Array[Int]])

第一個(gè)成員是此node在本次node選擇的while循環(huán)中的index,稱為groupIndex,第二個(gè)成員是特征子集。
?

6.4. node分裂
邏輯主要在DecisionTree.findBestSplits函數(shù)中,是RF訓(xùn)練最核心的部分

DecisionTree.findBestSplits(baggedInput, metadata, topNodes, nodesForGroup,
? ? ? ? treeToNodeToIndexInfo, splits, bins, nodeQueue, timer, nodeIdCache = nodeIdCache)

6.4.1. 數(shù)據(jù)統(tǒng)計(jì)
數(shù)據(jù)統(tǒng)計(jì)分成兩部分,先在各個(gè)partition上分別統(tǒng)計(jì),再累積各partition成全局統(tǒng)計(jì)。

6.4.1.1. 取出node的特征子集
val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)

取出各node的特征子集,如果不需要抽樣則為None;否則返回Map[Int, Array[Int]],其實(shí)就是將之前treeToNodeToIndexInfo中的NodeIndexInfo轉(zhuǎn)換為map結(jié)構(gòu),將其作為廣播變量nodeToFeaturesBc。

6.4.1.2. 分區(qū)統(tǒng)計(jì)
一系列函數(shù)的調(diào)用鏈,我們逐層分析

val partitionAggregates : RDD[(Int, DTStatsAggregator)] = if (nodeIdCache.nonEmpty) {
? ? ? input.zip(nodeIdCache.get.nodeIdsForInstances).mapPartitions { points =>
? ? ? ? // Construct a nodeStatsAggregators array to hold node aggregate stats,
? ? ? ? // each node will have a nodeStatsAggregator
? ? ? ? val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
? ? ? ? ? val featuresForNode = nodeToFeaturesBc.value.flatMap { nodeToFeatures =>
? ? ? ? ? ? Some(nodeToFeatures(nodeIndex))
? ? ? ? ? }
? ? ? ? ? new DTStatsAggregator(metadata, featuresForNode)
? ? ? ? }

? ? ? ? // iterator all instances in current partition and update aggregate stats
? ? ? ? points.foreach(binSeqOpWithNodeIdCache(nodeStatsAggregators, _))

? ? ? ? // transform nodeStatsAggregators array to (nodeIndex, nodeAggregateStats) pairs,
? ? ? ? // which can be combined with other partition using `reduceByKey`
? ? ? ? nodeStatsAggregators.view.zipWithIndex.map(_.swap).iterator
? ? ? }
? ? } else {
? ? ? input.mapPartitions { points =>
? ? ? ? // Construct a nodeStatsAggregators array to hold node aggregate stats,
? ? ? ? // each node will have a nodeStatsAggregator
? ? ? ? val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
? ? ? ? ? val featuresForNode = nodeToFeaturesBc.value.flatMap { nodeToFeatures =>
? ? ? ? ? ? Some(nodeToFeatures(nodeIndex))
? ? ? ? ? }
? ? ? ? ? new DTStatsAggregator(metadata, featuresForNode)
? ? ? ? }

? ? ? ? // iterator all instances in current partition and update aggregate stats
? ? ? ? points.foreach(binSeqOp(nodeStatsAggregators, _))

? ? ? ? // transform nodeStatsAggregators array to (nodeIndex, nodeAggregateStats) pairs,
? ? ? ? // which can be combined with other partition using `reduceByKey`
? ? ? ? nodeStatsAggregators.view.zipWithIndex.map(_.swap).iterator
? ? ? }
? ? }

首先對(duì)每個(gè)partition構(gòu)造一個(gè)DTStatsAggregator數(shù)組,長(zhǎng)度是node的個(gè)數(shù),注意這里實(shí)際使用的是數(shù)組,node怎樣與自己的aggregator的對(duì)應(yīng)?前面我們提到NodeIndexInfo的第一個(gè)成員是groupIndex,其值就是node的次序,和這里aggregator數(shù)組index其實(shí)是對(duì)應(yīng)的,也就是說(shuō)可以從NodeIndexInfo中取得groupIndex,然后作為數(shù)組index取得對(duì)應(yīng)node的agg。DTStatsAggregator的入?yún)⑹莔etadata和每個(gè)node的特征子集。然后將每個(gè)點(diǎn)統(tǒng)計(jì)到DTStatsAggregator中,其中調(diào)用了內(nèi)部函數(shù)binSeqOp,

?/**
? ? ?* Performs a sequential aggregation over a partition.
? ? ?*
? ? ?* Each data point contributes to one node. For each feature,
? ? ?* the aggregate sufficient statistics are updated for the relevant bins.
? ? ?*
? ? ?* @param agg ?Array storing aggregate calculation, with a set of sufficient statistics for
? ? ?* ? ? ? ? ? ? each (node, feature, bin).
? ? ?* @param baggedPoint ? Data point being aggregated.
? ? ?* @return ?agg
? ? ?*/
? ? def binSeqOp(
? ? ? ? agg: Array[DTStatsAggregator],
? ? ? ? baggedPoint: BaggedPoint[TreePoint]): Array[DTStatsAggregator] = {
? ? //對(duì)每個(gè)node
? ? ? treeToNodeToIndexInfo.foreach { case (treeIndex, nodeIndexToInfo) =>
? ? ? ? val nodeIndex = predictNodeIndex(topNodes(treeIndex), baggedPoint.datum.binnedFeatures,
? ? ? ? ? bins, metadata.unorderedFeatures)
? ? ? ? nodeBinSeqOp(treeIndex, nodeIndexToInfo.getOrElse(nodeIndex, null), agg, baggedPoint)
? ? ? }

? ? ? agg
? ? }

首先調(diào)用函數(shù)predictNodeIndex計(jì)算nodeIndex,如果是首輪或者葉子節(jié)點(diǎn),直接返回node.id;如果不是首輪,因?yàn)閭魅氲氖敲靠脴?shù)的root node,就從root node開(kāi)始,逐漸往下判斷該point應(yīng)該是屬于哪個(gè)node的,因?yàn)槲覀円呀?jīng)對(duì)node進(jìn)行了分裂,這里其實(shí)實(shí)現(xiàn)了樣本的劃分。舉個(gè)栗子,當(dāng)前node如果是root的左孩子節(jié)點(diǎn),而point預(yù)測(cè)節(jié)點(diǎn)應(yīng)該屬于右孩子,則調(diào)用nodeBinSepOp時(shí)就直接返回了,不會(huì)將這個(gè)point統(tǒng)計(jì)進(jìn)去,用不大的時(shí)間換取樣本集劃分的空間,還是比較巧妙的。

/**
? ?* Get the node index corresponding to this data point.
? ?* This function mimics prediction, passing an example from the root node down to a leaf
? ?* or unsplit node; that node's index is returned.
? ?*
? ?* @param node ?Node in tree from which to classify the given data point.
? ?* @param binnedFeatures ?Binned feature vector for data point.
? ?* @param bins possible bins for all features, indexed (numFeatures)(numBins)
? ?* @param unorderedFeatures ?Set of indices of unordered features.
? ?* @return ?Leaf index if the data point reaches a leaf.
? ?* ? ? ? ? ?Otherwise, last node reachable in tree matching this example.
? ?* ? ? ? ? ?Note: This is the global node index, i.e., the index used in the tree.
? ?* ? ? ? ? ? ? ? ?This index is different from the index used during training a particular
? ?* ? ? ? ? ? ? ? ?group of nodes on one call to [[findBestSplits()]].
? ?*/
? private def predictNodeIndex(
? ? ? node: Node,
? ? ? binnedFeatures: Array[Int],
? ? ? bins: Array[Array[Bin]],
? ? ? unorderedFeatures: Set[Int]): Int = {
? ? if (node.isLeaf || node.split.isEmpty) {
? ? ? // Node is either leaf, or has not yet been split.
? ? ? node.id
? ? } else {
? ? //判斷point屬于當(dāng)前node的左孩子還是右孩子
? ? ? val featureIndex = node.split.get.feature
? ? ? val splitLeft = node.split.get.featureType match {
? ? ? ? case Continuous => {
? ? ? ? ? val binIndex = binnedFeatures(featureIndex)
? ? ? ? ? val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
? ? ? ? ? // bin binIndex has range (bin.lowSplit.threshold, bin.highSplit.threshold]
? ? ? ? ? // We do not need to check lowSplit since bins are separated by splits.
? ? ? ? ? featureValueUpperBound <= node.split.get.threshold
? ? ? ? }
? ? ? ? case Categorical => {
? ? ? ? ? val featureValue = binnedFeatures(featureIndex)
? ? ? ? ? node.split.get.categories.contains(featureValue)
? ? ? ? }
? ? ? ? case _ => throw new RuntimeException(s"predictNodeIndex failed for unknown reason.")
? ? ? }
? ? ? if (node.leftNode.isEmpty || node.rightNode.isEmpty) {
? ? ? //下面還有完整的左右孩子node,遞歸判斷
? ? ? ? // Return index from next layer of nodes to train
? ? ? ? if (splitLeft) {
? ? ? ? ? Node.leftChildIndex(node.id)
? ? ? ? } else {
? ? ? ? ? Node.rightChildIndex(node.id)
? ? ? ? }
? ? ? } else {
? ? ? ? if (splitLeft) {
? ? ? ? ? predictNodeIndex(node.leftNode.get, binnedFeatures, bins, unorderedFeatures)
? ? ? ? } else {
? ? ? ? ? predictNodeIndex(node.rightNode.get, binnedFeatures, bins, unorderedFeatures)
? ? ? ? }
? ? ? }
? ? }
? }

然后調(diào)用nodeBinSeqOp函數(shù)

/**
? ? ?* Performs a sequential aggregation over a partition for a particular tree and node.
? ? ?*
? ? ?* For each feature, the aggregate sufficient statistics are updated for the relevant
? ? ?* bins.
? ? ?*
? ? ?* @param treeIndex Index of the tree that we want to perform aggregation for.
? ? ?* @param nodeInfo The node info for the tree node.
? ? ?* @param agg Array storing aggregate calculation, with a set of sufficient statistics
? ? ?* ? ? ? ? ? ?for each (node, feature, bin).
? ? ?* @param baggedPoint Data point being aggregated.
? ? ?*/
? ? def nodeBinSeqOp(
? ? ? ? treeIndex: Int,
? ? ? ? nodeInfo: RandomForest.NodeIndexInfo,
? ? ? ? agg: Array[DTStatsAggregator],
? ? ? ? baggedPoint: BaggedPoint[TreePoint]): Unit = {
? ? ? if (nodeInfo != null) {
? ? ? //node的groupIndex,見(jiàn)前文
? ? ? ? val aggNodeIndex = nodeInfo.nodeIndexInGroup
? ? ? ? //node使用的特征子集
? ? ? ? val featuresForNode = nodeInfo.featureSubset
? ? ? ? //取樣本在這棵樹(shù)中出現(xiàn)的次數(shù) 0/1/k
? ? ? ? val instanceWeight = baggedPoint.subsampleWeights(treeIndex)
? ? ? ? if (metadata.unorderedFeatures.isEmpty) {
? ? ? ? ? orderedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, instanceWeight, featuresForNode)
? ? ? ? } else {
? ? ? ? ? mixedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, splits,
? ? ? ? ? ? metadata.unorderedFeatures, instanceWeight, featuresForNode)
? ? ? ? }
? ? ? }
? ? }

函數(shù)的入?yún)⑹莟reeIndex,該node的NodeIndexInfo結(jié)構(gòu),所有node的累加器數(shù)組,樣本。本函數(shù)是針對(duì)單個(gè)node的操作,這里可以看到取node對(duì)應(yīng)的aggregator就是通過(guò)NodeIndexInfo的第一個(gè)成員nodeIndexInGroup作為agg數(shù)組的index。?
如果不包含無(wú)序特征,調(diào)用orderedBinSeqOp函數(shù)

?/**
? ?* Helper for binSeqOp, for regression and for classification with only ordered features.
? ?*
? ?* For each feature, the sufficient statistics of one bin are updated.
? ?*
? ?* @param agg ?Array storing aggregate calculation, with a set of sufficient statistics for
? ?* ? ? ? ? ? ? each (feature, bin).
? ?* @param treePoint ?Data point being aggregated.
? ?* @param instanceWeight ?Weight (importance) of instance in dataset.
? ?*/
? private def orderedBinSeqOp(
? ? ? agg: DTStatsAggregator, //node的agg
? ? ? treePoint: TreePoint,
? ? ? instanceWeight: Double,
? ? ? featuresForNode: Option[Array[Int]]): Unit = {
? ? val label = treePoint.label

? ? // Iterate over features.
? ? if (featuresForNode.nonEmpty) {
? ? ? // Use subsampled features
? ? ? var featureIndexIdx = 0
? ? ? while (featureIndexIdx < featuresForNode.get.size) {
? ? ? //連續(xù)特征:離散化后的index
? ? ? //離散特征:featureValue.toInt
? ? ? ? val binIndex = treePoint.binnedFeatures(featuresForNode.get.apply(featureIndexIdx))
? ? ? ? agg.update(featureIndexIdx, binIndex, label, instanceWeight)
? ? ? ? featureIndexIdx += 1
? ? ? }
? ? } else {
? ? ? // Use all features
? ? ? val numFeatures = agg.metadata.numFeatures
? ? ? var featureIndex = 0
? ? ? while (featureIndex < numFeatures) {
? ? ? ? val binIndex = treePoint.binnedFeatures(featureIndex)
? ? ? ? agg.update(featureIndex, binIndex, label, instanceWeight)
? ? ? ? featureIndex += 1
? ? ? }
? ? }
? }

函數(shù)中區(qū)分了是否使用了全部特征,區(qū)別僅在于如果使用了部分特征(特征抽樣),需要先在featuresForNode中取得特征的實(shí)際index。?
函數(shù)其實(shí)就是取出樣本的使用特征,特征值,label和weight,更新到aggregator中,更新邏輯我們?cè)谇拔囊呀?jīng)說(shuō)明過(guò)了。?
包含了無(wú)序離散特征,則使用mixedBinSeqOp,只有無(wú)序離散特征處理方法不同于orderedBinSeqOp函數(shù)

// Unordered feature
val featureValue = treePoint.binnedFeatures(featureIndex)
//找到特征值對(duì)應(yīng)的allStats中的范圍
//特征起始位置從featureOffsets中取得,長(zhǎng)度是bins的個(gè)數(shù)乘以分類個(gè)數(shù),2*(2^(M-1)-1)*statsSize,
//每一個(gè)split將樣本集分成2部分,allStats中左邊部分連續(xù)存放,右半部分連續(xù)存放,而不是左右一起存放。
//因此,左邊的起始位置直接可以從featureOffsets中獲取,右邊起始位置是(2^(M-1)-1)*statsSize
val (leftNodeFeatureOffset, rightNodeFeatureOffset) =
agg.getLeftRightFeatureOffsets(featureIndexIdx)
// Update the left or right bin for each split.
val numSplits = agg.metadata.numSplits(featureIndex)
var splitIndex = 0
while (splitIndex < numSplits) {
? ? //split中的categories中包含左半邊特征值組合,splitIndex相當(dāng)于其離散化后的特征index
? ? if (splits(featureIndex)(splitIndex).categories.contains(featureValue)) {
? ? agg.featureUpdate(leftNodeFeatureOffset, splitIndex, treePoint.label,
? ? ? ? ? ? ? instanceWeight)
? ? } else {
? ? ? ? agg.featureUpdate(rightNodeFeatureOffset, splitIndex, treePoint.label,
? ? ? ? ? ? ? instanceWeight)
? ? }
? ? ? ? ? splitIndex += 1
}

6.4.1.3. 全局統(tǒng)計(jì)
partitionAggregates.reduceByKey((a, b) => a.merge(b))
1
就是將所有存在allStats中的分區(qū)統(tǒng)計(jì)結(jié)果逐個(gè)對(duì)應(yīng)相加得到全局統(tǒng)計(jì)結(jié)果。

6.4.2. bestSplits
獲得所有的統(tǒng)計(jì)后,就可以遍歷所有的特征,計(jì)算impurity gain,確定最佳的split。

val nodeToBestSplits = partitionAggregates.reduceByKey((a, b) => a.merge(b))
? ? .map { case (nodeIndex, aggStats) =>
? ? ? ? ?val featuresForNode = nodeToFeaturesBc.value.map { nodeToFeatures =>
? ? ? ? nodeToFeatures(nodeIndex)
? ? ? ? ?}
? ? ? ? ?// find best split for each node
? ? ? ? val (split: Split, stats: InformationGainStats, predict: Predict) =
? ? ? ? ? ? binsToBestSplit(aggStats, splits, featuresForNode, nodes(nodeIndex))
? ? ? ? ? (nodeIndex, (split, stats, predict))
? ? ? ? }.collectAsMap()

對(duì)每個(gè)node其中調(diào)用了binsToBestSplit函數(shù),下面進(jìn)行詳細(xì)說(shuō)明。

6.4.2.1 init
函數(shù)首先獲取node在樹(shù)的第幾層,樹(shù)結(jié)構(gòu)如圖?
?
樹(shù)的id如圖所示,判斷node在第幾層只需要判斷id的二進(jìn)制表示的最高位的1在第幾位即可,比如6的二進(jìn)制表示是110,最高位的1是在第3位,則其在第3層。?
然后獲取當(dāng)前node的預(yù)測(cè)值和impurity

// calculate predict and impurity if current node is top node
? ? val level = Node.indexToLevel(node.id)
? ? var predictWithImpurity: Option[(Predict, Double)] = if (level == 0) {
? ? ? None
? ? } else {
? ? ? Some((node.predict, node.impurity))
? ? }

6.4.2.2 連續(xù)特征
對(duì)于連續(xù)特征而言,當(dāng)取其某個(gè)特征值為best split后,node的樣本會(huì)被分成大于該特征值和小于等于該特征值兩部分,需要分別統(tǒng)計(jì)兩部分的class分布情況;另一方面,我們查找best,因此要遍歷所有特征值的情況,一種巧妙的方法是,從左邊開(kāi)始逐次累積統(tǒng)計(jì)數(shù)據(jù),需要從某個(gè)特征值作為split時(shí),當(dāng)前累計(jì)值就是左邊小于等于的情況,用最右的值減去左邊就是右邊的情況。?

例如上圖中的情況,是某特征6個(gè)特征值分布情況,第一行是左累計(jì),第二行是原始分布,當(dāng)以v2作為split時(shí),左邊的分布就是c0:8,c1:5,右邊是v6的分布減去v2,c0:19-8=11,c1:14-5=9。

if (binAggregates.metadata.isContinuous(featureIndex)) {
? ? ? ? // Cumulative sum (scanLeft) of bin statistics.
? ? ? ? // Afterwards, binAggregates for a bin is the sum of aggregates for
? ? ? ? // that bin + all preceding bins.
? ? ? ? //如上所述,累計(jì)
? ? ? ? val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
? ? ? ? var splitIndex = 0
? ? ? ? while (splitIndex < numSplits) {
? ? ? ? ? ? binAggregates.mergeForFeature(nodeFeatureOffset, splitIndex + 1, splitIndex)
? ? ? ? ? ? splitIndex += 1
? ? ? ? }
? ? ? ? // Find best split.
? ? ? ? val (bestFeatureSplitIndex, bestFeatureGainStats) =
? ? ? ? ? ? Range(0, numSplits).map { case splitIdx =>
? ? ? ? ? ? ? val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIdx)
? ? ? ? ? ? ? val rightChildStats =
? ? ? ? ? ? ? ? binAggregates.getImpurityCalculator(nodeFeatureOffset, numSplits)
? ? ? ? ? ? ? rightChildStats.subtract(leftChildStats)
? ? ? ? ? ? ? //獲得node的impurity,level==0時(shí),需要根據(jù)當(dāng)前class的分布計(jì)算
? ? ? ? ? ? ? predictWithImpurity = Some(predictWithImpurity.getOrElse(
? ? ? ? ? ? ? ? calculatePredictImpurity(leftChildStats, rightChildStats)))
? ? ? ? ? ? ? val gainStats = calculateGainForSplit(leftChildStats,
? ? ? ? ? ? ? ? rightChildStats, binAggregates.metadata, predictWithImpurity.get._2)
? ? ? ? ? ? ? (splitIdx, gainStats)
? ? ? ? ? ? }.maxBy(_._2.gain)
? ? ? ? (splits(featureIndex)(bestFeatureSplitIndex), bestFeatureGainStats)

計(jì)算split分裂node的impurity增益時(shí),調(diào)用了calculateGainForSplit函數(shù),其中分別計(jì)算了左右的增益,然后概率合并,并計(jì)算了左右的預(yù)測(cè)值,代碼比較簡(jiǎn)單,這里不再贅述。

6.4.2.3. Unordered categorical feature
只有獲取左右class的統(tǒng)計(jì)情況方法不一致,其他是一樣的。

6.4.2.4. Ordered categorical feature
對(duì)于連續(xù)特征,特征值或者是binIndex是有序的,或者說(shuō)其數(shù)值可以排序,因此如果某個(gè)特征值被當(dāng)做split,分隔的就是左右兩部分;對(duì)于無(wú)序離散特征,其被split分隔后特征值屬于哪個(gè)bin是確定的;對(duì)于有序離散特征,其特征值代表一定次序關(guān)系,但是不具有絕對(duì)大小的含義,其處理方法可以近似按照連續(xù)特征的方法處理,但是spark這里處理了下,可能更優(yōu)點(diǎn)。?
spark首先會(huì)確定一個(gè)centroid,然后特征會(huì)按這個(gè)排序,這個(gè)相當(dāng)于連續(xù)特征的binIndex。例如centroid如果取每個(gè)特征值中class1的個(gè)數(shù),假設(shè)有特征值0,1,2,3,class1的個(gè)數(shù)分別為4,2,1,3,其中如果按照連續(xù)特征的處理方法,假設(shè)用1作為node的分裂點(diǎn),計(jì)算impurity gain的時(shí)候分成0,1和2,3兩部分統(tǒng)計(jì)。如果按照centroid的方法,其特征值排序次序應(yīng)該是2,1,3,0,以1作為分裂點(diǎn),會(huì)被分成2,1和3,0兩部分。

// Ordered categorical feature
val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
val numBins = binAggregates.metadata.numBins(featureIndex)

/* Each bin is one category (feature value).
* The bins are ordered based on centroidForCategories, and this ordering determines which
* splits are considered. ?(With K categories, we consider K - 1 possible splits.)
*
* centroidForCategories is a list: (category, centroid)
*/
val centroidForCategories = Range(0, numBins).map { case featureValue =>
? ? val categoryStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
? ? val centroid = if (categoryStats.count != 0) {
? ? ? ? if (binAggregates.metadata.isMulticlass) {
? ? ? ? // For categorical variables in multiclass classification,
? ? ? ? // the bins are ordered by the impurity of their corresponding labels.
? ? ? ? ? ? categoryStats.calculate()
? ? ? ? } else if (binAggregates.metadata.isClassification) {
? ? ? ? // For categorical variables in binary classification,
? ? ? ? // the bins are ordered by the count of class 1.
? ? ? ? ? ? categoryStats.stats(1)
? ? ? ? } else {
? ? ? ? ? ? // For categorical variables in regression,
? ? ? ? ? ? // the bins are ordered by the prediction.
? ? ? ? ? ? categoryStats.predict
? ? ? ? }
? ? } else {
? ? ? ? Double.MaxValue
? ? }
? ? (featureValue, centroid)
}

logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(","))

// bins sorted by centroids
val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2)

上面的代碼為不同的情況設(shè)置不同的centroid的選取方法。如果是多分類,使用impurity;如果是二分類,使用class1的個(gè)數(shù);如果是回歸,使用預(yù)測(cè)值(實(shí)際是均值)。然后將特征值按centroid重排序。?
下面的處理基本與連續(xù)特征類似,先按排序次序累計(jì),然后計(jì)算左右的impurity,計(jì)算impurity gain。由于要返回split,之前離散特征的split返回的是空Array,這里構(gòu)造了split,第四個(gè)參數(shù)中加入了實(shí)際的特征值,類比unordered的情況。

計(jì)算完完所有的特征的gain,就可以選取最大增益時(shí)的split,最后collectAsMap,key是nodeIndex,value是split, InfomationGainStats,predict的三元組。

6.4.3. node分裂
計(jì)算完節(jié)點(diǎn)的best split,就要根據(jù)這個(gè)split進(jìn)行node的分裂,包括當(dāng)前節(jié)點(diǎn)的一些屬性完善,左右孩子節(jié)點(diǎn)的構(gòu)造等。

// Iterate over all nodes in this group.
? ? nodesForGroup.foreach { case (treeIndex, nodesForTree) =>
? ? ? nodesForTree.foreach { node =>
? ? ? ? val nodeIndex = node.id
? ? ? ? val nodeInfo = treeToNodeToIndexInfo(treeIndex)(nodeIndex)
? ? ? ? val aggNodeIndex = nodeInfo.nodeIndexInGroup
? ? ? ? //從剛剛計(jì)算的best split中獲取相關(guān)數(shù)據(jù)
? ? ? ? val (split: Split, stats: InformationGainStats, predict: Predict) =
? ? ? ? ? nodeToBestSplits(aggNodeIndex)
? ? ? ? logDebug("best split = " + split)

? ? ? ? // Extract info for this node. ?Create children if not leaf.
? ? ? ? //截止條件
? ? ? ? val isLeaf = (stats.gain <= 0) || (Node.indexToLevel(nodeIndex) == metadata.maxDepth)
? ? ? ? assert(node.id == nodeIndex)
? ? ? ? node.predict = predict
? ? ? ? node.isLeaf = isLeaf
? ? ? ? node.stats = Some(stats)
? ? ? ? node.impurity = stats.impurity
? ? ? ? logDebug("Node = " + node)
? ? ? ? //如果不是葉子節(jié)點(diǎn),需要構(gòu)造左右孩子節(jié)點(diǎn)
? ? ? ? if (!isLeaf) {
? ? ? ? ? node.split = Some(split)
? ? ? ? ? //葉子節(jié)點(diǎn)的depth,當(dāng)前l(fā)evel+1
? ? ? ? ? val childIsLeaf = (Node.indexToLevel(nodeIndex) + 1) == metadata.maxDepth
? ? ? ? ? //左右孩子節(jié)點(diǎn)是否是葉子節(jié)點(diǎn)
? ? ? ? ? val leftChildIsLeaf = childIsLeaf || (stats.leftImpurity == 0.0)
? ? ? ? ? val rightChildIsLeaf = childIsLeaf || (stats.rightImpurity == 0.0)
? ? ? ? ? //構(gòu)造左右孩子節(jié)點(diǎn)
? ? ? ? ? node.leftNode = Some(Node(Node.leftChildIndex(nodeIndex),
? ? ? ? ? ? stats.leftPredict, stats.leftImpurity, leftChildIsLeaf))
? ? ? ? ? node.rightNode = Some(Node(Node.rightChildIndex(nodeIndex),
? ? ? ? ? ? stats.rightPredict, stats.rightImpurity, rightChildIsLeaf))

? ? ? ? ? if (nodeIdCache.nonEmpty) {
? ? ? ? ? ? val nodeIndexUpdater = NodeIndexUpdater(
? ? ? ? ? ? ? split = split,
? ? ? ? ? ? ? nodeIndex = nodeIndex)
? ? ? ? ? ? nodeIdUpdaters(treeIndex).put(nodeIndex, nodeIndexUpdater)
? ? ? ? ? }
? ? ? ? //如果不是葉子節(jié)點(diǎn),加入到nodeQueue待分裂隊(duì)列中
? ? ? ? ? // enqueue left child and right child if they are not leaves
? ? ? ? ? if (!leftChildIsLeaf) {
? ? ? ? ? ? nodeQueue.enqueue((treeIndex, node.leftNode.get))
? ? ? ? ? }
? ? ? ? ? if (!rightChildIsLeaf) {
? ? ? ? ? ? nodeQueue.enqueue((treeIndex, node.rightNode.get))
? ? ? ? ? }

? ? ? ? ? logDebug("leftChildIndex = " + node.leftNode.get.id +
? ? ? ? ? ? ", impurity = " + stats.leftImpurity)
? ? ? ? ? logDebug("rightChildIndex = " + node.rightNode.get.id +
? ? ? ? ? ? ", impurity = " + stats.rightImpurity)
? ? ? ? }
? ? ? }
? ? }

這里將當(dāng)前節(jié)點(diǎn)的左右孩子節(jié)點(diǎn)繼續(xù)加入nodeQueue中,這里面放的是需要繼續(xù)分裂的節(jié)點(diǎn),至此本輪的findBestSplits就完成了。

// Choose node splits, and enqueue new nodes as needed.
timer.start("findBestSplits")
DecisionTree.findBestSplits(baggedInput,
? ? metadata, topNodes, nodesForGroup,
? ? treeToNodeToIndexInfo, splits, bins, nodeQueue,
? ? timer, nodeIdCache = nodeIdCache)
timer.stop("findBestSplits")

6.5. 循環(huán)訓(xùn)練
上節(jié)我們說(shuō)到最后待分裂的節(jié)點(diǎn)會(huì)加入到nodeQueue中,回到RandomForest.run函數(shù)中

while (nodeQueue.nonEmpty) {
? ? ? // Collect some nodes to split, and choose features for each node (if subsampling).
? ? ? // Each group of nodes may come from one or multiple trees, and at multiple levels.
? ? ? val (nodesForGroup, treeToNodeToIndexInfo) =
? ? ? ? RandomForest.selectNodesToSplit(nodeQueue, maxMemoryUsage, metadata, rng)
? ? ? // Sanity check (should never occur):
? ? ? assert(nodesForGroup.size > 0,
? ? ? ? s"RandomForest selected empty nodesForGroup. ?Error for unknown reason.")

? ? ? // Choose node splits, and enqueue new nodes as needed.
? ? ? timer.start("findBestSplits")
? ? ? DecisionTree.findBestSplits(baggedInput, metadata, topNodes, nodesForGroup,
? ? ? ? treeToNodeToIndexInfo, splits, bins, nodeQueue, timer, nodeIdCache = nodeIdCache)
? ? ? timer.stop("findBestSplits")
? ? }

當(dāng)有非葉子節(jié)點(diǎn)不斷加入nodeQueue中,這里不斷分裂出節(jié)點(diǎn),直到所有節(jié)點(diǎn)觸發(fā)截止條件。
?

7. 構(gòu)造隨機(jī)森林
在上面的訓(xùn)練過(guò)程可以看到,從根節(jié)點(diǎn)topNode中不斷向下分裂一直到觸發(fā)截止條件就構(gòu)造了一棵樹(shù)所有的node,因此構(gòu)造整個(gè)森林也是非常簡(jiǎn)單

//構(gòu)造
val trees = topNodes.map(topNode => new DecisionTreeModel(topNode, strategy.algo))
//返回rf模型
new RandomForestModel(strategy.algo, trees)

8. 隨機(jī)森林模型
8.1. TreeEnsembleModel
隨機(jī)森林RandomForestModel繼承自樹(shù)集合模型TreeEnsembleModel

class TreeEnsembleModel(
? ? protected val algo: Algo,
? ? protected val trees: Array[DecisionTreeModel],
? ? protected val treeWeights: Array[Double],
? ? protected val combiningStrategy: EnsembleCombiningStrategy)

algo:Regression/Classification
trees:樹(shù)數(shù)組
treeWeights:每棵樹(shù)的權(quán)重,在RF中每棵樹(shù)的權(quán)重是相同的,在Adaboost可能是不同的
combiningStrategy:樹(shù)合并時(shí)的策略,Sum/Average/Vote,分類的話應(yīng)該是Vote,RF應(yīng)該是Average,GBDT應(yīng)該是Sum。
sumWeights:成員變量,不在參數(shù)表中,是treeWeights的sum
預(yù)測(cè)函數(shù)

/**
? ?* Predicts for a single data point using the weighted sum of ensemble predictions.
? ?*
? ?* @param features array representing a single data point
? ?* @return predicted category from the trained model
? ?*/
? private def predictBySumming(features: Vector): Double = {
? ? val treePredictions = trees.map(_.predict(features))
? ? blas.ddot(numTrees, treePredictions, 1, treeWeights, 1)
? }

將每棵樹(shù)的預(yù)測(cè)結(jié)果與各自的weight向量相乘

/**
? ?* Classifies a single data point based on (weighted) majority votes.
? ?*/
? private def predictByVoting(features: Vector): Double = {
? ? val votes = mutable.Map.empty[Int, Double]
? ? trees.view.zip(treeWeights).foreach { case (tree, weight) =>
? ? ? val prediction = tree.predict(features).toInt
? ? ? votes(prediction) = votes.getOrElse(prediction, 0.0) + weight
? ? }
? ? votes.maxBy(_._2)._1
? }

將每棵樹(shù)的預(yù)測(cè)class為key,將樹(shù)的weight累加到Map中作為value,最后取權(quán)重和最大對(duì)應(yīng)的class

8.2. RandomForestModel
RandomForestModel @Since("1.2.0") (
? ? @Since("1.2.0") override val algo: Algo,
? ? @Since("1.2.0") override val trees: Array[DecisionTreeModel])
? extends TreeEnsembleModel(algo, trees, Array.fill(trees.length)(1.0),
? ? combiningStrategy = if (algo == Classification) Vote else Average)

對(duì)于隨機(jī)森林,其weight都是1,樹(shù)合并策略如果是分類就是Vote,回歸是Average。?
模型生成后,如果要應(yīng)用到線上,需要將訓(xùn)練后的模型保存下來(lái),自己寫代碼解析模型文件,進(jìn)行預(yù)測(cè),因此要了解模型的保存和加載。

8.2.1. 模型保存
分為兩部分,第一部分是metadata,保存了一些配置,包括模型名,模型版本,模型的algo是classification/regression,合并策略,每棵樹(shù)的權(quán)重。

implicit val format = DefaultFormats
val ensembleMetadata = Metadata(model.algo.toString,
? ? model.trees(0).algo.toString,
? ? model.combiningStrategy.toString,?
? ? model.treeWeights)
val metadata = compact(render(
? ? ("class" -> className) ~ ("version" -> thisFormatVersion) ~
? ? ("metadata" -> Extraction.decompose(ensembleMetadata))))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))

第二部分是隨機(jī)森林的每棵樹(shù)的保存

// Create Parquet data.
val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
? ? tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
}.toDF()
dataRDD.write.parquet(Loader.dataPath(path))

其中首先調(diào)用node的subtreeIterator函數(shù),返回所有node的Iterator,然后轉(zhuǎn)成DataFrame結(jié)構(gòu),再寫成parquet格式的文件。我們來(lái)看subtreeIterator函數(shù)

/** Returns an iterator that traverses (DFS, left to right) the subtree of this node. */
? private[tree] def subtreeIterator: Iterator[Node] = {
? ? Iterator.single(this) ++ leftNode.map(_.subtreeIterator).getOrElse(Iterator.empty) ++
? ? ? rightNode.map(_.subtreeIterator).getOrElse(Iterator.empty)
? }

其實(shí)就是用前序遍歷的方式返回了樹(shù)中的所有node的Iterrator。?
我們?cè)賮?lái)看NodeData,看看每個(gè)node保存了什么數(shù)據(jù)

def apply(treeId: Int, n: Node): NodeData = {
? ? NodeData(treeId, n.id, PredictData(n.predict), n.impurity,
? ? n.isLeaf, n.split.map(SplitData.apply), n.leftNode.map(_.id),?
? ? n.rightNode.map(_.id), n.stats.map(_.gain))
}

保存了node的預(yù)測(cè)值,impurity,是否是否葉子節(jié)點(diǎn),split,左右孩子節(jié)點(diǎn)的id,gain。其中split中包含了特征id,特征閾值,特征類型,離散特征數(shù)組(其實(shí)就是Split結(jié)構(gòu))。

8.2.2. 模型加載
metadata的加載就是解析json,主要是樹(shù)的重建

val trees = TreeEnsembleModel.SaveLoadV1_0.loadTrees(sc,?
? ? path, metadata.treeAlgo)
new RandomForestModel(Algo.fromString(metadata.algo), trees)

其中調(diào)用了loadTrees函數(shù)

/**
?* Load trees for an ensemble, and return them in order.
?* @param path path to load the model from
?* @param treeAlgo Algorithm for individual trees (which may differ from the ensemble's
?* ? ? ? ? ? ? ? ? algorithm).
?*/
def loadTrees(
? ? ? ? sc: SparkContext,
? ? ? ? path: String,
? ? ? ? treeAlgo: String): Array[DecisionTreeModel] = {
? ? val datapath = Loader.dataPath(path)
? ? val sqlContext = SQLContext.getOrCreate(sc)
? ? val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply)
? ? val trees = constructTrees(nodes)
? ? trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo)))
}

先是讀取數(shù)據(jù)文件,讀成NodeData格式,然后調(diào)用constructTrees重建樹(shù)結(jié)構(gòu)

? ? def constructTrees(nodes: RDD[NodeData]): Array[Node] = {
? ? ? val trees = nodes
? ? ? ? .groupBy(_.treeId)
? ? ? ? .mapValues(_.toArray)
? ? ? ? .collect()
? ? ? ? .map { case (treeId, data) =>
? ? ? ? ? (treeId, constructTree(data))
? ? ? ? }.sortBy(_._1)
? ? ? val numTrees = trees.size
? ? ? val treeIndices = trees.map(_._1).toSeq
? ? ? assert(treeIndices == (0 until numTrees),
? ? ? ? s"Tree indices must start from 0 and increment by 1, but we found $treeIndices.")
? ? ? trees.map(_._2)
? ? }

主要功能按樹(shù)的id分組后,調(diào)用constructTree重建樹(shù)

? ? /**
? ? ?* Given a list of nodes from a tree, construct the tree.
? ? ?* @param data array of all node data in a tree.
? ? ?*/
? ? def constructTree(data: Array[NodeData]): Node = {
? ? ? val dataMap: Map[Int, NodeData] = data.map(n => n.nodeId -> n).toMap
? ? ? assert(dataMap.contains(1),
? ? ? ? s"DecisionTree missing root node (id = 1).")
? ? ? constructNode(1, dataMap, mutable.Map.empty)
? ? }

? ? /**
? ? ?* Builds a node from the node data map and adds new nodes to the input nodes map.
? ? ?*/
? ? private def constructNode(
? ? ? id: Int,
? ? ? dataMap: Map[Int, NodeData],
? ? ? nodes: mutable.Map[Int, Node]): Node = {
? ? ? if (nodes.contains(id)) {
? ? ? ? return nodes(id)
? ? ? }
? ? ? val data = dataMap(id)
? ? ? val node =
? ? ? ? if (data.isLeaf) {
? ? ? ? ? Node(data.nodeId, data.predict.toPredict, data.impurity, data.isLeaf)
? ? ? ? } else {
? ? ? ? ? val leftNode = constructNode(data.leftNodeId.get, dataMap, nodes)
? ? ? ? ? val rightNode = constructNode(data.rightNodeId.get, dataMap, nodes)
? ? ? ? ? val stats = new InformationGainStats(data.infoGain.get, data.impurity, leftNode.impurity,
? ? ? ? ? ? rightNode.impurity, leftNode.predict, rightNode.predict)
? ? ? ? ? new Node(data.nodeId, data.predict.toPredict, data.impurity, data.isLeaf,
? ? ? ? ? ? data.split.map(_.toSplit), Some(leftNode), Some(rightNode), Some(stats))
? ? ? ? }
? ? ? nodes += node.id -> node
? ? ? node
? ? }

其實(shí)就是遞歸的從NodeData中獲取數(shù)據(jù),重建node

從上面的分析可以看到,spark保存模型使用了parquet格式,對(duì)于我們?cè)趧e的環(huán)境中使用是非常不方便的,訓(xùn)練完模型后,我們可以參照spark的做法,按照前序遍歷的方法以json的格式保存node,在別的環(huán)境下復(fù)建樹(shù)結(jié)構(gòu)就可以了。

9. 坑
特征id,樣本是libsvm格式的,特征id從1開(kāi)始,但是設(shè)置離散特征數(shù)categoricalFeaturesInfo需要從0開(kāi)始,相當(dāng)于樣本特征id-1
離散特征值,一旦在categoricalFeaturesInfo中指定了特征值的個(gè)數(shù)k,spark會(huì)認(rèn)為這個(gè)特征是從0開(kāi)始,連續(xù)到k-1。如果其中特征不連續(xù),特征數(shù)應(yīng)該設(shè)置成最大特征值+1
對(duì)于連續(xù)特征,spark使用等頻離散化方法,又對(duì)樣本進(jìn)行了抽樣,效果其實(shí)很難保證,不知道作者是否比較過(guò)這種方法與等間隔離散化效果孰優(yōu)孰劣
maxBins的設(shè)置需要考慮連續(xù)特征離散化效果,連續(xù)特征離散化值的個(gè)數(shù)是maxBins-1,同時(shí)maxBins必須大于categoricalFeaturesInfo中最大離散特征值的個(gè)數(shù)
ordered feature,之前的理解是有誤的,這里的order僅僅是說(shuō)這種特征是可以經(jīng)過(guò)某種方式排列后變成有序,排序標(biāo)準(zhǔn)根據(jù)分類/回歸而不同,在上面的文章有具體介紹。在我們的實(shí)踐中,有的離散特征,例如薪資,1代表0-1000元,2代表1000-2000元,3代表2000-3000元,特征值的大小本身就表征了實(shí)際意義,這種應(yīng)該直接按連續(xù)特征處理(當(dāng)然也可以對(duì)比下效果決定)。
10. 結(jié)語(yǔ)
我們基本上是逐行分析了spark隨機(jī)森林的實(shí)現(xiàn),展現(xiàn)了其實(shí)現(xiàn)過(guò)程中使用的技巧,希望對(duì)大家在理解隨機(jī)森林和其實(shí)現(xiàn)方法有所幫助。


---------------------?
原文:https://blog.csdn.net/snaillup/article/details/72820346?
?

總結(jié)

以上是生活随笔為你收集整理的spark mllib源码分析之随机森林(Random Forest)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

日韩激情一二三区 | 91精品国产92久久久久 | 国产精品一区一区三区 | 激情久久网 | 手机在线黄色网址 | 国产一级视频在线观看 | 亚洲网站在线 | 福利一区视频 | 天天激情 | 成人av教育 | 韩国在线视频一区 | 久久99久久久久久 | 又黄又刺激 | 久草视频在线播放 | 999久久久 | 九九久久久久久久久激情 | www.天天色.com | 成人久久久电影 | 亚洲日本va午夜在线影院 | 久久天堂影院 | 8x成人免费视频 | 毛片网在线观看 | 国产精品第一页在线 | 久久字幕网 | 亚洲夜夜综合 | 国产免费观看高清完整版 | 国产视频久久久久 | 国产免费资源 | 日韩在线看片 | 亚洲国产中文字幕在线观看 | 视频 国产区| 亚洲精品久久久久中文字幕m男 | 日韩一级电影网站 | 99久久99久久精品 | 中文字幕在线观看av | 日韩久久视频 | 亚洲一区日韩在线 | 午夜精品久久久久久久99 | 欧美人人 | 国产精品片 | 午夜精品视频一区二区三区在线看 | 久久综合色8888 | 18岁免费看片 | 久久99精品久久久久久秒播蜜臀 | 91麻豆精品国产91久久久无限制版 | 国模视频一区二区 | 国产精品久久久久久久久毛片 | 免费精品人在线二线三线 | 国产精品女同一区二区三区久久夜 | 国产无遮挡又黄又爽馒头漫画 | 91女子私密保健养生少妇 | 天天干天天操 | 国产日韩精品一区二区在线观看播放 | 色网站免费在线观看 | 精品嫩模福利一区二区蜜臀 | 成人在线观看资源 | 日本中文字幕观看 | 亚洲精品久久久久999中文字幕 | 久草在线电影网 | 亚洲国内精品 | 国产色女 | 一区二区三区在线视频111 | 国产福利不卡视频 | 啪啪肉肉污av国网站 | 欧美激情视频在线观看免费 | 丁香色婷| 人人干人人超 | 欧美精品久久久久久久亚洲调教 | 九色在线 | 美女精品| 成人一级 | 最新精品国产 | 国产69精品久久久久99 | 丁香花在线观看视频在线 | 亚洲精品黄色在线观看 | 国产高清不卡av | 狠狠狠狠狠色综合 | 中文字幕专区高清在线观看 | 国产视频亚洲 | 99热日本 | 中文字幕视频观看 | 香蕉视频91 | 国产xxxx| 国产精品久久网站 | 国产精品久久久一区二区三区网站 | 最新日韩在线观看视频 | 日本精品视频在线观看 | 久久国产亚洲视频 | 五月网婷婷| 99在线免费视频观看 | 日韩精品aaa | 亚洲精品乱码久久久久 | 久久久久久久久久久久久久av | 婷婷六月天天 | 在线观看黄色大片 | 中文日韩在线视频 | 首页中文字幕 | 国产四虎在线 | 国产亚洲视频在线观看 | 久久久久久视频 | 久久香蕉国产精品麻豆粉嫩av | 视频一区二区三区视频 | 国产精品免费一区二区三区在线观看 | 国产一区免费视频 | 亚洲精选国产 | 国产成人av电影在线 | 在线播放 日韩专区 | 久久久久久综合网天天 | 国产黄色一级片在线 | 精品嫩模福利一区二区蜜臀 | 91免费网站在线观看 | .国产精品成人自产拍在线观看6 | 久久桃花网 | 新版资源中文在线观看 | 免费在线一区二区三区 | 亚洲欧洲精品久久 | 天天干天天做天天操 | 日本aaaa级毛片在线看 | 美女在线免费观看视频 | 成人av中文字幕在线观看 | 精品福利片 | 中国一级片在线 | 美女视频黄频大全免费 | 成年人在线免费看片 | 黄色激情网址 | 中文字幕国产 | 国产亚洲日本 | 一区二区三区国 | 精品国产一区二区三区日日嗨 | 在线视频免费观看 | 国产一级黄色电影 | 欧美a级免费视频 | 四虎在线免费观看 | 久久久久久免费 | 天天干,天天草 | 日日夜夜操av | 激情综合网五月激情 | 91新人在线观看 | 在线观看麻豆av | 久久久电影 | 日日夜夜91| 日韩理论片在线 | 欧美一区二区三区四区夜夜大片 | 日韩欧美一区二区三区黑寡妇 | 国产在线p| 午夜在线免费视频 | 欧美福利在线播放 | 午夜影院先 | a级片久久久 | 久久99久久99精品中文字幕 | 香蕉日日 | 日韩av一区二区在线播放 | 最近中文字幕高清字幕在线视频 | 免费又黄又爽视频 | 西西人体www444 | 国产精品视频你懂的 | 国产福利一区二区在线 | www视频在线观看 | 一区电影 | 99久久国产免费免费 | 国产日本在线播放 | 久久免费看毛片 | 韩国精品一区二区三区六区色诱 | 国产手机视频在线播放 | 岛国一区在线 | 毛片一二区| 亚洲涩综合 | 精品电影一区二区 | 91片在线观看| 啪啪动态视频 | 一区在线观看 | 欧美做受69 | 日本精品视频一区 | 久久久精品欧美一区二区免费 | 色视频国产直接看 | 免费国产在线观看 | 久久久久国产精品厨房 | 免费热情视频 | 久久人人艹 | 欧美久久久久久久久中文字幕 | 成人国产网站 | 久久免费视屏 | 久久久久亚洲精品成人网小说 | 久久久久免费观看 | 亚洲一区二区三区精品在线观看 | 999久久 | 亚洲精品91天天久久人人 | 免费在线播放黄色 | 黄a在线观看| 在线午夜av| 啪啪肉肉污av国网站 | 久久99精品久久久久久三级 | 亚洲黄a| 久久五月天综合 | 国产精品第一页在线 | 亚洲色图色 | 成年人免费av | 狠狠躁夜夜a产精品视频 | 欧美va在线观看 | 中文字幕一区二区三区久久 | 色视频成人在线观看免 | 9幺看片| 久草在线视频网站 | 国内精品视频一区二区三区八戒 | 欧美在线视频免费 | 黄色大片国产 | 国产小视频在线免费观看 | 国产专区一 | 五月天天在线 | 免费的黄色的网站 | 久久免费视频精品 | 国产精品综合在线 | 久久99国产精品自在自在app | 五月婷婷六月丁香 | 九九免费在线观看视频 | 亚洲国产精品日韩 | 欧美一级片在线免费观看 | 91在线91| 韩国精品在线 | 99综合影院在线 | 亚洲人xxx | 欧美在线观看视频一区二区三区 | 国产精品久久久久久久久久妇女 | 97高清视频 | 热久久免费国产视频 | 国产中文a| 99色在线观看 | 欧美大片mv免费 | 麻豆免费视频网站 | 激情综合亚洲 | 色.com| 999久久a精品合区久久久 | 又爽又黄又无遮挡网站动态图 | 98涩涩国产露脸精品国产网 | 国产亚洲精品久久久久久 | 国产精品美女久久久久久久久久久 | 久久深爱网 | 国产精品国产三级国产不产一地 | 国产成人精品日本亚洲999 | 国产vs久久 | 毛片无卡免费无播放器 | 91一区啪爱嗯打偷拍欧美 | 免费观看一区二区 | 日韩,中文字幕 | 国内毛片毛片 | www色婷婷com| 天干啦夜天干天干在线线 | 成年人在线免费看 | 一区二区三区免费看 | 中文字幕 在线看 | www.狠狠操.com| 国产超碰97 | 中文字幕你懂的 | 黄色毛片在线看 | 国产手机av在线 | 999精品视频 | 天天操天天色天天射 | a√天堂中文在线 | 国产精品系列在线观看 | 国产一区二区三区午夜 | 中文字幕日韩精品有码视频 | 成人黄色大片 | 中文av在线免费观看 | 国产一区 在线播放 | 日日操夜 | 色婷婷激情四射 | 三级免费黄色 | 精品主播网红福利资源观看 | 天天想夜夜操 | 久久久久综合 | 五月天综合网 | 特级毛片在线 | 欧美一区免费在线观看 | 高清国产午夜精品久久久久久 | 亚洲性xxxx| 日韩免费在线观看 | 欧美在线观看视频免费 | 国产精品久久久久久久久久免费 | 久久无码精品一区二区三区 | 欧美日韩啪啪 | 久久久免费观看 | 伊人电影在线观看 | 久草电影在线观看 | 精品国产aⅴ一区二区三区 在线直播av | 精品一二三区视频 | 黄色精品视频 | 婷婷黄色片 | 亚洲无人区小视频 | 国产国语在线 | 日韩精品免费在线观看视频 | 久久艹国产视频 | 96精品视频 | 五月天丁香亚洲 | 久久69精品 | 成人在线免费观看视视频 | 久久久在线 | 中国一级特黄毛片大片久久 | 亚洲精品日韩在线观看 | 久久99在线观看 | 色婷婷伊人 | 欧美va天堂在线电影 | 国产在线美女 | 中文字幕日韩精品有码视频 | 国产精品电影一区二区 | 久久99精品久久久久久久久久久久 | 五月开心激情 | 91影视成人 | 天天综合网在线 | 天堂av在线中文在线 | 999久久| 久草久草久草久草 | 久草视频播放 | 四虎精品成人免费网站 | 久久综合九色99 | 精品国产免费观看 | 国产资源在线免费观看 | 国产激情久久久 | 中文字幕4 | 毛片精品免费在线观看 | 免费看的黄色片 | 亚州国产精品久久久 | 国产精品久久久久久久久久不蜜月 | 精品无人国产偷自产在线 | 国产精品丝袜在线 | 日本资源中文字幕在线 | 亚洲国产精品一区二区尤物区 | 国产精品黄色影片导航在线观看 | 毛片黄色一级 | 国产破处在线视频 | 五月天亚洲综合小说网 | 国产麻豆成人传媒免费观看 | 91大神一区二区三区 | 在线成人中文字幕 | 久久毛片网站 | 五月婷av | 欧美日韩精品免费观看 | 久久美女免费视频 | 91亚洲网| 成人小视频免费在线观看 | 网站免费黄 | 欧美少妇xx| 中文字幕在线观看第三页 | 成人精品一区二区三区中文字幕 | 91男人影院 | 国产精品久久一 | 久久天天拍| 97国产情侣爱久久免费观看 | 在线看日韩 | 免费黄色小网站 | 久久综合久色欧美综合狠狠 | 欧美日韩另类视频 | 欧美黑人巨大xxxxx | 日日夜夜网站 | 91视频在线免费下载 | 久热久草在线 | 亚洲女欲精品久久久久久久18 | 日韩精品一区二区三区水蜜桃 | 人人网av| 亚洲少妇影院 | 91黄色成人| 香蕉视频国产在线 | 日本韩国中文字幕 | av高清一区二区三区 | 亚洲dvd| 91精品欧美一区二区三区 | www.伊人网| 久久精品网站视频 | 久久99精品久久久久蜜臀 | 久久精品国产99 | 久久电影中文字幕视频 | 久久综合婷婷国产二区高清 | 最新色站 | 狠狠色丁香婷婷综合最新地址 | 成人精品国产免费网站 | 五月婷婷综合激情 | 欧美日韩aaaa | 99精品视频免费看 | 麻豆传媒在线免费看 | 国产一级性生活视频 | 亚洲视频,欧洲视频 | 国产黄免费 | 奇米网8888 | 不卡的av在线 | 国产美女在线免费观看 | 天天插日日射 | 黄色日批网站 | 日本中文字幕网 | 国产精品久久精品 | 国产亚洲精品久久 | 亚洲精品综合一二三区在线观看 | 在线免费高清一区二区三区 | 在线免费91| 91在线免费播放视频 | 天堂入口网站 | 亚洲一区二区三区四区精品 | 日韩网站免费观看 | 天堂av免费| 免费看黄的视频 | 丁香久久激情 | 欧美激情另类文学 | 在线黄频 | 亚洲国产精品电影 | 91正在播放 | 91九色蝌蚪国产 | 国产成人精品不卡 | 二区视频在线观看 | av成人免费观看 | 伊人手机在线 | 婷婷综合视频 | 色婷婷激情电影 | 91视频最新网址 | 91精品久久久久久综合乱菊 | 午夜精品剧场 | 亚州人成在线播放 | 日韩91在线 | 91在线观看高清 | 午夜性盈盈 | 丁香色天天| 亚洲黄网址 | 久久艹综合| 成人午夜电影免费在线观看 | 亚洲国产精品va在线 | av中文字幕日韩 | 国产黄色片免费 | 黄色av在| 欧美a级在线免费观看 | 国产精品黄色影片导航在线观看 | 久久精品一二三区白丝高潮 | 国产精品一区二区白浆 | 最近免费观看的电影完整版 | 99精品免费在线 | 99久久国产免费,99久久国产免费大片 | 国产精品色在线 | 国产一区播放 | 成人久久久久久久久久 | 欧美性粗大hdvideo | 国内揄拍国内精品 | 中文国产字幕在线观看 | 91丨九色丨蝌蚪丰满 | 夜夜视频欧洲 | 最新国产福利 | 国产免码va在线观看免费 | 国内精品在线一区 | 国产色黄网站 | 久久久精品国产免费观看同学 | 久久综合狠狠 | 波多野结衣动态图 | 伊人婷婷激情 | 天天躁日日躁狠狠躁av中文 | 成人黄色小说网 | 成人午夜剧场在线观看 | 99这里只有精品视频 | 超碰成人免费电影 | 欧美伦理一区二区三区 | 91九色pron| 人人讲| 久久乐九色婷婷综合色狠狠182 | 青青草国产成人99久久 | 亚洲码国产日韩欧美高潮在线播放 | 久久国产精品影视 | 日韩免费一区二区三区 | 亚洲一区二区精品3399 | 久久综合婷婷国产二区高清 | 精品国产乱码久久久久久1区2匹 | 日日爱视频 | 久草在线免费在线观看 | 东方av在线免费观看 | av在线一级 | 日一日操一操 | 在线 你懂 | av性网站 | 中文字幕影视 | 国产成人一区二区三区久久精品 | 久久99精品久久久久久久久久久久 | 91国内在线 | 精品国产一区二区三区久久久蜜月 | 狠狠ri| 国产一二三区在线观看 | 亚洲国产激情 | 日韩欧美视频免费在线观看 | 超碰人人超碰 | 国产精品一区二区免费 | 国产在线精 | 国产在线视频在线观看 | 天天色天天搞 | 国产精品av电影 | 亚洲美女视频在线 | 懂色av懂色av粉嫩av分享吧 | aaa日本高清在线播放免费观看 | 国产 亚洲 欧美 在线 | 99久久综合精品五月天 | 成年人毛片在线观看 | 国产精品密入口果冻 | 欧美另类xxx | 国产成人黄色片 | 精品国产乱码久久久久久1区2匹 | 热久久在线视频 | 精品视频在线看 | 国产日韩精品在线 | 天天干天天射天天操 | 一二三区在线 | 在线观看免费视频你懂的 | www.91国产 | 久久一区精品 | 欧美成天堂网地址 | 精品国产一区二区三区久久久久久 | 亚洲电影第一页av | 福利视频第一页 | 国产xvideos免费视频播放 | 日日夜夜网站 | 久久免费高清 | 久久国产二区 | av成人免费 | 日本最新中文字幕 | 欧美一区二区在线免费看 | 91手机视频在线 | 免费情缘| 国内外成人免费在线视频 | 婷婷色五 | 亚洲精品字幕在线观看 | 久久免费视频观看 | 久久er99热精品一区二区三区 | 欧美疯狂性受xxxxx另类 | 亚洲成av人片 | 国产成人99久久亚洲综合精品 | 久热久草在线 | 91精品国产麻豆国产自产影视 | 五月天亚洲婷婷 | 亚洲天堂网视频在线观看 | 久久综合久久久 | 色综合久久久网 | 成人午夜电影免费在线观看 | 国产精品一区二区久久精品爱涩 | 超碰在线天天 | 欧美成人性网 | 日韩中文字幕一区 | 日韩激情免费视频 | 奇米7777狠狠狠琪琪视频 | 国产精品成人av电影 | 韩国av永久免费 | 成人午夜电影在线 | 99精品一区 | 精品毛片一区二区免费看 | 色94色欧美| www天天操| 91精品国产乱码在线观看 | 黄色毛片一级 | 韩日视频在线 | 一区二区电影在线观看 | 男女拍拍免费视频 | 久久久久久久久久久网站 | 一区二区三区在线免费 | 精品国产一区二区三区在线观看 | 国产亚洲精品bv在线观看 | 久久久久久高潮国产精品视 | 午夜视频二区 | 久 久久影院 | 99久e精品热线免费 99国产精品久久久久久久久久 | 婷婷中文在线 | 97**国产露脸精品国产 | 国产无遮挡又黄又爽在线观看 | 久草爱| 精品你懂的 | 国产色就色| 日韩特黄一级欧美毛片特黄 | 91最新国产 | 日韩成人精品在线观看 | 黄色影院在线免费观看 | 日韩18p| 91欧美日韩国产 | 国产视频一区在线播放 | 久久99国产视频 | av黄色免费在线观看 | 人人澡人人爽欧一区 | 99久久www| 一区二区三区久久精品 | 国产精品美女 | 亚洲热视频 | 国产精品亚洲精品 | 成人黄色大片网站 | 黄色美女免费网站 | 在线观看成人福利 | 色狠狠一区二区 | 欧美日韩一区二区久久 | 97精品国产97久久久久久粉红 | 免费色视频网站 | 免费黄色激情视频 | 欧美另类交在线观看 | 国产成人精品综合久久久久99 | 伊人色综合久久天天 | 国产激情小视频在线观看 | 国产黄色精品网站 | 亚洲精品av中文字幕在线在线 | 丁香婷婷久久久综合精品国产 | 久久精品国产99国产 | 久久婷婷综合激情 | 韩国在线视频一区 | 日韩精品免费一区二区在线观看 | 在线草| 久久久av电影 | 涩涩爱夜夜爱 | 国产精品网红福利 | 99视频免费播放 | 五月天综合网站 | 亚洲视频久久久久 | 成人毛片一区二区三区 | 色91在线 | 国产高清精品在线 | 日韩欧美高清 | 一区二区视频免费在线观看 | 国产精品都在这里 | 日韩在线网 | 亚洲最大av网 | www.国产精品| 日韩黄色一区 | 99在线视频精品 | 国产精品ⅴa有声小说 | 国内精品久久久 | 久久久久一区二区三区 | 999久久久精品视频 日韩高清www | 超碰97中文 | 99高清视频有精品视频 | 色吊丝在线永久观看最新版本 | 性色在线视频 | а天堂中文最新一区二区三区 | 日本女人的性生活视频 | 狠狠干狠狠艹 | 国产又粗又猛又黄又爽视频 | 精品一二三四视频 | 天天射天天射天天射 | 中文乱码视频在线观看 | 美女激情影院 | 在线观看你懂的网址 | 国产一区二区三区免费在线观看 | 成人福利在线 | 亚洲高清视频在线 | 日韩a在线观看 | 国产成人久久精品一区二区三区 | 青青草国产成人99久久 | 丁香花五月 | 国内精品久久影院 | 久久黄色美女 | 欧美亚洲成人xxx | 国产永久免费高清在线观看视频 | 久久免费成人精品视频 | 国产日韩精品一区二区 | 久久99精品国产 | 91精品国产91p65 | 亚洲区视频在线观看 | 国产在线高清精品 | 国产精品欧美久久久久久 | 三级午夜片 | 国产精品毛片一区视频播 | 精品国产免费久久 | 在线看片a | 91免费观看网站 | 亚洲视频中文 | 婷婷综合影院 | 久久午夜鲁丝片 | 九九九九精品 | 国产精品福利一区 | 欧美天堂久久 | 天天曰夜夜操 | 天天曰天天射 | 国产馆在线播放 | 97激情影院 | 免费看一级黄色 | 五月婷网站 | 国产精品ssss在线亚洲 | 中文字幕在线观看91 | 久久麻豆视频 | 99re国产视频| 久久激情视频网 | 国产视频网站在线观看 | 韩日三级在线 | 成人免费观看网站 | 韩国一区二区三区视频 | 91黄色在线看 | 亚洲高清在线精品 | 最近更新中文字幕 | 天天玩天天干 | 国产精品免费大片视频 | 在线免费观看国产黄色 | 中文在线免费看视频 | 久久理论电影网 | 国产精品一区二区美女视频免费看 | 91麻豆精品国产91久久久使用方法 | 日韩,精品电影 | 亚洲男男gaygay无套同网址 | 国产精品美女www爽爽爽视频 | 丁香六月婷婷激情 | 国产精品久久久久久久久久三级 | 97精品国产97久久久久久免费 | 9999精品视频 | 久久久国产精品成人免费 | 色中文字幕在线观看 | 国产精品爽爽爽 | 美女福利视频一区二区 | 国产精品99久久久 | 99成人免费视频 | 欧美成年人在线视频 | 丁香婷婷激情国产高清秒播 | 99精品视频在线观看播放 | 国产中文在线播放 | 最新影院 | 日韩大片在线免费观看 | 久久久久久久免费观看 | 国产一区二区不卡在线 | 国产五月天婷婷 | 精品人人人人 | 欧美日韩性生活 | 国产精品免费在线 | 国产99自拍| 精品av在线播放 | 亚洲国内在线 | 日韩va欧美va亚洲va久久 | 91中文在线视频 | 超碰在线观看av.com | 久久99爱视频| www色com| 911久久| 国产高清永久免费 | 国产视频色 | 一级黄色片在线免费观看 | 久久久久久久综合色一本 | 超碰97在线人人 | 在线а√天堂中文官网 | 久久久久色 | 国产精品999久久久 久产久精国产品 | 欧美激情另类 | 五月婷婷激情综合网 | 成人av av在线| 91中文字幕在线 | 亚洲欧美婷婷六月色综合 | av色网站| 日韩精品资源 | 欧美一级电影 | 超碰成人免费电影 | 精品资源在线 | 国产理论免费 | 国产午夜精品一区二区三区欧美 | 97精品久久 | 国产午夜影院 | 婷五月天激情 | 人人天天夜夜 | 婷婷午夜 | 色综合久久88| 国产精品色在线 | 久久久精品欧美一区二区免费 | 国产99免费 | 婷婷丁香激情 | 午夜黄色 | 亚洲成a人片在线观看网站口工 | 日韩毛片在线播放 | 中文字幕高清视频 | 在线视频99 | 久久九九国产视频 | 免费看成人| 国产精品入口66mio女同 | 国产精品欧美久久久久天天影视 | 成年人在线观看视频免费 | 日韩欧美高清一区二区 | 日韩免费一级a毛片在线播放一级 | 天天舔天天搞 | 国产精品区免费视频 | 97超碰在线久草超碰在线观看 | 久久婷婷色综合 | 91免费的视频在线播放 | 9免费视频| 91精品电影 | 99久久精品国产系列 | 久草手机视频 | 日日草天天草 | av一级片在线观看 | 成人性生交大片免费看中文网站 | 丁香六月av | 日韩美一区二区三区 | 一级免费黄视频 | 国产精品手机看片 | 丁香六月五月婷婷 | 91精品国产99久久久久久红楼 | 国内少妇自拍视频一区 | 欧美欧美| 亚洲一区二区麻豆 | 国产剧情一区 | 国产精品视屏 | 色com| 国产一级电影在线 | 久久久久久视频 | 999免费视频 | 日韩综合视频在线观看 | 免费a网| 国产精品一区二区久久精品 | 国产精品亚洲人在线观看 | 国产一性一爱一乱一交 | avhd高清在线谜片 | 99精品国产免费久久久久久下载 | 高清精品久久 | 欧美成人性战久久 | 精品96久久久久久中文字幕无 | 五月激情丁香婷婷 | 操处女逼| 日日夜夜人人天天 | 色狠狠久久av五月综合 | 欧美视频在线观看免费网址 | 91精品毛片 | 色噜噜日韩精品一区二区三区视频 | 欧美另类重口 | 午夜天天操 | 五月丁色 | a级国产乱理伦片在线观看 亚洲3级 | 激情久久久久 | 韩国av在线| 日韩精品中文字幕av | 中文字幕免费观看 | 精品久久久一区二区 | 九九久久影视 | 亚洲精品乱码久久久久久蜜桃不爽 | 黄色软件在线观看免费 | 少妇高潮冒白浆 | 中文字幕资源在线观看 | 国产精品久久久久久模特 | 三级黄色大片在线观看 | 午夜12点| 欧美日韩在线视频一区 | 国产精品久久久久久99 | 免费av电影网站 | 99久久99久久精品 | 福利电影久久 | 亚洲国产精品久久久久婷婷884 | 黄色片免费电影 | 青青河边草免费 | 午夜精品99久久免费 | 看v片| 久久精品欧美一区 | 久久国产麻豆 | 大型av综合网站 | 色在线网站 | 欧美国产日韩在线观看 | 国产99精品| 伊人宗合网 | 中文字幕中文字幕在线中文字幕三区 | 欧美精品久久久久久久久久丰满 | 免费av网址大全 | 久久久精品国产一区二区三区 | 国产高清av免费在线观看 | 黄色软件大全网站 | 最近免费中文字幕大全高清10 | 国产日韩精品在线观看 | 国产精品免费观看久久 | 久久人91精品久久久久久不卡 | 久久精品视频网 | 亚洲电影一级黄 | 97自拍超碰 | 久草在线一免费新视频 | 欧美日韩亚洲第一 | 日日夜夜天天综合 | 天天干天天操天天做 | 国产精品伦一区二区三区视频 | 视频一区二区免费 | av免费福利 | 91麻豆精品国产91久久久久 | 亚洲视频久久久久 | 日本中文字幕系列 | 国产99久久久欧美黑人 | 在线看中文字幕 | 欧美一区影院 | 国产超碰在线观看 | 免费看片成年人 | 天天激情综合 | 国产精品自在线 | 国产福利精品一区二区 | 久久艹综合 | 久草视频免费观 | 日韩视频二区 | 久久国内免费视频 | 国产原创在线 | 狠狠操电影网 | 久久久久久久久爱 | 亚洲高清色综合 | 久久久久国产精品午夜一区 | 激情视频在线高清看 | 涩五月婷婷 | 五月婷在线播放 | 亚洲欧美视频一区二区三区 | 99久久精品国产一区二区三区 | 最新日韩视频在线观看 | 91在线精品视频 | 婷婷色在线观看 | 一区二区三区久久精品 | 五月婷婷中文字幕 | 伊人婷婷色 | 精品视频在线视频 | 欧美日韩一级视频 | 中文理论片 | 日韩欧美一区二区三区视频 | 亚洲免费黄色 | 2019av在线视频 | 午夜精品视频在线 | 亚洲码国产日韩欧美高潮在线播放 | 婷婷综合网 | 欧美成人中文字幕 | 精品一区三区 | 在线观看一区视频 | 深爱激情开心 | 中文字幕中文字幕 | 中文乱码视频在线观看 | 国语久久| 午夜黄色一级片 | 香蕉视频导航 | 成人午夜毛片 | 久久9视频 | 免费又黄又爽的视频 | 成人一级黄色片 | 97国产人人| 久草在线视频新 | 国产精品免费在线播放 | 国产精品观看 | 欧美一级久久久久 | 国产精品一区二区中文字幕 | 91尤物国产尤物福利在线播放 | 国产精品白浆视频 | 6080yy午夜一二三区久久 | 欧美日韩精品电影 | 中文av不卡| 激情婷婷久久 | 国产伦精品一区二区三区… | 91伊人久久大香线蕉蜜芽人口 | 黄色精品免费 | 国产成人精品久久二区二区 | www.com久久| 91精品综合在线观看 | 中文字幕在线观看网址 | 日韩精品资源 | 手机在线欧美 | 麻豆国产精品永久免费视频 | 亚洲黄色片 | 国产一二三四在线观看视频 | 国产午夜精品一区二区三区四区 | 国产成人精品不卡 | 日韩在线视频网址 | 97高清免费视频 | 精品久久久久久久久中文字幕 | 亚洲国产美女精品久久久久∴ | 日韩在线视 | 高清有码中文字幕 | 国产亚洲视频系列 | 欧美日韩精品在线播放 | 视频国产区 | 久久免费观看少妇a级毛片 久久久久成人免费 | 天天综合天天做天天综合 | 又紧又大又爽精品一区二区 | 国产v在线播放 | 在线播放一区 | 久久久久国产精品一区二区 | 超碰国产人人 | 亚洲国产wwwccc36天堂 | 四虎国产精品免费观看视频优播 | 国产成人精品av在线 | 18国产精品白浆在线观看免费 | 狠狠躁夜夜a产精品视频 | 天天操天天操天天操天天操天天操天天操 | 天天综合网在线 | 精品久久福利 | 9在线观看免费 | 正在播放国产一区 | 久久精品视频网址 | 黄网站免费看 | 黄色亚洲大片免费在线观看 | 四虎影视欧美 | 91aaa在线观看| 精品国产乱码久久久久久浪潮 | 国产麻豆视频网站 | 国产精品免费一区二区三区 | 久久夜视频 | 久久视频免费 | 日韩av成人在线观看 | 日韩一二三在线 | 中文字幕av影院 | 亚洲一区二区三区四区精品 | 韩国在线一区二区 | 色婷婷在线播放 | 日本精油按摩3 | av网站手机在线观看 | 久久免费毛片视频 | 日韩av一区二区在线播放 | 国产精品18久久久久白浆 | 国产一级黄大片 | 在线一区av | 超碰在线免费97 | 中文字幕一区av | 国产成人av在线 | 91丨九色丨首页 | 免费观看成人av | 亚洲精品国产精品久久99 | 88av网站| 欧美色婷 | 在线不卡的av |