日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

千万数据去重_基于 Flink 的百亿数据去重实践

發布時間:2024/3/12 编程问答 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 千万数据去重_基于 Flink 的百亿数据去重实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在工作中經常會遇到去重的場景,例如基于 App 的用戶行為日志分析系統,用戶的行為日志從手機客戶端上報到 Nginx 服務端,通過 Logstash、Flume 或其他工具將日志從 Nginx 寫入到 Kafka 中。

由于用戶手機客戶端的網絡可能出現不穩定,所以手機客戶端上傳日志的策略是:寧可重復上報,也不能丟日志。所以導致 Kafka 中必然會出現日志重復的情況,即:同一條日志出現了 2 條或 2 條以上。

通常情況下,Flink 任務的數據源都是 Kafka,若 Kafka 中數據出現了重復,在實時 ETL 或者流計算時都需要考慮對日志主鍵進行去重,否則會導致流計算結果偏高或結果不準確的問題,例如用戶 a 在某個頁面只點擊了一次,但由于日志重復上報,所以用戶 a 在該頁面的點擊日志在 Kafka 中出現了 2 次,最后統計該頁面的 click 數時,結果就會偏高。

這里只闡述了一種可能造成 Kafka 中數據重復的情況,在生產環境中很多情況都可能造成 Kafka 中數據重復,這里不一一列舉,本節主要講述出現了數據重復后,該如何處理。

實現去重的通用解決方案

Kafka 中數據出現重復后,各種解決方案都比較類似,一般需要一個全局 set 集合來維護歷史所有數據的主鍵。當處理新日志時,需要拿到當前日志的主鍵與歷史數據的 set 集合按照規則進行比較,若 set 集合中已經包含了當前日志的主鍵,說明當前日志在之前已經被處理過了,則當前日志應該被過濾掉,否則認為當前日志不應該被過濾應該被處理,而且處理完成后需要將新日志的主鍵加入到 set 集合中,set 集合永遠存放著所有已經被處理過的數據。程序流程圖如下圖所示:

image

處理流程很簡單,關鍵在于如何維護這個 set 集合,可以簡單估算一下這個 set 集合需要占用多大空間。本小節要解決的問題是百億數據去重,所以就按照每天 1 百億的數據量來計算。

由于每天數據量巨大,因此主鍵占用空間通常會比較大,如果主鍵占用空間小意味著表示的數據范圍就比較小,就可能導致主鍵沖突,例如:4 個字節的 int 類型表示數據范圍是為 -2147483648~2147483647,總共可以表示 42 億個數,如果這里每天百億的數據量選用 int 類型做為主鍵的話,很明顯會有大量的主鍵發生沖突,會將不重復的數據認為是發生了重復。

用戶的行為日志是在手機客戶端生成的,沒有全局發號器,一般會選取 UUID 做為日志的主鍵,UUID 會生成 36 位的字符串,例如:"f106c4a1-4c6f-41c1-9d30-bbb2b271284a"。每個主鍵占用 36 字節,每天 1 百億數據,36 字節 * 100億 ≈ 360GB。這僅僅是一天的數據量,所以該 set 集合要想存儲空間不發生持續地爆炸式增長,必須增加一個功能,那就是給所有的主鍵增加 ttl(Time To Live的縮寫,即:過期時間)。

如果不增加 ttl,10 天數據量的主鍵占用空間就 3.6T,100 天數據量的主鍵占用空間 36T,所以在設計之初必須考慮為主鍵設定 ttl。如果要求按天進行去重或者認為日志發生重復上報的時間間隔不可能大于 24 小時,那么為了系統的可靠性 ttl 可以設置為 36 小時。每天數據量 1 百億,且 set 集合中存放著 36 小時的數據量,即 100 億 * 1.5 = 150 億,所以 set 集合中需要維護 150 億的數據量。

且 set 集合中每條數據都增加了 ttl,意味著 set 集合需要為每條數據再附帶保存一個時間戳,來確定該數據什么時候過期。例如 Redis 中為一個 key 設置了 ttl,如果沒有為這個 key 附帶時間戳,那么根本無法判斷該 key 什么時候應該被清理。所以在考慮每條數據占用空間時,不僅要考慮數據本身,還需要考慮是否需要其他附帶的存儲。主鍵本身占用 36 字節加上 long 類型的時間戳 8 字節,所以每條數據至少需要占用 44 字節,150 億 * 44 字節 = 660GB。所以每天百億的數據量,如果我們使用 set 集合的方案來實現,至少需要占用 660GB 以上的存儲空間。

