日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

关于kafka中的timestamp与offset的对应关系

發(fā)布時(shí)間:2024/1/23 编程问答 58 豆豆
生活随笔 收集整理的這篇文章主要介紹了 关于kafka中的timestamp与offset的对应关系 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

關(guān)于kafka中的timestamp與offset的對應(yīng)關(guān)系

@(KAFKA)[storm, kafka, 大數(shù)據(jù)]

  • 關(guān)于kafka中的timestamp與offset的對應(yīng)關(guān)系
    • 獲取單個分區(qū)的情況
    • 同時(shí)從所有分區(qū)獲取消息的情況
    • 結(jié)論
      • 如何指定時(shí)間
      • 出現(xiàn)UpdateOffsetException時(shí)的處理方法
    • 相關(guān)源碼略讀
      • 1入口
      • 2處理邏輯
        • 1建立offset與timestamp的對應(yīng)關(guān)系并保存到數(shù)據(jù)中
        • 2找到最近的最后一個滿足 timestamp target_timestamp 的 index
        • 3找到滿足該條件的offset數(shù)組
      • 3注意事項(xiàng)

獲取單個分區(qū)的情況

kafka通過offset記錄每條日志的偏移量,詳見《Kafka文件存儲機(jī)制那些事》。但是當(dāng)用戶想讀取之前的信息時(shí),他是不可能知道這些消息對應(yīng)的offset的,用戶只能指定時(shí)間,比如說我從昨天的12點(diǎn)開始讀取消息。

這就有個問題了,怎么樣將用戶定義的時(shí)間轉(zhuǎn)化為集群內(nèi)部的offset呢?

先簡單重溫一下kafka的物理存儲機(jī)制:每個topic分成多個分區(qū),而一個分區(qū)對應(yīng)磁盤中的一個目錄,目錄中會有多個文件,比如:

00000000000000000000.index 00000000000000000000.log 00000000000001145974.index 00000000000001145974.log

可以看出來,每個segment file其實(shí)有2部分,一個index文件,一個log文件。文件名是這個文件內(nèi)的第一個消息的offset。log文件記錄的是實(shí)際的消息內(nèi)容。而index對log文件作了索引,當(dāng)需要查看某個消息時(shí),如果指定offset,很容易就定位到log文件中的具體位置。詳見上面說的文章。

但正如剛才所說,用戶不知道offset,而只知道時(shí)間,所以就需要轉(zhuǎn)換了。

kafka用了一個很直觀很簡單的方法:將文件名中的offset與文件的最后修改時(shí)間放入一個map中,然后再查找。詳細(xì)步驟如下:
(1)將文件名及文件的最后時(shí)間放入一個map中,時(shí)間使用的是13位的unix時(shí)間戳
(2)當(dāng)用戶指定一個時(shí)間t0時(shí),在map中找到最后一個時(shí)間t1早于t0的時(shí)間,然后返回這個文件名,即這個文件的第一個offset。
(3)這里只返回了一個分區(qū)的offset,而事實(shí)上需要返回所有分區(qū)的offset,所以對所有分區(qū)采取上述步驟。
(4)使用取到的消息,開始消費(fèi)消息。

舉個例子:

w-r--r-- 1 hadoop hadoop 1073181076 8?? 11 10:20 00000000000066427499.log -rw-r--r-- 1 hadoop hadoop 14832 8?? 11 10:20 00000000000066427499.index -rw-r--r-- 1 hadoop hadoop 1073187364 8?? 11 10:40 00000000000067642947.log -rw-r--r-- 1 hadoop hadoop 14872 8?? 11 10:40 00000000000067642947.index -rw-r--r-- 1 hadoop hadoop 1073486959 8?? 11 11:04 00000000000068857698.log -rw-r--r-- 1 hadoop hadoop 14928 8?? 11 11:04 00000000000068857698.index -rw-r--r-- 1 hadoop hadoop 1073511817 8?? 11 11:25 00000000000070069880.log -rw-r--r-- 1 hadoop hadoop 14920 8?? 11 11:25 00000000000070069880.index -rw-r--r-- 1 hadoop hadoop 10485760 8?? 11 11:28 00000000000071279203.index -rw-r--r-- 1 hadoop hadoop 148277228 8?? 11 11:28 00000000000071279203.log

我們有上述幾個文件
(1)當(dāng)我需要消費(fèi)從8月11日11:00開始的數(shù)據(jù)時(shí),它會返回最后修改時(shí)間早于8月11日11:00的文件名,此外是修改時(shí)間第10:40的文件,offset為67642947.其實(shí)由于它的最后修改時(shí)間在10:40,我們需要的數(shù)據(jù)不可能在它里面,它直接返回11:40的文件即可,但可能是出于更保險(xiǎn)的考慮,它返回了上一個文件。
(2)其它類似,當(dāng)我消費(fèi)11:20的數(shù)據(jù),返回的offset為68857698.
(3)而當(dāng)我消費(fèi)的數(shù)據(jù)早于10:20的話,則返回的offset為空,如果是通過數(shù)組保存offset的,則提取第一個offset時(shí)會出現(xiàn) java.lang.ArrayIndexOutOfBoundsException 異常。如在kafka編程指南中的SimpleConsumer中的代碼:

