Flink 流式计算在节省资源方面的简单分析
本文由小米的王加勝同學分享,文章介紹了 Apache Flink 在小米的發展,從 Spark Streaming 遷移到 Flink ,在調度計算與調度數據、Mini batch 與 streaming、數據序列化等方面對比了 Spark Streaming 和 Flink 的一些區別。
?
Flink 在小米的發展簡介
?
?
小米在流式計算方面經歷了 Storm、Spark Streaming 和 Flink 的發展歷程;從2019 年 1 月接觸 Flink 到現在,已經過去了大半年的時間了。對 Flink 的接觸越深,越能感受到它在流式計算方面的強大能力;無論是實時性、時間語義還是對狀態計算的支持等,都讓很多之前需要復雜業務邏輯實現的功能轉變成了簡潔的 API 調用。還有不斷完善的 Flink SQL 功能也讓人充滿期待,相信實時數據分析的門檻會越來越低,更多的業務能夠挖掘出數據更實時更深入的價值。
?
在這期間,我們逐步完善了穩定性、作業管理、日志和監控收集展示等關系到用戶易用性和運維能力的特性,幫助越來越多的業務接入到了 Flink。
?
流式作業管理服務的界面:
?
?
Flink作業的監控指標收集展示:
?
?
Flink 作業異常日志的收集展示:
?
?
Spark?Streaming 遷移到 Flink 的效果小結
?
在業務從 Spark Streaming 遷移到 Flink 的過程中,我們也一直在關注著一些指標的變化,比如數據處理的延遲、資源使用的變化、作業的穩定性等。其中有一些指標的變化是在預期之中的,比如數據處理延遲大大降低了,一些狀態相關計算的“準確率”提升了;但是有一項指標的變化是超出我們預期的,那就是節省的資源。
?
信息流推薦業務是小米從 Spark Streaming 遷移到 Flink 流式計算最早也是使用 Flink 最深的業務之一,在經過一段時間的合作優化后,對方同學給我們提供了一些使用效果小結,其中有幾個關鍵點:
?
-
對于無狀態作業,數據處理的延遲由之前 Spark Streaming 的 16129ms 降低到 Flink 的 926ms,有 94.2% 的顯著提升(有狀態作業也有提升,但是和具體業務邏輯有關,不做介紹);
-
對后端存儲系統的寫入延遲從 80ms 降低到了 20ms 左右,如下圖(這是因為 Spark Streaming 的 mini batch 模式會在 batch 最后有批量寫存儲系統的操作,從而造成寫請求尖峰,Flink 則沒有類似問題):
?
?
-
對于簡單的從消息隊列 Talos 到存儲系統 HDFS 的數據清洗作業(ETL),由之前 Spark Streaming 的占用 210 個 CPU Core 降到了 Flink 的 32 個 CPU Core,資源利用率提高了 84.8%;
?
其中前兩點優化效果是比較容易理解的,主要是第三點我們覺得有點超出預期。為了驗證這一點,信息流推薦的同學幫助我們做了一些測試,嘗試把之前的 Spark Streaming 作業由 210 個 CPU Core 降低到 64 個,但是測試結果是作業出現了數據擁堵。這個 Spark Streaming 測試作業的 batch interval 是 10s,大部分 batch 能夠在 8s 左右運行完,偶爾抖動的話會有十幾秒,但是當晚高峰流量上漲之后,這個 Spark Streaming 作業就會開始擁堵了,而 Flink 使用 32 個 CPU Core 卻沒有遇到擁堵問題。
?
很顯然,更低的資源占用幫助業務更好的節省了成本,節省出來的計算資源則可以讓更多其他的業務使用;為了讓節省成本能夠得到“理論”上的支撐,我們嘗試從幾個方面研究并對比了 Spark Streaming 和 Flink 的一些區別:
?
調度計算 VS 調度數據
?
對于任何一個分布式計算框架而言,如果“數據”和“計算”不在同一個節點上,那么它們中必須有一個需要移動到另一個所在的節點。如果把計算調度到數據所在的節點,那就是“調度計算”,反之則是“調度數據”;在這一點上 Spark Streaming 和 Flink 的實現是不同的。
?
?
Spark 的核心數據結構RDD包含了幾個關鍵信息,包括數據的分片(partitions)、依賴(dependencies)等,其中還有一個用于優化執行的信息,就是分片的“preferred locations”
// RDD/** * Optionally overridden by subclasses to specify placement preferences. */protected def getPreferredLocations(split: Partition): Seq[String] = Nil
?
這個信息提供了該分片數據的位置信息,即所在的節點;Spark 在調度該分片的計算的時候,會盡量把該分片的計算調度到數據所在的節點,從而提高計算效率。比如對于 KafkaRDD,該方法返回的就是 topic partition 的 leader 節點信息:
?
// KafkaRDDoverride def getPreferredLocations(thePart: Partition): Seq[String] = { val part = thePart.asInstanceOf[KafkaRDDPartition] Seq(part.host) // host: preferred kafka host, i.e. the leader at the time the rdd was created }?
”調度計算”的方法在批處理中有很大的優勢,因為“計算”相比于“數據”來講一般信息量比較小,如果“計算”可以在“數據”所在的節點執行的話,會省去大量網絡傳輸,節省帶寬的同時提高了計算效率。但是在流式計算中,以 Spark Streaming 的調度方法為例,由于需要頻繁的調度”計算“,則會有一些效率上的損耗。
?
首先,每次”計算“的調度都是要消耗一些時間的,比如“計算”信息的序列化 → 傳輸 → 反序列化 → 初始化相關資源 → 計算執行→執行完的清理和結果上報等,這些都是一些“損耗”。
?
另外,用戶的計算中一般會有一些資源的初始化邏輯,比如初始化外部系統的客戶端(類似于 Kafka Producer 或 Consumer);每次計算的重復調度容易導致這些資源的重復初始化,需要用戶對執行邏輯有一定的理解,才能合理地初始化資源,避免資源的重復創建;這就提高了使用門檻,容易埋下隱患;通過業務支持發現,在實際生產過程中,經常會遇到大并發的 Spark Streaming 作業給 Kafka 或 HBase 等存儲系統帶來巨大連接壓力的情況,就是因為用戶在計算邏輯中一直重復創建連接。
?
Spark 在官方文檔提供了一些避免重復創建網絡連接的示例代碼,其核心思想就是通過連接池來復用連接:
rdd.foreachPartition { partitionOfRecords =>// ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse }}?
需要指出的是,即使用戶代碼層面合理的使用了連接池,由于同一個“計算”邏輯不一定調度到同一個計算節點,還是可能會出現在不同計算節點上重新創建連接的情況。
?
?
Flink 和 Storm 類似,都是通過“調度數據”來完成計算的,也就是“計算邏輯”初始化并啟動后,如果沒有異常會一直執行,源源不斷地消費上游的數據,處理后發送到下游;有點像工廠里的流水線,貨物在傳送帶上一直傳遞,每個工人專注完成自己的處理邏輯即可。
?
雖然“調度數據”和“調度計算”有各自的優勢,但是在流式計算的實際生產場景中,“調度計算”很可能“有力使不出來”;比如一般流式計算都是消費消息隊列 Kafka或 Talos 的數據進行處理,而實際生產環境中為了保證消息隊列的低延遲和易維護,一般不會和計算節點(比如 Yarn 服務的節點)混布,而是有各自的機器(不過很多時候是在同一機房);所以無論是 Spark 還是 Flink,都無法避免消息隊列數據的跨網絡傳輸。所以從實際使用體驗上講,Flink 的調度數據模式,顯然更容易減少損耗,提高計算效率,同時在使用上更符合用戶“直覺”,不易出現重復創建資源的情況。
?
不過這里不得不提的一點是,Spark Streaming 的“調度計算”模式,對于處理計算系統中的“慢節點”或“異常節點”有天然的優勢。比如如果 Yarn 集群中有一臺節點磁盤存在異常,導致計算不停地失敗,Spark 可以通過 blacklist 機制停止調度計算到該節點,從而保證整個作業的穩定性。或者有一臺計算節點的 CPU Load 偏高,導致處理比較慢,Spark 也可以通過 speculation 機制及時把同一計算調度到其他節點,避免慢節點拖慢整個作業;而以上特性在 Flink 中都是缺失的。
?
Mini?batch VS?streaming
?
?
Spark Streaming 并不是真正意義上的流式計算,而是從批處理衍生出來的 mini batch 計算。如圖所示,Spark 根據 RDD 依賴關系中的 shuffle dependency 進行作業的 Stage 劃分,每個 Stage 根據 RDD 的 partition 信息切分成不同的分片;在實際執行的時候,只有當每個分片對應的計算結束之后,整個 Stage 才算計算完成。
?
?
這種模式容易出現“長尾效應”,比如如果某個分片數據量偏大,那么其他分片也必須等這個分片計算完成后,才能進行下一輪的計算(Spark speculation對這種情況也沒有好的作用,因為這個是由于分片數據不均勻導致的),這樣既增加了其他分片的數據處理延遲,也浪費了資源。
?
?而 Flink 則是為真正的流式計算而設計的(并且把批處理抽象成有限流的數據計算),上游數據是持續發送到下游的,這樣就避免了某個長尾分片導致其他分片計算“空閑”的情況,而是持續在處理數據,這在一定程度上提高了計算資源的利用率,降低了延遲。
?
?
當然,這里又要說一下 mini batch 的優點了,那就在異常恢復的時候,可以以比較低的代價把缺失的分片數據恢復過來,這個主要歸功于 RDD 的依賴關系抽象;如上圖所示,如果黑色塊表示的數據丟失(比如節點異常),Spark 僅需要通過重放“Good-Replay”表示的數據分片就可以把丟失的數據恢復,這個恢復效率是很高的。
?
?
而 Flink 的話則需要停止整個“流水線”上的算子,并從 Checkpoint 恢復和重放數據;雖然 Flink 對這一點有一些優化,比如可以配置 failover strategy 為 region 來減少受影響的算子,不過相比于 Spark 只需要從上個 Stage 的數據恢復受影響的分片來講,代價還是有點大。
?
總之,通過對比可以看出,Flink 的 streaming 模式對于低延遲處理數據比較友好,Spark 的 mini batch 模式則于異常恢復比較友好;如果在大部分情況下作業運行穩定的話,Flink 在資源利用率和數據處理效率上確實更占優勢一些。
?
數據序列化
?
?
簡單來說,數據的序列化是指把一個 object 轉化為 byte stream,反序列化則相反。序列化主要用于對象的持久化或者網絡傳輸。
?
常見的序列化格式有 binary、json、xml、yaml 等;常見的序列化框架有 Java 原生序列化、Kryo、Thrift、Protobuf、Avro等。
?
對于分布式計算來講,數據的傳輸效率非常重要。好的序列化框架可以通過較低????的序列化時間和較低的內存占用大大提高計算效率和作業穩定性。在數據序列化上,Flink 和 Spark 采用了不同的方式;Spark 對于所有數據默認采用 Java 原生序列化方式,用戶也可以配置使用 Kryo;而 Flink 則是自己實現了一套高效率的序列化方法。
?
首先說一下 Java 原生的序列化方式,這種方式的好處是比較簡單通用,只要對象實現了 Serializable 接口即可;缺點就是效率比較低,而且如果用戶沒有指定 serialVersionUID 的話,很容易出現作業重新編譯后,之前的數據無法反序列化出來的情況(這也是 Spark Streaming Checkpoint 的一個痛點,在業務使用中經常出現修改了代碼之后,無法從 Checkpoint 恢復的問題);當然Java原生序列化還有一些其他弊端,這里不做深入討論。
?
有意思的是,Flink 官方文檔里對于不要使用Java原生序列化強調了三遍,甚至網上有傳言 Oracle 要拋棄 Java 原生序列化:
?
?
相比于 Java 原生序列化方式,無論是在序列化效率還是序列化結果的內存占用上,Kryo 則更好一些(Spark 聲稱一般 Kryo 會比 Java 原生節省 10x 內存占用);Spark 文檔中表示它們之所以沒有把 Kryo 設置為默認序列化框架的唯一原因是因為 Kryo 需要用戶自己注冊需要序列化的類,并且建議用戶通過配置開啟 Kryo。
?
雖然如此,根據 Flink 的測試,Kryo 依然比 Flink 自己實現的序列化方式效率要低一些;如圖所示是 Flink 序列化器(PojoSerializer、RowSerializer、TupleSerializer)和 Kryo 等其他序列化框架的對比,可以看出 Flink 序列化器還是比較占優勢的:
?
?
那么 Flink 到底是怎么做的呢?網上關于 Flink 序列化的文章已經很多了,這里我簡單地說一下我的理解。
?
像 Kryo 這種序列化方式,在序列化數據的時候,除了數據中的“值”信息本身,還需要把一些數據的 meta 信息也寫進去(比如對象的 Class 信息;如果是已經注冊過的 Class,則寫一個更節省內存的 ID)。
?
?
但是在 Flink 場景中則完全不需要這樣,因為在一個 Flink 作業 DAG 中,上游和下游之間傳輸的數據類型是固定且已知的,所以在序列化的時候只需要按照一定的排列規則把“值”信息寫入即可(當然還有一些其他信息,比如是否為 null)。
?
?
如圖所示是一個內嵌 POJO 的 Tuple3 類型的序列化形式,可以看出這種序列化方式非常地“緊湊”,大大地節省了內存并提高了效率。另外,Flink 自己實現的序列化方式還有一些其他優勢,比如直接操作二進制數據等。
?
凡事都有兩面性,自己實現序列化方式也是有一些劣勢,比如狀態數據的格式兼容性(State Schema Evolution);如果你使用 Flink 自帶的序列化框架序進行狀態保存,那么修改狀態數據的類信息后,可能在恢復狀態時出現不兼容問題(目前 Flink僅支持 POJO 和 Avro 的格式兼容升級)。
?
另外,用戶為了保證數據能使用Flink自帶的序列化器,有時候不得不自己再重寫一個 POJO 類,把外部系統中數據的值再“映射”到這個 POJO 類中;而根據開發人員對 POJO 的理解不同,寫出來的效果可能不一樣,比如之前有個用戶很肯定地說自己是按照 POJO 的規范來定義的類,我查看后發現原來他不小心多加了個 logger,這從側面說明還是有一定的用戶使用門檻的。
?
//?Not?a?POJO?demo.public?class?Person?{??private?Logger?logger?=?LoggerFactory.getLogger(Person.class);??public?String?name;??public?int?age;}?
針對這一情況我們做了一些優化嘗試,由于在小米內部很多業務是通過 Thrfit 定義的數據,正常情況下 Thrift 類是通過 Kryo 的默認序列化器進行序列化和反序列化的,效率比較低。雖然官方提供了優化文檔,可以通過如下方式進行優化,但是對業務來講也是存在一定使用門檻;
?
final?ExecutionEnvironment?env?=?ExecutionEnvironment.getExecutionEnvironment(); // register the serializer included with Apache Thrift as the standard serializer// TBaseSerializer states it should be initialized as a default Kryo serializerenv.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);?
于是我們通過修改 Flink 中 Kryo 序列化器的相關邏輯,實現了對 Thrfit 類默認使用 Thrift 自己序列化器的優化,在大大提高了數據序列化效率的同時,也降低了業務的使用門檻。
?
總之,通過自己定制序列化器的方式,確實讓Flink在數據處理效率上更有優勢,這樣作業就可以通過占用更低的帶寬和更少的計算資源完成計算了。
?
本文小結
?
Flink 和 Spark Streaming 有非常大的差別,也有各自的優勢,這里我只是簡單介紹了一下自己淺薄的理解,不是很深入。不過從實際應用效果來看,Flink 確實通過高效的數據處理和資源利用,實現了成本上的優化;希望能有更多業務可以了解并試用 Flink,后續我們也會通過 Flink SQL 為更多業務提供簡單易用的流式計算支持,謝謝!
?
參考文獻
?
{1}《Deep Dive on Apache Flink State》 - Seth Wiesman
https://www.slideshare.net/dataArtisans/webinar-deep-dive-on-apache-flink-state-seth-wiesman
{2}Flink 原理與實現:內存管理
https://ververica.cn/developers/flink-principle-memory-management
{3}Batch Processing — Apache Spark
https://blog.k2datascience.com/batch-processing-apache-spark-a67016008167
總結
以上是生活随笔為你收集整理的Flink 流式计算在节省资源方面的简单分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux下的查找命令
- 下一篇: 集群分发文件脚本