日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

从 Hive 大规模迁移作业到 Spark 在有赞的实践

發布時間:2024/2/28 编程问答 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从 Hive 大规模迁移作业到 Spark 在有赞的实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

編者薦語:

本文介紹了有贊遷移 Hive 做到到 SparkSQL 上所做的一些改進,以及如何做到 SparkSQL 占比提升到 91% 以上,最后也分享一些在 Spark 踩過的坑和經驗希望能幫助到大家。

以下文章來源于有贊coder?,作者有贊技術

作者:胡加華

團隊:大數據團隊

?

一、前言

在 2019 年 1 月份的時候,我們發表過一篇博客 SparkSQL在有贊大數據的實踐,里面講述我們在 Spark 里所做的一些優化和任務遷移相關的內容。本文會接著上次的話題繼續講一下我們之后在 SparkSQL 上所做的一些改進,以及如何做到 SparkSQL 占比提升到 91% 以上,最后也分享一些在 Spark 踩過的坑和經驗希望能幫助到大家。

本文主要的內容包括:

  • Thrift Server 穩定性建設

  • SparkSQL 第二次遷移

  • 一些踩坑和經驗

二、Thrift Server 穩定性建設

隨著完成了 Hive 到 SparkSQL 的第一次遷移,SparkSQL 任務占比一下子達到了 60% 以上,而隨之而來的是各種問題(包括用戶的任務失敗和服務不可用等),服務的穩定性受到了比較大的挑戰。

首先講一下我們使用 SparkSQL 的部署方式,我們是以部署 Thrift Server 服務提供 JDBC 訪問方式,即 YARN client 的部署方式。離線計算的調度任務以 beeline 的方式使用 Thrift Server,同時其他平臺應用以 JDBC 的連接接入服務,比如提供 Ad-hoc 查詢服務應用,數據質量檢驗服務應用等。

以 Thrift Server 方式的好處是由一個 Driver 來管理用于執行任務的 Executor 池,并發的執行不同的 SQL 任務可以讓 Executor 資源的利用率達到最高。而如果選擇 $SPARK_HOME/bin/spark-sql 方式來執行 SQL 的方式可以讓每一個 SQL 任務獨享一個 Driver ,不會受到其他 SQL 任務影響(特別是一些 SQL 可能導致 Driver OOM ),對比 Thrift Server 更加穩定;但是 YARN 資源利用率不高,比如啟動 YARN Application 的 Overhead ,向 YARN 申請和釋放 Executor 的 Overhead,SQL 任務傾斜導致的 Executor 空閑浪費等。考慮到目前我們大部分 SQL 任務的執行時間很短,基本在 3 分鐘之內,單個 Task 執行時間很短,所以選擇 Thrift Server 相對合適一些。

基于上面所說的部署方式,我們開展了以下主要的穩定性建設工作:

  • AB 測試灰度發布功能

  • Spark Metric 指標的采集、分析以及處理

  • 基于 SQL 引擎選擇服務攔截不合適的 SQL 任務

2.1 AB 灰度測試

在 Hive 遷移到 SparkSQL 這段時間,yz-spark (基于社區的二次開發版本)的迭代和配置變更開始有些頻繁。我們希望有一套自定義的 AB 測試的解決方案來降低上線風險,特別對一些大的迭代版本和影響比較大的變更。

這套 AB 測試灰度發布不能簡單的只是基于任務流量,我們希望先灰度一些非核心低優先級的 SQL 任務并且在白天時間段(低峰)來驗證。有贊大數據離線調度任務是基于 Apache Airflow 為基礎構建,因此實現方式是通過擴展 Airflow 增加了一些路由配置來支持 SparkSQL 任務可以按 優先級、 時間段、 流量比例等配置的AB測試功能。

這套 AB 灰度測試方案在整個遷移過程還是發揮出比較大的作用。比如我們將 yz-spark 從社區版本 2.2.1 rebase 到 2.3.3 版本,又或者為節省內存資源給算法任務而將Executor 內存配置從 3G 調整到 2G 等比較重大的變更,都降低了一些坑產生的影響。

2.2 Spark Metric 采集