long[] offsets = response.offsets(topic, partition);return offsets[0];

當(dāng)然,也可以合理處理,當(dāng)返回為空時(shí),直接返回最早的offset即可。

(4)當(dāng)消費(fèi)的數(shù)據(jù)晚于最晚時(shí)刻,返回最新的消息。

注意:
(1)這里對kafka集群本身沒有任何的負(fù)擔(dān),kafka消息也不需要記錄時(shí)間點(diǎn)這個字段,只有在需要定位的時(shí)候,才臨時(shí)構(gòu)建一個map,然后將offset與時(shí)間讀入這個map中。
(2)冗余很多消息。這種方法粒度非常粗,是以文件作為粒度的,因此冗余的消息數(shù)據(jù)和文件的大小有關(guān)系,默認(rèn)為1G。如果這個topic的數(shù)據(jù)非常少,則這1G的數(shù)據(jù)可以就是幾天前的數(shù)據(jù)了。
(3)有2個特殊的時(shí)間點(diǎn):
需要查找的 timestamp 是 -1 或者 -2時(shí),特殊處理

case OffsetRequest.LatestTime => // OffsetRequest.LatestTime = -1 startIndex = offsetTimeArray.length -1 case OffsetRequest.EarliestTime => // OffsetRequest.EarliestTime = -2startIndex =0

同時(shí)從所有分區(qū)獲取消息的情況

1、當(dāng)同時(shí)從多個分區(qū)讀取消息時(shí),只要有其中一個分區(qū),它的所有文件的修改時(shí)間均晚于你指定的時(shí)間,就會出錯,因?yàn)檫@個分區(qū)返回的offset為空,除非你作了合理的處理。

2、storm!!!
storm0.9x版本遇到上述問題時(shí),同樣會出錯,出現(xiàn)以下異常

storm.kafka.UpdateOffsetException

而從0.10版本開始,改為了從最早時(shí)間開始消費(fèi)消息。

3、還有個問題,如何將消息均勻的分布但各個分區(qū)中。比如在我們一個topic中,其中一個分區(qū)已經(jīng)有60G數(shù)據(jù),而另一個分區(qū)還不足2G,如果指定時(shí)間的話,由于小的那個分區(qū)的修改時(shí)間肯定是在近期的,所以當(dāng)指定一個較前的時(shí)間點(diǎn)就會出錯。而且即使不出錯,從不同分區(qū)返回的消息也可能時(shí)間相差很遠(yuǎn)。

如何將數(shù)據(jù)均勻的分布到各個分區(qū),請參考kafka編程指南的partitioner介紹。

只要出現(xiàn)這個問題,都是由于數(shù)據(jù)不存在,有可能是:

(1)數(shù)據(jù)真的丟失了

(2)數(shù)據(jù)傾斜嚴(yán)重

結(jié)論

如何指定時(shí)間

如果需要指定從某個時(shí)間點(diǎn)開始處理日志,則:

(1)就指定那個時(shí)間即可,不需要提前,因此返回的消息一定是在這個時(shí)間點(diǎn)之前的。

(2)如果這個時(shí)間點(diǎn)是繁忙時(shí)段,它返回的消息時(shí)間可能只是這個時(shí)間點(diǎn)之前的一小段時(shí)間。

(3)如果這個時(shí)間點(diǎn)是個空閑時(shí)間,它返回的日志時(shí)間可能是很長一段時(shí)間的日志。

(4)但不管是繁忙時(shí)間還是空閑時(shí)間,它都是多讀一個日志文件,所以冗余的日志數(shù)量是相同的。

舉個例子:

如果需要處理2015-08-15 15:00:00后的日志,則

(1)直接指定這個時(shí)間即可,不需要指定它之前的時(shí)間,如1:00, 2:00之類的,因?yàn)榉祷氐娜罩緯r(shí)間決不會是3:00前的。同時(shí),3:00進(jìn)入kafka的數(shù)據(jù)有可能是3:00前的數(shù)據(jù),決不會是3:00后的數(shù)據(jù),所以也不需要考慮指定提前時(shí)間。

(2)由于這個時(shí)間日志一般較多,它返回的日志可能是2:30左右開始的。相反,如果是凌晨3:00,由于這個時(shí)間點(diǎn)日志較少,它返回的日志有可以是2、3小時(shí)前的。

