伴鱼:借助 Flink 完成机器学习特征系统的升级
簡介:?Flink 用于機(jī)器學(xué)習(xí)特征工程,解決了特征上線難的問題;以及 SQL + Python UDF 如何用于生產(chǎn)實(shí)踐。
本文作者陳易生,介紹了伴魚平臺(tái)機(jī)器學(xué)習(xí)特征系統(tǒng)的升級(jí),在架構(gòu)上,從 Spark 轉(zhuǎn)為 Flink,解決了特征上線難的問題,以及 SQL + Python UDF 如何用于生產(chǎn)實(shí)踐。 主要內(nèi)容為:
一、前言
在伴魚,我們?cè)诙鄠€(gè)在線場景使用機(jī)器學(xué)習(xí)提高用戶的使用體驗(yàn),例如:在伴魚繪本中,我們根據(jù)用戶的帖子瀏覽記錄,為用戶推薦他們感興趣的帖子;在轉(zhuǎn)化后臺(tái)里,我們根據(jù)用戶的繪本購買記錄,為用戶推薦他們可能感興趣的課程等。
特征是機(jī)器學(xué)習(xí)模型的輸入。如何高效地將特征從數(shù)據(jù)源加工出來,讓它能夠被在線服務(wù)高效地訪問,決定了我們能否在生產(chǎn)環(huán)境可靠地使用機(jī)器學(xué)習(xí)。為此,我們搭建了特征系統(tǒng),系統(tǒng)性地解決這一問題。目前,伴魚的機(jī)器學(xué)習(xí)特征系統(tǒng)運(yùn)行了接近 100 個(gè)特征,支持了多個(gè)業(yè)務(wù)線的模型對(duì)在線獲取特征的需求。
下面,我們將介紹特征系統(tǒng)在伴魚的演進(jìn)過程,以及其中的權(quán)衡考量。
二、舊版特征系統(tǒng) V1
特征系統(tǒng) V1 由三個(gè)核心組件構(gòu)成:特征管道,特征倉庫,和特征服務(wù)。整體架構(gòu)如下圖所示:
特征管道包括流特征管道和批特征管道,它們分別消費(fèi)流數(shù)據(jù)源和批數(shù)據(jù)源,對(duì)數(shù)據(jù)經(jīng)過預(yù)處理加工成特征 (這一步稱為特征工程),并將特征寫入特征倉庫。
- 批特征管道使用 Spark 實(shí)現(xiàn),由 DolphinScheduler 進(jìn)行調(diào)度,跑在 YARN 集群上;
- 出于技術(shù)棧的一致考慮,流特征管道使用 Spark Structured Streaming 實(shí)現(xiàn),和批特征管道一樣跑在 YARN 集群上。
特征倉庫選用合適的存儲(chǔ)組件 (Redis) 和數(shù)據(jù)結(jié)構(gòu) (Hashes),為模型服務(wù)提供低延遲的特征訪問能力。之所以選用 Redis 作為存儲(chǔ),是因?yàn)?#xff1a;
- 伴魚有豐富的 Redis 使用經(jīng)驗(yàn);
- 包括?DoorDash Feature Store和?Feast在內(nèi)的業(yè)界特征倉庫解決方案都使用了 Redis。
特征服務(wù)屏蔽特征倉庫的存儲(chǔ)和數(shù)據(jù)結(jié)構(gòu),對(duì)外暴露 RPC 接口?GetFeatures(EntityName, FeatureNames),提供對(duì)特征的低延遲點(diǎn)查詢。在實(shí)現(xiàn)上,這一接口基本對(duì)應(yīng)于 Redis 的?HMGET EntityName FeatureName_1 ... FeatureName_N?操作。
這一版本的特征系統(tǒng)存在幾個(gè)問題:
- 算法工程師缺少控制,導(dǎo)致迭代效率低。這個(gè)問題與系統(tǒng)涉及的技術(shù)棧和公司的組織架構(gòu)有關(guān)。在整個(gè)系統(tǒng)中,特征管道的迭代需求最高,一旦模型對(duì)特征有新的需求,就需要修改或者編寫一個(gè)新的 Spark 任務(wù)。而 Spark 任務(wù)的編寫需要有一定的 Java 或 Scala 知識(shí),不屬于算法工程師的常見技能,因此交由大數(shù)據(jù)團(tuán)隊(duì)全權(quán)負(fù)責(zé)。大數(shù)據(jù)團(tuán)隊(duì)同時(shí)負(fù)責(zé)多項(xiàng)數(shù)據(jù)需求,往往有很多排期任務(wù)。結(jié)果便是新特征的上線涉及頻繁的跨部門溝通,迭代效率低;
- 特征管道只完成了輕量的特征工程,降低在線推理的效率。由于特征管道由大數(shù)據(jù)工程師而非算法工程師編寫,復(fù)雜的數(shù)據(jù)預(yù)處理涉及更高的溝通成本,因此這些特征的預(yù)處理程度都比較輕量,更多的預(yù)處理被留到模型服務(wù)甚至模型內(nèi)部進(jìn)行,增大了模型推理的時(shí)延。
為了解決這幾個(gè)問題,特征系統(tǒng) V2 提出幾個(gè)設(shè)計(jì)目的:
- 將控制權(quán)交還算法工程師,提高迭代效率;
- 將更高權(quán)重的特征工程交給特征管道,提高在線推理的效率。
三、新版特征系統(tǒng) V2
特征系統(tǒng) V2 相比特征系統(tǒng) V1 在架構(gòu)上的唯一不同點(diǎn)在于,它將特征管道切分為三部分:特征生成管道,特征源,和特征注入管道。值得一提的是,管道在實(shí)現(xiàn)上均從 Spark 轉(zhuǎn)為 Flink,和公司數(shù)據(jù)基礎(chǔ)架構(gòu)的發(fā)展保持一致。特征系統(tǒng) V2 的整體架構(gòu)如下圖所示:
1. 特征生成管道
特征生成管道讀取原始數(shù)據(jù)源,加工為特征,并將特征寫入指定特征源 (而非特征倉庫)。
- 如果管道以流數(shù)據(jù)源作為原始數(shù)據(jù)源,則它是流特征生成管道;
- 如果管道以批數(shù)據(jù)源作為原始數(shù)據(jù)源,則它是批特征生成管道。
特征生成管道的邏輯由算法工程師全權(quán)負(fù)責(zé)編寫。其中,批特征生成管道使用 HiveQL 編寫,由 DolphinScheduler 調(diào)度。流特征生成管道使用 PyFlink 實(shí)現(xiàn),詳情見下圖:
算法工程師需要遵守下面步驟:
這一套流程確保了:
- 算法工程師掌握上線特征的自主權(quán);
- 平臺(tái)工程師把控特征生成管道的代碼質(zhì)量,并在必要時(shí)可以對(duì)它們實(shí)現(xiàn)重構(gòu),而無需算法工程師的介入。
2. 特征源
特征源存儲(chǔ)從原始數(shù)據(jù)源加工形成的特征。值得強(qiáng)調(diào)的是,它同時(shí)還是連接算法工程師和 AI 平臺(tái)工程師的橋梁。算法工程師只負(fù)責(zé)實(shí)現(xiàn)特征工程的邏輯,將原始數(shù)據(jù)加工為特征,寫入特征源,剩下的事情就交給 AI 平臺(tái)。平臺(tái)工程師實(shí)現(xiàn)特征注入管道,將特征寫入特征倉庫,以特征服務(wù)的形式對(duì)外提供數(shù)據(jù)訪問服務(wù)。
3. 特征注入管道
特征注入管道將特征從特征源讀出,寫入特征倉庫。由于 Flink 社區(qū)缺少對(duì) Redis sink 的原生支持,我們通過拓展?RichSinkFunction簡單地實(shí)現(xiàn)了?StreamRedisSink?和?BatchRedisSink,很好地滿足我們的需求。
其中,BatchRedisSink?通過?Flink Operator State?和?Redis Pipelining的簡單結(jié)合,大量參考 Flink 文檔中的?BufferingSink,實(shí)現(xiàn)了批量寫入,大幅減少對(duì) Redis Server 的請(qǐng)求量,增大吞吐,寫入效率相比逐條插入提升了 7 倍?。BatchRedisSink?的簡要實(shí)現(xiàn)如下。其中,flush?實(shí)現(xiàn)了批量寫入 Redis 的核心邏輯,checkpointedState?/?bufferedElements?/?snapshotState?/?initializeState?實(shí)現(xiàn)了使用 Flink 有狀態(tài)算子管理元素緩存的邏輯。
class BatchRedisSink(pipelineBatchSize: Int ) extends RichSinkFunction[(String, Timestamp, Map[String, String])]with CheckpointedFunction {@transientprivate var checkpointedState: ListState[(String, java.util.Map[String, String])] = _private val bufferedElements: ListBuffer[(String, java.util.Map[String, String])] =ListBuffer.empty[(String, java.util.Map[String, String])]private var jedisPool: JedisPool = _override def invoke(value: (String, Timestamp, Map[String, String]),context: SinkFunction.Context): Unit = {import scala.collection.JavaConverters._val (key, _, featureKVs) = valuebufferedElements += (key -> featureKVs.asJava)if (bufferedElements.size == pipelineBatchSize) {flush()}}private def flush(): Unit = {var jedis: Jedis = nulltry {jedis = jedisPool.getResourceval pipeline = jedis.pipelined()for ((key, hash) <- bufferedElements) {pipeline.hmset(key, hash)}pipeline.sync()} catch { ... } finally { ... }bufferedElements.clear()}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.clear()for (element <- bufferedElements) {checkpointedState.add(element)}}override def initializeState(context: FunctionInitializationContext): Unit = {val descriptor =new ListStateDescriptor[(String, java.util.Map[String, String])]("buffered-elements",TypeInformation.of(new TypeHint[(String, java.util.Map[String, String])]() {}))checkpointedState = context.getOperatorStateStore.getListState(descriptor)import scala.collection.JavaConverters._if (context.isRestored) {for (element <- checkpointedState.get().asScala) {bufferedElements += element}}}override def open(parameters: Configuration): Unit = {try {jedisPool = new JedisPool(...)} catch { ... }}override def close(): Unit = {flush()if (jedisPool != null) {jedisPool.close()}} }特征系統(tǒng) V2 很好地滿足了我們提出的設(shè)計(jì)目的。
- 由于特征生成管道的編寫只需用到 SQL 和 Python 這兩種算法工程師十分熟悉的工具,因此他們?nèi)珯?quán)負(fù)責(zé)特征生成管道的編寫和上線,無需依賴大數(shù)據(jù)團(tuán)隊(duì),大幅提高了迭代效率。在熟悉后,算法工程師通常只需花費(fèi)半個(gè)小時(shí)以內(nèi),就可以完成流特征的編寫、調(diào)試和上線。而這個(gè)過程原本需要花費(fèi)數(shù)天,取決于大數(shù)據(jù)團(tuán)隊(duì)的排期;
- 出于同樣的原因,算法工程師可以在有需要的前提下,完成更重度的特征工程,從而減少模型服務(wù)和模型的負(fù)擔(dān),提高模型在線推理效率。
四、總結(jié)
特征系統(tǒng) V1 解決了特征上線的問題,而特征系統(tǒng) V2 在此基礎(chǔ)上,解決了特征上線難的問題。在特征系統(tǒng)的演進(jìn)過程中,我們總結(jié)出作為平臺(tái)研發(fā)的幾點(diǎn)經(jīng)驗(yàn):
- 平臺(tái)應(yīng)該提供用戶想用的工具。這與 Uber ML 平臺(tái)團(tuán)隊(duì)在內(nèi)部推廣的經(jīng)驗(yàn)相符。算法工程師在 Python 和 SQL 環(huán)境下工作效率最高,而不熟悉 Java 和 Scala。那么,想讓算法工程師自主編寫特征管道,平臺(tái)應(yīng)該支持算法工程師使用 Python 和 SQL 編寫特征管道,而不是讓算法工程師去學(xué) Java 和 Scala,或是把工作轉(zhuǎn)手給大數(shù)據(jù)團(tuán)隊(duì)去做;
- 平臺(tái)應(yīng)該提供易用的本地調(diào)試工具。我們提供的 Docker 環(huán)境封裝了 Kafka 和 Flink,讓用戶可以在本地快速調(diào)試 PyFlink 腳本,而無需等待管道部署到測試環(huán)境后再調(diào)試;
- 平臺(tái)應(yīng)該在鼓勵(lì)用戶自主使用的同時(shí),通過自動(dòng)化檢查或代碼審核等方式牢牢把控質(zhì)量。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。?
總結(jié)
以上是生活随笔為你收集整理的伴鱼:借助 Flink 完成机器学习特征系统的升级的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聚焦业务价值:分众传媒在 Serverl
- 下一篇: 阿里云马涛:什么是操作系统的云原生?