使用 BloomFilter 來實現去重

有些流計算的場景對準確性要求并不是很高,例如傳統的 Labmda 架構中,都會有離線去矯正實時計算的結果,所以根據業務場景,當業務要求可以接受結果有小量誤差時,可以選擇使用一些低成本的數據結構。BloomFilter 和 HyperLogLog 都是相對低成本的數據結構,分別有自己的應用場景,且兩種數據結構都有一定誤差。

HyperLogLog 可以估算出 HyperLogLog 中插入了多少個不重復的元素,而不能告訴我們之前是否插入了哪些元素。BloomFilter 則恰好相反,比起 BloomFilter 更像是一個 set 集合,BloomFilter 可以告訴你 BloomFilter 中肯定不包含元素 a,或者告訴你 BloomFilter 中可能包含元素 b,但 BloomFilter 不能告訴你 BloomFilter 中插入了多少個元素。接下來了解一下 BloomFilter 的實現原理。

bitmap 位圖

了解 BloomFilter,從 bitmap(位圖)開始說起。現在有 1 千萬個整數,數據范圍在 0 到 2 千萬之間。如何快速查找某個整數是否在這 1 千萬個整數中呢?可以將這 1 千萬個數保存在 HashMap 中,不考慮對象頭及其他空間,1000 萬個 int 類型數據需要占用大約 1000 萬 * 4 字節 ≈ 40MB 存儲空間。有沒有其他方案呢?因為數據范圍是 0 到 2 千萬,所以如下圖所示,可以申請一個長度為 2000 萬、boolean 類型的數組。將這 1 千萬個整數作為數組下標,將其對應的數組值設置成 true,如下圖所示,數組下標為 2、666、999 的位置存儲的數據為 true,表示 1 千萬個數中包含了 2、666、999 等。當查詢某個整數 K 是否在這 1 千萬個整數中時,只需要將對應的數組值 array[K] 取出來,看是否等于 true。如果等于 true,說明 1 千萬整數中包含這個整數 K,否則表示不包含這個整數 K。

image

Java 的 boolean 基本類型占用一個字節(8bit)的內存空間,所以上述方案需要申請 2000 萬字節。如下圖所示,可以通過編程語言用二進制位來模擬布爾類型,二進制的 1 表示 true、二進制的 0 表示 false。通過二進制模擬布爾類型的方案,只需要申請 2000 萬 bit 即可,相比 boolean 類型而言,存儲空間占用僅為原來的 1/8。2000 萬 bit ≈ 2.4MB,相比存儲原始數據的方案 40 MB 而言,占用的存儲空間少了很多。

image

假如這 1 千萬個整數的數據范圍是 0 到 100 億,那么就需要申請 100 億個 bit 約等于 1200MB,比存儲原始數據方案的 40MB 還要大很多。該情況下,直接使用位圖使用的存儲空間更多了,怎么解決呢?可以只申請 1 億 bit 的存儲空間,對 1000 萬個數求hash,映射到 1 億的二進制位上,最后大約占用 12 MB 的存儲空間,但是可能存在 hash 沖突的情況。例如 3 和 100000003(一億零三)這兩個數對一億求余都為 3,所以映射到長度為 1 億的位圖上,這兩個數會占用同一個 bit,就會導致一個問題:1 千萬個整數中包含了一億零三,所以位圖中下標為 3 的位置存儲著二進制 1。當查詢 1 千萬個整數中是否包含數字 3 時,同樣也是去位圖中下標 3 的位置去查找,發現下標為 3 的位置存儲著二進制 1,所以誤以為 1 千萬個整數中包含數字 3。為了減少 hash 沖突,于是誕生了 BloomFilter。

BloomFilter 原理介紹

hash 存在 hash 沖突(碰撞)的問題,兩個不同的 key 通過同一個 hash 函數得到的值有可能相同。為了減少沖突,可以多引入幾個 hash 函數,如果通過其中的一個 hash 函數發現某元素不在集合中,那么該元素肯定不在集合中。當所有的 hash 函數告訴我們該元素在集合中時,才能確定該元素存在于集合中,這便是BloomFilter的基本思想。

