日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大数据算法:排位问题(2)

發(fā)布時間:2025/5/22 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据算法:排位问题(2) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

此文已由作者葉林寶授權網易云社區(qū)發(fā)布。

歡迎訪問網易云社區(qū),了解更多網易技術產品運營經驗。


方案四:Sort on Cell Values

簡述:

上述方案三, 當數(shù)據(jù)行數(shù)較多, 情況下, 在二次排序還是可能出現(xiàn)oom情況, 而且, 不同的field_index的數(shù)據(jù)可能shuffle到同一個分區(qū),這樣就加大了oom的概率。當field_index本身取值較多 情況下, 增加分區(qū)數(shù)是其中一種解決方法。但是field_index取值本身就少于分區(qū)數(shù)的情況下, 增加分區(qū)數(shù)對緩解oom就沒任何作用了。 如果 當field_value相比field_index較為分散, 且值較多的情況下, 不妨換個思維, 按field_value分區(qū)。 具體算法如下:

算法:

(1)將df 轉換為(field_value, field_index)

(2)對分區(qū)內的數(shù)據(jù), 用sortByKey根據(jù) field_value排序 (rangPartition排序)

(3)利用mapPartitions確定每個分區(qū)內的每個field_index共有多少數(shù)據(jù)(不同分區(qū)中的filed_value相對有序, 例如partiiton1 中的filed_value比partition2中的field_value小)

(4)利用第(3)步數(shù)據(jù), 確定每個field_index中所需要的排名的數(shù)據(jù)在哪個分區(qū)以及分區(qū)內第幾條數(shù)據(jù)。例如要輸出field_index_6的13th位數(shù)據(jù),假設第一個分區(qū)已經包含10條數(shù)據(jù), 則目標數(shù)據(jù)在第二個分區(qū)的第3條數(shù)據(jù)

(5)轉換(4)計算結果為標準輸出格式

代碼:

(1)

/***?將數(shù)據(jù)源df轉換為(field_value,?field_index)格式的rdd*?@param?dataFrame*?@return*/def?getValueColumnPairs(dataFrame?:?DataFrame):?RDD[(Double,?Int)]?={dataFrame.rdd.flatMap{row:?Row?=>?row.toSeq.zipWithIndex.map{case?(v,?index)?=>?(v.toString.toDouble,?index)}}}

(3)

