當前位置:
首頁 >
spark如何防止内存溢出_spark开发十大原则
發布時間:2024/10/8
44
豆豆
生活随笔
收集整理的這篇文章主要介紹了
spark如何防止内存溢出_spark开发十大原则
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
前言
本文主要闡述的是在開發spark的時候遵循十大開發原則,這些原則都是我們的前輩辛辛苦苦的總結而來,但是也不是憑空創造的,是有依據可循的,就在官網上面,讓我們來認識一下吧。
網址:http://spark.apache.org/docs/2.2.3/tuning.html
通過上面的網址我們可以找到所有優化spark的內容,記下來讓我開始闡述一下這十大開發原則吧。
原則一:避免創建重復的RDD
一般而言,我們加載數據在hdfs或者本地或者任何地方 比如下面的語句: val rdd1 = sc.textFile("hdfs://node1:9000/hello.txt") 上面的語句我們從hdfs加載了一份hello.txt文件,這是正確的,毫無疑問,但是有時候當我們的代碼達到一定 數量級別之后,比如超過500行或者一千行,這個時候我們就可能忘記我們已經加載過這個數據源了,很大的程度 上我們還會寫下面的代碼 val rdd2 = sc.textFile("hdfs://node1:9000/hello.txt") 所以你發現了沒?我們對同一份數據進行了兩次加載,讀者讀到這里可能會笑,怎么可能,我不會犯這樣的錯誤, 或者讀者根本沒有意識到這種有什么問題? 很簡單,在代碼多的時候這種問題出現的概率很大,而且同一份數據加載兩次就意味著我們第二次執行了一次耗時 的操作,而且還沒有啥用處。 所以,我們再寫代碼的時候一定要注意這個問題,當數據量特別大的時候,而且是越大的時候,這個問題越嚴重,要 仔細檢查我們的代碼,避免重復加載同一份數據。原則二:盡可能的復用同一個RDD
比如我們這里創建了一個rdd操作 val rdd1 = xxxxx 總之我們通過一種方式獲取了一種rdd 比如rdd1的數據格式現在為(key1,value1),這個時候我們想要做一種操作,我們指向用rdd1中的value1 所以我們很有可能的操作是下面這種 val rdd2 = rdd1.map(x=>(x._2)) 于是通過上面的操作我們又獲得了一個新的rdd,于是接下來我們繼續用rdd2操作 rdd2.map(x=>x*2) 到這里,讀者你發現了問題了沒?首先rdd2.map(x=>x*2)這個操作的執行步驟當在運行的時候還依舊是先去執行 rdd1,然后創建rdd2,然后把里面的每個元素乘以2,有沒有覺得多余呢?是的,很多余。 那么我們正確的做法就是直接用rdd1來操作即可比如這個樣子: val rdd1 = xxxx rdd1.map(x=>x._2 * 2)) 以上操作,我們并沒有創建新的rdd,但是我們做到了相同的效果所以,我們再寫rdd的時候盡可能的用同一個rdd,避免創建更多的rdd,以減少開銷,減少算子的執行次數原則三:對多次使用的RDD進行持久化
比如下面的場景 val rdd1 = sc.textFile("hdfs://node1:9000/hello.txt") rdd1.map(..) rdd1.reduce(..) 我們觀察一下,發現第二個rdd1.reduce實際上它的執行過程,依舊是從rdd1的加載開始執行,而rdd1.map也是從 rdd1的加載數據開始執行,發現沒?無論我們愿意還是不愿意,我們都將rdd1的過程執行了兩次 那么遇到這種情況,我們就可以將rdd1進行持久化,這樣我們再次執行rdd1.reduce方法的時候實際上我們是從內存 中直接加載的rdd1,并未重新執行rdd1的加載過程。 正確代碼如下: val rdd1 = sc.textFile("hdfs://node1:9000/hello.txt").cache() rdd1.map(..).foreach() rdd1.reduce(..) 是的,接下來就要說明了一個問題了,cache()這個操作呢,需要action操作才能進行持久化。 那么都有哪些action 操作呢? 我來列舉一下 collect() first() take(n) takeSample(withReplacement, num, [seed]) takeOrdered(n, [ordering]) saveAsTextFile(path) saveAsSequenceFile(path) saveAsObjectFile(path) countByKey() foreach(func) 以上就是我們常用的action操作。 接下來我們來聊一聊持久化除了cache()這種的其他持久化方法 我們還可以使用persist方法指定其他形式的操作我們先來一個代碼進行展示一下如何設置 val rdd1 = sc.txttFile("hdfs://node1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER) 是的,就這樣子設置 然后再來讓我們認識一下持久化的級別都有哪些?持久化級別列表:
接下來讓我們聊一聊該如何設置這些級別,也就是如何選擇一種最合適的的持久化策略。1.默認情況下,性能最高的的當然是MEMORY_ONLY,但是前提是你的內存足夠大,可以綽綽有余的存放下整個rdd的所有數據。 因為不進行序列化和反序列化操作,就避免了這部分的性能開銷,對這個rdd的后續算子操作,都是基于純內存中的數據的操作。 不需要從磁盤文件中讀取數據,性能最高,而且不需要復制一份數據副本,并遠程傳送到其他節點上,但是這里要注意的時候,在 實際的生產環境中,恐怕能夠直接用到這種常見還是有限的,如果rdd中的數據較多的時候,比如有幾十億,直接用這種持久化級別 會導致jvm的oom內存溢出。2.如果上述使用MEMORY_ONLY級別時發生了內存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別,該級別會將rdd數據序列化后 再保存到內存中,此時每個partition僅僅是一個字節數組而已,大大減少了對象數量,并降低了內存占用,這種級別比MEMORY_ONLY 多出來的性能開銷,主要就是序列化和反序列化的開銷,但是后續算子可以基于純內存進行操作,因此性能總體還是可以的,此外, 可能發生的問題同上,如果rdd中的數量過多的話,還是有可能導致OOM內存溢出。3.如果純內存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略,因為既然到了這一步 就說明rdd的數量很大,內存無法完全放下,序列化后的數據比較小,可以節省內存和磁盤的空間開銷,同時該策略優先盡量嘗試將數據 緩存在內存中,內存放不下,然后再寫入磁盤中。注意:通常不建議使用DISK_ONLY和后綴為2的級別:因為完全基于磁盤文件進行數據的讀寫,會導致性能急劇降低,有時還不如重新 計算一次所有的rdd,后綴為2的級別,必須將所有數據都復制一份副本,并發送到其他節點上,數據復制以及網絡傳輸會導致較大的 性能開銷,除非是要求作業的高可用性,否則不建議,其實在實際中,也沒人使用。原則四:盡量避免使用shuffle類算子
如果有可能的情況下,我們盡量避免使用shuffle類的算子,我們都是在spark的運行過程中,shuffle算子會從多個節點上 將key拉到同一個節點上,進行聚合或者join操作,在這個過程中,會產生大量的網絡磁盤IO,所以我們要盡量避免,以減少磁盤IO的 開銷。 舉個例子:val rdd3= rdd1.join(rdd2) 在這個過程中會把其他節點上的key通過網絡拉到一個節點上,這是錯誤的。遇到這種問題,我們就可以使用廣播變量的方式 比如: val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data)原則五:使用map-side預聚合的shuffle操作
當然了,在原則四的基礎上,我們可能有時候根本無法避免使用shuffle操作,那么這個時候我們就使用預聚合的shuffle操作算子 比如我們優先使用reduceBykey,而不是使用groupBykey算子怎么理解呢?我們要從reduceBykey和gropBykey的執行過程來說明這個過程。我們先說groupBykey操作,它是將其他節點上的數據先傳輸到reduce端,然后進行聚合的操作 然后是reduceBykey是現在Map端進行先聚合,然后再傳輸到reduce端 讀者發現了沒? groupBykey是將全量的數據進行傳輸,也就是原始數據進行傳輸 而reduceBykey呢?他是將聚合后的數據傳輸,這樣子實際上經過了聚合之后,數據量已經縮小了很大,極大的減少了網絡的傳輸IO 通過這樣方式,我們在無法避免使用shuffle操作算子的情況下,我們使用了一種更優化的方法來執行。原則六:使用高性能的算子
在原則五的基礎上,我們總結了一些算子是比較高性能的,我們優先選擇這些高性能的算子操作 第一個就是優先使用reduceBykey,而不是使用groupBYkey 第二個使用MapPartitions替代普通的map操作 我們知道map是對數據的每一條進行處理,比如我們有這么一個場景,我們需要把數據寫入到mysql中,如果我們使用map操作,我們則需要 一條一條的插入,這會產生大量的數據庫鏈接操作,但是使用MapPartitons的時候,我們可以一個分區一個分區的進行插入mysql,操作 量一下子就小了,但是也需要注意點的就是,因為MapPartions操作是進行分區操作,所以會產生內存溢出的問題,所以,我們在我們內存 夠用的時候使用MapPartitions更優于使用Map 第三個使用foreach Partitions代替foreache 這個其實和第二個及其相似,也就是我們能夠在批量處理的時候就盡量使用批量處理的函數,而不是使用單個出來的函數第四個使用filter之后進行coalesce操作 這里要說明一下coalesce和repartition操作都是進行分區操作 我們在使用filter通常是過濾操作,于是數據量就減少了,但是分區這個分區并沒有減少,所以,我們這里減少分區,則會優化一定的性能 一般建議,使用coalesce來減少分區,使用repartition來增加分區第五個使用repartitionAndSortWithinPartitions替代repartition與sort類操作 因為這兩個操作都是同樣的操作,但是官方建立使用前者更好原則六:廣播大變量
比如我們這里有一個場景val list1 = ... rdd1.map(x=>x.list1)假設上面的list1的數量級別為100M或者更大比如1個G,那么如果按照上面的代碼來看的話,因為list1是在driver端來形成的 當每個Executor中的task要使用的時候,就需要把list1中的數據傳輸到每一個task中,我們知道,對于一個Executor來說 它的內部可能被分配多個task,于是乎,每一個task需要一份數據,那么這個數據就被傳輸了和task數量級別的數據 比如一個Exector中有3個task,而list1中的數據為1個G,這個時候,我們就要傳輸3個G的大小,到Exector中,數量增大了很多 尤其是在內存不是那么大的情況下,這個內存溢出的可能性非常大。所以針對這種情況,我們使用廣播變量就會減少數據傳輸 比如下面這種寫法: val list1 = ... val listBradcast = sc.broadcast(list1) rdd1.map(x=>x.list1.value)在這種情況下,list1這個數據只會傳輸到Executor中保留一份,其他的所有task共享這一份數據,于是就跟task的個數無關, 原先傳輸3個G的情況下, 這個時候只會傳輸1個G。 數據量的減少就會減少網絡傳輸,就會增加算子的執行時間。原則八:使用Kryo優化序列性能
上面我們我們可以使用廣播變量將數據給廣播給Executor,以減少數據量的傳輸,但是實際上,spark默認就是 將數據進行序列化,那么默認情況下spark使用的是Java的序列化機制,如果這個時候一種更好的序列化方式豈不是更好嗎? 是的,在spark的官方文檔上,也就是我前言中寫到的官網的文檔地址中,官方建議是用Kryo這種序列化的算法更好。那么怎么設置這種呢?val conf = new SparkConf().setMaster(..).setAppName(..)zconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerizlizer") # 這里設置# 在某種情況下比如我們上面的注冊自定義的,則可以這樣設置conf.registerKryoClasses(Array(classOf[MyClass], classOf[Myclass2]))建議:在寫spark代碼的時候就直接先設置上,管他用不用原則九:優化數據結構
這個規則是官網給出的建議,但是呢,這個比較扯淡,看看就行了1.能用json字符串的不要用對象表示,因為對象頭額外占用16個字節,多個對象就會占用x乘以16個字節,而字符串始終占用40個字節 2.能不用字符串就用不用字符串,因為字符串占用40個字節,比如 能用 數字1 就不用 字符串 “1“ 3.盡量用數組代碼集合類型 4.上面的嘛看看就行了,你說不用對象,那么面向對象的思想何在,代碼的可讀性都沒有了,所以看一看注意一下即可原則十:盡可能的數據本地化
我們先來認識一下進程化級別: 1.PROCESS_LOCAl :進程本地化 代碼和數據在同一個進程中,也就是同一個Executor中,計算數據的task和數據在同一個Executor中,這樣子就不用從其他節點來 傳輸數據到task中 2.NODE_LOCAL: 節點本地化 也就是在做不到計算數據和task在同一個進程的情況下,我們考慮計算數據和task在同一個節點上,比如同一個服務器上,這樣子也可以 避免數據從其他節點上傳輸過來,也減少了網絡傳輸 3.NO_PREF:對于task來說,數據從哪里獲取都一樣,沒有好壞之分 4.RACK_LOCAL:機架本地化 在我們做不到進程本地化和節點本地化的時候,我們可以將task和計算數據進行在同一個機架上,這樣子也可以減少不同機架中的數據傳輸 5.ANY 上面的都做不到,那么只能任何地方都可以了,這種性能最差那么我們知道了上面的級別之后,我們該如何調優呢?在spark中,上面都是等待3秒不行,就去換到下一個級別 所以我們可以設置的多一些 spark.locality.wait = 3sspark.locality.wait.process 30s 我們讓他等待30s spark.locality.wait.node 30s 等待30s總結
以上就是開發spark的十大開發原則,關注這些會讓你的代碼更優化,更有優化效率,另外,點擊關注,后面我們寫
如何spark內存模型的調優,知乎會推送給您的,在您點擊關注之后。
總結
以上是生活随笔為你收集整理的spark如何防止内存溢出_spark开发十大原则的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 部队退休干部可以在私企打工吗
- 下一篇: appscan无法连接到服务器_对于cs