spark ui_Spark UI的见解
spark ui
作為apache-spark-job解剖的后續(xù)文章,我將分享您如何使用Spark UI進(jìn)行作業(yè)調(diào)整。 我將繼續(xù)使用先前文章中使用的相同示例,新的spark應(yīng)用程序?qū)⒃谝韵路矫嫱瓿晒ぷ?
–閱讀紐約市停車票
–通過“板ID”進(jìn)行匯總并計(jì)算違規(guī)日期
–保存結(jié)果
此代碼的DAG看起來像這樣
這是多階段的工作,因此需要一些數(shù)據(jù)混洗,因?yàn)榇耸纠煜磳懭霝?64mb,輸出為461MB。
讓我們看看我們可以做些什么來減少這種情況?
讓我們從“ Stage2”開始采取自上而下的方法。 首先想到的是探索壓縮。
當(dāng)前代碼
新密碼
aggValue.map {case (key, value) => Array(key, value._1, value._2.mkString(",")).mkString("\t")}.saveAsTextFile(s"/data/output/${now}", classOf[GzipCodec])新代碼僅在寫入時(shí)啟用gzip,讓我們看看我們在Spark UI上看到的內(nèi)容
用Gzip保存
只需寫入編碼器,寫入就減少了70%。 現(xiàn)在達(dá)到135Mb并加快了工作速度。
讓我們先看看還有什么可能,然后再進(jìn)行更多的內(nèi)部調(diào)整
最終輸出如下所示
1RA32? ?1? ? ? ?05/07/2014 92062KA 2? ? ? ?07/29/2013,07/18/2013 GJJ1410 3? ? ? ?12/07/2016,03/04/2017,04/25/2015 FJZ3486 3? ? ? ?10/21/2013,01/25/2014 FDV7798 7? ? ? ?03/09/2014,01/14/2014,07/25/2014,11/21/2015,12/04/2015,01/16/2015進(jìn)攻日期以原始格式存儲,因此可以對此應(yīng)用很少的編碼以獲得更快的速度。
Java 8添加了LocalDate來簡化日期操作,該類帶有一些方便的功能,其中之一就是toEpocDay。
此函數(shù)將日期轉(zhuǎn)換為1970年的日期,因此這意味著在4個(gè)字節(jié)(Int)中,我們最多可以存儲5K年,與當(dāng)前格式占用10個(gè)字節(jié)相比,這似乎節(jié)省了很多。
epocDay的代碼段
val issueDate = LocalDate.parse(row(aggFieldsOffset.get("issue date").get), ISSUE_DATE_FORMAT)val issueDateValues = mutable.Set[Int]()issueDateValues.add(issueDate.toEpochDay.toInt)result = (fieldOffset.map(fieldInfo => row(fieldInfo._2)).mkString(","), (1, issueDateValues))更改后的Spark UI。 我還做了另一項(xiàng)更改以使用KryoSerializer
這是一個(gè)巨大的改進(jìn),隨機(jī)寫入從564Mb更改為409MB(提高27%),輸出從134Mb更改為124 Mb(提高8%)
現(xiàn)在讓我們轉(zhuǎn)到Spark UI上的另一部分,該部分顯示了執(zhí)行者端的日志。
以上運(yùn)行的GC日志顯示以下內(nèi)容
2018-10-28T17:13:35.332+0800: 130.281: [GC (Allocation Failure) [PSYoungGen: 306176K->20608K(327168K)] 456383K->170815K(992768K), 0.0222440 secs] [Times: user=0.09 sys=0.00, real=0.03 secs] 2018-10-28T17:13:35.941+0800: 130.889: [GC (Allocation Failure) [PSYoungGen: 326784K->19408K(327168K)] 476991K->186180K(992768K), 0.0152300 secs] [Times: user=0.09 sys=0.00, real=0.02 secs] 2018-10-28T17:13:36.367+0800: 131.315: [GC (GCLocker Initiated GC) [PSYoungGen: 324560K->18592K(324096K)] 491332K->199904K(989696K), 0.0130390 secs] [Times: user=0.11 sys=0.00, real=0.01 secs] 2018-10-28T17:13:36.771+0800: 131.720: [GC (GCLocker Initiated GC) [PSYoungGen: 323744K->18304K(326656K)] 505058K->215325K(992256K), 0.0152620 secs] [Times: user=0.09 sys=0.00, real=0.02 secs] 2018-10-28T17:13:37.201+0800: 132.149: [GC (Allocation Failure) [PSYoungGen: 323456K->20864K(326656K)] 520481K->233017K(992256K), 0.0199460 secs] [Times: user=0.12 sys=0.00, real=0.02 secs] 2018-10-28T17:13:37.672+0800: 132.620: [GC (Allocation Failure) [PSYoungGen: 326016K->18864K(327168K)] 538169K->245181K(992768K), 0.0237590 secs] [Times: user=0.17 sys=0.00, real=0.03 secs] 2018-10-28T17:13:38.057+0800: 133.005: [GC (GCLocker Initiated GC) [PSYoungGen: 324016K->17728K(327168K)] 550336K->259147K(992768K), 0.0153710 secs] [Times: user=0.09 sys=0.00, real=0.01 secs] 2018-10-28T17:13:38.478+0800: 133.426: [GC (Allocation Failure) [PSYoungGen: 322880K->18656K(326144K)] 564301K->277690K(991744K), 0.0156780 secs] [Times: user=0.00 sys=0.00, real=0.01 secs] 2018-10-28T17:13:38.951+0800: 133.899: [GC (Allocation Failure) [PSYoungGen: 323808K->21472K(326656K)] 582842K->294338K(992256K), 0.0157690 secs] [Times: user=0.09 sys=0.00, real=0.02 secs] 2018-10-28T17:13:39.384+0800: 134.332: [GC (Allocation Failure) [PSYoungGen: 326624K->18912K(317440K)] 599490K->305610K(983040K), 0.0126610 secs] [Times: user=0.11 sys=0.00, real=0.02 secs] 2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]讓我們專注于一條線
2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K) , 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]次要GC之前的堆為600MB,之后為320MB,總堆大小為987MB。
執(zhí)行器分配了2GB內(nèi)存,并且此Spark應(yīng)用程序未使用所有內(nèi)存,我們可以通過發(fā)送更多任務(wù)或更大任務(wù)來給執(zhí)行器增加更多負(fù)載。
我將輸入分區(qū)從270減少到100
帶270個(gè)輸入分區(qū)
帶100個(gè)輸入分區(qū)
100個(gè)輸入分區(qū)看起來更好,可減少約10%以上的數(shù)據(jù)洗牌。
其他技巧
現(xiàn)在,我將分享一些將大大改變GC的東西!
優(yōu)化前的代碼
private def mergeValues(value1: (Int, mutable.Set[Int]), value2: (Int, mutable.Set[Int])): (Int, mutable.Set[Int]) = {val newCount = value1._1 + value2._1val dates = value1._2dates.foreach(d => value2._2.add(d))(newCount, value2._2)}private def saveData(aggValue: RDD[(String, (Int, mutable.Set[Int]))], now: String) = {aggValue.map { case (key, value) => Array(key, value._1, value._2.mkString(",")).mkString("\t") }.coalesce(100).saveAsTextFile(s"/data/output/${now}", classOf[GzipCodec])}優(yōu)化后的代碼
private def mergeValues(value1: GroupByValue, value2: GroupByValue): GroupByValue = {if (value2.days.size > value1.days.size) {value2.count = value1.count + value2.countvalue1.days.foreach(d => value2.days.add(d))value2}else {value1.count = value1.count + value2.countvalue2.days.foreach(d => value1.days.add(d))value1}}private def saveData(aggValue: RDD[(String, GroupByValue)], now: String) = {aggValue.mapPartitions(rows => {val buffer = new StringBuffer()rows.map {case (key, value) =>buffer.setLength(0)buffer.append(key).append("\t").append(value.count).append("\t").append(value.days.mkString(","))buffer.toString}}).coalesce(100).saveAsTextFile(s"/data/output/${now}", classOf[GzipCodec])}新代碼正在對集合進(jìn)行優(yōu)化合并,它向大集合中添加了小集合,并且還引入了Case類。
另一項(xiàng)優(yōu)化是保存功能,其中它使用mapPartitions通過使用StringBuffer減少對象分配。
我使用http://gceasy.io獲得了一些GC統(tǒng)計(jì)信息。
更改代碼之前
更改代碼后
新代碼為例如產(chǎn)生更少的垃圾。
總GC 126 GB和122 GB(約提高4%)
最大GC時(shí)間720ms與520 ms(約好25%)
優(yōu)化看起來很有希望。
該博客中使用的所有代碼都可以在github repo sparkperformance上找到
請繼續(xù)關(guān)注有關(guān)此內(nèi)容的更多信息。
翻譯自: https://www.javacodegeeks.com/2018/11/insights-spark-ui.html
spark ui
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的spark ui_Spark UI的见解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 翀怎么读音 翀字是什么意思
- 下一篇: api 获取网络使用情况_您的API是什