千万数据去重_基于 Flink 的百亿数据去重实践
在工作中經(jīng)常會(huì)遇到去重的場(chǎng)景,例如基于 App 的用戶行為日志分析系統(tǒng),用戶的行為日志從手機(jī)客戶端上報(bào)到 Nginx 服務(wù)端,通過(guò) Logstash、Flume 或其他工具將日志從 Nginx 寫入到 Kafka 中。
由于用戶手機(jī)客戶端的網(wǎng)絡(luò)可能出現(xiàn)不穩(wěn)定,所以手機(jī)客戶端上傳日志的策略是:寧可重復(fù)上報(bào),也不能丟日志。所以導(dǎo)致 Kafka 中必然會(huì)出現(xiàn)日志重復(fù)的情況,即:同一條日志出現(xiàn)了 2 條或 2 條以上。
通常情況下,Flink 任務(wù)的數(shù)據(jù)源都是 Kafka,若 Kafka 中數(shù)據(jù)出現(xiàn)了重復(fù),在實(shí)時(shí) ETL 或者流計(jì)算時(shí)都需要考慮對(duì)日志主鍵進(jìn)行去重,否則會(huì)導(dǎo)致流計(jì)算結(jié)果偏高或結(jié)果不準(zhǔn)確的問(wèn)題,例如用戶 a 在某個(gè)頁(yè)面只點(diǎn)擊了一次,但由于日志重復(fù)上報(bào),所以用戶 a 在該頁(yè)面的點(diǎn)擊日志在 Kafka 中出現(xiàn)了 2 次,最后統(tǒng)計(jì)該頁(yè)面的 click 數(shù)時(shí),結(jié)果就會(huì)偏高。
這里只闡述了一種可能造成 Kafka 中數(shù)據(jù)重復(fù)的情況,在生產(chǎn)環(huán)境中很多情況都可能造成 Kafka 中數(shù)據(jù)重復(fù),這里不一一列舉,本節(jié)主要講述出現(xiàn)了數(shù)據(jù)重復(fù)后,該如何處理。
實(shí)現(xiàn)去重的通用解決方案
Kafka 中數(shù)據(jù)出現(xiàn)重復(fù)后,各種解決方案都比較類似,一般需要一個(gè)全局 set 集合來(lái)維護(hù)歷史所有數(shù)據(jù)的主鍵。當(dāng)處理新日志時(shí),需要拿到當(dāng)前日志的主鍵與歷史數(shù)據(jù)的 set 集合按照規(guī)則進(jìn)行比較,若 set 集合中已經(jīng)包含了當(dāng)前日志的主鍵,說(shuō)明當(dāng)前日志在之前已經(jīng)被處理過(guò)了,則當(dāng)前日志應(yīng)該被過(guò)濾掉,否則認(rèn)為當(dāng)前日志不應(yīng)該被過(guò)濾應(yīng)該被處理,而且處理完成后需要將新日志的主鍵加入到 set 集合中,set 集合永遠(yuǎn)存放著所有已經(jīng)被處理過(guò)的數(shù)據(jù)。程序流程圖如下圖所示:
image
處理流程很簡(jiǎn)單,關(guān)鍵在于如何維護(hù)這個(gè) set 集合,可以簡(jiǎn)單估算一下這個(gè) set 集合需要占用多大空間。本小節(jié)要解決的問(wèn)題是百億數(shù)據(jù)去重,所以就按照每天 1 百億的數(shù)據(jù)量來(lái)計(jì)算。
由于每天數(shù)據(jù)量巨大,因此主鍵占用空間通常會(huì)比較大,如果主鍵占用空間小意味著表示的數(shù)據(jù)范圍就比較小,就可能導(dǎo)致主鍵沖突,例如:4 個(gè)字節(jié)的 int 類型表示數(shù)據(jù)范圍是為 -2147483648~2147483647,總共可以表示 42 億個(gè)數(shù),如果這里每天百億的數(shù)據(jù)量選用 int 類型做為主鍵的話,很明顯會(huì)有大量的主鍵發(fā)生沖突,會(huì)將不重復(fù)的數(shù)據(jù)認(rèn)為是發(fā)生了重復(fù)。
用戶的行為日志是在手機(jī)客戶端生成的,沒(méi)有全局發(fā)號(hào)器,一般會(huì)選取 UUID 做為日志的主鍵,UUID 會(huì)生成 36 位的字符串,例如:"f106c4a1-4c6f-41c1-9d30-bbb2b271284a"。每個(gè)主鍵占用 36 字節(jié),每天 1 百億數(shù)據(jù),36 字節(jié) * 100億 ≈ 360GB。這僅僅是一天的數(shù)據(jù)量,所以該 set 集合要想存儲(chǔ)空間不發(fā)生持續(xù)地爆炸式增長(zhǎng),必須增加一個(gè)功能,那就是給所有的主鍵增加 ttl(Time To Live的縮寫,即:過(guò)期時(shí)間)。
如果不增加 ttl,10 天數(shù)據(jù)量的主鍵占用空間就 3.6T,100 天數(shù)據(jù)量的主鍵占用空間 36T,所以在設(shè)計(jì)之初必須考慮為主鍵設(shè)定 ttl。如果要求按天進(jìn)行去重或者認(rèn)為日志發(fā)生重復(fù)上報(bào)的時(shí)間間隔不可能大于 24 小時(shí),那么為了系統(tǒng)的可靠性 ttl 可以設(shè)置為 36 小時(shí)。每天數(shù)據(jù)量 1 百億,且 set 集合中存放著 36 小時(shí)的數(shù)據(jù)量,即 100 億 * 1.5 = 150 億,所以 set 集合中需要維護(hù) 150 億的數(shù)據(jù)量。
且 set 集合中每條數(shù)據(jù)都增加了 ttl,意味著 set 集合需要為每條數(shù)據(jù)再附帶保存一個(gè)時(shí)間戳,來(lái)確定該數(shù)據(jù)什么時(shí)候過(guò)期。例如 Redis 中為一個(gè) key 設(shè)置了 ttl,如果沒(méi)有為這個(gè) key 附帶時(shí)間戳,那么根本無(wú)法判斷該 key 什么時(shí)候應(yīng)該被清理。所以在考慮每條數(shù)據(jù)占用空間時(shí),不僅要考慮數(shù)據(jù)本身,還需要考慮是否需要其他附帶的存儲(chǔ)。主鍵本身占用 36 字節(jié)加上 long 類型的時(shí)間戳 8 字節(jié),所以每條數(shù)據(jù)至少需要占用 44 字節(jié),150 億 * 44 字節(jié) = 660GB。所以每天百億的數(shù)據(jù)量,如果我們使用 set 集合的方案來(lái)實(shí)現(xiàn),至少需要占用 660GB 以上的存儲(chǔ)空間。
使用 BloomFilter 來(lái)實(shí)現(xiàn)去重
有些流計(jì)算的場(chǎng)景對(duì)準(zhǔn)確性要求并不是很高,例如傳統(tǒng)的 Labmda 架構(gòu)中,都會(huì)有離線去矯正實(shí)時(shí)計(jì)算的結(jié)果,所以根據(jù)業(yè)務(wù)場(chǎng)景,當(dāng)業(yè)務(wù)要求可以接受結(jié)果有小量誤差時(shí),可以選擇使用一些低成本的數(shù)據(jù)結(jié)構(gòu)。BloomFilter 和 HyperLogLog 都是相對(duì)低成本的數(shù)據(jù)結(jié)構(gòu),分別有自己的應(yīng)用場(chǎng)景,且兩種數(shù)據(jù)結(jié)構(gòu)都有一定誤差。
HyperLogLog 可以估算出 HyperLogLog 中插入了多少個(gè)不重復(fù)的元素,而不能告訴我們之前是否插入了哪些元素。BloomFilter 則恰好相反,比起 BloomFilter 更像是一個(gè) set 集合,BloomFilter 可以告訴你 BloomFilter 中肯定不包含元素 a,或者告訴你 BloomFilter 中可能包含元素 b,但 BloomFilter 不能告訴你 BloomFilter 中插入了多少個(gè)元素。接下來(lái)了解一下 BloomFilter 的實(shí)現(xiàn)原理。
bitmap 位圖
了解 BloomFilter,從 bitmap(位圖)開(kāi)始說(shuō)起。現(xiàn)在有 1 千萬(wàn)個(gè)整數(shù),數(shù)據(jù)范圍在 0 到 2 千萬(wàn)之間。如何快速查找某個(gè)整數(shù)是否在這 1 千萬(wàn)個(gè)整數(shù)中呢?可以將這 1 千萬(wàn)個(gè)數(shù)保存在 HashMap 中,不考慮對(duì)象頭及其他空間,1000 萬(wàn)個(gè) int 類型數(shù)據(jù)需要占用大約 1000 萬(wàn) * 4 字節(jié) ≈ 40MB 存儲(chǔ)空間。有沒(méi)有其他方案呢?因?yàn)閿?shù)據(jù)范圍是 0 到 2 千萬(wàn),所以如下圖所示,可以申請(qǐng)一個(gè)長(zhǎng)度為 2000 萬(wàn)、boolean 類型的數(shù)組。將這 1 千萬(wàn)個(gè)整數(shù)作為數(shù)組下標(biāo),將其對(duì)應(yīng)的數(shù)組值設(shè)置成 true,如下圖所示,數(shù)組下標(biāo)為 2、666、999 的位置存儲(chǔ)的數(shù)據(jù)為 true,表示 1 千萬(wàn)個(gè)數(shù)中包含了 2、666、999 等。當(dāng)查詢某個(gè)整數(shù) K 是否在這 1 千萬(wàn)個(gè)整數(shù)中時(shí),只需要將對(duì)應(yīng)的數(shù)組值 array[K] 取出來(lái),看是否等于 true。如果等于 true,說(shuō)明 1 千萬(wàn)整數(shù)中包含這個(gè)整數(shù) K,否則表示不包含這個(gè)整數(shù) K。
image
Java 的 boolean 基本類型占用一個(gè)字節(jié)(8bit)的內(nèi)存空間,所以上述方案需要申請(qǐng) 2000 萬(wàn)字節(jié)。如下圖所示,可以通過(guò)編程語(yǔ)言用二進(jìn)制位來(lái)模擬布爾類型,二進(jìn)制的 1 表示 true、二進(jìn)制的 0 表示 false。通過(guò)二進(jìn)制模擬布爾類型的方案,只需要申請(qǐng) 2000 萬(wàn) bit 即可,相比 boolean 類型而言,存儲(chǔ)空間占用僅為原來(lái)的 1/8。2000 萬(wàn) bit ≈ 2.4MB,相比存儲(chǔ)原始數(shù)據(jù)的方案 40 MB 而言,占用的存儲(chǔ)空間少了很多。
image
假如這 1 千萬(wàn)個(gè)整數(shù)的數(shù)據(jù)范圍是 0 到 100 億,那么就需要申請(qǐng) 100 億個(gè) bit 約等于 1200MB,比存儲(chǔ)原始數(shù)據(jù)方案的 40MB 還要大很多。該情況下,直接使用位圖使用的存儲(chǔ)空間更多了,怎么解決呢?可以只申請(qǐng) 1 億 bit 的存儲(chǔ)空間,對(duì) 1000 萬(wàn)個(gè)數(shù)求hash,映射到 1 億的二進(jìn)制位上,最后大約占用 12 MB 的存儲(chǔ)空間,但是可能存在 hash 沖突的情況。例如 3 和 100000003(一億零三)這兩個(gè)數(shù)對(duì)一億求余都為 3,所以映射到長(zhǎng)度為 1 億的位圖上,這兩個(gè)數(shù)會(huì)占用同一個(gè) bit,就會(huì)導(dǎo)致一個(gè)問(wèn)題:1 千萬(wàn)個(gè)整數(shù)中包含了一億零三,所以位圖中下標(biāo)為 3 的位置存儲(chǔ)著二進(jìn)制 1。當(dāng)查詢 1 千萬(wàn)個(gè)整數(shù)中是否包含數(shù)字 3 時(shí),同樣也是去位圖中下標(biāo) 3 的位置去查找,發(fā)現(xiàn)下標(biāo)為 3 的位置存儲(chǔ)著二進(jìn)制 1,所以誤以為 1 千萬(wàn)個(gè)整數(shù)中包含數(shù)字 3。為了減少 hash 沖突,于是誕生了 BloomFilter。
BloomFilter 原理介紹
hash 存在 hash 沖突(碰撞)的問(wèn)題,兩個(gè)不同的 key 通過(guò)同一個(gè) hash 函數(shù)得到的值有可能相同。為了減少?zèng)_突,可以多引入幾個(gè) hash 函數(shù),如果通過(guò)其中的一個(gè) hash 函數(shù)發(fā)現(xiàn)某元素不在集合中,那么該元素肯定不在集合中。當(dāng)所有的 hash 函數(shù)告訴我們?cè)撛卦诩现袝r(shí),才能確定該元素存在于集合中,這便是BloomFilter的基本思想。
如下圖所示,是往 BloomFilter 中插入元素 a、b 的過(guò)程,有 3 個(gè) hash 函數(shù),元素 a 經(jīng)過(guò) 3 個(gè) hash 函數(shù)后對(duì)應(yīng)的 2、8、10 這三個(gè)二進(jìn)制位,所以將這三個(gè)二進(jìn)制位置為 1,元素 b 經(jīng)過(guò) 3 個(gè) hash 函數(shù)后,對(duì)應(yīng)的 5、10、14 這三個(gè)二進(jìn)制位,將這三個(gè)二進(jìn)制位也置為 1,其中下標(biāo)為 10 的二進(jìn)制位被 a、b 元素都涉及到。
image
如下圖所示,是從 BloomFilter 中查找元素 c、d 的過(guò)程,同樣包含了 3 個(gè) hash 函數(shù),元素 c 經(jīng)過(guò) 3 個(gè) hash 函數(shù)后對(duì)應(yīng)的 2、6、9 這三個(gè)二進(jìn)制位,其中下標(biāo) 6 和 9 對(duì)應(yīng)的二進(jìn)制位為 0,所以會(huì)認(rèn)為 BloomFilter 中不存在元素 c。元素 d 經(jīng)過(guò) 3 個(gè) hash 函數(shù)后對(duì)應(yīng)的 5、8、14 這三個(gè)二進(jìn)制位,這三個(gè)位對(duì)應(yīng)的二進(jìn)制位都為 1,所以會(huì)認(rèn)為 BloomFilter 中存在元素 d,但其實(shí) BloomFilter 中并不存在元素 d,是因?yàn)樵?a 和元素 b 也對(duì)應(yīng)到了 5、8、14 這三個(gè)二進(jìn)制位上,所以 BloomFilter 會(huì)有誤判。但是從實(shí)現(xiàn)原理來(lái)看,當(dāng) BloomFilter 告訴你不包含元素 c 時(shí),BloomFilter 中肯定不包含元素 c,當(dāng) BloomFilter 告訴你 BloomFilter 中包含元素 d 時(shí),它只是可能包含,也有可能不包含。
image
使用 BloomFilter 實(shí)現(xiàn)數(shù)據(jù)去重
Redis 4.0 之后 BloomFilter 以插件的形式加入到 Redis 中,關(guān)于 api 的具體使用這里不多贅述。BloomFilter 在創(chuàng)建時(shí)支持設(shè)定一個(gè)預(yù)期容量和誤判率,預(yù)期容量即預(yù)計(jì)插入的數(shù)據(jù)量,誤判率即:當(dāng) BloomFilter 中插入的數(shù)據(jù)達(dá)到預(yù)期容量時(shí),誤判的概率,如果 BloomFilter 中插入數(shù)據(jù)較少的話,誤判率會(huì)更低。
經(jīng)筆者測(cè)試,申請(qǐng)一個(gè)預(yù)期容量為 10 億,誤判率為千分之一的 BloomFilter,BloomFilter 會(huì)申請(qǐng)約 143 億個(gè) bit,即:14G左右,相比之前 660G 的存儲(chǔ)空間小太多了。但是在使用過(guò)程中,需要記錄 BloomFilter 中插入元素的個(gè)數(shù),當(dāng)插入元素個(gè)數(shù)達(dá)到 10 億時(shí),為了保障誤差率,可以將當(dāng)前 BloomFilter 清除,重新申請(qǐng)一個(gè)新的 BloomFilter。
通過(guò)使用 Redis 的 BloomFilter,我們可以通過(guò)相對(duì)較小的內(nèi)存實(shí)現(xiàn)百億數(shù)據(jù)的去重,但是 BloomFilter 有誤差,所以只能使用在那些對(duì)結(jié)果能承受一定誤差的應(yīng)用場(chǎng)景,對(duì)于廣告計(jì)費(fèi)等對(duì)數(shù)據(jù)精度要求非常高的場(chǎng)景,極力推薦大家使用精準(zhǔn)去重的方案來(lái)實(shí)現(xiàn)。
使用 HBase 維護(hù)全局 set 實(shí)現(xiàn)去重
通過(guò)之前分析,我們知道要想實(shí)現(xiàn)百億數(shù)據(jù)量的精準(zhǔn)去重,需要維護(hù) 150 億數(shù)據(jù)量的 set 集合,每條數(shù)據(jù)占用 44 KB,總共需要 660 GB 的存儲(chǔ)空間。注意這里說(shuō)的是存儲(chǔ)空間而不是內(nèi)存空間,為什么呢?因?yàn)?660G 的內(nèi)存實(shí)在是太貴了,660G 的 Redis 云服務(wù)一個(gè)月至少要 2 萬(wàn) RMB 以上,俗話說(shuō)設(shè)計(jì)架構(gòu)不考慮成本等于耍流氓。這里使用 Redis 確實(shí)可以解決問(wèn)題,但是成本較高。HBase 基于 rowkey Get 的效率比較高,所以這里可以考慮將這個(gè)大的 set 集合以 HBase rowkey 的形式存放到 HBase 中。HBase 表設(shè)置 ttl 為 36 小時(shí),最近 36 小時(shí)的 150 億條日志的主鍵都存放到 HBase 中,每來(lái)一條數(shù)據(jù),先拿到主鍵去 HBase 中查詢,如果 HBase 表中存在該主鍵,說(shuō)明當(dāng)前日志已經(jīng)被處理過(guò)了,當(dāng)前日志應(yīng)該被過(guò)濾。如果 HBase 表中不存在該主鍵,說(shuō)明當(dāng)前日志之前沒(méi)有被處理過(guò),此時(shí)應(yīng)該被處理,且處理完成后將當(dāng)前主鍵 Put 到 HBase 表中。由于數(shù)據(jù)量比較大,所以一定要提前對(duì) HBase 表進(jìn)行預(yù)分區(qū),將壓力分散到各個(gè) RegionServer 上。
使用 HBase rowkey 去重帶來(lái)的問(wèn)題
一天 100 億的數(shù)據(jù)量,平均一秒 11.57 萬(wàn)條日志。但是數(shù)據(jù)一般都會(huì)有高峰期,例如外賣軟件高峰期肯定是飯前的一兩個(gè)小時(shí),其余時(shí)間段數(shù)據(jù)量相對(duì)比較少。所以雖然每天 100 億數(shù)據(jù)量,但是到了數(shù)據(jù)高峰期每秒數(shù)據(jù)量可以達(dá)到 20 萬(wàn)左右。按照之前的思路,每條數(shù)據(jù)來(lái)了都會(huì)對(duì) HBase 進(jìn)行一次 Get 操作,當(dāng)前數(shù)據(jù)處理完還會(huì)對(duì) HBase 進(jìn)行一次 Put 操作,所以每秒需要對(duì) HBase 請(qǐng)求 40 萬(wàn)次。單個(gè)的 Get 和 Put 請(qǐng)求效率比較低,這里可以優(yōu)化為批量操作的 API 或異步 API 來(lái)提高訪問(wèn) HBase 的效率。
性能問(wèn)題優(yōu)化后,再分析這里使用 HBase 去重到底能不能保證 Exactly Once?拿計(jì)算 PV 的案例來(lái)講。
假如 PV 信息維護(hù)在 Flink 的狀態(tài)中,通過(guò)冪等性將 PV 統(tǒng)計(jì)結(jié)果寫入到 Redis 供其他業(yè)務(wù)方查詢實(shí)時(shí)統(tǒng)計(jì)的 PV 值。如下圖所示,Flink 處理完日志 b 后進(jìn)行 Checkpoint,將 PV = 2 和 Kafka 對(duì)應(yīng)的 offset 信息保存起來(lái),此時(shí) HBase 表中有兩條 rowkey 分別是 a、b,表示主鍵為 a 和 b 的日志已經(jīng)被處理過(guò)了。
接著往后處理,當(dāng)處理完日志 d 以后,PV = 4,HBase 表中有 4 條 rowkey 分別是 a、b、c、d,表示主鍵為 a、b、c、d 的日志已經(jīng)被處理過(guò)了。但此時(shí)機(jī)器突然故障,導(dǎo)致 Flink 任務(wù)掛掉,如右圖所示 Flink 任務(wù)會(huì)從最近一次成功的 Checkpoint 處恢復(fù)任務(wù),從日志 b 之后的位置開(kāi)始消費(fèi),且 PV 恢復(fù)為 2,因?yàn)樘幚硗耆罩?b 時(shí) PV 為 2。
但由于 HBase 中的數(shù)據(jù)不是由 Flink 來(lái)維護(hù),所以無(wú)法恢復(fù)到 Checkpoint 時(shí)的狀態(tài)。所以 Flink 任務(wù)恢復(fù)后,PV = 2 且 HBase 中 rowkey 為 a、b、c、d。此時(shí) Flink 任務(wù)從日志 c 開(kāi)始繼續(xù)處理數(shù)據(jù),當(dāng)處理日志 c 和 d 時(shí),Flink 任務(wù)會(huì)先查詢 HBase,發(fā)現(xiàn) HBase 中已經(jīng)保存了主鍵 c 和 d,所以認(rèn)為日志 c 和 d 已經(jīng)被處理了,會(huì)將日志 c 和 d 過(guò)濾掉,于是就產(chǎn)生了丟數(shù)據(jù)的現(xiàn)象,日志 c 和 d 其實(shí)并沒(méi)有參與 PV 的計(jì)算。
image
同學(xué)們可能會(huì)想,日志 c 和 d 已經(jīng)被處理過(guò)了,此時(shí)就算從 Checkpoint 處恢復(fù),PV 值也應(yīng)該為 4,不應(yīng)該是 2。請(qǐng)注意上述方案,筆者描述的是 PV 信息維護(hù)在 Flink 的狀態(tài)中,所以從 Checkpoint 處恢復(fù)任務(wù)時(shí),會(huì)將 Checkpoint 時(shí)狀態(tài)中保存的 PV 信息恢復(fù),所以恢復(fù)為 2。
當(dāng)然還有其他統(tǒng)計(jì) PV 的方式,不需要將 PV 信息維護(hù)在 Flink 狀態(tài)中,而是僅僅在 Redis 中保存 PV 結(jié)果,每處理一條數(shù)據(jù),將 Redis 中的 PV 值加一即可。如下圖所示,PV 不維護(hù)在狀態(tài)中,所以當(dāng)處理完日志 b 進(jìn)行 Checkpoint 時(shí),只會(huì)將當(dāng)前消費(fèi)的 offset 信息維護(hù)起來(lái)。處理完日志 d 以后,由于機(jī)器故障,Flink 任務(wù)掛掉,任務(wù)依然會(huì)從日志 b 之后開(kāi)始消費(fèi),此時(shí) Redis 中保存的 PV=4,且 HBase 中保存的 rowkey 信息為 a、b、c、d。緊接著開(kāi)始處理 c 和 d,因?yàn)?HBase 中保存了主鍵 c、d,因此不會(huì)重復(fù)處理日志 c、d,因此 PV 值計(jì)算正確,也不會(huì)出現(xiàn)重復(fù)消費(fèi)的問(wèn)題。
image
這種策略貌似沒(méi)有問(wèn)題,但是問(wèn)題百出。我們的任務(wù)處理元素 d 需要兩個(gè)操作:
① 將 Redis 中 PV 值加一 ② 將主鍵 id 加入到 HBase
由于 Redis 和 HBase 都不支持事務(wù),所以以上兩個(gè)操作并不能保障原子性。如果代碼中先執(zhí)行步驟 ①,可能會(huì)造成 ① 執(zhí)行成功 ② 還未執(zhí)行成功,那么恢復(fù)任務(wù)時(shí) PV=4,HBase 中保存主鍵 a、b、c,此時(shí)日志 d 就會(huì)重復(fù)計(jì)算,就會(huì)造成 PV 值計(jì)算偏高的問(wèn)題。如果代碼中先執(zhí)行步驟 ②,可能會(huì)造成 ② 執(zhí)行成功 ① 還未執(zhí)行成功,那么恢復(fù)任務(wù)時(shí) PV=3,HBase 中保存主鍵 a、b、c、d,此時(shí)日志 d 就會(huì)被漏計(jì)算,就會(huì)造成 PV 值計(jì)算偏低的問(wèn)題。這里只是拿 HBase 舉例而已,上述情況中外部的任何存儲(chǔ)介質(zhì)維護(hù) set 集合都不能保證 Exactly Once,因?yàn)?Flink 從 Checkpoint 處恢復(fù)時(shí),外部存儲(chǔ)介質(zhì)并不能恢復(fù)到 Checkpoint 時(shí)的狀態(tài)。既然外部存儲(chǔ)介質(zhì)不能恢復(fù)到 Checkpoint 時(shí)的狀態(tài),那使用 Flink 內(nèi)置的狀態(tài)后端可以嗎?當(dāng)然可以!!!
使用 Flink 的 KeyedState 實(shí)現(xiàn)去重
使用 Flink 狀態(tài)來(lái)維護(hù) set 集合的優(yōu)勢(shì)
總結(jié)
以上是生活随笔為你收集整理的千万数据去重_基于 Flink 的百亿数据去重实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: STC8F2K08S2
- 下一篇: 性与潜能:性能量是一切天才的创造力源泉