出現(xiàn)UpdateOffsetException時(shí)的處理方法

(1)若是小項(xiàng)目,由于數(shù)據(jù)量不多,建議同頭開始處理,并通知SA檢查。

(2)若是大項(xiàng)目,一般是出現(xiàn)了數(shù)據(jù)傾斜,通知SA檢查數(shù)據(jù)情況。

相關(guān)源碼略讀

1、入口

Kafka Server 處理 Client 發(fā)送來的請求的入口在
文件夾: core/src/main/scala/kafka/server
類:kafka.server.KafkaApis
方法: handle
處理offset請求的函數(shù): handleOffsetRequest

2、處理邏輯

處理邏輯主要分為四步
獲取partition
從partition中獲取offset
high water mark 處理(這一段的資料太少了)
異常處理
由于request中包含查詢多個partition的offset的請求。所以最終會返回一個map,保存有每個partition對應(yīng)的offset
這里主要介紹從某一個partition中獲取offset的邏輯,代碼位置
kafka.log.Log#getOffsetsBefore(timestamp, maxNumOffsets)
從一個partition中獲取offset

(1)建立offset與timestamp的對應(yīng)關(guān)系,并保存到數(shù)據(jù)中

//每個Partition由多個segment file組成。獲取當(dāng)前partition中的segment列表 val segsArray = segments.view// 創(chuàng)建數(shù)組 var offsetTimeArray: Array[(Long, Long)] =null if(segsArray.last.size >0)offsetTimeArray =newArray[(Long, Long)](segsArray.length +1) elseoffsetTimeArray =newArray[(Long, Long)](segsArray.length)// 將 offset 與 timestamp 的對應(yīng)關(guān)系添加到數(shù)組中 for(i <-0until segsArray.length)// 數(shù)據(jù)中的每個元素是一個二元組,(segment file 的起始 offset,segment file的最近修改時(shí)間)offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified) if(segsArray.last.size >0)// 如果最近一個 segment file 不為空,將(最近的 offset, 當(dāng)前之間)也添加到該數(shù)組中offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) 通過這段邏輯,獲的一個數(shù)據(jù) offsetTimeArray,每個元素是一個二元組,二元組內(nèi)容是(offset, timestamp)

(2)找到最近的最后一個滿足 timestamp < target_timestamp 的 index

var startIndex = -1 timestamp match {// 需要查找的 timestamp 是 -1 或者 -2時(shí),特殊處理caseOffsetRequest.LatestTime => // OffsetRequest.LatestTime = -1startIndex = offsetTimeArray.length -1caseOffsetRequest.EarliestTime => // OffsetRequest.EarliestTime = -2startIndex =0case_ =>var isFound =falsedebug("Offset time array = "+ offsetTimeArray.foreach(o =>"%d, %d".format(o._1, o._2)))startIndex = offsetTimeArray.length -1 // 從最后一個元素反向找while(startIndex >=0&& !isFound) { // 找到滿足條件或者if(offsetTimeArray(startIndex)._2 <= timestamp) // offsetTimeArray 的每個元素是二元組,第二個位置是 timestampisFound =trueelsestartIndex -=1} }

通過這段邏輯,實(shí)際找到的是 “最近修改時(shí)間早于目標(biāo)timestamp的最近修改的segment file的起始o(jì)ffset”
但是獲取offset的邏輯并沒有結(jié)束,后續(xù)仍有處理

(3)找到滿足該條件的offset數(shù)組

實(shí)際上這個函數(shù)的功能是找到一組offset,而不是一個offset。第二個參數(shù) maxNumOffsets 指定最多找?guī)讉€滿足條件的 offset。

獲取一組offset的邏輯 // 返回的數(shù)據(jù)的長度 = min(maxNumOffsets, startIndex + 1),startIndex是邏輯2中找到的index val retSize = maxNumOffsets.min(startIndex +1) val ret = newArray[Long](retSize)// 逐個將滿足條件的offset添加到返回的數(shù)據(jù)中 for(j <-0until retSize) {ret(j) = offsetTimeArray(startIndex)._1startIndex -=1 }// 降序排序返回。offset 越大數(shù)據(jù)越新。 // ensure that the returned seq is in descending order of offsets ret.toSeq.sortBy(- _)

最終返回這個數(shù)組

3、注意事項(xiàng)

實(shí)際找到的offset并不是從目標(biāo)timestamp開始的第一個offset。需要注意
當(dāng) timestamp 小于最老的數(shù)據(jù)文件的最近修改時(shí)間時(shí),返回值是一個空數(shù)組。可能會導(dǎo)致使用時(shí)的問題。
調(diào)整segment file文件拆分策略的配置時(shí),需要注意可能會造成的影響。

總結(jié)

以上是生活随笔為你收集整理的关于kafka中的timestamp与offset的对应关系的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。