bilibili基于 Flink 的机器学习工作流平台在 b 站的应用
分享嘉賓:張楊,B 站資深開發工程師
導讀:整個機器學習的過程,從數據上報、到特征計算、到模型訓練、再到線上部署、最終效果評估,整個流程非常冗長。在 b 站,多個團隊都會搭建自己的機器學習鏈路,來完成各自的機器學習需求,工程效率和數據質量都難以保證。于是我們基于 Flink 社區的 aiflow 項目,構建了整套機器學習的標準工作流平臺,加速機器學習流程構建,提升多個場景的數據實效和準確性。本次分享將介紹 b 站的機器學習工作流平臺 ultron 在 b 站多個機器學習場景上的應用。目錄:
1、機器學習實時化
2、Flink 在 B 站機器學習的使用
3、機器學習工作流平臺構建
4、未來規劃
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
一、機器學習實時化
首先講下機器學習的實時化,主要是分為三部分:
- 第一是樣本的實時化。傳統的機器學習,樣本全部都是 t+1,也就是說,今天模型用的是昨天的訓練數據,每天早上使用昨天的全天數據訓練一次模型;
- 第二是特征的實時化。以前的特征也基本都是 t+1,這樣就會帶來一些推薦不準確的問題。比如,今天我看了很多新的視頻,但給我推薦的卻還是一些昨天或者更久之前看到的內容;
- 第三就是模型訓練的實時化。我們有了樣本的實時化和特征的實時化之后,模型訓練也是完全可以做到在線訓練實時化的,能帶來更實時的推薦效果。
傳統離線鏈路
上圖是傳統的離線鏈路圖,首先是 APP 產生日志或者服務端產生 log,整個數據會通過數據管道落到 HDFS 上,然后每天 t+1 做一些特征生成和模型訓練,特征生成會放到特征存儲里面,可能是 redis 或者一些其他的 kv 存儲,再給到上面的 inference 在線服務。
傳統離線鏈路的不足
那它有什么問題呢?
- 第一是 t+1 數據模型的特征時效性都很低,很難做到特別高時效性的更新;
- 第二是整個模型訓練或者一些特征生產的過程中,每天都要用天級的數據,整個訓練或者特征生產的時間非常長,對集群的算力要求非常高。
實時鏈路
上圖我們進行優化之后整個實時鏈路的過程,紅叉的部分是被去掉的。整個數據上報后通過 pipeline 直接落到實時的 kafka,之后會做一個實時特征的生成,還有實時樣本的生成,特征結果會寫到 feature store 里面去,樣本的生成也需要從 feature store 里面去讀取一些特征。
生成完樣本之后我們直接進行實時訓練。整個右邊的那個很長的鏈路已經去掉了,但是離線特征的部分我們還是保存了。因為針對一些特殊特征我們還是要做一些離線計算,比如一些特別復雜不好實時化的或者沒有實時化需求的。
二、Flink 在 b 站機器學習的使用
下面講下我們是怎么做到實時樣本、實時特征和實時效果評估的。
- 第一個是實時樣本。Flink 目前托管 b 站所有推薦業務樣本數據生產流程;
- 第二個是實時特征。目前相當一部分特征都使用了 Flink 進行實時計算,時效性非常高。有很多特征是使用離線 + 實時組合的方式得出結果,歷史數據用離線算,實時數據用 Flink,讀取特征的時候就用拼接。
但是,這兩套計算邏輯有的時候不能復用,所以我們也在嘗試使用 Flink 做批流一體,將特征的定義全部用 Flink 來做,根據業務需要,實時算或者離線算,底層的計算引擎全部是 Flink;
- 第三是實時效果的一個評估,我們使用了 Flink+olap 來打通整個實時計算 + 實時分析鏈路,進行最終的模型效果評估。
實時樣本生成
上圖是目前實時樣本的生成,是針對整個推薦業務鏈路的。日志數據落入 kafka 后,首先我們做一個 Flink 的 label-join,把點擊和展現進行拼接。結果繼續落入 kafka 后,再接一個 Flink 任務進行特征 join,特征 join 會拼接多個特征,有些特征是公域特征,有些是業務方的私域特征。特征的來源比較多樣,有離線也有實時。特征全部補全之后,就會生成一個 instance 樣本數據落到 kafka,給后面的訓練模型使用。
實時特征生成
上圖是實時特征的生成,這邊列的是一個比較復雜的特征的過程,整個計算流程涉及到了 5 個任務。第一個任務是離線任務,后面有 4 個 Flink 任務,一系列復雜計算后生成的一個特征落到 kafka 里面,再寫入 feature-store,然后被在線預測或者實時訓練所用到。
實時效果評估
上圖是實時效果的評估,推薦算法關注的一個非常核心的指標就是 ctr 點擊率,做完 label-join 之后,就可以算出 ctr 數據了,除了進行下一步的樣本生成之外,同時會導一份數據到 clickhouse 里面,報表系統對接后就可以看到非常實時的效果。數據本身會帶上實驗標簽,在 clickhouse 里面可以根據標簽進行實驗區分,看出對應的實驗效果。
三、機器學習工作流平臺構建
痛點
- 機器學習的整個鏈路里面有樣本生成、特征生成、訓練、預測、效果評估,每個部分都要配置開發很多任務,一個模型的上線最終需要橫跨多個任務,鏈路非常長。
- 新的算法同學很難去理解這個復雜鏈路的全貌,學習成本極高。
- 整個鏈路的改動牽一發而動全身,非常容易出故障。
- 計算層用到多個引擎,批流混用,語義很難保持一致,同樣的邏輯要開發兩套,保持沒有 gap 也很困難。
- 整個實時化成本門檻也比較高,需要有很強的實時離線能力,很多小的業務團隊在沒有平臺支持下難以完成。
上圖是一個模型從數據準備到訓練的大概過程,中間涉及到了七八個節點,那我們能不能在一個平臺上完成所有的流程操作?我們為什么要用 Flink?是因為我們團隊實時計算平臺是基于 Flink 來做的,我們也看到了 Flink 在批流一體上的潛力以及在實時模型訓練和部署上一些未來發展路徑。
引入 Aiflow
Aiflow 是阿里的 Flink 生態團隊開源的一套機器學習工作流平臺,專注于流程和整個機器學習鏈路的標準化。去年八、九月份,我們在和他們接觸后,引入了這樣一套系統,一起共建完善,并開始逐漸在 b 站落地。它把整個機器學習抽象成圖上的 example、transform 、Train、validation、inference 這些過程。在項目架構上非常核心的能力調度就是支持流批混合依賴,元數據層支持模型管理,非常方便的進行模型的迭代更新。我們基于此搭建了我們的機器學習工作流平臺。
平臺特性
接下來講一下平臺特性:
- 第一是使用 Python 定義工作流。在 ai 方向,大家用 Python 還是比較多的,我們也參考了一些外部的,像 Netflix 也是使用 Python 來定義這種機器學習的工作流。
- 第二是支持批流任務混合依賴。在一個完整鏈路里面,涉及到的實時離線過程都可以加入到里面,并且批流任務之間可以通過信號就行互相依賴。
- 第三是支持一鍵克隆整個實驗過程。從原始 log 到最終整個實驗拉起訓練這塊,我們是希望能夠一鍵整體鏈路克隆,快速拉起一個全新的實驗鏈路。
- 第四是一些性能方面的優化,支持資源共享。
- 第五是支持特征回溯批流一體。很多特征的冷啟動需要計算歷史很長時間的數據,專門為冷啟動寫一套離線特征計算邏輯成本非常高,而且很難和實時特征計算結果對齊,我們支持直接在實時鏈路上來回溯離線特征。
基本架構
上圖是基本架構,最上面是業務,最下面是引擎。目前支持的引擎也比較多:Flink、spark、Hive、kafka、Hbase、Redis。其中有計算引擎,也有存儲引擎。以 aiflow 作為中間的工作流程管理,Flink 作為核心的計算引擎,來設計整個工流平臺。
工作流描述
整個工作流是用 Python 來描述的,在 python 里面用戶只需要定義計算節點和資源節點,以及這些節點之間的依賴關系即可,語法有點像調度框架 airflow。
依賴關系定義
批流的依賴關系主要有 4 種:流到批,流到流,批到流,批到批。基本可以滿足目前我們業務上的所有需求。
資源共享
資源共享主要是用來做性能方面,因為很多時候一個機器的學習鏈路非常長,比如剛剛那個圖里面我經常改動的可能只有五六個節點,當我想重新拉起整個實驗流程,把整個圖克隆一遍,中間我只需要改動其中的部分節點或者大部分節點,上游節點是可以做數據共享的。
這個是技術上的實現,克隆之后對共享節點做了一個狀態追蹤。
實時訓練
上圖是實時訓練的過程。特征穿越是一個非常常見的問題,多個計算任務的進度不一致時就會發生。在工作流平臺里面,我們定義好各個節點的依賴關系即可,一旦節點之間發生了依賴,處理進度就會進行同步,通俗來說就是快的等慢的,避免特征穿越。在 Flink 里面我們是使用 watermark 來定義處理進度。
特征回溯
上圖是特征回溯的過程,我們使用實時鏈路,直接去回溯它歷史數據。離線和實時數據畢竟不同,這中間有很多問題需要解決,因此也用到了 spark,后面這塊我們會改成 Flink。
特征回溯的問題
特征回溯有幾個比較大的問題:
- 第一是如何保證數據的順序性。實時數據有個隱含的語義就是數據是順序進來的,生產出來立馬處理,天然有一定的順序性。但是離線的 HDFS 不是,HDFS 是有分區的,分區內的數據完全亂序,實際業務里面大量計算過程是依賴時序的,如何解決離線數據的亂序是一個很大的問題。
- 第二是如何保證特征和樣本版本的一致性。比如有兩條鏈路,一條是特征的生產,一條是樣本生產,樣本生產依賴特征生產,如何保證它們之間版本的一致性,沒有穿越?
- 第三就是如何保證實時鏈路和回溯鏈路計算邏輯的一致?這個問題其實對我們來說不用擔心,我們是直接在實時鏈路上回溯離線數據。
- 第四是一些性能方面的問題,怎么快速得算完大量的歷史數據。
解決方案
以下是第一、第二個問題的解決方案:
- 第一個問題。為了數據的順序性,我們 HDFS 的離線數據進行 kafka 化處理,這里不是把它灌到 kafka 里面去,而是模擬 kafka 的數據架構,分區并且分區內有序,我們把 HDFS 數據也處理成類似的架構,模擬成邏輯上的分區,并且邏輯分區內有序,Flink 讀取的 hdfssource 也進行了對應的開發支持這種模擬的數據架構。這塊的模擬計算目前是使用 spark 做的,后面我們會改成 Flink。
第二個問題分為兩部分:
- 實時特征部分的解決依賴于 Hbase 存儲,Hbase 支持根據版本查詢。特征計算完后直接按照版本寫入 Hbase,樣本生成的時候去查 Hbase 帶上對應的版本號即可,這里面的版本通常是數據時間。
- 離線特征部分,因為不需要重新計算了,離線存儲 hdfs 都有,但是不支持點查,這塊進行 kv 化處理就好,為了性能我們做了異步預加載。
異步預加載的過程如圖。
四、未來規劃
接下來介紹下我們后面規劃。
- 一個是數據質量保證。現在整個鏈路越來越長,可能有 10 個節點、 20 個節點,那怎么在整個鏈路出問題的時候快速發現問題點。這里我們是想針對節點集來做 dpc,對每個節點我們可以自定義一些數據質量校驗規則,數據通過旁路到統一的 dqc-center 進行規則運算告警。
- 第二是全鏈路的 exactly once,工作流節點之間如何保證精確一致,這塊目前還沒有想清楚。
- 第三是我們會在工作流里面加入模型訓練和部署的節點。訓練和部署可以是連接到別的平臺,也可能是 Flink 本身支持的訓練模型和部署服務。
嘉賓介紹:張楊,17 年入職 b 站,從事大數據方面工作。
原文鏈接:https://developer.aliyun.com/article/784356?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的bilibili基于 Flink 的机器学习工作流平台在 b 站的应用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 回顾 | Apache Flink 1.
- 下一篇: Flink + Iceberg 在去哪儿