Spark 在 Job 運行過程中會記錄很多 Metric 信息包括 Job 、Stage 、Task 和Executor 等幾個方面。Spark 提供了 REST API 來查詢 Metrics 數據,需要開啟 Spark UI 的配置,不過需要注意是 Spark Driver在內存里保存最近運行 Job 信息,所以歷史數據無法查詢。另外 Spark 支持開啟 EventLog 將相關的 Spark Event(攜帶了相關 Metrics 數據)進行持久化(比如配置 HDFS )。REST API 和 EventLog 功能的詳細說明可以查看官方資料 (https://spark.apache.org/docs/latest/monitoring.html )。

結合了 REST API 和 EventLog 功能,我們搭建一個 spark-monitor 應用。這個應用主要職責是近實時的讀取 EventLog 產生的 Spark 事件,通過事件回放并結合 REST API 最終形成我們需要的 Job 數據,并寫到 Hbase 表保存。基于采集到的數據主要有以下幾個方面的用途:

  • 實時預警和干預。通過實時收集的 Job 數據,我們可以分析出哪些正在運行的 Job 是異常的,比如耗時非常久,產生很多 Task 等等場景,然后針對異常場景的 Action 可以是 Alarm 或者 Kill Job 等。

?

  • 離線分析。每天從 Hbase 離線的同步到hive表做一些離線分析,比如統計存在 Failed Task 的任務、Peak Execution Memory 使用比較高的任務,或者數據傾斜的任務等。找出已發生問題或者潛在問題的任務,去優化 SQL 任務或者分析原因并反哺去調校 Thrift Server 配置。

    ?

  • 歷史 Job 數據保存,用于排查歷史任務的日志查找和分析( Spark UI 因為內存限制無法保留歷史任務的記錄)。

2.3 基于引擎選擇的 SQL 攔截

我們開發了一套 SQL 引擎選擇服務,他的主要職責是給 Ad-hoc 服務增加了 SQL 智能選擇的能力。有贊的大數據離線計算提供了 Presto/SparkSQL/Hive 三種引擎選擇,大數據經驗比較弱的用戶在執行 Ad-hoc 的 SQL 時往往不知道該怎么選擇。而 SQL 引擎選擇通過 SQL 解析,語法檢查,規則匹配,各個引擎的資源負載情況等等因素最終給用戶選擇合適的引擎來執行計算。

SparkSQL 正是通過添加一些自定義規則來攔截對 Spark 引擎不合適的 SQL 任務,從而提高 Spark 服務的穩定性。比如 Not in Subquery, CrossJoin 等 SQL 場景。

三、SparkSQL 第二次遷移

第一次做的遷移主要完成了 P4 和 P5 低優先級的任務。在生產上經過一段時間的充分驗證后,并且在 Spark Thrift Server 的穩定性得到極大的提升之后,然后就開始了第二次大規模的 Hive 到 Spark 的遷移,完成了 P1 ~ P3的所有適合任務。截止目前執行引擎選擇的作業數中 SparkSQL 占比已經提升到 91% 以上。

而之所以把核心任務也遷移到SparkSQL,這樣的做的好處有兩個:

  • 節約離線集群資源成本。通過?tpcds?的性能測試,同等資源情況下 SparkSQL 是 Hive 的?2~10?倍(不同的 query 性能表現不一樣)。在生產驗證下來大部分確實有差不多?2?倍的性能。

  • 計算資源更合理的分配。由于 Spark 自身實現任務調度和資源分配,可以通過它已有的功能針對不同優先級的任務配置不同的資源配額。比如原先使用 Hive 時每一個 SQL 任務的 map 或者 reduce 并發數默認都是一樣的,而使用 SparkSQL 時可以讓?資源的比例按優先級傾斜(即 scheduler pool 的功能)。

四、踩坑和經驗

在使用 Spark 過程中,我們不可避免的踩過一些坑,也積累了一些經驗。從這些經驗積累選擇了一部分想分享一下,希望對正在使用 Spark 的同學有所幫助。

4.1 spark.sql.autoBroadcastJoinThreshold

這個配置在大家使用 SparkSQL 的時候會比較熟悉,在 join 的場景判斷相關的表是否可以使用 BroadcastJoin ,默認閥值是 10 MB。目前閥值判斷的比較邏輯會參考幾個因素:文件的大小和 字段選擇的裁剪。比如某張 Hive 表的數據大小為 13 MB , 表 schema 為 struct<id:long,name:string>,而假設當前 SQL 只使用到 name 字段,那根據字段選擇情況并對文件大小進行裁剪估算所需總字節的公式為:20/(8+20)*13約等于9.3MB(各個字段類型有不同的估算字節,比如long 是 8 個字節 ,string 是 20 個字節等),從而滿足 BroadcastJoin 的條件。但是這里有幾種情況需要額外考慮:1、表存儲格式帶來的差異,比如 使用 ZLIB 壓縮的 ORC 格式跟 TEXT 格式就在數據存儲上的文件大小可能會差很多,即使兩張表都是 ORC 格式,壓縮率的差異也是存在; 2、字段字節估算本身就有一定的誤差,比如 string 字段認為是 20 個字節,對于一些極端情況的 string 大字段,那估算誤差就會比較大; 3、讀取 Hive 表的 "raw" 數據到內存然后展開成 Java 對象,內存的消耗也有一定放大系數。所以 10M 的閥值,最終實際上需要的內存可能達到 1G,這個我們也是在生產環境上碰到過。

4.2 spark.blacklist.enabled

Spark 針對 Task 失敗有重試機制,但是當一個 Task 在某一臺 host上的 一個 Spark Executor 實例執行失敗,下一次重試調度因為考慮 Data Locality 還是會大概率的選擇那個 host 上的 Executor。如果失敗是因為 機器壞盤引起的,那重試還是會失敗,重試次數達到最大后那最終整個 Job 失敗。而開啟 blacklist 功能可以解決此類問題,將發生失敗的 Executor 實例或者 host 添加到黑名單,那么重試可以選擇其他實例或者 host ,從而提高任務的 容錯能力。

4.3 spark.scheduler.pool

當我們的調度離線計算 SQL 任務,大部分都使用 SparkSQL 帶來的問題是有些低優先級的任務可能會消耗很多 Executor 資源,從而讓高優先級的任務一直得不到充分的資源去完成任務。我們希望資源調度的策略是讓優先級高的任務優先得到資源,所以使用 FairScheduler 策略,并配置不同 資源權重的 Pool 給不同優先級的任務。

注:Spark 2.4 以下版本的這個功能存在 SPARK-26992 的問題,在不指定 pool 的情況下可能不會使用默認 pool 。

4.4 spark.sql.adaptive.enabled

adaptive 功能支持 shuffle 下游 stage 可以根據上游 stage 產生的 shuffle 數據量自動調節下游 stage 的 task 數,這個功能我們主要是為了解決 Hive 表數據表很多 小文件的問題(Map Only 的 SQL 場景不起作用)。adaptive 功能在 Spark 1.6 版本就已經支持,但是我們目前 yz-spark 版本合入是Intel 實現的增強版本(該版本還實現了另兩個功能:動態調整執行計劃和 動態處理數據傾斜),目前官方版本還沒有合入(https://github.com/Intel-bigdata/spark-adaptive)

4.5?SPARK-24809

?

這是一個 correctness 的 bug, 在 broadcast join 的情況下可能會發生數據結果不正確的情況。當用于 broadcast 的 LongToUnsafeRowMap 對象如果被多次的序列化反序列化就會觸發,導致野指針的產生,從而可能產生不正確的結果。當時這個問題導致我們一張核心倉庫表產生錯誤數據。由于這個 bug 幾周才偶現一次,復現的困難導致花費了一個月時間才定位原因。這次教訓也讓我們意識到需要經常的去關注社區版本的迭代,及早發現特別是那些比較嚴重的 bug fix,避免生產上的故障。

4.6?SPARK-26604

這是 Spark External Shuffle 的一個內存泄漏 bug ,所以在開啟該功能的情況才會觸發。它在某些場景下會導致 NodeManager 的 ExternalShuffleService 開始內存泄漏,這個內存泄漏比較大的危害是導致一個 HashMap 對象變的越來越大,最終導致 shuffle fetch 請求越來越慢(每次 fetch 請求需要對這個 HashMap 的 values 進行 sum 統計,這個邏輯變慢),從而最終導致了我們生產環境的離線任務耗時時間在某天突然多了 30% 以上。

4.7 低效且危險的 Not in Subquery

舉個例子, select * from t1 where f1 not in (select f1 from t2),對于 “ not in subquery ”的場景最終都會選擇 BroadcastNestedLoopJoinExec 物理執行計劃,而 BroadcastNestedLoopJoinExec 是一個非常低效的物理執行計劃,內部實現將 subquery broadcast 成一個 list,然后 t1 每一條記錄通過 loop 遍歷 list 去匹配是否存在。由于它的低效可能會長時間占用 executor 資源,同時 subquery 結果數據量比較大的情況下,broadcast 可能帶來 driver 的 OOM 風險。

四、結語

至今,有贊大數據離線計算從 Hive 切換到 SparkSQL 達到了一個階段性的里程碑。雖然 SparkSQL 對比 Hive 的穩定性有所不如,特別是內存管理上一些不完善導致各種內存所引發的問題,但是性能上非常明顯的優勢也值得作為一種新的選擇,我們也一直努力著希望將 SparkSQL 調校成具有 Hive 一樣的穩定性。

總結

以上是生活随笔為你收集整理的从 Hive 大规模迁移作业到 Spark 在有赞的实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。