如下圖所示,是往 BloomFilter 中插入元素 a、b 的過程,有 3 個 hash 函數,元素 a 經過 3 個 hash 函數后對應的 2、8、10 這三個二進制位,所以將這三個二進制位置為 1,元素 b 經過 3 個 hash 函數后,對應的 5、10、14 這三個二進制位,將這三個二進制位也置為 1,其中下標為 10 的二進制位被 a、b 元素都涉及到。

image

如下圖所示,是從 BloomFilter 中查找元素 c、d 的過程,同樣包含了 3 個 hash 函數,元素 c 經過 3 個 hash 函數后對應的 2、6、9 這三個二進制位,其中下標 6 和 9 對應的二進制位為 0,所以會認為 BloomFilter 中不存在元素 c。元素 d 經過 3 個 hash 函數后對應的 5、8、14 這三個二進制位,這三個位對應的二進制位都為 1,所以會認為 BloomFilter 中存在元素 d,但其實 BloomFilter 中并不存在元素 d,是因為元素 a 和元素 b 也對應到了 5、8、14 這三個二進制位上,所以 BloomFilter 會有誤判。但是從實現原理來看,當 BloomFilter 告訴你不包含元素 c 時,BloomFilter 中肯定不包含元素 c,當 BloomFilter 告訴你 BloomFilter 中包含元素 d 時,它只是可能包含,也有可能不包含。

image

使用 BloomFilter 實現數據去重

Redis 4.0 之后 BloomFilter 以插件的形式加入到 Redis 中,關于 api 的具體使用這里不多贅述。BloomFilter 在創建時支持設定一個預期容量和誤判率,預期容量即預計插入的數據量,誤判率即:當 BloomFilter 中插入的數據達到預期容量時,誤判的概率,如果 BloomFilter 中插入數據較少的話,誤判率會更低。

經筆者測試,申請一個預期容量為 10 億,誤判率為千分之一的 BloomFilter,BloomFilter 會申請約 143 億個 bit,即:14G左右,相比之前 660G 的存儲空間小太多了。但是在使用過程中,需要記錄 BloomFilter 中插入元素的個數,當插入元素個數達到 10 億時,為了保障誤差率,可以將當前 BloomFilter 清除,重新申請一個新的 BloomFilter。

通過使用 Redis 的 BloomFilter,我們可以通過相對較小的內存實現百億數據的去重,但是 BloomFilter 有誤差,所以只能使用在那些對結果能承受一定誤差的應用場景,對于廣告計費等對數據精度要求非常高的場景,極力推薦大家使用精準去重的方案來實現。

使用 HBase 維護全局 set 實現去重

通過之前分析,我們知道要想實現百億數據量的精準去重,需要維護 150 億數據量的 set 集合,每條數據占用 44 KB,總共需要 660 GB 的存儲空間。注意這里說的是存儲空間而不是內存空間,為什么呢?因為 660G 的內存實在是太貴了,660G 的 Redis 云服務一個月至少要 2 萬 RMB 以上,俗話說設計架構不考慮成本等于耍流氓。這里使用 Redis 確實可以解決問題,但是成本較高。HBase 基于 rowkey Get 的效率比較高,所以這里可以考慮將這個大的 set 集合以 HBase rowkey 的形式存放到 HBase 中。HBase 表設置 ttl 為 36 小時,最近 36 小時的 150 億條日志的主鍵都存放到 HBase 中,每來一條數據,先拿到主鍵去 HBase 中查詢,如果 HBase 表中存在該主鍵,說明當前日志已經被處理過了,當前日志應該被過濾。如果 HBase 表中不存在該主鍵,說明當前日志之前沒有被處理過,此時應該被處理,且處理完成后將當前主鍵 Put 到 HBase 表中。由于數據量比較大,所以一定要提前對 HBase 表進行預分區,將壓力分散到各個 RegionServer 上。

使用 HBase rowkey 去重帶來的問題

一天 100 億的數據量,平均一秒 11.57 萬條日志。但是數據一般都會有高峰期,例如外賣軟件高峰期肯定是飯前的一兩個小時,其余時間段數據量相對比較少。所以雖然每天 100 億數據量,但是到了數據高峰期每秒數據量可以達到 20 萬左右。按照之前的思路,每條數據來了都會對 HBase 進行一次 Get 操作,當前數據處理完還會對 HBase 進行一次 Put 操作,所以每秒需要對 HBase 請求 40 萬次。單個的 Get 和 Put 請求效率比較低,這里可以優化為批量操作的 API 或異步 API 來提高訪問 HBase 的效率。

