Pick!闲鱼亿级商品库中的秒级实时选品
一、業(yè)務(wù)背景
在電商運營工作中,營銷活動是非常重要的部分,對用戶增長和GMV都有很大幫助。對電商運營來說,如何從龐大的商品庫中篩選出賣家優(yōu)質(zhì)商品并推送給有需要的買家購買是每時每刻都要思索的問題,而且這個過程需要盡可能快和實時。保證快和實時就可以提升買賣雙方的用戶體驗,提高用戶粘性。
二、實時選品
為了解決上面提到的問題,閑魚研發(fā)了馬赫系統(tǒng)。馬赫是一個實時高性能的商品選品系統(tǒng),解決在億級別商品中通過規(guī)則篩選優(yōu)質(zhì)商品并進行投放的場景。有了馬赫系統(tǒng)之后,閑魚的運營同學可以在馬赫系統(tǒng)上創(chuàng)建篩選規(guī)則,比如商品標題包含“小豬佩奇”、類目為“玩具”、價格不超過100元且商品狀態(tài)為未賣出。在運營創(chuàng)建規(guī)則后,馬赫系統(tǒng)會同時進行兩步操作,第一步是從存量商品數(shù)據(jù)篩選符合條件的商品進行打標;第二步是對商品實時變更進行規(guī)則計算,實時同步規(guī)則命中結(jié)果。
馬赫系統(tǒng)最大的特點是快而實時,體現(xiàn)在命中規(guī)模為100w的規(guī)則可以在10分鐘之內(nèi)完成打標;商品本身變更導致的規(guī)則命中結(jié)果同步時間為1秒鐘。運營可以通過馬赫系統(tǒng)快速篩選商品向用戶投放,閑魚的流量也可以精準投給符合條件的商品并且將流量利用到最大化。
那么馬赫系統(tǒng)是如何解決這一典型的電商問題的呢,馬赫系統(tǒng)和流計算有什么關(guān)系呢,這是下面要詳細說明的部分。
三、流計算
流計算是持續(xù)、低延遲、事件觸發(fā)的數(shù)據(jù)處理模型。流計算模型是使用實時數(shù)據(jù)集成工具,將數(shù)據(jù)實時變化傳輸?shù)搅魇綌?shù)據(jù)存儲,此時數(shù)據(jù)的傳輸變成實時化,將長時間累積大量的數(shù)據(jù)平攤到每個時間點不停地小批量實時傳輸;流計算會將計算邏輯封裝為常駐計算服務(wù),一旦啟動就一直處于等待事件觸發(fā)狀態(tài),當有數(shù)據(jù)流入后會觸發(fā)計算迅速得到結(jié)果;當流計算得到計算結(jié)果后可以立刻將數(shù)據(jù)輸出,無需等待整體數(shù)據(jù)的計算結(jié)果。
閑魚實時選品系統(tǒng)使用的流計算框架是Blink,Blink是阿里巴巴基于開源流計算框架Flink定制研發(fā)的企業(yè)級流計算框架,可以認為是Flink的加強版,現(xiàn)在已經(jīng)開源。Flink是一個高吞吐、低延遲的計算引擎,同時還提供很多高級功能。比如它提供有狀態(tài)的計算,支持狀態(tài)管理,支持強一致性的數(shù)據(jù)語義以及支持Event Time,WaterMark對消息亂序的處理等特性,為閑魚實時選品系統(tǒng)的超低延時選品提供了有力支持。
3.1、Blink之State
State是指流計算過程中計算節(jié)點的中間計算結(jié)果或元數(shù)據(jù)屬性,比如在aggregation過程中要在state中記錄中間聚合結(jié)果,比如Apache Kafka作為數(shù)據(jù)源時候,我們也要記錄已經(jīng)讀取記錄的offset,這些State數(shù)據(jù)在計算過程中會進行持久化(插入或更新)。所以Blink中的State就是與時間相關(guān)的,Blink任務(wù)的內(nèi)部數(shù)據(jù)(計算數(shù)據(jù)和元數(shù)據(jù)屬性)的快照。
馬赫系統(tǒng)會在State中保存商品合并之后的全部數(shù)據(jù)和規(guī)則運行結(jié)果數(shù)據(jù)。當商品發(fā)生變更后,馬赫系統(tǒng)會將商品變更信息與State保存的商品信息進行合并,并將合并的信息作為入?yún)⑦\行所有規(guī)則,最后將規(guī)則運行結(jié)果與State保存的規(guī)則運行結(jié)果進行Diff后得到最終有效的運行結(jié)果。所以Blink的State特性是馬赫系統(tǒng)依賴的關(guān)鍵特性。
3.2、Blink之Window
Blink的Window特性特指流計算系統(tǒng)特有的數(shù)據(jù)分組方式,Window的創(chuàng)建是數(shù)據(jù)驅(qū)動的,也就是說,窗口是在屬于此窗口的第一個元素到達時創(chuàng)建。當窗口結(jié)束時候刪除窗口及狀態(tài)數(shù)據(jù)。Blink的Window主要包括兩種,分別為滾動窗口(Tumble)和滑動窗口(Hop)。
滾動窗口有固定大小,在每個窗口結(jié)束時進行一次數(shù)據(jù)計算,也就是說滾動窗口任務(wù)每經(jīng)過一次固定周期就會進行一次數(shù)據(jù)計算,例如每分鐘計算一次總量。
滑動窗口與滾動窗口類似,窗口有固定的size,與滾動窗口不同的是滑動窗口可以通過slide參數(shù)控制滑動窗口的新建頻率。因此當slide值小于窗口size的值的時候多個滑動窗口會重疊,此時數(shù)據(jù)會被分配給多個窗口,如下圖所示:
Blink的Window特性在數(shù)據(jù)計算統(tǒng)計方面有很多使用場景,馬赫系統(tǒng)主要使用窗口計算系統(tǒng)處理數(shù)據(jù)的實時速度和延時,用來進行數(shù)據(jù)統(tǒng)計和監(jiān)控告警。
3.3、Blink之UDX
UDX是Blink中用戶自定義函數(shù),可以在任務(wù)中調(diào)用以實現(xiàn)一些定制邏輯。Blink的UDX包括三種,分別為:
- UDF - User-Defined Scalar Function
UDF是最簡單的自定義函數(shù),輸入是一行數(shù)據(jù)的任意字段,輸出是一個字段,可以實現(xiàn)數(shù)據(jù)比較、數(shù)據(jù)轉(zhuǎn)換等操作。 - UDTF - User-Defined Table-Valued Function
UDTF 是表值函數(shù),每個輸入(單column或多column)返回N(N>=0)Row數(shù)據(jù),Blink框架提供了少量的UDTF,比如:STRING_SPLIT,JSON_TUPLE和GENERATE_SERIES3個built-in的UDTF。 - UDAF - User-Defined Aggregate Function
UDAF是聚合函數(shù),輸入是多行數(shù)據(jù),輸出是一個字段。Blink框架Built-in的UDAF包括MAX,MIN,AVG,SUM,COUNT等,基本滿足了80%常用的集合場景,但仍有一定比例的復雜業(yè)務(wù)場景,需要定制自己的聚合函數(shù)。
馬赫系統(tǒng)中使用了大量的UDX進行邏輯定制,包括消息解析、數(shù)據(jù)處理等。而馬赫系統(tǒng)最核心的商品數(shù)據(jù)合并、規(guī)則運行和結(jié)果Diff等流程就是通過UDAF實現(xiàn)的。
四、秒級選品方案
選品系統(tǒng)在項目立項后也設(shè)計有多套技術(shù)方案。經(jīng)過多輪討論后,最終決定對兩套方案實施驗證后決定最終實現(xiàn)方案。
第一套方案是基于PostgreSQL的方案,PostgreSQL可以很便捷的定義Function進行數(shù)據(jù)合并操作,在PostgreSQL的trigger上定義執(zhí)行規(guī)則邏輯。基于PostgreSQL的技術(shù)實現(xiàn)較復雜,但能滿足功能需求。不過性能測試結(jié)果顯示PostgreSQL處理小數(shù)據(jù)量(百萬級)性能較好;當trigger數(shù)量多、trigger邏輯復雜或處理億級別數(shù)據(jù)時,PostgreSQL的性能會有較大下滑,不能滿足秒級選品的性能指標。因此基于PostgreSQL的方案被否決(在閑魚小商品池場景中仍在使用)。
第二套方案是基于Blink流計算方案,通過驗證發(fā)現(xiàn)Blink SQL很適合用來表達數(shù)據(jù)處理邏輯而且Blink性能很好,綜合對比之后最終選擇Blink流計算方案作為實際實施的技術(shù)方案。
為了配合使用流計算方案,馬赫系統(tǒng)經(jīng)過設(shè)計和解耦,無縫對接Blink計算引擎。其中數(shù)據(jù)處理模塊是馬赫系統(tǒng)核心功能模塊,負責接入商品相關(guān)各類數(shù)據(jù)、校驗數(shù)據(jù)、合并數(shù)據(jù)、執(zhí)行規(guī)則和處理執(zhí)行結(jié)果并輸出等步驟,所以數(shù)據(jù)處理模塊的處理速度和延時在很大程度上能代表馬赫系統(tǒng)數(shù)據(jù)處理速度和延時。接下來我們看下數(shù)據(jù)處理模塊如何與Blink深度結(jié)合將數(shù)據(jù)處理延遲降到秒級。
數(shù)據(jù)處理模塊結(jié)構(gòu)如上圖,包含數(shù)據(jù)接入層、數(shù)據(jù)合并層、規(guī)則運行層和規(guī)則運行結(jié)果處理層。每層都針對流計算處理模式進行了單獨設(shè)計。
4.1、數(shù)據(jù)接入層
數(shù)據(jù)接入層是數(shù)據(jù)處理模塊前置,負責對接多渠道各種類型的業(yè)務(wù)數(shù)據(jù),主要邏輯如下:
- 數(shù)據(jù)接入層對接多個渠道多種類型的業(yè)務(wù)數(shù)據(jù);
- 解析業(yè)務(wù)數(shù)據(jù)并做簡單校驗;
- 統(tǒng)計各渠道業(yè)務(wù)數(shù)據(jù)量級并進行監(jiān)控,包括總量和同比變化量;
- 通過元數(shù)據(jù)中心獲取字段級別的Metadata配置。元數(shù)據(jù)中心是用來保存和管理所有字段的MetaData配置信息組件。Metadata配置代表字段元數(shù)據(jù)配置,包括字段值類型,值范圍和值格式等基礎(chǔ)信息;
- 根據(jù)Metadata配置進行字段級別數(shù)據(jù)校驗;
- 按照馬赫定義的標準數(shù)據(jù)范式組裝數(shù)據(jù)。
這樣設(shè)計的考慮是因為業(yè)務(wù)數(shù)據(jù)是多種多樣的,比如商品信息包括數(shù)據(jù)庫的商品表記錄、商品變更的MQ消息和算法產(chǎn)生的離線數(shù)據(jù),如果直接通過Blink對接這些業(yè)務(wù)數(shù)據(jù)源的話,需要創(chuàng)建多個Blink任務(wù)來對接不同類型業(yè)務(wù)數(shù)據(jù)源,這種處理方式太重,而且數(shù)據(jù)接入邏輯與Blink緊耦合,不夠靈活。
數(shù)據(jù)接入層可以很好的解決上述問題,數(shù)據(jù)接入層可以靈活接入多種業(yè)務(wù)數(shù)據(jù),并且將數(shù)據(jù)接入與Blink解耦,最終通過同一個Topic發(fā)出消息。而Blink任務(wù)只要監(jiān)聽對應(yīng)的Topic就可以連續(xù)不斷的收到業(yè)務(wù)數(shù)據(jù)流,觸發(fā)接下來的數(shù)據(jù)處理流程。
4.2、數(shù)據(jù)合并層
數(shù)據(jù)合并是數(shù)據(jù)處理流程的重要步驟,數(shù)據(jù)合并的主要作用是將商品的最新信息與內(nèi)存中保存的商品信息合并供后續(xù)規(guī)則運行使用。數(shù)據(jù)合并主要邏輯是:
- 監(jiān)聽指定消息隊列Topic,獲取業(yè)務(wù)數(shù)據(jù)消息;
- 解析消息,并將消息內(nèi)容按照字段重新組裝數(shù)據(jù),格式為{key:[timestamp, value]},key是字段名稱,value是字段值,timestamp為字段數(shù)據(jù)產(chǎn)生時間戳;
- 將組裝后的數(shù)據(jù)和內(nèi)存中保存的歷史數(shù)據(jù)根據(jù)timestamp進行字段級別數(shù)據(jù)合并,合并算法為比較timestamp大小取最新字段值,具體邏輯見下圖。
數(shù)據(jù)合并有幾個前提:
這個是Blink提供的特性,Blink可以將任務(wù)運行過程中產(chǎn)生的存量數(shù)據(jù)保存在內(nèi)存中,在下一次運行時從內(nèi)存中取出繼續(xù)處理。
這點需要一個巧妙設(shè)計:商品信息有很多字段,每個字段的值是數(shù)組,不僅要記錄實際值,還要記錄當前值的修改時間戳。在合并商品信息時,按照字段進行合并,合并規(guī)則是取時間戳最大的值為準。
舉例來說,內(nèi)存中保存的商品ID=1的信息是{"desc": [1, "描述1"], "price": [4, 100.5]},數(shù)據(jù)流中商品ID=1的信息是{"desc": [2, "描述2"], "price": [3, 99.5]},那么合并結(jié)果就是{"desc": [2, "描述2"], "price": [4, 100.5]},每個字段的值都是最新的,代表商品當前最新信息。
當商品信息發(fā)生變化后,最新數(shù)據(jù)由數(shù)據(jù)接入層流入,通過數(shù)據(jù)合并層將數(shù)據(jù)合并到內(nèi)存,Blink內(nèi)存中保存的是商品當前最新的全部數(shù)據(jù)。
4.3、規(guī)則運行層
規(guī)則運行層是數(shù)據(jù)處理流程核心模塊,通過規(guī)則運算得出商品對各規(guī)則命中結(jié)果,邏輯如下:
- 規(guī)則運行層接受輸入為經(jīng)過數(shù)據(jù)合并后的數(shù)據(jù);
- 通過元數(shù)據(jù)中心獲取字段級別Metadata配置;
- 根據(jù)字段Metadata配置解析數(shù)據(jù);
- 通過規(guī)則中心獲取有效規(guī)則列表,規(guī)則中心是指創(chuàng)建和管理規(guī)則生命周期的組件;
- 循環(huán)規(guī)則列表,運行單項規(guī)則,將規(guī)則命中結(jié)果保存在內(nèi)存;
- 記錄運行規(guī)則拋出異常的數(shù)據(jù),并進行監(jiān)控告警。
這里的規(guī)則指的是運營創(chuàng)建的業(yè)務(wù)規(guī)則,比如商品價格大于50且狀態(tài)為在線。規(guī)則的輸入是經(jīng)過數(shù)據(jù)合并后的商品數(shù)據(jù),輸出是true或false,即是否命中規(guī)則條件。規(guī)則代表的是業(yè)務(wù)投放場景,馬赫系統(tǒng)的業(yè)務(wù)價值就是在商品發(fā)生變更后盡快判斷是否命中之前未命中的規(guī)則或是不命中之前已經(jīng)命中的規(guī)則,并將命中和不命中結(jié)果盡快體現(xiàn)到投放場景中。
規(guī)則運行需利用Blink強大算力來保證快速執(zhí)行,馬赫系統(tǒng)當前有將近300條規(guī)則,而且還在快速增長。這意味著每個商品發(fā)生變更后要在Blink上運行成百上千條規(guī)則,閑魚每天有上億商品發(fā)生變更,這背后需要的運算量是非常驚人的。
4.4、運行結(jié)果處理層
讀者讀到這里可能會奇怪,明明經(jīng)過規(guī)則運行之后直接把運行結(jié)果輸出到投放場景就可以了,不需要運行結(jié)果處理層。實際上運行結(jié)果處理層是數(shù)據(jù)處理模塊最重要的部分。
因為在實際場景中,商品的變更在大部分情況只會命中很少一部分規(guī)則,而且命中結(jié)果也很少會變化。也就是說商品對很多規(guī)則的命中結(jié)果是沒有意義的,如果將這些命中結(jié)果也輸出的話,只會增加操作TPS,對實際結(jié)果沒有任何幫助。而篩選出有效的運行結(jié)果,這就是運行結(jié)果處理層的作用。運行結(jié)果處理層邏輯如下:
- 獲取商品數(shù)據(jù)的規(guī)則運行結(jié)果;
- 按照是否命中規(guī)則解析運行結(jié)果;
將運行結(jié)果與內(nèi)存中保存的歷史運行結(jié)果進行diff,diff作用是排除新老結(jié)果中相同的命中子項,邏輯見下圖。
運行結(jié)果處理層利用Blink內(nèi)存保存商品上一次變更后規(guī)則運行結(jié)果,并將當前變更后規(guī)則運行結(jié)果與內(nèi)存中結(jié)果進行比較,計算出有效運行結(jié)果。舉例來說,商品A上一次變更后規(guī)則命中結(jié)果為{"rule1":true, "rule2":true, "rule3":false, "rule4":false},當前變更后規(guī)則命中結(jié)果為{"rule1":true, "rule2":false, "rule3":false, "rule4":true}。因為商品A變更后對rule1和rule3的命中結(jié)果沒有變化,所以實際有效的命中結(jié)果是{"rule2":false, "rule4":true},通過運行結(jié)果處理層處理后輸出的是有效結(jié)果的最小集,可以極大減小無效結(jié)果輸出,提高數(shù)據(jù)處理的整體性能和效率。
4.5、難點解析
雖然閑魚實時選品系統(tǒng)在立項之初經(jīng)過預研和論證,但因為使用很多新技術(shù)框架和流計算思路,在開發(fā)過程中遇到一些難題,包括設(shè)計和功能實現(xiàn)方面的,很多是設(shè)計流計算系統(tǒng)的典型問題。我們就其中一個問題與各位讀者探討-規(guī)則公式轉(zhuǎn)換。
4.5.1、規(guī)則公式轉(zhuǎn)換
這個問題的業(yè)務(wù)場景是:運營同學在馬赫系統(tǒng)頁面上篩選商品字段后保存規(guī)則,服務(wù)端是已有的老系統(tǒng),邏輯是根據(jù)規(guī)則生成一段SQL,SQL的where條件和運營篩選條件相同。SQL有兩方面的作用,一方面是作為離線規(guī)則,在離線數(shù)據(jù)庫中執(zhí)行SQL篩選符合規(guī)則的離線商品數(shù)據(jù);另一方面是轉(zhuǎn)換成在線規(guī)則,在Blink任務(wù)中對實時商品變更數(shù)據(jù)執(zhí)行規(guī)則以判斷是否命中。
因為實時規(guī)則運行使用的是MVEL表達式引擎,MVEL表達式是類Java語法的,所以問題就是將離線規(guī)則的SQL轉(zhuǎn)換成在線規(guī)則的Java表達式,兩者邏輯需一致,并且需兼顧性能和效率。問題的解決方案很明確,解析SQL后將SQL操作符轉(zhuǎn)換成Java操作符,并將SQL特有語法轉(zhuǎn)成Java語法,例如A like '%test%'轉(zhuǎn)成A.contains('test')。
這個問題的難點是如何解析SQL和將解析后的語義轉(zhuǎn)成Java語句。經(jīng)過調(diào)研之后給出了簡單而優(yōu)雅的解決方案,主要步驟如下:
- 使用Druid框架解析SQL語句,轉(zhuǎn)成一個二叉樹,單獨取出其中的where條件子樹;
-
通過后序遍歷算法遍歷where條件子樹;
- 將SQL操作符換成對應(yīng)的Java操作符;
目前支持且、或、等于、不等于、大于、大于等于、小于、小于等于、like、not like和in等操作。 - 將SQL語法格式轉(zhuǎn)成Java語法;
將in語法改成Java的或語法,例如A in ('hello', 'world')轉(zhuǎn)成(A == 'hello') || (A == 'world')。
- 將SQL操作符換成對應(yīng)的Java操作符;
實際運行結(jié)果如下:
代碼邏輯如下(主要是二叉樹后續(xù)遍歷和操作符轉(zhuǎn)換,不再詳細解釋):
五、結(jié)論
馬赫系統(tǒng)上線以來,已經(jīng)支持近400場活動和投放場景,每天處理近1.4億條消息,峰值TPS達到50000。馬赫系統(tǒng)已經(jīng)成為閑魚選品投放的重要支撐。
本文主要闡述馬赫系統(tǒng)中數(shù)據(jù)處理的具體設(shè)計方案,說明整體設(shè)計的來龍去脈。雖然閑魚實時選品系統(tǒng)針對的是商品選品,但數(shù)據(jù)處理流計算技術(shù)方案的輸入是MQ消息,輸出也是MQ消息,不與具體業(yè)務(wù)綁定,所以數(shù)據(jù)處理流計算技術(shù)方案不只適用于商品選品,也適合其他類似實時篩選業(yè)務(wù)場景。希望我們的技術(shù)方案和設(shè)計思路能給你帶來一些想法和思考,也歡迎和我們留言討論,謝謝。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Pick!闲鱼亿级商品库中的秒级实时选品的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据不足,如何进行迁移学习?
- 下一篇: 应用监控的选型思考