/***?對按照field_value排序后的sortedValueColumnPairs,?計算出每個分區(qū)上,?每個field_index分別有多少數(shù)據(jù)*?@param?sortedValueColumnPairs*?@param?numOfColumns*?@return*/def?getColumnsFreqPerPartition(sortedValueColumnPairs:?RDD[(Double,?Int)],numOfColumns?:?Int):?Array[(Int,?Array[Long])]?=?{val?zero?=?Array.fill[Long](numOfColumns)(0)????def?aggregateColumnFrequencies?(partitionIndex?:?Int,?valueColumnPairs?:?Iterator[(Double,?Int)])?=?{val?columnsFreq?:?Array[Long]?=?valueColumnPairs.aggregate(zero)((a?:?Array[Long],?v?:?(Double,?Int))?=>?{val?(value,?colIndex)?=?v??????????//increment?the?cell?in?the?zero?array?corresponding?to?this?columna(colIndex)?=?a(colIndex)?+?1La},(a?:?Array[Long],?b?:?Array[Long])?=>?{a.zip(b).map{?case(aVal,?bVal)?=>?aVal?+?bVal}})Iterator((partitionIndex,?columnsFreq))}sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()}

舉例說明:

假設對(1)中轉換后的數(shù)據(jù), 按照field_value排序后, 各個分區(qū)的數(shù)據(jù)如下所示

Partition 1: (1.5, 0) (1.75, 1) (2.0, 2) (5.25, 0)

Partition 2: (7.5, 1) (9.5, 2)

則(2)的輸出結果為:

[(0, [2, 1, 1]), (1, [0, 1, 1])]

(4)

/***?計算每個field_index所需排位數(shù)據(jù)在第幾個分區(qū)的第幾條數(shù)據(jù)*?@param?targetRanks?排位數(shù)組*?@param?partitionColumnsFreq?每個分區(qū)的每個field_index包含多少數(shù)據(jù)*?@param?numOfColumns?field個數(shù)*?@return*/def?getRanksLocationsWithinEachPart(targetRanks?:?List[Long],partitionColumnsFreq?:?Array[(Int,?Array[Long])],numOfColumns?:?Int)?:?Array[(Int,?List[(Int,?Long)])]?=?{????//?二維數(shù)組,?存儲當前每個field_index,?遍歷到到第幾條數(shù)據(jù)val?runningTotal?=?Array.fill[Long](numOfColumns)(0)????//?The?partition?indices?are?not?necessarily?in?sorted?order,?so?we?need//?to?sort?the?partitionsColumnsFreq?array?by?the?partition?index?(the//?first?value?in?the?tuple).partitionColumnsFreq.sortBy(_._1).map?{??????//?relevantIndexList?存儲分區(qū)上,?滿足排位數(shù)組的field_index在該分區(qū)的第幾條數(shù)據(jù)case?(partitionIndex,?columnsFreq)?=>?val?relevantIndexList?=?new?mutable.MutableList[(Int,?Long)]()columnsFreq.zipWithIndex.foreach{?case?(colCount,?colIndex)?=>??????????//?當天field_index(即colIndex),?遍歷到第幾條數(shù)據(jù)val?runningTotalCol?=?runningTotal(colIndex)??????????//??當前field_index(即colIndex),排位數(shù)組中哪些排位位于當前分區(qū)val?ranksHere:?List[Long]?=?targetRanks.filter(rank?=>runningTotalCol?<?rank?&&?runningTotalCol?+?colCount?>=?rank)??????????//?計算出當前分區(qū),當前field_index(即colIndex),?滿足排位數(shù)組的field_value在當前分區(qū)的位置relevantIndexList?++=?ranksHere.map(rank?=>?(colIndex,?rank?-?runningTotalCol))runningTotal(colIndex)?+=?colCount}(partitionIndex,?relevantIndexList.toList)}}

舉個例子:

假如目標排位:targetRanks: [5]

各分區(qū)各feild_index數(shù)據(jù)量:partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]

字段個數(shù):numOfColumns: 2

輸出結果: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]

(5)

/***?過濾出每個field_index?所需排位的數(shù)值*?@param?sortedValueColumnPairs*?@param?ranksLocations?(4)中計算出的滿足排位數(shù)組要求的每個分區(qū)上,每個field_index在該分區(qū)的第幾條數(shù)據(jù)*?@return*/def?findTargetRanksIteratively(?sortedValueColumnPairs?:?RDD[(Double,?Int)],?ranksLocations?:?Array[(Int,?List[(Int,?Long)])]):RDD[(Int,?Double)]?=?{sortedValueColumnPairs.mapPartitionsWithIndex((partitionIndex?:?Int,?valueColumnPairs?:?Iterator[(Double,?Int)])?=>?{????????//?當前分區(qū)上,?滿足排位數(shù)組的feild_index及其在該分區(qū)上的位置val?targetsInThisPart:?List[(Int,?Long)]?=?ranksLocations(partitionIndex)._2????????if?(targetsInThisPart.nonEmpty)?{??????????//?map中的key為field_index,?value為該feild_index在當前分區(qū)中的哪些位置上的數(shù)據(jù)滿足排位數(shù)組要求val?columnsRelativeIndex:?Map[Int,?List[Long]]?=?targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))val?columnsInThisPart?=?targetsInThisPart.map(_._1).distinct??????????//?存儲各個field_index,?在分區(qū)遍歷了多少條數(shù)據(jù)val?runningTotals?:?mutable.HashMap[Int,?Long]=?new?mutable.HashMap()runningTotals?++=?columnsInThisPart.map(columnIndex?=>?(columnIndex,?0L)).toMap??????????//?遍歷當前分區(qū)的數(shù)據(jù)源,?格式為(field_value,?field_index),?過濾出滿足排位數(shù)據(jù)要求的數(shù)據(jù)valueColumnPairs.filter{????????????case(value,?colIndex)?=>lazy?val?thisPairIsTheRankStatistic:?Boolean?=?{????????????????//?每遍歷一條數(shù)據(jù),?runningTotals上對應的field_index?當前已遍歷數(shù)據(jù)量+1val?total?=?runningTotals(colIndex)?+?1LrunningTotals.update(colIndex,?total)columnsRelativeIndex(colIndex).contains(total)}(runningTotals?contains?colIndex)?&&?thisPairIsTheRankStatistic}.map(_.swap)}?else?{Iterator.empty}})}


分析:

(1)這種方法代碼可讀性較差

(2)需要遍歷兩遍原始數(shù)據(jù)

(3)相比于方案三, 更加有效避免executor內oom

(4)當field_value分布較離散的情況下, 這種方案相比于前三種, 效率更高

(5)上述算法中, 有兩個潛在的問題, 當field_value傾斜情況下(即某個范圍的值特別多),算法效率嚴重依賴于算法描述中的步驟(2)是否能將所有的field_value均勻的分配到各個partition;另一個問題是,當某些field_value重復現(xiàn)象比較多時, 是否可以合并對這些field_value的計數(shù),而不是在一個partition中的iterator中挨個遍歷這些重復數(shù)據(jù)。

備注:上述內容(問題背景、解決算法)取自《High Performance Spark Best Practices for Scaling and Optimizing Apache Spark》(作者: Holden Karau and Rachel Warren)


免費體驗云安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊。


相關文章:
【推薦】?[翻譯]pytest測試框架(一)
【推薦】?淺談js拖拽
【推薦】?HBase最佳實踐-集群規(guī)劃

轉載于:https://www.cnblogs.com/163yun/p/9881058.html

總結

以上是生活随笔為你收集整理的大数据算法:排位问题(2)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。