性能問題優化后,再分析這里使用 HBase 去重到底能不能保證 Exactly Once?拿計算 PV 的案例來講。

假如 PV 信息維護在 Flink 的狀態中,通過冪等性將 PV 統計結果寫入到 Redis 供其他業務方查詢實時統計的 PV 值。如下圖所示,Flink 處理完日志 b 后進行 Checkpoint,將 PV = 2 和 Kafka 對應的 offset 信息保存起來,此時 HBase 表中有兩條 rowkey 分別是 a、b,表示主鍵為 a 和 b 的日志已經被處理過了。

接著往后處理,當處理完日志 d 以后,PV = 4,HBase 表中有 4 條 rowkey 分別是 a、b、c、d,表示主鍵為 a、b、c、d 的日志已經被處理過了。但此時機器突然故障,導致 Flink 任務掛掉,如右圖所示 Flink 任務會從最近一次成功的 Checkpoint 處恢復任務,從日志 b 之后的位置開始消費,且 PV 恢復為 2,因為處理完日志 b 時 PV 為 2。

但由于 HBase 中的數據不是由 Flink 來維護,所以無法恢復到 Checkpoint 時的狀態。所以 Flink 任務恢復后,PV = 2 且 HBase 中 rowkey 為 a、b、c、d。此時 Flink 任務從日志 c 開始繼續處理數據,當處理日志 c 和 d 時,Flink 任務會先查詢 HBase,發現 HBase 中已經保存了主鍵 c 和 d,所以認為日志 c 和 d 已經被處理了,會將日志 c 和 d 過濾掉,于是就產生了丟數據的現象,日志 c 和 d 其實并沒有參與 PV 的計算。

image

同學們可能會想,日志 c 和 d 已經被處理過了,此時就算從 Checkpoint 處恢復,PV 值也應該為 4,不應該是 2。請注意上述方案,筆者描述的是 PV 信息維護在 Flink 的狀態中,所以從 Checkpoint 處恢復任務時,會將 Checkpoint 時狀態中保存的 PV 信息恢復,所以恢復為 2。

當然還有其他統計 PV 的方式,不需要將 PV 信息維護在 Flink 狀態中,而是僅僅在 Redis 中保存 PV 結果,每處理一條數據,將 Redis 中的 PV 值加一即可。如下圖所示,PV 不維護在狀態中,所以當處理完日志 b 進行 Checkpoint 時,只會將當前消費的 offset 信息維護起來。處理完日志 d 以后,由于機器故障,Flink 任務掛掉,任務依然會從日志 b 之后開始消費,此時 Redis 中保存的 PV=4,且 HBase 中保存的 rowkey 信息為 a、b、c、d。緊接著開始處理 c 和 d,因為 HBase 中保存了主鍵 c、d,因此不會重復處理日志 c、d,因此 PV 值計算正確,也不會出現重復消費的問題。

image

這種策略貌似沒有問題,但是問題百出。我們的任務處理元素 d 需要兩個操作:

① 將 Redis 中 PV 值加一 ② 將主鍵 id 加入到 HBase

由于 Redis 和 HBase 都不支持事務,所以以上兩個操作并不能保障原子性。如果代碼中先執行步驟 ①,可能會造成 ① 執行成功 ② 還未執行成功,那么恢復任務時 PV=4,HBase 中保存主鍵 a、b、c,此時日志 d 就會重復計算,就會造成 PV 值計算偏高的問題。如果代碼中先執行步驟 ②,可能會造成 ② 執行成功 ① 還未執行成功,那么恢復任務時 PV=3,HBase 中保存主鍵 a、b、c、d,此時日志 d 就會被漏計算,就會造成 PV 值計算偏低的問題。這里只是拿 HBase 舉例而已,上述情況中外部的任何存儲介質維護 set 集合都不能保證 Exactly Once,因為 Flink 從 Checkpoint 處恢復時,外部存儲介質并不能恢復到 Checkpoint 時的狀態。既然外部存儲介質不能恢復到 Checkpoint 時的狀態,那使用 Flink 內置的狀態后端可以嗎?當然可以!!!

使用 Flink 的 KeyedState 實現去重

使用 Flink 狀態來維護 set 集合的優勢

總結

以上是生活随笔為你收集整理的千万数据去重_基于 Flink 的百亿数据去重实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。